[https://issues.apache.org/jira/browse/GOBBLIN-2046?focusedWorklogId=915374&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-915374]
ASF GitHub Bot logged work on GOBBLIN-2046:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 18/Apr/24 19:15
Start Date: 18/Apr/24 19:15
Worklog Time Spent: 10m
Work Description: Will-Lo commented on code in PR #3925:
URL: https://github.com/apache/gobblin/pull/3925#discussion_r1571249282
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/ProcessWorkUnitImpl.java:
##########
@@ -164,46 +160,58 @@ public void onTaskCommitCompletion(Task task) {
};
}
- protected String getCopyableFileSourcePathDesc(WorkUnit workUnit, String workUnitPath) {
- return getOptFirstCopyableFile(Lists.newArrayList(workUnit), workUnitPath)
- .map(copyableFile -> copyableFile.getOrigin().getPath().toString())
- .orElse(
- "<<not a CopyableFile("
- + getOptCopyEntityClass(workUnit, workUnitPath)
- .map(Class::getSimpleName)
- .orElse("<<not a CopyEntity!>>")
- + "): '" + workUnitPath + "'"
- );
+ protected static Optional<String> getOptWorkUnitsDesc(List<WorkUnit> workUnits, String workUnitsPath, JobState jobState) {
+ List<String> fileSourcePaths = workUnits.stream()
+ .map(workUnit -> getOptCopyableFileSourcePathDesc(workUnit, workUnitsPath))
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .collect(Collectors.toList());
+ if (fileSourcePaths.isEmpty()) {
+ // TODO - describe WorkUnits other than `CopyableFile`
+ return Optional.empty();
+ } else {
+ return Optional.of(getSourcePathsToLog(fileSourcePaths, jobState)).map(pathsToLog ->
+ "for copying source files: "
+ + (pathsToLog.size() == workUnits.size() ? "" : ("**first " + pathsToLog.size() + " only** "))
+ + pathsToLog
+ );
+ }
}
- protected Optional<CopyableFile> getOptCopyableFile(TaskState taskState) {
- return getOptCopyableFile(taskState, "taskState '" + taskState.getTaskId() + "'");
+ protected static Optional<String> getOptCopyableFileSourcePathDesc(WorkUnit workUnit, String workUnitPath) {
+ return getOptWorkUnitCopyEntityClass(workUnit, workUnitPath).flatMap(copyEntityClass ->
+ getOptCopyableFile(copyEntityClass, workUnit).map(copyableFile ->
+ copyableFile.getOrigin().getPath().toString()));
}
- protected Optional<CopyableFile> getOptCopyableFile(State state, String logDesc) {
- return getOptCopyEntityClass(state, logDesc).flatMap(copyEntityClass -> {
- log.debug("(state) {} got (copyEntity) {}", state.getClass().getName(), copyEntityClass.getName());
- if (CopyableFile.class.isAssignableFrom(copyEntityClass)) {
- String serialization = state.getProp(CopySource.SERIALIZED_COPYABLE_FILE);
- if (serialization != null) {
- return Optional.of((CopyableFile) CopyEntity.deserialize(serialization));
- }
+ protected static Optional<CopyableFile> getOptCopyableFile(TaskState taskState) {
+ return getOptTaskStateCopyEntityClass(taskState).flatMap(copyEntityClass ->
+ getOptCopyableFile(copyEntityClass, taskState));
+ }
+
+ protected static Optional<CopyableFile> getOptCopyableFile(Class<?> copyEntityClass, State state) {
+ log.debug("(state) {} got (copyEntity) {}", state.getClass().getName(), copyEntityClass.getName());
+ if (CopyableFile.class.isAssignableFrom(copyEntityClass)) {
+ String serialization = state.getProp(CopySource.SERIALIZED_COPYABLE_FILE);
+ if (serialization != null) {
+ return Optional.of((CopyableFile) CopyEntity.deserialize(serialization));
}
- return Optional.empty();
- });
+ }
+ return Optional.empty();
+ }
+
+ protected static Optional<Class<?>> getOptWorkUnitCopyEntityClass(WorkUnit workUnit, String workUnitPath) {
+ return getOptCopyEntityClass(workUnit, "workUnit '" + workUnitPath + "'");
}
- protected Optional<CopyableFile> getOptFirstCopyableFile(List<WorkUnit> workUnits, String workUnitPath) {
- return Optional.of(workUnits).filter(wus -> wus.size() > 0).flatMap(wus ->
- getOptCopyableFile(wus.get(0), "workUnit '" + workUnitPath + "'")
- );
+ protected static Optional<Class<?>> getOptTaskStateCopyEntityClass(TaskState taskState) {
+ return getOptCopyEntityClass(taskState, "taskState '" + taskState.getTaskId() + "'");
}
Review Comment:
Wondering whether we need to get `copyEntityClass` explicitly at the WorkUnit
level here. These functions imply that there can be multiple copy-entity
classes at the TaskState and WorkUnit level, but AFAIK they are tied to the
source class, and there can only be one source class per Gobblin job?
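The reviewer's alternative can be sketched as follows. It assumes, as the comment suggests but this sketch does not verify against Gobblin, that a job has a single `Source` and therefore a single copy-entity class. A `Map<String, String>` stands in for Gobblin's `State`/`WorkUnit`, and the property key, class, and helper names are hypothetical, not Gobblin APIs. The class is resolved once from the first work unit, and `allUnitsAgree` shows what a per-unit lookup like the one in the PR would still catch if that assumption did not hold.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class PerJobCopyEntityClassSketch {

  // HYPOTHETICAL property key: stands in for wherever a job records its copy-entity class.
  static final String COPY_ENTITY_CLASS_KEY = "hypothetical.copy.entity.class";

  // Per-unit lookup (the shape used in the PR): read the class name from one state.
  // A Map<String, String> stands in for Gobblin's State / WorkUnit.
  static Optional<String> resolveClassName(Map<String, String> state) {
    return Optional.ofNullable(state.get(COPY_ENTITY_CLASS_KEY));
  }

  // Reviewer's alternative: assuming one Source (so one copy-entity class) per job,
  // resolve the class once, from the first work unit, and reuse it for all units.
  static Optional<String> resolveOncePerJob(List<Map<String, String>> workUnits) {
    return workUnits.stream().findFirst().flatMap(PerJobCopyEntityClassSketch::resolveClassName);
  }

  // Checks the single-class assumption: true only if every unit matches the job-level class.
  static boolean allUnitsAgree(List<Map<String, String>> workUnits) {
    Optional<String> jobLevel = resolveOncePerJob(workUnits);
    return workUnits.stream().allMatch(wu -> resolveClassName(wu).equals(jobLevel));
  }

  public static void main(String[] args) {
    Map<String, String> copyWu = Map.of(COPY_ENTITY_CLASS_KEY, "CopyableFile");
    Map<String, String> otherWu = Map.of(); // a work unit with no copy-entity class recorded
    System.out.println(resolveOncePerJob(List.of(copyWu, copyWu))); // prints Optional[CopyableFile]
    System.out.println(allUnitsAgree(List.of(copyWu, copyWu)));     // prints true
    System.out.println(allUnitsAgree(List.of(copyWu, otherWu)));    // prints false
  }
}
```

Resolving once avoids a lookup per work unit. The per-unit lookup tolerates work units of mixed kinds, at the cost of repeating the lookup for every unit.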
Issue Time Tracking
-------------------
Worklog Id: (was: 915374)
Time Spent: 20m (was: 10m)
> Generalize gobblin-on-temporal `ProcessWorkUnitImpl` logging not to presume `CopyEntity`
> ----------------------------------------------------------------------------------------
>
> Key: GOBBLIN-2046
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2046
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-core
> Reporter: Kip Kohn
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 20m
> Remaining Estimate: 0h
>
> There are many kinds of `WorkUnit` beyond the `CopyEntity` produced by
> `CopySource`.
> Since it remains helpful, continue to log `CopyEntity`-specific info, but do so
> without suggesting something is missing when the WU is not a `CopyEntity`.
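A minimal sketch of the logging behavior this issue asks for, modeled on `getOptWorkUnitsDesc` in the diff. It describes only the work units that are `CopyableFile`s, and it returns `Optional.empty()` when none are, instead of a `<<not a CopyableFile` marker. `WorkUnitStub` and `maxPathsToLog` are hypothetical stand-ins. In particular, `maxPathsToLog` assumes that `getSourcePathsToLog` truncates to a limit derived from the `JobState`, which the diff does not show. The record requires Java 16+.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

public class WorkUnitsDescSketch {

  // HYPOTHETICAL stand-in for a WorkUnit: only copy work units carry a source path.
  record WorkUnitStub(Optional<String> copySourcePath) {}

  // Mirrors the shape of getOptWorkUnitsDesc: describe only CopyableFile work units, and
  // return Optional.empty() (no misleading "not a CopyableFile" text) when there are none.
  // maxPathsToLog is a HYPOTHETICAL stand-in for the limit getSourcePathsToLog applies.
  static Optional<String> describe(List<WorkUnitStub> workUnits, int maxPathsToLog) {
    List<String> sourcePaths = workUnits.stream()
        .map(WorkUnitStub::copySourcePath)
        .flatMap(Optional::stream) // keep only the present paths
        .collect(Collectors.toList());
    if (sourcePaths.isEmpty()) {
      return Optional.empty();
    }
    List<String> pathsToLog = sourcePaths.subList(0, Math.min(maxPathsToLog, sourcePaths.size()));
    // As in the diff, compare against ALL work units, not just the CopyableFile ones.
    String qualifier = pathsToLog.size() == workUnits.size()
        ? "" : "**first " + pathsToLog.size() + " only** ";
    return Optional.of("for copying source files: " + qualifier + pathsToLog);
  }

  public static void main(String[] args) {
    List<WorkUnitStub> mixed = List.of(
        new WorkUnitStub(Optional.of("/data/a")),
        new WorkUnitStub(Optional.empty()), // some other kind of work unit
        new WorkUnitStub(Optional.of("/data/b")));
    System.out.println(describe(mixed, 10));
    // prints: Optional[for copying source files: **first 2 only** [/data/a, /data/b]]
    System.out.println(describe(List.of(new WorkUnitStub(Optional.empty())), 10));
    // prints: Optional.empty
  }
}
```

As in the diff, the `**first N only**` qualifier compares against the total number of work units. It therefore also appears when every `CopyableFile` path was logged but some work units were of another kind, as the `mixed` example shows.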
--
This message was sent by Atlassian Jira
(v8.20.10#820010)