phet commented on code in PR #3912:
URL: https://github.com/apache/gobblin/pull/3912#discussion_r1562777405


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -118,4 +106,19 @@ protected ProcessWorkUnitsWorkflow createProcessWorkUnitsWorkflow(Properties job
         .build();
     return Workflow.newChildWorkflowStub(ProcessWorkUnitsWorkflow.class, childOpts);
   }
+
+  protected static WUProcessingSpec createProcessingSpec(Properties jobProps, EventSubmitterContext eventSubmitterContext) {
+    JobState jobState = new JobState(jobProps);
+    URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
+    Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
+    WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, workUnitsDirPath.toString(), eventSubmitterContext);
+    // TODO: use our own prop names; don't "borrow" from `ProcessWorkUnitsJobLauncher`
+    if (jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
+        && jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE)) {
+      int maxBranchesPerTree = PropertiesUtils.getRequiredPropAsInt(jobProps, ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);

Review Comment:
   NBD, but `JobState` does have `contains` and `getPropAsInt`, which would let you pass in the `JobState` as a param, for better abstraction.
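
   For illustration only, a minimal sketch of that abstraction, assuming a hypothetical `JobStateLike` stand-in exposing just the `contains` and `getPropAsInt` accessors named above (it is not Gobblin's `JobState`), with placeholder key names in place of the `ProcessWorkUnitsJobLauncher` constants:

   ```java
   import java.util.Optional;
   import java.util.Properties;

   // Hypothetical stand-in for Gobblin's JobState: only the two accessors the comment mentions.
   final class JobStateLike {
     private final Properties props;

     JobStateLike(Properties props) {
       this.props = props;
     }

     boolean contains(String key) {
       return props.containsKey(key);
     }

     int getPropAsInt(String key) {
       return Integer.parseInt(props.getProperty(key));
     }
   }

   // Hypothetical holder for the two tree-shape settings read in createProcessingSpec.
   final class TreeShape {
     final int maxBranchesPerTree;
     final int maxSubTreesPerTree;

     TreeShape(int maxBranchesPerTree, int maxSubTreesPerTree) {
       this.maxBranchesPerTree = maxBranchesPerTree;
       this.maxSubTreesPerTree = maxSubTreesPerTree;
     }
   }

   final class ProcessingSpecSketch {
     // Placeholder keys; the PR currently borrows the real ones from ProcessWorkUnitsJobLauncher.
     static final String MAX_BRANCHES_KEY = "example.max.branches.per.tree";
     static final String MAX_SUB_TREES_KEY = "example.max.sub.trees.per.tree";

     // Takes the state object rather than raw Properties; empty unless both keys are present.
     static Optional<TreeShape> readTreeShape(JobStateLike jobState) {
       if (jobState.contains(MAX_BRANCHES_KEY) && jobState.contains(MAX_SUB_TREES_KEY)) {
         return Optional.of(new TreeShape(
             jobState.getPropAsInt(MAX_BRANCHES_KEY),
             jobState.getPropAsInt(MAX_SUB_TREES_KEY)));
       }
       return Optional.empty();
     }
   }
   ```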



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -181,6 +195,40 @@ public static Map<String, JobState.DatasetState> createDatasetStatesByUrns(Colle
     return datasetStatesByUrns;
   }
 
+  public List<DatasetTaskSummary> generateDatasetTaskSummaries(Map<String, JobState.DatasetState> datasetStatesByUrns, JobContext jobContext, EventSubmitter eventSubmitter) {
+    List<DatasetTaskSummary> datasetTaskSummaries = new ArrayList<>();
+    // Only process successful datasets unless configuration to process failed datasets is set
+    boolean processFailedTasks =
+        PropertiesUtils.getPropAsBoolean(jobContext.getJobState().getProperties(), ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS,
+            "false");
+    for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
+      if (datasetState.getState() == JobState.RunningState.COMMITTED || (datasetState.getState() == JobState.RunningState.FAILED
+          && jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS)) {
+        long totalBytesWritten = 0;
+        long totalRecordsWritten = 0;
+        for (TaskState taskState : datasetState.getTaskStates()) {
+          // Certain writers may omit these metrics e.g. CompactionLauncherWriter
+          if ((taskState.getWorkingState() == WorkUnitState.WorkingState.COMMITTED || processFailedTasks)) {
+            totalBytesWritten += taskState.getPropAsLong(ConfigurationKeys.WRITER_BYTES_WRITTEN, 0);
+            totalRecordsWritten += taskState.getPropAsLong(ConfigurationKeys.WRITER_RECORDS_WRITTEN, 0);
+          }
+        }
+        log.info(String.format("DatasetMetrics for '%s' - (records: %d; bytes: %d)", datasetState.getDatasetUrn(),
+            totalRecordsWritten, totalBytesWritten));
+        datasetTaskSummaries.add(
+            new DatasetTaskSummary(datasetState.getDatasetUrn(), totalRecordsWritten, totalBytesWritten,
+                datasetState.getState() == JobState.RunningState.COMMITTED));
+      } else if (datasetState.getState() == JobState.RunningState.FAILED) {
+        // Check if config is turned on for submitting writer metrics on failure due to non-atomic write semantics
+        if (jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {

Review Comment:
   for clarity (and to parallel the `&&` in the `if` alternative above), suggest combining the nested `if` into the `else if` condition via `&&`
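
   A rough sketch of the flattened branch shape, assuming the outer `else if` holds nothing besides the nested `if`. It uses hypothetical, trimmed copies of the `RunningState` and `JobCommitPolicy` enums (only the constants this excerpt needs) and illustrative return labels in place of the branch bodies, which the excerpt cuts off:

   ```java
   // Hypothetical subsets of Gobblin's enums: only the constants this excerpt uses.
   enum RunningState { COMMITTED, FAILED, RUNNING }

   enum JobCommitPolicy { COMMIT_ON_FULL_SUCCESS, COMMIT_SUCCESSFUL_TASKS }

   final class BranchShapeSketch {
     // Returns a label naming the branch taken; the real branch bodies are not shown in the excerpt.
     static String branchFor(RunningState state, JobCommitPolicy policy) {
       if (state == RunningState.COMMITTED
           || (state == RunningState.FAILED && policy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS)) {
         return "summarize";
       } else if (state == RunningState.FAILED && policy == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
         // Formerly a nested `if` inside `else if (FAILED)`; now one combined condition,
         // mirroring the `&&` in the first branch.
         return "failed-under-full-success";
       }
       return "skip";
     }
   }
   ```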



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -87,12 +97,19 @@ public int commit(WUProcessingSpec workSpec) {
         return 0;
       }
       Queue<TaskState> taskStateQueue = taskStateQueueOpt.get();
-      commitTaskStates(jobState, ImmutableList.copyOf(taskStateQueue), globalGobblinContext);
+      Map<String, JobState.DatasetState> datasetStatesByUrns = createDatasetStatesByUrns(ImmutableList.copyOf(taskStateQueue));
+      commitTaskStates(jobState, datasetStatesByUrns, globalGobblinContext, jobNameOpt);
+      List<DatasetTaskSummary> datasetTaskSummaries = generateDatasetTaskSummaries(datasetStatesByUrns, globalGobblinContext, workSpec.getEventSubmitterContext().create());
+      // Submit event that summarizes work done
+      TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
+      TemporalEventTimer eventTimer = timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY);

Review Comment:
   I'd love to streamline... what do you think about:
   ```
   TemporalEventTimer eventTimer = TemporalEventTimer.Factory.createEvent(workSpec.getEventSubmitterContext(), TimingEvent.LauncherTimings.JOB_SUMMARY);
   ```
   ?
   
   personally, I'd prefer a more fluent API, to support:
   ```
   TemporalEventTimer.Factory.createEvent(workSpec.getEventSubmitterContext(), TimingEvent.LauncherTimings.JOB_SUMMARY)
       .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(datasetTaskSummaries))
       .submit();
   ```
   for that, we'd just need:
   a. either change the `addMetadata` return type to support chaining, or else introduce an alternative `withMetadata` version that does
   b. alias `stop()` to `submit()`
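
   One possible shape for both suggestions (a static `createEvent` factory, a chainable `withMetadata`, and `submit()` as an alias for `stop()`), sketched as a hypothetical `FluentEventTimer` rather than Gobblin's actual `TemporalEventTimer`. A `Consumer` stands in for the event submitter, and the `eventName`/`durationMillis` keys are placeholders:

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.function.Consumer;

   // Hypothetical fluent timer; NOT Gobblin's TemporalEventTimer.
   // The Consumer is a stand-in for whatever actually submits the event.
   final class FluentEventTimer {
     private final String eventName;
     private final Consumer<Map<String, String>> sink;
     private final Map<String, String> metadata = new LinkedHashMap<>();
     private final long startMillis;
     private boolean submitted = false;

     private FluentEventTimer(String eventName, Consumer<Map<String, String>> sink) {
       this.eventName = eventName;
       this.sink = sink;
       this.startMillis = System.currentTimeMillis();
     }

     // Static factory replacing the two-step `new Factory(ctx).create(name)`.
     static FluentEventTimer createEvent(Consumer<Map<String, String>> sink, String eventName) {
       return new FluentEventTimer(eventName, sink);
     }

     // Option (a): a chainable variant of addMetadata that returns `this`.
     FluentEventTimer withMetadata(String key, String value) {
       metadata.put(key, value);
       return this;
     }

     // Ends the timer and emits the event exactly once; later calls are no-ops.
     void stop() {
       if (submitted) {
         return;
       }
       submitted = true;
       Map<String, String> event = new LinkedHashMap<>(metadata);
       event.put("eventName", eventName);
       event.put("durationMillis", Long.toString(System.currentTimeMillis() - startMillis));
       sink.accept(event);
     }

     // Option (b): alias so the chain reads as `...withMetadata(...).submit()`.
     void submit() {
       stop();
     }
   }
   ```

   Returning `this` from `withMetadata` is what enables the chain; keeping `stop()` and adding `submit()` as a thin alias would leave existing callers of `stop()` unaffected.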



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -181,6 +195,40 @@ public static Map<String, JobState.DatasetState> createDatasetStatesByUrns(Colle
     return datasetStatesByUrns;
   }
 
+  public List<DatasetTaskSummary> generateDatasetTaskSummaries(Map<String, JobState.DatasetState> datasetStatesByUrns, JobContext jobContext, EventSubmitter eventSubmitter) {

Review Comment:
   - suggest more purpose-driven semantic naming, like `summarizeDatasetOutcomes`
   
   - no need to take `eventSubmitter` any longer
   
   - `JobContext` is so big and broad, I'd love to get it out of this signature and leave it to the caller to pass in only the narrow params actually relevant to this work, such as:
   ```
   List<DatasetTaskSummary> summarizeDatasetOutcomes(
       Map<String, JobState.DatasetState> datasetStatesByUrns,
       JobCommitPolicy commitPolicy,
       boolean shouldIncludeFailedTasks)
   ```
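
   A minimal sketch of that narrower signature, mirroring the loop in the diff above. It assumes trimmed, hypothetical stand-ins for `DatasetState`, `TaskState`, and `DatasetTaskSummary` (plain fields replace the `getPropAsLong(..., 0)` lookups), and it leaves out the `COMMIT_ON_FULL_SUCCESS` branch, whose body is cut off in the excerpt:

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;

   // Hypothetical, trimmed stand-ins for the Gobblin types; only what the loop reads.
   enum RunningState { COMMITTED, FAILED }

   enum WorkingState { COMMITTED, FAILED }

   enum JobCommitPolicy { COMMIT_ON_FULL_SUCCESS, COMMIT_SUCCESSFUL_TASKS }

   final class TaskStateLike {
     final WorkingState workingState;
     final long bytesWritten;
     final long recordsWritten;

     TaskStateLike(WorkingState workingState, long bytesWritten, long recordsWritten) {
       this.workingState = workingState;
       this.bytesWritten = bytesWritten;
       this.recordsWritten = recordsWritten;
     }
   }

   final class DatasetStateLike {
     final String datasetUrn;
     final RunningState state;
     final List<TaskStateLike> taskStates;

     DatasetStateLike(String datasetUrn, RunningState state, List<TaskStateLike> taskStates) {
       this.datasetUrn = datasetUrn;
       this.state = state;
       this.taskStates = taskStates;
     }
   }

   final class DatasetTaskSummaryLike {
     final String datasetUrn;
     final long recordsWritten;
     final long bytesWritten;
     final boolean successfullyCommitted;

     DatasetTaskSummaryLike(String datasetUrn, long recordsWritten, long bytesWritten, boolean successfullyCommitted) {
       this.datasetUrn = datasetUrn;
       this.recordsWritten = recordsWritten;
       this.bytesWritten = bytesWritten;
       this.successfullyCommitted = successfullyCommitted;
     }
   }

   final class DatasetOutcomes {
     // Only the narrow inputs the logic needs: no JobContext, no EventSubmitter.
     static List<DatasetTaskSummaryLike> summarizeDatasetOutcomes(
         Map<String, DatasetStateLike> datasetStatesByUrns,
         JobCommitPolicy commitPolicy,
         boolean shouldIncludeFailedTasks) {
       List<DatasetTaskSummaryLike> summaries = new ArrayList<>();
       for (DatasetStateLike dataset : datasetStatesByUrns.values()) {
         boolean committed = dataset.state == RunningState.COMMITTED;
         boolean failedWithPartialCommit =
             dataset.state == RunningState.FAILED && commitPolicy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS;
         if (!committed && !failedWithPartialCommit) {
           continue; // COMMIT_ON_FULL_SUCCESS handling is elided in the excerpt
         }
         long totalBytes = 0;
         long totalRecords = 0;
         for (TaskStateLike task : dataset.taskStates) {
           // Count committed tasks, plus failed ones when so configured.
           if (task.workingState == WorkingState.COMMITTED || shouldIncludeFailedTasks) {
             totalBytes += task.bytesWritten;
             totalRecords += task.recordsWritten;
           }
         }
         summaries.add(new DatasetTaskSummaryLike(dataset.datasetUrn, totalRecords, totalBytes, committed));
       }
       return summaries;
     }
   }
   ```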
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
