[
https://issues.apache.org/jira/browse/GOBBLIN-2054?focusedWorklogId=916309&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-916309
]
ASF GitHub Bot logged work on GOBBLIN-2054:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 25/Apr/24 05:18
Start Date: 25/Apr/24 05:18
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3934:
URL: https://github.com/apache/gobblin/pull/3934#discussion_r1578858626
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java:
##########
@@ -544,8 +544,7 @@ public static Optional<Class<? extends DataPublisher>> getJobDataPublisherClass(
* or {@link ConfigurationKeys#PUBLISH_DATA_AT_JOB_LEVEL} is set to true.
*/
public static boolean shouldCommitDataInJob(State state) {
- boolean jobCommitPolicyIsFull =
- JobCommitPolicy.getCommitPolicy(state.getProperties()) == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;
+ boolean jobCommitPolicyIsFull = JobCommitPolicy.getCommitPolicy(state) == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;
Review Comment:
True, the naming doesn't give any hint. `State.getProperties` is actually synthesized from several separate underlying `Properties`, so assembling them all together necessarily requires a copy. The only way to avoid that would be if `State` were immutable and itself based on an immutable data structure, which `java.util.Properties` of course is NOT.
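The following minimal sketch illustrates the point about copying. It uses a hypothetical `LayeredState` class, which is not Gobblin's `State`, and assumes only what the comment states: the merged properties come from several separate `Properties` layers. A merged `Properties` view must allocate and fill a new object on every call. A single-key lookup, such as the `getProp` that the diff switches to, can in principle consult the layers in turn without copying anything. The key `"job.name"` is only an example.

```java
import java.util.Properties;

class LayeredStateDemo {
  public static void main(String[] args) {
    LayeredState state = new LayeredState();
    state.setCommonProp("job.name", "common-name");
    state.setProp("job.name", "specific-name");

    System.out.println(state.getProp("job.name", "UNDEFINED")); // specific-name

    Properties copy = state.getProperties();
    copy.setProperty("job.name", "mutated"); // only the copy changes
    System.out.println(state.getProp("job.name", "UNDEFINED")); // specific-name
    System.out.println(state.getProperties() != state.getProperties()); // true: a new copy per call
  }
}

/** Hypothetical state layered over two Properties objects; NOT Gobblin's State. */
class LayeredState {
  private final Properties commonProps = new Properties(); // shared layer
  private final Properties specProps = new Properties();   // overrides the shared layer

  void setCommonProp(String key, String value) {
    commonProps.setProperty(key, value);
  }

  void setProp(String key, String value) {
    specProps.setProperty(key, value);
  }

  /** Merged view: allocates and fills a fresh Properties on every call. */
  Properties getProperties() {
    Properties merged = new Properties();
    merged.putAll(commonProps);
    merged.putAll(specProps); // specific values override common ones
    return merged;
  }

  /** Single-key lookup: checks the layers in order, copying nothing. */
  String getProp(String key, String defaultValue) {
    String value = specProps.getProperty(key);
    return value != null ? value : commonProps.getProperty(key, defaultValue);
  }
}
```

In this sketch the merged view is a detached copy, so writes to it never reach the underlying layers. When only one value is needed, the per-key lookup is both cheaper and unambiguous.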
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -166,44 +163,39 @@ public Callable<Void> apply(final Map.Entry<String, JobState.DatasetState> entry
}
if (!IteratorExecutor.verifyAllSuccessful(result)) {
// TODO: propagate cause of failure and determine whether or not this is retryable to throw a non-retryable failure exception
- String jobName = jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY, UNDEFINED_JOB_NAME);
+ String jobName = jobState.getProp(ConfigurationKeys.JOB_NAME_KEY, UNDEFINED_JOB_NAME);
throw new IOException("Failed to commit dataset state for some dataset(s) of job " + jobName);
}
} catch (InterruptedException exc) {
throw new IOException(exc);
}
}
+ /** @return {@link TaskState}s loaded from the {@link StateStore<TaskState>} indicated by the {@link WUProcessingSpec} and {@link FileSystem} */
+ private List<TaskState> loadTaskStates(WUProcessingSpec workSpec, FileSystem fs, JobState jobState, int numThreads) throws IOException {
+ // TODO - decide whether to replace this method by adapting TaskStateCollectorService::collectOutputTaskStates (whence much of this code was drawn)
+ StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, fs);
+ // NOTE: TaskState dir is assumed to be a sibling to the workunits dir (following conventions of `MRJobLauncher`)
+ String jobIdPathName = new Path(workSpec.getWorkUnitsDir()).getParent().getName();
+ log.info("TaskStateStore path (name component): '{}' (fs: '{}')", jobIdPathName, fs.getUri());
+ Optional<Queue<TaskState>> taskStateQueueOpt = TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobIdPathName, numThreads);
+ return taskStateQueueOpt.map(taskStateQueue ->
+ taskStateQueue.stream().peek(taskState ->
Review Comment:
I'd stay away from that, since there is more to `TaskStateCollectorService::collectOutputTaskStates`, namely creating a circular ref between the `JobState` and `TaskState`. That's only something I'd do if absolutely necessary, and there's clearly no such need in gobblin-on-temporal's `CommitActivityImpl`.
As this is long-standing gobblin-on-MR behavior, reworking it is potentially a big risk, and I don't foresee enough payoff to justify it.
Rather than change the `TaskStateCollectorService` impl, I also just added this TODO:
```
// TODO - decide whether to replace this method by adapting TaskStateCollectorService::collectOutputTaskStates (whence much of this code was drawn)
```
Pragmatically weighing the current one-line bug fix against a full rework/replacement, I went w/ the former out of expedience.
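The trade-off above can be sketched with hypothetical `Job` and `Task` stand-ins, which are not Gobblin's `JobState` and `TaskState`. `attachJob` follows the shape of `loadTaskStates` in the diff. It maps an optional queue of freshly loaded tasks to a list and points each task at its job. When nothing was loaded, it falls back to an empty list.

`attachJobBidirectionally` also records each task on its job, which creates the kind of circular reference the comment prefers to avoid. How that back-reference is modeled here is an assumption. It is not a reproduction of `collectOutputTaskStates`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.stream.Collectors;

class ReassociateDemo {
  /** Hypothetical stand-in for job-level state (not Gobblin's JobState). */
  static class Job {
    final String name;
    final List<Task> tasks = new ArrayList<>(); // filled only by the bidirectional variant
    Job(String name) { this.name = name; }
  }

  /** Hypothetical stand-in for a deserialized task state (not Gobblin's TaskState). */
  static class Task {
    final String id;
    Job job; // null right after loading, until re-associated
    Task(String id) { this.id = id; }
  }

  /** One-directional: each task points at its job; the job holds no references back. */
  static List<Task> attachJob(Optional<Queue<Task>> loaded, Job job) {
    return loaded
        .map(queue -> queue.stream()
            .map(task -> { task.job = job; return task; })
            .collect(Collectors.toList()))
        .orElseGet(ArrayList::new); // nothing loaded: empty list rather than null
  }

  /** Bidirectional: additionally registers each task on its job, forming a cycle. */
  static List<Task> attachJobBidirectionally(Optional<Queue<Task>> loaded, Job job) {
    List<Task> tasks = attachJob(loaded, job);
    job.tasks.addAll(tasks); // job -> task plus task -> job
    return tasks;
  }

  public static void main(String[] args) {
    Queue<Task> queue = new ArrayDeque<>(List.of(new Task("t1"), new Task("t2")));
    Job job = new Job("demo-job");
    List<Task> tasks = attachJob(Optional.of(queue), job);
    System.out.println(tasks.size());                            // 2
    System.out.println(tasks.get(0).job.name);                   // demo-job
    System.out.println(job.tasks.isEmpty());                     // true: no back-references
    System.out.println(attachJob(Optional.empty(), job).size()); // 0
  }
}
```

The sketch sets the job inside `map` and returns the same task. The diff uses `peek` for this instead. `peek` also runs for every element when followed by `collect`, although its Javadoc notes that it exists mainly to support debugging.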
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -166,44 +163,39 @@ public Callable<Void> apply(final Map.Entry<String, JobState.DatasetState> entry
}
if (!IteratorExecutor.verifyAllSuccessful(result)) {
// TODO: propagate cause of failure and determine whether or not this is retryable to throw a non-retryable failure exception
- String jobName = jobState.getProperties().getProperty(ConfigurationKeys.JOB_NAME_KEY, UNDEFINED_JOB_NAME);
+ String jobName = jobState.getProp(ConfigurationKeys.JOB_NAME_KEY, UNDEFINED_JOB_NAME);
throw new IOException("Failed to commit dataset state for some dataset(s) of job " + jobName);
}
} catch (InterruptedException exc) {
throw new IOException(exc);
}
}
+ /** @return {@link TaskState}s loaded from the {@link StateStore<TaskState>} indicated by the {@link WUProcessingSpec} and {@link FileSystem} */
+ private List<TaskState> loadTaskStates(WUProcessingSpec workSpec, FileSystem fs, JobState jobState, int numThreads) throws IOException {
+ // TODO - decide whether to replace this method by adapting TaskStateCollectorService::collectOutputTaskStates (whence much of this code was drawn)
+ StateStore<TaskState> taskStateStore = Help.openTaskStateStore(workSpec, fs);
+ // NOTE: TaskState dir is assumed to be a sibling to the workunits dir (following conventions of `MRJobLauncher`)
+ String jobIdPathName = new Path(workSpec.getWorkUnitsDir()).getParent().getName();
+ log.info("TaskStateStore path (name component): '{}' (fs: '{}')", jobIdPathName, fs.getUri());
+ Optional<Queue<TaskState>> taskStateQueueOpt = TaskStateCollectorService.deserializeTaskStatesFromFolder(taskStateStore, jobIdPathName, numThreads);
+ return taskStateQueueOpt.map(taskStateQueue ->
+ taskStateQueue.stream().peek(taskState ->
+ // CRITICAL: although some `WorkUnit`s, like those created by `CopySource::FileSetWorkUnitGenerator` for each `CopyEntity`
+ // already themselves contain every prop of their `JobState`, not all do.
+ // `TaskState extends WorkUnit` serialization will include its constituent `WorkUnit`, but not the constituent `JobState`.
+ // given some `JobState` props may be essential for commit/publish, deserialization must re-associate each `TaskState` w/ `JobState`
+ taskState.setJobState(jobState)
+ // TODO - decide whether something akin necessary to streamline cumulative in-memory size of all issues: consumeTaskIssues(taskState);
+ ).collect(Collectors.toList())
+ ).orElseGet(() -> {
+ log.error("TaskStateStore successfully opened, but no task states found under (name) '{}'", jobIdPathName);
+ return Lists.newArrayList();
+ });
+ }
+
/** @return id/correlator for this particular commit activity */
private static String calcCommitId(WUProcessingSpec workSpec) {
return new Path(workSpec.getWorkUnitsDir()).getParent().getName();
}
-
- /**
- * Organize task states by dataset urns.
- * @param taskStates
- * @return A map of dataset urns to dataset task states.
- */
- public static Map<String, JobState.DatasetState> createDatasetStatesByUrns(Collection<TaskState> taskStates) {
- Map<String, JobState.DatasetState> datasetStatesByUrns = Maps.newHashMap();
-
- //TODO: handle skipped tasks?
- for (TaskState taskState : taskStates) {
- String datasetUrn = createDatasetUrn(datasetStatesByUrns, taskState);
- datasetStatesByUrns.get(datasetUrn).incrementTaskCount();
- datasetStatesByUrns.get(datasetUrn).addTaskState(taskState);
- }
-
- return datasetStatesByUrns;
- }
-
- private static String createDatasetUrn(Map<String, JobState.DatasetState> datasetStatesByUrns, TaskState taskState) {
- String datasetUrn = taskState.getProp(ConfigurationKeys.DATASET_URN_KEY, ConfigurationKeys.DEFAULT_DATASET_URN);
- if (!datasetStatesByUrns.containsKey(datasetUrn)) {
- JobState.DatasetState datasetState = new JobState.DatasetState();
Review Comment:
I looked into whether that's possible, but there are existing uses, such as in `FsDatasetStateStore`. If we'd like to pursue it further, I suggest doing so separately.
Issue Time Tracking
-------------------
Worklog Id: (was: 916309)
Time Spent: 1h 20m (was: 1h 10m)
> `CommitActivityImpl` fails for job types (sources) other than Iceberg-Distcp
> ----------------------------------------------------------------------------
>
> Key: GOBBLIN-2054
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2054
> Project: Apache Gobblin
> Issue Type: New Feature
> Components: gobblin-core
> Reporter: Kip Kohn
> Assignee: Abhishek Tiwari
> Priority: Major
> Time Spent: 1h 20m
> Remaining Estimate: 0h
>
> gobblin-on-temporal execution has been failing for job types other than iceberg-distcp (which uses `CopySource`). In particular, Commit fails with:
> {code}
> java.lang.IllegalArgumentException: Missing required property writer.output.dir
> at com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
> at org.apache.gobblin.util.WriterUtils.getWriterOutputDir(WriterUtils.java:121)
> at org.apache.gobblin.publisher.BaseDataPublisher.publishData(BaseDataPublisher.java:390)
> at org.apache.gobblin.publisher.BaseDataPublisher.publishMultiTaskData(BaseDataPublisher.java:379)
> at org.apache.gobblin.publisher.BaseDataPublisher.publishData(BaseDataPublisher.java:366)
> at org.apache.gobblin.publisher.DataPublisher.publish(DataPublisher.java:81)
> at org.apache.gobblin.runtime.SafeDatasetCommit.commitDataset(SafeDatasetCommit.java:260)
> at org.apache.gobblin.runtime.SafeDatasetCommit.call(SafeDatasetCommit.java:168)
> at org.apache.gobblin.runtime.SafeDatasetCommit.call(SafeDatasetCommit.java:64)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at org.apache.gobblin.util.executors.MDCPropagatingRunnable.run(MDCPropagatingRunnable.java:39)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> This is odd, because that same prop had already been used prior to commit, while processing the `WorkUnit`! Moreover, logging shows it to be present within the `JobState`.
> Anyway, even when using a private build that hard-coded that property, this later error arises:
> {code}
> Caused by: java.lang.IllegalArgumentException: Can not create a Path from a null string
> at org.apache.hadoop.fs.Path.checkPathArg(Path.java:159)
> at org.apache.hadoop.fs.Path.<init>(Path.java:175)
> at org.apache.hadoop.fs.Path.<init>(Path.java:110)
> at org.apache.gobblin.runtime.FsDatasetStateStore.sanitizeDatasetStatestoreNameFromDatasetURN(FsDatasetStateStore.java:175)
> at org.apache.gobblin.runtime.FsDatasetStateStore.persistDatasetState(FsDatasetStateStore.java:386)
> at org.apache.gobblin.runtime.FsDatasetStateStore.persistDatasetState(FsDatasetStateStore.java:90)
> at org.apache.gobblin.runtime.SafeDatasetCommit.persistDatasetState(SafeDatasetCommit.java:418)
> at org.apache.gobblin.runtime.SafeDatasetCommit.call(SafeDatasetCommit.java:191)
> ... 8 more
> {code}
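The puzzle in the issue description is a prop that was used during `WorkUnit` processing and is visible in the `JobState`, yet is missing at commit. This matches the explanation in the diff's CRITICAL comment: serializing a `TaskState` keeps its constituent `WorkUnit` but not its `JobState`.

Below is a minimal sketch of that mechanism. It assumes, as that comment implies, that a task's property lookups fall back to its associated job state. `JobConfig`, `TaskConfig`, `getRequiredProp` and the `/tmp/output` value are all hypothetical. The exception text only mirrors the reported trace.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class FallbackLookupDemo {
  /** Hypothetical stand-in for job-level configuration (not Gobblin's JobState). */
  static class JobConfig {
    final Map<String, String> props = new HashMap<>();
  }

  /** Hypothetical stand-in for a task (not Gobblin's TaskState). */
  static class TaskConfig {
    final Map<String, String> props = new HashMap<>(); // the part that survives serialization
    JobConfig job; // never serialized: null after deserialization until re-attached

    /** Looks in the task's own props first, then falls back to the attached job, if any. */
    Optional<String> getProp(String key) {
      String own = props.get(key);
      if (own != null) {
        return Optional.of(own);
      }
      if (job == null) {
        return Optional.empty(); // detached task: job-level props are invisible
      }
      return Optional.ofNullable(job.props.get(key));
    }

    String getRequiredProp(String key) {
      return getProp(key)
          .orElseThrow(() -> new IllegalArgumentException("Missing required property " + key));
    }
  }

  public static void main(String[] args) {
    JobConfig job = new JobConfig();
    job.props.put("writer.output.dir", "/tmp/output"); // hypothetical value, set only at job level

    TaskConfig task = new TaskConfig(); // as if freshly deserialized: job == null
    try {
      task.getRequiredProp("writer.output.dir");
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // Missing required property writer.output.dir
    }

    task.job = job; // re-associate, analogous to taskState.setJobState(jobState) in the diff
    System.out.println(task.getRequiredProp("writer.output.dir")); // /tmp/output
  }
}
```

In the sketch, re-attaching the job restores the fallback without copying every job-level prop into each task.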
--
This message was sent by Atlassian Jira
(v8.20.10#820010)