phet commented on code in PR #3896:
URL: https://github.com/apache/gobblin/pull/3896#discussion_r1540527516


##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java:
##########
@@ -35,38 +36,39 @@
 
 /**
  * Responsible for performing the actual work for a given {@link DagTask} by first initializing its state, performing
- * actions based on the type of {@link DagTask} and finally submitting an event to the executor.
+ * actions based on the type of {@link DagTask}. Submitting events in time is important (PR#3641), hence initialize and
+ * act methods submit events as they happen.
  */
 @Alpha
 @Slf4j
 @RequiredArgsConstructor
-public abstract class DagProc<S, T> {
+public abstract class DagProc<S> {
   protected final DagTask dagTask;
+  @Getter protected final DagManager.DagId dagId;
+  @Getter protected final DagNodeId dagNodeId;
   protected static final MetricContext metricContext = Instrumented.getMetricContext(new State(), DagProc.class);
   protected static final EventSubmitter eventSubmitter = new EventSubmitter.Builder(
       metricContext, "org.apache.gobblin.service").build();
 
-  public final void process(DagManagementStateStore dagManagementStateStore) throws IOException {
-    S state = initialize(dagManagementStateStore);   // todo - retry
-    T result = act(dagManagementStateStore, state);   // todo - retry
-    commit(dagManagementStateStore, result);   // todo - retry
-    log.info("{} successfully concluded actions for dagId : {}", getClass().getSimpleName(), getDagId());
-  }
-
-  protected DagManager.DagId getDagId() {
-    return this.dagTask.getDagId();
+  public DagProc(DagTask dagTask) {
+    this.dagTask = dagTask;
+    this.dagId = this.dagTask.getDagId();
+    this.dagNodeId = this.dagTask.getDagNodeId();
   }
 
-  protected DagNodeId getDagNodeId() {
-    return this.dagTask.getDagNodeId();
+  public final void process(DagManagementStateStore dagManagementStateStore) throws IOException {
+    S state = initialize(dagManagementStateStore);   // todo - retry
+    act(dagManagementStateStore, state);   // todo - retry
+    commit(dagManagementStateStore);   // todo - retry
+    log.info("{} successfully concluded actions for dagId : {}", getClass().getSimpleName(), this.dagId);

Review Comment:
   I don't believe we need `commit`. Do you see a purpose for it?
   
   nit: "...concluded *processing* for dagId..." (as `process` is the method name)
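
To make the suggestion concrete, here is a minimal sketch of how `process` might look with `commit` dropped and the log wording changed. It is an illustration only, not the PR's code: `StateStore`, `SketchDagProc`, and `RecordingDagProc` are hypothetical stand-ins for `DagManagementStateStore`, `DagProc`, and a concrete subclass, the `String` dagId replaces `DagManager.DagId`, and the checked `IOException` and the Slf4j logger are omitted to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for DagManagementStateStore; NOT the Gobblin interface.
interface StateStore {}

// Sketch of a DagProc-like template with only two steps (no commit).
abstract class SketchDagProc<S> {
  protected final String dagId; // assumption: the real class holds a DagManager.DagId

  protected SketchDagProc(String dagId) {
    this.dagId = dagId;
  }

  // Template method: initialize state, then act on it. Nothing is left for a commit step.
  public final void process(StateStore store) {
    S state = initialize(store);
    act(store, state);
    // Wording follows the nit: "processing" matches the method name `process`.
    System.out.printf("%s successfully concluded processing for dagId : %s%n",
        getClass().getSimpleName(), dagId);
  }

  protected abstract S initialize(StateStore store);

  protected abstract void act(StateStore store, S state);
}

// Hypothetical subclass that records the order in which the steps run.
class RecordingDagProc extends SketchDagProc<String> {
  final List<String> steps = new ArrayList<>();

  RecordingDagProc(String dagId) {
    super(dagId);
  }

  @Override
  protected String initialize(StateStore store) {
    steps.add("initialize");
    return "state-for-" + dagId;
  }

  @Override
  protected void act(StateStore store, String state) {
    steps.add("act:" + state);
  }
}

class DagProcSketch {
  public static void main(String[] args) {
    RecordingDagProc proc = new RecordingDagProc("dag-1");
    proc.process(new StateStore() {});
    System.out.println(proc.steps);
  }
}
```

If some subclass does need to persist a result after `act`, that would be an argument for keeping `commit`; the sketch only shows that the template still reads cleanly without it.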


