[ https://issues.apache.org/jira/browse/GOBBLIN-2175?focusedWorklogId=946827&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-946827 ]
ASF GitHub Bot logged work on GOBBLIN-2175:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 05/Dec/24 09:09
Start Date: 05/Dec/24 09:09
Worklog Time Spent: 10m
Work Description: pratapaditya04 commented on code in PR #4078:
URL: https://github.com/apache/gobblin/pull/4078#discussion_r1870954169
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -72,10 +73,33 @@ private CommitStats performWork(WUProcessingSpec workSpec) {
searchAttributes =
TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties());
NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow =
createProcessingWorkflow(workSpec, searchAttributes);
- int workunitsProcessed =
- processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
workSpec.getTuning().getMaxBranchesPerTree(),
- workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
- if (workunitsProcessed > 0) {
+
+ Optional<Integer> workunitsProcessed = Optional.empty();
+ try {
+ workunitsProcessed =
Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
+ workSpec.getTuning().getMaxBranchesPerTree(),
workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()));
+ } catch (Exception e) {
+ log.error("Exception occurred in performing workload,proceeding with commit step", e);
+ // We want to mark the GaaS flow as failure, in case performWorkFlow fails, but we still want to go ahead with commiting the workunits which were processed before failure
+ sendFailureEventToGaaS(workSpec);
+ return proceedWithCommitStepAndReturnCommitStats(workSpec,
searchAttributes, workunitsProcessed);
+ }
+ return proceedWithCommitStepAndReturnCommitStats(workSpec,
searchAttributes, workunitsProcessed);
+ }
+
+ private void sendFailureEventToGaaS(WUProcessingSpec workSpec) {
+ TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
+ timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).submit();
+ }
+
+ private CommitStats
proceedWithCommitStepAndReturnCommitStats(WUProcessingSpec workSpec,
Review Comment:
Makes sense, addressed.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -72,10 +73,33 @@ private CommitStats performWork(WUProcessingSpec workSpec) {
searchAttributes =
TemporalWorkFlowUtils.generateGaasSearchAttributes(jobState.getProperties());
NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow =
createProcessingWorkflow(workSpec, searchAttributes);
- int workunitsProcessed =
- processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
workSpec.getTuning().getMaxBranchesPerTree(),
- workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty());
- if (workunitsProcessed > 0) {
+
+ Optional<Integer> workunitsProcessed = Optional.empty();
+ try {
+ workunitsProcessed =
Optional.of(processingWorkflow.performWorkload(WorkflowAddr.ROOT, workload, 0,
+ workSpec.getTuning().getMaxBranchesPerTree(),
workSpec.getTuning().getMaxSubTreesPerTree(), Optional.empty()));
+ } catch (Exception e) {
+ log.error("Exception occurred in performing workload,proceeding with commit step", e);
+ // We want to mark the GaaS flow as failure, in case performWorkFlow fails, but we still want to go ahead with commiting the workunits which were processed before failure
+ sendFailureEventToGaaS(workSpec);
+ return proceedWithCommitStepAndReturnCommitStats(workSpec,
searchAttributes, workunitsProcessed);
+ }
+ return proceedWithCommitStepAndReturnCommitStats(workSpec,
searchAttributes, workunitsProcessed);
+ }
+
+ private void sendFailureEventToGaaS(WUProcessingSpec workSpec) {
+ TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
+ timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).submit();
+ }
+
+ private CommitStats
proceedWithCommitStepAndReturnCommitStats(WUProcessingSpec workSpec,
+ Map<String, Object> searchAttributes, Optional<Integer>
workunitsProcessed) {
+ /*
+ !workunitsProcessed.isPresent() condition helps in case of partial commit,
+ workunitsProcessed will be Optional.Empty() only in cases performWorkload throws an exception
+ we are only inhibiting commit when workunitsProcessed actually known to be zero
+ * */
+ if (!workunitsProcessed.isPresent() || workunitsProcessed.get() > 0) {
Review Comment:
addressed
Issue Time Tracking
-------------------
Worklog Id: (was: 946827)
Time Spent: 1h (was: 50m)
> Fix partial commit in temporal flow
> -----------------------------------
>
> Key: GOBBLIN-2175
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2175
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: Aditya Pratap Singh
> Priority: Major
> Time Spent: 1h
> Remaining Estimate: 0h
>
> Fix partial commit in temporal flow
--
This message was sent by Atlassian Jira
(v8.20.10#820010)