ASG tags
跳转到导航
跳转到搜索
import boto3
from datetime import datetime
# Auto Scaling API client, created once at module import and used by the functions below.
autoscaling = boto3.client('autoscaling')
# ===== Configuration =====
# Names of the Auto Scaling groups the handler iterates over.
ASG_NAMES = ['asg-prod-web', 'asg-prod-api', 'asg-canary']

# Tag written to each group: a fixed key and a YYYY-MM-DD date as the value.
TAG_KEY = 'DailyTag'
# NOTE(review): evaluated once, when the module is imported; a process that
# stays alive keeps using this date until the module is loaded again.
TAG_VALUE = f"{datetime.now():%Y-%m-%d}"

# Sent as the 'PropagateAtLaunch' field of the tag.
PROPAGATE_AT_LAUNCH = True
def get_asg_tag(asg_name, key):
    """Return the current value of tag *key* on the Auto Scaling group *asg_name*.

    This is a best-effort lookup that never raises. It returns None when the
    describe response contains no group for *asg_name*, when the group has no
    tag with that key, or when the lookup fails. A failure is printed before
    None is returned, so the reason shows up in the logs instead of being
    silently discarded.
    """
    try:
        response = autoscaling.describe_auto_scaling_groups(
            AutoScalingGroupNames=[asg_name]
        )
        groups = response['AutoScalingGroups']
        if not groups:
            # Nothing was returned for this name; handle it explicitly rather
            # than relying on an IndexError from groups[0].
            return None
        for tag in groups[0].get('Tags', []):
            if tag['Key'] == key:
                return tag['Value']
    except Exception as e:
        # Keep the None-on-failure contract, but record why the lookup failed.
        print(f"{asg_name}: could not read tag {key} - {e}")
    return None
def update_asg_tag(asg_name, key, value):
    """Create or update a single tag on one Auto Scaling group.

    Sends one tag entry (key/value, with 'PropagateAtLaunch' taken from
    PROPAGATE_AT_LAUNCH) for the group named *asg_name*. Returns None; any
    exception raised by the client call propagates to the caller.
    """
    tag_spec = {
        'ResourceId': asg_name,
        'ResourceType': 'auto-scaling-group',
        'Key': key,
        'Value': value,
        'PropagateAtLaunch': PROPAGATE_AT_LAUNCH,
    }
    autoscaling.create_or_update_tags(Tags=[tag_spec])
def lambda_handler(event, context):
    """Set the TAG_KEY date tag on every group listed in ASG_NAMES.

    For each group, the current tag value is read first. A group whose tag
    already equals today's date is skipped; otherwise the tag is written.
    A failure on one group is printed and recorded, and the loop continues
    with the next group.

    Args:
        event: Invocation payload; not used.
        context: Invocation context; not used.

    Returns:
        dict with 'statusCode' 200 and a 'body' containing the tag key, the
        tag value used for this invocation, and one
        "<asg>: SKIP|SUCCESS|ERROR" entry per group.
    """
    # Compute the date on every invocation. The module-level TAG_VALUE is
    # evaluated only once at import, so a process that is reused after the
    # day changes would otherwise keep comparing against and writing the
    # old date.
    tag_value = datetime.now().strftime('%Y-%m-%d')

    results = []
    for asg in ASG_NAMES:
        try:
            current_value = get_asg_tag(asg, TAG_KEY)
            if current_value == tag_value:
                print(f"{asg}: Tag already up-to-date ({tag_value}), skip.")
                results.append(f"{asg}: SKIP")
                continue
            update_asg_tag(asg, TAG_KEY, tag_value)
            print(f"{asg}: Updated tag {TAG_KEY}={tag_value}")
            results.append(f"{asg}: SUCCESS")
        except Exception as e:
            # One failing group must not stop the remaining groups.
            print(f"{asg}: ERROR - {str(e)}")
            results.append(f"{asg}: ERROR")
    return {
        'statusCode': 200,
        'body': {
            'tag_key': TAG_KEY,
            'tag_value': tag_value,
            'results': results
        }
    }
##**************************************************************
# Sketch: issue the same create_or_update_tags call once per group in ASG_LIST.
# NOTE(review): `...` looks like a placeholder for the real Tags=[...] argument,
# and the loop body has lost its indentation — restore both before running.
ASG_LIST = ['asg-prod', 'asg-canary']
for asg in ASG_LIST:
autoscaling.create_or_update_tags(...)
import boto3
from datetime import datetime
# Auto Scaling API client, created once at module import and used by lambda_handler below.
autoscaling = boto3.client('autoscaling')
# Target group and the tag key written by the handler.
ASG_NAME = 'my-asg-name'
TAG_KEY = 'Env'
# 'prod-' followed by the date (YYYYMMDD) at the moment this line runs.
# NOTE(review): evaluated once at import, not per handler call.
TAG_VALUE = f"prod-{datetime.now():%Y%m%d}"
def lambda_handler(event, context):
    """Write the TAG_KEY tag, valued 'prod-<YYYYMMDD>', on the group ASG_NAME.

    Args:
        event: Invocation payload; not used.
        context: Invocation context; not used.

    Returns:
        dict with 'statusCode' 200 and a success message in 'body'.

    Raises:
        Exception: Whatever the tagging call raises; the error is printed
            and then re-raised unchanged.
    """
    # Compute the value on every invocation. The module-level TAG_VALUE is
    # evaluated only once at import, so a process that is reused after the
    # day changes would otherwise keep writing the old date.
    tag_value = 'prod-' + datetime.now().strftime('%Y%m%d')
    try:
        autoscaling.create_or_update_tags(
            Tags=[
                {
                    'ResourceId': ASG_NAME,
                    'ResourceType': 'auto-scaling-group',
                    'Key': TAG_KEY,
                    'Value': tag_value,
                    'PropagateAtLaunch': True
                }
            ]
        )
    except Exception as e:
        print(f"Error updating ASG tag: {str(e)}")
        raise
    print(f"Updated ASG tag: {TAG_KEY}={tag_value}")
    return {
        'statusCode': 200,
        'body': f"Successfully updated {ASG_NAME} tag"
    }