Home CloudAWSAWS SES Suppression List Monitoring and Alerting

AWS SES Suppression List Monitoring and Alerting

by Lakindu Jayasena
17 views 12 mins read
Suppression List Monitoring and Alerting

Amazon Simple Email Service (SES) is a reliable and scalable cloud-based email sending service widely used for transactional and marketing emails. One of the important features of SES is the Suppression List, which automatically prevents sending emails to addresses that have previously resulted in hard bounces or complaints.

While this helps protect your sender reputation, it can also impact business-critical email delivery if important email addresses or domains get added to the suppression list. That’s why implementing AWS SES suppression list monitoring and alerting is essential for maintaining uninterrupted communication with your users and customers.

In this article, I will demonstrate how to set up email alerts using AWS Lambda and AWS EventBridge rules to notify you whenever your important email domains are added to the SES suppression list.

What is the AWS SES Suppression List?

The SES Suppression List is a global list maintained by AWS. It contains recipient email addresses that:

  • Have caused hard bounces (e.g., Invalid or non-existent email addresses).
  • Have generated complaints (when a recipient marks an email as spam).
  • Have the manual added to the list.

When an address is on the suppression list, SES automatically blocks outgoing emails to that recipient to protect your email deliverability and sender reputation.

Why We Need SES Suppression List Monitoring

  • Sometimes legitimate domains like corporate emails, partner accounts, or key customer addresses may be added automatically.
  • Blocked transactional or notification emails like password resets, invoices, alerts)can cause financial and customer satisfaction issues.
  • Identifying false positives early helps maintain domain reputation and ensures compliance with email best practices.
AWS SES Suppression List Monitoring Architecture

Let’s Monitor and Get Alerts for the SES Suppression List

Prerequisites

Before implementing the monitoring system, ensure you have:

  • AWS Account with SES configured and verified
  • IAM permissions for SES, SNS, Lambda, EventBridge, and DynamoDB services
  • AWC CLI tools installed on your laptop and configured
  • Email domains you want to monitor are identified

Create an SNS Topic for Alerts

We’ll use Amazon SNS to send email notifications whenever a critical email address in a specified domain is found on the suppression list.

# Create SNS topic for notifications
aws sns create-topic --name ses-suppression-alerts

Subscribe your alerting recipients.

aws sns subscribe \
    --topic-arn arn:aws:sns:us-east-1:ACCOUNT-ID:ses-suppression-alerts \
    --protocol email \
    --notification-endpoint [email protected]
SNS Topic

Once you subscribe successfully, check your inbox and confirm the subscription. If you are not familiar with AWS CLI commands or if the AWS CLI is not configured on your system, you can use the AWS Management Console to do this task. For more details, you can refer to my previous article: Amazon SES Sending Quota Alerts

Set Up DynamoDB Tracking Table

Create a DynamoDB table to track processed suppressions and prevent duplicate alerts:

Key Benefits of DynamoDB Tracking:

  • Prevents duplicate notifications for the same suppression event
  • Maintains a historical record of suppression patterns
  • Automatic cleanup after 90 days using TTL
  • Serverless and cost-effective storage solution
aws dynamodb create-table \
    --table-name ses-suppression-tracking \
    --attribute-definitions \
        AttributeName=email,AttributeType=S \
        AttributeName=timestamp,AttributeType=S \
    --key-schema \
        AttributeName=email,KeyType=HASH \
        AttributeName=timestamp,KeyType=RANGE \
    --billing-mode PAY_PER_REQUEST \
    --tags Key=Purpose,Value=SESSuppressionMonitoring
Dynamo DB Table

Create an IAM Role with Proper Permissions for a Lambda Function

To access the lambda function with other resources like Cloudwatch, SES Suppression List, DynamoDB, and SNS topic, need to create the following IAM policy.

In the AWS Management Console, go to IAMPoliciesCreate Policy. Select Lambda as the service, switch to the JSON view, and paste the following code.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "ses:ListSuppressedDestinations",
                "ses:GetSuppressedDestination"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:GetItem",
                "dynamodb:PutItem",
                "dynamodb:Query",
                "dynamodb:Scan"
            ],
            "Resource": "arn:aws:dynamodb:*:*:table/ses-suppression-tracking"
        },
        {
            "Effect": "Allow",
            "Action": [
                "sns:Publish"
            ],
            "Resource": "arn:aws:sns:*:*:ses-suppression-alerts"
        }
    ]
}
IAM Policy for Lambda Role

Then move to the Roles section of the IAM and create a new Lambda role, adding the above-created policy.

Lambda Role

Deploy Lambda Monitoring Function

Now we’ll write a Python-based Lambda function that provides the following features:

  • Automated Scanning: Runs every 15 minutes via Amazon EventBridge
  • Domain Filtering: Monitor only specific business-critical domains
  • Duplicate Prevention: DynamoDB tracking prevents repeated alerts
  • Detailed Notifications: Rich alerts with suppression reasons and timestamps
  • Error Handling: Robust error management with fallback notifications
  • Performance Optimized: Handles large suppression lists efficiently
  • Cost-Effective: Serverless architecture with minimal operational costs

From the AWS Management Console, navigate to AWS Lambda and create a new function. Select Python 3.x as the runtime, and choose the option “Create a new role with basic Lambda permissions.”

Once you go inside the Lambda function. Paste the following code snippet under the Code section.

import boto3
import json
from datetime import datetime, timedelta
from botocore.exceptions import ClientError
import os
import traceback
import time

def lambda_handler(event, context):
    
    # Configuration
    MONITORED_DOMAINS = os.environ.get('MONITORED_DOMAINS', 'example.com,yourcompany.com').split(',')
    SNS_TOPIC_ARN = os.environ['SNS_TOPIC_ARN']
    DYNAMODB_TABLE = os.environ.get('DYNAMODB_TABLE', 'ses-suppression-tracking')
    
    # Clean up monitored domains (remove empty strings and whitespace)
    MONITORED_DOMAINS = [d.strip().lower() for d in MONITORED_DOMAINS if d.strip()]
    
    # Initialize AWS clients
    ses_v2 = boto3.client('sesv2')
    dynamodb = boto3.resource('dynamodb')
    sns = boto3.client('sns')
    
    try:

        table = dynamodb.Table(DYNAMODB_TABLE)
        
        # Scan for new suppressions
        new_suppressions = scan_suppression_list_manual(ses_v2, table, MONITORED_DOMAINS)
        
        if new_suppressions:
            print(f"Found {len(new_suppressions)} new suppressions")
            send_suppression_alerts(sns, SNS_TOPIC_ARN, new_suppressions)
            
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'message': f'Found and alerted on {len(new_suppressions)} new suppressions'
                })
            }
        else:
            print("No new suppressions found")
            return {
                'statusCode': 200,
                'body': json.dumps({'message': 'No new suppressions found'})
            }
            
    except Exception as e:
        print(f"Error in lambda execution: {str(e)}")
        print(f"Error type: {type(e).__name__}")
        traceback.print_exc()
        
        # Send error notification
        try:
            sns.publish(
                TopicArn=SNS_TOPIC_ARN,
                Subject='🚨 SES Suppression Monitor Error',
                Message=f'Error in suppression monitoring: {str(e)}\n\nCheck CloudWatch logs for details.'
            )
        except Exception as sns_error:
            print(f"Failed to send error notification: {sns_error}")
            
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }

def scan_suppression_list_manual(ses_v2, table, monitored_domains):
    """
    Manually scan SES suppression list using NextToken - NO PAGINATOR USED
    """
    new_suppressions = []
    
    try:
        
        next_token = None
        page_count = 0
        total_processed = 0
        max_pages = 50
        start_time = time.time()
        
        while True:
            page_count += 1
            current_time = time.time()
            elapsed_time = current_time - start_time
            
            # Timeout protection
            if elapsed_time > 270:
                print(f"Approaching timeout limit. Processed {page_count-1} pages in {elapsed_time:.1f} seconds")
                break
                
            if page_count > max_pages:
                print(f"Reached maximum page limit ({max_pages})")
                break
            
            print(f"Processing page {page_count} (elapsed: {elapsed_time:.1f}s)...")
            
            # Build parameters for API call
            api_params = {}
            if next_token:
                api_params['NextToken'] = next_token
                print(f"Using NextToken: {next_token[:20]}...")
            
            try:
                api_response = ses_v2.list_suppressed_destinations(**api_params)
                
                if not api_response:
                    print("Empty response from API")
                    break
                
                # Get suppressions from response
                suppressions_list = api_response.get('SuppressedDestinationSummaries', [])
                print(f"Page {page_count}: Processing {len(suppressions_list)} suppressions")
                
                # Process each suppression in this page
                for suppression_item in suppressions_list:
                    total_processed += 1
                    
                    # Validate item
                    required_fields = ['EmailAddress', 'Reason', 'LastUpdateTime']
                    if not all(field in suppression_item for field in required_fields):
                        print(f"Skipping item with missing fields: {suppression_item}")
                        continue
                    
                    email_address = suppression_item['EmailAddress']
                    
                    # Validate email
                    if '@' not in email_address:
                        print(f"Skipping invalid email: {email_address}")
                        continue
                    
                    # Extract domain
                    email_domain = email_address.split('@')[1].lower()
                    
                    # Check if domain is monitored
                    if email_domain in monitored_domains:
                        #Enable only for debug purposes
                        #print(f"Found monitored domain suppression: {email_address}")
                        
                        # Check if already processed
                        if not check_already_processed(table, email_address, suppression_item['LastUpdateTime']):
                            
                            # Create suppression info
                            suppression_data = {
                                'email': email_address,
                                'domain': email_domain,
                                'reason': suppression_item['Reason'],
                                'lastUpdate': suppression_item['LastUpdateTime'].isoformat()
                            }
                            
                            new_suppressions.append(suppression_data)
                            
                            # Mark as processed
                            record_as_processed(table, email_address, suppression_item['LastUpdateTime'], suppression_item['Reason'])
                            
                            #Enable only debug purposes
                            #print(f"NEW: {email_address} ({suppression_item['Reason']})")
                        else:
                            print(f"Already processed: {email_address}")
                
            except ClientError as client_error:
                error_code = client_error.response['Error']['Code']
                error_message = client_error.response['Error']['Message']
                print(f"AWS API ClientError on page {page_count}: {error_code} - {error_message}")
                
                if error_code in ['AccessDenied', 'UnauthorizedOperation']:
                    raise client_error
                else:
                    print("Continuing despite ClientError...")
                    break
                    
            except Exception as other_error:
                print(f"Unexpected error on page {page_count}: {str(other_error)}")
                print(f"Error type: {type(other_error).__name__}")
                traceback.print_exc()
                raise other_error
            
            # Check for next page
            next_token = api_response.get('NextToken')
            if not next_token:
                print("No more pages (no NextToken)")
                break
            
            print(f"Has NextToken, continuing to next page...")
        
        total_time = time.time() - start_time
        print(f"Manual scan complete: {page_count} pages, {total_processed} total, {len(new_suppressions)} new matches")
        print(f"Execution time: {total_time:.1f} seconds")
        
    except Exception as scan_error:
        print(f"Error in manual scan: {str(scan_error)}")
        print(f"Error type: {type(scan_error).__name__}")
        traceback.print_exc()
        raise scan_error
    
    return new_suppressions

def check_already_processed(table, email, timestamp):
    """
    Check if suppression already processed
    """
    try:
        timestamp_str = timestamp.isoformat() if hasattr(timestamp, 'isoformat') else str(timestamp)
        
        response = table.get_item(
            Key={
                'email': email,
                'timestamp': timestamp_str
            }
        )
        
        return 'Item' in response
        
    except Exception as e:
        print(f"Error checking if processed: {e}")
        return False

def record_as_processed(table, email, timestamp, reason):
    """
    Record suppression as processed
    """
    try:
        timestamp_str = timestamp.isoformat() if hasattr(timestamp, 'isoformat') else str(timestamp)
        ttl_timestamp = int((datetime.now() + timedelta(days=90)).timestamp())
        
        table.put_item(
            Item={
                'email': email,
                'timestamp': timestamp_str,
                'reason': reason,
                'processed_at': datetime.now().isoformat(),
                'ttl': ttl_timestamp,
                'domain': email.split('@')[1].lower()
            }
        )
        
    except Exception as e:
        print(f"Error recording as processed: {e}")

def send_suppression_alerts(sns, topic_arn, suppressions):
    """
    Send suppression alerts
    """
    try:
        if not suppressions:
            return
            
        # Group by domain
        domains = {}
        for item in suppressions:
            domain = item['domain']
            if domain not in domains:
                domains[domain] = []
            domains[domain].append(item)
        
        # Send alerts for each domain
        for domain, domain_suppressions in domains.items():
            
            message_parts = [
                f"🚨 SES SUPPRESSION ALERT - {domain.upper()}",
                f"New suppressions: {len(domain_suppressions)}",
                f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S UTC')}",
                "",
                "Suppressed emails:"
            ]
            
            for item in domain_suppressions:
                message_parts.append(f"• {item['email']} ({item['reason']})")
            
            message_parts.extend([
                "",
                "⚠️ These emails will no longer receive messages from SES.",
                "Review your email practices and consider list hygiene improvements."
            ])
            
            alert_message = "\n".join(message_parts)
            
            sns.publish(
                TopicArn=topic_arn,
                Subject=f'🚨 SES Alert - {domain} ({len(domain_suppressions)} suppressions)',
                Message=alert_message
            )
            
            print(f"Alert sent for {domain}")
        
    except Exception as e:
        print(f"Error sending alerts: {e}")
        try:
            sns.publish(
                TopicArn=topic_arn,
                Subject='🚨 SES Monitor Error',
                Message=f'Alert error: {str(e)}\nFound {len(suppressions)} new suppressions but failed to send detailed alerts.'
            )
        except:
            pass
Create Lambda Function

Next, go to the General Configuration section under Configuration. Set the timeout to 5 minutes (300 seconds) and allocate 256 MB of memory.

Lambda Function General Configuration

Then configure the Environment Variables for the following Keys.

  • DYNAMODB_TABLE
  • MONITORED_DOMAINS (comma-separated domains you need to monitor)
  • SNS_TOPIC_ARN
Lambda Function Environment Variables

Trigger the Lambda function & Testing

Open the Test tab in your Lambda function, enter a name for the test event, and click Test to execute the function.

Lambda Function Testing

If the function executes successfully and any suppressed emails are found for the specified domains, the logs will display results similar to the following.

Lambda Function Testing Logs

To avoid running this task manually, you can automate it using Amazon EventBridge Rules. By creating a scheduled trigger, you can configure AWS to invoke the Lambda function automatically at fixed intervals—for example, every 15 minutes. This ensures continuous monitoring of the SES Suppression List without any manual intervention. Let’s now set up and automate this monitoring process.

Create EventBridge Rule (for Scheduled Trigger)

In the Amazon EventBridge console, create a new rule and configure it with a schedule.

Amazon Eventbridge Scheduling

Next, configure the schedule as needed. In this example, I will set it to invoke the Lambda function every 15 minutes.

Amazon Eventbridge Scheduling

Add the target as the Lambda function you created and assign the necessary permissions to allow EventBridge to invoke it. In this case, I am creating a new role for this purpose.

Amazon Eventbridge Scheduling

Test the Setup with Suppressed Email

To verify that your monitoring setup works correctly, add a test email address to the SES Suppression List. Make sure the test email belongs to one of the domains you are monitoring.

Testing - Add Email to Suppression List

After adding the test email, wait until the next scheduled EventBridge rule triggers your Lambda function. Once the function runs, it will detect the suppressed address and send out an alert. You should then receive an email notification from Amazon SNS similar to the example shown below.

Suppression List Alert

In the meantime, you can also verify the record in your DynamoDB table, where the suppressed email entry is stored. The Lambda function will not process or notify you about this specific suppressed address again. This prevents duplicate alerts and ensures each suppression event is captured only once.

Dynamo DB Table Data

Conclusion

The AWS SES Suppression List is a safeguard for your sender reputation, but unmanaged suppression can disrupt critical business communications. By implementing suppression list monitoring and alerting with SNS, EventBridge, and Lambda, organizations can ensure that important recipients are never missed.

Related Articles

Leave a Comment

* By using this form you agree with the storage and handling of your data by this website.