abhishekmjain commented on code in PR #4167:
URL: https://github.com/apache/gobblin/pull/4167#discussion_r2910845202
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -56,36 +61,67 @@ public abstract class DagProc<T> {
protected final DagTask dagTask;
@Getter protected final Dag.DagId dagId;
@Getter protected final DagNodeId dagNodeId;
+ protected final Config config;
protected static final MetricContext metricContext =
Instrumented.getMetricContext(new State(), DagProc.class);
protected static final EventSubmitter eventSubmitter = new
EventSubmitter.Builder(
metricContext, "org.apache.gobblin.service").build();
public DagProc(DagTask dagTask, Config config) {
this.dagTask = dagTask;
+ this.config = config;
this.dagId =
DagUtils.generateDagId(this.dagTask.getDagAction().getFlowGroup(),
this.dagTask.getDagAction().getFlowName(),
this.dagTask.getDagAction().getFlowExecutionId());
this.dagNodeId = this.dagTask.getDagAction().getDagNodeId();
}
+ /**
+ * Main processing method for DagProc that orchestrates the full lifecycle:
+ * 1. Sets up MDC context for flow/job identification
+ * 2. Initializes state
+ * 3. Performs actions
+ * 4. Cleans up MDC context
+ *
+ * @param dagManagementStateStore State store for DAG management operations
+ * @param dagProcEngineMetrics Metrics for tracking DagProc execution
+ * @throws IOException if processing fails
+ */
public final void process(DagManagementStateStore dagManagementStateStore,
DagProcessingEngineMetrics dagProcEngineMetrics) throws IOException {
- T state;
- try {
- logContextualizedInfo("initializing");
- state = initialize(dagManagementStateStore);
- dagProcEngineMetrics.markDagActionsInitialize(getDagActionType(), true);
- } catch (Exception e) {
- dagProcEngineMetrics.markDagActionsInitialize(getDagActionType(), false);
- throw e;
- }
- logContextualizedInfo("ready to process");
- try {
- act(dagManagementStateStore, state, dagProcEngineMetrics);
- } catch (Exception e) {
- dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
- throw e;
+
+ // Extract flow/job identifiers for MDC context
+ String flowGroup = this.dagId.getFlowGroup();
+ String flowName = this.dagId.getFlowName();
+ String flowExecutionId = String.valueOf(this.dagId.getFlowExecutionId());
+ String jobName = this.dagNodeId != null ? this.dagNodeId.getJobName() :
JobStatusRetriever.NA_KEY;
+
+ try (
+ Closeable c1 = MDC.putCloseable(ConfigurationKeys.FLOW_GROUP_KEY,
flowGroup);
Review Comment:
where are these getting used?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]