[
https://issues.apache.org/jira/browse/GOBBLIN-2204?focusedWorklogId=977040&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-977040
]
ASF GitHub Bot logged work on GOBBLIN-2204:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 31/Jul/25 05:19
Start Date: 31/Jul/25 05:19
Worklog Time Spent: 10m
Work Description: khandelwal-prateek commented on code in PR #4113:
URL: https://github.com/apache/gobblin/pull/4113#discussion_r2243350309
##########
gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import java.util.Optional;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written
for a file copy operation.
+ */
+@Slf4j
+public class FileSizePolicy extends TaskLevelPolicy {
+
+ public static final String COPY_PREFIX = "gobblin.copy";
+ public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead";
+ public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten";
+
+ public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+ super(state, type);
+ }
+
+ @Override
+ public Result executePolicy() {
+ Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state);
+ if (!bytes.isPresent()) {
+ return Result.FAILED;
+ }
+ Long bytesRead = bytes.get().getBytesRead();
+ Long bytesWritten = bytes.get().getBytesWritten();
+
+ Long sizeDifference = Math.abs(bytesRead - bytesWritten);
+
+ if (sizeDifference == 0) {
+ return Result.PASSED;
+ }
+
+ log.warn("File size check failed - bytes read: {}, bytes written: {},
difference: {}",
+ bytesRead, bytesWritten, sizeDifference);
+ return Result.FAILED;
+ }
+
+ @Override
+ public String toString() {
+ Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state);
+ if(bytes.isPresent()) {
+ return String.format("FileSizePolicy [bytesRead=%s, bytesWritten=%s]",
bytes.get().getBytesRead(), bytes.get().getBytesWritten());
+ } else{
+ return "FileSizePolicy [bytesRead=null, bytesWritten=null]";
+ }
+ }
+
+ /**
+ * Helper class to hold transfer bytes information
+ */
+ @Getter
+ private static class TransferBytes {
+ final Long bytesRead;
+ final Long bytesWritten;
+ TransferBytes(Long bytesRead, Long bytesWritten) {
+ this.bytesRead = bytesRead;
+ this.bytesWritten = bytesWritten;
+ }
+ }
+
+ /**
+ * Extracts bytesRead and bytesWritten from the given state.
+ * Returns null if parsing fails.
Review Comment:
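Update the Javadoc regarding the null return: callers consume this method's result as an `Optional<TransferBytes>`, so the doc should not say it returns null.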
##########
gobblin-core/src/test/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyCheckerTest.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.qualitychecker.task;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.TestTaskLevelPolicy;
+
+@Test
+public class TaskLevelPolicyCheckerTest {
+
+ @Test
+ public void testSinglePolicyPassed() {
+ // Create a state with a single policy that always passes
+ State state = new State();
+ List<TaskLevelPolicy> policies = new ArrayList<>();
+ policies.add(new TestTaskLevelPolicy(state, TaskLevelPolicy.Type.FAIL));
+
+ // Create checker and execute policies
+ TaskLevelPolicyChecker checker = new TaskLevelPolicyChecker(policies);
+ TaskLevelPolicyCheckResults results = checker.executePolicies();
+
+ // Verify results
+ Assert.assertEquals(results.getPolicyResults().size(), 1);
+ for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry :
results.getPolicyResults().entrySet()) {
+ Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.PASSED);
+ Assert.assertEquals(entry.getValue(), TaskLevelPolicy.Type.FAIL);
+ }
+ }
+
+ @Test
+ public void testSinglePolicyFailed() {
+ // Create a state with a single policy that always fails
+ State state = new State();
+ List<TaskLevelPolicy> policies = new ArrayList<>();
+ policies.add(new FailingTaskLevelPolicy(state, TaskLevelPolicy.Type.FAIL));
+
+ // Create checker and execute policies
+ TaskLevelPolicyChecker checker = new TaskLevelPolicyChecker(policies);
+ TaskLevelPolicyCheckResults results = checker.executePolicies();
+
+ // Verify results
+ Assert.assertEquals(results.getPolicyResults().size(), 1);
+ for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry :
results.getPolicyResults().entrySet()) {
+ Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.FAILED);
+ Assert.assertEquals(entry.getValue(), TaskLevelPolicy.Type.FAIL);
+ }
+ }
+
+ @Test
+ public void testMultiplePoliciesMixedResults() {
+ // Create a state with multiple policies having mixed results
+ State state = new State();
+ List<TaskLevelPolicy> policies = new ArrayList<>();
+ policies.add(new TestTaskLevelPolicy(state, TaskLevelPolicy.Type.FAIL));
// Passes
+ policies.add(new FailingTaskLevelPolicy(state,
TaskLevelPolicy.Type.FAIL)); // Fails
+ policies.add(new TestTaskLevelPolicy(state,
TaskLevelPolicy.Type.OPTIONAL)); // Passes
+
+ // Create checker and execute policies
+ TaskLevelPolicyChecker checker = new TaskLevelPolicyChecker(policies);
+ TaskLevelPolicyCheckResults results = checker.executePolicies();
+
+ // Verify results
+ Assert.assertEquals(results.getPolicyResults().size(), 2);
+ int passedCount = 0;
+ int failedCount = 0;
+ for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry :
results.getPolicyResults().entrySet()) {
+ if (entry.getKey() == TaskLevelPolicy.Result.PASSED) {
+ passedCount++;
+ } else {
+ failedCount++;
+ }
+ }
+ Assert.assertEquals(passedCount, 1);
+ Assert.assertEquals(failedCount, 1);
+ }
+
+ @Test
+ public void testOptionalPolicyFailure() {
+ // Create a state with an optional policy that fails
+ State state = new State();
+ List<TaskLevelPolicy> policies = new ArrayList<>();
+ policies.add(new FailingTaskLevelPolicy(state,
TaskLevelPolicy.Type.OPTIONAL));
+
+ // Create checker and execute policies
+ TaskLevelPolicyChecker checker = new TaskLevelPolicyChecker(policies);
+ TaskLevelPolicyCheckResults results = checker.executePolicies();
+
+ // Verify results
+ Assert.assertEquals(results.getPolicyResults().size(), 1);
+ for (Map.Entry<TaskLevelPolicy.Result, TaskLevelPolicy.Type> entry :
results.getPolicyResults().entrySet()) {
+ Assert.assertEquals(entry.getKey(), TaskLevelPolicy.Result.FAILED);
+ Assert.assertEquals(entry.getValue(), TaskLevelPolicy.Type.OPTIONAL);
+ }
+ }
+
+ // Helper class for testing failing policies
+ private static class FailingTaskLevelPolicy extends TaskLevelPolicy {
+ public FailingTaskLevelPolicy(State state, Type type) {
+ super(state, type);
+ }
+
+ @Override
+ public Result executePolicy() {
+ return Result.FAILED;
+ }
+ }
+}
Review Comment:
nit: add a newline at the end of the file
##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import java.util.List;
+import java.util.Properties;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+ private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME =
"gobblin.gaas.observability";
+
+ // Private constructor to prevent instantiation
+ private DataQualityEvaluator() {}
+
+ /**
+ * Result of a data quality evaluation containing the overall status and
metrics.
+ */
+ @Getter
+ public static class DataQualityEvaluationResult {
+ private final DataQualityStatus qualityStatus;
+ private final int totalFiles;
+ private final int passedFiles;
+ private final int failedFiles;
+ // Number of files that were not evaluated for data quality for
example files not found or not processed
+ private final int nonEvaluatedFiles;
+
+ public DataQualityEvaluationResult(DataQualityStatus qualityStatus,
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+ this.qualityStatus = qualityStatus;
+ this.totalFiles = totalFiles;
+ this.passedFiles = passedFiles;
+ this.failedFiles = failedFiles;
+ this.nonEvaluatedFiles = nonEvaluatedFiles;
+ }
Review Comment:
This constructor can be replaced with the `@AllArgsConstructor` annotation on the class.
##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import java.util.List;
+import java.util.Properties;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+ private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME =
"gobblin.gaas.observability";
+
+ // Private constructor to prevent instantiation
+ private DataQualityEvaluator() {}
Review Comment:
The indentation in this file is not right. Can you please fix it to use 2 spaces for
indentation?
##########
gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import java.util.Optional;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written
for a file copy operation.
+ */
+@Slf4j
+public class FileSizePolicy extends TaskLevelPolicy {
+
+ public static final String COPY_PREFIX = "gobblin.copy";
+ public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead";
+ public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten";
+
+ public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+ super(state, type);
+ }
+
+ @Override
+ public Result executePolicy() {
+ Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state);
+ if (!bytes.isPresent()) {
+ return Result.FAILED;
+ }
+ Long bytesRead = bytes.get().getBytesRead();
+ Long bytesWritten = bytes.get().getBytesWritten();
+
+ Long sizeDifference = Math.abs(bytesRead - bytesWritten);
+
+ if (sizeDifference == 0) {
+ return Result.PASSED;
+ }
+
+ log.warn("File size check failed - bytes read: {}, bytes written: {},
difference: {}",
+ bytesRead, bytesWritten, sizeDifference);
+ return Result.FAILED;
+ }
+
+ @Override
+ public String toString() {
+ Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state);
+ if(bytes.isPresent()) {
+ return String.format("FileSizePolicy [bytesRead=%s, bytesWritten=%s]",
bytes.get().getBytesRead(), bytes.get().getBytesWritten());
+ } else{
+ return "FileSizePolicy [bytesRead=null, bytesWritten=null]";
+ }
+ }
+
+ /**
+ * Helper class to hold transfer bytes information
+ */
+ @Getter
+ private static class TransferBytes {
+ final Long bytesRead;
+ final Long bytesWritten;
Review Comment:
Since we are already validating and parsing the values as long, using the boxed `Long` is not
required. Using primitives guarantees non-null values and avoids unnecessary autoboxing.
##########
gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import java.util.Optional;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written
for a file copy operation.
+ */
+@Slf4j
+public class FileSizePolicy extends TaskLevelPolicy {
+
+ public static final String COPY_PREFIX = "gobblin.copy";
+ public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead";
+ public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten";
+
+ public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+ super(state, type);
+ }
+
+ @Override
+ public Result executePolicy() {
+ Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state);
+ if (!bytes.isPresent()) {
+ return Result.FAILED;
+ }
+ Long bytesRead = bytes.get().getBytesRead();
+ Long bytesWritten = bytes.get().getBytesWritten();
+
+ Long sizeDifference = Math.abs(bytesRead - bytesWritten);
+
+ if (sizeDifference == 0) {
+ return Result.PASSED;
+ }
+
+ log.warn("File size check failed - bytes read: {}, bytes written: {},
difference: {}",
+ bytesRead, bytesWritten, sizeDifference);
+ return Result.FAILED;
+ }
+
+ @Override
+ public String toString() {
+ Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state);
+ if(bytes.isPresent()) {
Review Comment:
`TransferBytes transferBytes =
getBytesReadAndWritten(this.state).orElse(null);`
##########
gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/task/TaskLevelPolicyChecker.java:
##########
@@ -28,20 +29,13 @@
* executes each one, and then stores the output
* in a PolicyCheckResults object
*/
+@Getter
public class TaskLevelPolicyChecker {
- /**
- * An enumeration for possible statuses for Data quality checks,
- * its values will be PASSED, FAILED, in case if data quality check
- * evaluation is not performed for Job, it will be NOT_EVALUATED
- */
- public enum DataQualityStatus {
- PASSED,
- FAILED,
- NOT_EVALUATED
- }
private final List<TaskLevelPolicy> list;
private static final Logger LOG =
LoggerFactory.getLogger(TaskLevelPolicyChecker.class);
+ public static final String TASK_LEVEL_POLICY_RESULT_KEY =
"gobblin.task.level.policy.result";
Review Comment:
If there are multiple policies, one with type `OPTIONAL` and another with type
`FAIL`, then the failure of the `FAIL` policy can be masked, causing a
silent success even when the required policy is failing, because the results map is keyed by the result:
`results.getPolicyResults().put(result, p.getType());`
e.g. if two policies return the following:
('FAILED' -> 'FAIL')
('FAILED' -> 'OPTIONAL')
eventually we will see only ('FAILED' -> 'OPTIONAL') and not consider this a DQ
failure.
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java:
##########
@@ -90,6 +94,14 @@ public Void call()
metricContext = Instrumented.getMetricContext(datasetState,
SafeDatasetCommit.class);
finalizeDatasetStateBeforeCommit(this.datasetState);
+ // evaluate data quality at the dataset commit level, only when commit
source is CommitActivityImpl
+
if(SafeDatasetCommit.COMMIT_SRC_COMMIT_ACTIVITY_IMPL.equals(this.datasetCommitSrc)){
+ log.info("Evaluating data quality for commit activity for dataset {}.",
this.datasetUrn);
+ evaluateAndEmitDatasetQuality();
+ } else {
+ log.warn("Skipping data quality evaluation for dataset {} as commit
source is {}", this.datasetUrn,
+ this.datasetCommitSrc);
Review Comment:
In which cases does it go into the else branch? Do we want to log this? If that's an
expected scenario, this can be an `info` log.
##########
gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import java.util.Optional;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written
for a file copy operation.
+ */
+@Slf4j
+public class FileSizePolicy extends TaskLevelPolicy {
+
+ public static final String COPY_PREFIX = "gobblin.copy";
+ public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead";
+ public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten";
+
+ public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+ super(state, type);
+ }
+
+ @Override
+ public Result executePolicy() {
+ Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state);
+ if (!bytes.isPresent()) {
+ return Result.FAILED;
+ }
Review Comment:
The `bytes.isPresent() -> bytes.get()` pattern works but can be simplified
as below:
```
TransferBytes transferBytes = getBytesReadAndWritten(...).orElse(null);
if (transferBytes == null) {
return Result.FAILED;
}
long bytesRead = transferBytes.getBytesRead();
...
```
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java:
##########
@@ -90,6 +94,14 @@ public Void call()
metricContext = Instrumented.getMetricContext(datasetState,
SafeDatasetCommit.class);
finalizeDatasetStateBeforeCommit(this.datasetState);
+ // evaluate data quality at the dataset commit level, only when commit
source is CommitActivityImpl
+
if(SafeDatasetCommit.COMMIT_SRC_COMMIT_ACTIVITY_IMPL.equals(this.datasetCommitSrc)){
Review Comment:
nit: formatting — add a space after `if` and before the opening brace, i.e. `if (...) {`
##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import java.util.List;
+import java.util.Properties;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+ private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME =
"gobblin.gaas.observability";
+
+ // Private constructor to prevent instantiation
+ private DataQualityEvaluator() {}
+
+ /**
+ * Result of a data quality evaluation containing the overall status and
metrics.
+ */
+ @Getter
+ public static class DataQualityEvaluationResult {
+ private final DataQualityStatus qualityStatus;
+ private final int totalFiles;
+ private final int passedFiles;
+ private final int failedFiles;
+ // Number of files that were not evaluated for data quality for
example files not found or not processed
+ private final int nonEvaluatedFiles;
+
+ public DataQualityEvaluationResult(DataQualityStatus qualityStatus,
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+ this.qualityStatus = qualityStatus;
+ this.totalFiles = totalFiles;
+ this.passedFiles = passedFiles;
+ this.failedFiles = failedFiles;
+ this.nonEvaluatedFiles = nonEvaluatedFiles;
+ }
+ }
+
+ /**
+ * Evaluates the data quality of a dataset state and stores the result.
+ * This method is specifically designed for dataset-level quality
evaluation.
+ *
+ * @param datasetState The dataset state to evaluate and update
+ * @param jobState The job state containing additional context
+ * @return DataQualityEvaluationResult containing the evaluation results
+ */
+ public static DataQualityEvaluationResult
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState
jobState) {
+ List<TaskState> taskStates = datasetState.getTaskStates();
+ DataQualityEvaluationResult result = evaluateDataQuality(taskStates,
jobState);
+
+ // Store the result in the dataset state
+ jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY,
result.getQualityStatus().name());
+ // Emit dataset-specific metrics
+ emitMetrics(jobState, result.getQualityStatus(),
result.getTotalFiles(),
+ result.getPassedFiles(), result.getFailedFiles(),
result.getNonEvaluatedFiles(), datasetState.getDatasetUrn());
+
+ return result;
+ }
+
+ /**
+ * Evaluates the data quality of a set of task states and emits relevant
metrics.
+ *
+ * @param taskStates List of task states to evaluate
+ * @param jobState The job state containing additional context
+ * @return DataQualityEvaluationResult containing the evaluation results
+ */
+ public static DataQualityEvaluationResult
evaluateDataQuality(List<TaskState> taskStates, JobState jobState) {
+ DataQualityStatus jobDataQualityStatus = DataQualityStatus.PASSED;
+ int totalFiles = 0;
+ int failedFilesCount = 0;
+ int passedFilesCount = 0;
+ int nonEvaluatedFilesCount = 0;
+
+ for (TaskState taskState : taskStates) {
+ totalFiles++;
+
+ // Handle null task states gracefully
+ if (taskState == null) {
+ log.warn("Encountered null task state, skipping data quality
evaluation for this task");
+ nonEvaluatedFilesCount++;
+ continue;
+ }
+
+ DataQualityStatus taskDataQuality = null;
+ String result =
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+ taskDataQuality = DataQualityStatus.fromString(result);
+ if (taskDataQuality != DataQualityStatus.NOT_EVALUATED) {
+ log.debug("Data quality status of this task is: " +
taskDataQuality);
+ if (DataQualityStatus.PASSED == taskDataQuality) {
+ passedFilesCount++;
+ } else if (DataQualityStatus.FAILED == taskDataQuality){
+ failedFilesCount++;
+ jobDataQualityStatus = DataQualityStatus.FAILED;
+ } else {
+ log.warn("Unexpected data quality status: " +
taskDataQuality + " for task: " + taskState.getTaskId());
+ }
+ } else {
+ // Handle files without data quality evaluation
+ nonEvaluatedFilesCount++;
+ log.warn("No data quality evaluation for task: " +
taskState.getTaskId());
+ }
+ }
+
+ // Log summary of evaluation
+ log.info("Data quality evaluation summary - Total: {}, Passed: {},
Failed: {}, Not Evaluated: {}",
+ totalFiles, passedFilesCount, failedFilesCount,
nonEvaluatedFilesCount);
+ return new DataQualityEvaluationResult(jobDataQualityStatus,
totalFiles, passedFilesCount, failedFilesCount, nonEvaluatedFilesCount);
+ }
+
+
+ private static void emitMetrics(JobState jobState, final DataQualityStatus
jobDataQuality, final int totalFiles,
+ final int passedFilesCount, final int failedFilesCount, final int
nonEvaluatedFilesCount, final String datasetUrn) {
+ try {
+ // Check if OpenTelemetry is enabled
+ boolean otelEnabled =
jobState.getPropAsBoolean(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
+
ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED);
+
+ if (!otelEnabled) {
+ log.info("OpenTelemetry metrics disabled, skipping metrics
emission");
+ return;
+ }
+
+ OpenTelemetryMetricsBase otelMetrics =
OpenTelemetryMetrics.getInstance(jobState);
+ log.info("OpenTelemetry instance obtained: {}", otelMetrics !=
null);
+
+ if (otelMetrics != null) {
+ Attributes tags = getTagsForDataQualityMetrics(jobState,
datasetUrn);
+ log.info("Tags for data quality metrics: " + tags.toString());
+ // Emit data quality status (1 for PASSED, 0 for FAILED)
+ log.info("Data quality status for this job is " +
jobDataQuality);
+ if (jobDataQuality == DataQualityStatus.PASSED) {
+ log.info("Data quality passed for job: {}",
jobState.getJobName());
+ otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
Review Comment:
We can avoid repeated `getMeter(...)` calls by assigning the meter to a local
variable once and reusing it for all metrics:
```
Meter meter = otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME);
```
##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import java.util.List;
+import java.util.Properties;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+ private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME =
"gobblin.gaas.observability";
+
+ // Private constructor to prevent instantiation
+ private DataQualityEvaluator() {}
+
+ /**
+ * Result of a data quality evaluation containing the overall status and
metrics.
+ */
+ @Getter
+ public static class DataQualityEvaluationResult {
+ private final DataQualityStatus qualityStatus;
+ private final int totalFiles;
+ private final int passedFiles;
+ private final int failedFiles;
+ // Number of files that were not evaluated for data quality for
example files not found or not processed
+ private final int nonEvaluatedFiles;
+
+ public DataQualityEvaluationResult(DataQualityStatus qualityStatus,
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+ this.qualityStatus = qualityStatus;
+ this.totalFiles = totalFiles;
+ this.passedFiles = passedFiles;
+ this.failedFiles = failedFiles;
+ this.nonEvaluatedFiles = nonEvaluatedFiles;
+ }
+ }
+
+ /**
+ * Evaluates the data quality of a dataset state and stores the result.
+ * This method is specifically designed for dataset-level quality
evaluation.
+ *
+ * @param datasetState The dataset state to evaluate and update
+ * @param jobState The job state containing additional context
+ * @return DataQualityEvaluationResult containing the evaluation results
+ */
+ public static DataQualityEvaluationResult
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState
jobState) {
+ List<TaskState> taskStates = datasetState.getTaskStates();
+ DataQualityEvaluationResult result = evaluateDataQuality(taskStates,
jobState);
+
+ // Store the result in the dataset state
+ jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY,
result.getQualityStatus().name());
+ // Emit dataset-specific metrics
+ emitMetrics(jobState, result.getQualityStatus(),
result.getTotalFiles(),
+ result.getPassedFiles(), result.getFailedFiles(),
result.getNonEvaluatedFiles(), datasetState.getDatasetUrn());
+
+ return result;
+ }
+
+ /**
+ * Evaluates the data quality of a set of task states and emits relevant
metrics.
+ *
+ * @param taskStates List of task states to evaluate
+ * @param jobState The job state containing additional context
+ * @return DataQualityEvaluationResult containing the evaluation results
+ */
+ public static DataQualityEvaluationResult
evaluateDataQuality(List<TaskState> taskStates, JobState jobState) {
+ DataQualityStatus jobDataQualityStatus = DataQualityStatus.PASSED;
+ int totalFiles = 0;
+ int failedFilesCount = 0;
+ int passedFilesCount = 0;
+ int nonEvaluatedFilesCount = 0;
+
+ for (TaskState taskState : taskStates) {
+ totalFiles++;
+
+ // Handle null task states gracefully
+ if (taskState == null) {
+ log.warn("Encountered null task state, skipping data quality
evaluation for this task");
+ nonEvaluatedFilesCount++;
+ continue;
+ }
+
+ DataQualityStatus taskDataQuality = null;
+ String result =
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+ taskDataQuality = DataQualityStatus.fromString(result);
+ if (taskDataQuality != DataQualityStatus.NOT_EVALUATED) {
+ log.debug("Data quality status of this task is: " +
taskDataQuality);
+ if (DataQualityStatus.PASSED == taskDataQuality) {
+ passedFilesCount++;
+ } else if (DataQualityStatus.FAILED == taskDataQuality){
+ failedFilesCount++;
+ jobDataQualityStatus = DataQualityStatus.FAILED;
+ } else {
+ log.warn("Unexpected data quality status: " +
taskDataQuality + " for task: " + taskState.getTaskId());
+ }
+ } else {
+ // Handle files without data quality evaluation
+ nonEvaluatedFilesCount++;
+ log.warn("No data quality evaluation for task: " +
taskState.getTaskId());
+ }
+ }
+
+ // Log summary of evaluation
+ log.info("Data quality evaluation summary - Total: {}, Passed: {},
Failed: {}, Not Evaluated: {}",
+ totalFiles, passedFilesCount, failedFilesCount,
nonEvaluatedFilesCount);
+ return new DataQualityEvaluationResult(jobDataQualityStatus,
totalFiles, passedFilesCount, failedFilesCount, nonEvaluatedFilesCount);
+ }
+
+
+ private static void emitMetrics(JobState jobState, final DataQualityStatus
jobDataQuality, final int totalFiles,
+ final int passedFilesCount, final int failedFilesCount, final int
nonEvaluatedFilesCount, final String datasetUrn) {
+ try {
+ // Check if OpenTelemetry is enabled
+ boolean otelEnabled =
jobState.getPropAsBoolean(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
+
ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED);
+
+ if (!otelEnabled) {
+ log.info("OpenTelemetry metrics disabled, skipping metrics
emission");
+ return;
+ }
+
+ OpenTelemetryMetricsBase otelMetrics =
OpenTelemetryMetrics.getInstance(jobState);
+ log.info("OpenTelemetry instance obtained: {}", otelMetrics !=
null);
+
+ if (otelMetrics != null) {
+ Attributes tags = getTagsForDataQualityMetrics(jobState,
datasetUrn);
+ log.info("Tags for data quality metrics: " + tags.toString());
+ // Emit data quality status (1 for PASSED, 0 for FAILED)
+ log.info("Data quality status for this job is " +
jobDataQuality);
+ if (jobDataQuality == DataQualityStatus.PASSED) {
+ log.info("Data quality passed for job: {}",
jobState.getJobName());
+ otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
+
.counterBuilder(ServiceMetricNames.DATA_QUALITY_JOB_SUCCESS_COUNT)
+ .setDescription("Number of jobs that passed data
quality")
+ .build()
+ .add(1, tags);
+ } else {
+ log.info("Data quality failed for job: {}",
jobState.getJobName());
+ otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
+
.counterBuilder(ServiceMetricNames.DATA_QUALITY_JOB_FAILURE_COUNT)
Review Comment:
Since the if/else branches differ only in the metric name, consider determining
the metric name first and removing this if/else block, as shown below:
```
String jobMetricName = (jobDataQuality == DataQualityStatus.PASSED)
? ServiceMetricNames.DATA_QUALITY_JOB_SUCCESS_COUNT
: ServiceMetricNames.DATA_QUALITY_JOB_FAILURE_COUNT;
```
##########
gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import java.util.Optional;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written
for a file copy operation.
+ */
+@Slf4j
+public class FileSizePolicy extends TaskLevelPolicy {
+
+ public static final String COPY_PREFIX = "gobblin.copy";
+ public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead";
+ public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten";
+
+ public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+ super(state, type);
+ }
+
+ @Override
+ public Result executePolicy() {
+ Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state);
+ if (!bytes.isPresent()) {
+ return Result.FAILED;
+ }
+ Long bytesRead = bytes.get().getBytesRead();
+ Long bytesWritten = bytes.get().getBytesWritten();
+
+ Long sizeDifference = Math.abs(bytesRead - bytesWritten);
+
+ if (sizeDifference == 0) {
+ return Result.PASSED;
+ }
+
+ log.warn("File size check failed - bytes read: {}, bytes written: {},
difference: {}",
+ bytesRead, bytesWritten, sizeDifference);
+ return Result.FAILED;
+ }
+
+ @Override
+ public String toString() {
+ Optional<TransferBytes> bytes = getBytesReadAndWritten(this.state);
+ if(bytes.isPresent()) {
+ return String.format("FileSizePolicy [bytesRead=%s, bytesWritten=%s]",
bytes.get().getBytesRead(), bytes.get().getBytesWritten());
+ } else{
Review Comment:
nit: add a space between `else` and `{` (i.e. `} else {`), and make the same fix
in the other places. Please refer to
https://gobblin.apache.org/docs/developer-guide/CodingStyle/ to import the
codestyle XML into your IDE, which handles this automatically.
##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import java.util.List;
+import java.util.Properties;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+ private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME =
"gobblin.gaas.observability";
+
+ // Private constructor to prevent instantiation
+ private DataQualityEvaluator() {}
+
+ /**
+ * Result of a data quality evaluation containing the overall status and
metrics.
+ */
+ @Getter
+ public static class DataQualityEvaluationResult {
+ private final DataQualityStatus qualityStatus;
+ private final int totalFiles;
+ private final int passedFiles;
+ private final int failedFiles;
+ // Number of files that were not evaluated for data quality for
example files not found or not processed
+ private final int nonEvaluatedFiles;
+
+ public DataQualityEvaluationResult(DataQualityStatus qualityStatus,
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+ this.qualityStatus = qualityStatus;
+ this.totalFiles = totalFiles;
+ this.passedFiles = passedFiles;
+ this.failedFiles = failedFiles;
+ this.nonEvaluatedFiles = nonEvaluatedFiles;
+ }
+ }
+
+ /**
+ * Evaluates the data quality of a dataset state and stores the result.
+ * This method is specifically designed for dataset-level quality
evaluation.
+ *
+ * @param datasetState The dataset state to evaluate and update
+ * @param jobState The job state containing additional context
+ * @return DataQualityEvaluationResult containing the evaluation results
+ */
+ public static DataQualityEvaluationResult
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState
jobState) {
+ List<TaskState> taskStates = datasetState.getTaskStates();
+ DataQualityEvaluationResult result = evaluateDataQuality(taskStates,
jobState);
+
+ // Store the result in the dataset state
+ jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY,
result.getQualityStatus().name());
+ // Emit dataset-specific metrics
+ emitMetrics(jobState, result.getQualityStatus(),
result.getTotalFiles(),
+ result.getPassedFiles(), result.getFailedFiles(),
result.getNonEvaluatedFiles(), datasetState.getDatasetUrn());
+
+ return result;
+ }
+
+ /**
+ * Evaluates the data quality of a set of task states and emits relevant
metrics.
+ *
+ * @param taskStates List of task states to evaluate
+ * @param jobState The job state containing additional context
+ * @return DataQualityEvaluationResult containing the evaluation results
+ */
+ public static DataQualityEvaluationResult
evaluateDataQuality(List<TaskState> taskStates, JobState jobState) {
+ DataQualityStatus jobDataQualityStatus = DataQualityStatus.PASSED;
+ int totalFiles = 0;
+ int failedFilesCount = 0;
+ int passedFilesCount = 0;
+ int nonEvaluatedFilesCount = 0;
+
+ for (TaskState taskState : taskStates) {
+ totalFiles++;
+
+ // Handle null task states gracefully
+ if (taskState == null) {
+ log.warn("Encountered null task state, skipping data quality
evaluation for this task");
+ nonEvaluatedFilesCount++;
+ continue;
+ }
+
+ DataQualityStatus taskDataQuality = null;
+ String result =
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+ taskDataQuality = DataQualityStatus.fromString(result);
+ if (taskDataQuality != DataQualityStatus.NOT_EVALUATED) {
+ log.debug("Data quality status of this task is: " +
taskDataQuality);
+ if (DataQualityStatus.PASSED == taskDataQuality) {
+ passedFilesCount++;
+ } else if (DataQualityStatus.FAILED == taskDataQuality){
+ failedFilesCount++;
+ jobDataQualityStatus = DataQualityStatus.FAILED;
+ } else {
+ log.warn("Unexpected data quality status: " +
taskDataQuality + " for task: " + taskState.getTaskId());
+ }
+ } else {
+ // Handle files without data quality evaluation
+ nonEvaluatedFilesCount++;
+ log.warn("No data quality evaluation for task: " +
taskState.getTaskId());
+ }
+ }
+
+ // Log summary of evaluation
+ log.info("Data quality evaluation summary - Total: {}, Passed: {},
Failed: {}, Not Evaluated: {}",
+ totalFiles, passedFilesCount, failedFilesCount,
nonEvaluatedFilesCount);
+ return new DataQualityEvaluationResult(jobDataQualityStatus,
totalFiles, passedFilesCount, failedFilesCount, nonEvaluatedFilesCount);
+ }
+
+
+ private static void emitMetrics(JobState jobState, final DataQualityStatus
jobDataQuality, final int totalFiles,
+ final int passedFilesCount, final int failedFilesCount, final int
nonEvaluatedFilesCount, final String datasetUrn) {
+ try {
+ // Check if OpenTelemetry is enabled
+ boolean otelEnabled =
jobState.getPropAsBoolean(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
+
ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED);
+
+ if (!otelEnabled) {
+ log.info("OpenTelemetry metrics disabled, skipping metrics
emission");
+ return;
+ }
+
+ OpenTelemetryMetricsBase otelMetrics =
OpenTelemetryMetrics.getInstance(jobState);
+ log.info("OpenTelemetry instance obtained: {}", otelMetrics !=
null);
+
+ if (otelMetrics != null) {
+ Attributes tags = getTagsForDataQualityMetrics(jobState,
datasetUrn);
+ log.info("Tags for data quality metrics: " + tags.toString());
+ // Emit data quality status (1 for PASSED, 0 for FAILED)
Review Comment:
We are now emitting separate success/failure counter metrics rather than a
single 1/0 status value, so this code comment is stale and should be updated.
##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import java.util.List;
+import java.util.Properties;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+ private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME =
"gobblin.gaas.observability";
+
+ // Private constructor to prevent instantiation
+ private DataQualityEvaluator() {}
+
+ /**
+ * Result of a data quality evaluation containing the overall status and
metrics.
+ */
+ @Getter
+ public static class DataQualityEvaluationResult {
+ private final DataQualityStatus qualityStatus;
+ private final int totalFiles;
+ private final int passedFiles;
+ private final int failedFiles;
+ // Number of files that were not evaluated for data quality for
example files not found or not processed
+ private final int nonEvaluatedFiles;
+
+ public DataQualityEvaluationResult(DataQualityStatus qualityStatus,
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+ this.qualityStatus = qualityStatus;
+ this.totalFiles = totalFiles;
+ this.passedFiles = passedFiles;
+ this.failedFiles = failedFiles;
+ this.nonEvaluatedFiles = nonEvaluatedFiles;
+ }
+ }
+
+ /**
+ * Evaluates the data quality of a dataset state and stores the result.
+ * This method is specifically designed for dataset-level quality
evaluation.
+ *
+ * @param datasetState The dataset state to evaluate and update
+ * @param jobState The job state containing additional context
+ * @return DataQualityEvaluationResult containing the evaluation results
+ */
+ public static DataQualityEvaluationResult
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState
jobState) {
+ List<TaskState> taskStates = datasetState.getTaskStates();
+ DataQualityEvaluationResult result = evaluateDataQuality(taskStates,
jobState);
+
+ // Store the result in the dataset state
+ jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY,
result.getQualityStatus().name());
+ // Emit dataset-specific metrics
+ emitMetrics(jobState, result.getQualityStatus(),
result.getTotalFiles(),
+ result.getPassedFiles(), result.getFailedFiles(),
result.getNonEvaluatedFiles(), datasetState.getDatasetUrn());
+
+ return result;
+ }
+
+ /**
+ * Evaluates the data quality of a set of task states and emits relevant
metrics.
+ *
+ * @param taskStates List of task states to evaluate
+ * @param jobState The job state containing additional context
+ * @return DataQualityEvaluationResult containing the evaluation results
+ */
+ public static DataQualityEvaluationResult
evaluateDataQuality(List<TaskState> taskStates, JobState jobState) {
+ DataQualityStatus jobDataQualityStatus = DataQualityStatus.PASSED;
+ int totalFiles = 0;
+ int failedFilesCount = 0;
+ int passedFilesCount = 0;
+ int nonEvaluatedFilesCount = 0;
+
+ for (TaskState taskState : taskStates) {
+ totalFiles++;
+
+ // Handle null task states gracefully
+ if (taskState == null) {
+ log.warn("Encountered null task state, skipping data quality
evaluation for this task");
+ nonEvaluatedFilesCount++;
+ continue;
+ }
+
+ DataQualityStatus taskDataQuality = null;
+ String result =
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+ taskDataQuality = DataQualityStatus.fromString(result);
+ if (taskDataQuality != DataQualityStatus.NOT_EVALUATED) {
+ log.debug("Data quality status of this task is: " +
taskDataQuality);
+ if (DataQualityStatus.PASSED == taskDataQuality) {
+ passedFilesCount++;
+ } else if (DataQualityStatus.FAILED == taskDataQuality){
+ failedFilesCount++;
+ jobDataQualityStatus = DataQualityStatus.FAILED;
+ } else {
+ log.warn("Unexpected data quality status: " +
taskDataQuality + " for task: " + taskState.getTaskId());
+ }
+ } else {
+ // Handle files without data quality evaluation
+ nonEvaluatedFilesCount++;
+ log.warn("No data quality evaluation for task: " +
taskState.getTaskId());
+ }
+ }
+
+ // Log summary of evaluation
+ log.info("Data quality evaluation summary - Total: {}, Passed: {},
Failed: {}, Not Evaluated: {}",
+ totalFiles, passedFilesCount, failedFilesCount,
nonEvaluatedFilesCount);
+ return new DataQualityEvaluationResult(jobDataQualityStatus,
totalFiles, passedFilesCount, failedFilesCount, nonEvaluatedFilesCount);
+ }
+
+
+ private static void emitMetrics(JobState jobState, final DataQualityStatus
jobDataQuality, final int totalFiles,
+ final int passedFilesCount, final int failedFilesCount, final int
nonEvaluatedFilesCount, final String datasetUrn) {
+ try {
+ // Check if OpenTelemetry is enabled
+ boolean otelEnabled =
jobState.getPropAsBoolean(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
+
ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED);
+
+ if (!otelEnabled) {
+ log.info("OpenTelemetry metrics disabled, skipping metrics
emission");
+ return;
+ }
+
+ OpenTelemetryMetricsBase otelMetrics =
OpenTelemetryMetrics.getInstance(jobState);
+ log.info("OpenTelemetry instance obtained: {}", otelMetrics !=
null);
+
+ if (otelMetrics != null) {
+ Attributes tags = getTagsForDataQualityMetrics(jobState,
datasetUrn);
+ log.info("Tags for data quality metrics: " + tags.toString());
+ // Emit data quality status (1 for PASSED, 0 for FAILED)
+ log.info("Data quality status for this job is " +
jobDataQuality);
Review Comment:
We can combine these multiple logging statements into one:
```
log.info("Emitting DQ metrics for job={}, status={}, tags={}",
jobState.getJobName(), jobDataQuality, tags);
```
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java:
##########
@@ -613,6 +629,11 @@ private boolean checkDataQuality(Optional<Object> schema)
TaskLevelPolicyCheckResults taskResults =
this.taskContext.getTaskLevelPolicyChecker(this.forkTaskState,
this.branches > 1 ? this.index : -1)
.executePolicies();
+ boolean allRequiredPoliciesPassed =
taskResults.getPolicyResults().entrySet().stream()
+ .filter(e -> e.getValue() == TaskLevelPolicy.Type.FAIL)
+ .allMatch(e -> e.getKey() == TaskLevelPolicy.Result.PASSED);
+ forkTaskState.setProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY,
+ allRequiredPoliciesPassed ? DataQualityStatus.PASSED.name() :
DataQualityStatus.FAILED.name());
Review Comment:
We want to mark the DQ status as FAILED if any policy fails, right? If so,
shouldn't we check that there is no `Result.FAILED` key and that
`Result.PASSED` is present, and only then mark the DQ status as PASSED?
##########
gobblin-runtime/src/main/java/org/apache/gobblin/quality/DataQualityEvaluator.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.quality;
+
+import io.opentelemetry.api.common.Attributes;
+import java.util.List;
+import java.util.Properties;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.OpenTelemetryMetrics;
+import org.apache.gobblin.metrics.OpenTelemetryMetricsBase;
+import org.apache.gobblin.metrics.ServiceMetricNames;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.qualitychecker.DataQualityStatus;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.service.ServiceConfigKeys;
+
+/**
+ * Evaluates data quality for a set of task states and emits relevant metrics.
+ * This is a stateless utility class.
+ */
+@Slf4j
+public class DataQualityEvaluator {
+
+ private static final String GAAS_OBSERVABILITY_METRICS_GROUPNAME =
"gobblin.gaas.observability";
+
+ // Private constructor to prevent instantiation
+ private DataQualityEvaluator() {}
+
+ /**
+ * Immutable result of a data quality evaluation: the overall status plus
+ * per-file counts.
+ * <p>
+ * All fields are final and assigned once by the constructor. Lombok's
+ * {@code @Getter} generates the accessors ({@code getQualityStatus()},
+ * {@code getTotalFiles()}, {@code getPassedFiles()}, {@code getFailedFiles()}
+ * and {@code getNonEvaluatedFiles()}).
+ * <p>
+ * The constructor stores its arguments as given; it does not check that the
+ * individual counts add up to {@code totalFiles}.
+ */
+ @Getter
+ public static class DataQualityEvaluationResult {
+ private final DataQualityStatus qualityStatus;
+ private final int totalFiles;
+ private final int passedFiles;
+ private final int failedFiles;
+ // Number of files that were not evaluated for data quality for
example files not found or not processed
+ private final int nonEvaluatedFiles;
+
+ public DataQualityEvaluationResult(DataQualityStatus qualityStatus,
int totalFiles, int passedFiles, int failedFiles, int nonEvaluatedFiles) {
+ this.qualityStatus = qualityStatus;
+ this.totalFiles = totalFiles;
+ this.passedFiles = passedFiles;
+ this.failedFiles = failedFiles;
+ this.nonEvaluatedFiles = nonEvaluatedFiles;
+ }
+ }
+
+ /**
+ * Evaluates the data quality of a dataset's task states, records the overall
+ * status and emits metrics.
+ * <p>
+ * The task states are read from {@code datasetState} and evaluated with
+ * {@link #evaluateDataQuality(List, JobState)}. The name of the resulting
+ * status is written to {@code jobState} under
+ * {@link ConfigurationKeys#DATASET_QUALITY_STATUS_KEY}. Only getters are
+ * called on {@code datasetState}; nothing is written to it here. The status,
+ * the file counts and the dataset URN are then passed to {@code emitMetrics}.
+ *
+ * @param datasetState The dataset state whose task states are evaluated and
+ * whose URN is passed to metric emission
+ * @param jobState The job state that receives the quality status property and
+ * is passed to metric emission
+ * @return DataQualityEvaluationResult containing the evaluation results
+ */
+ public static DataQualityEvaluationResult
evaluateAndReportDatasetQuality(JobState.DatasetState datasetState, JobState
jobState) {
+ List<TaskState> taskStates = datasetState.getTaskStates();
+ DataQualityEvaluationResult result = evaluateDataQuality(taskStates,
jobState);
+
+ // Record the overall status on the job state (jobState, not datasetState)
+ jobState.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY,
result.getQualityStatus().name());
+ // Emit metrics for this dataset, passing its URN along
+ emitMetrics(jobState, result.getQualityStatus(),
result.getTotalFiles(),
+ result.getPassedFiles(), result.getFailedFiles(),
result.getNonEvaluatedFiles(), datasetState.getDatasetUrn());
+
+ return result;
+ }
+
+ /**
+ * Tallies the data quality outcome of each task state and derives an overall
+ * status. This method only logs; it does not emit metrics.
+ * <p>
+ * Every list entry counts toward the total. For a non-null entry the value of
+ * {@link ConfigurationKeys#TASK_LEVEL_POLICY_RESULT_KEY} is converted with
+ * {@code DataQualityStatus.fromString} and counted as passed, failed or not
+ * evaluated; a null entry is counted as not evaluated. Any other status is
+ * logged as unexpected and counted in the total only, so the three counters
+ * need not add up to the total.
+ * <p>
+ * The overall status starts as {@code PASSED} and becomes {@code FAILED} once
+ * any task is {@code FAILED}; it therefore stays {@code PASSED} for an empty
+ * list or when no task was evaluated.
+ * <p>
+ * NOTE(review): presumably {@code fromString} maps a missing or unrecognized
+ * value to {@code NOT_EVALUATED}; confirm against DataQualityStatus.
+ *
+ * @param taskStates List of task states to evaluate; must not be null, may
+ * contain null entries
+ * @param jobState The job state; not read by this method
+ * @return DataQualityEvaluationResult with the overall status and the counts
+ */
+ public static DataQualityEvaluationResult
evaluateDataQuality(List<TaskState> taskStates, JobState jobState) {
+ DataQualityStatus jobDataQualityStatus = DataQualityStatus.PASSED;
+ int totalFiles = 0;
+ int failedFilesCount = 0;
+ int passedFilesCount = 0;
+ int nonEvaluatedFilesCount = 0;
+
+ for (TaskState taskState : taskStates) {
+ totalFiles++;
+
+ // A null entry still counts toward the total, as not evaluated
+ if (taskState == null) {
+ log.warn("Encountered null task state, skipping data quality
evaluation for this task");
+ nonEvaluatedFilesCount++;
+ continue;
+ }
+
+ DataQualityStatus taskDataQuality = null;
+ String result =
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+ taskDataQuality = DataQualityStatus.fromString(result);
+ if (taskDataQuality != DataQualityStatus.NOT_EVALUATED) {
+ log.debug("Data quality status of this task is: " +
taskDataQuality);
+ if (DataQualityStatus.PASSED == taskDataQuality) {
+ passedFilesCount++;
+ } else if (DataQualityStatus.FAILED == taskDataQuality){
+ failedFilesCount++;
+ jobDataQualityStatus = DataQualityStatus.FAILED;
+ } else {
+ log.warn("Unexpected data quality status: " +
taskDataQuality + " for task: " + taskState.getTaskId());
+ }
+ } else {
+ // NOT_EVALUATED: count the task as not evaluated and warn
+ nonEvaluatedFilesCount++;
+ log.warn("No data quality evaluation for task: " +
taskState.getTaskId());
+ }
+ }
+
+ // Log a one-line summary of all counters
+ log.info("Data quality evaluation summary - Total: {}, Passed: {},
Failed: {}, Not Evaluated: {}",
+ totalFiles, passedFilesCount, failedFilesCount,
nonEvaluatedFilesCount);
+ return new DataQualityEvaluationResult(jobDataQualityStatus,
totalFiles, passedFilesCount, failedFilesCount, nonEvaluatedFilesCount);
+ }
+
+
+ private static void emitMetrics(JobState jobState, final DataQualityStatus
jobDataQuality, final int totalFiles,
+ final int passedFilesCount, final int failedFilesCount, final int
nonEvaluatedFilesCount, final String datasetUrn) {
+ try {
+ // Check if OpenTelemetry is enabled
+ boolean otelEnabled =
jobState.getPropAsBoolean(ConfigurationKeys.METRICS_REPORTING_OPENTELEMETRY_ENABLED,
+
ConfigurationKeys.DEFAULT_METRICS_REPORTING_OPENTELEMETRY_ENABLED);
+
+ if (!otelEnabled) {
+ log.info("OpenTelemetry metrics disabled, skipping metrics
emission");
+ return;
+ }
+
+ OpenTelemetryMetricsBase otelMetrics =
OpenTelemetryMetrics.getInstance(jobState);
+ log.info("OpenTelemetry instance obtained: {}", otelMetrics !=
null);
+
+ if (otelMetrics != null) {
Review Comment:
We can check `if (otelMetrics == null)` and return early. That simplifies
the rest of the method by reducing indentation and nesting, making the main
logic easier to follow.
Issue Time Tracking
-------------------
Worklog Id: (was: 977040)
Time Spent: 1h 40m (was: 1.5h)
> FileSize Data Quality implementation for FileBasedCopy
> ------------------------------------------------------
>
> Key: GOBBLIN-2204
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2204
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Vaibhav Singhal
> Priority: Major
> Time Spent: 1h 40m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)