Re: [sqlalchemy] Sane way to monitor external DB changes from application?
At least one database (postgres) has a pub/sub messaging facility (NOTIFY/LISTEN) that you can use to do this. See the postgres docs. We use this extensively.

On the listen end, you basically want to get down to the psycopg layer, because sqlalchemy's layers aren't going to be helpful.

1. Get a raw connection using engine.raw_connection()
2. Detach it from the connection pool (c.detach())
3. Change the isolation level to autocommit:
   c.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
   (note that if you do return the connection to the pool, this change may survive and cause subtle havoc later)
4. Call c.poll() to check for events (it does not block; see psycopg2 docs)
5. Use c.notifies to get messages

On the notify end, you don't need to do these special things to the connection and can issue messages using raw sql text on sqlalchemy connection objects.

I would assume other backends will be completely different. You can use select/epoll/whatever to do async IO if needed. You may find postgres' advisory locks useful if any synchronization needs arise.

References:
http://www.postgresql.org/docs/9.4/static/sql-listen.html
http://initd.org/psycopg/docs/advanced.html#asynchronous-notifications

- Dave

On 9/2/2015 8:48 PM, Ken Lareau wrote:

I'm going to try to see if I can give enough detail here to allow folks to make sense, but I will be simplifying things a bit to prevent a 1,000+ line email, too...

So I have an in-house application that handles software deployments. It uses a database backend to keep track of current deployments and maintain history (actually, the database is used more generally for our Site Operations site management with specific tables created just for the application). I am working on a major refactoring of the deployment code itself where the actual installation of the software is handled by a daemon that runs constantly in the background looking for new deployments to perform.
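Dave's listen-side and notify-side steps could be sketched roughly as follows. This is only an illustration under assumptions, not code from his application: the function names (open_listener, drain_notifications, wait_for_notifications, notify) and the channel argument are hypothetical, it relies on the pool proxy returned by engine.raw_connection() forwarding calls to the underlying psycopg2 connection, and it uses the pg_notify() SQL function instead of a literal NOTIFY statement so the payload can be passed as a bind parameter.

```python
import select


def open_listener(engine, channel):
    """Return a dedicated, autocommit psycopg2 connection listening on channel."""
    # Imported lazily so the helpers below can be used without psycopg2 installed.
    from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

    conn = engine.raw_connection()  # step 1: pool-proxied DBAPI connection
    conn.detach()                   # step 2: it will never go back to the pool
    conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)  # step 3
    cursor = conn.cursor()
    # The channel is an SQL identifier and cannot be a bind parameter,
    # so only pass trusted, hard-coded names here.
    cursor.execute("LISTEN " + channel)
    cursor.close()
    return conn


def drain_notifications(conn):
    """Steps 4 and 5: read pending input, then empty conn.notifies."""
    conn.poll()
    events = []
    while conn.notifies:
        events.append(conn.notifies.pop(0))
    return events


def wait_for_notifications(conn, timeout=5.0):
    """Sleep in select() until the connection is readable or timeout expires."""
    readable, _, _ = select.select([conn], [], [], timeout)
    return drain_notifications(conn) if readable else []


def notify(connection, channel, payload=""):
    """Notify side: plain SQL on an ordinary SQLAlchemy connection."""
    from sqlalchemy import text

    # pg_notify() takes the channel and payload as ordinary bind parameters.
    # The message is only delivered once the surrounding transaction commits.
    connection.execute(
        text("SELECT pg_notify(:channel, :payload)"),
        {"channel": channel, "payload": payload},
    )
```

A listener would call open_listener() once and then loop on wait_for_notifications(); each returned item is a psycopg2 notification with pid, channel and payload attributes. Since c.poll() itself does not block, the select() call is what keeps the loop from spinning.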
Re: [sqlalchemy] Sane way to monitor external DB changes from application?
Dave,

Thanks for the response. Unfortunately the switch to PostgreSQL, while not a long way off, probably will not occur until at least the beginning of next year, and this rewrite is needed now. If there are no other options available then I'll just need to brute force it. :)

- Ken

On Wed, Sep 2, 2015 at 6:32 PM, Dave Vitek wrote:
> At least one database (postgres) has a pub/sub messaging facility
> (NOTIFY/LISTEN) that you can use to do this. See the postgres docs. We
> use this extensively.
>
> On the listen end, you basically want to get down to the psycopg layer,
> because sqlalchemy's layers aren't going to be helpful.
>
> 1. Get a raw connection using engine.raw_connection()
> 2. Detach it from the connection pool (c.detach())
> 3. Change the isolation level to autocommit:
>    c.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
>    (note that if you do return the connection to the pool, this
>    change may survive and cause subtle havoc later)
> 4. Call c.poll() to check for events (it does not block; see psycopg2 docs)
> 5. Use c.notifies to get messages
>
> On the notify end, you don't need to do these special things to the
> connection and can issue messages using raw sql text on sqlalchemy
> connection objects.
>
> I would assume other backends will be completely different. You can use
> select/epoll/whatever to do async IO if needed. You may find postgres'
> advisory locks useful if any synchronization needs arise.
>
> References:
> http://www.postgresql.org/docs/9.4/static/sql-listen.html
> http://initd.org/psycopg/docs/advanced.html#asynchronous-notifications
>
> - Dave
>
>
> On 9/2/2015 8:48 PM, Ken Lareau wrote:
>
> I'm going to try to see if I can give enough detail here to allow folks to
> make sense, but I will be simplifying things a bit to prevent a 1,000+ line
> email, too...
>
> So I have an in-house application that handles software deployments.
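For a backend without LISTEN/NOTIFY, the brute-force fallback Ken mentions comes down to polling and diffing snapshots. The following is a minimal sketch under assumptions, not his implementation: changed_statuses and watch are hypothetical names, and fetch_statuses stands for any callable that queries the relevant rows and returns a dict mapping an entry id to its current status.

```python
import time


def changed_statuses(previous, current):
    """Return {entry_id: (old_status, new_status)} for new or changed entries."""
    return {
        entry_id: (previous.get(entry_id), status)
        for entry_id, status in current.items()
        if previous.get(entry_id) != status
    }


def watch(fetch_statuses, on_change, interval=2.0, max_polls=None):
    """Poll fetch_statuses() and report every status change to on_change()."""
    previous = fetch_statuses()
    polls = 0
    while max_polls is None or polls < max_polls:
        time.sleep(interval)
        current = fetch_statuses()
        for entry_id, (old, new) in changed_statuses(previous, current).items():
            on_change(entry_id, old, new)
        previous = current
        polls += 1
```

One thing to watch for: if the backend uses snapshot isolation (MySQL/InnoDB's default REPEATABLE READ, for example), fetch_statuses has to start a fresh transaction on every call, otherwise repeated queries keep returning the same snapshot and the daemon's updates never show up.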
[sqlalchemy] Sane way to monitor external DB changes from application?
I'm going to try to see if I can give enough detail here to allow folks to make sense, but I will be simplifying things a bit to prevent a 1,000+ line email, too...

So I have an in-house application that handles software deployments. It uses a database backend to keep track of current deployments and maintain history (actually, the database is used more generally for our Site Operations site management with specific tables created just for the application). I am working on a major refactoring of the deployment code itself where the actual installation of the software is handled by a daemon that runs constantly in the background looking for new deployments to perform. The key tables look like this:


class Deployment(Base):
    __tablename__ = 'deployments'

    id = Column(u'DeploymentID', INTEGER(), primary_key=True)
    package_id = Column(
        INTEGER(),
        ForeignKey('packages.package_id', ondelete='cascade'),
        nullable=False
    )

    package = relationship(
        "Package",
        uselist=False,
        back_populates='deployments'
    )

    user = Column(String(length=32), nullable=False)
    status = Column(
        Enum('pending', 'queued', 'inprogress', 'complete', 'failed',
             'canceled', 'stopped'),
        server_default='pending',
        nullable=False,
    )
    declared = Column(
        TIMESTAMP(),
        nullable=False,
        server_default=func.current_timestamp()
    )
    created_at = synonym('declared')
    app_deployments = relationship(
        'AppDeployment',
        order_by="AppDeployment.created_at, AppDeployment.id"
    )
    host_deployments = relationship(
        'HostDeployment',
        order_by="HostDeployment.created_at, HostDeployment.id"
    )


class AppDeployment(Base):
    __tablename__ = 'app_deployments'

    id = Column(u'AppDeploymentID', INTEGER(), primary_key=True)
    deployment_id = Column(
        u'DeploymentID',
        INTEGER(),
        ForeignKey('deployments.DeploymentID', ondelete='cascade'),
        nullable=False
    )
    app_id = Column(
        u'AppID',
        SMALLINT(display_width=6),
        ForeignKey('app_definitions.AppID', ondelete='cascade'),
        nullable=False
    )

    application = relationship("AppDefinition", uselist=False)
    target = synonym('application')
    deployment = relationship("Deployment", uselist=False)

    user = Column(String(length=32), nullable=False)
    status = Column(
        Enum(
            'complete',
            'incomplete',
            'inprogress',
            'invalidated',
            'validated',
        ),
        nullable=False
    )
    environment_id = Column(
        u'environment_id',
        INTEGER(),
        ForeignKey('environments.environmentID', ondelete='cascade'),
        nullable=False
    )
    realized = Column(
        TIMESTAMP(),
        nullable=False,
        server_default=func.current_timestamp()
    )
    created_at = synonym('realized')

    environment_obj = relationship('Environment')


class HostDeployment(Base):
    __tablename__ = 'host_deployments'

    id = Column(u'HostDeploymentID', INTEGER(), primary_key=True)
    deployment_id = Column(
        u'DeploymentID',
        INTEGER(),
        ForeignKey('deployments.DeploymentID', ondelete='cascade'),
        nullable=False
    )
    deployment = relationship("Deployment", uselist=False)

    host_id = Column(
        u'HostID',
        INTEGER(),
        ForeignKey('hosts.HostID', ondelete='cascade'),
        nullable=False
    )
    host = relationship("Host", uselist=False)

    user = Column(String(length=32), nullable=False)
    status = Column(Enum('inprogress', 'failed', 'ok'), nullable=False)
    realized = Column(
        TIMESTAMP(),
        nullable=False,
        server_default=func.current_timestamp()
    )
    created_at = synonym('realized')


(Forgive a bit of the 'messiness', these classes have been a bit organic as we've been trying to improve the program flow and design.)

The basic process the application itself performs is to gather the information from the user on which tiers (AppDeployment) and hosts (HostDeployment) to deploy the software onto, then create (or update) entries for all of them, setting their status to 'inprogress'. Once this is done, the base Deployment entry has its status set to 'queued', at which point the installer daemon will take over and work its way through all the entries connected to that Deployment entry and update the entries as each individual deployment finishes.
Because the daemon is doing the state changes itself, I need the main application to 'watch' the database for changes and inform the user when they occur (the application does have a 'detached' state which simply exits after it sets up the deployment, but that's orthogonal to the current issue). I could do this the "brute force" way, which is essentially:

1) Query the database for all the relevant entries
2) Check against the previous state to see which have