Step functions

serverless-builder has built-in support for serverless-step-functions, which allows faster and more comprehensive development of complex workflows with AWS Step Functions.

serverless-builder provides a programming interface for building complex workflows from serverless Lambda functions, with some extra features such as:

* autogeneration of the state names
* automatic tracking of the dependencies (automatically setting up )
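As an illustration of name autogeneration, the sample output on this page shows the function `get_all_users` becoming the state `GetAllUsers` and the resource `GetAllUsersLambdaFunction`. The sketch below reproduces that mapping with a plain snake_case to PascalCase conversion. The helper names `to_state_name` and `lambda_logical_id` are hypothetical, and this is not serverless-builder's actual implementation, only an approximation consistent with the output shown here.

```python
def to_state_name(function_name: str) -> str:
    """Convert a snake_case or kebab-case name to PascalCase (hypothetical helper)."""
    parts = function_name.replace("-", "_").split("_")
    # Upper-case only the first character of each part; skip empty parts.
    return "".join(part[:1].upper() + part[1:] for part in parts if part)


def lambda_logical_id(function_name: str) -> str:
    """Resource name as it appears in the sample output (assumed naming scheme)."""
    return to_state_name(function_name) + "LambdaFunction"


print(to_state_name("get_all_users"))      # GetAllUsers
print(lambda_logical_id("weekly_task"))    # WeeklyTaskLambdaFunction
```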

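# NOTE: `service` is assumed to be a serverless-builder service object created earlier in the file.
from serverless.aws.features.stepfunctions import Branch, Choice, Iterator, Map, Scheduled, Succeed, Task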

get_all_users = service.builder.function.generic(
    "get_all_users",
    "Returns with all users (for scheduling tasks)",
    "task_service.handler.get_all_users",
)

weekly_task = service.builder.function.generic(
    "weekly_task",
    "Schedules tasks for a user on each week",
    "task_service.handler.weekly_tasks",
    timeout=30,
)

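# Express state machine named "Weekly" with an empty comment; auto_fallback and auto_catch are turned off.
machine = service.stepFunctions.machine("Weekly", "", type="EXPRESS", auto_fallback=False, auto_catch=False)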
task = machine.task(get_all_users)
task.next(
    machine.parallel(
        name="ProcessAndContinue",
        branches=[
            # First branch: run weekly_task for each entry in $.user_ids, one at a time.
            Branch(
                Map(
                    name="ProcessUser",
                    items_path="$.user_ids",
                    result_path=None,
                    input_path=None,
                    concurrency=1,
                    end=True,
                    steps=Iterator(
                        map_name="ProcessUser",
                        steps=[Task(name="ProcessFunction", end=True, function=weekly_task)],
                        auto_catch=False,
                        auto_fallback=False,
                    ),
                )
            ),
            # Second branch: start a new execution of this machine while a PaginationToken is present.
            Branch(
                Choice(
                    name="ContinueIfMoreUsers",
                    default="NoMoreUsers",
                    choices=[dict(IsPresent=True, Next="Restart", Variable="$.PaginationToken")],
                ),
                Succeed(name="NoMoreUsers"),
                Task(
                    name="Restart",
                    end=True,
                    parameters=dict(
                        Input={"PaginationToken.$": "$.PaginationToken"}, StateMachineArn=machine.arn()
                    ),
                    resource="arn:aws:states:::states:startExecution",
                ),
            ),
        ],
        end=True,
    )
)

machine.event(Scheduled("0 18 ? * SAT *", dict(start="true")))
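The expression passed to `Scheduled` uses the six-field AWS cron format (minutes, hours, day of month, month, day of week, year), and it appears in the generated output wrapped as `cron(...)`. A minimal sketch for reading such an expression follows; `describe_aws_cron` is a hypothetical helper, not part of serverless-builder, and it checks only the field count and the `?` rule rather than validating each field fully.

```python
from typing import Dict

# Field order of AWS cron expressions as used by EventBridge schedules.
AWS_CRON_FIELDS = ("minutes", "hours", "day_of_month", "month", "day_of_week", "year")


def describe_aws_cron(expression: str) -> Dict[str, str]:
    """Split a six-field AWS cron expression into named fields."""
    fields = expression.split()
    if len(fields) != len(AWS_CRON_FIELDS):
        raise ValueError(f"expected 6 fields, got {len(fields)}: {expression!r}")
    named = dict(zip(AWS_CRON_FIELDS, fields))
    # Day-of-month and day-of-week cannot both carry a value:
    # at least one of them has to be '?'.
    if "?" not in (named["day_of_month"], named["day_of_week"]):
        raise ValueError("one of day_of_month / day_of_week must be '?'")
    return named


schedule = describe_aws_cron("0 18 ? * SAT *")
print(schedule["day_of_week"], schedule["hours"], schedule["minutes"])  # SAT 18 0
```

Read this way, `0 18 ? * SAT *` fires every Saturday at 18:00. EventBridge evaluates cron rules in UTC.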

Sample workflow generated with serverless

stepFunctions:
  validate: true
  stateMachines:
    Weekly:
      name: task-service-${sls:stage}-Weekly
      tracingConfig:
        enabled: true
      type: EXPRESS
      loggingConfig:
        level: ERROR
        includeExecutionData: true
        destinations:
        - Fn::GetAtt:
          - StepmachineTaskServiceWeeklyLogGroup
          - Arn
      definition:
        StartAt: GetAllUsers
        Comment: ''
        States:
          GetAllUsers:
            Type: Task
            Resource:
              Fn::GetAtt:
              - GetAllUsersLambdaFunction
              - Arn
            Next: ProcessAndContinue
          ProcessAndContinue:
            Type: Parallel
            Branches:
            - StartAt: ProcessUser
              States:
                ProcessUser:
                  Type: Map
                  ItemsPath: $.user_ids
                  MaxConcurrency: 1
                  End: true
                  Iterator:
                    StartAt: ProcessFunction
                    States:
                      ProcessFunction:
                        Type: Task
                        Resource:
                          Fn::GetAtt:
                          - WeeklyTaskLambdaFunction
                          - Arn
                        End: true
            - StartAt: ContinueIfMoreUsers
              States:
                ContinueIfMoreUsers:
                  Type: Choice
                  Default: NoMoreUsers
                  Choices:
                  - IsPresent: true
                    Next: Restart
                    Variable: $.PaginationToken
                NoMoreUsers:
                  Type: Succeed
                Restart:
                  Type: Task
                  Parameters:
                    Input:
                      PaginationToken.$: $.PaginationToken
                    StateMachineArn: arn:aws:states:${aws:region}:${aws:accountId}:stateMachine:task-service-${sls:stage}-Weekly
                  Resource: arn:aws:states:::states:startExecution
                  End: true
            End: true
      events:
      - schedule:
          rate: cron(0 18 ? * SAT *)
        inputPath:
          start: 'true'
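The generated definition nests state machines inside `Branches` (for the Parallel state) and `Iterator` (for the Map state), and every `StartAt`, `Next`, and `Default` has to name a state at the same nesting level. The sketch below is one way to check that on a definition loaded as a Python dict. `check_transitions` is a hypothetical helper, and the `weekly` dict is a trimmed copy of the output above with `Resource` and `Parameters` left out.

```python
from typing import Any, Dict, List


def check_transitions(definition: Dict[str, Any], path: str = "root") -> List[str]:
    """Return problems: a bad StartAt or a transition to an undefined state."""
    problems: List[str] = []
    states = definition.get("States", {})
    start = definition.get("StartAt")
    if start not in states:
        problems.append(f"{path}: StartAt {start!r} is not a defined state")

    for name, state in states.items():
        # Collect every transition target this state can jump to.
        targets = [state.get("Next"), state.get("Default")]
        targets += [rule.get("Next") for rule in state.get("Choices", [])]
        for target in targets:
            if target is not None and target not in states:
                problems.append(f"{path}.{name}: unknown target {target!r}")
        # Parallel branches and Map iterators are nested state machines.
        for index, branch in enumerate(state.get("Branches", [])):
            problems += check_transitions(branch, f"{path}.{name}[{index}]")
        if "Iterator" in state:
            problems += check_transitions(state["Iterator"], f"{path}.{name}.Iterator")
    return problems


weekly = {
    "StartAt": "GetAllUsers",
    "States": {
        "GetAllUsers": {"Type": "Task", "Next": "ProcessAndContinue"},
        "ProcessAndContinue": {
            "Type": "Parallel",
            "End": True,
            "Branches": [
                {
                    "StartAt": "ProcessUser",
                    "States": {
                        "ProcessUser": {
                            "Type": "Map",
                            "End": True,
                            "Iterator": {
                                "StartAt": "ProcessFunction",
                                "States": {"ProcessFunction": {"Type": "Task", "End": True}},
                            },
                        }
                    },
                },
                {
                    "StartAt": "ContinueIfMoreUsers",
                    "States": {
                        "ContinueIfMoreUsers": {
                            "Type": "Choice",
                            "Default": "NoMoreUsers",
                            "Choices": [
                                {"Variable": "$.PaginationToken", "IsPresent": True, "Next": "Restart"}
                            ],
                        },
                        "NoMoreUsers": {"Type": "Succeed"},
                        "Restart": {"Type": "Task", "End": True},
                    },
                },
            ],
        },
    },
}

print(check_transitions(weekly))  # []
```

An empty list means every transition resolves. A typo such as `Next: Restrat` in the Choice rule would be reported with the path of the offending state.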