【AWS】CloudWatch Logsを3ヶ月(90日前)経過後に自動でS3へエクスポートする仕組みをStep Functions × Lambdaで構築する
CloudWatch Logsのログ保存コストを抑えるために、「一定期間(例えば3ヶ月・90日)が経過したログを定期的にS3バケットへ退避(エクスポート)させたい」というユースケースは多いかと思います。
しかし、CloudWatch Logsの CreateExportTask APIを実行するには、期間をミリ秒単位のUNIXタイムスタンプで指定する必要があります。AWS Step Functionsの組み込み関数だけでは、ログ取得日(90日前など)の動的な日付計算やミリ秒換算が難しいため、今回は軽量なLambda関数を組み合わせて日付を動的に取得する仕組みを構築しました。
本記事では、EventBridge Scheduler、Step Functions、Lambda、そしてS3バケットポリシーの設定までを網羅したCloudFormation(Cfn)テンプレートを含めて一挙に解説します。
記事の目次
1. 構成の概要と解決したい課題
今回のアーキテクチャは以下の通りです。
- EventBridge Scheduler: 毎日日本時間(JST)のAM 1:00に定期実行をトリガー
- Lambda: 現在時刻から「90日前」のUTC 00:00:00〜23:59:59を計算し、ミリ秒単位のUNIXタイムスタンプへ変換
- Step Functions:
- Lambdaから受け取ったタイムスタンプを基に、複数のロググループに対して順次(Map状態を使用して1つずつ)エクスポートタスク(CreateExportTask)を発行
- タスクのステータスを定期的に確認(DescribeExportTasks)し、完了するまで待機(ループ処理)
- S3バケット: エクスポートされたログファイルを保管
なぜLambdaが必要なのか?
Step Functions単体でも日時の取得はある程度可能ですが、CloudWatch LogsのAPI仕様である「ミリ秒単位のUNIXタイムスタンプ(例: 1719187200000)」への正確な変換や、「90日前の00:00:00〜23:59:59」といった範囲のパースは、Pythonの datetime ライブラリを用いたLambda関数を1ステップ挟むのが最も確実かつスマートだからです。
2. 事前準備:S3バケットポリシーの設定
CloudWatch LogsからS3バケットへデータを書き込むためには、ターゲットとなるS3バケットに専用のアクセスポリシーを付与しておく必要があります。
まずは、ログ集約用のS3バケットを事前に作成し、以下のバケットポリシーを設定してください(【あなたのS3バケット名】 部分は環境に合わせて書き換えてください)。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowCWLogsToPutObjects",
"Effect": "Allow",
"Principal": {
"Service": "logs.ap-northeast-1.amazonaws.com"
},
"Action": "s3:PutObject",
"Resource": "arn:aws:s3:::【あなたのS3バケット名】/*"
},
{
"Sid": "AllowCWLogsToGetBucketAcl",
"Effect": "Allow",
"Principal": {
"Service": "logs.ap-northeast-1.amazonaws.com"
},
"Action": "s3:GetBucketAcl",
"Resource": "arn:aws:s3:::【あなたのS3バケット名】"
}
]
}3. CloudFormation(Cfn)テンプレート
上記の自動化環境を一発でデプロイするためのCfnテンプレート(YAML)です。 既存のS3バケット名をパラメータ(ExistingS3BucketName)として渡すだけで、必要なIAMロール、Lambda、Step Functions、EventBridge Schedulerがすべて作成されます。
AWSTemplateFormatVersion: '2010-09-09'
Description: 'Step Functions + Lambda for CWL to S3 Export with Dynamic Date & Unix Timestamp'
# ============================================================
# パラメータ(デプロイ時に入力)
# ============================================================
Parameters:
ExistingS3BucketName:
Type: String
Description: 'すでに作成済みのエクスポート先S3バケット名を入力してください'
Resources:
# ============================================================
# 1. Lambda 実行ロール
# ============================================================
# 説明: Lambda 関数が AWS 上でアクションを実行するための権限を定義するロール
# Lambda は CloudWatch Logs にアクセス不要(日付計算のみ)
LambdaExecutionRole:
Type: 'AWS::IAM::Role'
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
# Lambda が このロールを引き受ける(Assume)ことを許可
- Effect: 'Allow'
Principal:
Service: 'lambda.amazonaws.com'
Action: 'sts:AssumeRole'
# AWS管理ポリシーをアタッチ(CloudWatch Logs への基本的な書き込み権限)
ManagedPolicyArns:
- 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
# ============================================================
# 2. Lambda 関数(日付計算 + UNIX タイムスタンプ変換)
# ============================================================
# 説明: 実行時刻から90日前の日付を自動計算し、
# CloudWatch Logs API が必要とする UNIX タイムスタンプに変換する
#
# 入力: event(Step Functions から自動的に渡される)
# 出力: {
# 'TargetDate': '2026-05-25', # YYYY-MM-DD 形式
# 'FromTime': 1779667200000, # その日の 00:00:00 UTC(ミリ秒)
# 'ToTime': 1779753599000, # その日の 23:59:59 UTC(ミリ秒)
# 'Message': 'Calculated dates for...'
# }
CalculateDateFunction:
Type: 'AWS::Lambda::Function'
Properties:
FunctionName: 'cwl-calculate-unix-timestamp'
Runtime: 'python3.11'
Handler: 'index.lambda_handler' # index.py の lambda_handler() 関数を実行
Role: !GetAtt LambdaExecutionRole.Arn
Code:
ZipFile: |
import json
from datetime import datetime, timedelta, timezone
def lambda_handler(event, context):
"""
実行時刻から90日前の日付を計算し、
CloudWatch Logs API が必要とする UNIX タイムスタンプ(ミリ秒)に変換
CloudWatch Logs の createExportTask API は、
From / To パラメータに UNIX タイムスタンプ(ミリ秒)を要求する。
ISO 8601 形式(例:2026-05-25T00:00:00Z)は受け付けない。
"""
# UTC 現在時刻から90日前を計算
now = datetime.now(timezone.utc)
target_date = now - timedelta(days=90)
# YYYY-MM-DD 形式で日付を取得(例:2026-05-25)
formatted_date = target_date.strftime('%Y-%m-%d')
# その日の 00:00:00 UTC をパース
from_datetime = datetime.strptime(
f"{formatted_date}T00:00:00",
"%Y-%m-%dT%H:%M:%S"
).replace(tzinfo=timezone.utc)
# その日の 23:59:59 UTC をパース
to_datetime = datetime.strptime(
f"{formatted_date}T23:59:59",
"%Y-%m-%dT%H:%M:%S"
).replace(tzinfo=timezone.utc)
# datetime.timestamp() は UNIX タイムスタンプ(秒)を返す
# * 1000 でミリ秒に変換、int() で整数に変換
from_time_ms = int(from_datetime.timestamp() * 1000)
to_time_ms = int(to_datetime.timestamp() * 1000)
return {
'TargetDate': formatted_date,
'FromTime': from_time_ms, # ← CloudWatch Logs API が要求する形式
'ToTime': to_time_ms, # ← CloudWatch Logs API が要求する形式
'Message': f'Calculated dates for {formatted_date}'
}
# ============================================================
# 3. IAM Role for Step Functions
# ============================================================
# 説明: Step Functions State Machine が AWS 上でアクションを実行するための権限を定義するロール
# Step Functions は Lambda、CloudWatch Logs に対してアクションを実行する必要がある
StateMachineExecutionRole:
Type: 'AWS::IAM::Role'
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
# Step Functions(states.amazonaws.com)がこのロールを引き受ける(Assume)ことを許可
- Effect: 'Allow'
Principal:
Service: !Sub 'states.${AWS::Region}.amazonaws.com'
Action: 'sts:AssumeRole'
Policies:
- PolicyName: 'StateMachinePolicy'
PolicyDocument:
Version: '2012-10-17'
Statement:
# 1. Lambda 関数を呼び出す権限
- Effect: 'Allow'
Action:
- 'lambda:InvokeFunction'
Resource: !GetAtt CalculateDateFunction.Arn
# 2. CloudWatch Logs のエクスポートタスク を作成・確認する権限
- Effect: 'Allow'
Action:
- 'logs:CreateExportTask' # ロググループのエクスポート開始
- 'logs:DescribeExportTasks' # エクスポート状態を確認
Resource: '*'
# ============================================================
# 4. Step Functions State Machine
# ============================================================
# 説明: 一連のタスク(状態)を定義するワークフロー
#
# フロー概要:
# 1. CalculateTargetDate → Lambda で日付計算
# 2. PrepareExportConfig → 設定を統合
# 3. ProcessLogGroups (Map) → ロググループごとにループ
# 3-1. CreateExportTask → S3 へのエクスポート開始
# 3-2. WaitBeforeCheck → 30秒待機
# 3-3. DescribeExportTask → 状態確認(リトライ付き)
# 3-4. CheckStatus (Choice) → ステータス判定
# - COMPLETED → 完了(次のロググループへ)
# - RUNNING/PENDING → 待機へ戻る
# - その他 → 失敗
LogExportStateMachine:
Type: 'AWS::StepFunctions::StateMachine'
Properties:
StateMachineName: 'cwl-to-s3-with-lambda-unix'
RoleArn: !GetAtt StateMachineExecutionRole.Arn
DefinitionString: !Sub |
{
"Comment": "Lambda で90日前の日付を動的計算 → UNIX タイムスタンプに変換 → CloudWatch Logs エクスポート",
"StartAt": "CalculateTargetDate",
"States": {
"CalculateTargetDate": {
"Type": "Task",
"Comment": "【ステップ1】Lambda 関数を実行して、90日前の日付と UNIX タイムスタンプを計算",
"Resource": "${CalculateDateFunction.Arn}",
"ResultPath": "$.DateInfo",
"Next": "PrepareExportConfig"
},
"PrepareExportConfig": {
"Type": "Pass",
"Comment": "【ステップ2】Bucket + DateInfo + LogGroups をまとめて、後続ステップで使用する形に統合",
"Parameters": {
"Bucket": "${ExistingS3BucketName}",
"DateInfo.$": "$.DateInfo",
"LogGroups": [
"/aws/lambda/lambda-sns",
"/aws/lambda/transfer-family-lambda-01-GetUserConfigLambda-wrAxVXPHQir4",
"/aws/rds/instance/database-1-recovery/alert"
]
},
"Next": "ProcessLogGroups"
},
"ProcessLogGroups": {
"Type": "Map",
"Comment": "【ステップ3】LogGroups 配列の各要素に対して、以下の処理をループ実行(同時実行1)",
"ItemsPath": "$.LogGroups",
"ItemSelector": {
"LogGroupName.$": "$$.Map.Item.Value",
"Bucket.$": "$.Bucket",
"TargetDate.$": "$.DateInfo.TargetDate",
"FromTime.$": "$.DateInfo.FromTime",
"ToTime.$": "$.DateInfo.ToTime"
},
"MaxConcurrency": 1,
"Iterator": {
"StartAt": "CreateExportTask",
"States": {
"CreateExportTask": {
"Type": "Task",
"Comment": "【3-1】CloudWatch Logs API を呼び出してエクスポートタスクを作成",
"Parameters": {
"Destination.$": "$.Bucket",
"DestinationPrefix.$": "States.Format('{}/{}', $.LogGroupName, $.TargetDate)",
"From.$": "$.FromTime",
"LogGroupName.$": "$.LogGroupName",
"To.$": "$.ToTime"
},
"Resource": "arn:aws:states:::aws-sdk:cloudwatchlogs:createExportTask",
"ResultPath": "$.ExportTaskResult",
"Next": "WaitBeforeCheck",
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"ResultPath": "$.ErrorInfo",
"Next": "TaskFailed"
}
]
},
"WaitBeforeCheck": {
"Type": "Wait",
"Comment": "【3-2】エクスポートタスク処理の完了を待つため、30秒スリープ",
"Seconds": 30,
"Next": "DescribeExportTask"
},
"DescribeExportTask": {
"Type": "Task",
"Comment": "【3-3】DescribeExportTasks API を呼び出して、エクスポートタスクの現在の状態を取得",
"Parameters": {
"TaskId.$": "$.ExportTaskResult.TaskId"
},
"Resource": "arn:aws:states:::aws-sdk:cloudwatchlogs:describeExportTasks",
"ResultPath": "$.CheckTaskResult",
"Retry": [
{
"ErrorEquals": ["States.ALL"],
"IntervalSeconds": 30,
"MaxAttempts": 10,
"BackoffRate": 1.0
}
],
"Next": "CheckStatus"
},
"CheckStatus": {
"Type": "Choice",
"Comment": "【3-4】エクスポートタスクのステータスに基づいて分岐",
"Choices": [
{
"Comment": "COMPLETED の場合は完了",
"Variable": "$.CheckTaskResult.ExportTasks[0].Status.Code",
"StringEquals": "COMPLETED",
"Next": "TaskDone"
},
{
"Comment": "RUNNING の場合はポーリング継続",
"Variable": "$.CheckTaskResult.ExportTasks[0].Status.Code",
"StringEquals": "RUNNING",
"Next": "WaitBeforeCheck"
},
{
"Comment": "PENDING の場合もポーリング継続",
"Variable": "$.CheckTaskResult.ExportTasks[0].Status.Code",
"StringEquals": "PENDING",
"Next": "WaitBeforeCheck"
}
],
"Default": "TaskFailed"
},
"TaskDone": {
"Type": "Succeed",
"Comment": "【成功】このロググループのエクスポート完了。次のアイテムへ"
},
"TaskFailed": {
"Type": "Fail",
"Comment": "【失敗】エクスポートが失敗またはキャンセルされた",
"Cause": "Export Task Failed or Cancelled"
}
}
},
"End": true
}
}
}
# ============================================================
# 5. Scheduler Role
# ============================================================
# 説明: EventBridge Scheduler が Step Functions を起動するための権限を定義するロール
SchedulerExecutionRole:
Type: 'AWS::IAM::Role'
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
# EventBridge Scheduler(scheduler.amazonaws.com)がこのロールを引き受ける
- Effect: 'Allow'
Principal:
Service: 'scheduler.amazonaws.com'
Action: 'sts:AssumeRole'
Policies:
- PolicyName: 'SchedulerPolicy'
PolicyDocument:
Version: '2012-10-17'
Statement:
# Step Functions State Machine を起動する権限
- Effect: 'Allow'
Action:
- 'states:StartExecution'
Resource: !GetAtt LogExportStateMachine.Arn
# ============================================================
# 6. EventBridge Scheduler
# ============================================================
# 説明: 毎日 AM 1:00 JST に Step Functions を自動起動するスケジューラ
# Cron 式: cron(0 1 * * ? *)
# 分: 0(00分)
# 時: 1(1時 = AM 1:00)
# 日: *(毎日)
# 月: *(毎月)
# 曜日: ?(指定なし)
# 年: *(毎年)
# → 毎日 1時00分 JST に起動
DailyScheduler:
Type: 'AWS::Scheduler::Schedule'
Properties:
Name: 'cwl-to-s3-daily-scheduler'
Description: '毎日JST AM 1:00にStep Functionsを定期起動するタイマー設定'
FlexibleTimeWindow:
Mode: 'OFF'
ScheduleExpression: 'cron(0 1 * * ? *)'
ScheduleExpressionTimezone: 'Asia/Tokyo'
State: 'ENABLED'
Target:
Arn: !GetAtt LogExportStateMachine.Arn
RoleArn: !GetAtt SchedulerExecutionRole.Arn
# ============================================================
# 出力(デプロイ後に表示される値)
# ============================================================
Outputs:
StateMachineArn:
Description: 'Step Functions State Machine ARN'
Value: !Ref LogExportStateMachine
Export:
Name: !Sub '${AWS::StackName}-StateMachineArn'
LambdaFunctionArn:
Description: 'Lambda Function ARN'
Value: !GetAtt CalculateDateFunction.Arn
Export:
Name: !Sub '${AWS::StackName}-LambdaFunctionArn'
SchedulerName:
Description: 'EventBridge Scheduler Name'
Value: !Ref DailyScheduler
Export:
Name: !Sub '${AWS::StackName}-SchedulerName'💡 実務運用のワンポイント(MaxConcurrency: 1 の重要性)
CloudWatch Logsのエクスポートタスク(CreateExportTask)は、同一アカウント・同一リージョン内で同時に1つしか実行できないというAWSの制限があります。そのため、Step FunctionsのMap状態(ループ処理)のプロパティで “MaxConcurrency”: 1 を指定し、1つのロググループのエクスポートが完全に終わってから次のロググループの処理に進むように制御しています。
動作確認
最初の状態

深夜1時以降

まとめ
Step Functions単体では手が届きにくい「動的な日時・タイムスタンプ計算」をLambdaで補完することで、非常にシンプルかつ堅牢なログローテーション(退避)機構を構築できました。
CloudWatch Logsの長期保管コストに悩んでいる方、手動でのログエクスポートから脱却したい方は、ぜひこのテンプレートをベースに自動化を試してみてください!