amoghrajesh commented on code in PR #68213:
URL: https://github.com/apache/airflow/pull/68213#discussion_r3372955181


##########
task-sdk/src/airflow/sdk/bases/resumablejobmixin.py:
##########
@@ -101,29 +107,56 @@ def execute_resumable(self, context: Context) -> Any:
         Closing this window would require atomic "submit + persist", which is 
not possible across
         an external system boundary.
         """
-        task_store = context.get("task_store")
+        operator_tag = {"operator": type(self).__name__}
+
+        with tracer.start_as_current_span("resumable_job.resume_decision") as 
span:

Review Comment:
   I pulled up the raw data:
   
   ```
   {
       "data": [
           {
               "traceID": "51d62adaa8f8c0fed09bd7f29696c7bd",
               "spans": [
                   {
                       "traceID": "51d62adaa8f8c0fed09bd7f29696c7bd",
                       "spanID": "8d2b5e920557d213",
                       "operationName": "resumable_job.resume_decision",
                       "references": [
                           {
                               "refType": "CHILD_OF",
                               "traceID": "51d62adaa8f8c0fed09bd7f29696c7bd",
                               "spanID": "3147d2475a70939e"
                           }
                       ],
                       "startTime": 1780918052877673,
                       "duration": 13454,
                       "tags": [
                           {
                               "key": "otel.library.name",
                               "type": "string",
                               "value": "airflow.sdk.bases.resumablejobmixin"
                           },
                           {
                               "key": "operator",
                               "type": "string",
                               "value": "MockBatchOperator"
                           },
                           {
                               "key": "resumable.external_id_key",
                               "type": "string",
                               "value": "job_id"
                           },
                           {
                               "key": "resumable.decision",
                               "type": "string",
                               "value": "fresh_submit"
                           },
                           {
                               "key": "span.kind",
                               "type": "string",
                               "value": "internal"
                           },
                           {
                               "key": "internal.span.format",
                               "type": "string",
                               "value": "otlp"
                           }
                       ],
                       "logs": [],
                       "processID": "p1",
                       "warnings": null
                   },
                   {
                       "traceID": "51d62adaa8f8c0fed09bd7f29696c7bd",
                       "spanID": "4a0dc3bd6c801a36",
                       "operationName": "resumable_job.resume_decision",
                       "references": [
                           {
                               "refType": "CHILD_OF",
                               "traceID": "51d62adaa8f8c0fed09bd7f29696c7bd",
                               "spanID": "510623f51c85b4a1"
                           }
                       ],
                       "startTime": 1780918072213237,
                       "duration": 61069106,
                       "tags": [
                           {
                               "key": "otel.library.name",
                               "type": "string",
                               "value": "airflow.sdk.bases.resumablejobmixin"
                           },
                           {
                               "key": "operator",
                               "type": "string",
                               "value": "MockBatchOperator"
                           },
                           {
                               "key": "resumable.external_id_key",
                               "type": "string",
                               "value": "job_id"
                           },
                           {
                               "key": "resumable.external_id",
                               "type": "string",
                               "value": "batch-868odtun"
                           },
                           {
                               "key": "resumable.prior_status",
                               "type": "string",
                               "value": "RUNNING"
                           },
                           {
                               "key": "resumable.decision",
                               "type": "string",
                               "value": "reconnect"
                           },
                           {
                               "key": "span.kind",
                               "type": "string",
                               "value": "internal"
                           },
                           {
                               "key": "internal.span.format",
                               "type": "string",
                               "value": "otlp"
                           }
                       ],
                       "logs": [],
                       "processID": "p1",
                       "warnings": null
                   },
                   {
                       "traceID": "51d62adaa8f8c0fed09bd7f29696c7bd",
                       "spanID": "510623f51c85b4a1",
                       "operationName": "worker.run_batch_job",
                       "references": [
                           {
                               "refType": "CHILD_OF",
                               "traceID": "51d62adaa8f8c0fed09bd7f29696c7bd",
                               "spanID": "7d222698e66e4a01"
                           }
                       ],
                       "startTime": 1780918071782794,
                       "duration": 61546017,
                       "tags": [
                           {
                               "key": "otel.library.name",
                               "type": "string",
                               "value": "airflow.sdk.execution_time.task_runner"
                           },
                           {
                               "key": "airflow.dag_id",
                               "type": "string",
                               "value": "example_resumable_job"
                           },
                           {
                               "key": "airflow.task_id",
                               "type": "string",
                               "value": "run_batch_job"
                           },
                           {
                               "key": "airflow.dag_run.run_id",
                               "type": "string",
                               "value": 
"manual__2026-06-08T11:27:28.145361+00:00"
                           },
                           {
                               "key": "airflow.task_instance.try_number",
                               "type": "int64",
                               "value": 2
                           },
                           {
                               "key": "airflow.task_instance.map_index",
                               "type": "int64",
                               "value": -1
                           },
                           {
                               "key": "span.kind",
                               "type": "string",
                               "value": "internal"
                           },
                           {
                               "key": "internal.span.format",
                               "type": "string",
                               "value": "otlp"
                           }
                       ],
                       "logs": [],
                       "processID": "p1",
                       "warnings": null
                   },
                   {
                       "traceID": "51d62adaa8f8c0fed09bd7f29696c7bd",
                       "spanID": "7d222698e66e4a01",
                       "operationName": "task_run.run_batch_job",
                       "references": [
                           {
                               "refType": "CHILD_OF",
                               "traceID": "51d62adaa8f8c0fed09bd7f29696c7bd",
                               "spanID": "18e7cf0be40a97f8"
                           }
                       ],
                       "startTime": 1780918063591456,
                       "duration": 69727984,
                       "tags": [
                           {
                               "key": "otel.library.name",
                               "type": "string",
                               "value": 
"airflow.api_fastapi.execution_api.routes.task_instances"
                           },
                           {
                               "key": "airflow.dag_id",
                               "type": "string",
                               "value": "example_resumable_job"
                           },
                           {
                               "key": "airflow.task_id",
                               "type": "string",
                               "value": "run_batch_job"
                           },
                           {
                               "key": "airflow.dag_run.run_id",
                               "type": "string",
                               "value": 
"manual__2026-06-08T11:27:28.145361+00:00"
                           },
                           {
                               "key": "airflow.task_instance.try_number",
                               "type": "int64",
                               "value": 2
                           },
                           {
                               "key": "airflow.task_instance.map_index",
                               "type": "int64",
                               "value": -1
                           },
                           {
                               "key": "airflow.task_instance.state",
                               "type": "string",
                               "value": "success"
                           },
                           {
                               "key": "airflow.task_instance.id",
                               "type": "string",
                               "value": "019ea6fc-fde1-723b-a4da-c1691de5aff5"
                           },
                           {
                               "key": "span.kind",
                               "type": "string",
                               "value": "internal"
                           },
                           {
                               "key": "otel.status_code",
                               "type": "string",
                               "value": "OK"
                           },
                           {
                               "key": "internal.span.format",
                               "type": "string",
                               "value": "otlp"
                           }
                       ],
                       "logs": [],
                       "processID": "p1",
                       "warnings": null
                   },
                   {
                       "traceID": "51d62adaa8f8c0fed09bd7f29696c7bd",
                       "spanID": "18e7cf0be40a97f8",
                       "operationName": "dag_run.example_resumable_job",
                       "references": [],
                       "startTime": 1780918048165949,
                       "duration": 85982946,
                       "tags": [
                           {
                               "key": "otel.library.name",
                               "type": "string",
                               "value": "airflow.models.dagrun"
                           },
                           {
                               "key": "airflow.dag_id",
                               "type": "string",
                               "value": "example_resumable_job"
                           },
                           {
                               "key": "airflow.dag_run.run_id",
                               "type": "string",
                               "value": 
"manual__2026-06-08T11:27:28.145361+00:00"
                           },
                           {
                               "key": "airflow.dag_run.start_date",
                               "type": "string",
                               "value": "2026-06-08 11:27:28.319135+00:00"
                           },
                           {
                               "key": "airflow.dag_run.end_date",
                               "type": "string",
                               "value": "2026-06-08 11:28:54.145508+00:00"
                           },
                           {
                               "key": "airflow.dag_run.queued_at",
                               "type": "string",
                               "value": "2026-06-08 11:27:28.165950+00:00"
                           },
                           {
                               "key": "airflow.dag_run.created_at",
                               "type": "string",
                               "value": "2026-06-08 11:27:28.173554+00:00"
                           },
                           {
                               "key": "airflow.dag_run.logical_date",
                               "type": "string",
                               "value": "2026-06-08 11:27:27+00:00"
                           },
                           {
                               "key": "span.kind",
                               "type": "string",
                               "value": "internal"
                           },
                           {
                               "key": "otel.status_code",
                               "type": "string",
                               "value": "OK"
                           },
                           {
                               "key": "internal.span.format",
                               "type": "string",
                               "value": "otlp"
                           }
                       ],
                       "logs": [],
                       "processID": "p1",
                       "warnings": null
                   }
               ],
               "processes": {
                   "p1": {
                       "serviceName": "test",
                       "tags": [
                           {
                               "key": "telemetry.sdk.language",
                               "type": "string",
                               "value": "python"
                           },
                           {
                               "key": "telemetry.sdk.name",
                               "type": "string",
                               "value": "opentelemetry"
                           },
                           {
                               "key": "telemetry.sdk.version",
                               "type": "string",
                               "value": "1.41.1"
                           }
                       ]
                   }
               },
               "warnings": null
           }
       ],
       "total": 0,
       "limit": 0,
       "offset": 0,
       "errors": null
   }
   ```
   
   From this I can tell that this ins't under dagrun but a floating span. The 
floating span is expected and it's a direct consequence of the crash scenario 
`ResumableJobMixin` is designed for. 
   
   Try 1's worker process died before it could flush its buffered spans to the 
collector, so the parent `worker.run_batch_job` span from that try was never 
exported. The child (`resumable_job.resume_decision`) made it out because it 
completed before the crash, but its parent reference now points to a span that 
doesn't exist in jaeger.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to