Scalable webhook listeners in AWS
Using the API Gateway - SQS integration to create scalable webhook listeners and validate the requests at the API Gateway level
Traditionally, when we think about loose coupling and serverless in AWS, we picture an API Gateway endpoint triggering a Lambda function that sends messages to SQS.
But API Gateway directly integrates with a handful of other AWS APIs, and SQS is one of them.
We can have an HTTP endpoint send messages directly to SQS without the need for a Lambda function, and we can even perform request validation at the API Gateway level.
This serverless pattern is extremely handy when creating third-party integrations for our services. In this article, I will explain how it works and provide a CloudFormation template you can easily adapt to your use case and deploy with one click.
Asynchronous processing
Processing incoming webhook data in real time can be challenging, especially if our service needs to perform additional processing or trigger downstream actions.
In these situations, the producer only needs to know that the data was received successfully; the processing can happen asynchronously in the background. So we return a successful response and put the request on a queue for later processing.
We often know the schema in advance, so we can also validate the request before sending the message to SQS. The backend worker service will consume messages from the queue at its own pace, one by one or in batches, and do the heavy lifting.
If the order of processing is important, we can use an SQS FIFO queue to process the messages in the order they were received (within each message group). We can also bring an SQS Dead Letter Queue into the mix to collect the processing failures for further investigation or retry.
I used a similar architecture for a service that integrates with Mailgun for marketing email sending and reporting. Each time an email recipient performs an action (open, click, bounce, unsubscribe), Mailgun calls the service's HTTP endpoint with an event notification. The notification is stored in SQS, and a Lambda function then consumes these events to build campaign reports and trigger other actions. It handles millions of events per week.
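The consumer side isn't covered by the template in this article, but as a rough sketch, a Lambda worker could be attached to the queue with an event source mapping. Here WorkerFunction is a hypothetical function resource, Queue is assumed to be the queue resource defined in the SQS section further down, and the batch size is an arbitrary choice.

```yaml
# Hypothetical wiring of a Lambda worker to the queue (not part of the article's template)
WorkerEventSource:
  Type: AWS::Lambda::EventSourceMapping
  Properties:
    EventSourceArn: !GetAtt Queue.Arn
    FunctionName: !Ref WorkerFunction   # assumed AWS::Lambda::Function defined elsewhere
    BatchSize: 10                       # messages handed to the function per invocation
    FunctionResponseTypes:
      - ReportBatchItemFailures         # the function can report which messages failed
```

The function's execution role would also need permission to receive and delete messages from the queue. Messages that keep failing are retried until the queue's redrive policy moves them to the Dead Letter Queue.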
Main components
SQS
API Gateway (REST)
IAM Role
SQS
In this template, I'm creating two queues: the main queue that receives the messages for processing, and a second one that acts as a Dead Letter Queue and collects the messages that still fail after 5 processing attempts.
I'm also using a FIFO queue to illustrate how to use the OpenAPI extension to deal with message grouping and deduplication. For most use cases, though, the order of processing isn't important and we can default to a standard queue.
# SQS
Queue:
  Type: AWS::SQS::Queue
  Properties:
    QueueName: !Sub ${QueueName}-${StageName}.fifo
    FifoQueue: true
    ContentBasedDeduplication: true
    RedrivePolicy:
      deadLetterTargetArn: !GetAtt QueueDLQ.Arn
      maxReceiveCount: 5

# SQS DLQ
QueueDLQ:
  Type: AWS::SQS::Queue
  Properties:
    QueueName: !Sub ${QueueName}-${StageName}-dlq.fifo
    FifoQueue: true
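For the common case where ordering doesn't matter, a standard-queue variant of the same two resources might look like the sketch below. It simply drops the FIFO-only properties and the .fifo suffix; this is my adaptation, not the author's template, and the API Gateway mapping template would need the FIFO-only parameters removed as well.

```yaml
# Standard (non-FIFO) variant: no .fifo suffix, no FIFO-only properties
Queue:
  Type: AWS::SQS::Queue
  Properties:
    QueueName: !Sub ${QueueName}-${StageName}
    RedrivePolicy:
      deadLetterTargetArn: !GetAtt QueueDLQ.Arn
      maxReceiveCount: 5

# The DLQ of a standard queue must itself be a standard queue
QueueDLQ:
  Type: AWS::SQS::Queue
  Properties:
    QueueName: !Sub ${QueueName}-${StageName}-dlq
```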
API Gateway
I'm using the REST API type of API Gateway, not the HTTP API type (API Gateway v2). REST APIs support more features than HTTP APIs, such as API keys and request validation.
The API exposes an /events endpoint that accepts POST requests. It performs basic validation of the request payload before passing it to SQS.
APIGateway:
  Type: AWS::ApiGateway::RestApi
  Properties:
    Name: !Ref AWS::StackName
    Description: !Sub ${AWS::StackName} API Gateway
    EndpointConfiguration:
      Types:
        - REGIONAL
    Body:
      openapi: 3.0.1
      info:
        title: API Gateway -> SQS
      paths:
        /events:
          post:
            description: Send events to SQS
            operationId: sendEvent
            requestBody:
              content:
                application/json:
                  schema:
                    $ref: "#/components/schemas/event"
            # enable request validator
            x-amazon-apigateway-request-validator: basic
            # define request response
            responses:
              "200":
                description: Successful request
              "400":
                description: Bad request
              "500":
                description: Server Error
            # enable api gateway integration
            x-amazon-apigateway-integration:
              # ... removed for brevity
              # ... check SQS Integration section
      # define request body schema
      components:
        schemas:
          event:
            # ... removed for brevity
            # ... check Request Validation section
      # define request validator
      x-amazon-apigateway-request-validators:
        basic:
          validateRequestParameters: false
          validateRequestBody: true
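The snippets in this article reference QueueName and StageName parameters, and a REST API only becomes callable once it is deployed to a stage. Neither part is shown in the excerpts, so the following is a sketch of how they could be declared; the resource name APIGatewayDeployment and the default values are assumptions, and the complete template may do this differently.

```yaml
Parameters:
  QueueName:
    Type: String
    Default: webhook-events   # hypothetical default
  StageName:
    Type: String
    Default: dev              # hypothetical default

Resources:
  # Publishes the REST API to a stage so that /events becomes callable
  APIGatewayDeployment:
    Type: AWS::ApiGateway::Deployment
    Properties:
      RestApiId: !Ref APIGateway
      StageName: !Ref StageName
```

Once deployed, the endpoint is reachable at https://{api-id}.execute-api.{region}.amazonaws.com/{stage}/events.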
SQS Integration
The OpenAPI x-amazon-apigateway-integration extension allows us to configure the SQS integration in the Body property for each endpoint. What you need to know is:
passthroughBehavior - is set to never so the request is always transformed using the requestTemplates value
requestParameters - the SQS POST request sends query parameters as a form in the body of the HTTP request, so we need to set the Content-Type header accordingly
requestTemplates - map the request payload, headers, query, or path parameters to the SQS API request; they use Velocity templates - check API Gateway mapping template & variable reference
responses - configures the response; in our case it returns the SQS MessageId in case of a successful request
x-amazon-apigateway-integration:
  httpMethod: POST
  type: aws
  credentials: !GetAtt AGW2SQSRole.Arn
  uri: !Sub arn:aws:apigateway:${AWS::Region}:sqs:path/${AWS::AccountId}/${Queue.QueueName}
  passthroughBehavior: never
  requestParameters:
    integration.request.header.Content-Type: "'application/x-www-form-urlencoded'"
  # group messages by the "event" field defined in the request schema;
  # URL-encode the body so characters such as & and + survive form encoding
  requestTemplates:
    application/json: |
      #set($MsgDedupId = $context.requestId)
      #set($GroupId = $input.json('$.event'))
      Action=SendMessage&MessageGroupId=$GroupId&MessageDeduplicationId=$MsgDedupId&MessageBody=$util.urlEncode($input.body)
  responses:
    default:
      statusCode: 200
      responseTemplates:
        application/json: |
          #set($output = $input.path('$'))
          {
            "id": "$output.SendMessageResponse.SendMessageResult.MessageId"
          }
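If you go with a standard queue instead of FIFO, as suggested in the SQS section, the request template gets simpler because MessageGroupId and MessageDeduplicationId only apply to FIFO queues. A minimal sketch of that variant follows; wrapping the body in $util.urlEncode (an API Gateway mapping template utility) is my choice here, so that characters like & or + in the payload are not misread as form delimiters.

```yaml
# Sketch for a standard queue: FIFO-only parameters removed
requestTemplates:
  application/json: |
    Action=SendMessage&MessageBody=$util.urlEncode($input.body)
```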
Request Validation
API Gateway allows us to perform basic request validation before sending the message to SQS. When the validation fails, API Gateway immediately fails the request, returns a 400 error response, and, if logging is enabled, publishes the validation results in CloudWatch Logs.
We're using the OpenAPI components property to define our model schema. In our case, we only validate the request body.
event:
  required:
    - event
    - recipient
    - timestamp
  type: object
  properties:
    event:
      type: string
      example: opened
      enum:
        - accepted
        - opened
        - clicked
        - failed
        - rejected
        - unsubscribed
        - complained
    recipient:
      type: string
      example: "john@example.com"
      pattern: "^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\\.[a-zA-Z0-9-.]+$"
    timestamp:
      type: string
      format: date-time
      example: "2023-01-01T23:00:00Z"
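By default, the 400 response for a failed validation carries only a generic message. If the webhook producer should see why the payload was rejected, a gateway response can be customized. The sketch below is not part of the article's template: the resource name is hypothetical, and the body layout is just one possible shape built from the $context.error variables that API Gateway exposes.

```yaml
# Hypothetical customization of the response returned when body validation fails
BadRequestBodyResponse:
  Type: AWS::ApiGateway::GatewayResponse
  Properties:
    RestApiId: !Ref APIGateway
    ResponseType: BAD_REQUEST_BODY
    StatusCode: "400"
    ResponseTemplates:
      # messageString is already quoted by API Gateway; validationErrorString is not
      application/json: |
        {
          "message": $context.error.messageString,
          "details": "$context.error.validationErrorString"
        }
```

With this in place, a request that is missing the recipient field, for example, would get a 400 whose details field describes the schema violation.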
IAM Role
The IAM Role is assumed by the API Gateway service and allows it to send messages to our SQS queue. It also grants permission to write logs to CloudWatch.
Note that the SQS statement in the IAM policy is scoped to the single sqs:SendMessage action on this one queue, so the role can do nothing else with SQS.
# role allowing AGW to send messages to SQS queue
AGW2SQSRole:
  Type: AWS::IAM::Role
  Properties:
    Path: /
    RoleName: !Sub ${AWS::StackName}-agw2sqs
    AssumeRolePolicyDocument:
      Version: "2012-10-17"
      Statement:
        - Effect: Allow
          Principal:
            Service:
              - apigateway.amazonaws.com
          Action: sts:AssumeRole
    Policies:
      - PolicyName: !Sub ${AWS::StackName}-agw2sqs
        PolicyDocument:
          Version: "2012-10-17"
          Statement:
            # allow AGW to send messages to SQS queue
            - Effect: Allow
              Action:
                - sqs:SendMessage
              Resource: !GetAtt Queue.Arn
            # let AGW write to CloudWatch
            - Effect: Allow
              Action:
                - logs:CreateLogGroup
                - logs:CreateLogStream
                - logs:PutLogEvents
              Resource: "*"
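One detail the excerpt doesn't show: for its CloudWatch logging, API Gateway uses a role registered at the account level (per region), which is separate from the credentials set on the integration. Assuming the same role is meant to be reused for logging, it could be registered as in the sketch below; the resource name is hypothetical and the complete template may handle this differently.

```yaml
# Assumption: reuse AGW2SQSRole as the account-level CloudWatch role for API Gateway
APIGatewayAccount:
  Type: AWS::ApiGateway::Account
  Properties:
    CloudWatchRoleArn: !GetAtt AGW2SQSRole.Arn
```

Keep in mind that this setting is shared by every API in the same account and region, so it is worth checking whether one is already configured before deploying.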
If you're having trouble understanding IAM concepts, check my article about it - Explain like I'm 5: AWS IAM
Use the One-click Deployment to get going quickly, or view the complete self-contained CloudFormation template, which can be deployed into any AWS account.