Here is a minimal custom operator that starts the job:

    from contextlib import closing

    from airflow.models.baseoperator import BaseOperator
    from airflow.providers.odbc.hooks.odbc import OdbcHook


    class SqlAgentOperator(BaseOperator):
        def __init__(self, job_name: str, **kwargs):
            super().__init__(**kwargs)
            self.job_name = job_name

        def execute(self, context):
            # Ask SQL Server Agent to start the named job.
            cmd = f"execute msdb.dbo.sp_start_job '{self.job_name}'"
            hook = OdbcHook()  # no arguments, so the hook's default connection is used
            with closing(hook.get_conn()) as conn:
                cur = conn.cursor()
                cur.execute(cmd)

There is a guide called Creating a Custom Operator <https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html> which explains how operators work.

Note that `sp_start_job` only requests that the job start and then returns, so I don't think this operator waits for the SQL Agent job to complete. For that you'd have to add some polling logic.
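As a rough idea of what that polling could look like, here is a minimal sketch. The helper name `wait_for_job`, its parameters, and the status query are my own assumptions, not part of Airflow or the ODBC provider. The query reads `msdb.dbo.sysjobactivity` and `msdb.dbo.sysjobhistory`, treats a non-null `stop_execution_date` as "finished", and treats `run_status = 1` as success; check it against your SQL Server version and permissions before relying on it.

```python
import time

# Assumed status query: latest activity row for the job, joined to its history row.
JOB_STATUS_SQL = """
SELECT TOP 1 ja.stop_execution_date, jh.run_status
FROM msdb.dbo.sysjobactivity AS ja
JOIN msdb.dbo.sysjobs AS j ON j.job_id = ja.job_id
LEFT JOIN msdb.dbo.sysjobhistory AS jh ON jh.instance_id = ja.job_history_id
WHERE j.name = ?
ORDER BY ja.run_requested_date DESC
"""


def wait_for_job(cursor, job_name, poll_interval=10.0, timeout=3600.0,
                 sleep=time.sleep, clock=time.monotonic):
    """Poll until the job's latest run has stopped (hypothetical helper).

    Raises RuntimeError if the run did not succeed and TimeoutError if it
    has not stopped within `timeout` seconds.
    """
    deadline = clock() + timeout
    while True:
        cursor.execute(JOB_STATUS_SQL, (job_name,))
        row = cursor.fetchone()
        # row[0] is stop_execution_date; it stays NULL while the job is running.
        if row is not None and row[0] is not None:
            run_status = row[1]
            if run_status == 1:  # 1 is assumed to mean "succeeded"
                return
            raise RuntimeError(
                f"SQL Agent job {job_name!r} ended with run_status={run_status}"
            )
        if clock() >= deadline:
            raise TimeoutError(
                f"SQL Agent job {job_name!r} did not finish in {timeout} seconds"
            )
        sleep(poll_interval)
```

Inside `execute`, you would call `wait_for_job(cur, self.job_name)` right after `cur.execute(cmd)`, while the connection is still open. The `sleep` and `clock` parameters are only there so the loop can be exercised without a real database or real delays.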