phet commented on code in PR #3912:
URL: https://github.com/apache/gobblin/pull/3912#discussion_r1547045095


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -181,6 +187,42 @@ public static Map<String, JobState.DatasetState> createDatasetStatesByUrns(Colle
     return datasetStatesByUrns;
   }
 
+  public void summarizeDatasetFileMetrics(Map<String, JobState.DatasetState> datasetStatesByUrns, JobContext jobContext, EventSubmitter eventSubmitter) {
+    List<DatasetTaskSummary> datasetTaskSummaries = new ArrayList<>();
+    // Only process successful datasets unless configuration to process failed datasets is set
+    boolean processFailedTasks =
+        PropertiesUtils.getPropAsBoolean(jobContext.getJobState().getProperties(), ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS,
+            "false");
+    for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
+      if (datasetState.getState() == JobState.RunningState.COMMITTED || (datasetState.getState() == JobState.RunningState.FAILED
+          && jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS)) {
+        long totalBytesWritten = 0;
+        long totalRecordsWritten = 0;
+        for (TaskState taskState : datasetState.getTaskStates()) {
+          // Certain writers may omit these metrics e.g. CompactionLauncherWriter
+          if ((taskState.getWorkingState() == WorkUnitState.WorkingState.COMMITTED || processFailedTasks)) {
+            totalBytesWritten += taskState.getPropAsLong(ConfigurationKeys.WRITER_BYTES_WRITTEN, 0);
+            totalRecordsWritten += taskState.getPropAsLong(ConfigurationKeys.WRITER_RECORDS_WRITTEN, 0);
+          }
+        }
+        log.info(String.format("DatasetMetrics for '%s' - (records: %d; bytes: %d)", datasetState.getDatasetUrn(),
+            totalRecordsWritten, totalBytesWritten));
+        datasetTaskSummaries.add(
+            new DatasetTaskSummary(datasetState.getDatasetUrn(), totalRecordsWritten, totalBytesWritten,
+                datasetState.getState() == JobState.RunningState.COMMITTED));
+      } else if (datasetState.getState() == JobState.RunningState.FAILED) {
+        // Check if config is turned on for submitting writer metrics on failure due to non-atomic write semantics
+        if (jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
+          log.info("Due to task failure, will report that no records or bytes were written for " + datasetState.getDatasetUrn());
+          datasetTaskSummaries.add(new DatasetTaskSummary(datasetState.getDatasetUrn(), 0, 0, false));
+        }
+      }
+    }
+    TimingEvent jobSummaryTimer = eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_SUMMARY);
+    jobSummaryTimer.addMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(datasetTaskSummaries));
+    jobSummaryTimer.stop();

Review Comment:
   Better separation of concerns would be to only calculate the value here, return it, and submit the event elsewhere.
   
   Returning `void` is often a sign that calculation has been lumped together with side effects. It is far more testable to calculate results in "pure functions" and then separately perform side effects on a value that itself stays unchanged while the effects are performed (i.e. no further calculation).
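   
   A minimal sketch of that split, to make the suggestion concrete. Every type here (`SummarySketch`, `Dataset`, `Task`, `Summary`, `SummarySink`, and the two enums) is a hypothetical stand-in for `JobState.DatasetState`, `TaskState`, `DatasetTaskSummary`, `EventSubmitter`, and `JobCommitPolicy`, not the real Gobblin classes, and `Policy` lists only the two values this method checks. The branching mirrors the diff above: `summarize` only computes and returns, while `submitSummaries` only performs the effect.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins only; none of these are the real Gobblin classes.
final class SummarySketch {
  enum State { COMMITTED, FAILED }
  enum Policy { COMMIT_ON_FULL_SUCCESS, COMMIT_SUCCESSFUL_TASKS }

  static final class Task {
    final boolean committed;
    final long records;
    final long bytes;
    Task(boolean committed, long records, long bytes) {
      this.committed = committed;
      this.records = records;
      this.bytes = bytes;
    }
  }

  static final class Dataset {
    final String urn;
    final State state;
    final List<Task> tasks;
    Dataset(String urn, State state, List<Task> tasks) {
      this.urn = urn;
      this.state = state;
      this.tasks = tasks;
    }
  }

  static final class Summary {
    final String urn;
    final long records;
    final long bytes;
    final boolean committed;
    Summary(String urn, long records, long bytes, boolean committed) {
      this.urn = urn;
      this.records = records;
      this.bytes = bytes;
      this.committed = committed;
    }
  }

  /** Pure calculation: reads its arguments and returns a new list; no logging, no events. */
  static List<Summary> summarize(List<Dataset> datasets, Policy policy, boolean countFailedTasks) {
    List<Summary> summaries = new ArrayList<>();
    for (Dataset d : datasets) {
      boolean committed = d.state == State.COMMITTED;
      if (committed || (d.state == State.FAILED && policy == Policy.COMMIT_SUCCESSFUL_TASKS)) {
        long records = 0;
        long bytes = 0;
        for (Task t : d.tasks) {
          // Count a task only if it committed, unless failed tasks are explicitly included.
          if (t.committed || countFailedTasks) {
            records += t.records;
            bytes += t.bytes;
          }
        }
        summaries.add(new Summary(d.urn, records, bytes, committed));
      } else if (d.state == State.FAILED && policy == Policy.COMMIT_ON_FULL_SUCCESS) {
        // Failed dataset under full-success policy: report nothing written.
        summaries.add(new Summary(d.urn, 0, 0, false));
      }
    }
    return summaries;
  }

  /** Hypothetical sink standing in for whatever submits the event. */
  interface SummarySink {
    void submit(List<Summary> summaries);
  }

  /** Side effect only: hands the already-computed value to the sink, no further calculation. */
  static void submitSummaries(List<Summary> summaries, SummarySink sink) {
    sink.submit(summaries);
  }
}
```

   With this shape, a unit test can call `summarize` on hand-built inputs and assert on the returned list without mocking any event submitter, and the caller decides where and whether to call `submitSummaries`.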


