[ https://issues.apache.org/jira/browse/COMMONSSITE-163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17605957#comment-17605957 ]
Raj Prakash Kante commented on COMMONSSITE-163:
-----------------------------------------------

Actually, my software is not listed in the projects list.

On Sat, Sep 17, 2022, 12:02 AM Gary D. Gregory (Jira) <j...@apache.org> wrote:

> Xcom not returned while using 'BeamRunJavaPipelineOperator'.
> ------------------------------------------------------------
>
>                 Key: COMMONSSITE-163
>                 URL: https://issues.apache.org/jira/browse/COMMONSSITE-163
>             Project: Apache Commons All
>          Issue Type: Bug
>            Reporter: Raj Prakash Kante
>            Priority: Major
>
> I was using 'BeamRunJavaPipelineOperator' in an Airflow DAG to run a Java JAR
> that ingests data from Google Cloud Storage into BigQuery with Dataflow. The
> Dataflow job is submitted successfully, but I want to wait until the job has
> finished successfully in the background before moving on to the next task.
> I plan to handle this with 'DataflowJobStatusSensor', which checks the status
> of the job in the background. The sensor needs the ID of the job to check,
> which 'BeamRunJavaPipelineOperator' is supposed to return as an XCom, but the
> operator does not return the desired XCom.
>
> from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
> from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
> from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor
>
> # Submits the JAR to Dataflow; `dag` is the DAG object defined elsewhere.
> start_java_pipeline = BeamRunJavaPipelineOperator(
>     task_id="start_java_pipeline",
>     runner='dataflow',
>     jar="<path-to-java-jar>",
>     pipeline_options={'airflowBucket': '<bucket-path>',
>                       'jobName': '<job-name>',
>                       'inputfileBucket': '<input-file-path>',
>                       'maxNumWorkers': '10',
>                       'targetTableProject': '<Project-name>',
>                       'datasetName': '<dataset-name>',
>                       'serviceAccount': '<service-account>',
>                       'runConfig': '<path-config-files>',
>                       'project': '<project-name>',
>                       'workerMachineType': 'n1-standard-2',
>                       'region': '<region>',
>                       'subnetwork': "<subnetwork>",
>                       'usePublicIps': 'false',
>                       'stagingLocation': '<staging-location>',
>                       'tempLocation': '<temp-location>'
>                       },
>     job_class='<class-name-in-jar>',
>     do_xcom_push=True,
>     dag=dag)
>
> # Waits for the Dataflow job whose ID is pulled from XCom; the template
> # reads from a task named 'Get_job_id', which is not part of this snippet.
> wait_for_done = DataflowJobStatusSensor(
>     task_id="wait-for-java-dataflow",
>     job_id="{{task_instance.xcom_pull('Get_job_id')}}",
>     expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
>     project_id="xxx-xx-xxx",
>     gcp_conn_id='google_cloud_default',
>     location='us-central1',
> )
>
> start_java_pipeline >> wait_for_done
>
> With 'DataFlowJavaOperator', I am able to push the job ID to XCom and fetch it
> with 'DataflowJobStatusSensor' without any issues, but that operator is
> deprecated.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
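A minimal, dependency-free sketch of the hand-off the report relies on: one task submits the job and returns its ID as an XCom value, and a sensor-style loop reads that ID and polls the job's state until it reaches an expected state. None of this is Airflow's API. `XCOM`, `run_start_task` and `wait_for_job_state` are hypothetical stand-ins, and the value shape `{"dataflow_job_id": ...}` is an assumption modeled on the Airflow Beam provider's example DAGs. The state strings follow Dataflow's `JOB_STATE_*` names.

```python
# Hypothetical, dependency-free sketch of the XCom hand-off described above.
# XCOM, run_start_task and wait_for_job_state are illustrative stand-ins,
# NOT Airflow APIs.
import time
from typing import Callable, Dict, Iterable, Optional

# Stand-in for Airflow's XCom storage: task_id -> value returned by that task.
XCOM: Dict[str, dict] = {}

# Terminal failure states this sketch treats as errors (Dataflow JobState names).
FAILED_STATES = {"JOB_STATE_FAILED", "JOB_STATE_CANCELLED"}


def run_start_task(task_id: str, submit: Callable[[], Optional[str]]) -> dict:
    """Submit a job and record its ID as this task's XCom value.

    Assumes the value is a dict with a "dataflow_job_id" key.
    """
    value = {"dataflow_job_id": submit()}
    XCOM[task_id] = value
    return value


def wait_for_job_state(
    upstream_task_id: str,
    get_state: Callable[[str], str],
    expected: Iterable[str] = ("JOB_STATE_DONE",),
    poke_interval: float = 0.0,
    max_pokes: int = 10,
) -> str:
    """Poll the upstream job's state until it reaches one of `expected`."""
    job_id = XCOM.get(upstream_task_id, {}).get("dataflow_job_id")
    if not job_id:
        # The failure mode in the report: the upstream task pushed no job ID.
        raise LookupError(f"no dataflow_job_id in XCom of {upstream_task_id!r}")
    wanted = set(expected)
    for _ in range(max_pokes):
        state = get_state(job_id)
        if state in wanted:
            return state
        if state in FAILED_STATES:
            raise RuntimeError(f"job {job_id} ended in {state}")
        time.sleep(poke_interval)
    raise TimeoutError(f"job {job_id} did not reach {sorted(wanted)} in {max_pokes} pokes")


if __name__ == "__main__":
    # Fake job whose state advances on every poke.
    states = iter(["JOB_STATE_PENDING", "JOB_STATE_RUNNING", "JOB_STATE_DONE"])
    run_start_task("start_java_pipeline", submit=lambda: "example-job-id")
    print(wait_for_job_state("start_java_pipeline", get_state=lambda job_id: next(states)))
```

With a fake state source that reports JOB_STATE_PENDING, then JOB_STATE_RUNNING, then JOB_STATE_DONE, `wait_for_job_state("start_java_pipeline", ...)` returns "JOB_STATE_DONE". If the upstream task stored no ID, it raises `LookupError` before polling, which corresponds to the missing XCom described in the report. The sketch pulls from the submitting task's own ID ('start_java_pipeline'), whereas the sensor in the report pulls from 'Get_job_id', a task that does not appear in the snippet.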