Serverless - Using Lambda Triggered by SQS For Job Scheduling

The Problem

AWS SQS and Lambda Duet

Imagine your code waits for some external system to respond. In the non-serverless world, you’d most likely just add a Quartz/cron-like job that periodically checks whether the external processing has finished. That’s easy, as you have a server running 24/7.

But how would you do this in the AWS serverless world? There are a couple of possible solutions.

However, in this post, I’ll focus on using AWS SQS (Simple Queue Service).

Solution

The whole idea is built around the SQS feature called Visibility Timeout. This is the period during which a received message stays hidden from other consumers; if it is not deleted within that time, it becomes visible in the queue again so another listener can pick it up. Take a look at the following image from Amazon:

AWS SQS Visibility Timeout
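
If you want to see the Visibility Timeout in action before wiring up any Lambdas, you can play with a queue straight from the AWS CLI (QUEUE_URL below is a placeholder for your queue’s URL):

$ aws sqs send-message --queue-url $QUEUE_URL --message-body "hello"
$ aws sqs receive-message --queue-url $QUEUE_URL   # the message is returned and becomes invisible to other consumers
$ aws sqs receive-message --queue-url $QUEUE_URL   # empty response until the Visibility Timeout expires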

How can we use it for job scheduling?

The idea is that:

  1. we send an SQS message whenever a new job is scheduled,
  2. after 5 seconds (DelaySeconds: 5) the message becomes visible in the queue,
  3. another Lambda listens on this queue and tries to process it.

Now, if the external system:

  - has finished its work, the listening Lambda processes the message successfully and it is deleted from the queue,
  - has not finished yet, the Lambda throws an error, the message becomes visible again after the Visibility Timeout, and processing is retried; after maxReceiveCount failed attempts the message ends up in the DLQ.

So, on to the code!

Preparation

I’m assuming you have Serverless Framework installed along with AWS CLI. If not, please refer to these guides (Serverless Framework and AWS CLI) on how to prepare your environment.

The AWS Lambda code is written for the Node.js 10.x runtime.
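
A quick sanity check that the tooling is in place (assuming a global Serverless Framework installation) is to confirm that each of these prints a version number:

$ sls --version
$ aws --version
$ node --version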

Code

I have put the sample code in my GitHub repository here.

It consists of two parts:

aws.yaml

It’s a CloudFormation template that you need to deploy using the AWS Console or AWS CLI to create the SQS queue and the accompanying DLQ (Dead Letter Queue).

(We could define these resources directly in the Serverless Framework configuration, but I wanted to get some hands-on experience with plain CloudFormation, hence this approach.)
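
For reference, a minimal sketch of what the same queue might look like when declared directly in serverless.yml (the resource name here is illustrative):

resources:
  Resources:
    MyTasksQueue:
      Type: 'AWS::SQS::Queue'
      Properties:
        QueueName: tasks-queue
        DelaySeconds: 5
        VisibilityTimeout: 5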

AWSTemplateFormatVersion: 2010-09-09
Description: >
  Sample SQS queue with DLQ to be used by Lambda and act like a job that checks
  if some external processing has been finished
Parameters:
  QueueNameParam:
    Type: String
    Default: tasks-queue
    Description: Enter the name of the queue. DLQ will be created suffixed "-dlq"
Resources:
  MyTasksQueue:
    Type: 'AWS::SQS::Queue'
    Properties:
      DelaySeconds: 5
      QueueName: !Ref QueueNameParam
      VisibilityTimeout: 5
      RedrivePolicy:
        deadLetterTargetArn: !GetAtt MyTasksDLQ.Arn
        maxReceiveCount: 10
      Tags:
        - Key: created_by
          Value: serverless-lambda-sqs-trigger
  MyTasksDLQ:
    Type: 'AWS::SQS::Queue'
    Properties:
      QueueName: !Join ['-', [!Ref QueueNameParam, 'dlq']]
      Tags:
        - Key: created_by
          Value: serverless-lambda-sqs-trigger
Outputs:
  QueueArn:
    Description: ARN of the SQS Queue to be used in Serverless Framework YAML config file
    Value: !GetAtt MyTasksQueue.Arn
  QueueUrl:
    Description: URL of the SQS Queue to be used in AWS SQS API
    Value: !Ref MyTasksQueue

After you create the stack from the CloudFormation template, you will see two values in its Outputs section:

AWS Cloud Formation Output

Write them down, as you’ll need them when configuring the Serverless Framework project.

You can adjust the name of the queue to be created using the QueueNameParam. It defaults to tasks-queue.

$ aws cloudformation create-stack --stack-name "MySqsLambdaStack" --template-body file://aws.yaml --parameters ParameterKey=QueueNameParam,ParameterValue=my-custom-queue-name

You can also check the results of the stack creation without entering the AWS console:

$ aws cloudformation describe-stacks --stack-name "MySqsLambdaStack"
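
If you’re only interested in the two output values, you can narrow the response with a JMESPath query, for example:

$ aws cloudformation describe-stacks --stack-name "MySqsLambdaStack" --query "Stacks[0].Outputs"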

/serverless

It’s a Serverless Framework project containing two AWS Lambda functions: createTask (exposed via API Gateway, it schedules a task by sending an SQS message) and executeTask (triggered by the SQS queue, it tries to process the task). Here is the serverless.yml:

service: serverless-lambda-sqs-trigger

provider:
  name: aws
  runtime: nodejs10.x
  region: eu-central-1
  memorySize: 128
  iamRoleStatements:
    - Effect: "Allow"
      Action:
        - "sqs:SendMessage"
        - "sqs:GetQueueUrl"
      Resource: ${self:custom.sqsTasksArn}
  environment:
    SQS_TASKS_URL: "PUT YOUR SQS TASK URL HERE"

functions:
  createTask:
    handler: handler.createTask
    events:
      - http: POST /tasks

  executeTask:
    timeout: 5
    handler: handler.executeTask
    events:
      - sqs:
          arn: ${self:custom.sqsTasksArn}
          batchSize: 1

custom:
  sqsTasksArn: "PUT YOUR SQS TASK ARN HERE"
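
As a side note, instead of pasting these values by hand, the Serverless Framework can also resolve CloudFormation stack outputs via its cf variable syntax; a minimal sketch, assuming the stack name MySqsLambdaStack from earlier and a framework version that supports this syntax:

custom:
  sqsTasksArn: ${cf:MySqsLambdaStack.QueueArn}

# and in the provider section:
environment:
  SQS_TASKS_URL: ${cf:MySqsLambdaStack.QueueUrl}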

And here is the implementation of the Lambdas themselves:

'use strict';

const AWS = require('aws-sdk');
const SQS = new AWS.SQS({apiVersion: '2012-11-05'});

const queueUrl = process.env.SQS_TASKS_URL;

module.exports.createTask = async event => {
    return new Promise((resolve, reject) => {
        console.info("Scheduling a task for later invocation.");

        const params = {
            MessageBody: JSON.stringify("My Scheduled Task"),
            QueueUrl: queueUrl
        };

        SQS.sendMessage(params, (err, data) => {
            if (err) {
                // Reject the promise so the invocation fails and API Gateway returns an error;
                // throwing inside this callback would not be caught by the handler.
                reject(new Error("Task not scheduled"));
                return;
            }

            resolve({
                statusCode: 202,
                body: "Task scheduled"
            });
        });
    });
};

module.exports.executeTask = async event => {
    console.info('Received task to be executed.');

    // We can do "[0]" thanks to "batchSize: 1"; otherwise, be prepared for a list of records!
    // If tryToProcess throws, the invocation fails, the message is not deleted, and it
    // becomes visible in the queue again after the Visibility Timeout (i.e. it is retried).
    const result = tryToProcess(event.Records[0].body);

    console.info("Correctly processed the message with result: %s", result);
};

// This simulates some business logic - here we just get a random success/failure response
const tryToProcess = (requestData) => {
    if (Math.round(Math.random()) === 0) {
        throw new Error("Need to wait for some third-party system, try again later");
    }

    return "Success";
};

You need to put the queue ARN and URL you wrote down after the CloudFormation stack creation into sqsTasksArn and SQS_TASKS_URL, respectively.
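
As a side note, the callback wrapped in a Promise inside createTask could be simplified with the SDK’s .promise() helper; a minimal sketch of an equivalent handler:

module.exports.createTask = async () => {
    const params = {
        MessageBody: JSON.stringify("My Scheduled Task"),
        QueueUrl: queueUrl
    };

    // sendMessage(...).promise() returns a Promise, so we can simply await it;
    // any SQS error will reject it and make the invocation fail
    await SQS.sendMessage(params).promise();

    return {
        statusCode: 202,
        body: "Task scheduled"
    };
};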

Deploy

Enter the serverless directory and deploy the serverless application:

$ sls deploy

After a successful deployment, you’ll see the AWS API Gateway endpoint that you can use to simulate a job being scheduled.

Invoke and Observe the Results

Now simulate a few tasks being scheduled by invoking the POST /tasks endpoint:

$ curl -X POST https://{your-api-gw-url}/tasks
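
To schedule several tasks in one go, you can simply loop the call, e.g.:

$ for i in {1..5}; do curl -X POST https://{your-api-gw-url}/tasks; done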

All requests should finish with HTTP 202 (Accepted); each job has been scheduled in the form of an SQS message.

You can now observe the logs using CloudWatch or directly using Serverless Framework:

$ sls logs -tf executeTask 

Some tasks will randomly succeed and log "Correctly processed the message with result: Success", while others will fail with "Need to wait for some third-party system, try again later" and will be retried once the Visibility Timeout expires.

See exemplary output below:

Execution logs
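
Messages that keep failing are moved to the DLQ after maxReceiveCount (10) attempts. You can check whether anything has landed there with the AWS CLI, for example (replace {your-dlq-url} with the DLQ URL from the stack):

$ aws sqs get-queue-attributes --queue-url {your-dlq-url} --attribute-names ApproximateNumberOfMessages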

Summary

You have just created a very simplistic job-scheduling mechanism where you pay only for the actual processing time (i.e. the code is invoked only when there is a task waiting to be processed).

If nothing is being processed in the external system, no Lambda is invoked and there are no costs on your side.

In the end, there are a couple of things that you might consider adjusting, such as DelaySeconds and VisibilityTimeout on the queue, the maxReceiveCount after which a message lands in the DLQ, and the batchSize of the Lambda trigger.