Python for AWS
Overview
Python with Boto3 is the primary way to automate AWS infrastructure and security tasks programmatically.
Setup
Installation
pip install boto3 awscli
Configuration
# Configure AWS CLI
aws configure
# Or use environment variables
export AWS_ACCESS_KEY_ID="your_key"
export AWS_SECRET_ACCESS_KEY="your_secret"
export AWS_DEFAULT_REGION="us-east-1"
Basic Boto3 Usage
import boto3
# Create a client
s3 = boto3.client('s3')
# Create a resource (higher-level)
s3_resource = boto3.resource('s3')
# Specify region
ec2 = boto3.client('ec2', region_name='us-west-2')
EC2 Operations
List Instances
import boto3
ec2 = boto3.client('ec2')
def list_instances():
response = ec2.describe_instances()
for reservation in response['Reservations']:
for instance in reservation['Instances']:
print(f"ID: {instance['InstanceId']}")
print(f"State: {instance['State']['Name']}")
print(f"Type: {instance['InstanceType']}")
print("---")
list_instances()
Security Group Audit
def audit_security_groups():
ec2 = boto3.client('ec2')
response = ec2.describe_security_groups()
risky_groups = []
for sg in response['SecurityGroups']:
for rule in sg.get('IpPermissions', []):
for ip_range in rule.get('IpRanges', []):
if ip_range.get('CidrIp') == '0.0.0.0/0':
risky_groups.append({
'GroupId': sg['GroupId'],
'GroupName': sg['GroupName'],
'Port': rule.get('FromPort', 'All')
})
return risky_groups
# Find publicly accessible security groups
risky = audit_security_groups()
for sg in risky:
print(f"Warning: {sg['GroupName']} allows 0.0.0.0/0 on port {sg['Port']}")
S3 Security
List Buckets
def list_buckets():
s3 = boto3.client('s3')
response = s3.list_buckets()
return [bucket['Name'] for bucket in response['Buckets']]
Check Public Buckets
def check_bucket_acl(bucket_name):
s3 = boto3.client('s3')
try:
acl = s3.get_bucket_acl(Bucket=bucket_name)
for grant in acl['Grants']:
grantee = grant['Grantee']
if grantee.get('URI') == 'http://acs.amazonaws.com/groups/global/AllUsers':
return True
except Exception as e:
print(f"Error: {e}")
return False
# Check all buckets
for bucket in list_buckets():
if check_bucket_acl(bucket):
print(f"WARNING: {bucket} is public!")
Enable Bucket Encryption
def enable_bucket_encryption(bucket_name):
s3 = boto3.client('s3')
s3.put_bucket_encryption(
Bucket=bucket_name,
ServerSideEncryptionConfiguration={
'Rules': [{
'ApplyServerSideEncryptionByDefault': {
'SSEAlgorithm': 'AES256'
}
}]
}
)
print(f"Encryption enabled for {bucket_name}")
IAM Security
List IAM Users
def list_iam_users():
iam = boto3.client('iam')
paginator = iam.get_paginator('list_users')
users = []
for page in paginator.paginate():
users.extend(page['Users'])
return users
Find Users Without MFA
def users_without_mfa():
iam = boto3.client('iam')
users = list_iam_users()
no_mfa = []
for user in users:
mfa_devices = iam.list_mfa_devices(UserName=user['UserName'])
if not mfa_devices['MFADevices']:
no_mfa.append(user['UserName'])
return no_mfa
# Report users without MFA
for user in users_without_mfa():
print(f"No MFA: {user}")
Find Old Access Keys
from datetime import datetime, timezone
def find_old_keys(days=90):
iam = boto3.client('iam')
users = list_iam_users()
old_keys = []
for user in users:
keys = iam.list_access_keys(UserName=user['UserName'])
for key in keys['AccessKeyMetadata']:
age = (datetime.now(timezone.utc) - key['CreateDate']).days
if age > days:
old_keys.append({
'User': user['UserName'],
'KeyId': key['AccessKeyId'],
'Age': age
})
return old_keys
CloudTrail Analysis
Search CloudTrail Events
def search_cloudtrail(event_name, hours=24):
import datetime
cloudtrail = boto3.client('cloudtrail')
end_time = datetime.datetime.utcnow()
start_time = end_time - datetime.timedelta(hours=hours)
events = []
paginator = cloudtrail.get_paginator('lookup_events')
for page in paginator.paginate(
LookupAttributes=[{
'AttributeKey': 'EventName',
'AttributeValue': event_name
}],
StartTime=start_time,
EndTime=end_time
):
events.extend(page['Events'])
return events
# Find console login events
logins = search_cloudtrail('ConsoleLogin')
Lambda Functions
Invoke Lambda
import json
def invoke_lambda(function_name, payload):
lambda_client = boto3.client('lambda')
response = lambda_client.invoke(
FunctionName=function_name,
InvocationType='RequestResponse',
Payload=json.dumps(payload)
)
return json.loads(response['Payload'].read())
Best Practices
- Use IAM roles - Avoid hardcoding credentials
- Enable CloudTrail - Audit all API calls
- Paginate results - Handle large datasets properly
- Handle exceptions - AWS API calls can fail
- Use regions - Specify regions explicitly
- Tag resources - Track ownership and purpose