phet commented on code in PR #3912:
URL: https://github.com/apache/gobblin/pull/3912#discussion_r1599162857
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -77,36 +77,37 @@ public class CommitActivityImpl implements CommitActivity {
static String UNDEFINED_JOB_NAME = "<job_name_stub>";
@Override
- public int commit(WUProcessingSpec workSpec) {
+ public CommitGobblinStats commit(WUProcessingSpec workSpec) {
// TODO: Make this configurable
int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
- Optional<String> jobNameOpt = Optional.empty();
+ Optional<String> optJobName = Optional.empty();
+ AutomaticTroubleshooter troubleshooter = null;
try {
FileSystem fs = Help.loadFileSystem(workSpec);
JobState jobState = Help.loadJobState(workSpec, fs);
- jobNameOpt = Optional.ofNullable(jobState.getJobName());
+ optJobName = Optional.ofNullable(jobState.getJobName());
SharedResourcesBroker<GobblinScopeTypes> instanceBroker = JobStateUtils.getSharedResourcesBroker(jobState);
troubleshooter = AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(jobState.getProperties()));
troubleshooter.start();
List<TaskState> taskStates = loadTaskStates(workSpec, fs, jobState, numDeserializationThreads);
- if (!taskStates.isEmpty()) {
- JobContext jobContext = new JobContext(jobState.getProperties(), log, instanceBroker, troubleshooter.getIssueRepository());
- commitTaskStates(jobState, taskStates, jobContext);
+ if (taskStates.isEmpty()) {
+ return new CommitGobblinStats(new HashMap<>(), 0);
}
- Queue<TaskState> taskStateQueue = taskStateQueueOpt.get();
- Map<String, JobState.DatasetState> datasetStatesByUrns = createDatasetStatesByUrns(ImmutableList.copyOf(taskStateQueue));
- commitTaskStates(jobState, datasetStatesByUrns, globalGobblinContext, jobNameOpt);
- List<DatasetTaskSummary> datasetTaskSummaries = generateDatasetTaskSummaries(datasetStatesByUrns, globalGobblinContext, workSpec.getEventSubmitterContext().create());
- // Submit event that summarizes work done
- TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
- TemporalEventTimer eventTimer = timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY);
- eventTimer.addMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(datasetTaskSummaries));
- eventTimer.stop();
- return taskStateQueue.size();
+
+ JobContext jobContext = new JobContext(jobState.getProperties(), log, instanceBroker, troubleshooter.getIssueRepository());
+ Map<String, JobState.DatasetState> datasetStatesByUrns = jobState.calculateDatasetStatesByUrns(ImmutableList.copyOf(taskStates), Lists.newArrayList());
+ TaskState firstTaskState = taskStates.get(0);
+ log.info("TaskState (commit) [{}] (**first of {}**): {}", firstTaskState.getTaskId(), taskStates.size(), firstTaskState.toJsonString(true));
+ commitTaskStates(jobState, datasetStatesByUrns, jobContext);
+
+ boolean shouldIncludeFailedTasks = PropertiesUtils.getPropAsBoolean(jobState.getProperties(), ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false");
+
+ Map<String, DatasetStats> datasetTaskSummaries = summarizeDatasetOutcomes(datasetStatesByUrns, jobContext.getJobCommitPolicy(), shouldIncludeFailedTasks);
+ return new CommitGobblinStats(datasetTaskSummaries, datasetTaskSummaries.values().stream().reduce(0, (acc, datasetStats) -> acc + datasetStats.getNumCommittedWorkunits(), Integer::sum));
Review Comment:
I believe you can do:
```
summaries.values().stream().mapToLong(DatasetStats::getNumCommittedWorkunits).sum()
```
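A self-contained sketch comparing the two forms. `DatasetStats` here is a hypothetical stand-in that models only the `getNumCommittedWorkunits()` getter visible in the diff, and `CommittedCount` is an illustrative helper, not part of the PR. One thing to watch: `mapToLong(...).sum()` returns a `long`, so if the count field on `CommitGobblinStats` is an `int` (an assumption here), `mapToInt(...).sum()` avoids a narrowing cast.

```java
import java.util.Map;

// Hypothetical stand-in for DatasetStats; only the getter used in the diff is modeled.
class DatasetStats {
  private final int numCommittedWorkunits;

  DatasetStats(int numCommittedWorkunits) {
    this.numCommittedWorkunits = numCommittedWorkunits;
  }

  int getNumCommittedWorkunits() {
    return numCommittedWorkunits;
  }
}

// Illustrative helper (not in the PR) showing equivalent ways to total the counts.
class CommittedCount {
  // Form currently in the diff: three-arg reduce with an explicit combiner.
  static int viaReduce(Map<String, DatasetStats> summaries) {
    return summaries.values().stream()
        .reduce(0, (acc, stats) -> acc + stats.getNumCommittedWorkunits(), Integer::sum);
  }

  // Map to a primitive IntStream and sum; the result stays an int.
  static int viaMapToInt(Map<String, DatasetStats> summaries) {
    return summaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum();
  }

  // Form from the review comment: the result is a long.
  static long viaMapToLong(Map<String, DatasetStats> summaries) {
    return summaries.values().stream().mapToLong(DatasetStats::getNumCommittedWorkunits).sum();
  }
}
```

All three return 0 for an empty map, so the early-return case needs no special handling.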
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java:
##########
@@ -32,5 +34,5 @@ public interface CommitActivity {
* @return number of workunits committed
*/
@ActivityMethod
- int commit(WUProcessingSpec workSpec);
+ CommitGobblinStats commit(WUProcessingSpec workSpec);
Review Comment:
To me, "gobblin commit stats" sounds better than "commit gobblin stats":
of all the gobblin stats, these are the commit ones, rather than, of all
the commit stats, these are the gobblin ones.
That said, as all this is within our gobblin impl, this could probably be
simply `CommitStats`.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java:
##########
@@ -45,7 +45,7 @@ public interface EventTimer extends Closeable {
* @param key
* @param metadata
*/
- void addMetadata(String key, String metadata);
+ EventTimer withMetadata(String key, String metadata);
Review Comment:
AWESOME!
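Returning the timer from `withMetadata` is what allows chained calls. A minimal sketch of the pattern, assuming a hypothetical in-memory `RecordingEventTimer`; the `EventTimer` interface below is a simplified mirror of the one in the diff (including an assumed `stop()`), not Gobblin's actual code.

```java
import java.io.Closeable;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified mirror of the diff's interface: withMetadata returns the timer itself.
interface EventTimer extends Closeable {
  EventTimer withMetadata(String key, String metadata);

  void stop();

  // Closing the timer simply stops it in this sketch.
  @Override
  default void close() {
    stop();
  }
}

// Hypothetical implementation that only records metadata, to demonstrate chaining.
class RecordingEventTimer implements EventTimer {
  final Map<String, String> metadata = new LinkedHashMap<>();
  boolean stopped = false;

  @Override
  public EventTimer withMetadata(String key, String value) {
    metadata.put(key, value);
    return this; // returning this is what enables the fluent style
  }

  @Override
  public void stop() {
    stopped = true;
  }
}
```

With this shape, a summary event could presumably be written in one expression, e.g. `timer.withMetadata(key, json).stop()`, instead of separate `addMetadata(...)` and `stop()` statements.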
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java:
##########
@@ -29,5 +30,5 @@
public interface ProcessWorkUnitsWorkflow {
/** @return the number of {@link WorkUnit}s cumulatively processed successfully */
@WorkflowMethod
- int process(WUProcessingSpec wuSpec);
+ CommitGobblinStats process(WUProcessingSpec wuSpec);
Review Comment:
In light of my last comment, IMO `CommitStats` reads nicely here.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ExecuteGobblinWorkflow.java:
##########
@@ -37,5 +38,5 @@
public interface ExecuteGobblinWorkflow {
/** @return the number of {@link WorkUnit}s discovered and successfully processed */
@WorkflowMethod
- int execute(Properties props, EventSubmitterContext eventSubmitterContext);
+ ExecGobblinStats execute(Properties props, EventSubmitterContext eventSubmitterContext);
Review Comment:
Given the workflow is named `ExecuteGobblin`, `ExecGobblinStats`
actually does sound reasonable.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -64,14 +66,14 @@ private int performWork(WUProcessingSpec workSpec) {
);
if (workunitsProcessed > 0) {
CommitStepWorkflow commitWorkflow = createCommitStepWorkflow();
- int result = commitWorkflow.commit(workSpec);
- if (result == 0) {
+ CommitGobblinStats result = commitWorkflow.commit(workSpec);
+ if (result.getNumCommittedWorkUnits() == 0) {
log.warn("No work units committed at the job level. They could have been committed at the task level.");
}
return result;
} else {
log.error("No work units processed, so no commit attempted.");
- return 0;
+ return new CommitGobblinStats(new HashMap<>(), 0);
Review Comment:
Did I see this earlier as well? I'd probably add a static factory method, used as:
```
return CommitStats.createEmpty();
```
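A sketch of that suggestion, combined with the `CommitStats` rename proposed in the earlier comment. The accessor name follows the diff (`getNumCommittedWorkUnits()`), but the class body, including the `int` count type, is an assumption, and `DatasetStats` is an empty placeholder for the real class.

```java
import java.util.HashMap;
import java.util.Map;

// Placeholder for the real org.apache.gobblin.temporal.ddm.work.DatasetStats.
class DatasetStats {
}

// Sketch: the suggested CommitStats name plus a static factory for the empty case.
final class CommitStats {
  private final Map<String, DatasetStats> datasetStats;
  private final int numCommittedWorkUnits;

  CommitStats(Map<String, DatasetStats> datasetStats, int numCommittedWorkUnits) {
    this.datasetStats = datasetStats;
    this.numCommittedWorkUnits = numCommittedWorkUnits;
  }

  /** @return stats representing a commit in which nothing was committed */
  static CommitStats createEmpty() {
    // a fresh map per call, matching the existing `new HashMap<>()` call sites
    return new CommitStats(new HashMap<>(), 0);
  }

  Map<String, DatasetStats> getDatasetStats() {
    return datasetStats;
  }

  int getNumCommittedWorkUnits() {
    return numCommittedWorkUnits;
  }
}
```

Returning a fresh map on each call keeps today's behavior; a shared singleton would save an allocation but would hand every caller the same mutable map.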
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java:
##########
@@ -85,8 +86,8 @@ public void submitJob(List<WorkUnit> workunits) {
EventSubmitterContext eventSubmitterContext = new EventSubmitterContext.Builder(eventSubmitter)
.withGaaSJobProps(this.jobProps)
.build();
- int numWorkUnits = workflow.execute(ConfigUtils.configToProperties(jobConfigWithOverrides), eventSubmitterContext);
- log.info("FINISHED - ExecuteGobblinWorkflow.execute = {}", numWorkUnits);
+ ExecGobblinStats execGobblinStats = workflow.execute(ConfigUtils.configToProperties(jobConfigWithOverrides), eventSubmitterContext);
+ log.info("FINISHED - ExecuteGobblinWorkflow.execute = {}", execGobblinStats.getNumCommitted());
Review Comment:
Log only the number? Why not the full struct as JSON?
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+@Data
+@NonNull
+@RequiredArgsConstructor
+@NoArgsConstructor
+public class DatasetStats {
Review Comment:
nit: add a javadoc (even a short one)
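For instance, something as short as the following could do. The sketch is plain Java (no Lombok) so it compiles standalone; it models only `numCommittedWorkunits`, the one field visible in the diff, and the comment wording is a suggestion rather than the author's intent. The explicit no-arg constructor mirrors the diff's `@NoArgsConstructor`, which is presumably there for (de)serialization.

```java
/**
 * Per-dataset outcome of the commit step, as summarized by the commit activity.
 * (Sketch: only the committed-work-unit count is modeled here.)
 */
class DatasetStats {
  /** Number of work units for this dataset whose output was committed. */
  private int numCommittedWorkunits;

  /** No-arg constructor, e.g. for serialization frameworks that require one. */
  DatasetStats() {
  }

  DatasetStats(int numCommittedWorkunits) {
    this.numCommittedWorkunits = numCommittedWorkunits;
  }

  int getNumCommittedWorkunits() {
    return numCommittedWorkunits;
  }
}
```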