phet commented on code in PR #3912:
URL: https://github.com/apache/gobblin/pull/3912#discussion_r1562777405
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java:
##########
@@ -118,4 +106,19 @@ protected ProcessWorkUnitsWorkflow createProcessWorkUnitsWorkflow(Properties job
.build();
return Workflow.newChildWorkflowStub(ProcessWorkUnitsWorkflow.class, childOpts);
}
+
+ protected static WUProcessingSpec createProcessingSpec(Properties jobProps, EventSubmitterContext eventSubmitterContext) {
+ JobState jobState = new JobState(jobProps);
+ URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
+ Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
+ WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, workUnitsDirPath.toString(), eventSubmitterContext);
+ // TODO: use our own prop names; don't "borrow" from `ProcessWorkUnitsJobLauncher`
+ if (jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
+ && jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE)) {
+ int maxBranchesPerTree = PropertiesUtils.getRequiredPropAsInt(jobProps, ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
Review Comment:
NBD, but `JobState` does have `contains` and `getPropAsInt`, which would
allow you to take the `JobState` itself as the param, for better abstraction.
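To make this concrete, here is a minimal self-contained sketch of reading the two tree-shape settings through a `JobState`-like object instead of raw `Properties`. `JobStateSketch` and `TreeTuningSketch` are hypothetical stand-ins (not Gobblin classes), and the key strings are placeholders for the `ProcessWorkUnitsJobLauncher` constants; only the `contains`/`getPropAsInt` pattern is the point.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Gobblin's JobState: only the two accessors
// mentioned above, contains(...) and getPropAsInt(...), are modeled.
class JobStateSketch {
  private final Map<String, String> props = new HashMap<>();

  JobStateSketch setProp(String key, Object value) {
    props.put(key, String.valueOf(value));
    return this;
  }

  boolean contains(String key) {
    return props.containsKey(key);
  }

  int getPropAsInt(String key) {
    return Integer.parseInt(props.get(key));
  }
}

// Hypothetical holder for the two settings read in createProcessingSpec.
class TreeTuningSketch {
  // Placeholder key names; the real code reads the
  // ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_* constants.
  static final String MAX_BRANCHES_KEY = "sketch.work.max.branches.per.tree";
  static final String MAX_SUB_TREES_KEY = "sketch.work.max.sub.trees.per.tree";

  final Integer maxBranchesPerTree; // null when the settings are absent
  final Integer maxSubTreesPerTree;

  private TreeTuningSketch(Integer maxBranchesPerTree, Integer maxSubTreesPerTree) {
    this.maxBranchesPerTree = maxBranchesPerTree;
    this.maxSubTreesPerTree = maxSubTreesPerTree;
  }

  // Takes the JobState-like object as the param (rather than raw Properties),
  // relying only on its contains/getPropAsInt accessors.
  static TreeTuningSketch fromJobState(JobStateSketch jobState) {
    if (jobState.contains(MAX_BRANCHES_KEY) && jobState.contains(MAX_SUB_TREES_KEY)) {
      return new TreeTuningSketch(
          jobState.getPropAsInt(MAX_BRANCHES_KEY), jobState.getPropAsInt(MAX_SUB_TREES_KEY));
    }
    return new TreeTuningSketch(null, null);
  }
}
```

As in the hunk, both settings must be present before either is applied.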
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -181,6 +195,40 @@ public static Map<String, JobState.DatasetState> createDatasetStatesByUrns(Colle
return datasetStatesByUrns;
}
+ public List<DatasetTaskSummary> generateDatasetTaskSummaries(Map<String, JobState.DatasetState> datasetStatesByUrns, JobContext jobContext, EventSubmitter eventSubmitter) {
+ List<DatasetTaskSummary> datasetTaskSummaries = new ArrayList<>();
+ // Only process successful datasets unless configuration to process failed datasets is set
+ boolean processFailedTasks =
+ PropertiesUtils.getPropAsBoolean(jobContext.getJobState().getProperties(), ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS,
+ "false");
+ for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
+ if (datasetState.getState() == JobState.RunningState.COMMITTED || (datasetState.getState() == JobState.RunningState.FAILED
+ && jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS)) {
+ long totalBytesWritten = 0;
+ long totalRecordsWritten = 0;
+ for (TaskState taskState : datasetState.getTaskStates()) {
+ // Certain writers may omit these metrics e.g. CompactionLauncherWriter
+ if ((taskState.getWorkingState() == WorkUnitState.WorkingState.COMMITTED || processFailedTasks)) {
+ totalBytesWritten += taskState.getPropAsLong(ConfigurationKeys.WRITER_BYTES_WRITTEN, 0);
+ totalRecordsWritten += taskState.getPropAsLong(ConfigurationKeys.WRITER_RECORDS_WRITTEN, 0);
+ }
+ }
+ log.info(String.format("DatasetMetrics for '%s' - (records: %d; bytes: %d)", datasetState.getDatasetUrn(),
+ totalRecordsWritten, totalBytesWritten));
+ datasetTaskSummaries.add(
+ new DatasetTaskSummary(datasetState.getDatasetUrn(), totalRecordsWritten, totalBytesWritten,
+ datasetState.getState() == JobState.RunningState.COMMITTED));
+ } else if (datasetState.getState() == JobState.RunningState.FAILED) {
+ // Check if config is turned on for submitting writer metrics on failure due to non-atomic write semantics
+ if (jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
Review Comment:
For clarity (and to parallel the `&&` in the `if` alternative above), I suggest
combining the nested `if` into the `else if` via `&&`.
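A minimal sketch of the flattened branch structure, using hypothetical enums in place of `JobState.RunningState` and `JobCommitPolicy` (trimmed to the constants used here). The body of the `COMMIT_ON_FULL_SUCCESS` branch is cut off in the hunk, so a string label stands in for it.

```java
// Hypothetical stand-ins for JobState.RunningState and JobCommitPolicy,
// trimmed to the constants this branch logic touches.
enum RunningStateSketch { COMMITTED, FAILED, RUNNING }

enum CommitPolicySketch { COMMIT_ON_FULL_SUCCESS, COMMIT_SUCCESSFUL_TASKS }

class BranchSketch {
  // Returns which branch of the loop body a dataset would take,
  // with the nested `if` folded into the `else if` via `&&`.
  static String classify(RunningStateSketch state, CommitPolicySketch policy) {
    if (state == RunningStateSketch.COMMITTED
        || (state == RunningStateSketch.FAILED && policy == CommitPolicySketch.COMMIT_SUCCESSFUL_TASKS)) {
      return "SUMMARIZE";
    } else if (state == RunningStateSketch.FAILED && policy == CommitPolicySketch.COMMIT_ON_FULL_SUCCESS) {
      // The real body of this branch is truncated in the review hunk; a label stands in for it.
      return "FAILED_FULL_SUCCESS";
    }
    return "SKIP";
  }
}
```

Both conditions now read the same way: a state check `&&` a commit-policy check.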
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -87,12 +97,19 @@ public int commit(WUProcessingSpec workSpec) {
return 0;
}
Queue<TaskState> taskStateQueue = taskStateQueueOpt.get();
- commitTaskStates(jobState, ImmutableList.copyOf(taskStateQueue), globalGobblinContext);
+ Map<String, JobState.DatasetState> datasetStatesByUrns = createDatasetStatesByUrns(ImmutableList.copyOf(taskStateQueue));
+ commitTaskStates(jobState, datasetStatesByUrns, globalGobblinContext, jobNameOpt);
+ List<DatasetTaskSummary> datasetTaskSummaries = generateDatasetTaskSummaries(datasetStatesByUrns, globalGobblinContext, workSpec.getEventSubmitterContext().create());
+ // Submit event that summarizes work done
+ TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
+ TemporalEventTimer eventTimer = timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY);
Review Comment:
I'd love to streamline... what do you think about:
```
TemporalEventTimer eventTimer = TemporalEventTimer.Factory.createEvent(workSpec.getEventSubmitterContext(), TimingEvent.LauncherTimings.JOB_SUMMARY);
```
?
Personally, I'd prefer a more fluent API, to support:
```
TemporalEventTimer.Factory.createEvent(workSpec.getEventSubmitterContext(), TimingEvent.LauncherTimings.JOB_SUMMARY)
    .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(datasetTaskSummaries))
    .submit();
```
For that, we'd just need to:
a. either change the `addMetadata` return type to support chaining, or else introduce an alternative `withMetadata` version that does
b. add `submit()` as an alias for `stop()`
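To show how little (a) and (b) require, here is a self-contained sketch. `FluentTimerSketch` is a hypothetical stand-in for `TemporalEventTimer`: it records "submitted" events in an in-memory list rather than using an `EventSubmitter`, and its `createEvent` takes only the event name, whereas the proposed `Factory.createEvent` would also take the `EventSubmitterContext`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, self-contained stand-in for TemporalEventTimer. Instead of going
// through an EventSubmitter, "submitted" events are appended to an in-memory list.
class FluentTimerSketch {
  static final List<Map<String, String>> SUBMITTED = new ArrayList<>();

  private final String eventName;
  private final long startNanos;
  private final Map<String, String> metadata = new LinkedHashMap<>();

  private FluentTimerSketch(String eventName) {
    this.eventName = eventName;
    this.startNanos = System.nanoTime();
  }

  // One-call static factory in the spirit of the proposed Factory.createEvent(...);
  // the EventSubmitterContext param is omitted in this sketch.
  static FluentTimerSketch createEvent(String eventName) {
    return new FluentTimerSketch(eventName);
  }

  // Non-chaining mutator, like the current addMetadata.
  void addMetadata(String key, String value) {
    metadata.put(key, value);
  }

  // Option (a): a chaining variant that returns `this`.
  FluentTimerSketch withMetadata(String key, String value) {
    addMetadata(key, value);
    return this;
  }

  // Stops the timer and records the event along with its elapsed time.
  void stop() {
    Map<String, String> event = new LinkedHashMap<>(metadata);
    event.put("eventName", eventName);
    event.put("durationNanos", String.valueOf(System.nanoTime() - startNanos));
    SUBMITTED.add(Collections.unmodifiableMap(event));
  }

  // Option (b): submit() as an alias for stop(), reading naturally at the end of a chain.
  void submit() {
    stop();
  }
}
```

Usage mirrors the chained call in the comment: `FluentTimerSketch.createEvent("JOB_SUMMARY").withMetadata("datasetTaskSummaries", json).submit();`. Keeping `addMetadata` and `stop()` intact means existing callers are unaffected.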
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -181,6 +195,40 @@ public static Map<String, JobState.DatasetState> createDatasetStatesByUrns(Colle
return datasetStatesByUrns;
}
+ public List<DatasetTaskSummary> generateDatasetTaskSummaries(Map<String, JobState.DatasetState> datasetStatesByUrns, JobContext jobContext, EventSubmitter eventSubmitter) {
Review Comment:
- suggest more purpose-driven semantic naming, like `summarizeDatasetOutcomes`
- no need to take `eventSubmitter` any longer
- `JobContext` is so big and broad that I'd love to get it out of this signature
and leave it to the caller to pass in only the narrow params actually relevant
to this work, such as:
```
List<DatasetTaskSummary> summarizeDatasetOutcomes(
Map<String, JobState.DatasetState> datasetStatesByUrns,
JobCommitPolicy commitPolicy,
boolean shouldIncludeFailedTasks)
```
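A minimal sketch of what the narrowed method could look like, following the summarizing logic in the hunk. All types here (`DatasetStateStub`, `TaskStateStub`, `DatasetSummaryStub`, and the enums) are hypothetical stand-ins for the Gobblin classes, reduced to the fields the logic reads; the `FAILED` + `COMMIT_ON_FULL_SUCCESS` branch is truncated in the hunk and so is not sketched.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for the Gobblin types, reduced to the fields this logic reads.
enum DatasetRunState { COMMITTED, FAILED, RUNNING }

enum TaskWorkState { COMMITTED, FAILED }

enum SummaryCommitPolicy { COMMIT_ON_FULL_SUCCESS, COMMIT_SUCCESSFUL_TASKS }

class TaskStateStub {
  final TaskWorkState workingState;
  final long bytesWritten;   // 0 when the writer omits the metric
  final long recordsWritten; // 0 when the writer omits the metric

  TaskStateStub(TaskWorkState workingState, long bytesWritten, long recordsWritten) {
    this.workingState = workingState;
    this.bytesWritten = bytesWritten;
    this.recordsWritten = recordsWritten;
  }
}

class DatasetStateStub {
  final String datasetUrn;
  final DatasetRunState state;
  final List<TaskStateStub> taskStates;

  DatasetStateStub(String datasetUrn, DatasetRunState state, List<TaskStateStub> taskStates) {
    this.datasetUrn = datasetUrn;
    this.state = state;
    this.taskStates = taskStates;
  }
}

class DatasetSummaryStub {
  final String datasetUrn;
  final long recordsWritten;
  final long bytesWritten;
  final boolean successfullyCommitted;

  DatasetSummaryStub(String datasetUrn, long recordsWritten, long bytesWritten, boolean successfullyCommitted) {
    this.datasetUrn = datasetUrn;
    this.recordsWritten = recordsWritten;
    this.bytesWritten = bytesWritten;
    this.successfullyCommitted = successfullyCommitted;
  }
}

class DatasetOutcomesSketch {
  // Narrow signature: no JobContext, no EventSubmitter.
  static List<DatasetSummaryStub> summarizeDatasetOutcomes(
      Map<String, DatasetStateStub> datasetStatesByUrns,
      SummaryCommitPolicy commitPolicy,
      boolean shouldIncludeFailedTasks) {
    List<DatasetSummaryStub> summaries = new ArrayList<>();
    for (DatasetStateStub datasetState : datasetStatesByUrns.values()) {
      boolean isCommitted = datasetState.state == DatasetRunState.COMMITTED;
      boolean isPartiallyCommittable = datasetState.state == DatasetRunState.FAILED
          && commitPolicy == SummaryCommitPolicy.COMMIT_SUCCESSFUL_TASKS;
      if (isCommitted || isPartiallyCommittable) {
        long totalBytes = 0;
        long totalRecords = 0;
        for (TaskStateStub task : datasetState.taskStates) {
          if (task.workingState == TaskWorkState.COMMITTED || shouldIncludeFailedTasks) {
            totalBytes += task.bytesWritten;
            totalRecords += task.recordsWritten;
          }
        }
        summaries.add(new DatasetSummaryStub(datasetState.datasetUrn, totalRecords, totalBytes, isCommitted));
      }
      // The FAILED + COMMIT_ON_FULL_SUCCESS branch is truncated in the review hunk, so it is not sketched.
    }
    return summaries;
  }
}
```

The caller would then derive `commitPolicy` from `jobContext.getJobCommitPolicy()` and the flag from `ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS`, so `JobContext` never enters the method.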
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.