Nono.MA

FEBRUARY 10, 2023

Here's a Python class that can track and push metrics to AWS CloudWatch.

Metrics are reset to their initial values on creation and when metrics are uploaded to CloudWatch.

# metrics.py
'''
A metrics class ready to track and push metrics to AWS CloudWatch.
'''

from datetime import datetime
import os
import boto3


# CloudWatch metrics namespace.
METRICS_NAMESPACE = 'my_metrics_namespace'

# Duration to wait between metric uploads.
METRICS_UPLOAD_THRESHOLD_SECONDS = 50


class Metrics:
    '''
    Holds metrics, serializes them to CloudWatch format,
    and ingests foreign metric values.
    '''

    def __init__(self):
        self.reset()

    def reset(self):
        '''
        Resets metric values and last upload time.
        '''
        self.last_upload_time = datetime.now()
        # Your custom metrics and initial values
        # Note that here we're using 'my_prefix' as
        # a custom prefix in case you want this class
        # to add a prefix namespace to all its metrics.
        self.my_prefix_first_metric = 0
        self.my_prefix_second_metric = 0

    def to_data(self):
        '''
        Serializes metrics and their values.
        '''
        def to_cloudwatch_format(name, value):
            return {'MetricName': name, 'Value': value}

        result = []
        for name, value in vars(self).items():
            if name != 'last_upload_time':
                result.append(to_cloudwatch_format(name, value))
        return result

    def ingest(self, metrics, prefix=''):
        '''
        Adds foreign metric values to this metrics object.
        '''
        input_metric_names = [attr for attr in dir(metrics)
                              if not callable(getattr(metrics, attr))
                              and not attr.startswith("__")]

        # Iterate through foreign keys and add metric values.
        for metric_name in input_metric_names:

            # Get value of foreign metric.
            input_metric_value = getattr(metrics, metric_name)

            # Get metric key, adding the prefix only when one is given.
            metric_key = f'{prefix}_{metric_name}' if prefix else metric_name

            # Get metric value.
            metric_value = getattr(self, metric_key)

            # Add foreign values to this metrics object.
            setattr(
              self,
              metric_key,
              input_metric_value + metric_value
            )

    def upload(self, force=False):
        '''
        Uploads metrics to CloudWatch when time since last
        upload is above a duration or when forced.
        '''

        # Get time elapsed since last upload.
        seconds_since_last_upload = \
            (datetime.now() - self.last_upload_time).total_seconds()

        # Only upload if duration is greater than threshold,
        # or when the force flag is set to True.
        if seconds_since_last_upload > METRICS_UPLOAD_THRESHOLD_SECONDS or force:
            # Upload metrics to CloudWatch.
            cloudwatch = boto3.client(
                'cloudwatch',
                region_name=os.getenv('AWS_REGION')
            )
            cloudwatch.put_metric_data(
                Namespace=METRICS_NAMESPACE,
                MetricData=self.to_data()
            )
            # Reset metrics.
            self.reset()


To use this class, we just have to instantiate a metrics object, track some metrics, and upload them.

# Create a metrics object.
metrics = Metrics()

# Add values to its metrics.
metrics.my_prefix_first_metric += 3
metrics.my_prefix_second_metric += 1

# Upload metrics to CloudWatch.
metrics.upload(force=True)

If you're tracking metrics at a fast pace, you don't want to upload them every single time you increase their value; otherwise, CloudWatch will complain. In certain cases, AWS CloudWatch's limit is 5 transactions per second (TPS) per account or AWS Region. When this limit is reached, you'll receive a RateExceeded throttling error.

By calling metrics.upload(force=False) we only upload when more than METRICS_UPLOAD_THRESHOLD_SECONDS have passed since the last upload. (In this example, at most once every 50 seconds.)

import time

# Create a metrics object.
metrics = Metrics()

for i in range(0, 100, 1):
    # Wait for illustration purposes,
    # as if we were doing work.
    time.sleep(1)

    # Add values to its metrics.
    metrics.my_prefix_first_metric += 3
    metrics.my_prefix_second_metric += 1

    # Only upload if more than the threshold
    # duration has passed since we last uploaded.
    metrics.upload()

# Force-upload metrics to CloudWatch once we're done.
metrics.upload(force=True)

Lastly, here's how to ingest foreign metrics with or without a prefix.

# We define a foreign metrics class.
class OtherMetrics:

    def __init__(self):
        self.reset()

    def reset(self):
        # Note that here we don't have 'my_prefix'.
        self.first_metric = 0
        self.second_metric = 0

# We instantiate both metric objects.
metrics = Metrics()
other_metrics = OtherMetrics()

# The foreign metrics track values.
other_metrics.first_metric += 15
other_metrics.second_metric += 3

# Then our main metrics class ingests those metrics.
metrics.ingest(other_metrics, prefix='my_prefix')

# Then our main metrics class has those values.
print(metrics.my_prefix_first_metric)
# Prints 15

print(metrics.my_prefix_second_metric)
# Prints 3

If you found this useful, let me know!



LAST UPDATED FEBRUARY 9, 2023

Here's how to sort a list of Python dictionaries by a key, a property name, of its items. Check this post if you're looking to sort a list of lists instead.

# A list of people
people = [
    {'name': 'Nono', 'age': 32, 'location': 'Spain'},
    {'name': 'Alice', 'age': 20, 'location': 'Wonderland'},
    {'name': 'Phillipe', 'age': 100, 'location': 'France'},
    {'name': 'Jack', 'age': 45, 'location': 'Caribbean'},
]

# Sort people by age, ascending
people_sorted_by_age_asc = sorted(people, key=lambda x: x['age'])
print(people_sorted_by_age_asc)
# [
#     {'name': 'Alice', 'age': 20, 'location': 'Wonderland'},
#     {'name': 'Nono', 'age': 32, 'location': 'Spain'},
#     {'name': 'Jack', 'age': 45, 'location': 'Caribbean'},
#     {'name': 'Phillipe', 'age': 100, 'location': 'France'}
# ]

# Sort people by age, descending
people_sorted_by_age_desc = sorted(people, key=lambda x: -x['age'])
print(people_sorted_by_age_desc)
# [
#     {'name': 'Phillipe', 'age': 100, 'location': 'France'},
#     {'name': 'Jack', 'age': 45, 'location': 'Caribbean'},
#     {'name': 'Nono', 'age': 32, 'location': 'Spain'},
#     {'name': 'Alice', 'age': 20, 'location': 'Wonderland'}
# ]

# Sort people by name, ascending
people_sorted_by_name_asc = sorted(people, key=lambda x: x['name'])
print(people_sorted_by_name_asc)
# [
#     {'name': 'Alice', 'age': 20, 'location': 'Wonderland'},
#     {'name': 'Jack', 'age': 45, 'location': 'Caribbean'},
#     {'name': 'Nono', 'age': 32, 'location': 'Spain'},
#     {'name': 'Phillipe', 'age': 100, 'location': 'France'}
# ]
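Negating the key works for numbers but not for strings. Here's a minimal sketch of an alternative, assuming the same list of people: sorted() accepts reverse=True, and operator.itemgetter can stand in for the lambda.

```python
from operator import itemgetter

# A list of people
people = [
    {'name': 'Nono', 'age': 32, 'location': 'Spain'},
    {'name': 'Alice', 'age': 20, 'location': 'Wonderland'},
    {'name': 'Phillipe', 'age': 100, 'location': 'France'},
    {'name': 'Jack', 'age': 45, 'location': 'Caribbean'},
]

# Sort people by name, descending
people_sorted_by_name_desc = sorted(
    people, key=itemgetter('name'), reverse=True)
print([person['name'] for person in people_sorted_by_name_desc])
# ['Phillipe', 'Nono', 'Jack', 'Alice']
```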

LAST UPDATED FEBRUARY 2, 2023

You can measure the time elapsed during the execution of Python commands by keeping a reference to the start time and then subtracting the current time at any point on your program from that start time to obtain the duration between two points in time.

from datetime import datetime
import time

# Define the start time.
start = datetime.now()

# Run some code..
time.sleep(2)

# Get the time delta since the start.
elapsed = datetime.now() - start
# datetime.timedelta(seconds=2, microseconds=5088)
# 0:00:02.005088

# Get the seconds since the start.
elapsed_seconds = elapsed.seconds
# 2

Let's create two helper functions to get the current time (i.e. now) and the elapsed time at any moment.

# Returns current time
# (and, if provided, prints the event's name)
def now(eventName = ''):
  if eventName:
    print(f'Started {eventName}..')
  return datetime.now()

# Store current time as `start`
start = now()

# Returns time elapsed since `beginning`
# (and, optionally, prints the duration in seconds)
def elapsed(beginning = start, log = False):
  duration = datetime.now() - beginning
  if log:
    print(f'{duration.seconds}s')
  return duration

With those utility functions defined, we can measure the duration of different events.

# Define time to wait
wait_seconds = 2

# Measure duration (while waiting for 2 seconds)
beginning = now(f'{wait_seconds}-second wait.')

# Wait.
time.sleep(wait_seconds)

# Get time delta.
elapsed_time = elapsed(beginning, True)
# Prints 2s
# Returns 0:00:02.004004

# Get seconds.
elapsed_seconds = elapsed_time.seconds
# 2

# Get microseconds.
elapsed_microseconds = elapsed_time.microseconds
# 4004
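One thing to keep in mind is that a timedelta's seconds attribute only holds the seconds component of the duration, not the whole duration. Here's a small sketch, with a made-up duration, comparing it to total_seconds().

```python
from datetime import timedelta

# A duration of one day and five seconds
duration = timedelta(days=1, seconds=5)

# .seconds only holds the seconds component (0 to 86399)
print(duration.seconds)
# 5

# .total_seconds() covers the whole duration
print(duration.total_seconds())
# 86405.0
```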


LAST UPDATED FEBRUARY 6, 2023

Here's how to sort a Python list of lists by one of the values of its items. Check this post if you're looking to sort a list of dictionaries instead.

# A list of people
# name, age, location
people = [
    ['Nono', 32, 'Spain'],
    ['Alice', 20, 'Wonderland'],
    ['Phillipe', 100, 'France'],
    ['Jack', 45, 'Caribbean'],
]

# Sort people by age, ascending
people_sorted_by_age_asc = sorted(people, key=lambda x: x[1])
# [
#     ['Alice', 20, 'Wonderland'],
#     ['Nono', 32, 'Spain'],
#     ['Jack', 45, 'Caribbean'],
#     ['Phillipe', 100, 'France']
# ]

# Sort people by age, descending
people_sorted_by_age_desc = sorted(people, key=lambda x: -x[1])
# [
#     ['Phillipe', 100, 'France'],
#     ['Jack', 45, 'Caribbean'],
#     ['Nono', 32, 'Spain'],
#     ['Alice', 20, 'Wonderland']
# ]

# Sort people by name, ascending
people_sorted_by_name_asc = sorted(people, key=lambda x: x[0])
# [
#     ['Alice', 20, 'Wonderland'],
#     ['Jack', 45, 'Caribbean'],
#     ['Nono', 32, 'Spain'],
#     ['Phillipe', 100, 'France']
# ]

LAST UPDATED FEBRUARY 3, 2023

Here's how to read contents from a comma-separated value (CSV) file in Python; maybe a CSV that already exists or a CSV you saved from Python.

Read CSV and print rows

import csv

csv_file_path = 'file.csv'

with open(csv_file_path, encoding='utf-8') as csv_file:
    csv_reader = csv.reader(csv_file, delimiter=',')

    # Read all rows into a list, as the reader can only be consumed once
    rows = list(csv_reader)

    # Print the first five rows
    for row in rows[:5]:
        print(row)

    # Print all rows
    for row in rows:
        print(row)
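If your CSV has a header row, csv.DictReader may be handier, as it returns each row as a dictionary keyed by the column names. Here's a minimal sketch; the CSV contents are made up and read from an in-memory string instead of a file on disk.

```python
import csv
import io

# Hypothetical CSV contents, standing in for a file on disk
csv_text = 'name,age\nNono,32\nAlice,20\n'

with io.StringIO(csv_text) as csv_file:
    # Each row becomes a dictionary keyed by the header row
    rows = list(csv.DictReader(csv_file))

print(rows[0]['name'])
# Nono

# Values are read as strings
print(rows[1]['age'])
# 20
```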

FEBRUARY 2, 2023

Here's how to generate pseudo-random numbers in Python.

import random

# Random generation seed for reproducible results
seed = 42

# Float
random.Random(seed).uniform(3,10)
# 7.475987589205186

# Integer
int(random.Random(seed).uniform(3,10))
# 7

# Integer
random.Random(seed).randint(0, 999)
# 654

See the random module for more information.
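The examples above create a new generator on every call, so each one returns the first value for that seed. Here's a small sketch of keeping a generator around instead: two generators with the same seed produce the same sequence of values.

```python
import random

# Two generators created with the same seed
first = random.Random(42)
second = random.Random(42)

# Each generator advances its own state, so successive
# calls return new values, yet both sequences match.
first_values = [first.randint(0, 999) for _ in range(3)]
second_values = [second.randint(0, 999) for _ in range(3)]

print(first_values == second_values)
# True
```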

JANUARY 27, 2023

Leading zeros are extra zeros to the left of a number when you want to have a regular amount of digits in a set of numbers. For instance, 0001, 0002, and 0003 is a good formatting if you think you'll get to have thousands of entries, as you can stay at four digits up to 9999.

# Define your number
number = 1

two_digits = f'{number:02d}'
# 01

four_digits = f'{number:04d}'
# 0001

We use the Python formatting helper {my_number:04d} to enforce a minimum set of digits in our number variable. This means you can use it to set the value of a string or to create or print a longer string with that number, not necessarily having to store its value.

a_number = 42
print(f'The number is {a_number:06d}.')
# The number is 000042.

print(f'The number is {512:06d}.')
# The number is 000512.
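Here are two related options, as a quick sketch: the str.zfill method pads an existing string, and the width inside the format specification can come from a variable.

```python
number = 7

# str.zfill pads a string with zeros up to the given width
print(str(number).zfill(3))
# 007

# The width can also be a variable nested in the format spec
width = 5
print(f'{number:0{width}d}')
# 00007
```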

JANUARY 26, 2023

The -e flag of pip "installs a project in editable mode (i.e. setuptools “develop mode”) from a local project path or a VCS url."

pip install -e .

-e, --editable <path/url>

As described in the expanded command flag, -e stands for editable.

JANUARY 20, 2023

Today I learned you can use the plus (+) operator to concatenate or extend lists in Python.

Say you have two lists.

list_a = [1, 2, 3]
list_b = ['Nono', 'MA']

And that you want to create a continuous list with the contents of both, which would look something like [1, 2, 3, 'Nono', 'MA'].

You can simply add both lists to obtain that result.

>>> combined_list = [1, 2, 3] + ['Nono', 'MA']
>>> combined_list
[1, 2, 3, 'Nono', 'MA']

Of course, it doesn't make much sense in this example because we're explicitly defining the lists and we could define a combined list directly.

combined_list = [1, 2, 3, 'Nono', 'MA']

But it can be useful when we actually need to add lists, for instance to concatenate the results of glob file listing operations.

>>> from glob import glob
>>> files_a = glob('a/*')
>>> files_a
['a/file.txt', 'a/image.jpg']
>>> files_b = glob('b/*')
>>> files_b
['b/data.json', 'b/profile.jpeg']
>>> all_files = files_a + files_b
>>> all_files
['a/file.txt', 'a/image.jpg', 'b/data.json', 'b/profile.jpeg']
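Note that the plus operator returns a new list and leaves the original lists untouched. Here's a short sketch comparing it with the extend method, which modifies a list in place.

```python
list_a = [1, 2, 3]
list_b = ['Nono', 'MA']

# The plus operator returns a new list; list_a stays the same
combined_list = list_a + list_b
print(list_a)
# [1, 2, 3]

# extend (or +=) modifies the list in place instead
list_a.extend(list_b)
print(list_a == combined_list)
# True
```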

NOVEMBER 21, 2022


How to encode an image dataset to reduce its dimensionality and visualize it in the 2D space.



NOVEMBER 16, 2022

Here's how to translate 3D points in Python using a translation matrix.


To translate a series of points in three dimensions in Cartesian space (x, y, z), you first need to "homogenize" the points by adding a value to their projective dimension, which we'll set to one to maintain the point's original coordinates. Then you multiply the point cloud, using NumPy's np.matmul method, by a transformation matrix constructed from a (4, 4) identity matrix with three translation parameters (tx, ty, tz) in its bottom row.

Steps

Here's a breakdown of the steps.

  • Import the NumPy Python library
  • Define a point cloud with Cartesian coordinates (x, y, z)
  • Convert the points to homogeneous coordinates (x, y, z, w)
  • Define our translation parameters (tx, ty, tz)
  • Construct the translation matrix
  • Multiply the homogenized point cloud by the transformation matrix with NumPy's np.matmul

Code

# translate.py
import numpy as np

# Define a set of Cartesian (x, y, z) points
point_cloud = [
    [0, 0, 0],
    [1, 0, 0],
    [0, 1, 0],
    [0, 0, 1],
    [1, 1, 1],
    [1, 2, 3],
]

# Convert to homogeneous coordinates
point_cloud_homogeneous = []
for point in point_cloud:
    point_homogeneous = point.copy()
    point_homogeneous.append(1)
    point_cloud_homogeneous.append(point_homogeneous)

# Define the translation
tx = 2
ty = 10
tz = 100

# Construct the translation matrix
translation_matrix = [
    [1, 0, 0, 0],
    [0, 1, 0, 0],
    [0, 0, 1, 0],
    [tx, ty, tz, 1],
]

# Apply the transformation to our point cloud
translated_points = np.matmul(
    point_cloud_homogeneous,
    translation_matrix)

# Convert to cartesian coordinates
translated_points_xyz = []
for point in translated_points:
    point = np.array(point[:-1])
    translated_points_xyz.append(point)

# Map original to translated point coordinates
# (x0, y0, z0) → (x1, y1, z1)
for i in range(len(point_cloud)):
    point = point_cloud[i]
    translated_point = translated_points_xyz[i]
    print(f'{point} → {list(translated_point)}')
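To see why the bottom row of the matrix carries the translation, here's the same product written out in plain Python for a single point. This is only an illustrative sketch; the transform helper is hypothetical and not part of NumPy.

```python
# Translation parameters, as in the example above
tx, ty, tz = 2, 10, 100

translation_matrix = [
    [1, 0, 0, 0],
    [0, 1, 0, 0],
    [0, 0, 1, 0],
    [tx, ty, tz, 1],
]


def transform(point, matrix):
    '''Multiplies a homogeneous row vector by a (4, 4) matrix.'''
    return [
        sum(point[row] * matrix[row][column] for row in range(4))
        for column in range(4)
    ]


# The point (1, 2, 3) with w = 1. Since w is one, each
# coordinate picks up tx, ty, or tz from the bottom row.
print(transform([1, 2, 3, 1], translation_matrix))
# [3, 12, 103, 1]
```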

NOVEMBER 12, 2022

If you try to serialize a NumPy array to JSON in Python, you'll get the error below.

TypeError: Object of type ndarray is not JSON serializable

Luckily, NumPy has a built-in method to convert one- or multi-dimensional arrays to lists, which are in turn JSON serializable.

import numpy as np
import json

# Define your NumPy array
arr = np.array([[100,200],[300,400]])

# Convert the array to list
arr_as_list = arr.tolist()

# Serialize as JSON
json.dumps(arr_as_list)
# '[[100, 200], [300, 400]]'

NOVEMBER 11, 2022

Here's the error I was getting when trying to return a NumPy ndarray in the response body of an AWS Lambda function.

Object of type ndarray is not JSON serializable

Reproduce the error

import numpy as np
import json

# A NumPy array
arr = np.array([[1,2,3],[4,5,6]]).astype(np.float64)

# Serialize the array
json.dumps(arr)
# TypeError: Object of type ndarray is not JSON serializable

Solution

NumPy arrays provide a built-in method to convert them to lists called .tolist().

import numpy as np
import json

# A NumPy array
arr = np.array([[1,2,3],[4,5,6.78]]).astype(np.float64)

# Convert the NumPy array to a list
arr_as_list = arr.tolist()

# Serialize the list
json.dumps(arr_as_list)

NOVEMBER 9, 2022


How to use TensorFlow inside of a Docker container.



OCTOBER 1, 2022

I've had conda's initialization code in my .zshrc file for a long time. I've used it on my former Intel and M1 Macs, but it was just recently that I migrated my code to a new M1 Max Apple Silicon Mac. When I start a new Terminal window, a Python process takes 5–10 seconds to finish before the Terminal becomes responsive. I'm used to hitting CMD + N for a new window or CMD + T for a new tab and starting to type immediately. But this issue breaks my workflow and keeps me hanging for a few seconds per new window (!).

Here's my initialization code, auto-generated by Anaconda.

# >>> conda initialize >>>
# !! Contents within this block are managed by 'conda init' !!
__conda_setup="$('/Users/nono/anaconda3/bin/conda' 'shell.zsh' 'hook' 2> /dev/null)"
if [ $? -eq 0 ]; then
    eval "$__conda_setup"
else
    if [ -f "/Users/nono/anaconda3/etc/profile.d/conda.sh" ]; then
        . "/Users/nono/anaconda3/etc/profile.d/conda.sh"
    else
        export PATH="/Users/nono/anaconda3/bin:$PATH"
    fi
fi
unset __conda_setup
# <<< conda initialize <<<

This issue also happens with Miniforge on MacBooks, as seen in this GitHub issue titled Slow zsh startup on MacBook Pro 14-inch (M1 Pro). In my case, it's not the M1 Pro but the M1 Max. So the issue seems independent of specific chips but may be an Apple-Silicon-only problem.

Others may see this in M1, M1 Pro, M1 Max, M1 Ultra, and M2 chips and the incoming M2 Pro, M2 Max, and M2 Ultra.

Please let me know on Twitter if you find out how to make this initialization faster. In the meantime, I've removed this code and will have to get it back when I use conda, or simply find another way to initialize Anaconda on demand, only in the Terminal instances I want to use it.

SEPTEMBER 3, 2022

Here's how to convert a string from CamelCase to snake_case in Python with regular expressions.

import re

# Option 1
regex = r'(?<!^)(?=[A-Z])'
re.sub(regex, '_', 'GettingSimple').lower()
# returns 'getting_simple'

# Option 2
pattern = re.compile(r'(?<!^)(?=[A-Z])')
pattern.sub('_', 'nonoMartinezAlonso').lower()
# returns 'nono_martinez_alonso'
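Here's a sketch that wraps the compiled pattern in a reusable function. The camel_to_snake name is a hypothetical helper, and the second call shows a limitation worth knowing about: acronyms get split letter by letter.

```python
import re

# Compile once and reuse; the pattern matches the empty position
# before every uppercase letter except at the start of the string.
CAMEL_BOUNDARY = re.compile(r'(?<!^)(?=[A-Z])')


def camel_to_snake(text):
    '''Hypothetical helper wrapping the pattern above.'''
    return CAMEL_BOUNDARY.sub('_', text).lower()


print(camel_to_snake('GettingSimple'))
# getting_simple

# Runs of uppercase letters are split one letter at a time
print(camel_to_snake('HTTPServer'))
# h_t_t_p_server
```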

See how to Convert from snake_case to camelCase.

JULY 17, 2022

Here's an example of how to use the "in" and "not in" Python operators.

› python
Python 3.9.13 (main, May 24 2022, 21:28:31) 
[Clang 13.1.6 (clang-1316.0.21.2)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> names = ['James', 'Paul', 'Lily', 'Glory']
>>> names
['James', 'Paul', 'Lily', 'Glory']
>>> print('YES' if 'Lily' in names else 'NO')
YES
>>> print('YES' if 'John' in names else 'NO')
NO
>>> print('NO' if 'Lily' not in names else 'YES')
YES
>>> print('NO' if 'John' not in names else 'YES')
NO

You could use this as a conditional in your code.

names = ['James', 'Paul', 'Lily', 'Glory']
new_person = 'Nono'

if new_person not in names:
  names.append(new_person)
  print(f'Added {new_person} to names.')
  # Added Nono to names.

if new_person in names:
  print(f'{new_person} was correctly added to names.')
  # Nono was correctly added to names.
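These operators also work with other containers. Here's a short sketch with a dictionary, where "in" checks the keys, and a string, where it checks for a substring.

```python
person = {'name': 'Nono', 'location': 'Spain'}

# With dictionaries, "in" checks the keys
print('name' in person)
# True
print('Nono' in person)
# False

# To check the values, look in person.values()
print('Nono' in person.values())
# True

# With strings, "in" checks for a substring
print('Simple' in 'Getting Simple')
# True
```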

JULY 8, 2022

PHP's ucwords() function converts a phrase to title case.

// PHP
ucwords('a big snake') 
// Returns "A Big Snake"

You can obtain the same result in Python with the .title() string method.

# Python
"a big snake".title()
# Returns "A Big Snake"

Here's how to test this function in the command-line interface.

python -c "print('nono martinez alonso'.title())"
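One caveat is that .title() treats apostrophes as word boundaries. Here's a small sketch comparing it with string.capwords from the standard library, which only splits on whitespace; the phrase is made up.

```python
import string

phrase = "they're big snakes"

# .title() treats the apostrophe as a word boundary
print(phrase.title())
# They'Re Big Snakes

# string.capwords only splits on whitespace
print(string.capwords(phrase))
# They're Big Snakes
```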

LAST UPDATED JANUARY 31, 2023

Here's how to read text from a file in Python; maybe a file that already exists or a file to which you wrote the text with Python.

Read file contents

my_file = open('/your/file.txt', 'r')

# Read all file contents
contents = my_file.read()

# Print the contents
print(contents)

# Close the file
my_file.close()

Read file contents as lines

my_file = open('/your/file.txt', 'r')

# Read the lines of the file
lines = my_file.readlines()

# Iterate through the lines
for line in lines:
  print(line)

# Close the file
my_file.close()
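You can also let Python close the file for you with a with statement. Here's a minimal sketch that writes a temporary file first, as a stand-in for your own file, and then reads it back line by line.

```python
import os
import tempfile

# Write a temporary file to read back (a stand-in for your own file)
with tempfile.NamedTemporaryFile('w', suffix='.txt', delete=False) as tmp:
    tmp.write('First line\nSecond line\n')
    file_path = tmp.name

# The file is closed when the block ends, even if an error occurs
with open(file_path, 'r', encoding='utf-8') as my_file:
    # Iterating over the file yields one line at a time
    lines = [line.rstrip('\n') for line in my_file]

print(lines)
# ['First line', 'Second line']

# Remove the temporary file
os.remove(file_path)
```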

JUNE 16, 2022

Today I tried to do this on my 13-inch MacBook Pro (M1, 2020).

conda create -n py2 python=2.7 -y

And I keep getting this error.

Collecting package metadata (current_repodata.json): done
Solving environment: failed with repodata from current_repodata.json,
will retry with next repodata source.
Collecting package metadata (repodata.json): done
Solving environment: failed

PackagesNotFoundError: The following packages are not available
from current channels:

  - python=2.7

Current channels:

  - https://conda.anaconda.org/conda-forge/osx-arm64
  - https://conda.anaconda.org/conda-forge/noarch

To search for alternate channels that may provide the conda
package you're looking for, navigate to

    https://anaconda.org

and use the search bar at the top of the page.

I can create environments with Python 3 versions without a problem though; say, Python 3.7, 3.8, or 3.9.

conda create -n py39 python=3.9 -y

JUNE 3, 2022

Here's how to determine the location of the active Python binary.

import sys

# Get Python binary's location.
print(sys.executable)
# /Users/nono/miniforge3/bin/python

See how to get Python's version information.

JUNE 2, 2022

Here's how to get the version of your Python executable with Python code and determine the location of the active Python binary.

import sys

# Get Python's version.
print(sys.version)
# 3.9.7 | packaged by conda-forge | (default, Sep 29 2021, 19:24:02) \n[Clang 11.1.0 ]

# Get Python's version information.
print(sys.version_info)
# sys.version_info(major=3, minor=9, micro=7, releaselevel='final', serial=0)

# Get Python binary's location.
print(sys.executable)
# /Users/nono/miniforge3/bin/python

APRIL 27, 2022

If we try to convert a literal string with decimal points—say, '123.456'—to an integer, we'll get this error.

>>> int('123.456')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
ValueError: invalid literal for int() with base 10: '123.456'

The solution is to convert the string literal into a float first and then convert it into an integer.

int(float('123.456')) # Returns 123
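Keep in mind that int() drops the decimal part instead of rounding. Here's a short sketch comparing it with round() and math.floor() on a few made-up values.

```python
import math

# int() truncates toward zero
print(int(float('123.999')))
# 123
print(int(float('-123.456')))
# -123

# round() rounds to the nearest integer
print(round(float('123.999')))
# 124

# math.floor() always rounds down
print(math.floor(float('-123.456')))
# -124
```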

APRIL 14, 2022

Here's an easy way to check your Python variable types.

We first define our variables.

name = "Nono"
names = ["Nono", "Bea"]
person = {"name": "Nono", "location": "Spain"}
pair = ("Getting", "Simple")

Use the isinstance() method to check their types.

# Is name a string?
isinstance(name, str) # True

# Is names a list?
isinstance(names, list) # True

# Is person a dictionary?
isinstance(person, dict) # True

# Is pair a tuple?
isinstance(pair, tuple) # True

# Is name a list?
isinstance(name, list) # False

# Is person a string?
isinstance(person, str) # False

Then we can build conditional statements based on a variable's type.

if isinstance(name, str):
  print(f'Hello, {name}!')
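isinstance() also accepts a tuple of types, which helps when a variable may hold several kinds of values. Here's a small sketch with hypothetical variables, including a gotcha with booleans.

```python
# Hypothetical values to check
count = 3
price = 9.99
flag = True

# Pass a tuple to accept several types at once
print(isinstance(count, (int, float)))
# True
print(isinstance(price, (int, float)))
# True

# bool is a subclass of int, so booleans pass an int check
print(isinstance(flag, int))
# True

# type() compares the exact type instead
print(type(flag) is int)
# False
```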

MARCH 7, 2022

Here's a straightforward way to copy a file with Python to a different path or directory, also useful to duplicate or version a file with a different name.

import shutil

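# Note that `from` is a reserved keyword in Python,
# so we can't use it as a variable name.
source = '/path/to/origin/file.md'
destination = '/path/to/destination/new-name.md'

shutil.copy(source, destination)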
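Here's a self-contained sketch you can run to try it out. It works inside a temporary directory, so the file names are only stand-ins for your own paths.

```python
import os
import shutil
import tempfile

# Work inside a temporary directory (a stand-in for real paths)
with tempfile.TemporaryDirectory() as directory:
    source = os.path.join(directory, 'file.md')
    destination = os.path.join(directory, 'new-name.md')

    with open(source, 'w', encoding='utf-8') as source_file:
        source_file.write('# Hello')

    # shutil.copy returns the path of the new file
    copied_path = shutil.copy(source, destination)

    with open(copied_path, encoding='utf-8') as copied_file:
        copied_contents = copied_file.read()

print(copied_contents)
# # Hello
```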

Source

SEPTEMBER 21, 2021

If you're trying to remove a directory using the os.rmdir function, but it contains other files, you'll probably hit the following error.

OSError: [Errno 66] Directory not empty:

You can avoid this error by using shutil.rmtree, which removes a directory and all of its contents, instead of os.rmdir.

import shutil
shutil.rmtree(path)

Note that Python won't prompt you to confirm this deletion action and this may lead to deleting files by mistake.
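Here's a minimal sketch of both behaviors that's safe to run, as it only touches a temporary directory created for the occasion.

```python
import os
import shutil
import tempfile

# Create a temporary directory with one file inside
path = tempfile.mkdtemp()
with open(os.path.join(path, 'file.txt'), 'w', encoding='utf-8') as f:
    f.write('contents')

# os.rmdir refuses to remove a directory that isn't empty
try:
    os.rmdir(path)
    rmdir_failed = False
except OSError:
    rmdir_failed = True

# shutil.rmtree removes the directory and everything in it
shutil.rmtree(path)

print(rmdir_failed, os.path.exists(path))
# True False
```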

MAY 13, 2021

Here's a way to map a given color in a Pillow image (PIL.Image) to another color. This is not the fastest method and it will only replace exact matches.

In this example, we're replacing all black pixels in the input image (0,0,0) with blue (0,0,255).

from PIL import Image
import numpy as np

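img = Image.open('/path/to/image.png').convert('RGB')

# Pillow images can't be indexed like arrays, so edit a NumPy copy.
data = np.array(img)
data[(data == [0, 0, 0]).all(axis=2)] = [0, 0, 255]

img = Image.fromarray(data)
img.show()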

MAY 13, 2021

from PIL import Image

img = Image.open('/path/to/image')

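# The crop box is (left, upper, right, lower) in pixel coordinates,
# so right and bottom must be greater than left and top.
# Here we assume the intent is to trim margins off each side.
left = 10
top = 20
right = img.width - 10
bottom = img.height - 20

img = img.crop((left, top, right, bottom))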

LAST UPDATED MARCH 28, 2022

import json

file = open('my-file.json')
obj = json.load(file)
file.close()
