nclaeys opened a new issue, #29974:
URL: https://github.com/apache/airflow/issues/29974

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   We are using Airflow 2.4.3.
   
   When looking at the documentation for the EmptyOperator, it says explicitly 
that it is never processed by the executor. 
   However, what I notice is that in our case the behavior differs between the start and end 
EmptyOperators: the start task is not processed by the executor, but the end task is.
   
   This results in unexpected behavior and is inefficient, since in our case it creates a 
pod on Kubernetes for no reason. Additionally, it causes some odd 
behavior in our lineage graphs.
   
   For the start task we see no logs:
   ```
   *** Log file does not exist: 
/opt/airflow/logs/dag_id=dbt-datahub/run_id=scheduled__2023-03-07T00:00:00+00:00/task_id=initial_task_start/attempt=1.log
   *** Fetching from: 
http://:8793/log/dag_id=dbt-datahub/run_id=scheduled__2023-03-07T00:00:00+00:00/task_id=initial_task_start/attempt=1.log
   *** Failed to fetch log file from worker. Request URL is missing an 
'http://' or 'https://' protocol.
   ```
   
   For the end task, however, we do see execution logs:
   ```
   dbtdatahubend-dc6d51700abc41e0974b46caafd857ac
   *** Reading local file: 
/opt/airflow/logs/dag_id=dbt-datahub/run_id=manual__2023-03-07T16:56:07.937548+00:00/task_id=end/attempt=1.log
   [2023-03-07, 16:56:31 UTC] {taskinstance.py:1165} INFO - Dependencies all 
met for <TaskInstance: dbt-datahub.end manual__2023-03-07T16:56:07.937548+00:00 
[queued]>
   [2023-03-07, 16:56:31 UTC] {taskinstance.py:1165} INFO - Dependencies all 
met for <TaskInstance: dbt-datahub.end manual__2023-03-07T16:56:07.937548+00:00 
[queued]>
   [2023-03-07, 16:56:31 UTC] {taskinstance.py:1362} INFO - 
   
--------------------------------------------------------------------------------
   [2023-03-07, 16:56:31 UTC] {taskinstance.py:1363} INFO - Starting attempt 1 
of 1
   [2023-03-07, 16:56:31 UTC] {taskinstance.py:1364} INFO - 
   
--------------------------------------------------------------------------------
   [2023-03-07, 16:56:31 UTC] {taskinstance.py:1383} INFO - Executing 
<Task(EmptyOperator): end> on 2023-03-07 16:56:07.937548+00:00
   [2023-03-07, 16:56:31 UTC] {standard_task_runner.py:55} INFO - Started 
process 19 to run task
   [2023-03-07, 16:56:31 UTC] {standard_task_runner.py:82} INFO - Running: 
['airflow', 'tasks', 'run', 'dbt-datahub', 'end', 
'manual__2023-03-07T16:56:07.937548+00:00', '--job-id', '24', '--raw', 
'--subdir', 'DAGS_FOLDER/dbt-datahub/dbt-datahub.py', '--cfg-path', 
'/tmp/tmpdr42kl3k']
   [2023-03-07, 16:56:31 UTC] {standard_task_runner.py:83} INFO - Job 24: 
Subtask end
   [2023-03-07, 16:56:31 UTC] {task_command.py:376} INFO - Running 
<TaskInstance: dbt-datahub.end manual__2023-03-07T16:56:07.937548+00:00 
[running]> on host dbtdatahubend-dc6d51700abc41e0974b46caafd857ac
   [2023-03-07, 16:56:31 UTC] {taskinstance.py:1590} INFO - Exporting the 
following env vars:
   AIRFLOW_CTX_DAG_OWNER=Conveyor
   AIRFLOW_CTX_DAG_ID=dbt-datahub
   AIRFLOW_CTX_TASK_ID=end
   AIRFLOW_CTX_EXECUTION_DATE=2023-03-07T16:56:07.937548+00:00
   AIRFLOW_CTX_TRY_NUMBER=1
   AIRFLOW_CTX_DAG_RUN_ID=manual__2023-03-07T16:56:07.937548+00:00
   [2023-03-07, 16:56:31 UTC] {taskinstance.py:1401} INFO - Marking task as 
SUCCESS. dag_id=dbt-datahub, task_id=end, execution_date=20230307T165607, 
start_date=20230307T165631, end_date=20230307T165631
   [2023-03-07, 16:56:31 UTC] {base.py:71} INFO - Using connection ID 
'datahub_rest_default' for task execution.
   [2023-03-07, 16:56:31 UTC] {base.py:71} INFO - Using connection ID 
'datahub_rest_default' for task execution.
   [2023-03-07, 16:56:31 UTC] {_plugin.py:147} INFO - Emitting Datahub 
Dataflow: DataFlow(urn=<datahub.utilities.urns.data_flow_urn.DataFlowUrn object 
at 0x7fb9ced397c0>, id='dbt-datahub', orchestrator='airflow', cluster='prod', 
name=None, description='None\n\n', properties={'_access_control': 'None', 
'_default_view': "'grid'", 'catchup': 'True', 'fileloc': 
"'/opt/airflow/dags/dbt-datahub/dbt-datahub.py'", 'is_paused_upon_creation': 
'None', 'start_date': 'None', 'tags': '[]', 'timezone': "Timezone('UTC')"}, 
url='https://app.dev.datafy.cloud/environments/datahubtest/airflow/tree?dag_id=dbt-datahub',
 tags=set(), owners={'Conveyor'})
   [2023-03-07, 16:56:31 UTC] {_plugin.py:165} INFO - Emitting Datahub Datajob: 
DataJob(id='end', urn=<datahub.utilities.urns.data_job_urn.DataJobUrn object at 
0x7fb9cecbbfa0>, flow_urn=<datahub.utilities.urns.data_flow_urn.DataFlowUrn 
object at 0x7fb9cecbf910>, name=None, description=None, 
properties={'depends_on_past': 'False', 'email': '[]', 'label': "'end'", 
'execution_timeout': 'None', 'sla': 'None', 'task_id': "'end'", 'trigger_rule': 
"<TriggerRule.ALL_SUCCESS: 'all_success'>", 'wait_for_downstream': 'False', 
'downstream_task_ids': 'set()', 'inlets': '[]', 'outlets': '[]'}, 
url='https://app.dev.datafy.cloud/environments/datahubtest/airflow/taskinstance/list/?flt1_dag_id_equals=dbt-datahub&_flt_3_task_id=end',
 tags=set(), owners={'Conveyor'}, group_owners=set(), inlets=[], outlets=[], 
upstream_urns=[<datahub.utilities.urns.data_job_urn.DataJobUrn object at 
0x7fb9cecbbc10>])
   [2023-03-07, 16:56:31 UTC] {_plugin.py:179} INFO - Emitted Start Datahub 
Dataprocess Instance: 
DataProcessInstance(id='dbt-datahub_end_manual__2023-03-07T16:56:07.937548+00:00',
 urn=<datahub.utilities.urns.data_process_instance_urn.DataProcessInstanceUrn 
object at 0x7fb9cecbb040>, orchestrator='airflow', cluster='prod', 
type='BATCH_AD_HOC', 
template_urn=<datahub.utilities.urns.data_job_urn.DataJobUrn object at 
0x7fb9cecbbfa0>, parent_instance=None, properties={'run_id': 
'manual__2023-03-07T16:56:07.937548+00:00', 'duration': '0.163779', 
'start_date': '2023-03-07 16:56:31.157871+00:00', 'end_date': '2023-03-07 
16:56:31.321650+00:00', 'execution_date': '2023-03-07 16:56:07.937548+00:00', 
'try_number': '1', 'hostname': 
'dbtdatahubend-dc6d51700abc41e0974b46caafd857ac', 'max_tries': '0', 
'external_executor_id': 'None', 'pid': '19', 'state': 'success', 'operator': 
'EmptyOperator', 'priority_weight': '1', 'unixname': 'airflow', 'log_url': 
'https://app.dev.datafy.cloud/environments/datahu
 
btest/airflow/log?execution_date=2023-03-07T16%3A56%3A07.937548%2B00%3A00&task_id=end&dag_id=dbt-datahub&map_index=-1'},
 
url='https://app.dev.datafy.cloud/environments/datahubtest/airflow/log?execution_date=2023-03-07T16%3A56%3A07.937548%2B00%3A00&task_id=end&dag_id=dbt-datahub&map_index=-1',
 inlets=[], outlets=[], upstream_urns=[])
   [2023-03-07, 16:56:31 UTC] {_plugin.py:191} INFO - Emitted Completed Data 
Process Instance: 
DataProcessInstance(id='dbt-datahub_end_manual__2023-03-07T16:56:07.937548+00:00',
 urn=<datahub.utilities.urns.data_process_instance_urn.DataProcessInstanceUrn 
object at 0x7fb9ced39700>, orchestrator='airflow', cluster='prod', 
type='BATCH_SCHEDULED', 
template_urn=<datahub.utilities.urns.data_job_urn.DataJobUrn object at 
0x7fb9cecbbfa0>, parent_instance=None, properties={}, url=None, inlets=[], 
outlets=[], upstream_urns=[])
   [2023-03-07, 16:56:31 UTC] {local_task_job.py:159} INFO - Task exited with 
return code 0
   [2023-03-07, 16:56:31 UTC] {taskinstance.py:2623} INFO - 0 downstream tasks 
scheduled from follow-on schedule check
   ```
   
   
   ### What you think should happen instead
   
   I expect the behavior to be consistent: no matter where an EmptyOperator appears 
in your DAG, the same behavior should be observed (it is never processed by the 
executor).
   
   ### How to reproduce
   
   Create a DAG containing (see the sketch below): 
   - a start EmptyOperator task
   - a regular task (in our case a simple containerTask)
   - an end EmptyOperator task
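   
   A minimal sketch of such a DAG is below. The dag_id, schedule, and start_date are 
placeholders, and the BashOperator only stands in for our containerTask:
   
   ```python
   # Minimal reproduction sketch (placeholder names, not our exact DAG).
   from datetime import datetime

   from airflow import DAG
   from airflow.operators.bash import BashOperator
   from airflow.operators.empty import EmptyOperator

   with DAG(
       dag_id="empty-operator-repro",
       start_date=datetime(2023, 3, 1),
       schedule="@daily",
       catchup=False,
   ) as dag:
       start = EmptyOperator(task_id="start")
       # Stand-in for the containerTask; any real task between the two
       # EmptyOperators should do.
       work = BashOperator(task_id="work", bash_command="echo 'doing some work'")
       end = EmptyOperator(task_id="end")

       start >> work >> end
   ```
   
   With this layout, the start task is skipped by the executor as documented, while in 
our environment the end task still gets executed in its own pod.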
   
   ### Operating System
   
   kubernetes
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-amazon==6.0.0
   apache-airflow-providers-celery==3.0.0
   apache-airflow-providers-cncf-kubernetes==4.0.2
   apache-airflow-providers-common-sql==1.3.3
   apache-airflow-providers-docker==3.2.0
   apache-airflow-providers-elasticsearch==4.2.1
   apache-airflow-providers-ftp==3.3.0
   apache-airflow-providers-google==8.4.0
   apache-airflow-providers-grpc==3.0.0
   apache-airflow-providers-hashicorp==3.1.0
   apache-airflow-providers-http==4.1.1
   apache-airflow-providers-imap==3.1.1
   apache-airflow-providers-microsoft-azure==4.3.0
   apache-airflow-providers-mysql==3.2.1
   apache-airflow-providers-odbc==3.1.2
   apache-airflow-providers-opsgenie==3.1.0
   apache-airflow-providers-postgres==5.2.2
   apache-airflow-providers-redis==3.0.0
   apache-airflow-providers-sendgrid==3.0.0
   apache-airflow-providers-sftp==4.1.0
   apache-airflow-providers-slack==4.2.3
   apache-airflow-providers-sqlite==3.3.1
   apache-airflow-providers-ssh==3.2.0
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   /
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

