[
https://issues.apache.org/jira/browse/GOBBLIN-2204?focusedWorklogId=975539&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-975539
]
ASF GitHub Bot logged work on GOBBLIN-2204:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 21/Jul/25 20:57
Start Date: 21/Jul/25 20:57
Worklog Time Spent: 10m
Work Description: khandelwal-prateek commented on code in PR #4113:
URL: https://github.com/apache/gobblin/pull/4113#discussion_r2220106656
##########
gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written
for a file copy operation.
+ */
+@Slf4j
+public class FileSizePolicy extends TaskLevelPolicy {
+
+ public static final String COPY_PREFIX = "gobblin.copy";
+ public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead";
+ public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten";
+
+ public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+ super(state, type);
+ }
+
+ @Override
+ public Result executePolicy() {
+ TransferBytes bytes = getBytesReadAndWritten(this.state);
+ if (bytes == null) {
+ return Result.FAILED;
+ }
+ Long bytesRead = bytes.getBytesRead();
+ Long bytesWritten = bytes.getBytesWritten();
+
+ if(bytesRead == null || bytesWritten == null) {
Review Comment:
nit: add space after `if`
##########
gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written
for a file copy operation.
+ */
+@Slf4j
+public class FileSizePolicy extends TaskLevelPolicy {
+
+ public static final String COPY_PREFIX = "gobblin.copy";
+ public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead";
+ public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten";
+
+ public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+ super(state, type);
+ }
+
+ @Override
+ public Result executePolicy() {
+ TransferBytes bytes = getBytesReadAndWritten(this.state);
+ if (bytes == null) {
+ return Result.FAILED;
+ }
+ Long bytesRead = bytes.getBytesRead();
+ Long bytesWritten = bytes.getBytesWritten();
+
+ if(bytesRead == null || bytesWritten == null) {
+ log.error("Missing value(s): bytesRead={}, bytesWritten={}", bytesRead,
bytesWritten);
+ return Result.FAILED;
+ }
+ Long sizeDifference = Math.abs(bytesRead - bytesWritten);
+
+ if (sizeDifference == 0) {
+ return Result.PASSED;
+ }
+
+ log.warn("File size check failed - bytes read: {}, bytes written: {},
difference: {}",
+ bytesRead, bytesWritten, sizeDifference);
+ return Result.FAILED;
+ }
+
+ @Override
+ public String toString() {
+ TransferBytes bytes = getBytesReadAndWritten(this.state);
+ return String.format("FileSizePolicy [bytesRead=%s, bytesWritten=%s]",
bytes.getBytesRead(), bytes.getBytesWritten());
+ }
+
+ /**
+ * Helper class to hold transfer bytes information
+ */
+ @Getter
+ private static class TransferBytes {
+ final Long bytesRead;
+ final Long bytesWritten;
+ TransferBytes(Long bytesRead, Long bytesWritten) {
+ this.bytesRead = bytesRead;
+ this.bytesWritten = bytesWritten;
+ }
+ }
+
+ /**
+ * Extracts bytesRead and bytesWritten from the given state.
+ * Returns null if parsing fails.
+ */
+ private TransferBytes getBytesReadAndWritten(State state) {
Review Comment:
We can return `Optional<TransferBytes>` from this instead of relying on null
or constructing a partially invalid object. Optional clearly expresses the
possibility of absence and avoids null-filled objects like `TransferBytes(null,
null)`, making the failure path more explicit for the caller
##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+ private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME =
"gobblin.gaas.observability";
+
+ // Private constructor to prevent instantiation
+ private DataQualityEvaluator() {}
+
+ /**
+ * Result of a data quality evaluation containing the overall status and
metrics.
+ */
+ @Getter
+ public static class DataQualityEvaluationResult {
+ private final DataQualityStatus qualityStatus;
+ private final int totalFiles;
+ private final int passedFiles;
+ private final int failedFiles;
+ // Number of files that were not evaluated for data quality for
example files not found or not processed
+ private final int nonEvaluatedFiles;
+
+ public DataQualityEvaluationResult(DataQualityStatus qualityStatus,
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+ this.qualityStatus = qualityStatus;
+ this.totalFiles = totalFiles;
+ this.passedFiles = passedFiles;
+ this.failedFiles = failedFiles;
+ this.nonEvaluatedFiles = nonEvaluatedFiles;
+ }
+ }
+
+ /**
+ * Evaluates the data quality of a dataset state and stores the result.
+ * This method is specifically designed for dataset-level quality
evaluation.
+ *
+ * @param datasetState The dataset state to evaluate and update
+ * @param jobState The job state containing additional context
+ * @return DataQualityEvaluationResult containing the evaluation results
+ */
+ public static DataQualityEvaluationResult
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState
jobState) {
+ List<TaskState> taskStates = datasetState.getTaskStates();
+ DataQualityEvaluationResult result = evaluateDataQuality(taskStates,
jobState);
+
+ // Store the result in the dataset state
+ jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY,
result.getQualityStatus().name());
+ // Emit dataset-specific metrics
+ emitMetrics(jobState, result.getQualityStatus() ==
DataQualityStatus.PASSED? 1 : 0, result.getTotalFiles(),
+ result.getPassedFiles(), result.getFailedFiles(),
result.nonEvaluatedFiles, datasetState.getDatasetUrn());
+
+ return result;
+ }
+
+ /**
+ * Evaluates the data quality of a set of task states and emits relevant
metrics.
+ *
+ * @param taskStates List of task states to evaluate
+ * @param jobState The job state containing additional context
+ * @return DataQualityEvaluationResult containing the evaluation results
+ */
+ public static DataQualityEvaluationResult
evaluateDataQuality(List<TaskState> taskStates, JobState jobState) {
+ DataQualityStatus jobDataQuality = DataQualityStatus.PASSED;
+ int totalFiles = 0;
+ int failedFilesSize = 0;
Review Comment:
rename `failedFilesSize -> failedFilesCount` to better reflect that these
are counts and match the naming used in metrics emission
##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+ private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME =
"gobblin.gaas.observability";
+
+ // Private constructor to prevent instantiation
+ private DataQualityEvaluator() {}
+
+ /**
+ * Result of a data quality evaluation containing the overall status and
metrics.
+ */
+ @Getter
+ public static class DataQualityEvaluationResult {
+ private final DataQualityStatus qualityStatus;
+ private final int totalFiles;
+ private final int passedFiles;
+ private final int failedFiles;
+ // Number of files that were not evaluated for data quality for
example files not found or not processed
+ private final int nonEvaluatedFiles;
+
+ public DataQualityEvaluationResult(DataQualityStatus qualityStatus,
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+ this.qualityStatus = qualityStatus;
+ this.totalFiles = totalFiles;
+ this.passedFiles = passedFiles;
+ this.failedFiles = failedFiles;
+ this.nonEvaluatedFiles = nonEvaluatedFiles;
+ }
+ }
+
+ /**
+ * Evaluates the data quality of a dataset state and stores the result.
+ * This method is specifically designed for dataset-level quality
evaluation.
+ *
+ * @param datasetState The dataset state to evaluate and update
+ * @param jobState The job state containing additional context
+ * @return DataQualityEvaluationResult containing the evaluation results
+ */
+ public static DataQualityEvaluationResult
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState
jobState) {
+ List<TaskState> taskStates = datasetState.getTaskStates();
+ DataQualityEvaluationResult result = evaluateDataQuality(taskStates,
jobState);
+
+ // Store the result in the dataset state
+ jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY,
result.getQualityStatus().name());
+ // Emit dataset-specific metrics
+ emitMetrics(jobState, result.getQualityStatus() ==
DataQualityStatus.PASSED? 1 : 0, result.getTotalFiles(),
+ result.getPassedFiles(), result.getFailedFiles(),
result.nonEvaluatedFiles, datasetState.getDatasetUrn());
+
+ return result;
+ }
+
+ /**
+ * Evaluates the data quality of a set of task states and emits relevant
metrics.
+ *
+ * @param taskStates List of task states to evaluate
+ * @param jobState The job state containing additional context
+ * @return DataQualityEvaluationResult containing the evaluation results
+ */
+ public static DataQualityEvaluationResult
evaluateDataQuality(List<TaskState> taskStates, JobState jobState) {
+ DataQualityStatus jobDataQuality = DataQualityStatus.PASSED;
+ int totalFiles = 0;
+ int failedFilesSize = 0;
+ int passedFilesSize = 0;
+ int nonEvaluatedFilesSize = 0;
+
+ for (TaskState taskState : taskStates) {
+ totalFiles++;
+
+ // Handle null task states gracefully
+ if (taskState == null) {
+ log.warn("Encountered null task state, skipping data quality
evaluation for this task");
+ nonEvaluatedFilesSize++;
+ continue;
+ }
+
+ DataQualityStatus taskDataQuality = null;
+ String result =
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+ taskDataQuality = DataQualityStatus.fromString(result);
+ if (taskDataQuality != DataQualityStatus.NOT_EVALUATED) {
+ log.debug("Data quality status of this task is: " +
taskDataQuality);
+ if (DataQualityStatus.PASSED == taskDataQuality) {
+ passedFilesSize++;
+ } else if (DataQualityStatus.FAILED == taskDataQuality){
+ failedFilesSize++;
+ jobDataQuality = DataQualityStatus.FAILED;
+ }
+ } else {
+ // Handle files without data quality evaluation
+ nonEvaluatedFilesSize++;
+ log.warn("No data quality evaluation for task: " +
taskState.getTaskId());
+ }
+ }
+
+ // Log summary of evaluation
+ log.info("Data quality evaluation summary - Total: {}, Passed: {},
Failed: {}, Not Evaluated: {}",
+ totalFiles, passedFilesSize, failedFilesSize,
nonEvaluatedFilesSize);
+ return new DataQualityEvaluationResult(jobDataQuality, totalFiles,
passedFilesSize, failedFilesSize, nonEvaluatedFilesSize);
+ }
+
+ private static void emitMetrics(JobState jobState, int jobDataQuality, int
totalFiles,
+ int passedFilesSize, int failedFilesSize, int
nonEvaluatedFilesSize, String datasetUrn) {
+ OpenTelemetryMetricsBase otelMetrics =
OpenTelemetryMetrics.getInstance(jobState);
+ if(otelMetrics != null) {
+ Meter meter =
otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME);
+ AtomicLong jobDataQualityRef = new AtomicLong(jobDataQuality);
+ AtomicLong totalFilesRef = new AtomicLong(totalFiles);
+ AtomicLong passedFilesRef = new AtomicLong(passedFilesSize);
+ AtomicLong failedFilesRef = new AtomicLong(failedFilesSize);
+ AtomicLong nonEvaluatedFilesRef = new
AtomicLong(nonEvaluatedFilesSize);
Review Comment:
Any reason to use `AtomicLong`? It should not be needed since the metric
values are immutable snapshots and not being mutated or shared across threads
##########
gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/DataQualityStatus.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.qualitychecker;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * An enumeration for possible statuses for Data quality checks.
+ * Its values will be:
+ * - PASSED: When all data quality checks pass
+ * - FAILED: When any data quality check fails
+ * - NOT_EVALUATED: When data quality check evaluation is not performed
Review Comment:
add javadoc for `UNKNOWN`
##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+ private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME =
"gobblin.gaas.observability";
+
+ // Private constructor to prevent instantiation
+ private DataQualityEvaluator() {}
+
+ /**
+ * Result of a data quality evaluation containing the overall status and
metrics.
+ */
+ @Getter
+ public static class DataQualityEvaluationResult {
+ private final DataQualityStatus qualityStatus;
+ private final int totalFiles;
+ private final int passedFiles;
+ private final int failedFiles;
+ // Number of files that were not evaluated for data quality for
example files not found or not processed
+ private final int nonEvaluatedFiles;
+
+ public DataQualityEvaluationResult(DataQualityStatus qualityStatus,
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+ this.qualityStatus = qualityStatus;
+ this.totalFiles = totalFiles;
+ this.passedFiles = passedFiles;
+ this.failedFiles = failedFiles;
+ this.nonEvaluatedFiles = nonEvaluatedFiles;
+ }
+ }
+
+ /**
+ * Evaluates the data quality of a dataset state and stores the result.
+ * This method is specifically designed for dataset-level quality
evaluation.
+ *
+ * @param datasetState The dataset state to evaluate and update
+ * @param jobState The job state containing additional context
+ * @return DataQualityEvaluationResult containing the evaluation results
+ */
+ public static DataQualityEvaluationResult
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState
jobState) {
+ List<TaskState> taskStates = datasetState.getTaskStates();
+ DataQualityEvaluationResult result = evaluateDataQuality(taskStates,
jobState);
+
+ // Store the result in the dataset state
+ jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY,
result.getQualityStatus().name());
+ // Emit dataset-specific metrics
+ emitMetrics(jobState, result.getQualityStatus() ==
DataQualityStatus.PASSED? 1 : 0, result.getTotalFiles(),
Review Comment:
`DATA_QUALITY_STATUS_METRIC_NAME` is defined as a counter, but adding 0 for
failures doesn’t emit anything, so we lose visibility into failed flows
completely. To fix this, please split into two explicit counters: one for
success and one for failure to ensure both outcomes are observable and can be
used to set up alerts.
##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+ private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME =
"gobblin.gaas.observability";
+
+ // Private constructor to prevent instantiation
+ private DataQualityEvaluator() {}
+
+ /**
+ * Result of a data quality evaluation containing the overall status and
metrics.
+ */
+ @Getter
+ public static class DataQualityEvaluationResult {
+ private final DataQualityStatus qualityStatus;
+ private final int totalFiles;
+ private final int passedFiles;
+ private final int failedFiles;
+ // Number of files that were not evaluated for data quality for
example files not found or not processed
+ private final int nonEvaluatedFiles;
+
+ public DataQualityEvaluationResult(DataQualityStatus qualityStatus,
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+ this.qualityStatus = qualityStatus;
+ this.totalFiles = totalFiles;
+ this.passedFiles = passedFiles;
+ this.failedFiles = failedFiles;
+ this.nonEvaluatedFiles = nonEvaluatedFiles;
+ }
+ }
+
+ /**
+ * Evaluates the data quality of a dataset state and stores the result.
+ * This method is specifically designed for dataset-level quality
evaluation.
+ *
+ * @param datasetState The dataset state to evaluate and update
+ * @param jobState The job state containing additional context
+ * @return DataQualityEvaluationResult containing the evaluation results
+ */
+ public static DataQualityEvaluationResult
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState
jobState) {
+ List<TaskState> taskStates = datasetState.getTaskStates();
+ DataQualityEvaluationResult result = evaluateDataQuality(taskStates,
jobState);
+
+ // Store the result in the dataset state
+ jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY,
result.getQualityStatus().name());
+ // Emit dataset-specific metrics
+ emitMetrics(jobState, result.getQualityStatus() ==
DataQualityStatus.PASSED? 1 : 0, result.getTotalFiles(),
+ result.getPassedFiles(), result.getFailedFiles(),
result.nonEvaluatedFiles, datasetState.getDatasetUrn());
+
+ return result;
+ }
+
+ /**
+ * Evaluates the data quality of a set of task states and emits relevant
metrics.
+ *
+ * @param taskStates List of task states to evaluate
+ * @param jobState The job state containing additional context
+ * @return DataQualityEvaluationResult containing the evaluation results
+ */
+ public static DataQualityEvaluationResult
evaluateDataQuality(List<TaskState> taskStates, JobState jobState) {
+ DataQualityStatus jobDataQuality = DataQualityStatus.PASSED;
+ int totalFiles = 0;
+ int failedFilesSize = 0;
+ int passedFilesSize = 0;
+ int nonEvaluatedFilesSize = 0;
+
+ for (TaskState taskState : taskStates) {
+ totalFiles++;
+
+ // Handle null task states gracefully
+ if (taskState == null) {
+ log.warn("Encountered null task state, skipping data quality
evaluation for this task");
+ nonEvaluatedFilesSize++;
Review Comment:
When is `taskState` null? Should that be counted as a DQ failure status?
##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+ private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME =
"gobblin.gaas.observability";
+
+ // Private constructor to prevent instantiation
+ private DataQualityEvaluator() {}
+
+ /**
+ * Result of a data quality evaluation containing the overall status and
metrics.
+ */
+ @Getter
+ public static class DataQualityEvaluationResult {
+ private final DataQualityStatus qualityStatus;
+ private final int totalFiles;
+ private final int passedFiles;
+ private final int failedFiles;
+ // Number of files that were not evaluated for data quality for
example files not found or not processed
+ private final int nonEvaluatedFiles;
+
+ public DataQualityEvaluationResult(DataQualityStatus qualityStatus,
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+ this.qualityStatus = qualityStatus;
+ this.totalFiles = totalFiles;
+ this.passedFiles = passedFiles;
+ this.failedFiles = failedFiles;
+ this.nonEvaluatedFiles = nonEvaluatedFiles;
+ }
+ }
+
+ /**
+ * Evaluates the data quality of a dataset state and stores the result.
+ * This method is specifically designed for dataset-level quality
evaluation.
+ *
+ * @param datasetState The dataset state to evaluate and update
+ * @param jobState The job state containing additional context
+ * @return DataQualityEvaluationResult containing the evaluation results
+ */
+ public static DataQualityEvaluationResult
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState
jobState) {
+ List<TaskState> taskStates = datasetState.getTaskStates();
+ DataQualityEvaluationResult result = evaluateDataQuality(taskStates,
jobState);
+
+ // Store the result in the dataset state
+ jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY,
result.getQualityStatus().name());
+ // Emit dataset-specific metrics
+ emitMetrics(jobState, result.getQualityStatus() ==
DataQualityStatus.PASSED? 1 : 0, result.getTotalFiles(),
+ result.getPassedFiles(), result.getFailedFiles(),
result.nonEvaluatedFiles, datasetState.getDatasetUrn());
+
+ return result;
+ }
+
+ /**
+ * Evaluates the data quality of a set of task states and emits relevant
metrics.
+ *
+ * @param taskStates List of task states to evaluate
+ * @param jobState The job state containing additional context
+ * @return DataQualityEvaluationResult containing the evaluation results
+ */
+ public static DataQualityEvaluationResult
evaluateDataQuality(List<TaskState> taskStates, JobState jobState) {
+ DataQualityStatus jobDataQuality = DataQualityStatus.PASSED;
+ int totalFiles = 0;
+ int failedFilesSize = 0;
+ int passedFilesSize = 0;
+ int nonEvaluatedFilesSize = 0;
+
+ for (TaskState taskState : taskStates) {
+ totalFiles++;
+
+ // Handle null task states gracefully
+ if (taskState == null) {
+ log.warn("Encountered null task state, skipping data quality
evaluation for this task");
+ nonEvaluatedFilesSize++;
+ continue;
+ }
+
+ DataQualityStatus taskDataQuality = null;
+ String result =
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+ taskDataQuality = DataQualityStatus.fromString(result);
+ if (taskDataQuality != DataQualityStatus.NOT_EVALUATED) {
+ log.debug("Data quality status of this task is: " +
taskDataQuality);
+ if (DataQualityStatus.PASSED == taskDataQuality) {
+ passedFilesSize++;
+ } else if (DataQualityStatus.FAILED == taskDataQuality){
+ failedFilesSize++;
+ jobDataQuality = DataQualityStatus.FAILED;
+ }
+ } else {
Review Comment:
In which cases would `taskDataQuality` be `NOT_EVALUATED`?
##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+ private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME =
"gobblin.gaas.observability";
+
+ // Private constructor to prevent instantiation
+ private DataQualityEvaluator() {}
+
+ /**
+ * Result of a data quality evaluation containing the overall status and
metrics.
+ */
+ @Getter
+ public static class DataQualityEvaluationResult {
+ private final DataQualityStatus qualityStatus;
+ private final int totalFiles;
+ private final int passedFiles;
+ private final int failedFiles;
+ // Number of files that were not evaluated for data quality for
example files not found or not processed
+ private final int nonEvaluatedFiles;
+
+ public DataQualityEvaluationResult(DataQualityStatus qualityStatus,
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+ this.qualityStatus = qualityStatus;
+ this.totalFiles = totalFiles;
+ this.passedFiles = passedFiles;
+ this.failedFiles = failedFiles;
+ this.nonEvaluatedFiles = nonEvaluatedFiles;
+ }
+ }
+
+ /**
+ * Evaluates the data quality of a dataset state and stores the result.
+ * This method is specifically designed for dataset-level quality
evaluation.
+ *
+ * @param datasetState The dataset state to evaluate and update
+ * @param jobState The job state containing additional context
+ * @return DataQualityEvaluationResult containing the evaluation results
+ */
+ public static DataQualityEvaluationResult
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState
jobState) {
+ List<TaskState> taskStates = datasetState.getTaskStates();
+ DataQualityEvaluationResult result = evaluateDataQuality(taskStates,
jobState);
+
+ // Store the result in the dataset state
+ jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY,
result.getQualityStatus().name());
+ // Emit dataset-specific metrics
+ emitMetrics(jobState, result.getQualityStatus() ==
DataQualityStatus.PASSED? 1 : 0, result.getTotalFiles(),
+ result.getPassedFiles(), result.getFailedFiles(),
result.nonEvaluatedFiles, datasetState.getDatasetUrn());
+
+ return result;
+ }
+
+ /**
+ * Evaluates the data quality of a set of task states and emits relevant
metrics.
+ *
+ * @param taskStates List of task states to evaluate
+ * @param jobState The job state containing additional context
+ * @return DataQualityEvaluationResult containing the evaluation results
+ */
+ public static DataQualityEvaluationResult
evaluateDataQuality(List<TaskState> taskStates, JobState jobState) {
+ DataQualityStatus jobDataQuality = DataQualityStatus.PASSED;
+ int totalFiles = 0;
+ int failedFilesSize = 0;
+ int passedFilesSize = 0;
+ int nonEvaluatedFilesSize = 0;
+
+ for (TaskState taskState : taskStates) {
+ totalFiles++;
+
+ // Handle null task states gracefully
+ if (taskState == null) {
+ log.warn("Encountered null task state, skipping data quality
evaluation for this task");
+ nonEvaluatedFilesSize++;
+ continue;
+ }
+
+ DataQualityStatus taskDataQuality = null;
+ String result =
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+ taskDataQuality = DataQualityStatus.fromString(result);
+ if (taskDataQuality != DataQualityStatus.NOT_EVALUATED) {
+ log.debug("Data quality status of this task is: " +
taskDataQuality);
+ if (DataQualityStatus.PASSED == taskDataQuality) {
+ passedFilesSize++;
+ } else if (DataQualityStatus.FAILED == taskDataQuality){
+ failedFilesSize++;
+ jobDataQuality = DataQualityStatus.FAILED;
+ }
Review Comment:
add a warn log when DataQualityStatus.fromString() returns UNKNOWN
##########
gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written
for a file copy operation.
+ */
+@Slf4j
+public class FileSizePolicy extends TaskLevelPolicy {
+
+ public static final String COPY_PREFIX = "gobblin.copy";
+ public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead";
+ public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten";
+
+ public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+ super(state, type);
+ }
+
+ @Override
+ public Result executePolicy() {
+ TransferBytes bytes = getBytesReadAndWritten(this.state);
+ if (bytes == null) {
+ return Result.FAILED;
+ }
+ Long bytesRead = bytes.getBytesRead();
+ Long bytesWritten = bytes.getBytesWritten();
+
+ if(bytesRead == null || bytesWritten == null) {
Review Comment:
We should avoid `new TransferBytes(null, null)`, as it creates a seemingly
valid object that requires downstream null checks. We can instead log any error
within `getBytesReadAndWritten` and return `Optional.empty()` for any
failure case.
##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+ private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME =
"gobblin.gaas.observability";
+
+ // Private constructor to prevent instantiation
+ private DataQualityEvaluator() {}
+
+ /**
+ * Result of a data quality evaluation containing the overall status and
metrics.
+ */
+ @Getter
+ public static class DataQualityEvaluationResult {
+ private final DataQualityStatus qualityStatus;
+ private final int totalFiles;
+ private final int passedFiles;
+ private final int failedFiles;
+ // Number of files that were not evaluated for data quality for
example files not found or not processed
+ private final int nonEvaluatedFiles;
+
+ public DataQualityEvaluationResult(DataQualityStatus qualityStatus,
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+ this.qualityStatus = qualityStatus;
+ this.totalFiles = totalFiles;
+ this.passedFiles = passedFiles;
+ this.failedFiles = failedFiles;
+ this.nonEvaluatedFiles = nonEvaluatedFiles;
+ }
+ }
+
+ /**
+ * Evaluates the data quality of a dataset state and stores the result.
+ * This method is specifically designed for dataset-level quality
evaluation.
+ *
+ * @param datasetState The dataset state to evaluate and update
+ * @param jobState The job state containing additional context
+ * @return DataQualityEvaluationResult containing the evaluation results
+ */
+ public static DataQualityEvaluationResult
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState
jobState) {
+ List<TaskState> taskStates = datasetState.getTaskStates();
+ DataQualityEvaluationResult result = evaluateDataQuality(taskStates,
jobState);
+
+ // Store the result in the dataset state
+ jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY,
result.getQualityStatus().name());
+ // Emit dataset-specific metrics
+ emitMetrics(jobState, result.getQualityStatus() ==
DataQualityStatus.PASSED? 1 : 0, result.getTotalFiles(),
Review Comment:
The inline condition (status == PASSED ? 1 : 0) leaks metric encoding logic
at the caller. We can move this into the DataQualityStatus enum (e.g.
toMetricValue()), or encapsulate it inside emitMetrics().
##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+ private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME =
"gobblin.gaas.observability";
+
+ // Private constructor to prevent instantiation
+ private DataQualityEvaluator() {}
+
+ /**
+ * Result of a data quality evaluation containing the overall status and
metrics.
+ */
+ @Getter
+ public static class DataQualityEvaluationResult {
+ private final DataQualityStatus qualityStatus;
+ private final int totalFiles;
+ private final int passedFiles;
+ private final int failedFiles;
+ // Number of files that were not evaluated for data quality for
example files not found or not processed
+ private final int nonEvaluatedFiles;
+
+ public DataQualityEvaluationResult(DataQualityStatus qualityStatus,
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+ this.qualityStatus = qualityStatus;
+ this.totalFiles = totalFiles;
+ this.passedFiles = passedFiles;
+ this.failedFiles = failedFiles;
+ this.nonEvaluatedFiles = nonEvaluatedFiles;
+ }
+ }
+
+ /**
+ * Evaluates the data quality of a dataset state and stores the result.
+ * This method is specifically designed for dataset-level quality
evaluation.
+ *
+ * @param datasetState The dataset state to evaluate and update
+ * @param jobState The job state containing additional context
+ * @return DataQualityEvaluationResult containing the evaluation results
+ */
+ public static DataQualityEvaluationResult
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState
jobState) {
+ List<TaskState> taskStates = datasetState.getTaskStates();
+ DataQualityEvaluationResult result = evaluateDataQuality(taskStates,
jobState);
+
+ // Store the result in the dataset state
+ jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY,
result.getQualityStatus().name());
+ // Emit dataset-specific metrics
+ emitMetrics(jobState, result.getQualityStatus() ==
DataQualityStatus.PASSED? 1 : 0, result.getTotalFiles(),
+ result.getPassedFiles(), result.getFailedFiles(),
result.nonEvaluatedFiles, datasetState.getDatasetUrn());
+
+ return result;
+ }
+
+ /**
+ * Evaluates the data quality of a set of task states and emits relevant
metrics.
+ *
+ * @param taskStates List of task states to evaluate
+ * @param jobState The job state containing additional context
+ * @return DataQualityEvaluationResult containing the evaluation results
+ */
+ public static DataQualityEvaluationResult
evaluateDataQuality(List<TaskState> taskStates, JobState jobState) {
+ DataQualityStatus jobDataQuality = DataQualityStatus.PASSED;
Review Comment:
should we rename the variable to `jobDataQualityStatus`?
Issue Time Tracking
-------------------
Worklog Id: (was: 975539)
Time Spent: 1h (was: 50m)
> FileSize Data Quality implementation for FileBasedCopy
> ------------------------------------------------------
>
> Key: GOBBLIN-2204
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2204
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Vaibhav Singhal
> Priority: Major
> Time Spent: 1h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)