phet commented on code in PR #3912:
URL: https://github.com/apache/gobblin/pull/3912#discussion_r1547037631


##########
gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java:
##########
@@ -44,6 +45,7 @@
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;

Review Comment:
   these two imports may have snuck in... seem unused



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -141,22 +151,18 @@ public Callable<Void> apply(final Map.Entry<String, 
JobState.DatasetState> entry
 
       IteratorExecutor.logFailures(result, null, 10);
 
-      Set<String> failedDatasetUrns = new HashSet<>();
-      for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
-        // Set the overall job state to FAILED if the job failed to process 
any dataset
-        if (datasetState.getState() == JobState.RunningState.FAILED) {
-          failedDatasetUrns.add(datasetState.getDatasetUrn());
-        }
-      }
+      Set<String> failedDatasetUrns = datasetStatesByUrns.values().stream()
+          .filter(datasetState -> datasetState.getState() == 
JobState.RunningState.FAILED)
+          .collect(HashSet::new, (set, datasetState) -> 
set.add(datasetState.getDatasetUrn()), HashSet::addAll);

Review Comment:
   can you use
   ```
   .map(JobState.DatasetState::getDatasetUrn)
   .collect(Collectors.toCollection(HashSet::new))
   ```
   ?



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java:
##########
@@ -77,9 +77,9 @@ public static String 
qualifyNamePerExecWithoutFlowExecId(String name, Config wor
   }
 
   /** @return execution-specific name, incorporating any {@link 
ConfigurationKeys#FLOW_EXECUTION_ID_KEY} from `workerConfig` */

Review Comment:
   looks like `workerConfig` is out-of-date



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -68,11 +76,11 @@ public class CommitActivityImpl implements CommitActivity {
   public int commit(WUProcessingSpec workSpec) {
     // TODO: Make this configurable
     int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
-    String jobName = UNDEFINED_JOB_NAME;
+    Optional<String> jobNameOpt = Optional.empty();
     try {
       FileSystem fs = Help.loadFileSystem(workSpec);
       JobState jobState = Help.loadJobState(workSpec, fs);
-      jobName = jobState.getJobName();
+      jobNameOpt = Optional.of(jobState.getJobName());

Review Comment:
   `Optional::ofNullable`?



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java:
##########
@@ -93,7 +93,7 @@ public class JobContext implements Closeable {
   private final String jobId;
   private final String jobSequence;
   private final JobState jobState;
-  @Getter(AccessLevel.PACKAGE)
+  @Getter(AccessLevel.PUBLIC)

Review Comment:
   I believe you can omit the `AccessLevel`, as it's `public`



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -181,6 +187,42 @@ public static Map<String, JobState.DatasetState> 
createDatasetStatesByUrns(Colle
     return datasetStatesByUrns;
   }
 
+  public void summarizeDatasetFileMetrics(Map<String, JobState.DatasetState> 
datasetStatesByUrns, JobContext jobContext, EventSubmitter eventSubmitter) {
+    List<DatasetTaskSummary> datasetTaskSummaries = new ArrayList<>();
+    // Only process successful datasets unless configuration to process failed 
datasets is set
+    boolean processFailedTasks =
+        
PropertiesUtils.getPropAsBoolean(jobContext.getJobState().getProperties(), 
ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS,
+            "false");
+    for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
+      if (datasetState.getState() == JobState.RunningState.COMMITTED || 
(datasetState.getState() == JobState.RunningState.FAILED
+          && jobContext.getJobCommitPolicy() == 
JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS)) {
+        long totalBytesWritten = 0;
+        long totalRecordsWritten = 0;
+        for (TaskState taskState : datasetState.getTaskStates()) {
+          // Certain writers may omit these metrics e.g. 
CompactionLauncherWriter
+          if ((taskState.getWorkingState() == 
WorkUnitState.WorkingState.COMMITTED || processFailedTasks)) {
+            totalBytesWritten += 
taskState.getPropAsLong(ConfigurationKeys.WRITER_BYTES_WRITTEN, 0);
+            totalRecordsWritten += 
taskState.getPropAsLong(ConfigurationKeys.WRITER_RECORDS_WRITTEN, 0);
+          }
+        }
+        log.info(String.format("DatasetMetrics for '%s' - (records: %d; bytes: 
%d)", datasetState.getDatasetUrn(),
+            totalRecordsWritten, totalBytesWritten));
+        datasetTaskSummaries.add(
+            new DatasetTaskSummary(datasetState.getDatasetUrn(), 
totalRecordsWritten, totalBytesWritten,
+                datasetState.getState() == JobState.RunningState.COMMITTED));
+      } else if (datasetState.getState() == JobState.RunningState.FAILED) {
+        // Check if config is turned on for submitting writer metrics on 
failure due to non-atomic write semantics
+        if (jobContext.getJobCommitPolicy() == 
JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
+          log.info("Due to task failure, will report that no records or bytes 
were written for " + datasetState.getDatasetUrn());
+          datasetTaskSummaries.add(new 
DatasetTaskSummary(datasetState.getDatasetUrn(), 0, 0, false));
+        }
+      }
+    }
+    TimingEvent jobSummaryTimer = 
eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_SUMMARY);
+    jobSummaryTimer.addMetadata(TimingEvent.DATASET_TASK_SUMMARIES, 
GsonUtils.GSON_WITH_DATE_HANDLING.toJson(datasetTaskSummaries));
+    jobSummaryTimer.stop();

Review Comment:
   better separation of concerns would be to calculate the value only, return 
that, and then elsewhere to submit the event.  
   
   returning `void` is not infrequently a sign of lumping together calculation 
w/ side-effects.  far more testable is to calculate results in "pure functions" 
and then separately perform side-effects on whatever value that undergoes no 
further calculation



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to