amoghrajesh commented on code in PR #68213: URL: https://github.com/apache/airflow/pull/68213#discussion_r3372955181
##########
task-sdk/src/airflow/sdk/bases/resumablejobmixin.py:
##########
@@ -101,29 +107,56 @@ def execute_resumable(self, context: Context) -> Any:
Closing this window would require atomic "submit + persist", which is
not possible across
an external system boundary.
"""
- task_store = context.get("task_store")
+ operator_tag = {"operator": type(self).__name__}
+
+ with tracer.start_as_current_span("resumable_job.resume_decision") as
span:
Review Comment:
I pulled up the raw data:
```
{
"data": [
{
"traceID": "51d62adaa8f8c0fed09bd7f29696c7bd",
"spans": [
{
"traceID": "51d62adaa8f8c0fed09bd7f29696c7bd",
"spanID": "8d2b5e920557d213",
"operationName": "resumable_job.resume_decision",
"references": [
{
"refType": "CHILD_OF",
"traceID": "51d62adaa8f8c0fed09bd7f29696c7bd",
"spanID": "3147d2475a70939e"
}
],
"startTime": 1780918052877673,
"duration": 13454,
"tags": [
{
"key": "otel.library.name",
"type": "string",
"value": "airflow.sdk.bases.resumablejobmixin"
},
{
"key": "operator",
"type": "string",
"value": "MockBatchOperator"
},
{
"key": "resumable.external_id_key",
"type": "string",
"value": "job_id"
},
{
"key": "resumable.decision",
"type": "string",
"value": "fresh_submit"
},
{
"key": "span.kind",
"type": "string",
"value": "internal"
},
{
"key": "internal.span.format",
"type": "string",
"value": "otlp"
}
],
"logs": [],
"processID": "p1",
"warnings": null
},
{
"traceID": "51d62adaa8f8c0fed09bd7f29696c7bd",
"spanID": "4a0dc3bd6c801a36",
"operationName": "resumable_job.resume_decision",
"references": [
{
"refType": "CHILD_OF",
"traceID": "51d62adaa8f8c0fed09bd7f29696c7bd",
"spanID": "510623f51c85b4a1"
}
],
"startTime": 1780918072213237,
"duration": 61069106,
"tags": [
{
"key": "otel.library.name",
"type": "string",
"value": "airflow.sdk.bases.resumablejobmixin"
},
{
"key": "operator",
"type": "string",
"value": "MockBatchOperator"
},
{
"key": "resumable.external_id_key",
"type": "string",
"value": "job_id"
},
{
"key": "resumable.external_id",
"type": "string",
"value": "batch-868odtun"
},
{
"key": "resumable.prior_status",
"type": "string",
"value": "RUNNING"
},
{
"key": "resumable.decision",
"type": "string",
"value": "reconnect"
},
{
"key": "span.kind",
"type": "string",
"value": "internal"
},
{
"key": "internal.span.format",
"type": "string",
"value": "otlp"
}
],
"logs": [],
"processID": "p1",
"warnings": null
},
{
"traceID": "51d62adaa8f8c0fed09bd7f29696c7bd",
"spanID": "510623f51c85b4a1",
"operationName": "worker.run_batch_job",
"references": [
{
"refType": "CHILD_OF",
"traceID": "51d62adaa8f8c0fed09bd7f29696c7bd",
"spanID": "7d222698e66e4a01"
}
],
"startTime": 1780918071782794,
"duration": 61546017,
"tags": [
{
"key": "otel.library.name",
"type": "string",
"value": "airflow.sdk.execution_time.task_runner"
},
{
"key": "airflow.dag_id",
"type": "string",
"value": "example_resumable_job"
},
{
"key": "airflow.task_id",
"type": "string",
"value": "run_batch_job"
},
{
"key": "airflow.dag_run.run_id",
"type": "string",
"value":
"manual__2026-06-08T11:27:28.145361+00:00"
},
{
"key": "airflow.task_instance.try_number",
"type": "int64",
"value": 2
},
{
"key": "airflow.task_instance.map_index",
"type": "int64",
"value": -1
},
{
"key": "span.kind",
"type": "string",
"value": "internal"
},
{
"key": "internal.span.format",
"type": "string",
"value": "otlp"
}
],
"logs": [],
"processID": "p1",
"warnings": null
},
{
"traceID": "51d62adaa8f8c0fed09bd7f29696c7bd",
"spanID": "7d222698e66e4a01",
"operationName": "task_run.run_batch_job",
"references": [
{
"refType": "CHILD_OF",
"traceID": "51d62adaa8f8c0fed09bd7f29696c7bd",
"spanID": "18e7cf0be40a97f8"
}
],
"startTime": 1780918063591456,
"duration": 69727984,
"tags": [
{
"key": "otel.library.name",
"type": "string",
"value":
"airflow.api_fastapi.execution_api.routes.task_instances"
},
{
"key": "airflow.dag_id",
"type": "string",
"value": "example_resumable_job"
},
{
"key": "airflow.task_id",
"type": "string",
"value": "run_batch_job"
},
{
"key": "airflow.dag_run.run_id",
"type": "string",
"value":
"manual__2026-06-08T11:27:28.145361+00:00"
},
{
"key": "airflow.task_instance.try_number",
"type": "int64",
"value": 2
},
{
"key": "airflow.task_instance.map_index",
"type": "int64",
"value": -1
},
{
"key": "airflow.task_instance.state",
"type": "string",
"value": "success"
},
{
"key": "airflow.task_instance.id",
"type": "string",
"value": "019ea6fc-fde1-723b-a4da-c1691de5aff5"
},
{
"key": "span.kind",
"type": "string",
"value": "internal"
},
{
"key": "otel.status_code",
"type": "string",
"value": "OK"
},
{
"key": "internal.span.format",
"type": "string",
"value": "otlp"
}
],
"logs": [],
"processID": "p1",
"warnings": null
},
{
"traceID": "51d62adaa8f8c0fed09bd7f29696c7bd",
"spanID": "18e7cf0be40a97f8",
"operationName": "dag_run.example_resumable_job",
"references": [],
"startTime": 1780918048165949,
"duration": 85982946,
"tags": [
{
"key": "otel.library.name",
"type": "string",
"value": "airflow.models.dagrun"
},
{
"key": "airflow.dag_id",
"type": "string",
"value": "example_resumable_job"
},
{
"key": "airflow.dag_run.run_id",
"type": "string",
"value":
"manual__2026-06-08T11:27:28.145361+00:00"
},
{
"key": "airflow.dag_run.start_date",
"type": "string",
"value": "2026-06-08 11:27:28.319135+00:00"
},
{
"key": "airflow.dag_run.end_date",
"type": "string",
"value": "2026-06-08 11:28:54.145508+00:00"
},
{
"key": "airflow.dag_run.queued_at",
"type": "string",
"value": "2026-06-08 11:27:28.165950+00:00"
},
{
"key": "airflow.dag_run.created_at",
"type": "string",
"value": "2026-06-08 11:27:28.173554+00:00"
},
{
"key": "airflow.dag_run.logical_date",
"type": "string",
"value": "2026-06-08 11:27:27+00:00"
},
{
"key": "span.kind",
"type": "string",
"value": "internal"
},
{
"key": "otel.status_code",
"type": "string",
"value": "OK"
},
{
"key": "internal.span.format",
"type": "string",
"value": "otlp"
}
],
"logs": [],
"processID": "p1",
"warnings": null
}
],
"processes": {
"p1": {
"serviceName": "test",
"tags": [
{
"key": "telemetry.sdk.language",
"type": "string",
"value": "python"
},
{
"key": "telemetry.sdk.name",
"type": "string",
"value": "opentelemetry"
},
{
"key": "telemetry.sdk.version",
"type": "string",
"value": "1.41.1"
}
]
}
},
"warnings": null
}
],
"total": 0,
"limit": 0,
"offset": 0,
"errors": null
}
```
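From this I can tell that this isn't under the dag run but is a floating span: the first
`resumable_job.resume_decision` span (spanID `8d2b5e920557d213`, decision `fresh_submit`)
references parent `3147d2475a70939e`, which is not present anywhere in the trace. The
floating span is expected; it's a direct consequence of the crash scenario
`ResumableJobMixin` is designed for.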
Try 1's worker process died while its `worker.run_batch_job` span was still open, so
that parent span was never ended, flushed, or exported to the collector. The child
(`resumable_job.resume_decision`) did make it out because it completed before the
crash, but its parent reference now points to a span that doesn't exist in Jaeger.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
