[
https://issues.apache.org/jira/browse/GOBBLIN-2175?focusedWorklogId=948163&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-948163
]
ASF GitHub Bot logged work on GOBBLIN-2175:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 13/Dec/24 06:39
Start Date: 13/Dec/24 06:39
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #4078:
URL: https://github.com/apache/gobblin/pull/4078#discussion_r1883352538
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java:
##########
@@ -59,13 +60,21 @@ public class CommitStepWorkflowImpl implements CommitStepWorkflow {
@Override
public CommitStats commit(WUProcessingSpec workSpec) {
CommitStats commitGobblinStats = activityStub.commit(workSpec);
- TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
- timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
- .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
- .submit();
+
+ if (!commitGobblinStats.getOptFailure().isPresent() || commitGobblinStats.getNumCommittedWorkUnits() > 0) {
+ TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
+ timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
+ .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(
+ convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
+ .submit();// emit job summary info on both full and partial commit (ultimately for `GaaSJobObservabilityEvent.datasetsMetrics`)
+ }
+ if (commitGobblinStats.getOptFailure().isPresent()) {
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ String.format("Failed to commit dataset state for some dataset(s)"), commitGobblinStats.getOptFailure().get().getClass().toString(),
Review Comment:
NBD, but there's no need for `String.format` when a plain string literal would do
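
To illustrate the point above, here is a minimal sketch in plain Java. The class `StringFormatSketch` and its helper `formatThrows` are hypothetical names made up for this example and are not part of the PR. It assumes only standard `java.util.Formatter` behavior: with no format specifiers, `String.format` returns a string equal to its input, and a stray `%` makes it throw where a literal would not.

```java
import java.util.IllegalFormatException;

// Hypothetical demo class; not part of the PR under review.
final class StringFormatSketch {
    // Returns true when String.format rejects the given pattern outright.
    static boolean formatThrows(String pattern) {
        try {
            String.format(pattern);
            return false;
        } catch (IllegalFormatException ex) {
            return true;
        }
    }

    public static void main(String[] args) {
        String literal = "Failed to commit dataset state for some dataset(s)";
        // With no format specifiers, format() adds nothing over the literal.
        System.out.println(String.format(literal).equals(literal));
        // A stray '%' in the text makes format() throw; a literal is unaffected.
        System.out.println(formatThrows("Committed 50%"));
    }
}
```

So the literal is both shorter and slightly safer if the message text is ever edited to include a percent sign.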
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -72,20 +73,48 @@ private CommitStats performWork(WUProcessingSpec workSpec) {
searchAttributes = TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties());
NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec, searchAttributes);
- int workunitsProcessed =
- processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, workSpec.getTuning().getMaxBranchesPerTree(),
- workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
- if (workunitsProcessed > 0) {
- CommitStepWorkflow commitWorkflow = createCommitStepWorkflow(searchAttributes);
- CommitStats result = commitWorkflow.commit(workSpec);
- if (result.getNumCommittedWorkUnits() == 0) {
- log.warn("No work units committed at the job level. They could have been committed at the task level.");
+
+ Optional<Integer> workunitsProcessed = Optional.empty();
+ try {
+ workunitsProcessed = Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
+ workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(),
+ Optional.empty()));
+ } catch (Exception e) {
+ log.error("ProcessWorkUnits failure - attempting partial commit before re-throwing exception", e);
+
+ try {
+ performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, workunitsProcessed);// Attempt partial commit before surfacing the failure
Review Comment:
needs a space before `//`
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -72,20 +73,48 @@ private CommitStats performWork(WUProcessingSpec workSpec) {
searchAttributes = TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties());
NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec, searchAttributes);
- int workunitsProcessed =
- processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, workSpec.getTuning().getMaxBranchesPerTree(),
- workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
- if (workunitsProcessed > 0) {
- CommitStepWorkflow commitWorkflow = createCommitStepWorkflow(searchAttributes);
- CommitStats result = commitWorkflow.commit(workSpec);
- if (result.getNumCommittedWorkUnits() == 0) {
- log.warn("No work units committed at the job level. They could have been committed at the task level.");
+
+ Optional<Integer> workunitsProcessed = Optional.empty();
+ try {
+ workunitsProcessed = Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
+ workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(),
+ Optional.empty()));
+ } catch (Exception e) {
+ log.error("ProcessWorkUnits failure - attempting partial commit before re-throwing exception", e);
+
+ try {
+ performCommitIfAnyWorkUnitsProcessed(workSpec, searchAttributes, workunitsProcessed);// Attempt partial commit before surfacing the failure
+ } catch (Exception commitException) {
+ // Combine current and commit exception messages for a more complete context
+ String combinedMessage = String.format(
+ "Processing failure: %s. Commit workflow failure: %s",
+ e.getMessage(),
+ commitException.getMessage()
+ );
+ log.error(combinedMessage);
+ throw ApplicationFailure.newNonRetryableFailureWithCause(
+ String.format("Processing failure: %s. Partial commit failure: %s", combinedMessage, commitException),
Review Comment:
1. maybe add intro context plus a newline to separate the messages, e.g.
```
"ProcessWorkUnits failure (as expected) led to failure during partial commit attempt -\n ProcessWorkUnits failure: %s\n CommitStep failure: %s"
```
2. also, can't you reuse `combinedMessage` from L96? Or is more of
`commitException` than just its message being used the second time?
3. `e` will not lose its stack trace, so there's no need to wrap it as `new
Exception(e)`, unless you want someone to know you rethrew it from this
particular place. That said, I'd avoid wrapping, since that just adds more
layers to peel back while debugging
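
A rough sketch of points 1 and 3, in plain Java with no Temporal dependency. Everything named here (`PartialCommitFailureSketch`, `combine`, `failInProcessing`, `rethrowUnwrapped`) is hypothetical and only for illustration. The message text is a shortened variant of the wording suggested above, and how the result would be passed to `ApplicationFailure` is left out on purpose.

```java
// Hypothetical helper; names are illustrative and not taken from the PR.
final class PartialCommitFailureSketch {
    // Point 1: intro context first, then one failure message per line.
    static String combine(Throwable processingFailure, Throwable commitFailure) {
        return String.format(
            "ProcessWorkUnits failure led to failure during partial commit attempt -\n"
                + "  ProcessWorkUnits failure: %s\n  CommitStep failure: %s",
            processingFailure.getMessage(), commitFailure.getMessage());
    }

    // Stand-in for the processing step that fails.
    static void failInProcessing() {
        throw new IllegalStateException("work unit 7 failed");
    }

    // Point 3: rethrowing the caught exception as-is keeps its original
    // stack trace, so no extra wrapper layer is needed.
    static void rethrowUnwrapped() {
        try {
            failInProcessing();
        } catch (RuntimeException e) {
            throw e;
        }
    }

    public static void main(String[] args) {
        System.out.println(combine(
            new RuntimeException("processing timed out"),
            new RuntimeException("state store unavailable")));
        try {
            rethrowUnwrapped();
        } catch (IllegalStateException e) {
            // The top frame is still the method that originally threw.
            System.out.println(e.getStackTrace()[0].getMethodName());
        }
    }
}
```

Building the combined text once and reusing it for both the log line and the thrown failure would also address point 2.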
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -72,20 +73,48 @@ private CommitStats performWork(WUProcessingSpec workSpec) {
searchAttributes = TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties());
NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = createProcessingWorkflow(workSpec, searchAttributes);
- int workunitsProcessed =
- processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0, workSpec.getTuning().getMaxBranchesPerTree(),
- workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
- if (workunitsProcessed > 0) {
- CommitStepWorkflow commitWorkflow = createCommitStepWorkflow(searchAttributes);
- CommitStats result = commitWorkflow.commit(workSpec);
- if (result.getNumCommittedWorkUnits() == 0) {
- log.warn("No work units committed at the job level. They could have been committed at the task level.");
+
+ Optional<Integer> workunitsProcessed = Optional.empty();
+ try {
+ workunitsProcessed = Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
+ workSpec.getTuning().getMaxBranchesPerTree(), workSpec.getTuning().getMaxSubTreesPerTree(),
+ Optional.empty()));
+ } catch (Exception e) {
+ log.error("ProcessWorkUnits failure - attempting partial commit before re-throwing exception", e);
Review Comment:
I may even have suggested this text, but reading it again, it's ambiguous (e.g.
was the failure *while* attempting partial commit?).
This would be clearer:
```
"ProcessWorkUnits failure - will attempt partial commit..."
```
Issue Time Tracking
-------------------
Worklog Id: (was: 948163)
Time Spent: 2h 50m (was: 2h 40m)
> Fix partial commit in temporal flow
> -----------------------------------
>
> Key: GOBBLIN-2175
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2175
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: Aditya Pratap Singh
> Priority: Major
> Time Spent: 2h 50m
> Remaining Estimate: 0h
>
> Fix partial commit in temporal flow