Here's a Python class to track and push metrics to AWS CloudWatch.
Metrics are reset to their initial values on creation and every time they're uploaded to CloudWatch.
# metrics.py
'''
A metrics class ready to track and push metrics to AWS CloudWatch.
'''
from datetime import datetime
import os
import boto3
# CloudWatch metrics namespace.
METRICS_NAMESPACE = 'my_metrics_namespace'
# Duration to wait between metric uploads.
METRICS_UPLOAD_THRESHOLD_SECONDS = 50
class Metrics:
'''
Holds metrics, serializes them to CloudWatch format,
and ingests foreign metric values.
'''
def __init__(self):
self.reset()
def reset(self):
'''
Resets metric values and last upload time.
'''
self.last_upload_time = datetime.now()
# Your custom metrics and initial values
# Note that here we're using 'my_prefix' as
# a custom prefix in case you want this class
# to add a prefix namespace to all its metrics.
self.my_prefix_first_metric = 0
self.my_prefix_second_metric = 0
def to_data(self):
'''
Serializes metrics and their values.
'''
def to_cloudwatch_format(name, value):
return {'MetricName': name, 'Value': value}
result = []
for name, value in vars(self).items():
if name != 'last_upload_time':
result.append(to_cloudwatch_format(name, value))
return result
def ingest(self, metrics, prefix=''):
'''
Adds foreign metric values to this metrics object.
'''
input_metric_names = [attr for attr in dir(metrics)
if not callable(getattr(metrics, attr))
and not attr.startswith("__")]
# Iterate through foreign keys and add metric values.
for metric_name in input_metric_names:
# Get value of foreign metric.
input_metric_value = getattr(metrics, metric_name)
# Get metric key.
metric_key = f'{prefix}_{metric_name}'
# Get metric value.
metric_value = getattr(self, metric_key)
# Add foreign values to this metrics object.
setattr(
self,
metric_key,
input_metric_value + metric_value
)
def upload(self, force=False):
'''
Uploads metrics to CloudWatch when time since last
upload is above a duration or when forced.
'''
# Get time elapsed since last upload.
seconds_since_last_upload = \
(datetime.now() - self.last_upload_time).seconds
# Only upload if duration is greater than threshold,
# or when the force flag is set to True.
if seconds_since_last_upload > METRICS_UPLOAD_THRESHOLD_SECONDS or force:
# Upload metrics to CloudWatch.
cloudwatch = boto3.client(
'cloudwatch',
os.getenv('AWS_REGION')
)
cloudwatch.put_metric_data(
Namespace=METRICS_NAMESPACE,
MetricData=self.to_data()
)
# Reset metrics.
self.reset()
To use this class, we just have to instantiate a metrics object, track some metrics, and upload them.
# Create a metrics object.
metrics = Metrics()
# Add values to its metrics.
metrics.my_prefix_first_metric += 3
metrics.my_prefix_second_metric += 1
# Upload metrics to CloudWatch.
metrics.upload(force=True)
If you're processing metrics at a fast pace, you don't want to upload them every single time you increase their value, or CloudWatch will complain. In certain cases, AWS CloudWatch's limit is 5 transactions per second (TPS) per account per AWS Region. When this limit is reached, you'll receive a RateExceeded throttling error.
By calling metrics.upload(force=False), we only upload once every METRICS_UPLOAD_THRESHOLD_SECONDS (in this example, at most once every 50 seconds).
import time
# Create a metrics object.
metrics = Metrics()
for _ in range(100):
# Wait for illustration purposes,
# as if we were doing work.
time.sleep(1)
# Add values to its metrics.
metrics.my_prefix_first_metric += 3
metrics.my_prefix_second_metric += 1
# Only upload if more than the threshold
# duration has passed since we last uploaded.
metrics.upload()
# Force-upload metrics to CloudWatch once we're done.
metrics.upload(force=True)
Lastly, here's how to ingest foreign metrics with or without a prefix.
# We define a foreign metrics class.
class OtherMetrics:
def __init__(self):
self.reset()
def reset(self):
# Note that here we don't have 'my_prefix'.
self.first_metric = 0
self.second_metric = 0
# We instantiate both metric objects.
metrics = Metrics()
other_metrics = OtherMetrics()
# The foreign metrics track values.
other_metrics.first_metric += 15
other_metrics.second_metric += 3
# Then our main metrics class ingests those metrics.
metrics.ingest(other_metrics, prefix='my_prefix')
# Then our main metrics class has those values.
print(metrics.my_prefix_first_metric)
# Returns 15
print(metrics.my_prefix_second_metric)
# Returns 3
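CloudWatch metric data can also carry optional fields such as 'Unit' and 'Dimensions'. As a sketch (the unit and dimension values below are illustrative, not part of the class above), you could extend to_cloudwatch_format to include them.
# Sketch: attach a unit and a dimension to each metric datum.
# 'Count' and the 'Environment' dimension are example values.
def to_cloudwatch_format(name, value):
    return {
        'MetricName': name,
        'Value': value,
        'Unit': 'Count',
        'Dimensions': [
            {'Name': 'Environment', 'Value': 'production'},
        ],
    }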
If you found this useful, let me know!
Take a look at other posts about code, Python, and Today I Learned(s).
Here's how to sort a Python dictionary by a key, a property name, of its items. Check this post if you're looking to sort a list of lists instead.
# A list of people
people = [
{'name': 'Nono', 'age': 32, 'location': 'Spain'},
{'name': 'Alice', 'age': 20, 'location': 'Wonderland'},
{'name': 'Phillipe', 'age': 100, 'location': 'France'},
{'name': 'Jack', 'age': 45, 'location': 'Caribbean'},
]
# Sort people by age, ascending
people_sorted_by_age_asc = sorted(people, key=lambda x: x['age'])
print(people_sorted_by_age_asc)
# [
# {'name': 'Alice', 'age': 20, 'location': 'Wonderland'},
# {'name': 'Nono', 'age': 32, 'location': 'Spain'},
# {'name': 'Jack', 'age': 45, 'location': 'Caribbean'},
# {'name': 'Phillipe', 'age': 100, 'location': 'France'}
# ]
# Sort people by age, descending
people_sorted_by_age_desc = sorted(people, key=lambda x: -x['age'])
print(people_sorted_by_age_desc)
# [
# {'name': 'Phillipe', 'age': 100, 'location': 'France'},
# {'name': 'Jack', 'age': 45, 'location': 'Caribbean'},
# {'name': 'Nono', 'age': 32, 'location': 'Spain'},
# {'name': 'Alice', 'age': 20, 'location': 'Wonderland'}
# ]
# Sort people by name, ascending
people_sorted_by_name_asc = sorted(people, key=lambda x: x['name'])
print(people_sorted_by_name_asc)
# [
# {'name': 'Alice', 'age': 20, 'location': 'Wonderland'},
# {'name': 'Jack', 'age': 45, 'location': 'Caribbean'},
# {'name': 'Nono', 'age': 32, 'location': 'Spain'},
# {'name': 'Phillipe', 'age': 100, 'location': 'France'}
# ]
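If negating the key feels awkward, or the key isn't numeric, you can pass reverse=True instead, or use operator.itemgetter in place of a lambda. A small sketch using the same people list:
from operator import itemgetter

# Sort people by age, descending, without negating the key
people_sorted_by_age_desc = sorted(people, key=itemgetter('age'), reverse=True)

# Sort people by location first, then by name
people_sorted_by_location_name = sorted(people, key=itemgetter('location', 'name'))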
You can measure the time elapsed during the execution of Python commands by keeping a reference to the start time and then subtracting the current time, at any point in your program, from that start time to obtain the duration between two points in time.
from datetime import datetime
import time
# Define the start time.
start = datetime.now()
# Run some code..
time.sleep(2)
# Get the time delta since the start.
elapsed = datetime.now() - start
# datetime.timedelta(seconds=2, microseconds=5088)
# 0:00:02.005088
# Get the seconds since the start.
elapsed_seconds = elapsed.seconds
# 2
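Note that timedelta.seconds only gives you the whole-second component of the delta. To get the full duration as a float, including fractions of a second, use total_seconds().
# Get the full duration in seconds, as a float.
elapsed_total_seconds = elapsed.total_seconds()
# 2.005088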
Let's create two helper functions to get the current time (now) and the elapsed time (elapsed) at any moment.
# Returns current time
# (and, if provided, prints the event's name)
def now(event_name=''):
if event_name:
print(f'Started {event_name}..')
return datetime.now()
# Store the current time as `start`
start = now()
# Returns the time elapsed since `beginning`
# (and, optionally, prints the duration in seconds)
def elapsed(beginning=start, log=False):
duration = datetime.now() - beginning
if log:
print(f'{duration.seconds}s')
return duration
With those utility functions defined, we can measure the duration of different events.
# Define time to wait
wait_seconds = 2
# Measure duration (while waiting for 2 seconds)
beginning = now(f'{wait_seconds}-second wait.')
# Wait.
time.sleep(wait_seconds)
# Get time delta.
elapsed_time = elapsed(beginning, True)
# Prints 2s
# Get seconds.
elapsed_seconds = elapsed_time.seconds
# 2
# Get microseconds.
elapsed_microseconds = elapsed_time.microseconds
# 4004
If you found this useful, you might want to join my mailing lists; or take a look at other posts about code, Python, React, and TypeScript.
Here's how to sort a Python list by a key of its items. Check this post if you're looking to sort a list of dictionaries instead.
# A list of people
# name, age, location
people = [
['Nono', 32, 'Spain'],
['Alice', 20, 'Wonderland'],
['Phillipe', 100, 'France'],
['Jack', 45, 'Caribbean'],
]
# Sort people by age, ascending
people_sorted_by_age_asc = sorted(people, key=lambda x: x[1])
# [
# ['Alice', 20, 'Wonderland'],
# ['Nono', 32, 'Spain'],
# ['Jack', 45, 'Caribbean'],
# ['Phillipe', 100, 'France']
# ]
# Sort people by age, descending
people_sorted_by_age_desc = sorted(people, key=lambda x: -x[1])
# [
# ['Phillipe', 100, 'France'],
# ['Jack', 45, 'Caribbean'],
# ['Nono', 32, 'Spain'],
# ['Alice', 20, 'Wonderland']
# ]
# Sort people by name, ascending
people_sorted_by_name_asc = sorted(people, key=lambda x: x[0])
# [
# ['Alice', 20, 'Wonderland'],
# ['Jack', 45, 'Caribbean'],
# ['Nono', 32, 'Spain'],
# ['Phillipe', 100, 'France']
# ]
Here's how to read contents from a comma-separated value (CSV) file in Python; maybe a CSV that already exists or a CSV you saved from Python.
import csv
csv_file_path = 'file.csv'
with open(csv_file_path, encoding='utf-8') as csv_file:
csv_reader = csv.reader(csv_file, delimiter=',')
# Read all rows into a list
rows = list(csv_reader)
# Print the first five rows
for row in rows[:5]:
print(row)
# Print all rows
for row in rows:
print(row)
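If your CSV has a header row, csv.DictReader maps each row to a dictionary keyed by the column names. A minimal sketch, assuming the file has name and age columns:
import csv

with open('file.csv', encoding='utf-8') as csv_file:
    # Each row becomes a dict keyed by the header row's column names.
    for row in csv.DictReader(csv_file):
        print(row['name'], row['age'])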
Here's how to generate pseudo-random numbers in Python.
import random
# Random generation seed for reproducible results
seed = 42
# Float
random.Random(seed).uniform(3,10)
# 7.475987589205186
# Integer
int(random.Random(seed).uniform(3,10))
# 7
# Integer
random.Random(seed).randint(0, 999)
# 654
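Seeding the generator makes the whole sequence reproducible, not just the first draw. Two generators created with the same seed produce identical values, and the same Random instance also exposes helpers like choice and sample.
import random

rng_a = random.Random(42)
rng_b = random.Random(42)

# Both generators produce the same sequence.
rng_a.random() == rng_b.random()
# True

# Pick one element, or several without repetition.
rng = random.Random(42)
rng.choice(['red', 'green', 'blue'])
rng.sample(range(10), 3)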
See the random module for more information.
Leading zeros are extra zeros to the left of a number, used when you want all the numbers in a set to have the same number of digits.
For instance, 0001, 0002, and 0003 is a good format if you expect to reach thousands of entries, as you can stay at four digits up to 9999.
# Define your number
number = 1
two_digits = f'{number:02d}'
# 01
four_digits = f'{number:04d}'
# 0001
We use the Python format specifier {my_number:04d} to enforce a minimum number of digits in our number.
This means you can use it to set the value of a string or to create or print a longer string containing that number, without necessarily storing the padded value.
a_number = 42
print(f'The number is {a_number:06d}.')
# The number is 000042.
print(f'The number is {512:06d}.')
# The number is 000512.
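If your number is already a string (or you convert it to one), str.zfill() achieves the same left-padding with zeros.
# Pad with leading zeros using zfill.
str(42).zfill(6)
# '000042'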
The -e flag/option of pip install "installs a project in editable mode (i.e. setuptools “develop mode”) from a local project path or a VCS url."
pip install -e .
-e, --editable <path/url>
As the expanded flag name shows, -e stands for editable.
Today I learned you can use the plus (+) operator to concatenate or extend lists in Python.
Say you have two lists.
list_a = [1, 2, 3]
list_b = ['Nono', 'MA']
And say you want to create a single list with the contents of both, which would look something like [1, 2, 3, 'Nono', 'MA'].
You can simply add both lists to obtain that result.
>>> combined_list = [1, 2, 3] + ['Nono', 'MA']
>>> combined_list
[1, 2, 3, 'Nono', 'MA']
Of course, it doesn't make much sense in this example because we're explicitly defining the lists and could define the combined list directly.
combined_list = [1, 2, 3, 'Nono', 'MA']
But it can be useful when we actually need to add lists, for instance to concatenate the results of glob
file listing operations.
>>> from glob import glob
>>> files_a = glob('a/*')
>>> files_a
['a/file.txt', 'a/image.jpg']
>>> files_b = glob('b/*')
>>> files_b
['b/data.json', 'b/profile.jpeg']
>>> all_files = files_a + files_b
>>> all_files
['a/file.txt', 'a/image.jpg', 'b/data.json', 'b/profile.jpeg']
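If you'd rather grow an existing list in place instead of creating a new one, use += or list.extend(); unpacking with * also works when building a new list. (The extra file names below are made up for illustration.)
>>> files_a += ['c/notes.md']
>>> files_a.extend(['c/photo.png'])
>>> files_a
['a/file.txt', 'a/image.jpg', 'c/notes.md', 'c/photo.png']
>>> all_files = [*files_a, *files_b]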
Here's how to translate 3D points in Python using a translation matrix.
To translate a series of points in three-dimensional Cartesian space (x, y, z), you first "homogenize" them by adding a value for their projective dimension, which we set to one to keep the points' original coordinates. You then multiply the point cloud, using NumPy's np.matmul method, by a transformation matrix: a (4, 4) identity matrix with the three translation parameters (tx, ty, tz) in its bottom row.
Here's a breakdown of the steps.
# translate.py
import numpy as np
# Define a set of Cartesian (x, y, z) points
point_cloud = [
[0, 0, 0],
[1, 0, 0],
[0, 1, 0],
[0, 0, 1],
[1, 1, 1],
[1, 2, 3],
]
# Convert to homogeneous coordinates
point_cloud_homogeneous = []
for point in point_cloud:
point_homogeneous = point.copy()
point_homogeneous.append(1)
point_cloud_homogeneous.append(point_homogeneous)
# Define the translation
tx = 2
ty = 10
tz = 100
# Construct the translation matrix
translation_matrix = [
[1, 0, 0, 0],
[0, 1, 0, 0],
[0, 0, 1, 0],
[tx, ty, tz, 1],
]
# Apply the transformation to our point cloud
translated_points = np.matmul(
point_cloud_homogeneous,
translation_matrix)
# Convert back to Cartesian coordinates
translated_points_xyz = []
for point in translated_points:
point = np.array(point[:-1])
translated_points_xyz.append(point)
# Map original to translated point coordinates
# (x0, y0, z0) → (x1, y1, z1)
for i in range(len(point_cloud)):
point = point_cloud[i]
translated_point = translated_points_xyz[i]
print(f'{point} → {list(translated_point)}')
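Because a pure translation just adds (tx, ty, tz) to every point, you can sanity-check the result (or skip homogeneous coordinates entirely when you only need to translate) with NumPy broadcasting.
import numpy as np

points = np.array(point_cloud)
translation = np.array([tx, ty, tz])

# Broadcasting adds the translation to every row.
translated = points + translation
# e.g. [1, 2, 3] → [3, 12, 103]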
If you try to serialize a NumPy array to JSON in Python, you'll get the error below.
TypeError: Object of type ndarray is not JSON serializable
Luckily, NumPy has a built-in method to convert one- or multi-dimensional arrays to lists, which are in turn JSON serializable.
import numpy as np
import json
# Define your NumPy array
arr = np.array([[100,200],[300,400]])
# Convert the array to list
arr_as_list = arr.tolist()
# Serialize as JSON
json.dumps(arr_as_list)
# '[[100, 200], [300, 400]]'
Here's the error I was getting when trying to return a NumPy ndarray
in the response body of an AWS Lambda function.
Object of type ndarray is not JSON serializable
import numpy as np
import json
# A NumPy array
arr = np.array([[1, 2, 3], [4, 5, 6]]).astype(np.float64)
# Serialize the array
json.dumps(arr)
# TypeError: Object of type ndarray is not JSON serializable
NumPy arrays provide a built-in method called .tolist() to convert them to lists.
import numpy as np
import json
# A NumPy array
arr = np.array([[1, 2, 3], [4, 5, 6.78]]).astype(np.float64)
# Convert the NumPy array to a list
arr_as_list = arr.tolist()
# Serialize the list
json.dumps(arr_as_list)
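Alternatively, you can pass a default callback to json.dumps so the conversion happens automatically whenever the encoder finds a NumPy array. A minimal sketch:
import json
import numpy as np

def numpy_to_list(obj):
    # Convert NumPy arrays to lists; reject anything else we don't handle.
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    raise TypeError(f'Object of type {type(obj).__name__} is not JSON serializable')

arr = np.array([[1, 2, 3], [4, 5, 6.78]])
json.dumps({'data': arr}, default=numpy_to_list)
# '{"data": [[1.0, 2.0, 3.0], [4.0, 5.0, 6.78]]}'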
I've had conda
's initialization code in my .zshrc
file for a long time.
I've used it on my former Intel and M1 Macs, but only recently migrated my code to a new M1 Max Apple Silicon Mac.
When I start a new Terminal window, a Python process takes 5–10 seconds to finish before the Terminal becomes responsive.
I'm used to hitting CMD + N for a new window or CMD + T for a new tab and starting to type immediately.
But this issue breaks my workflow and keeps me hanging for a few seconds per new window (!).
Here's my initialization code, auto-generated by Anaconda.
# >>> conda initialize >>>
# !! Contents within this block are managed by 'conda init' !!
__conda_setup="$('/Users/nono/anaconda3/bin/conda' 'shell.zsh' 'hook' 2> /dev/null)"
if [ $? -eq 0 ]; then
eval "$__conda_setup"
else
if [ -f "/Users/nono/anaconda3/etc/profile.d/conda.sh" ]; then
. "/Users/nono/anaconda3/etc/profile.d/conda.sh"
else
export PATH="/Users/nono/anaconda3/bin:$PATH"
fi
fi
unset __conda_setup
# <<< conda initialize <<<
This issue also happens with Miniforge on MacBooks, as seen in this GitHub issue titled Slow zsh startup on MacBook Pro 14-inch (M1 Pro). In my case, it's not the M1 Pro but the M1 Max, so the issue seems independent of the specific chip and may be an Apple-Silicon-wide problem.
Others may see this on M1, M1 Pro, M1 Max, M1 Ultra, and M2 chips, and on the upcoming M2 Pro, M2 Max, and M2 Ultra.
Please let me know on Twitter if you find out how to make this initialization faster. In the meantime, I've removed this code and will have to get it back when I use conda, or simply find another way to initialize Anaconda on demand, only in the Terminal instances I want to use it.
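One workaround, which I haven't benchmarked and offer only as a sketch, is to wrap the auto-generated block in a shell function so the hook runs on demand rather than on every new shell.
# Sketch: defer conda's initialization until you call `conda_init` yourself.
conda_init() {
  __conda_setup="$('/Users/nono/anaconda3/bin/conda' 'shell.zsh' 'hook' 2> /dev/null)"
  if [ $? -eq 0 ]; then
    eval "$__conda_setup"
  else
    export PATH="/Users/nono/anaconda3/bin:$PATH"
  fi
  unset __conda_setup
}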
Here's how to convert a string from CamelCase
to snake_case
in Python with regular expressions.
import re
# Option 1
regex = r'(?<!^)(?=[A-Z])'
re.sub(regex, '_', 'GettingSimple', 0).lower()
# returns 'getting_simple'
# Option 2
pattern = re.compile(r'(?<!^)(?=[A-Z])')
pattern.sub('_', 'nonoMartinezAlonso').lower()
# returns 'nono_martinez_alonso'
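You can wrap the pattern in a small helper so the regular expression is compiled once and reused.
import re

CAMEL_CASE_PATTERN = re.compile(r'(?<!^)(?=[A-Z])')

def camel_to_snake(text):
    # Insert an underscore before each uppercase letter and lowercase the result.
    return CAMEL_CASE_PATTERN.sub('_', text).lower()

camel_to_snake('GettingSimple')
# 'getting_simple'
camel_to_snake('nonoMartinezAlonso')
# 'nono_martinez_alonso'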
See how to Convert from snake_case to camelCase.
Here's an example of how to use Python's "in" and "not in" operators.
› python
Python 3.9.13 (main, May 24 2022, 21:28:31)
[Clang 13.1.6 (clang-1316.0.21.2)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> names = ['James', 'Paul', 'Lily', 'Glory']
>>> names
['James', 'Paul', 'Lily', 'Glory']
>>> print('YES' if 'Lily' in names else 'NO')
YES
>>> print('YES' if 'John' in names else 'NO')
NO
>>> print('NO' if 'Lily' not in names else 'YES')
YES
>>> print('NO' if 'John' not in names else 'YES')
NO
You could use this as a conditional in your code.
names = ['James', 'Paul', 'Lily', 'Glory']
new_person = 'Nono'
if new_person not in names:
names.append(new_person)
print(f'Added {new_person} to names.')
# Added Nono to names.
if new_person in names:
print(f'{new_person} was correctly added to names.')
# Nono was correctly added to names.
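The in and not in operators also work with other containers: strings (substring checks), dictionaries (which check keys), sets, and tuples.
# Substring check.
'Li' in 'Lily'
# True

# Dictionary membership checks keys, not values.
person = {'name': 'Nono', 'location': 'Spain'}
'name' in person
# True
'Spain' in person
# False
'Spain' in person.values()
# True

# Sets and tuples behave the same way.
'Lily' in {'James', 'Lily'}
# True
'Paul' not in ('James', 'Lily')
# True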
PHP's ucwords()
function converts a phrase to title case.
// PHP
ucwords('a big snake')
// Returns "A Big Snake"
You can obtain the same result in Python with the .title()
string method.
# Python
"a big snake".title()
# Returns "A Big Snake"
Here's how to test this function in the command-line interface.
python -c "print('nono martinez alonso'.title())"
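Note that .title() capitalizes the letter after any non-alphabetic character, so words with apostrophes come out oddly; string.capwords() splits on whitespace only.
import string

"they're big snakes".title()
# "They'Re Big Snakes"

string.capwords("they're big snakes")
# "They're Big Snakes"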
Here's how to read text from a file in Python; maybe a file that already exists or one you previously wrote with Python.
my_file = open('/your/file.txt', 'r')
# Read all file contents
contents = my_file.read()
# Print the contents
print(contents)
# Close the file
my_file.close()
my_file = open('/your/file.txt', 'r')
# Read the lines of the file
lines = my_file.readlines()
# Iterate through the lines
for line in lines:
print(line)
my_file.close()
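A more idiomatic option is the with statement, which closes the file for you even if an exception is raised.
# The file is closed automatically when the block ends.
with open('/your/file.txt', 'r') as my_file:
    for line in my_file:
        print(line)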
Today I tried to do this on my 13-inch MacBook Pro (M1, 2020).
conda create -n py2 python=2.7 -y
And I keep getting this error.
Collecting package metadata (current_repodata.json): done
Solving environment: failed with repodata from current_repodata.json,
will retry with next repodata source.
Collecting package metadata (repodata.json): done
Solving environment: failed
PackagesNotFoundError: The following packages are not available
from current channels:
- python=2.7
Current channels:
- https://conda.anaconda.org/conda-forge/osx-arm64
- https://conda.anaconda.org/conda-forge/noarch
To search for alternate channels that may provide the conda
package you're looking for, navigate to
https://anaconda.org
and use the search bar at the top of the page.
I can create environments with Python 3 versions without a problem though; say, Python 3.7, 3.8, or 3.9.
conda create -n py2 python=3.9 -y
Here's how to determine the location of the active Python binary.
import sys
# Get Python binary's location.
print(sys.executable)
# /Users/nono/miniforge3/bin/python
Here's how to get the version of your Python executable with Python code and determine the location of the active Python binary.
import sys
# Get Python's version.
print(sys.version)
# 3.9.7 | packaged by conda-forge | (default, Sep 29 2021, 19:24:02) \n[Clang 11.1.0 ]
# Get Python's version information.
print(sys.version_info)
# sys.version_info(major=3, minor=9, micro=7, releaselevel='final', serial=0)
# Get Python binary's location.
print(sys.executable)
# /Users/nono/miniforge3/bin/python
If we try to convert a literal string with decimal points—say, '123.456'
—to an integer, we'll get this error.
>>> int('123.456')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
ValueError: invalid literal for int() with base 10: '123.456'
The solution is to convert the string literal into a float first and then convert it into an integer.
int(float('123.456')) # Returns 123
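Note that this truncates the decimal part. If you want the nearest integer instead, use round().
int(float('123.654'))    # Returns 123 (truncates)
round(float('123.654'))  # Returns 124 (rounds to the nearest integer)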
Here's an easy way to check your Python variable types.
We first define our variables.
name = "Nono"
names = ["Nono", "Bea"]
person = {"name": "Nono", "location": "Spain"}
pair = ("Getting", "Simple")
Use the isinstance() function to check their types.
# Is name a string?
isinstance(name, str) # True
# Is names a list?
isinstance(names, list) # True
# Is person a dictionary?
isinstance(person, dict) # True
# Is pair a tuple?
isinstance(pair, tuple) # True
# Is name a list?
isinstance(name, list) # False
# Is person a string?
isinstance(person, str) # False
Then we can build conditional statements based on a variable's type.
if isinstance(name, str):
print(f'Hello, {name}!')
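isinstance() also accepts a tuple of types when several types are acceptable.
age = 32

# Is age an int or a float?
isinstance(age, (int, float))  # True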
Here's a straightforward way to copy a file with Python to a different path or directory, also useful to duplicate or version a file with a different name.
import shutil
source = '/path/to/origin/file.md'
destination = '/path/to/destination/new-name.md'
shutil.copy(source, destination)
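If the destination is a directory, shutil.copy() keeps the original file name; shutil.copy2() also tries to preserve metadata such as modification times.
import shutil

# Copy into a directory, keeping the file name.
shutil.copy('/path/to/origin/file.md', '/path/to/destination/')

# Copy and preserve file metadata (timestamps, permissions).
shutil.copy2('/path/to/origin/file.md', '/path/to/destination/new-name.md')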
If you're trying to remove a directory using the os.rmdir
function, but it contains other files, you'll probably hit the following error.
OSError: [Errno 66] Directory not empty:
You can avoid this error by using the shutil library instead of os.
import shutil
shutil.rmtree(path)
Note that Python won't prompt you to confirm this deletion action and this may lead to deleting files by mistake.
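If you'd rather not fail when the directory doesn't exist, you can check for it first or pass ignore_errors=True, which silences errors altogether.
import os
import shutil

# Only delete the directory if it exists.
if os.path.isdir(path):
    shutil.rmtree(path)

# Or ignore errors altogether (use with care).
shutil.rmtree(path, ignore_errors=True)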
Here's a way to map a given color in a Pillow image (PIL.Image) to another color. This is not the fastest method and it will only replace exact matches.
In this example, we're replacing all black pixels (0,0,0) in the input image with blue ones (0,0,255).
from PIL import Image
import numpy as np
img = Image.open('/path/to/image.png').convert('RGB')
# Convert the image to a NumPy array to edit its pixels.
data = np.array(img)
# Replace exact matches of black (0,0,0) with blue (0,0,255).
data[np.where((data == [0, 0, 0]).all(axis=2))] = [0, 0, 255]
# Convert the array back to a Pillow image.
img = Image.fromarray(data)
img.show()
Here's how to crop a region of an image with Pillow. The crop box is a 4-tuple of (left, upper, right, lower) pixel coordinates.
from PIL import Image
img = Image.open('/path/to/image')
# Crop box coordinates, in pixels.
left = 10
top = 20
right = 110
bottom = 120
img = img.crop((left, top, right, bottom))
Here's how to read and parse a JSON file into a Python object.
import json
# Open the file, parse its JSON contents, and close it.
file = open('my-file.json')
obj = json.load(file)
file.close()