phet commented on code in PR #3912:
URL: https://github.com/apache/gobblin/pull/3912#discussion_r1547037631
##########
gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java:
##########
@@ -44,6 +45,7 @@
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
Review Comment:
these two imports may have snuck in... seem unused
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -141,22 +151,18 @@ public Callable<Void> apply(final Map.Entry<String, JobState.DatasetState> entry
IteratorExecutor.logFailures(result, null, 10);
-      Set<String> failedDatasetUrns = new HashSet<>();
-      for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
-        // Set the overall job state to FAILED if the job failed to process any dataset
-        if (datasetState.getState() == JobState.RunningState.FAILED) {
-          failedDatasetUrns.add(datasetState.getDatasetUrn());
-        }
-      }
+      Set<String> failedDatasetUrns = datasetStatesByUrns.values().stream()
+          .filter(datasetState -> datasetState.getState() == JobState.RunningState.FAILED)
+          .collect(HashSet::new, (set, datasetState) -> set.add(datasetState.getDatasetUrn()), HashSet::addAll);
Review Comment:
can you use
```
.map(JobState.DatasetState::getDatasetUrn)
.collect(Collectors.toCollection(HashSet::new))
```
?
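For illustration, a minimal, self-contained sketch of the suggested `map` + `Collectors.toCollection` shape, using a hypothetical stand-in record (`DatasetState` with a `failed` flag) rather than the actual Gobblin `JobState.DatasetState`:

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class FailedUrnsDemo {
    // Hypothetical stand-in for JobState.DatasetState, just for this sketch
    record DatasetState(String urn, boolean failed) {}

    static Set<String> failedUrns(Collection<DatasetState> states) {
        // Suggested shape: filter, map to the URN, collect into a HashSet
        return states.stream()
            .filter(DatasetState::failed)
            .map(DatasetState::urn)
            .collect(Collectors.toCollection(HashSet::new));
    }

    public static void main(String[] args) {
        Set<String> urns = failedUrns(List.of(
            new DatasetState("urn:a", true),
            new DatasetState("urn:b", false)));
        System.out.println(urns); // only urn:a remains
    }
}
```

Compared with the three-argument `collect(supplier, accumulator, combiner)` form, this reads as two small, named steps and avoids hand-writing the accumulator lambda.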
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java:
##########
@@ -77,9 +77,9 @@ public static String qualifyNamePerExecWithoutFlowExecId(String name, Config wor
   }
   /** @return execution-specific name, incorporating any {@link ConfigurationKeys#FLOW_EXECUTION_ID_KEY} from `workerConfig` */
Review Comment:
looks like `workerConfig` is out-of-date
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -68,11 +76,11 @@ public class CommitActivityImpl implements CommitActivity {
public int commit(WUProcessingSpec workSpec) {
// TODO: Make this configurable
int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
- String jobName = UNDEFINED_JOB_NAME;
+ Optional<String> jobNameOpt = Optional.empty();
try {
FileSystem fs = Help.loadFileSystem(workSpec);
JobState jobState = Help.loadJobState(workSpec, fs);
- jobName = jobState.getJobName();
+ jobNameOpt = Optional.of(jobState.getJobName());
Review Comment:
`Optional::ofNullable`?
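The distinction being suggested, as a minimal standalone sketch (the nullable `jobName` here is a hypothetical value, not the actual `JobState` call): `Optional.of` throws `NullPointerException` on a `null` argument, while `Optional.ofNullable` yields an empty `Optional`.

```java
import java.util.Optional;

public class OfNullableDemo {
    public static void main(String[] args) {
        // Hypothetical job name that might be null
        String jobName = null;

        // ofNullable maps null to Optional.empty() instead of throwing
        Optional<String> jobNameOpt = Optional.ofNullable(jobName);
        System.out.println(jobNameOpt.orElse("<undefined>")); // prints "<undefined>"

        // By contrast, Optional.of(jobName) would throw NullPointerException here
    }
}
```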
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java:
##########
@@ -93,7 +93,7 @@ public class JobContext implements Closeable {
private final String jobId;
private final String jobSequence;
private final JobState jobState;
- @Getter(AccessLevel.PACKAGE)
+ @Getter(AccessLevel.PUBLIC)
Review Comment:
I believe you can omit the `AccessLevel`, as it's `public`
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -181,6 +187,42 @@ public static Map<String, JobState.DatasetState> createDatasetStatesByUrns(Colle
return datasetStatesByUrns;
}
+  public void summarizeDatasetFileMetrics(Map<String, JobState.DatasetState> datasetStatesByUrns, JobContext jobContext, EventSubmitter eventSubmitter) {
+    List<DatasetTaskSummary> datasetTaskSummaries = new ArrayList<>();
+    // Only process successful datasets unless configuration to process failed datasets is set
+    boolean processFailedTasks = PropertiesUtils.getPropAsBoolean(jobContext.getJobState().getProperties(),
+        ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false");
+    for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
+      if (datasetState.getState() == JobState.RunningState.COMMITTED || (datasetState.getState() == JobState.RunningState.FAILED
+          && jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS)) {
+        long totalBytesWritten = 0;
+        long totalRecordsWritten = 0;
+        for (TaskState taskState : datasetState.getTaskStates()) {
+          // Certain writers may omit these metrics e.g. CompactionLauncherWriter
+          if ((taskState.getWorkingState() == WorkUnitState.WorkingState.COMMITTED || processFailedTasks)) {
+            totalBytesWritten += taskState.getPropAsLong(ConfigurationKeys.WRITER_BYTES_WRITTEN, 0);
+            totalRecordsWritten += taskState.getPropAsLong(ConfigurationKeys.WRITER_RECORDS_WRITTEN, 0);
+          }
+        }
+        log.info(String.format("DatasetMetrics for '%s' - (records: %d; bytes: %d)", datasetState.getDatasetUrn(),
+            totalRecordsWritten, totalBytesWritten));
+        datasetTaskSummaries.add(
+            new DatasetTaskSummary(datasetState.getDatasetUrn(), totalRecordsWritten, totalBytesWritten,
+                datasetState.getState() == JobState.RunningState.COMMITTED));
+      } else if (datasetState.getState() == JobState.RunningState.FAILED) {
+        // Check if config is turned on for submitting writer metrics on failure due to non-atomic write semantics
+        if (jobContext.getJobCommitPolicy() == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
+          log.info("Due to task failure, will report that no records or bytes were written for " + datasetState.getDatasetUrn());
+          datasetTaskSummaries.add(new DatasetTaskSummary(datasetState.getDatasetUrn(), 0, 0, false));
+        }
+      }
+    }
+    TimingEvent jobSummaryTimer = eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_SUMMARY);
+    jobSummaryTimer.addMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(datasetTaskSummaries));
+    jobSummaryTimer.stop();
Review Comment:
better separation of concerns would be to calculate the value only, return it, and then submit the event elsewhere.
returning `void` is not infrequently a sign of lumping together calculation w/ side-effects. far more testable is to calculate results in "pure functions" and then separately perform side-effects on the resulting value, which undergoes no further calculation
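A minimal sketch of that split, using hypothetical stand-in records (`Dataset`, `Summary`) rather than the actual `JobState.DatasetState` and `DatasetTaskSummary` types: the calculation is a pure function returning a list, and the event submission is a separate method that only consumes the already-computed value.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class SummaryRefactorSketch {
    // Hypothetical stand-ins for the Gobblin types, just for this sketch
    record Dataset(String urn, long records, long bytes, boolean committed) {}
    record Summary(String urn, long records, long bytes, boolean committed) {}

    // Pure calculation: no logging, no event submission, trivially unit-testable
    static List<Summary> summarize(Collection<Dataset> datasets) {
        List<Summary> out = new ArrayList<>();
        for (Dataset d : datasets) {
            // loosely mirrors the real filter: only committed datasets contribute
            if (d.committed()) {
                out.add(new Summary(d.urn(), d.records(), d.bytes(), true));
            }
        }
        return out;
    }

    // Side-effect kept separate: submit the precomputed value to the event sink
    static void submit(List<Summary> summaries) {
        System.out.println("submitting " + summaries.size() + " summaries");
    }

    public static void main(String[] args) {
        List<Summary> s = summarize(List.of(
            new Dataset("urn:a", 10, 100, true),
            new Dataset("urn:b", 5, 50, false)));
        submit(s); // prints "submitting 1 summaries"
    }
}
```

With this shape, `summarize` can be asserted against directly in a unit test, and the `EventSubmitter`/`TimingEvent` interaction is confined to one small method.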
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]