
Testing AWS Lambda Infinite Loop Protection

21 July 2023

Not so long ago, AWS announced that Lambda functions will be stopped if they invoke themselves indefinitely or excessively in a loop. A common pattern for this is a function that is triggered by an SQS queue or SNS topic and submits a message back to the same queue or topic. There are also more complex paths for this to happen, such as several Lambda functions invoking one another in a chain. Today, we will test this protection.

Read more about this here: https://docs.aws.amazon.com/lambda/latest/dg/invocation-recursion.html

Repository for this post: https://github.com/ppabis/lambda-infinite-loop

Problematic function

Let's define the simplest function that will be called recursively. It will be triggered by an SQS event and will send a message back to the same queue. The message will contain a counter that is incremented with each invocation. To be on the safe side, we will stop sending new messages once the counter reaches 1000. I will implement my function in Python.

import boto3, os, json
QUEUE_URL = os.environ['QUEUE_URL']
sqs = boto3.client('sqs')  # Create the SQS client once, outside the handler

# Extract counter from the message
def processMsg(msg):
  body = json.loads(msg['body'])
  if 'counter' in body:
    return body['counter']
  return 0

def handler(event, context):
  counter = 0
  # In case there are multiple messages, select the largest counter
  for msg in event['Records']:
    counterId = processMsg(msg)
    if counterId > counter:
      counter = counterId

  if counter >= 1000:
    print(f"Counter is {counter}, exiting")
    return { 'statusCode': 200, 'body': f"Counter reached {counter}" }

  counter += 1
  print("Sending count {counter}")
  boto3.client('sqs').send_message(QueueUrl=QUEUE_URL, MessageBody=json.dumps({'counter': counter}))
  return { 'statusCode': 200, 'body': f"New counter sent: {counter}" }

Preparing Infrastructure

We will define an SQS queue and a Lambda function. The function will have permission to send messages to the queue, and its role also needs to be able to read messages from this queue, as the SQS trigger integration polls on the function's behalf. We will also attach permissions to write logs to CloudWatch, so that we can see the current status of our system.

# Role for our Lambda function
resource "aws_iam_role" "lambda-role" {
  name = "RecursiveLambdaRole"
  assume_role_policy = jsonencode({
      Version = "2012-10-17"
      Statement = [ {
          Action = "sts:AssumeRole"
          Effect = "Allow"
          Principal = { Service = "lambda.amazonaws.com" }
        } ]
    })
}

# Permissions for our Lambda function to access SQS
resource "aws_iam_policy" "sqs" {
  name = "AllowSQSBasicActions"
  policy = data.aws_iam_policy_document.sqs.json
}

data "aws_iam_policy_document" "sqs" {
  statement {
    effect = "Allow"
    resources = [ aws_sqs_queue.queue.arn ]
    actions = [
      "sqs:SendMessage",
      "sqs:ReceiveMessage",
      "sqs:DeleteMessage",
      "sqs:GetQueueAttributes"
    ]
  }
}

# Attach the policies to the role
resource "aws_iam_role_policy_attachment" "lambda-sqs" {
  role = aws_iam_role.lambda-role.name
  policy_arn = aws_iam_policy.sqs.arn
}

# Attach also CloudWatch Logs policy
resource "aws_iam_role_policy_attachment" "basic-execution-role" {
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
  role = aws_iam_role.lambda-role.name
}

Next, we will create the SQS queue itself. We don't need to attach a resource policy to it, because the access is already granted through IAM on the Lambda role, which is what the Lambda event source mapping for SQS requires.

resource "aws_sqs_queue" "queue" {
  name = "recursive-queue"
}

Finally, we will create the Lambda function. We will use the Python 3.8 runtime, set the required parameters such as the role and environment variables, and create the event source mapping for SQS. We will also package our function code with the hashicorp/archive provider.

data "archive_file" "recursive-lambda" {
  type        = "zip"
  source_dir  = "recursive-lambda"
  output_path = "recursive-lambda.zip"
}

resource "aws_lambda_function" "recursive-function" {
  function_name    = "recursive-function"
  role             = aws_iam_role.lambda-role.arn # Attach role
  handler          = "main.handler"               # File: main.py, function: handler
  runtime          = "python3.8"
  filename         = data.archive_file.recursive-lambda.output_path
  source_code_hash = data.archive_file.recursive-lambda.output_base64sha256
  environment { variables = { QUEUE_URL = aws_sqs_queue.queue.id } }
}

# Create source mapping with SQS
resource "aws_lambda_event_source_mapping" "recursive-function" {
  event_source_arn = aws_sqs_queue.queue.arn
  function_name    = aws_lambda_function.recursive-function.function_name
  batch_size       = 1
}
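
After applying the configuration, we can verify that the trigger is in place by listing the function's event source mappings with the AWS CLI:

$ aws lambda list-event-source-mappings\
 --function-name recursive-function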

Testing the function

Let's trigger the system using the AWS CLI by sending a message to the queue.

$ aws sqs send-message\
 --queue-url=https://sqs.eu-central-1.amazonaws.com/999902222222/recursive-queue\
 --message-body='{"counter": 5}'
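
While the messages bounce around, we can follow the counter in near real time by tailing the function's default log group:

$ aws logs tail /aws/lambda/recursive-function --follow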

In our CloudWatch Logs we should get some output about the state of the counter for each invocation. In my case, I received two separate log streams because Lambda spun up two concurrent instances.

Lambda Stream 1

Lambda Stream 2

The stream stopped after the counter reached 21. I initially sent the message with counter = 5, which means that Lambda was executed 16 times. This matches the documented behavior: Lambda stops a recursive chain after about 16 invocations. It is also confirmed by the graph of the invocation count.

Lambda invocation count

Yet all the invocations were reported as successful; we have zero errors.

All successful

The mechanism seems to work as expected, but where do we find confirmation? Just below the errors and success graphs in the Lambda monitoring tab, there is a graph called "Recursive invocations dropped".

Dropped invocations
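
The same data is exposed as the RecursiveInvocationsDropped metric in the AWS/Lambda CloudWatch namespace, so we can also query it from the CLI (the time window below is just an example):

$ aws cloudwatch get-metric-statistics\
 --namespace AWS/Lambda\
 --metric-name RecursiveInvocationsDropped\
 --dimensions Name=FunctionName,Value=recursive-function\
 --start-time 2023-07-21T00:00:00Z --end-time 2023-07-22T00:00:00Z\
 --period 3600 --statistics Sum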

After some time we can also get an e-mail with a warning about the stopped recursion.

Recursion email

Recovering rejected messages

According to the documentation, dropped messages should be sent to a dead-letter queue, and it has to be configured on the SQS side. I added another queue, along with the required policies, to be used as a DLQ for the main queue. I also lowered the default message retention time on the main queue to 2 minutes.

resource "aws_sqs_queue" "queue" {
  name = "recursive-queue"
  # Edit the main queue to retain message at most for 2 minutes
  message_retention_seconds = 120
}

resource "aws_sqs_queue" "dead-letter-queue" {
  name = "recursive-dead-letter-queue"
  redrive_allow_policy = jsonencode({
    redrivePermission = "byQueue",
    sourceQueueArns   = [aws_sqs_queue.queue.arn]
  })
}

resource "aws_sqs_queue_redrive_policy" "redrive-policy" {
  queue_url = aws_sqs_queue.queue.id
  redrive_policy = jsonencode({
    deadLetterTargetArn = aws_sqs_queue.dead-letter-queue.arn
    maxReceiveCount     = 1 # Redrive immediately
  })
}

Let's try sending a message again.

$ aws sqs send-message\
 --queue-url=https://sqs.eu-central-1.amazonaws.com/999902222222/recursive-queue\
 --message-body='{"counter": 5}'
$ sleep 120 # Wait for 2 minutes
$ aws sqs receive-message\
 --queue-url https://sqs.eu-central-1.amazonaws.com/999902222222/recursive-dead-letter-queue
{
    "Messages": [
        {
            "MessageId": "1d2032f0-be0d-4e75-adef-b56ecbb05a86",
            "ReceiptHandle": "...",
            "MD5OfBody": "b70a2eb9cfa2b0c579154ee76da44a97",
            "Body": "{\"counter\": 26}"
        }
    ]
}
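
Note that receive-message only hides the message for the duration of the visibility timeout; it will reappear afterwards. To permanently remove it from the DLQ, we would delete it using the ReceiptHandle from the response:

$ aws sqs delete-message\
 --queue-url https://sqs.eu-central-1.amazonaws.com/999902222222/recursive-dead-letter-queue\
 --receipt-handle "$RECEIPT_HANDLE" # The ReceiptHandle value from the previous response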

We successfully received the message from the dead-letter queue, carrying the last counter value. What would happen if we used the DLQ as another event source for Lambda?

Connecting DLQ to Lambda as well

I added the DLQ as another event source for Lambda. I also changed the IAM permissions so that Lambda can read from the new queue.
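
One way to do that is to extend the policy document from before, so that its resources list covers both queues:

data "aws_iam_policy_document" "sqs" {
  statement {
    effect = "Allow"
    # Grant the same SQS actions on the DLQ as on the main queue
    resources = [
      aws_sqs_queue.queue.arn,
      aws_sqs_queue.dead-letter-queue.arn
    ]
    actions = [
      "sqs:SendMessage",
      "sqs:ReceiveMessage",
      "sqs:DeleteMessage",
      "sqs:GetQueueAttributes"
    ]
  }
}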

resource "aws_lambda_event_source_mapping" "recursive-function-dlq" {
  event_source_arn = aws_sqs_queue.dead-letter-queue.arn
  function_name    = aws_lambda_function.recursive-function.function_name
  batch_size       = 1
}

However, the recursion was stopped the same way as before. Lambda tried to pull the message that landed on the DLQ, but the invocation was dropped, so the message is now stuck in "Messages in flight".

Message is stuck in DLQ
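
We can confirm this from the CLI, as the in-flight count corresponds to the ApproximateNumberOfMessagesNotVisible attribute of the queue:

$ aws sqs get-queue-attributes\
 --queue-url https://sqs.eu-central-1.amazonaws.com/999902222222/recursive-dead-letter-queue\
 --attribute-names ApproximateNumberOfMessagesNotVisible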

Disabling protection

So far, the only mention of disabling this protection, or even tuning its settings, comes from the AWS blog post, at the very bottom. It says that we can disable the protection by contacting AWS Support (which is always the wrong answer on any AWS certification exam).