SQLAlchemy versus Distributed Postgres
One of our customers recently asked if they could use their SQLAlchemy-based Python application with pgEdge, and was pleased to learn that they could. But what is SQLAlchemy, and what considerations might there be when working with a distributed multi-master PostgreSQL cluster like pgEdge Distributed Postgres?
SQLAlchemy is “the Python SQL Toolkit and Object Relational Mapper”, according to its website. Most famously, it is used for its ORM capabilities, which allow you to define your data model and manage the database schema and access from Python, without having to worry about inconveniences like SQL. A good example from my world is pgAdmin, the management tool project for PostgreSQL that I started nearly 30(!) years ago; pgAdmin 4 stores most of its runtime configuration in either a SQLite database or, for larger shared installations, PostgreSQL. Most of the database code for that purpose uses SQLAlchemy to handle both schema creation and upgrades (known as migrations), as it makes them trivial to manage.
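For those who haven't used it before, a minimal ORM model might look something like the following. This is purely illustrative - the Customer class and its columns aren't taken from pgAdmin or from the example later in this post:
from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class Customer(Base):
    __tablename__ = "customer"

    # Columns are declared in Python; SQLAlchemy generates the DDL and SQL
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100), nullable=False)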
One of my awesome colleagues, Gil Browdy, took on the task of showing the customer how pgEdge can work in a distributed environment, and started with a simple script. The script shows the very basics of how we might get started working with SQLAlchemy and pgEdge, so let’s take a look at Gil’s example.
Setup
First, we need to get everything set up. We’re going to import the SQLAlchemy library, which we’ll be using with the psycopg PostgreSQL interface for Python, so we need to get them installed into a virtual environment:
~/sqlalchemy % python3 -m venv venv
~/sqlalchemy % source venv/bin/activate
~/sqlalchemy % pip install sqlalchemy psycopg
Collecting sqlalchemy
Using cached sqlalchemy-2.0.41-cp312-cp312-macosx_11_0_arm64.whl.metadata (9.6 kB)
Collecting psycopg
Using cached psycopg-3.2.9-py3-none-any.whl.metadata (4.5 kB)
Collecting typing-extensions>=4.6.0 (from sqlalchemy)
Using cached typing_extensions-4.14.1-py3-none-any.whl.metadata (3.0 kB)
Using cached sqlalchemy-2.0.41-cp312-cp312-macosx_11_0_arm64.whl (2.1 MB)
Using cached psycopg-3.2.9-py3-none-any.whl (202 kB)
Using cached typing_extensions-4.14.1-py3-none-any.whl (43 kB)
Installing collected packages: typing-extensions, sqlalchemy, psycopg
Successfully installed psycopg-3.2.9 sqlalchemy-2.0.41 typing-extensions-4.14.1
Code
With the environment set up, we can play with our script. First, the boilerplate to import the SQLAlchemy functions we need:
import random
from sqlalchemy import create_engine, Column, inspect, Integer, String, MetaData, Table
from sqlalchemy.orm import sessionmaker
Next, we’ll create connections to each of the three nodes in my pgEdge cluster. We define an array of connection strings, and then an engine object for each:
# Replace with your database credentials
DATABASES = [
"postgresql+psycopg://pgedge:[email protected]:5432/demo",
"postgresql+psycopg://pgedge:[email protected]:5432/demo",
"postgresql+psycopg://pgedge:[email protected]:5432/demo",
]
# Create database engines
engines = [create_engine(db_url) for db_url in DATABASES]
We need a table to work with to demonstrate that replication works, so we can define a SQLAlchemy Table object. This is attached to a MetaData object, which is a collection that holds all table objects. The tables themselves also contain Column objects defining each column in which we’ll store data. As this is a test script, we’ll also create a simple function to drop and recreate all of our managed tables each time we run the test.
# Define the table schema
metadata = MetaData()
example_table = Table(
"example_table",
metadata,
Column("id", Integer, primary_key=True, autoincrement=True),
Column("data", String, nullable=False),
)
def create_table(engine):
"""Create the table on the specified database."""
metadata.drop_all(engine)
metadata.create_all(engine)
Some additional helper functions can be useful to validate whether or not a table or data exists on a given node in the cluster:
def validate_table_exists(engine):
"""Validate that the table exists on the specified database."""
inspector = inspect(engine)
return "example_table" in inspector.get_table_names()
def validate_data_exists(engine, data):
"""Validate that the data exists in the table on the specified database."""
with engine.connect() as connection:
result = connection.execute(example_table.select().where(example_table.c.data == data))
return result.fetchone() is not None
And last but not least, we need a function to insert some test data. You will note that this does not simply execute a SQL INSERT statement (though we could do that by calling a psycopg function directly), but uses a regular Python method invocation on the table object:
def insert_data(engine, data):
"""Insert data into the table on the specified database and commit the transaction."""
Session = sessionmaker(bind=engine)
session = Session()
try:
session.execute(example_table.insert().values(data=data))
session.commit() # Commit the transaction
except Exception as e:
session.rollback() # Rollback the transaction in case of an error
print(f"Error inserting data: {e}")
finally:
session.close() # Close the session
We’ve set everything up and defined all of our helper functions, so now for the main function. Gil has commented this code nicely, but in a nutshell, we create the table on the first database, and then check to ensure that it exists on the other nodes in the cluster.
Note that as we’re using asynchronous replication in pgEdge, this may actually fail if the script runs the check before the Spock replication engine has replicated the DDL statement to the other nodes. That could be solved with a brief sleep or a short polling loop if needed (as sketched below); however, in a typical application you would normally only use one node of the cluster, so this is really only a potential problem for this test.
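For instance, a simple polling helper along these lines could be used instead of a fixed sleep; it just re-uses the validate_table_exists() helper defined above, and the timeout and interval values here are arbitrary:
import time

def wait_for_table(engine, timeout=10.0, interval=0.5):
    """Poll a node until example_table appears, allowing for replication lag."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if validate_table_exists(engine):
            return True
        time.sleep(interval)
    return False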
Assuming the table now exists on all nodes, we insert a row on a node chosen at random, and then check that it exists on every node in the cluster.
def main():
# Step 1: Create the table on the first database
create_table(engines[0])
# Step 2: Validate the table exists on the other two databases
for engine in engines[1:]:
if not validate_table_exists(engine):
print("Table does not exist on one of the databases.")
return
# Step 3: Randomly select a database to write data
selected_engine = random.choice(engines)
data_to_insert = "test_data"
insert_data(selected_engine, data_to_insert)
# Step 4: Validate the data exists on all three databases
for engine in engines:
if not validate_data_exists(engine, data_to_insert):
print("Data validation failed on one of the databases.")
return
print("Data successfully validated across all databases.")
if __name__ == "__main__":
main()
Now, this is a somewhat contrived example, and not overly representative of a real-world application, in which you would almost certainly have affinity to one particular node in the cluster - but it does show how simple it is to set up and use the basics of SQLAlchemy, and to prove that it functions as expected with a multi-master replicated cluster.
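For example, a real application would typically bind its sessions to a single node and leave it to Spock to replicate changes to the rest of the cluster; the choice of node below is purely illustrative:
# Pin the application to one node; Spock replicates changes to the others
Session = sessionmaker(bind=engines[0])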
Snowflakes
One important concept this example does not show is how to handle unique identifiers across the cluster. pgEdge uses a Snowflake Sequence extension as a replacement for standard sequences, designed to ensure that generated values are unique across the cluster. You can learn more about the Snowflake extension in our documentation - in particular, note that it is important to set the snowflake.node configuration parameter (or GUC) for each individual cluster node once the extension has been installed and created in the database.
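For completeness, a one-time, per-node setup might look something like the sketch below. Treat it as an assumption-laden illustration rather than the official procedure: the extension name snowflake, and the use of ALTER SYSTEM to set the snowflake.node GUC, are assumptions here, so check the pgEdge documentation for the exact steps for your version.
from sqlalchemy import text

def configure_snowflake(engine, node_id):
    """Sketch of per-node Snowflake setup; node_id must differ on every node.

    Assumptions: the extension is named "snowflake" and the snowflake.node
    GUC can be set with ALTER SYSTEM - verify against the pgEdge docs.
    """
    # ALTER SYSTEM cannot run inside a transaction block, so use autocommit
    with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as connection:
        connection.execute(text("CREATE EXTENSION IF NOT EXISTS snowflake"))
        connection.execute(text(f"ALTER SYSTEM SET snowflake.node = {int(node_id)}"))
        connection.execute(text("SELECT pg_reload_conf()"))

# e.g. give each node in the engines list a distinct node number:
# for node_id, engine in enumerate(engines, start=1):
#     configure_snowflake(engine, node_id)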
To use the Snowflake sequence, we must additionally import the Sequence object and text() function from SQLAlchemy:
from sqlalchemy import create_engine, Column, inspect, BigInteger, String, MetaData, Table, Sequence, text
Then, we simply modify the schema to first create a regular sequence which will be used by Snowflake, and then set the server default value for the column in our example table. It’s worth noting that we also need to use the bigint (AKA int8) datatype for Snowflake sequences; an integer (AKA int4) will not be large enough:
# Define the table schema
metadata = MetaData()
example_seq = Sequence(
"example_seq",
metadata=metadata,
start=1
)
example_table = Table(
"example_table",
metadata,
Column("id", BigInteger, server_default=text("snowflake.nextval('example_seq'::regclass)"), primary_key=True),
Column("data", String, nullable=False),
)
With these minor modifications, rows will be identified by values from the Snowflake sequence, thus ensuring that there are no sequence value collisions from different nodes in the cluster.
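As a quick sanity check (not part of Gil’s original script), we can also read back the generated identifier when inserting a row; the value will be a 64-bit Snowflake number rather than a small serial value:
# Illustrative check: insert a row and read back its Snowflake-generated id
with engines[0].connect() as connection:
    result = connection.execute(
        example_table.insert().values(data="snowflake_test").returning(example_table.c.id)
    )
    print(result.scalar_one())  # a 64-bit value, unique across all nodes
    connection.commit()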
Conclusion
This blog post demonstrates how an ORM such as SQLAlchemy can be used with a distributed, asynchronously replicated multi-master PostgreSQL cluster such as pgEdge Distributed Postgres. With the exception of the need to ensure that sequence values do not collide when generated on different nodes of the cluster, there really aren’t any differences from using the ORM with a single PostgreSQL instance.