ASG tags

来自linuxsa wiki
跳转到导航 跳转到搜索


import boto3
from datetime import datetime

autoscaling = boto3.client('autoscaling')

# ===== 配置区 =====
ASG_NAMES = [
    'asg-prod-web',
    'asg-prod-api',
    'asg-canary'
]

TAG_KEY = 'DailyTag'
TAG_VALUE = datetime.now().strftime('%Y-%m-%d')  # 
PROPAGATE_AT_LAUNCH = True


def get_asg_tag(asg_name, key):
    """获取 ASG 当前 Tag 值"""
    try:
        response = autoscaling.describe_auto_scaling_groups(
            AutoScalingGroupNames=[asg_name]
        )
        tags = response['AutoScalingGroups'][0].get('Tags', [])
        for tag in tags:
            if tag['Key'] == key:
                return tag['Value']
    except Exception:
        return None

    return None


def update_asg_tag(asg_name, key, value):
    """更新单个 ASG Tag"""
    autoscaling.create_or_update_tags(
        Tags=[
            {
                'ResourceId': asg_name,
                'ResourceType': 'auto-scaling-group',
                'Key': key,
                'Value': value,
                'PropagateAtLaunch': PROPAGATE_AT_LAUNCH
            }
        ]
    )


def lambda_handler(event, context):
    results = []

    for asg in ASG_NAMES:
        try:
            current_value = get_asg_tag(asg, TAG_KEY)

            if current_value == TAG_VALUE:
                print(f"{asg}: Tag already up-to-date ({TAG_VALUE}), skip.")
                results.append(f"{asg}: SKIP")
                continue

            update_asg_tag(asg, TAG_KEY, TAG_VALUE)
            print(f"{asg}: Updated tag {TAG_KEY}={TAG_VALUE}")
            results.append(f"{asg}: SUCCESS")

        except Exception as e:
            print(f"{asg}: ERROR - {str(e)}")
            results.append(f"{asg}: ERROR")

    return {
        'statusCode': 200,
        'body': {
            'tag_key': TAG_KEY,
            'tag_value': TAG_VALUE,
            'results': results
        }
    }



##**************************************************************

ASG_LIST = ['asg-prod', 'asg-canary']

for asg in ASG_LIST:
    autoscaling.create_or_update_tags(...)





import boto3
from datetime import datetime

autoscaling = boto3.client('autoscaling')

ASG_NAME = 'my-asg-name'
TAG_KEY = 'Env'
TAG_VALUE = 'prod-' + datetime.now().strftime('%Y%m%d')


def lambda_handler(event, context):
    try:
        response = autoscaling.create_or_update_tags(
            Tags=[
                {
                    'ResourceId': ASG_NAME,
                    'ResourceType': 'auto-scaling-group',
                    'Key': TAG_KEY,
                    'Value': TAG_VALUE,
                    'PropagateAtLaunch': True
                }
            ]
        )

        print(f"Updated ASG tag: {TAG_KEY}={TAG_VALUE}")
        return {
            'statusCode': 200,
            'body': f"Successfully updated {ASG_NAME} tag"
        }

    except Exception as e:
        print(f"Error updating ASG tag: {str(e)}")
        raise