Nono.MA

Add metrics to CloudWatch

FEBRUARY 10, 2023

Here's a Python class that can track and push metrics to AWS CloudWatch.

Metrics are reset to their initial values on creation and when metrics are uploaded to CloudWatch.

# metrics.py
'''
A metrics class ready to track and push metrics to AWS CloudWatch.
'''

from datetime import datetime
import os
import boto3


# CloudWatch metrics namespace.
METRICS_NAMESPACE = 'my_metrics_namespace'

# Duration to wait between metric uploads.
METRICS_UPLOAD_THRESHOLD_SECONDS = 50


class Metrics:
    '''
    Holds metrics, serializes them to CloudWatch format,
    and ingests foreign metric values.
    '''

    def __init__(self):
        self.reset()

    def reset(self):
        '''
        Resets metric values and last upload time.
        '''
        self.last_upload_time = datetime.now()
        # Your custom metrics and initial values
        # Note that here we're using 'my_prefix' as
        # a custom prefix in case you want this class
        # to add a prefix namespace to all its metrics.
        self.my_prefix_first_metric = 0
        self.my_prefix_second_metric = 0

    def to_data(self):
        '''
        Serializes metrics and their values.
        '''
        def to_cloudwatch_format(name, value):
            return {'MetricName': name, 'Value': value}

        result = []
        for name, value in vars(self).items():
            if name != 'last_upload_time':
                result.append(to_cloudwatch_format(name, value))
        return result

    def ingest(self, metrics, prefix=''):
        '''
        Adds foreign metric values to this metrics object.
        '''
        input_metric_names = [attr for attr in dir(metrics)
                              if not callable(getattr(metrics, attr))
                              and not attr.startswith("__")]

        # Iterate through foreign keys and add metric values.
        for metric_name in input_metric_names:

            # Get value of foreign metric.
            input_metric_value = getattr(metrics, metric_name)

            # Get metric key.
            metric_key = f'{prefix}_{metric_name}'

            # Get metric value.
            metric_value = getattr(self, metric_key)

            # Add foreign values to this metrics object.
            setattr(
              self,
              metric_key,
              input_metric_value + metric_value
            )

    def upload(self, force=False):
        '''
        Uploads metrics to CloudWatch when time since last
        upload is above a duration or when forced.
        '''

        # Get time elapsed since last upload.
        seconds_since_last_upload = \
            (datetime.now() - self.last_upload_time).seconds

        # Only upload if duration is greater than threshold,
        # or when the force flag is set to True.
        if seconds_since_last_upload > 50 or force:
            # Upload metrics to CloudWatch.
            cloudwatch = boto3.client(
                           'cloudwatch',
                           os.getenv('AWS_REGION')
                         )
            cloudwatch.put_metric_data(
                Namespace=METRICS_NAMESPACE,
                MetricData=self.to_data()
            )
            # Reset metrics.
            self.reset()


To use this class, we just have to instantiate a metrics object, track some metrics, and upload them.

# Create a metrics object.
metrics = Metrics()

# Add values to its metrics.
metrics.my_prefix_first_metric += 3
metrics.my_prefix_second_metric += 1

# Upload metrics to CloudWatch.
metrics.upload(force=True)

If you were processing metrics at a fast pace, you don't want to upload metrics every single time you increase their value, as otherwise CloudWatch will complain. In certain cases, AWS CloudWatch's limit is 5 transactions per second (TPS) per account or AWS Region. When this limit is reached, you'll receive a RateExceeded throttling error.

By calling metrics.upload(force=False) we only upload once every METRICS_UPLOAD_THRESHOLD_SECONDS. (In this example, at maximum every 50 seconds.)

import time

# Create a metrics object.
metrics = Metrics()

for i in range(0, 100, 1):
    # Wait for illustration purposes,
    # as if we were doing work.
    time.sleep(1)

    # Add values to its metrics.
    metrics.my_prefix_first_metric += 3
    metrics.my_prefix_second_metric += 1

    # Only upload if more than the threshold
    # duration has passed since we last uploaded.
    metrics.upload()

# Force-upload metrics to CloudWatch once we're done.
metrics.upload(force=True)

Lastly, here's how to ingest foreign metrics with or without a prefix.

# We define a foreign metrics class.
class OtherMetrics:

    def __init__(self):
        self.reset()

    def reset(self):
        # Note that here we don't have 'my_prefix'.
        self.first_metric = 0
        self.second_metric = 0

# We instantiate both metric objects.
metrics = Metrics()
other_metrics = OtherMetrics()

# The foreign metrics track values.
other_metrics.first_metric += 15
other_metrics.second_metric += 3

# Then our main metrics class ingests those metrics.
metrics.ingest(other_metrics, prefix='my_prefix')

# Then our main metrics class has those values.
print(metrics.my_prefix_first_metric)
# Returns 15

print(metrics.my_prefix_second_metric)
# Returns 3

If you found this useful, let me know!


Take a look at other posts about code, Python, and Today I Learned(s).

BlogCodePythonAwsTil