Hi, Ayush!

Just use kwargs['dag_run'].conf['message'], replacing 'message' with your own key such as 'input_path'.
See https://www.astronomer.io/guides/trigger-dag-operator/
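
For illustration, here is a minimal sketch of a callable that reads the trigger payload from dag_run.conf. As far as I know, kwargs['conf'] in the task context is Airflow's own configuration object rather than the payload, which would explain the errors. The SimpleNamespace stand-in and the example paths are assumptions of mine so that the sketch runs without Airflow installed; they are not part of Airflow or of your DAG.

```python
import logging
from types import SimpleNamespace

logging.basicConfig(level=logging.INFO)


def run_this_func(**kwargs):
    """Callable for a PythonOperator; reads the payload sent when the run was triggered."""
    dag_run = kwargs.get("dag_run")
    # dag_run.conf holds the trigger payload; it may be None when the run
    # was started without one, so fall back to an empty dict.
    run_conf = (dag_run.conf if dag_run is not None else None) or {}

    input_path = run_conf.get("input_path")
    output_path = run_conf.get("output_path")
    logging.info("input_path=%s output_path=%s", input_path, output_path)
    return input_path, output_path


# Stand-in for the DagRun object Airflow would pass in the task context
# (hypothetical; used here only so the sketch runs without Airflow installed).
fake_dag_run = SimpleNamespace(conf={"input_path": "/data/in", "output_path": "/data/out"})
result = run_this_func(dag_run=fake_dag_run)
```

In the DAG itself you would keep python_callable=run_this_func and provide_context=True on the PythonOperator exactly as you have them now; only the body of the callable changes.
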
On Mon, Jul 6, 2020 at 21:41, Ayush Goel <[email protected]> wrote:

> Hi,
>
> Thanks Dileep for clarifying my question. That is the exact thing I
> wanted to ask :)
>
> Thanks and Regards,
> Ayush Goel
>
> On Tue, Jul 7, 2020 at 12:08 AM Dileep Kancharla <
> [email protected]> wrote:
>
>> Hi,
>> Apologies if I am wrong!
>>
>> I think the question was how to pass the input arguments through rest api
>> call, and how to access those variables inside the dag.
>>
>> Thanks
>> Dileep
>>
>> On Mon, Jul 6, 2020 at 11:51 PM Li, Richard <[email protected]> wrote:
>>
>>> Define this way in “val” as a variable reference and pass value from
>>> command line, for example,
>>>
>>> bash_command= “/path/scriptname” + keyname,
>>>
>>> Richard Li
>>> Big Data Engineer @ Product/Service Innovation Development
>>> 222 W. Merchandise Mart Plaza | Suite 850 | Chicago, Illinois 60654
>>> 6308632669
>>>
>>> *From: *Ayush Goel <[email protected]>
>>> *Date: *Monday, July 6, 2020 at 12:59 PM
>>> *To: *Richard Li <[email protected]>
>>> *Cc: *"[email protected]" <[email protected]>
>>> *Subject: *[External] Re: How to access external arguments when passing
>>> them using REST API in airflow?
>>>
>>> Hi Richard,
>>>
>>> The arguments which we are passing are dynamic and the user does not
>>> have access to UI, so he cannot change the variable values each time.
>>>
>>> We want to give the rest api interface to the user through which he can
>>> trigger the dag by himself while passing some user specific arguments.
>>>
>>> Thanks and Regards,
>>> Ayush Goel
>>>
>>> On Mon, Jul 6, 2020 at 11:14 PM Li, Richard <[email protected]> wrote:
>>>
>>> From “admin” menu, choose “variable”, input your “key” and “val” values.
>>> In python scripts add
>>>
>>> keyname = Variable.get("key"), use keyname in your dags
>>>
>>> Richard Li
>>>
>>> *From: *Ayush Goel <[email protected]>
>>> *Reply-To: *"[email protected]" <[email protected]>
>>> *Date: *Monday, July 6, 2020 at 12:36 PM
>>> *To: *"[email protected]" <[email protected]>
>>> *Subject: *[External] How to access external arguments when passing
>>> them using REST API in airflow?
>>>
>>> Hi,
>>>
>>> I tried triggering an airflow dag using the curl:
>>> http://localhost:8080/api/experimental/dags/api_test/dag_runs
>>> -H 'Cache-Control: no-cache'
>>> -H 'Content-Type: application/json'
>>> -d '{"conf":"{"input_path":"value", "output_path":"value" }"}'
>>>
>>> I am able to trigger the workflow successfully but now I want to access
>>> this input_path and output_path inside my PythonOperator code in the
>>> airflow dag.
>>>
>>> Can someone help in this regard?
>>>
>>> My Dag Code:
>>>
>>> from datetime import timedelta
>>> import logging
>>>
>>> # The DAG object; we'll need this to instantiate a DAG
>>> from airflow import DAG
>>> # Operators; we need this to operate!
>>> from airflow.operators.python_operator import PythonOperator
>>> from airflow.utils.dates import days_ago
>>>
>>> # These args will get passed on to each operator
>>> # You can override them on a per-task basis during operator initialization
>>> default_args = {
>>>     'owner': 'someone',
>>>     'depends_on_past': False,
>>>     'start_date': days_ago(2),
>>>     'email': ['[email protected]'],
>>>     'email_on_failure': False,
>>>     'email_on_retry': False,
>>>     'retries': 1,
>>>     'retry_delay': timedelta(minutes=2)
>>> }
>>>
>>> dag = DAG(
>>>     'api_test',
>>>     default_args=default_args,
>>>     description='testing rest api calls',
>>>     schedule_interval=None,
>>> )
>>>
>>> def run_this_func(**kwargs):
>>>     logging.info("Trying to print the logs")
>>>     logging.info(kwargs['conf'])
>>>     for key,value in kwargs['conf'].items():
>>>         logging.info(key)
>>>         logging.info(value)
>>>     #logging.info(kwargs['dag_run'].conf['input_path'])
>>>     #logging.info(kwargs['conf']['output_path'])
>>>
>>> run_this = PythonOperator(
>>>     task_id='run_this',
>>>     python_callable=run_this_func,
>>>     dag=dag,
>>>     provide_context=True,
>>> )
>>>
>>> run_this
>>>
>>> I tried printing the arguments by
>>> logging.info(kwargs['dag_run'].conf['input_path'])
>>> logging.info(kwargs['conf']['output_path'])
>>>
>>> But both of these lines are giving me errors. Is there any way of
>>> accessing the input_path and output_path?
>>>
>>> Thanks and Regards,
>>> Ayush Goel

--
Best regards,
Николаев Дмитрий
