"""Async SQLAlchemy ORM demo: create two related tables, insert rows, query them.

The script drops and recreates the schema through the project ``Database``
helper, inserts a few ``A`` rows with child ``B`` rows, eagerly loads the
children, updates one row and commits.
"""

import asyncio

import sqlalchemy
from sqlalchemy import future as sqlalchemy_future
from sqlalchemy import orm as sqlalchemy_orm

from Database.base import Base
from Database.database import Database


class A(Base):
    """Parent row with a text payload, a server-side timestamp and ``B`` children."""

    # NOTE(review): no primary key is declared here; ``A.id`` is used below and
    # by the "a.id" foreign key, so ``Base`` presumably supplies ``id`` — confirm.
    __tablename__ = "a"

    data = sqlalchemy.Column(sqlalchemy.String)
    create_date = sqlalchemy.Column(
        sqlalchemy.DateTime, server_default=sqlalchemy.func.now()
    )
    bs = sqlalchemy_orm.relationship("B")

    # required in order to access columns with server defaults
    # or SQL expression defaults, subsequent to a flush, without
    # triggering an expired load
    __mapper_args__ = {"eager_defaults": True}


class B(Base):
    """Child row that references its parent ``A`` through ``a_id``."""

    __tablename__ = "b"

    a_id = sqlalchemy.Column(sqlalchemy.ForeignKey("a.id"))
    data = sqlalchemy.Column(sqlalchemy.String)


async def main() -> None:
    """Run the demo against the ``testing`` database and print the results.

    The engine is always disposed, even when one of the database calls
    raises, so pooled connections are not left open.
    """
    # NOTE(review): an async engine normally needs an async driver in the URL
    # (e.g. "postgresql+asyncpg://"); presumably ``Database`` handles that —
    # confirm.
    db = Database(
        "postgresql://localhost:5432/testing",
    )

    try:
        # Start from a clean schema (judging by the method names, this drops
        # and recreates the mapped tables — confirm in ``Database``).
        await db.drop_all()
        await db.create_all()

        # expire_on_commit=False will prevent attributes from being expired
        # after commit.
        # NOTE(review): the session factory lives in ``Database``; this assumes
        # it is configured with expire_on_commit=False — confirm.
        async with db.async_session() as session:
            # session.begin() commits the inserts when the block exits cleanly.
            async with session.begin():
                session.add_all(
                    [
                        A(bs=[B(), B()], data="a1"),
                        A(bs=[B()], data="a2"),
                        A(bs=[B(), B()], data="a3"),
                    ]
                )

            # selectinload fetches the ``bs`` collections up front, so the loop
            # below never triggers a lazy load (not allowed under asyncio).
            stmt = sqlalchemy_future.select(A).options(
                sqlalchemy_orm.selectinload(A.bs)
            )
            result = await session.execute(stmt)

            for a1 in result.scalars():
                print(a1)
                print(f"created at: {a1.create_date}")
                for b1 in a1.bs:
                    print(b1)

            result = await session.execute(
                sqlalchemy_future.select(A).order_by(A.id)
            )
            a1 = result.scalars().first()

            a1.data = "new data"
            await session.commit()

            # access attribute subsequent to commit; this is what
            # expire_on_commit=False allows
            print(a1.data)
    finally:
        # for AsyncEngine created in function scope, close and
        # clean-up pooled connections
        await db.async_engine.dispose()


if __name__ == "__main__":
    asyncio.run(main())