Saturday 25 August 2018

AWS S3 event aggregation with Lambda and DynamoDB

Introduction

S3 has had event notifications since 2014 and, for individual object notifications, these events work well with Lambda, allowing you to perform an action on every object event in a bucket. It is harder to use this approach when you want to perform an action a limited number of times or at an aggregated bucket level. An example use case would be refreshing a dependency (like Storage Gateway RefreshCache) when you are expecting a large number of object events in a bucket. Performing a relatively expensive action for every event is not practical or efficient in this case. This post provides a solution for aggregating these events using Lambda, DynamoDB, and SQS.

Update (June/July 2020)

Cache refresh can now be automated. The event aggregation approach below may still be useful depending on your requirements.


The problem

We want to call RefreshCache on our Storage Gateway (SGW) whenever the contents of the S3 bucket it exposes are updated by an external process. If the external process is updating a large number of (small) S3 objects then a large number of S3 events will be triggered. We don't want to overload our SGW with refresh requests so we need a way to aggregate these events to only send occasional refresh requests.

The solution

The solution is fairly simple and uses DynamoDB's Conditional Writes for synchronisation and SQS Message Timers to enable aggregation. When the Lambda function processes a new object event it first checks to see if the event falls within the window of the currently active refresh request. If the event is within the window it will automatically be included when the refresh executes and the event can be ignored. If the event occurred after the last refresh then a new refresh request is sent to an SQS queue with a message timer equal to the refresh window period. This allows for all messages received within a refresh window to be included in a single refresh operation.
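To make this concrete, here is a minimal sketch of the per-event logic, written against boto3 but not taken from the repository code, assuming the DynamoDB table and SQS message attributes described later in this post (the handle_event helper name and the exact window boundary handling are illustrative only):
import os
import boto3
from botocore.exceptions import ClientError

dynamodb = boto3.client('dynamodb')
sqs = boto3.client('sqs')
QUEUE_URL = os.environ['QUEUE_URL']
DELAY_SECONDS = int(os.environ.get('REFRESH_DELAY_SECONDS', '30'))

def handle_event(bucket, event_millis):
    try:
        # Conditional write: only succeeds if no scheduled refresh already covers
        # this event, i.e. the stored timestamp is older than the refresh window.
        dynamodb.update_item(
            TableName='S3EventAggregator',
            Key={'BucketName': {'S': bucket}},
            UpdateExpression='SET LastRefresh = :ts',
            ConditionExpression='attribute_not_exists(LastRefresh) OR LastRefresh < :cutoff',
            ExpressionAttributeValues={
                ':ts': {'N': str(event_millis)},
                ':cutoff': {'N': str(event_millis - DELAY_SECONDS * 1000)}})
    except ClientError as error:
        if error.response['Error']['Code'] == 'ConditionalCheckFailedException':
            return  # within the current refresh window, nothing to do
        raise
    # Outside the window: schedule a single refresh, delayed by the window length
    # so that any further events arriving in the meantime are covered by it.
    sqs.send_message(
        QueueUrl=QUEUE_URL,
        MessageBody='{}',
        DelaySeconds=DELAY_SECONDS,
        MessageAttributes={
            'bucket-name': {'DataType': 'String', 'StringValue': bucket},
            'timestamp': {'DataType': 'Number', 'StringValue': str(event_millis)}})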

Implementation

At a high level we need to create resources (SQS queue, DynamoDB table, Lambda functions), set up permissions (create and assign IAM roles), and apply some configuration (linking Lambda to S3 event notifications and SQS queues). This implementation really belongs in a CloudFormation template (and I may actually create one) but I was interested in trying to do this entirely via the AWS CLI, masochistic as that may be. If you are not interested in the gory implementation details then skip ahead to the 'Creation and deletion scripts' section.

Let's start with the S3 event aggregation piece. We need:
  1. A DynamoDB table to track state
  2. An SQS queue as a destination for aggregated actions
  3. A Lambda function for processing and aggregating the S3 events
  4. IAM permissions for all of the above
As the DynamoDB table and SQS queue are independent we can create these first:
aws dynamodb create-table --table-name S3EventAggregator --attribute-definitions AttributeName=BucketName,AttributeType=S --key-schema AttributeName=BucketName,KeyType=HASH --provisioned-throughput ReadCapacityUnits=1,WriteCapacityUnits=5

aws sqs create-queue --queue-name S3EventAggregatorActionQueue


Naturally these commands need to be run as a user with sufficient permissions, and they assume your default region is set.

The Lambda function is a bit trickier as it requires a role to be created before the function can be created. So let's start with the IAM permissions. First let's create a policy allowing DynamoDB GetItem and UpdateItem to be performed on the DynamoDB table we created earlier. To do this we need a JSON file containing the necessary permissions. The dynamo-writer.json file looks like this:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
   "dynamodb:UpdateItem",
   "dynamodb:GetItem"
   ],
            "Resource": "arn:aws:dynamodb:REGION:ACCOUNT_ID:table/S3EventAggregator"
        }
    ]
}

We need to replace REGION and ACCOUNT_ID with the relevant values. As we are aiming to use the command line for this exercise, let's use STS to retrieve our account ID, set our region, and then use sed to substitute both variables:

ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text) 
AWS_DEFAULT_REGION="eu-west-1" 
wget -O dynamo-writer.json https://raw.githubusercontent.com/watchamcb/s3-event-aggregator/master/iam/dynamo-writer.json 
sed -i "s/ACCOUNT_ID/$ACCOUNT_ID/g" dynamo-writer.json
sed -i "s/REGION/$AWS_DEFAULT_REGION/g" dynamo-writer.json

aws iam create-policy --policy-name S3EventAggregatorDynamo --policy-document file://dynamo-writer.json

We now have a policy that allows the caller (our soon-to-be-created Lambda function in this case) to update items in the S3EventAggregator DynamoDB table. Next we need to create a policy to allow the function to write messages to SQS. The sqs-writer.json policy file contents are similar to the DynamoDB policy:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "sqs:SendMessage",
            "Resource": "arn:aws:sqs:REGION:ACCOUNT_ID:S3EventAggregatorActionQueue"
        }
    ]
}

Retrieving the file and substituting the ACCOUNT_ID and REGION using the environment variables we created for the DynamoDB policy:
wget -O sqs-writer.json https://raw.githubusercontent.com/watchamcb/s3-event-aggregator/master/iam/sqs-writer.json 
sed -i "s/ACCOUNT_ID/$ACCOUNT_ID/g" sqs-writer.json
sed -i "s/REGION/$AWS_DEFAULT_REGION/g" sqs-writer.json
aws iam create-policy --policy-name S3EventAggregatorSqsWriter --policy-document file://sqs-writer.json

Having defined the two resource access policies, let's create an IAM role for the Lambda function.
wget -O lambda-trust.json https://raw.githubusercontent.com/watchamcb/s3-event-aggregator/master/iam/lambda-trust.json
aws iam create-role --role-name S3EventAggregatorLambdaRole --assume-role-policy-document file://lambda-trust.json

The lambda-trust.json trust policy allows the Lambda service to assume the role via STS and looks like this (no substitutions required for this one):
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

We can now attach the SQS and DynamoDB policies to the newly created Lambda role. We also need AWSLambdaBasicExecutionRole, an AWS managed policy that provides the CloudWatch Logs access required for Lambda function execution:
aws iam attach-role-policy --policy-arn arn:aws:iam::$ACCOUNT_ID:policy/S3EventAggregatorDynamo --role-name S3EventAggregatorLambdaRole
aws iam attach-role-policy --policy-arn arn:aws:iam::$ACCOUNT_ID:policy/S3EventAggregatorSqsWriter --role-name S3EventAggregatorLambdaRole
aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole --role-name S3EventAggregatorLambdaRole

We are now finally ready to create the S3EventAggregator Lambda function, starting with the Python deployment package:
wget -O s3_aggregator.py https://raw.githubusercontent.com/watchamcb/s3-event-aggregator/master/src/s3_aggregator.py
zip function.zip s3_aggregator.py

And then creating the function (using the AWS_DEFAULT_REGION and ACCOUNT_ID environment variables again):
aws lambda create-function --function-name S3EventAggregator --runtime python3.6 --role arn:aws:iam::$ACCOUNT_ID:role/S3EventAggregatorLambdaRole --zip-file fileb://function.zip --handler s3_aggregator.lambda_handler --timeout 10 --environment "Variables={QUEUE_URL=https://sqs.$AWS_DEFAULT_REGION.amazonaws.com/$ACCOUNT_ID/S3EventAggregatorActionQueue,REFRESH_DELAY_SECONDS=30,LOG_LEVEL=INFO}"
aws lambda put-function-concurrency --function-name S3EventAggregator --reserved-concurrent-executions 1

The function concurrency is set to 1 as there is no benefit to having the function process S3 events concurrently, and 'single threading' the function limits the maximum concurrent DynamoDB request rate, reducing DynamoDB capacity usage and costs.
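For completeness, the handler itself receives standard S3 notification events; extracting the bucket name and event timestamp from each record looks roughly like this (a sketch that reuses the hypothetical handle_event helper from the earlier snippet):
from datetime import datetime, timezone

def lambda_handler(event, context):
    for record in event.get('Records', []):
        bucket = record['s3']['bucket']['name']
        # eventTime is ISO 8601, e.g. 2018-08-12T18:14:02.149Z; convert to epoch milliseconds
        event_time = datetime.strptime(record['eventTime'], '%Y-%m-%dT%H:%M:%S.%fZ')
        event_millis = int(event_time.replace(tzinfo=timezone.utc).timestamp() * 1000)
        handle_event(bucket, event_millis)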

All that is left now is to give S3 permission to execute the Lambda function and link the bucket notification events to the S3EventAggregator function. Giving S3 permission on the specific bucket:
BUCKET=my-bucket
aws lambda add-permission --function-name S3EventAggregator --statement-id SID_$BUCKET --action lambda:InvokeFunction --principal s3.amazonaws.com --source-account $ACCOUNT_ID --source-arn arn:aws:s3:::$BUCKET

Interestingly, the --source-arn can be omitted to avoid needing to add permissions for each bucket you want the function to operate on, but it is required (and must match a specific bucket) for the Lambda Console to display the function and trigger correctly. The S3 event.json configuration subscribes to all object creation and removal events:
{
    "LambdaFunctionConfigurations": [
        {
            "Id": "s3-event-aggregator",
            "LambdaFunctionArn": "arn:aws:lambda:REGION:ACCOUNT_ID:function:S3EventAggregator",
            "Events": [
                "s3:ObjectCreated:*",
                "s3:ObjectRemoved:*"
            ]
        }
    ]
}

Once again substituting the relevant region and account IDs:
wget -O event.json https://raw.githubusercontent.com/watchamcb/s3-event-aggregator/master/s3/event.json
sed -i "s/ACCOUNT_ID/$ACCOUNT_ID/g" event.json
sed -i "s/REGION/$AWS_DEFAULT_REGION/g" event.json

And linking the event configuration to a bucket:
aws s3api put-bucket-notification-configuration --bucket $BUCKET --notification-configuration file://event.json

Thus concludes the event aggregation part of the solution. A quick test confirms the event aggregation is working as expected:

time for i in $(seq 1 5); do aws s3 cp test.txt s3://$BUCKET/test$i.txt; done
upload: ./test.txt to s3://s3-test-net/test1.txt                  
upload: ./test.txt to s3://s3-test-net/test2.txt                  
upload: ./test.txt to s3://s3-test-net/test3.txt                  
upload: ./test.txt to s3://s3-test-net/test4.txt                  
upload: ./test.txt to s3://s3-test-net/test5.txt                  

real 0m2.106s
user 0m1.227s
sys 0m0.140s

STREAM=$(aws logs describe-log-streams --log-group-name /aws/lambda/S3EventAggregator --order-by LastEventTime --descending --query 'logStreams[0].logStreamName' --output text); aws logs get-log-events --log-group-name /aws/lambda/S3EventAggregator --log-stream-name $STREAM --query 'events[*].{msg:message}' --output text | grep "^\[" | sed 's/\t/ /g'
[INFO] 2018-08-12T18:14:03.647Z 7da07415-9e5b-11e8-ab6d-8f962149ce24 Sending refresh request for bucket: s3-test-net, timestamp: 1534097642149
[INFO] 2018-08-12T18:14:04.207Z 7e1d6ca2-9e5b-11e8-ac9d-e1f0f9729f66 Refresh for bucket: s3-test-net within refresh window, skipping. S3 Event timestamp: 1534097642938
[INFO] 2018-08-12T18:14:04.426Z 7eefb013-9e5b-11e8-ab6d-8f962149ce24 Refresh for bucket: s3-test-net within refresh window, skipping. S3 Event timestamp: 1534097643812
[INFO] 2018-08-12T18:14:04.635Z 7e5b5feb-9e5b-11e8-8aa9-7908c99c450a Refresh for bucket: s3-test-net within refresh window, skipping. S3 Event timestamp: 1534097643371
[INFO] 2018-08-12T18:14:05.915Z 7ddb5a72-9e5b-11e8-80de-0dd6a15c3f62 Refresh for bucket: s3-test-net within refresh window, skipping. S3 Event timestamp: 1534097642517

From the 'within refresh window' log messages we can see 4 of the 5 events were skipped as they fell within the refresh aggregation window. Checking the SQS queue we can see the refresh request event:

aws sqs receive-message --queue-url https://$AWS_DEFAULT_REGION.queue.amazonaws.com/$ACCOUNT_ID/S3EventAggregatorActionQueue --attribute-names All --message-attribute-names All
{
    "Messages": [
        {
            "MessageId": "c0027dd2-30bc-48bc-b622-b5c85d862c92",
            "ReceiptHandle": "AQEB9DQXkIWsWn...5XU2a13Q8=",
            "MD5OfBody": "99914b932bd37a50b983c5e7c90ae93b",
            "Body": "{}",
            "Attributes": {
                "SenderId": "AROAI55PXBF63XVSEBNYM:S3EventAggregator",
                "ApproximateFirstReceiveTimestamp": "1534097653846",
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1534097642728"
            },
            "MD5OfMessageAttributes": "6f6eaf397811cbece985f3e8d87546c3",
            "MessageAttributes": {
                "bucket-name": {
                    "StringValue": "s3-test-net",
                    "DataType": "String"
                },
                "timestamp": {
                    "StringValue": "1534097642149",
                    "DataType": "Number"
                }
            }
        }
    ]
}

Moving on to the final part of the solution, we need a Lambda function that processes the events that the S3EventAggregator function sends to SQS. For the function's permissions we can reuse the S3EventAggregatorDynamo policy for DynamoDB access but will need to create a new policy for reading and deleting SQS messages and refreshing the Storage Gateway cache.

The sgw-refresh.json is as follows. Note that the SMB file share actions are included even though the current Lambda execution environment only ships boto3 1.7.30, which does not actually expose the SMB APIs (more on working around this later):
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "storagegateway:RefreshCache",
                "storagegateway:ListFileShares",
                "storagegateway:DescribeNFSFileShares",
                "storagegateway:DescribeSMBFileShares"
            ],
            "Resource": "*"
        }
    ]
}

Creating the policy:
wget -O sgw-refresh.json https://raw.githubusercontent.com/watchamcb/s3-event-aggregator/master/iam/sgw-refresh.json
aws iam create-policy --policy-name StorageGatewayRefreshPolicy --policy-document file://sgw-refresh.json

The sqs-reader.json gives the necessary SQS read permissions on the S3EventAggregatorActionQueue:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes",
                "sqs:ReceiveMessage"
            ],
            "Resource": "arn:aws:sqs:REGION:ACCOUNT_ID:S3EventAggregatorActionQueue"
        }
    ]
}

Substituting the account ID and region and creating the policy:
wget -O sqs-reader.json https://raw.githubusercontent.com/watchamcb/s3-event-aggregator/master/iam/sqs-reader.json
sed -i "s/ACCOUNT_ID/$ACCOUNT_ID/g" sqs-reader.json 
sed -i "s/REGION/$AWS_DEFAULT_REGION/g" sqs-reader.json
aws iam create-policy --policy-name S3EventAggregatorSqsReader --policy-document file://sqs-reader.json

And then creating the role and adding the relevant policies:
wget -O lambda-trust.json https://raw.githubusercontent.com/watchamcb/s3-event-aggregator/master/iam/lambda-trust.json
aws iam create-role --role-name S3AggregatorActionLambdaRole --assume-role-policy-document file://lambda-trust.json
aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole --role-name S3AggregatorActionLambdaRole
aws iam attach-role-policy --policy-arn arn:aws:iam::$ACCOUNT_ID:policy/S3EventAggregatorSqsReader --role-name S3AggregatorActionLambdaRole
aws iam attach-role-policy --policy-arn arn:aws:iam::$ACCOUNT_ID:policy/S3EventAggregatorDynamo --role-name S3AggregatorActionLambdaRole
aws iam attach-role-policy --policy-arn arn:aws:iam::$ACCOUNT_ID:policy/StorageGatewayRefreshPolicy --role-name S3AggregatorActionLambdaRole

Next we will create the Lambda function; the exact packaging depends on whether or not you require SMB file share support. As mentioned earlier, the current Lambda execution environment does not expose the new SMB file share APIs, so if you have SMB shares mapped on your Storage Gateway you will have to include the latest botocore and boto3 libraries in your deployment package. The disadvantage of this is that you are not able to view the code in the Lambda console (due to the deployment file size limitation). If you are only using NFS shares then you only need the code without the bundled libraries, but it will break if you add an SMB share before the Lambda execution environment supports it. Including the dependency in the deployment is the preferred option, so that is what we are going to do:

mkdir deploy
pip install boto3 botocore -t deploy
wget -O deploy/s3_sgw_refresh.py https://raw.githubusercontent.com/watchamcb/s3-event-aggregator/master/src/s3_sgw_refresh.py
cd deploy
zip -r function.zip *

aws lambda create-function --function-name S3StorageGatewayRefresh --runtime python3.6 --role arn:aws:iam::$ACCOUNT_ID:role/S3AggregatorActionLambdaRole --zip-file fileb://function.zip --handler s3_sgw_refresh.lambda_handler --timeout 5 --environment "Variables={LOG_LEVEL=INFO}"
aws lambda put-function-concurrency --function-name S3StorageGatewayRefresh --reserved-concurrent-executions 1

And finally create the event mapping to execute the S3StorageGatewayRefresh function when messages are received on the queue:

aws lambda create-event-source-mapping --function-name S3StorageGatewayRefresh --event-source-arn arn:aws:sqs:$AWS_DEFAULT_REGION:$ACCOUNT_ID:S3EventAggregatorActionQueue --batch-size 1
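The refresh logic itself lives in s3_sgw_refresh.py in the repository; roughly, for each SQS record it has to find the file share backed by the bucket and refresh it. A simplified sketch (NFS shares only, matching on LocationARN, pagination and DynamoDB bookkeeping omitted) might look like this:
import boto3

sgw = boto3.client('storagegateway')

def lambda_handler(event, context):
    for record in event['Records']:  # records delivered by the SQS event source mapping
        bucket = record['messageAttributes']['bucket-name']['stringValue']
        bucket_arn = 'arn:aws:s3:::' + bucket
        shares = sgw.list_file_shares()['FileShareInfoList']
        nfs_arns = [s['FileShareARN'] for s in shares if s.get('FileShareType') == 'NFS']
        if not nfs_arns:
            continue
        for info in sgw.describe_nfs_file_shares(FileShareARNList=nfs_arns)['NFSFileShareInfoList']:
            if info['LocationARN'].startswith(bucket_arn):
                # Ask the gateway to re-list the bucket so the share reflects the new objects
                sgw.refresh_cache(FileShareARN=info['FileShareARN'])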

And that is the final solution. To verify it works as expected, let's mount the NFS share, upload some files via the CLI, and confirm the share is refreshed. Mounting the share:

$ sudo mount -t nfs -o nolock 172.31.2.13:/s3-test-net share
$ cd share/sgw
$ ls -l
total 0

Uploading the files through the CLI (from a different machine):

$ for i in $(seq 1 20); do aws s3 cp test.txt s3://$BUCKET/sgw/test$i.txt; done
upload: ./test.txt to s3://s3-test-net/test1.txt                
upload: ./test.txt to s3://s3-test-net/test2.txt                
...
upload: ./test.txt to s3://s3-test-net/test19.txt              
upload: ./test.txt to s3://s3-test-net/test20.txt    

And confirming the refresh of the share:

$ ls -l
total 10
-rw-rw-rw- 1 nobody nogroup 19 Aug 25 07:46 test10.txt
-rw-rw-rw- 1 nobody nogroup 19 Aug 25 07:46 test11.txt
...
-rw-rw-rw- 1 nobody nogroup 19 Aug 25 07:46 test8.txt
-rw-rw-rw- 1 nobody nogroup 19 Aug 25 07:46 test9.txt

Creation and deletion scripts

For convenience a script to create (and remove) this stack is provided on GitHub. Clone the s3-event-aggregator repository and run the create-stack.sh and delete-stack.sh scripts respectively. You need to have the AWS CLI installed and configured, and sed and zip must be available. Be sure to edit the BUCKET variable in the script to match your bucket name and change the REGION if appropriate.

Note that the delete stack script will not remove the S3 event notification configuration by default. There is no safe and convenient way to remove only the S3EventAggregator configuration from the script (other than removing all notification configuration, which may result in unintended loss of other event configurations). If you have other events configured on a bucket it is best to use the AWS Console to remove the s3-event-aggregator event configuration. If there are no other events configured on your bucket you can safely uncomment the relevant line in the deletion script.
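If you do want to remove just the s3-event-aggregator configuration programmatically, a read-modify-write along the following lines should work (a boto3 sketch, still subject to the usual race if something else changes the bucket configuration at the same time):
import boto3

s3 = boto3.client('s3')
bucket = 'my-bucket'  # replace with your bucket name

config = s3.get_bucket_notification_configuration(Bucket=bucket)
config.pop('ResponseMetadata', None)
# Keep every other notification (SNS, SQS, and other Lambda configurations) intact
config['LambdaFunctionConfigurations'] = [
    c for c in config.get('LambdaFunctionConfigurations', [])
    if c.get('Id') != 's3-event-aggregator']
s3.put_bucket_notification_configuration(Bucket=bucket, NotificationConfiguration=config)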

Configuration

Both Lambda functions have a LOG_LEVEL environment variable to control the detail logged to CloudWatch Logs. The functions were created with the level set to INFO, but DEBUG may be useful for troubleshooting and WARN is probably appropriate for production use.

The S3EventAggregator function also has an environment variable called REFRESH_DELAY_SECONDS for controlling the event aggregation window. It was initialised to 30 seconds when the function was created, but it may be appropriate to change it depending on your S3 upload pattern. If the uploads are mostly small and complete quickly, or if you need the Storage Gateway to reflect changes quickly, then 30 seconds may be a reasonable value. If you are performing larger uploads, or the total upload process takes significantly longer, then the refresh window needs to be increased to be longer than the total expected upload time.
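If you adjust the window after creation, remember that the Lambda environment is replaced as a whole, so read it, modify the one variable, and write it back (shown here with boto3; the aws lambda update-function-configuration CLI call behaves the same way):
import boto3

lambda_client = boto3.client('lambda')
config = lambda_client.get_function_configuration(FunctionName='S3EventAggregator')
variables = config.get('Environment', {}).get('Variables', {})
variables['REFRESH_DELAY_SECONDS'] = '120'  # example: widen the window to two minutes
lambda_client.update_function_configuration(
    FunctionName='S3EventAggregator',
    Environment={'Variables': variables})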

The DynamoDB table was created with 5 write capacity units, and as the entries are less than 1KB this should be sufficient as long as you are not writing more than 5 objects a second to the Storage Gateway S3 bucket. Writing more than this will require additional write capacity to be provisioned (or auto scaling to be enabled).
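If you would rather not provision for the peak, write auto scaling can be enabled on the table; a boto3 sketch (the capacity limits and target utilisation here are illustrative only):
import boto3

autoscaling = boto3.client('application-autoscaling')
autoscaling.register_scalable_target(
    ServiceNamespace='dynamodb',
    ResourceId='table/S3EventAggregator',
    ScalableDimension='dynamodb:table:WriteCapacityUnits',
    MinCapacity=5,
    MaxCapacity=50)
autoscaling.put_scaling_policy(
    PolicyName='S3EventAggregatorWriteScaling',
    ServiceNamespace='dynamodb',
    ResourceId='table/S3EventAggregator',
    ScalableDimension='dynamodb:table:WriteCapacityUnits',
    PolicyType='TargetTrackingScaling',
    TargetTrackingScalingPolicyConfiguration={
        'TargetValue': 70.0,
        'PredefinedMetricSpecification': {
            'PredefinedMetricType': 'DynamoDBWriteCapacityUtilization'}})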

The same code can be used for multiple buckets by simply adding additional bucket event configurations via the CLI put-bucket-notification-configuration as above or using the AWS Console.

Cost

There are three cost components in this solution: the two Lambda functions, DynamoDB, and SQS. The Lambda and DynamoDB costs will scale fairly linearly with usage, as both the S3EventAggregator function and DynamoDB are charged for each S3 event that is triggered. To get an idea of the number of events to expect you can enable S3 request metrics on the bucket and check the PUT and DELETE counts. The S3StorageGatewayRefresh function and SQS messages will be a fraction of the total S3 event count and dependent on the REFRESH_DELAY_SECONDS configuration. A longer refresh delay will result in fewer SQS messages and S3StorageGatewayRefresh function executions.
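If request metrics are already enabled on the bucket, the daily PUT counts can be pulled from CloudWatch; a boto3 sketch assuming the console's default EntireBucket metrics filter (adjust FilterId, and swap the metric name to DeleteRequests for deletes):
import boto3
from datetime import datetime, timedelta

cloudwatch = boto3.client('cloudwatch')
stats = cloudwatch.get_metric_statistics(
    Namespace='AWS/S3',
    MetricName='PutRequests',
    Dimensions=[{'Name': 'BucketName', 'Value': 'my-bucket'},
                {'Name': 'FilterId', 'Value': 'EntireBucket'}],
    StartTime=datetime.utcnow() - timedelta(days=7),
    EndTime=datetime.utcnow(),
    Period=86400,
    Statistics=['Sum'])
for point in sorted(stats['Datapoints'], key=lambda p: p['Timestamp']):
    print(point['Timestamp'].date(), int(point['Sum']))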

As an example, let's assume 1000 objects are uploaded a day and that these are aggregated into 50 refresh events. For simplicity we will also assume that the free tier has been exhausted and that there are 30 days in the month. The total Lambda request count will then be:
(30 days x 1000 S3 events) + (30 days x 50 refresh events) = 31,500 requests
As Lambda requests are charged in 1 million increments this will result in a charge of $0.20 for the requests.

The compute charges are based on duration, with the S3EventAggregator executing in less than 100ms for the skipped (aggregated) events and around 300-600ms for the events that trigger a refresh. The S3StorageGatewayRefresh function takes between 400ms and 800ms. Giving us:
(30 days x 950 S3 requests x 0.1s) + (30 days x 50 S3 requests x 0.5s) + (30 days x 50 refresh events x 0.7s) = 4650 seconds
Lambda compute is charged in GB-s, so:
4650 seconds x 128MB/1024 = 581.25 GB-s at $0.00001667 = $0.0096894
Bringing the total Lambda charges for the month to $0.21

For DynamoDB we have provisioned 5 WCU at $0.000735/WCU/hour and 1 RCU at $0.000147/RCU/hour working out as:
(5 WCU x 0.000735 per hour x 24 hours a day x 30 days) + (1 RCU x 0.000147 per hour x 24 hours x 30 days) = $2.75 a month

SQS charges per million requests, with the 1,500 send message requests and a further 1,500 receive and delete requests falling well under this limit (and thus only costing $0.40 for the month). It is worth noting that Lambda polls the SQS queue roughly 4 times a minute, and this will contribute to your total SQS request costs, adding around 172,800 SQS requests a month.

There are some other costs associated with CloudWatch Logs and DynamoDB storage but these should be fairly small compared to the request costs and I would not expect the total cost of the stack to be more than $10 - $15 a month.

Conclusion

And so ends this post; well done for reading to the end. I quite enjoyed building this solution and will look at converting it to a CloudFormation template at a later stage. Feel free to log issues or open pull requests against the GitHub repo.