At least one database (postgres) has a pub/sub messaging facility (NOTIFY/LISTEN) that you can use to do this. See the postgres docs. We use this extensively.

On the listen end, you basically want to get down to the psycopg layer, because sqlalchemy's layers aren't going to be helpful.

1.  Get it using engine.raw_connection()
2.  Detach it from the thread pool (c.detach())
3. Change the isolation level to autocommit c.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) (note that if you return the connection to the connection pool, this change may survive after the connection is returned to the pool, causing subtle havoc later)
4. Call c.poll() to wait for events (see psycopg2 docs)
5. Use c.notifies to get messages

On the notify end, you don't need to do these special things to the connection and can issue messages using raw sql text on sqlalchemy connection objects.

I would assume other backends will be completely different. You can use select/epoll/whatever to do async IO if needed. You may find postgres' advisory locks useful if any synchronization needs arise.

References:
http://www.postgresql.org/docs/9.4/static/sql-listen.html
http://initd.org/psycopg/docs/advanced.html#asynchronous-notifications

- Dave

On 9/2/2015 8:48 PM, Ken Lareau wrote:
I'm going to try to see if I can give enough detail here to allow folks to make sense, but I will be simplifying things a bit to prevent a 1,000+ line email, too...

So I have an in-house application that handles software deployments. It uses a database backend to keep track of current deployments and maintain history (actually, the database is used more generally for our Site Operations site management with specific tables created just for the application). I am working on a major refactoring of the deployment code itself where the actually installation of the software is handled by a daemon that runs constantly in the background looking for new deployments to perform. The key tables look like this:


class Deployment(Base):
    __tablename__ = 'deployments'

    id = Column(u'DeploymentID', INTEGER(), primary_key=True)
    package_id = Column(
        INTEGER(),
        ForeignKey('packages.package_id', ondelete='cascade'),
        nullable=False
    )

    package = relationship(
        "Package",
        uselist=False,
        back_populates='deployments'
    )

    user = Column(String(length=32), nullable=False)
    status = Column(
        Enum('pending', 'queued', 'inprogress', 'complete', 'failed',
             'canceled', 'stopped'),
        server_default='pending',
        nullable=False,
    )
    declared = Column(
        TIMESTAMP(),
        nullable=False,
server_default=func.current_timestamp()
    )
    created_at = synonym('declared')
    app_deployments = relationship(
'AppDeployment', order_by="AppDeployment.created_at, AppDeployment.id"
    )
    host_deployments = relationship(
'HostDeployment', order_by="HostDeployment.created_at, HostDeployment.id"
    )


class AppDeployment(Base):
    __tablename__ = 'app_deployments'

    id = Column(u'AppDeploymentID', INTEGER(), primary_key=True)
    deployment_id = Column(
        u'DeploymentID',
        INTEGER(),
        ForeignKey('deployments.DeploymentID', ondelete='cascade'),
        nullable=False
    )
    app_id = Column(
        u'AppID',
        SMALLINT(display_width=6),
        ForeignKey('app_definitions.AppID', ondelete='cascade'),
        nullable=False
    )

    application = relationship("AppDefinition", uselist=False)
    target = synonym('application')
    deployment = relationship("Deployment", uselist=False)

    user = Column(String(length=32), nullable=False)
    status = Column(
        Enum(
            'complete',
            'incomplete',
            'inprogress',
            'invalidated',
            'validated',
        ),
        nullable=False
    )
    environment_id = Column(
        u'environment_id',
        INTEGER(),
ForeignKey('environments.environmentID', ondelete='cascade'),
        nullable=False
    )
    realized = Column(
        TIMESTAMP(),
        nullable=False,
server_default=func.current_timestamp()
    )
    created_at = synonym('realized')

    environment_obj = relationship('Environment')


class HostDeployment(Base):
    __tablename__ = 'host_deployments'

    id = Column(u'HostDeploymentID', INTEGER(), primary_key=True)
    deployment_id = Column(
        u'DeploymentID',
        INTEGER(),
        ForeignKey('deployments.DeploymentID', ondelete='cascade'),
        nullable=False
    )

    deployment = relationship("Deployment", uselist=False)

    host_id = Column(
        u'HostID',
        INTEGER(),
        ForeignKey('hosts.HostID', ondelete='cascade'),
        nullable=False
    )
    host = relationship("Host", uselist=False)

    user = Column(String(length=32), nullable=False)
    status = Column(Enum('inprogress', 'failed', 'ok'), nullable=False)
    realized = Column(
        TIMESTAMP(),
        nullable=False,
server_default=func.current_timestamp()
    )
    created_at = synonym('realized')


(Forgive a bit of the 'messiness', these classes have been a bit organic as we've been trying to improve the program flow and design.)

The basic process the application itself performs is to gather the information from the user on which tiers (AppDeployment) and hosts (HostDeployment) to deploy the software onto, then creates (or updates) entries for all of them, setting their status to 'inprogress'. Once this is done, the base Deployment entry has its status set to 'queued', at which point the installer daemon will take over and work its way through all the entries connected to that Deployment entry and update the entries as each individual deployment finishes.

Because the daemon is doing the state changes itself, I need the main application to 'watch' the database for changes and inform the user when they occur (the application does have a 'detached' state which simply exits after it sets up the deployment, but that's orthogonal to the current issue). I could do this the "brute force" way, which is essentially:

1) Query the database for all the relevant entries
2) Check against the previous state to see which have changed; if there are changes, notify the user
3) Update the tracked state for all
4) Sleep for a given amount of time, then go back to (1)

This is of course very database intensive, so to finally get to the point... I'm wondering if there's a way to do this such that the application can 'listen' for changes and be notified when they occur. I've tried doing a bit of reading on events, but not having used them before I'm unsure if this will work, especially since multiple deployments can be occurring so the 'listener' will need to be only paying attention to the entries related to that one given deployment (basically just the AppDeployment and HostDeployment entries that are tied to the specific Deployment entry). If it makes a difference, the database server itself is MySQL currently, though there are plans to migrate to PostgreSQL in the not-too-distant future. This also leads to the possibly using triggers on the DB side, but I'm unsure if that's a valid possibility here?

Hopefully this made sense, but if any clarification is needed, please let me know.

--
Ken Lareau
klar...@ifwe.co <mailto:klar...@ifwe.co>
--
You received this message because you are subscribed to the Google Groups "sqlalchemy" group. To unsubscribe from this group and stop receiving emails from it, send an email to sqlalchemy+unsubscr...@googlegroups.com <mailto:sqlalchemy+unsubscr...@googlegroups.com>. To post to this group, send email to sqlalchemy@googlegroups.com <mailto:sqlalchemy@googlegroups.com>.
Visit this group at http://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.

--
You received this message because you are subscribed to the Google Groups 
"sqlalchemy" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to sqlalchemy+unsubscr...@googlegroups.com.
To post to this group, send email to sqlalchemy@googlegroups.com.
Visit this group at http://groups.google.com/group/sqlalchemy.
For more options, visit https://groups.google.com/d/optout.

Reply via email to