Scheduling and Delaying Queue Messages with AWS Step Functions

Background

Consider a scenario where you run an e-commerce platform that offers flash sales and limited-time offers to your customers. These time-limited promotions require precise timing to ensure that they start and end at the designated times. Additionally, you may want to send personalized notifications to customers before a sale starts, reminding them to be ready to take advantage of the discounts.

Implementing a delayed schedule in this case becomes crucial. You need to schedule the start and end times of the flash sales in advance and set up reminders to notify your customers before the sale begins. Furthermore, you may need to delay push notifications to inform customers about specific offers and promotions during the sale period.

Design Goals:

  1. Consumption Behavior

  2. High Availability

  3. Real-time

  4. Support for Message Deletion

Challenges with Traditional Scheduling Approaches:

While scanning the schedule at regular intervals might seem like a straightforward solution, it does come with a few disadvantages. Let's take a closer look at some of these challenges:

Extra Storage and Business Coupling: Implementing timing or delay tasks often requires an additional message (msg) table for storage. This creates a coupling between storage and business logic, which can lead to increased complexity.

Timing Control: A scan-based scheduler only notices a due task on its next scan, so tasks can run up to one full scan interval late, and a slow or failed scan can cause them to miss their intended trigger times altogether.

Database Pressure: Maintaining a separate msg table for each service that requires timing or delay tasks can impose a burden on the database. The constant scanning and processing of the table generate repeated pressure on the database instance.

The Biggest Problem:

The scheduling model based on scanning the schedule at regular intervals often leads to duplicated business logic across multiple services. This duplication of logic can cause maintenance and scalability issues.

Introducing AWS Step Functions:

In this newsletter, we will explore a closed-source solution built on AWS managed services. AWS Simple Queue Service (SQS) does offer per-message delays, but only up to 15 minutes (DelaySeconds has a maximum of 900). Longer delays would require consumers to receive messages repeatedly and send them back to the queue, and that frequent consumption can become a bottleneck when handling large message volumes.
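To see why this gets expensive, assume the workaround is to re-send a message with the maximum 900-second delay until the target time is reached (an assumption about how such a system might be built, not a built-in SQS feature). The sketch below counts the SendMessage calls a single message would need; hopDelays is a hypothetical helper.

```go
package main

import "fmt"

// maxSQSDelaySeconds is the largest per-message delay SQS accepts (15 minutes).
const maxSQSDelaySeconds = 900

// hopDelays splits a total delay into per-send delays of at most 900 seconds.
// Each element corresponds to one SendMessage call with that DelaySeconds value,
// and every hop after the first also costs a receive and a delete.
func hopDelays(totalSeconds int) []int {
	var hops []int
	for totalSeconds > 0 {
		d := totalSeconds
		if d > maxSQSDelaySeconds {
			d = maxSQSDelaySeconds
		}
		hops = append(hops, d)
		totalSeconds -= d
	}
	return hops
}

func main() {
	day := 24 * 60 * 60
	fmt.Println("sends needed for a 24h delay:", len(hopDelays(day))) // 96
}
```

Multiplied across every pending message, those extra round trips are the consumption overhead the paragraph above refers to.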

Step Functions and State Machines:

To overcome these limitations, let's consider a different approach. Instead of forwarding messages directly to the queue, we can leverage AWS Step Functions and its powerful state machine capabilities.

A state machine allows us to define a sequence of steps or states that the message will go through. We can specify waiting states with timestamps or durations, enabling precise control over when the message should proceed to the next step.

Here's how a state machine can be used to handle delayed message processing and forwarding:

  • Create a state machine in AWS Step Functions that defines the flow of message processing.

  • Add a Wait state to introduce the delay, specifying either a fixed interval or a dynamic one taken from the message content. For a dynamic delay configured in the console, select "Get timestamp from state input" for the wait state.

  • Add a Task state that calls the SQS SendMessage API to forward the message to the target queue.

By following these steps, we will have a working state machine that receives messages, keeps them in a waiting state for the specified duration, and then forwards them to the SQS queue for further processing.

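Here's a JSON definition of the state machine. Replace QUEUE_URL with the URL of the target SQS queue, since QueueUrl is a required parameter of SendMessage. "MessageBody.$": "$" forwards the entire execution input as the message body.

{
  "Comment": "Wait, then forward the execution input to an SQS queue",
  "StartAt": "Wait",
  "States": {
    "Wait": {
      "Type": "Wait",
      "Next": "SQS SendMessage",
      "Seconds": 0
    },
    "SQS SendMessage": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sqs:sendMessage",
      "Parameters": {
        "QueueUrl": "QUEUE_URL",
        "MessageBody.$": "$"
      },
      "End": true
    }
  }
}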

To make the waiting interval dynamic, replace the Seconds field of the wait state with SecondsPath. SecondsPath is a reference path into the execution input, so each message can carry its own delay; here the wait state reads the expiryInterval field of the message we send to the state machine:

"Wait": {
  "Type": "Wait",
  "SecondsPath": "$.expiryInterval",
  "Next": "SQS SendMessage"
}
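With SecondsPath, the value at $.expiryInterval has to resolve to a non-negative integer number of seconds when the Wait state runs, or the execution fails. A sender may therefore want to validate the field before starting an execution. The sketch below does this for a top-level field only; readDelay is a hypothetical helper and does not implement full JSONPath. Note also that Express workflows run for at most five minutes, so longer delays need a Standard workflow, whose executions can last up to one year.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// readDelay extracts a top-level integer field (e.g. "expiryInterval") from the
// execution input and checks that it is a usable, non-negative number of seconds.
func readDelay(input []byte, field string) (int64, error) {
	var doc map[string]json.RawMessage
	if err := json.Unmarshal(input, &doc); err != nil {
		return 0, fmt.Errorf("input is not a JSON object: %w", err)
	}
	raw, ok := doc[field]
	if !ok {
		return 0, fmt.Errorf("field %q not found", field)
	}
	var secs int64
	if err := json.Unmarshal(raw, &secs); err != nil {
		// Rejects strings like "10" and fractions like 10.5.
		return 0, fmt.Errorf("field %q is not an integer: %w", field, err)
	}
	if secs < 0 {
		return 0, fmt.Errorf("field %q is negative: %d", field, secs)
	}
	return secs, nil
}

func main() {
	input := []byte(`{"input": "evan testing", "expiryInterval": 10}`)
	secs, err := readDelay(input, "expiryInterval")
	if err != nil {
		panic(err)
	}
	fmt.Println("wait seconds:", secs)
}
```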

Here's an example of how you can trigger AWS Step Functions using Go:

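package main

import (
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/stepfunctions"
)

func main() {
	// Create a new AWS session.
	// The static keys below are placeholders. In real deployments, prefer the
	// default credential chain (environment variables, shared config, or an IAM
	// role) by omitting the Credentials field entirely.
	sess, err := session.NewSession(&aws.Config{
		Region:      aws.String("ap-south-1"),
		Credentials: credentials.NewStaticCredentials("ACCESS_KEY_ID", "SECRET_ACCESS_KEY", ""),
	})
	if err != nil {
		log.Fatal(err)
	}

	// Create a Step Functions client
	svc := stepfunctions.New(sess)

	// Execution input: expiryInterval is read by the Wait state's SecondsPath,
	// and the whole object becomes the SQS message body.
	input := `{
		"input": "evan testing",
		"expiryInterval": 10
	}`

	// Specify the ARN of the Step Functions state machine
	stateMachineArn := "STATE_MACHINE_ARN"

	// Create the StartExecution input parameters
	params := &stepfunctions.StartExecutionInput{
		StateMachineArn: aws.String(stateMachineArn),
		Input:           aws.String(input),
	}

	// Start the execution; the Wait state begins counting down immediately
	resp, err := svc.StartExecution(params)
	if err != nil {
		log.Fatal(err)
	}

	// resp contains the ExecutionArn and StartDate. Keep the ExecutionArn if the
	// scheduled message may need to be cancelled later with StopExecution.
	fmt.Println(resp)
}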
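The raw string literal used for input above is easy to get wrong as messages grow. One option, assuming the payload has just the two fields shown, is to marshal a struct so the JSON is always well-formed; executionInput and buildInput are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// executionInput mirrors the fields the state machine reads:
// expiryInterval feeds the Wait state's SecondsPath.
type executionInput struct {
	Input          string `json:"input"`
	ExpiryInterval int    `json:"expiryInterval"`
}

// buildInput returns the execution input as a JSON string.
func buildInput(body string, delaySeconds int) (string, error) {
	b, err := json.Marshal(executionInput{Input: body, ExpiryInterval: delaySeconds})
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	in, err := buildInput("evan testing", 10)
	if err != nil {
		panic(err)
	}
	fmt.Println(in) // {"input":"evan testing","expiryInterval":10}
}
```

The resulting string can be passed through aws.String as the Input of StartExecutionInput in place of the hand-written literal.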

In this newsletter, we have explored how AWS Step Functions can be used to implement a delay queue in a few simple steps. By leveraging the state machine capabilities of Step Functions, we can delay message processing and then forward messages to the desired destinations, such as SQS queues. This approach decouples the timing and delay logic from individual business services, making the solution more scalable and easier to maintain.

In the next edition of our newsletter, we will delve into implementing a delay queue using an open-source solution. Stay tuned!