Will-Lo commented on code in PR #3925:
URL: https://github.com/apache/gobblin/pull/3925#discussion_r1571249282


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java:
##########
@@ -164,46 +160,58 @@ public void onTaskCommitCompletion(Task task) {
     };
   }
 
-  protected String getCopyableFileSourcePathDesc(WorkUnit workUnit, String 
workUnitPath) {
-    return getOptFirstCopyableFile(Lists.newArrayList(workUnit), workUnitPath)
-        .map(copyableFile -> copyableFile.getOrigin().getPath().toString())
-        .orElse(
-            "<<not a CopyableFile("
-                + getOptCopyEntityClass(workUnit, workUnitPath)
-                .map(Class::getSimpleName)
-                .orElse("<<not a CopyEntity!>>")
-                + "): '" + workUnitPath + "'"
-        );
+  protected static Optional<String> getOptWorkUnitsDesc(List<WorkUnit> 
workUnits, String workUnitsPath, JobState jobState) {
+    List<String> fileSourcePaths = workUnits.stream()
+        .map(workUnit -> getOptCopyableFileSourcePathDesc(workUnit, 
workUnitsPath))
+        .filter(Optional::isPresent)
+        .map(Optional::get)
+        .collect(Collectors.toList());
+    if (fileSourcePaths.isEmpty()) {
+      // TODO - describe WorkUnits other than `CopyableFile`
+      return Optional.empty();
+    } else {
+      return Optional.of(getSourcePathsToLog(fileSourcePaths, 
jobState)).map(pathsToLog ->
+          "for copying source files: "
+              + (pathsToLog.size() == workUnits.size() ? "" : ("**first " + 
pathsToLog.size() + " only** "))
+              + pathsToLog
+      );
+    }
   }
 
-  protected Optional<CopyableFile> getOptCopyableFile(TaskState taskState) {
-    return getOptCopyableFile(taskState, "taskState '" + taskState.getTaskId() 
+ "'");
+  protected static Optional<String> getOptCopyableFileSourcePathDesc(WorkUnit 
workUnit, String workUnitPath) {
+    return getOptWorkUnitCopyEntityClass(workUnit, 
workUnitPath).flatMap(copyEntityClass ->
+        getOptCopyableFile(copyEntityClass, workUnit).map(copyableFile ->
+            copyableFile.getOrigin().getPath().toString()));
   }
 
-  protected Optional<CopyableFile> getOptCopyableFile(State state, String 
logDesc) {
-    return getOptCopyEntityClass(state, logDesc).flatMap(copyEntityClass -> {
-      log.debug("(state) {} got (copyEntity) {}", state.getClass().getName(), 
copyEntityClass.getName());
-      if (CopyableFile.class.isAssignableFrom(copyEntityClass)) {
-        String serialization = 
state.getProp(CopySource.SERIALIZED_COPYABLE_FILE);
-        if (serialization != null) {
-          return Optional.of((CopyableFile) 
CopyEntity.deserialize(serialization));
-        }
+  protected static Optional<CopyableFile> getOptCopyableFile(TaskState 
taskState) {
+    return getOptTaskStateCopyEntityClass(taskState).flatMap(copyEntityClass ->
+        getOptCopyableFile(copyEntityClass, taskState));
+  }
+
+  protected static Optional<CopyableFile> getOptCopyableFile(Class<?> 
copyEntityClass, State state) {
+    log.debug("(state) {} got (copyEntity) {}", state.getClass().getName(), 
copyEntityClass.getName());
+    if (CopyableFile.class.isAssignableFrom(copyEntityClass)) {
+      String serialization = 
state.getProp(CopySource.SERIALIZED_COPYABLE_FILE);
+      if (serialization != null) {
+        return Optional.of((CopyableFile) 
CopyEntity.deserialize(serialization));
       }
-      return Optional.empty();
-    });
+    }
+    return Optional.empty();
+  }
+
+  protected static Optional<Class<?>> getOptWorkUnitCopyEntityClass(WorkUnit 
workUnit, String workUnitPath) {
+    return getOptCopyEntityClass(workUnit, "workUnit '" + workUnitPath + "'");
   }
 
-  protected Optional<CopyableFile> getOptFirstCopyableFile(List<WorkUnit> 
workUnits, String workUnitPath) {
-    return Optional.of(workUnits).filter(wus -> wus.size() > 0).flatMap(wus ->
-      getOptCopyableFile(wus.get(0), "workUnit '" + workUnitPath + "'")
-    );
+  protected static Optional<Class<?>> getOptTaskStateCopyEntityClass(TaskState 
taskState) {
+    return getOptCopyEntityClass(taskState, "taskState '" + 
taskState.getTaskId() + "'");
   }

Review Comment:
   Wondering if we need to get copyEntityClass at a workunit level explicitly 
here since these functions imply that you can have multiple at a taskstate and 
workunit level, afaik these are tied to the source class and there can only be 
one source class per Gobblin job?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to