phet commented on code in PR #4078:
URL: https://github.com/apache/gobblin/pull/4078#discussion_r1876806190
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java:
##########
@@ -34,11 +36,13 @@
@Data
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@RequiredArgsConstructor
+@Accessors(chain = true)
public class CommitStats {
@NonNull private Map<String, DatasetStats> datasetStats;
@NonNull private int numCommittedWorkUnits;
+ @NonNull private Optional<Exception> optFailure;
Review Comment:
let's treat these `@Data` POJOs as immutable and construct them fully
initialized.
all these fields would actually have been `private final`, but that doesn't
play well w/ jackson's JSON deserialization, so they're forced to be `@NonNull
private` instead
(hence no `@Accessors`, since chained setters invite mutation after construction)
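to make the convention concrete, here's a minimal plain-Java sketch (no Lombok or jackson). `CommitStatsSketch`, its `Integer` map values, and the explicit getters are hypothetical stand-ins, not the real class. every field arrives through the constructor, and there are no setters to chain:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in for CommitStats (map value type simplified to Integer).
// Fields stay non-final, as in the real class, but nothing mutates them after construction.
class CommitStatsSketch {
  private Map<String, Integer> datasetStats;
  private int numCommittedWorkUnits;
  private Optional<Exception> optFailure;

  CommitStatsSketch(Map<String, Integer> datasetStats, int numCommittedWorkUnits,
      Optional<Exception> optFailure) {
    // copy defensively so the caller's map cannot change this instance later
    this.datasetStats = Collections.unmodifiableMap(new HashMap<>(Objects.requireNonNull(datasetStats)));
    this.numCommittedWorkUnits = numCommittedWorkUnits;
    this.optFailure = Objects.requireNonNull(optFailure);
  }

  // fully initialized "nothing committed, nothing failed" instance
  static CommitStatsSketch createEmpty() {
    return new CommitStatsSketch(Collections.emptyMap(), 0, Optional.empty());
  }

  Map<String, Integer> getDatasetStats() {
    return datasetStats;
  }

  int getNumCommittedWorkUnits() {
    return numCommittedWorkUnits;
  }

  Optional<Exception> getOptFailure() {
    return optFailure;
  }
}
```

a failure is then recorded by passing `Optional.of(e)` at construction time rather than by calling a setter on an existing instance.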
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -67,6 +67,8 @@
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.PropertiesUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;
+import org.apache.gobblin.temporal.exception.FailedDatasetUrnsException;
Review Comment:
please alphabetize
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java:
##########
@@ -59,10 +61,19 @@ public class CommitStepWorkflowImpl implements
CommitStepWorkflow {
@Override
public CommitStats commit(WUProcessingSpec workSpec) {
CommitStats commitGobblinStats = activityStub.commit(workSpec);
- TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
- timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
- .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES,
GsonUtils.GSON_WITH_DATE_HANDLING.toJson(convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
- .submit();
+
+ if(!commitGobblinStats.getOptFailure().isPresent() ||
commitGobblinStats.getNumCommittedWorkUnits() > 0) {
+ TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
+ timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
+ .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES,
GsonUtils.GSON_WITH_DATE_HANDLING.toJson(
+
convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
+ .submit();
+ }
+ if(commitGobblinStats.getOptFailure().isPresent()){
Review Comment:
`if` needs a space before the opening paren, and also a space before the `{`.
worth a comment like:
```
// emit job summary info on both full and partial commit (ultimately for `GaaSJobObservabilityEvent.datasetsMetrics`)
```
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -97,12 +99,20 @@ public CommitStats commit(WUProcessingSpec workSpec) {
Map<String, JobState.DatasetState> datasetStatesByUrns =
jobState.calculateDatasetStatesByUrns(ImmutableList.copyOf(taskStates),
Lists.newArrayList());
TaskState firstTaskState = taskStates.get(0);
log.info("TaskState (commit) [{}] (**first of {}**): {}",
firstTaskState.getTaskId(), taskStates.size(),
firstTaskState.toJsonString(true));
- commitTaskStates(jobState, datasetStatesByUrns, jobContext);
+ CommitStats commitStats = CommitStats.createEmpty();
+ try {
+ commitTaskStates(jobState, datasetStatesByUrns, jobContext);
+ } catch (FailedDatasetUrnsException exception) {
+ log.info("Some datasets failed to be committed, proceeding with
publishing commit step");
+ commitStats.setOptFailure(Optional.of(exception));
+ }
boolean shouldIncludeFailedTasks =
PropertiesUtils.getPropAsBoolean(jobState.getProperties(),
ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false");
Map<String, DatasetStats> datasetTaskSummaries =
summarizeDatasetOutcomes(datasetStatesByUrns, jobContext.getJobCommitPolicy(),
shouldIncludeFailedTasks);
- return new CommitStats(datasetTaskSummaries,
datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum());
+ return commitStats.setDatasetStats(datasetTaskSummaries)
+ .setNumCommittedWorkUnits(
+
datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum());
Review Comment:
here, please treat `CommitStats` as immutable:
```
Optional<Exception> optFailure = Optional.empty();
try {
  commitTaskStates(jobState, datasetStatesByUrns, jobContext);
} catch (FailedDatasetUrnsException e) {
  log.warn("Some datasets failed to be committed, proceeding with publishing commit step", e);
  optFailure = Optional.of(e);
}
boolean shouldIncludeFailedTasks = PropertiesUtils.getPropAsBoolean(jobState.getProperties(),
    ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false");
Map<String, DatasetStats> datasetTaskSummaries = summarizeDatasetOutcomes(datasetStatesByUrns,
    jobContext.getJobCommitPolicy(), shouldIncludeFailedTasks);
return new CommitStats(
    datasetTaskSummaries,
    datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum(),
    optFailure
);
```
NOTE: also upgrading the logging to `.warn` and including the specific
exception
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/exception/FailedDatasetUrnsException.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.temporal.exception;
+
+import java.io.IOException;
+
+/**
+ * An exception thrown when a set of dataset URNs fail to be processed.
+ */
+public class FailedDatasetUrnsException extends IOException {
+
+
+ /**
+ * Creates a new instance of this exception with the failed dataset URNs.
+ *
+ * @param failedDatasetUrns the String of failed dataset URNs joined by comma
+ */
+ public FailedDatasetUrnsException(String failedDatasetUrns) {
+ super("Failed to process the following dataset URNs: " +
failedDatasetUrns);
+ }
Review Comment:
rather than passing in a string with multiple URNs already joined, have the
ctor take a `List<String>` that keeps them separate. also add a member so that
whoever catches the exception can access the list. you can still format the
`super` call w/ the joined URNs as you currently are.
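roughly what I have in mind, as a sketch only. the `Sketch` suffix and the getter name are my placeholders, and the comma separator follows the current javadoc:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative shape only; the "Sketch" suffix marks it as not the real class.
class FailedDatasetUrnsExceptionSketch extends IOException {
  private final List<String> failedDatasetUrns;

  FailedDatasetUrnsExceptionSketch(List<String> failedDatasetUrns) {
    // message keeps the current format, with the URNs joined by comma
    super("Failed to process the following dataset URNs: " + String.join(",", failedDatasetUrns));
    // defensive, read-only copy so later changes to the caller's list do not leak in
    this.failedDatasetUrns = Collections.unmodifiableList(new ArrayList<>(failedDatasetUrns));
  }

  // lets whoever catches the exception work with the individual URNs
  List<String> getFailedDatasetUrns() {
    return failedDatasetUrns;
  }
}
```

callers that only log the message see no difference, while a handler can iterate `getFailedDatasetUrns()` without re-splitting a string.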
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -72,20 +72,33 @@ private CommitStats performWork(WUProcessingSpec workSpec) {
searchAttributes =
TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties());
NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow =
createProcessingWorkflow(workSpec, searchAttributes);
- int workunitsProcessed =
- processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
workSpec.getTuning().getMaxBranchesPerTree(),
- workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
- if (workunitsProcessed > 0) {
- CommitStepWorkflow commitWorkflow =
createCommitStepWorkflow(searchAttributes);
- CommitStats result = commitWorkflow.commit(workSpec);
- if (result.getNumCommittedWorkUnits() == 0) {
- log.warn("No work units committed at the job level. They could have
been committed at the task level.");
- }
- return result;
- } else {
+
+ Optional<Integer> workunitsProcessed = Optional.empty();
+ try {
+ workunitsProcessed =
Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
+ workSpec.getTuning().getMaxBranchesPerTree(),
workSpec.getTuning().getMaxSubTreesPerTree(),
+ Optional.empty()));
+ } catch (Exception e) {
+ log.error("ProcessWorkUnits failure - will attempt partial commit before
announcing error", e);
+ performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes,
workunitsProcessed);
+ throw e; //We want to proceed with partial commit and throw exception so
that the parent workflow ExecuteGobblinWorkflowImpl can throw the failure event
Review Comment:
need space after `//` and also could streamline comment:
```
throw e; // re-throw after any partial commit, to fail the parent workflow
```
but anyway, that `throw` on L84 won't run, will it? the partial commit
(`CommitStepWorkflow`) will throw an exception that unwinds the stack
beforehand. am I correct that at best this `throw` is a "should never happen"
fallback?
in terms of which we'd prefer to surface (possibly both) - what specific info
is in the orig exception relative to the one currently being thrown? would it
make sense to combine messages from the two or simply throw one or the other?
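if we do want both, one option is to attach the original exception to the commit failure as a suppressed exception. a plain-Java sketch of that control flow follows. `PartialCommitSketch` and `runThenCommitOnFailure` are hypothetical names, and I haven't verified whether suppressed exceptions survive Temporal's failure conversion, so treat it as an illustration of the unwinding rather than a drop-in:

```java
import java.util.concurrent.Callable;

// Hypothetical helper showing why the trailing `throw` is skipped when the commit itself throws.
class PartialCommitSketch {
  static <T> T runThenCommitOnFailure(Callable<T> work, Runnable partialCommit) throws Exception {
    try {
      return work.call();
    } catch (Exception original) {
      try {
        partialCommit.run();
      } catch (RuntimeException commitFailure) {
        // the commit failure unwinds the stack here; keep the original alongside it
        commitFailure.addSuppressed(original);
        throw commitFailure;
      }
      throw original; // reached only when the partial commit completed without throwing
    }
  }
}
```

the alternative is to pick one of the two and fold the other's message into it, which is simpler if only the message text matters downstream.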
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java:
##########
@@ -59,10 +61,19 @@ public class CommitStepWorkflowImpl implements
CommitStepWorkflow {
@Override
public CommitStats commit(WUProcessingSpec workSpec) {
CommitStats commitGobblinStats = activityStub.commit(workSpec);
- TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
- timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
- .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES,
GsonUtils.GSON_WITH_DATE_HANDLING.toJson(convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
- .submit();
+
+ if(!commitGobblinStats.getOptFailure().isPresent() ||
commitGobblinStats.getNumCommittedWorkUnits() > 0) {
+ TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
+ timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
+ .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES,
GsonUtils.GSON_WITH_DATE_HANDLING.toJson(
+
convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
+ .submit();
+ }
+ if(commitGobblinStats.getOptFailure().isPresent()){
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ String.format("Failed to commit dataset state for some dataset(s)"),
FailedDatasetUrnsException.class.toString(),
+ commitGobblinStats.getOptFailure().get());
+ }
Review Comment:
this label may be wrong at runtime if the exception held in `getOptFailure()`
is not a `FailedDatasetUrnsException`. if that's the only possibility, then
change the field type in `CommitStats`. otherwise:
```
Exception failure = commitGobblinStats.getOptFailure().get();
throw ApplicationFailure.newNonRetryableFailureWithCause(
...,
failure.getClass().getName(),
failure);
```
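side note on `Class#toString()` vs `getName()`: the former prefixes the name with `class `, so it's a poor fit for a type label either way. a tiny stdlib-only illustration (`FailureTypeNameSketch` is a made-up helper, and `IOException` stands in for our exception):

```java
import java.io.IOException;

// Made-up helper contrasting a hard-coded label with one derived from the actual failure.
class FailureTypeNameSketch {
  // what the current diff effectively does, using IOException as a stand-in
  static String hardCodedLabel() {
    return IOException.class.toString();
  }

  // label follows whatever exception was actually captured
  static String labelFor(Exception failure) {
    return failure.getClass().getName();
  }
}
```

here `hardCodedLabel()` yields `class java.io.IOException`, while `labelFor(new IllegalStateException("x"))` yields `java.lang.IllegalStateException`.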
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -205,7 +215,7 @@ private Map<String, DatasetStats>
summarizeDatasetOutcomes(Map<String, JobState.
// Only process successful datasets unless configuration to process failed
datasets is set
for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
if (datasetState.getState() == JobState.RunningState.COMMITTED ||
(datasetState.getState() == JobState.RunningState.FAILED
- && commitPolicy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS)) {
+ && (commitPolicy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS ||
commitPolicy == JobCommitPolicy.COMMIT_ON_PARTIAL_SUCCESS))) {
Review Comment:
good catch, the impl previously missed the equivalent "other" naming. given
the potential for a similar omission in the future, I suggest adding a field
on the `enum`:
```
@Getter private final boolean allowPartialCommit;
```
then it's encapsulated via `commitPolicy.isAllowPartialCommit()` (or, if you
like, use the "fluent" form)
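a sketch of the idea without Lombok. `CommitPolicySketch` is a hypothetical mirror of `JobCommitPolicy`: the two partial-commit constants come from the diff above, and `COMMIT_ON_FULL_SUCCESS` is assumed to be the remaining one:

```java
// Hypothetical mirror of JobCommitPolicy; the flag lives with each constant,
// so a newly added policy has to state its partial-commit behavior up front.
enum CommitPolicySketch {
  COMMIT_ON_FULL_SUCCESS(false),    // assumed constant, not shown in the diff
  COMMIT_ON_PARTIAL_SUCCESS(true),
  COMMIT_SUCCESSFUL_TASKS(true);

  private final boolean allowPartialCommit;

  CommitPolicySketch(boolean allowPartialCommit) {
    this.allowPartialCommit = allowPartialCommit;
  }

  // hand-written equivalent of what Lombok's @Getter would generate for a boolean field
  boolean isAllowPartialCommit() {
    return allowPartialCommit;
  }
}
```

the condition in `summarizeDatasetOutcomes` would then reduce to the `FAILED` check plus `commitPolicy.isAllowPartialCommit()`, with no list of constants to keep in sync.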