[ 
https://issues.apache.org/jira/browse/GOBBLIN-2175?focusedWorklogId=947833&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-947833
 ]

ASF GitHub Bot logged work on GOBBLIN-2175:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Dec/24 18:44
            Start Date: 11/Dec/24 18:44
    Worklog Time Spent: 10m 
      Work Description: pratapaditya04 commented on code in PR #4078:
URL: https://github.com/apache/gobblin/pull/4078#discussion_r1880729655


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -205,7 +215,7 @@ private Map<String, DatasetStats> summarizeDatasetOutcomes(Map<String, JobState.
     // Only process successful datasets unless configuration to process failed datasets is set
     for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
       if (datasetState.getState() == JobState.RunningState.COMMITTED || (datasetState.getState() == JobState.RunningState.FAILED
-          && commitPolicy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS)) {
+          && (commitPolicy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS || commitPolicy == JobCommitPolicy.COMMIT_ON_PARTIAL_SUCCESS))) {

Review Comment:
   Good suggestion. Added a field, allowPartialCommit.
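
   A minimal sketch of how a flag like this might simplify the condition in the hunk above. It assumes allowPartialCommit is a boolean derived from the commit policy; the field as actually added in the PR may be defined differently. The enums are simplified stand-ins for Gobblin's JobState.RunningState and JobCommitPolicy, and the class and method names are hypothetical.

```java
// Hypothetical illustration only; not the code from PR #4078.
final class PartialCommitSketch {
  // Simplified stand-ins for Gobblin's JobState.RunningState and JobCommitPolicy.
  enum RunningState { COMMITTED, FAILED }
  enum JobCommitPolicy { COMMIT_ON_FULL_SUCCESS, COMMIT_ON_PARTIAL_SUCCESS, COMMIT_SUCCESSFUL_TASKS }

  // Assumption: the flag is true for either policy that tolerates failed datasets.
  static boolean allowsPartialCommit(JobCommitPolicy policy) {
    return policy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS
        || policy == JobCommitPolicy.COMMIT_ON_PARTIAL_SUCCESS;
  }

  // Committed datasets are always summarized; failed ones only when partial commit is allowed.
  static boolean shouldSummarize(RunningState state, boolean allowPartialCommit) {
    return state == RunningState.COMMITTED
        || (state == RunningState.FAILED && allowPartialCommit);
  }

  public static void main(String[] args) {
    boolean allow = allowsPartialCommit(JobCommitPolicy.COMMIT_ON_PARTIAL_SUCCESS);
    System.out.println(shouldSummarize(RunningState.FAILED, allow));
  }
}
```

   Computing the flag once keeps the loop condition short and puts the policy check in one place.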



##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -97,12 +99,20 @@ public CommitStats commit(WUProcessingSpec workSpec) {
       Map<String, JobState.DatasetState> datasetStatesByUrns = jobState.calculateDatasetStatesByUrns(ImmutableList.copyOf(taskStates), Lists.newArrayList());
       TaskState firstTaskState = taskStates.get(0);
       log.info("TaskState (commit) [{}] (**first of {}**): {}", firstTaskState.getTaskId(), taskStates.size(), firstTaskState.toJsonString(true));
-      commitTaskStates(jobState, datasetStatesByUrns, jobContext);
+      CommitStats commitStats = CommitStats.createEmpty();
+      try {
+        commitTaskStates(jobState, datasetStatesByUrns, jobContext);
+      } catch (FailedDatasetUrnsException exception) {
+        log.info("Some datasets failed to be committed, proceeding with publishing commit step");
+        commitStats.setOptFailure(Optional.of(exception));
+      }
 
       boolean shouldIncludeFailedTasks = PropertiesUtils.getPropAsBoolean(jobState.getProperties(), ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false");
 
       Map<String, DatasetStats> datasetTaskSummaries = summarizeDatasetOutcomes(datasetStatesByUrns, jobContext.getJobCommitPolicy(), shouldIncludeFailedTasks);
-      return new CommitStats(datasetTaskSummaries, datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum());
+      return commitStats.setDatasetStats(datasetTaskSummaries)
+          .setNumCommittedWorkUnits(
+              datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum());

Review Comment:
   Addressed.
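
   The pattern in the hunk above (catch the partial-commit failure, record it, then still report what was committed) can be sketched in isolation. This is an illustrative sketch under stated assumptions, not Gobblin's implementation: FailedDatasetsException, Stats, and commitAll are hypothetical stand-ins, and URNs starting with "bad" are simply assumed to fail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical illustration only; the types below are not Gobblin classes.
final class CommitFailureCaptureSketch {
  /** Stand-in for an exception raised when some datasets fail to commit. */
  static final class FailedDatasetsException extends Exception {
    FailedDatasetsException(List<String> failedUrns) {
      super("Failed to commit datasets: " + failedUrns);
    }
  }

  /** Simplified result holder: committed count plus an optional failure. */
  static final class Stats {
    int numCommitted;
    Optional<Exception> failure = Optional.empty();
  }

  /** Commits every URN it can, then throws if any URN failed. */
  static void commitAll(List<String> urns, List<String> committed) throws FailedDatasetsException {
    List<String> failed = new ArrayList<>();
    for (String urn : urns) {
      if (urn.startsWith("bad")) {
        failed.add(urn);      // assumed failure condition for the sketch
      } else {
        committed.add(urn);
      }
    }
    if (!failed.isEmpty()) {
      throw new FailedDatasetsException(failed);
    }
  }

  /** Records the failure instead of propagating it, so stats are still returned. */
  static Stats commit(List<String> urns) {
    Stats stats = new Stats();
    List<String> committed = new ArrayList<>();
    try {
      commitAll(urns, committed);
    } catch (FailedDatasetsException e) {
      stats.failure = Optional.of(e);
    }
    stats.numCommitted = committed.size();
    return stats;
  }
}
```

   The caller can then decide whether a recorded failure should fail the job after the remaining steps have run.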





Issue Time Tracking
-------------------

    Worklog Id:     (was: 947833)
    Time Spent: 2.5h  (was: 2h 20m)

> Fix partial commit in temporal flow
> -----------------------------------
>
>                 Key: GOBBLIN-2175
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2175
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Aditya Pratap Singh
>            Priority: Major
>          Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> Fix partial commit in temporal flow



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
