[ https://issues.apache.org/jira/browse/GOBBLIN-2175?focusedWorklogId=947833&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-947833 ]
ASF GitHub Bot logged work on GOBBLIN-2175:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 11/Dec/24 18:44
Start Date: 11/Dec/24 18:44
Worklog Time Spent: 10m
Work Description: pratapaditya04 commented on code in PR #4078:
URL: https://github.com/apache/gobblin/pull/4078#discussion_r1880729655
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -205,7 +215,7 @@ private Map<String, DatasetStats> summarizeDatasetOutcomes(Map<String, JobState.
// Only process successful datasets unless configuration to process failed datasets is set
for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
if (datasetState.getState() == JobState.RunningState.COMMITTED ||
(datasetState.getState() == JobState.RunningState.FAILED
- && commitPolicy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS)) {
+ && (commitPolicy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS || commitPolicy == JobCommitPolicy.COMMIT_ON_PARTIAL_SUCCESS))) {
Review Comment:
Good suggestion. I added an allowPartialCommit field.
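For readers following the thread, here is a minimal sketch of how such a flag might simplify the condition in the hunk above. It is not the code from the PR: the enums are hypothetical stand-ins for JobState.RunningState and JobCommitPolicy, and the class name, `allowsPartialCommit`, and `shouldSummarize` are invented for illustration.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch; none of these names come from the Gobblin code base.
final class PartialCommitSketch {
  // Stand-in for JobState.RunningState, modeling only the two states in the diff.
  enum RunningState { COMMITTED, FAILED }

  // Stand-in for JobCommitPolicy. The third constant is assumed here only
  // to have a policy that does not allow partial commits.
  enum CommitPolicy { COMMIT_ON_FULL_SUCCESS, COMMIT_SUCCESSFUL_TASKS, COMMIT_ON_PARTIAL_SUCCESS }

  // Policies under which failed datasets are still processed (per the diff).
  private static final Set<CommitPolicy> PARTIAL_POLICIES =
      EnumSet.of(CommitPolicy.COMMIT_SUCCESSFUL_TASKS, CommitPolicy.COMMIT_ON_PARTIAL_SUCCESS);

  // Compute the flag once instead of repeating the policy comparison per dataset.
  static boolean allowsPartialCommit(CommitPolicy policy) {
    return PARTIAL_POLICIES.contains(policy);
  }

  // Same shape as the condition in the diff, with the policy check replaced by the flag.
  static boolean shouldSummarize(RunningState state, boolean allowPartialCommit) {
    return state == RunningState.COMMITTED
        || (state == RunningState.FAILED && allowPartialCommit);
  }

  public static void main(String[] args) {
    boolean allow = allowsPartialCommit(CommitPolicy.COMMIT_ON_PARTIAL_SUCCESS);
    System.out.println(shouldSummarize(RunningState.FAILED, allow));
  }
}
```

Computing the flag once outside the loop keeps the per-dataset check short, which is presumably the intent of the suggested field.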
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -97,12 +99,20 @@ public CommitStats commit(WUProcessingSpec workSpec) {
Map<String, JobState.DatasetState> datasetStatesByUrns = jobState.calculateDatasetStatesByUrns(ImmutableList.copyOf(taskStates), Lists.newArrayList());
TaskState firstTaskState = taskStates.get(0);
log.info("TaskState (commit) [{}] (**first of {}**): {}", firstTaskState.getTaskId(), taskStates.size(), firstTaskState.toJsonString(true));
- commitTaskStates(jobState, datasetStatesByUrns, jobContext);
+ CommitStats commitStats = CommitStats.createEmpty();
+ try {
+ commitTaskStates(jobState, datasetStatesByUrns, jobContext);
+ } catch (FailedDatasetUrnsException exception) {
+ log.info("Some datasets failed to be committed, proceeding with publishing commit step");
+ commitStats.setOptFailure(Optional.of(exception));
+ }
boolean shouldIncludeFailedTasks = PropertiesUtils.getPropAsBoolean(jobState.getProperties(), ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false");
Map<String, DatasetStats> datasetTaskSummaries = summarizeDatasetOutcomes(datasetStatesByUrns, jobContext.getJobCommitPolicy(), shouldIncludeFailedTasks);
- return new CommitStats(datasetTaskSummaries, datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum());
+ return commitStats.setDatasetStats(datasetTaskSummaries)
+ .setNumCommittedWorkUnits(
+ datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum());
Review Comment:
addressed
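The pattern in the hunk above (catch the dataset failure, record it on the stats object, and still summarize what was committed) can be sketched in isolation roughly as follows. This is an illustration under assumptions, not the PR's implementation: `FailedUrnsException`, `Stats`, `commitAll`, and `commitAndSummarize` are hypothetical stand-ins for FailedDatasetUrnsException, CommitStats, and the commit methods.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch; names are invented for illustration.
final class CommitFailureCaptureSketch {
  // Stand-in for FailedDatasetUrnsException: carries the URNs that failed.
  static final class FailedUrnsException extends Exception {
    private final List<String> failedUrns;

    FailedUrnsException(List<String> failedUrns) {
      super("Failed to commit: " + failedUrns);
      this.failedUrns = failedUrns;
    }

    List<String> getFailedUrns() {
      return failedUrns;
    }
  }

  // Stand-in for CommitStats: a count plus an optional recorded failure.
  static final class Stats {
    Optional<Exception> optFailure = Optional.empty();
    int numCommitted = 0;
  }

  // Commits every URN it can, then reports all failures with one exception.
  static void commitAll(List<String> urns, Set<String> failing, Set<String> committedOut)
      throws FailedUrnsException {
    List<String> failed = new ArrayList<>();
    for (String urn : urns) {
      if (failing.contains(urn)) {
        failed.add(urn);
      } else {
        committedOut.add(urn);
      }
    }
    if (!failed.isEmpty()) {
      throw new FailedUrnsException(failed);
    }
  }

  // Records the failure instead of propagating it, so the summary still runs.
  static Stats commitAndSummarize(List<String> urns, Set<String> failing) {
    Stats stats = new Stats();
    Set<String> committed = new HashSet<>();
    try {
      commitAll(urns, failing, committed);
    } catch (FailedUrnsException e) {
      stats.optFailure = Optional.of(e);
    }
    stats.numCommitted = committed.size();
    return stats;
  }

  public static void main(String[] args) {
    Stats stats = commitAndSummarize(
        Arrays.asList("a", "b", "c"), new HashSet<>(Arrays.asList("b")));
    // prints "committed=2, failure=true"
    System.out.println("committed=" + stats.numCommitted + ", failure=" + stats.optFailure.isPresent());
  }
}
```

The caller then decides what to do with the recorded failure after the partial results have been reported, rather than losing the summary to an early throw.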
Issue Time Tracking
-------------------
Worklog Id: (was: 947833)
Time Spent: 2.5h (was: 2h 20m)
> Fix partial commit in temporal flow
> -----------------------------------
>
> Key: GOBBLIN-2175
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2175
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: Aditya Pratap Singh
> Priority: Major
> Time Spent: 2.5h
> Remaining Estimate: 0h
>
> Fix partial commit in temporal flow