Scalable webhook listeners in AWS

Using the API Gateway - SQS integration to create scalable webhook listeners and validate the requests at the API Gateway level

Traditionally, when we think about loose coupling and serverless in AWS, we picture an API Gateway endpoint triggering a Lambda function that sends messages to SQS.

But API Gateway can also integrate directly with other AWS service APIs, and SQS is one of them.

We can have an HTTP endpoint send messages directly to SQS without the need for a Lambda function, and we can even perform request validation at the API Gateway level.

This serverless pattern is extremely handy when creating third-party integrations for our services. In this article, I will explain how it works and provide a CloudFormation template you can easily adapt to your use case and deploy with one click.

Asynchronous processing

Processing incoming webhook data in real time can be challenging, especially if our service needs to perform additional processing or trigger downstream actions.

In these situations, the producer only needs to know that the data was received successfully. The processing can happen asynchronously in the background, so we return a successful response right away and send the request to a queue for later processing.

We often know the schema in advance, so we can also validate the request before sending the message to SQS. The backend worker service then consumes messages from the queue at its own pace, one by one or in batches, and does the heavy lifting.
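On the consumer side, a Lambda worker is typically attached to the queue with an event source mapping, and its BatchSize decides between one-by-one and batch processing. A minimal sketch, assuming a worker function defined elsewhere in the template under the hypothetical logical name WorkerFunction:

```yaml
# attach the (hypothetical) WorkerFunction Lambda to the queue
WorkerEventSourceMapping:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
        EventSourceArn: !GetAtt Queue.Arn
        FunctionName: !Ref WorkerFunction
        # 1 = one message per invocation; FIFO queues allow at most 10 per batch
        BatchSize: 10
        # the worker reports which messages failed instead of failing the whole batch
        FunctionResponseTypes:
            - ReportBatchItemFailures
```

The function's execution role also needs sqs:ReceiveMessage, sqs:DeleteMessage and sqs:GetQueueAttributes on the queue. With ReportBatchItemFailures enabled, the handler is expected to return a batchItemFailures list, so only the failed messages become visible in the queue again.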

If the order of processing is important we can use an SQS FIFO queue to process the messages in the exact order they were received. We can also bring an SQS Dead Letter Queue into the mix to collect the processing failures for further investigation or retry.
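To make sure the failures collected in the DLQ actually get investigated, a CloudWatch alarm on the DLQ backlog is a common companion. A sketch, assuming the QueueDLQ resource from the template; the alarm name and thresholds are illustrative, and AlertsTopic is a hypothetical SNS topic that the template does not define:

```yaml
# alarm as soon as at least one message sits in the DLQ
QueueDLQAlarm:
    Type: AWS::CloudWatch::Alarm
    Properties:
        AlarmDescription: Messages failed processing and were moved to the DLQ
        Namespace: AWS/SQS
        MetricName: ApproximateNumberOfMessagesVisible
        Dimensions:
            - Name: QueueName
              Value: !GetAtt QueueDLQ.QueueName
        Statistic: Maximum
        Period: 300
        EvaluationPeriods: 1
        Threshold: 0
        ComparisonOperator: GreaterThanThreshold
        TreatMissingData: notBreaching
        # AlarmActions:
        #     - !Ref AlertsTopic   # hypothetical SNS topic for notifications
```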

I used a similar architecture for a service that integrates with Mailgun for marketing email sending and reporting. Each time one of the email recipients performs an action (open, click, bounce, unsubscribe), Mailgun calls the service's HTTP endpoint with an event notification. The notification is stored in SQS, and a Lambda function consumes these events to build campaign reports and trigger other actions. The service handles millions of events per week.

Main components

  • SQS

  • API Gateway (REST)

  • IAM Role

SQS

In this template, I'm creating two queues: the main queue that receives the messages for processing, and a Dead Letter Queue that collects every message whose processing has failed 5 times (maxReceiveCount: 5).

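I'm also using a FIFO queue to illustrate how to pass message grouping and deduplication parameters through the OpenAPI extension. Because the integration sets MessageDeduplicationId explicitly, that ID takes precedence over the content-based deduplication enabled on the queue. For most use cases, though, the order of processing isn't important and we can default to a standard queue.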

# SQS
Queue:
    Type: AWS::SQS::Queue
    Properties:
        QueueName: !Sub ${QueueName}-${StageName}.fifo
        FifoQueue: true
        ContentBasedDeduplication: true
        RedrivePolicy:
            deadLetterTargetArn: !GetAtt QueueDLQ.Arn
            maxReceiveCount: 5
# SQS DLQ
QueueDLQ:
    Type: AWS::SQS::Queue
    Properties:
        QueueName: !Sub ${QueueName}-${StageName}-dlq.fifo
        FifoQueue: true

API Gateway

I'm using a REST API rather than an HTTP API (API Gateway v2). REST APIs support features that HTTP APIs don't, such as API keys and request validation.

API Gateway exposes an /events endpoint that accepts POST requests and performs basic validation of the request payload before passing it to SQS.

APIGateway:
    Type: AWS::ApiGateway::RestApi
    Properties:
        Name: !Ref AWS::StackName
        Description: !Sub ${AWS::StackName} API Gateway
        EndpointConfiguration:
            Types:
                - REGIONAL
        Body:
            openapi: 3.0.1
            info:
                title: API Gateway -> SQS
            paths:
                /events:
                    post:
                        description: Send events to SQS
                        operationId: sendEvent
                        requestBody:
                            content:
                                application/json:
                                    schema:
                                        $ref: "#/components/schemas/event"
                        # enable request validator
                        x-amazon-apigateway-request-validator: basic
                        # define request response
                        responses:
                            "200":
                                description: Successful request
                            "400":
                                description: Bad request
                            "500":
                                description: Server Error
                        # enable api gateway integration
                        x-amazon-apigateway-integration:
                            # ... removed for brevity
                            # ... check SQS Integration section

            # define request body schema
            components:
                schemas:
                    event:
                        # ... removed for brevity    
                        # ... check Request Validation section

            # define request validator
            x-amazon-apigateway-request-validators:
                basic:
                    validateRequestParameters: false
                    validateRequestBody: true
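Before the endpoint can receive requests, the API also has to be deployed to a stage. A minimal sketch, assuming the StageName parameter already used in the queue names; the logical names APIGatewayDeployment and EventsEndpoint are illustrative:

```yaml
Resources:
    # deploy the REST API to the stage named by the StageName parameter
    APIGatewayDeployment:
        Type: AWS::ApiGateway::Deployment
        Properties:
            RestApiId: !Ref APIGateway
            StageName: !Ref StageName

Outputs:
    # the webhook URL to register with the third-party provider
    EventsEndpoint:
        Value: !Sub https://${APIGateway}.execute-api.${AWS::Region}.amazonaws.com/${StageName}/events
```

CloudFormation does not create a new deployment when only the RestApi Body changes, so after editing the OpenAPI definition you may need to redeploy the stage (or rename the deployment resource) before the changes go live.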

SQS Integration

The OpenAPI x-amazon-apigateway-integration extension lets us configure the SQS integration for each endpoint inside the Body property. The settings you need to know about are:

  • passthroughBehavior - set to never, so requests whose Content-Type has no matching entry in requestTemplates are rejected with 415 Unsupported Media Type instead of being passed to SQS untransformed

  • requestParameters - the SQS Query API expects the action and its parameters as a URL-encoded form in the body of the POST request, so we set the Content-Type header to application/x-www-form-urlencoded

  • requestTemplates - maps the request payload, headers, query, or path parameters to the SQS API request using Velocity templates (check the API Gateway mapping template & variable reference); because the body is form-encoded, values such as the JSON payload need to be URL-encoded, for example with $util.urlEncode

  • responses - maps the SQS response back to the client; in our case, a successful request returns the SQS MessageId

x-amazon-apigateway-integration:
    httpMethod: POST
    type: aws
    credentials: !GetAtt AGW2SQSRole.Arn
    uri: !Sub arn:aws:apigateway:${AWS::Region}:sqs:path/${AWS::AccountId}/${Queue.QueueName}
    passthroughBehavior: never
    requestParameters:
        integration.request.header.Content-Type: "'application/x-www-form-urlencoded'"
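    requestTemplates:
        # take MessageGroupId from the validated "event" field and URL-encode the
        # JSON body, since SQS expects a form-encoded request
        application/json: |
            #set($MsgDedupId = $context.requestId)
            #set($GroupId = $input.path('$.event'))
            Action=SendMessage&MessageGroupId=$GroupId&MessageDeduplicationId=$MsgDedupId&MessageBody=$util.urlEncode($input.body)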
    responses:
        default:
            statusCode: 200
            responseTemplates:
                application/json: |
                    #set($output = $input.path('$'))
                    {
                      "id": "$output.SendMessageResponse.SendMessageResult.MessageId"
                    }
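With only a default entry, every SQS reply is mapped to a 200, including errors such as an invalid parameter, and the client then receives an empty id. For AWS service integrations, the other keys under responses are regular expressions that API Gateway matches against the status code SQS returns. One hedged way to surface failures, reusing the 400 and 500 responses already declared on the method, is to replace the responses key above with something like this (the error messages are illustrative):

```yaml
responses:
    # SQS rejected the request (e.g. an invalid parameter or missing permission)
    "4\\d{2}":
        statusCode: 400
        responseTemplates:
            application/json: |
                {"message": "Bad request"}
    # failure on the SQS side
    "5\\d{2}":
        statusCode: 500
        responseTemplates:
            application/json: |
                {"message": "Server Error"}
    # any other reply is treated as success, as before
    default:
        statusCode: 200
        responseTemplates:
            application/json: |
                #set($output = $input.path('$'))
                {
                  "id": "$output.SendMessageResponse.SendMessageResult.MessageId"
                }
```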

Request Validation

API Gateway allows us to perform basic request validation before sending the message to SQS. When validation fails, API Gateway rejects the request immediately with a 400 error response and, if execution logging is enabled for the stage, publishes the validation results to CloudWatch Logs.

We use the OpenAPI components property to define our model schema. In our case, we only validate the request body, as configured by the basic validator (validateRequestBody: true, validateRequestParameters: false).

event:
    required:
        - event
        - recipient
        - timestamp
    type: object
    properties:
        event:
            type: string
            example: opened
            enum:
                - accepted
                - opened
                - clicked
                - failed
                - rejected
                - unsubscribed
                - complained
        recipient:
            type: string
            example: "john@example.com"
            pattern: "^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\\.[a-zA-Z0-9-.]+$"
        timestamp:
            type: string
            format: date-time
            example: "2023-01-01T23:00:00Z"

IAM Role

The IAM role is assumed by the API Gateway service and allows it to send messages to our SQS queue and to write logs to CloudWatch.

Note that the policy grants only what this task needs: sqs:SendMessage is allowed on our queue alone, and the remaining permissions are limited to the three CloudWatch Logs write actions.

# role allowing AGW to send messages to SQS queue
AGW2SQSRole:
    Type: AWS::IAM::Role
    Properties:
        Path: /
        RoleName: !Sub ${AWS::StackName}-agw2sqs
        AssumeRolePolicyDocument:
            Version: "2012-10-17"
            Statement:
                - Effect: Allow
                  Principal:
                      Service:
                          - apigateway.amazonaws.com
                  Action: sts:AssumeRole
        Policies:
            - PolicyName: !Sub ${AWS::StackName}-agw2sqs
              PolicyDocument:
                  Version: "2012-10-17"
                  Statement:
                      # allow AGW to send messages to SQS queue
                      - Effect: Allow
                        Action:
                            - sqs:SendMessage
                        Resource: !GetAtt Queue.Arn
                      # let AGW write to CloudWatch
                      - Effect: Allow
                        Action:
                            - logs:CreateLogGroup
                            - logs:CreateLogStream
                            - logs:PutLogEvents
                        Resource: "*"
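As far as I know, API Gateway's own execution logging (where the validation results end up) does not use the integration role. It relies on a separate, region-wide role registered in the API Gateway account settings. A minimal sketch with illustrative logical names, using the AWS managed policy for this purpose:

```yaml
# role API Gateway uses to push execution/access logs to CloudWatch
APIGatewayCloudWatchRole:
    Type: AWS::IAM::Role
    Properties:
        AssumeRolePolicyDocument:
            Version: "2012-10-17"
            Statement:
                - Effect: Allow
                  Principal:
                      Service:
                          - apigateway.amazonaws.com
                  Action: sts:AssumeRole
        ManagedPolicyArns:
            - arn:aws:iam::aws:policy/service-role/AmazonAPIGatewayPushToCloudWatchLogs
# account-level setting, shared by every API Gateway API in the region
APIGatewayAccount:
    Type: AWS::ApiGateway::Account
    Properties:
        CloudWatchRoleArn: !GetAtt APIGatewayCloudWatchRole.Arn
```

Because the setting applies to the whole region, skip this part if your account already has a CloudWatch role configured. Logging itself is then switched on per stage, for example with LoggingLevel: INFO in the stage's method settings.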

If you're having trouble understanding IAM concepts, check out my article Explain like I'm 5: AWS IAM.

Use the one-click deployment to get going quickly, or view the complete, self-contained CloudFormation template, which can be deployed into any AWS account.