Here:

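from contextlib import closing

from airflow.models.baseoperator import BaseOperator
from airflow.providers.odbc.hooks.odbc import OdbcHook


class SqlAgentOperator(BaseOperator):
    """Start a SQL Server Agent job by name."""

    def __init__(self, job_name: str, **kwargs):
        super().__init__(**kwargs)
        self.job_name = job_name

    def execute(self, context):
        # Pass the job name as a bound parameter rather than formatting it
        # into the SQL string, so quotes in the name cannot break the call.
        cmd = "EXEC msdb.dbo.sp_start_job @job_name = ?"
        # With no arguments, OdbcHook uses its default connection id.
        hook = OdbcHook()
        with closing(hook.get_conn()) as conn:
            cur = conn.cursor()
            cur.execute(cmd, self.job_name)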


There is a guide called Creating a Custom Operator
<https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html>
which explains how operators work.

Note that this does not wait for the SQL Agent job to complete:
sp_start_job returns as soon as the job has been asked to start.  To
block until the job finishes you'd have to add some polling logic.
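
The polling could look something like the sketch below.  This is
untested against a real server and rests on some assumptions: the
function name wait_for_agent_job and its parameters are made up for
illustration, and the status query (the latest sysjobactivity row for the
job, joined to sysjobhistory for run_status, where 1 means succeeded)
should be checked against your SQL Server version.  It also does not
guard against picking up a previous run if the new one has not been
registered yet.

```python
import time

# Assumed status query against the msdb job tables; verify before relying on it.
STATUS_SQL = """
SELECT TOP 1 ja.stop_execution_date, jh.run_status
FROM msdb.dbo.sysjobactivity AS ja
JOIN msdb.dbo.sysjobs AS j ON j.job_id = ja.job_id
LEFT JOIN msdb.dbo.sysjobhistory AS jh ON jh.instance_id = ja.job_history_id
WHERE j.name = ? AND ja.run_requested_date IS NOT NULL
ORDER BY ja.run_requested_date DESC
"""

SUCCEEDED = 1  # sysjobhistory.run_status value for a successful run


def wait_for_agent_job(conn, job_name, poll_seconds=10,
                       timeout_seconds=3600, sleep=time.sleep):
    """Poll until the latest run of job_name ends; raise on failure or timeout.

    conn is any DB-API connection that accepts '?' parameter markers
    (for example the one returned by hook.get_conn()).
    """
    waited = 0
    while True:
        cur = conn.cursor()
        cur.execute(STATUS_SQL, (job_name,))
        row = cur.fetchone()
        cur.close()

        # A non-null stop_execution_date means the run has ended.
        if row is not None and row[0] is not None:
            if row[1] == SUCCEEDED:
                return
            raise RuntimeError(
                f"SQL Agent job {job_name!r} ended with run_status={row[1]}"
            )

        if waited >= timeout_seconds:
            raise TimeoutError(
                f"SQL Agent job {job_name!r} still running after {waited}s"
            )
        sleep(poll_seconds)
        waited += poll_seconds
```

In the operator you would call wait_for_agent_job(conn, self.job_name)
inside the "with" block, right after starting the job.  Raising an
exception on failure makes the Airflow task fail too, which is usually
what you want.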
