phet commented on code in PR #3925:
URL: https://github.com/apache/gobblin/pull/3925#discussion_r1571370125
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java:
##########
@@ -164,46 +160,58 @@ public void onTaskCommitCompletion(Task task) {
};
}
- protected String getCopyableFileSourcePathDesc(WorkUnit workUnit, String
workUnitPath) {
- return getOptFirstCopyableFile(Lists.newArrayList(workUnit), workUnitPath)
- .map(copyableFile -> copyableFile.getOrigin().getPath().toString())
- .orElse(
- "<<not a CopyableFile("
- + getOptCopyEntityClass(workUnit, workUnitPath)
- .map(Class::getSimpleName)
- .orElse("<<not a CopyEntity!>>")
- + "): '" + workUnitPath + "'"
- );
+ protected static Optional<String> getOptWorkUnitsDesc(List<WorkUnit>
workUnits, String workUnitsPath, JobState jobState) {
+ List<String> fileSourcePaths = workUnits.stream()
+ .map(workUnit -> getOptCopyableFileSourcePathDesc(workUnit,
workUnitsPath))
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .collect(Collectors.toList());
+ if (fileSourcePaths.isEmpty()) {
+ // TODO - describe WorkUnits other than `CopyableFile`
+ return Optional.empty();
+ } else {
+ return Optional.of(getSourcePathsToLog(fileSourcePaths,
jobState)).map(pathsToLog ->
+ "for copying source files: "
+ + (pathsToLog.size() == workUnits.size() ? "" : ("**first " +
pathsToLog.size() + " only** "))
+ + pathsToLog
+ );
+ }
}
- protected Optional<CopyableFile> getOptCopyableFile(TaskState taskState) {
- return getOptCopyableFile(taskState, "taskState '" + taskState.getTaskId()
+ "'");
+ protected static Optional<String> getOptCopyableFileSourcePathDesc(WorkUnit
workUnit, String workUnitPath) {
+ return getOptWorkUnitCopyEntityClass(workUnit,
workUnitPath).flatMap(copyEntityClass ->
+ getOptCopyableFile(copyEntityClass, workUnit).map(copyableFile ->
+ copyableFile.getOrigin().getPath().toString()));
}
- protected Optional<CopyableFile> getOptCopyableFile(State state, String
logDesc) {
- return getOptCopyEntityClass(state, logDesc).flatMap(copyEntityClass -> {
- log.debug("(state) {} got (copyEntity) {}", state.getClass().getName(),
copyEntityClass.getName());
- if (CopyableFile.class.isAssignableFrom(copyEntityClass)) {
- String serialization =
state.getProp(CopySource.SERIALIZED_COPYABLE_FILE);
- if (serialization != null) {
- return Optional.of((CopyableFile)
CopyEntity.deserialize(serialization));
- }
+ protected static Optional<CopyableFile> getOptCopyableFile(TaskState
taskState) {
+ return getOptTaskStateCopyEntityClass(taskState).flatMap(copyEntityClass ->
+ getOptCopyableFile(copyEntityClass, taskState));
+ }
+
+ protected static Optional<CopyableFile> getOptCopyableFile(Class<?>
copyEntityClass, State state) {
+ log.debug("(state) {} got (copyEntity) {}", state.getClass().getName(),
copyEntityClass.getName());
+ if (CopyableFile.class.isAssignableFrom(copyEntityClass)) {
+ String serialization =
state.getProp(CopySource.SERIALIZED_COPYABLE_FILE);
+ if (serialization != null) {
+ return Optional.of((CopyableFile)
CopyEntity.deserialize(serialization));
}
- return Optional.empty();
- });
+ }
+ return Optional.empty();
+ }
+
+ protected static Optional<Class<?>> getOptWorkUnitCopyEntityClass(WorkUnit
workUnit, String workUnitPath) {
+ return getOptCopyEntityClass(workUnit, "workUnit '" + workUnitPath + "'");
}
- protected Optional<CopyableFile> getOptFirstCopyableFile(List<WorkUnit>
workUnits, String workUnitPath) {
- return Optional.of(workUnits).filter(wus -> wus.size() > 0).flatMap(wus ->
- getOptCopyableFile(wus.get(0), "workUnit '" + workUnitPath + "'")
- );
+ protected static Optional<Class<?>> getOptTaskStateCopyEntityClass(TaskState
taskState) {
+ return getOptCopyEntityClass(taskState, "taskState '" +
taskState.getTaskId() + "'");
}
Review Comment:
is your suggestion to not consult each individual WU, but instead just
sample one and acquire the info from that, presuming all WUs to be homogenous?
semantically you're probably right about current practice of a single source
producing homogenous WUs (not 100% certain, but I suspect it's true). I also
have no reason to believe that would change any time soon.
more as a general rule, my pref is to avoid spreading presumptions around
various unrelated parts of code, but instead to code defensively. e.g. I can
*imagine* blending WUs of different types (even if we don't do that today). if
that were to ever come about, it would break such a presumption here. hence
the defensive approach of avoiding the temptation to presume an input
characteristic on the basis of prior, outside knowledge.
anyway that's my pref, but what do you think? (...or did I misunderstand
the intent behind your comment?)
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java:
##########
@@ -164,46 +160,58 @@ public void onTaskCommitCompletion(Task task) {
};
}
- protected String getCopyableFileSourcePathDesc(WorkUnit workUnit, String
workUnitPath) {
- return getOptFirstCopyableFile(Lists.newArrayList(workUnit), workUnitPath)
- .map(copyableFile -> copyableFile.getOrigin().getPath().toString())
- .orElse(
- "<<not a CopyableFile("
- + getOptCopyEntityClass(workUnit, workUnitPath)
- .map(Class::getSimpleName)
- .orElse("<<not a CopyEntity!>>")
- + "): '" + workUnitPath + "'"
- );
+ protected static Optional<String> getOptWorkUnitsDesc(List<WorkUnit>
workUnits, String workUnitsPath, JobState jobState) {
+ List<String> fileSourcePaths = workUnits.stream()
+ .map(workUnit -> getOptCopyableFileSourcePathDesc(workUnit,
workUnitsPath))
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .collect(Collectors.toList());
+ if (fileSourcePaths.isEmpty()) {
+ // TODO - describe WorkUnits other than `CopyableFile`
+ return Optional.empty();
+ } else {
+ return Optional.of(getSourcePathsToLog(fileSourcePaths,
jobState)).map(pathsToLog ->
+ "for copying source files: "
+ + (pathsToLog.size() == workUnits.size() ? "" : ("**first " +
pathsToLog.size() + " only** "))
+ + pathsToLog
+ );
+ }
}
- protected Optional<CopyableFile> getOptCopyableFile(TaskState taskState) {
- return getOptCopyableFile(taskState, "taskState '" + taskState.getTaskId()
+ "'");
+ protected static Optional<String> getOptCopyableFileSourcePathDesc(WorkUnit
workUnit, String workUnitPath) {
+ return getOptWorkUnitCopyEntityClass(workUnit,
workUnitPath).flatMap(copyEntityClass ->
+ getOptCopyableFile(copyEntityClass, workUnit).map(copyableFile ->
+ copyableFile.getOrigin().getPath().toString()));
}
- protected Optional<CopyableFile> getOptCopyableFile(State state, String
logDesc) {
- return getOptCopyEntityClass(state, logDesc).flatMap(copyEntityClass -> {
- log.debug("(state) {} got (copyEntity) {}", state.getClass().getName(),
copyEntityClass.getName());
- if (CopyableFile.class.isAssignableFrom(copyEntityClass)) {
- String serialization =
state.getProp(CopySource.SERIALIZED_COPYABLE_FILE);
- if (serialization != null) {
- return Optional.of((CopyableFile)
CopyEntity.deserialize(serialization));
- }
+ protected static Optional<CopyableFile> getOptCopyableFile(TaskState
taskState) {
+ return getOptTaskStateCopyEntityClass(taskState).flatMap(copyEntityClass ->
+ getOptCopyableFile(copyEntityClass, taskState));
+ }
+
+ protected static Optional<CopyableFile> getOptCopyableFile(Class<?>
copyEntityClass, State state) {
+ log.debug("(state) {} got (copyEntity) {}", state.getClass().getName(),
copyEntityClass.getName());
+ if (CopyableFile.class.isAssignableFrom(copyEntityClass)) {
+ String serialization =
state.getProp(CopySource.SERIALIZED_COPYABLE_FILE);
+ if (serialization != null) {
+ return Optional.of((CopyableFile)
CopyEntity.deserialize(serialization));
}
- return Optional.empty();
- });
+ }
+ return Optional.empty();
+ }
+
+ protected static Optional<Class<?>> getOptWorkUnitCopyEntityClass(WorkUnit
workUnit, String workUnitPath) {
+ return getOptCopyEntityClass(workUnit, "workUnit '" + workUnitPath + "'");
}
- protected Optional<CopyableFile> getOptFirstCopyableFile(List<WorkUnit>
workUnits, String workUnitPath) {
- return Optional.of(workUnits).filter(wus -> wus.size() > 0).flatMap(wus ->
- getOptCopyableFile(wus.get(0), "workUnit '" + workUnitPath + "'")
- );
+ protected static Optional<Class<?>> getOptTaskStateCopyEntityClass(TaskState
taskState) {
+ return getOptCopyEntityClass(taskState, "taskState '" +
taskState.getTaskId() + "'");
}
Review Comment:
is your suggestion to not consult each individual WU, but instead just
sample one and acquire the info from that, presuming all WUs to be homogenous?
semantically you're probably right about current practice of a single source
producing homogenous WUs (not 100% certain, but I suspect it's true). I also
have no reason to believe that would change any time soon.
more as a general rule, my pref is to avoid spreading presumptions around
various unrelated parts of code, but instead to code defensively. e.g. I can
*imagine* blending WUs of different types (even if we don't do that today). if
that were to ever come about, it would break such a presumption here. hence
the defensive approach of avoiding the temptation to presume an input
characteristic on the basis of prior, outside knowledge.
anyway that's my pref, but what do you think? (...or did I misunderstand
the intent behind your comment?)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]