[
https://issues.apache.org/jira/browse/GOBBLIN-2204?focusedWorklogId=972219&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-972219
]
ASF GitHub Bot logged work on GOBBLIN-2204:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 09/Jun/25 06:11
Start Date: 09/Jun/25 06:11
Worklog Time Spent: 10m
Work Description: khandelwal-prateek commented on code in PR #4113:
URL: https://github.com/apache/gobblin/pull/4113#discussion_r2135036625
##########
gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written
for a file copy operation.
+ */
+@Slf4j
+public class FileSizePolicy extends TaskLevelPolicy {
+
+ public static final String COPY_PREFIX = "gobblin.copy";
+ public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead";
+ public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten";
+
+ private final Long bytesRead;
+ private final Long bytesWritten;
+
+ public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+ super(state, type);
+ String bytesReadString = state.getProp(BYTES_READ_KEY);
+ String bytesWrittenString = state.getProp(BYTES_WRITTEN_KEY);
+ this.bytesRead = bytesReadString == null ? null :
Long.parseLong(bytesReadString);
+ this.bytesWritten = bytesWrittenString == null ? null :
Long.parseLong(bytesWrittenString);
+ }
+
+ @Override
+ public Result executePolicy() {
+ if(this.bytesRead == null || this.bytesWritten == null) {
+ log.error("No bytes read or bytes written for this request");
Review Comment:
It would be useful to log both values to pinpoint which one (if only one)
was missing.
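A minimal sketch of that suggestion, using a plain string builder rather than the PR's `log.error` call. The class `MissingBytesMessage` and the method `describe` are hypothetical names used only for illustration; they are not part of the PR.

```java
final class MissingBytesMessage {

  // Hypothetical helper: builds a message that names both values, so a null on
  // either side is visible. In the PR this text would be passed to log.error(...).
  static String describe(Long bytesRead, Long bytesWritten) {
    return String.format(
        "Missing value(s) for file size check: bytesRead=%s, bytesWritten=%s",
        bytesRead, bytesWritten);
  }

  public static void main(String[] args) {
    // Only bytesWritten is missing in this call.
    System.out.println(describe(1000L, null));
  }
}
```

With both values in the message, a reader of the log can tell whether the reader side, the writer side, or both failed to set their property.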
##########
gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written
for a file copy operation.
+ */
+@Slf4j
+public class FileSizePolicy extends TaskLevelPolicy {
+
+ public static final String COPY_PREFIX = "gobblin.copy";
+ public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead";
+ public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten";
+
+ private final Long bytesRead;
+ private final Long bytesWritten;
+
+ public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+ super(state, type);
+ String bytesReadString = state.getProp(BYTES_READ_KEY);
+ String bytesWrittenString = state.getProp(BYTES_WRITTEN_KEY);
+ this.bytesRead = bytesReadString == null ? null :
Long.parseLong(bytesReadString);
Review Comment:
`Long.parseLong` doesn't guard against non-numeric values and can throw
`NumberFormatException`. We can move this to `executePolicy`, which avoids
instantiation failure due to `NumberFormatException` and improves error locality.
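A rough sketch of deferring the parse, assuming the policy keeps the raw strings and treats an unparsable value the same as a missing one. `LazySizeCheck`, its nested `Result` enum, and the strict-equality comparison are stand-ins for illustration; the PR's actual comparison logic is not fully visible in the diff above.

```java
import java.util.Optional;

final class LazySizeCheck {

  // Stand-in for TaskLevelPolicy.Result, so the sketch compiles on its own.
  enum Result { PASSED, FAILED }

  private final String bytesReadString;
  private final String bytesWrittenString;

  // The constructor only stores the raw strings, so it cannot throw
  // NumberFormatException.
  LazySizeCheck(String bytesReadString, String bytesWrittenString) {
    this.bytesReadString = bytesReadString;
    this.bytesWrittenString = bytesWrittenString;
  }

  Result executePolicy() {
    Optional<Long> read = parse(bytesReadString);
    Optional<Long> written = parse(bytesWrittenString);
    if (!read.isPresent() || !written.isPresent()) {
      // Missing or malformed input fails the check instead of failing construction.
      return Result.FAILED;
    }
    // Assumption: the policy requires an exact match.
    return read.get().equals(written.get()) ? Result.PASSED : Result.FAILED;
  }

  // Returns empty for null or non-numeric input.
  private static Optional<Long> parse(String value) {
    if (value == null) {
      return Optional.empty();
    }
    try {
      return Optional.of(Long.parseLong(value.trim()));
    } catch (NumberFormatException e) {
      return Optional.empty();
    }
  }
}
```

The trade-off is that a malformed property surfaces as a failed quality check at execution time rather than as an exception while the policy is being constructed.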
##########
gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/DataQualityStatus.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.qualitychecker;
+
+/**
+ * An enumeration for possible statuses for Data quality checks.
+ * Its values will be:
+ * - PASSED: When all data quality checks pass
+ * - FAILED: When any data quality check fails
+ * - NOT_EVALUATED: When data quality check evaluation is not performed
+ */
+public enum DataQualityStatus {
+ PASSED,
+ FAILED,
+ NOT_EVALUATED,
+ UNKNOWN
Review Comment:
how/when are we using this status?
##########
gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written
for a file copy operation.
+ */
+@Slf4j
+public class FileSizePolicy extends TaskLevelPolicy {
+
+ public static final String COPY_PREFIX = "gobblin.copy";
+ public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead";
+ public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten";
+
+ private final Long bytesRead;
+ private final Long bytesWritten;
+
+ public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+ super(state, type);
+ String bytesReadString = state.getProp(BYTES_READ_KEY);
+ String bytesWrittenString = state.getProp(BYTES_WRITTEN_KEY);
+ this.bytesRead = bytesReadString == null ? null :
Long.parseLong(bytesReadString);
+ this.bytesWritten = bytesWrittenString == null ? null :
Long.parseLong(bytesWrittenString);
+ }
+
+ @Override
+ public Result executePolicy() {
+ if(this.bytesRead == null || this.bytesWritten == null) {
+ log.error("No bytes read or bytes written for this request");
+ return Result.FAILED;
+ }
+ double sizeDifference = Math.abs(this.bytesRead - this.bytesWritten);
Review Comment:
Please use long instead of double for `sizeDifference`, since both bytesRead
and bytesWritten are long values
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java:
##########
@@ -788,12 +800,108 @@ public int getJobFailures() {
return
Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY));
}
+ /**
+ * Computes and stores the overall data quality status based on task-level
policy results.
+ * The status will be "PASSED" if all tasks passed their quality checks,
"FAILED" otherwise.
+ */
+ public void computeAndStoreDatasetQualityStatus(JobState jobState) {
Review Comment:
As we are emitting metrics, we can rename this to use `emit`/`report` in place of
`store`.
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java:
##########
@@ -788,12 +800,108 @@ public int getJobFailures() {
return
Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY));
}
+ /**
+ * Computes and stores the overall data quality status based on task-level
policy results.
+ * The status will be "PASSED" if all tasks passed their quality checks,
"FAILED" otherwise.
+ */
+ public void computeAndStoreDatasetQualityStatus(JobState jobState) {
+ DataQualityStatus jobDataQuality = DataQualityStatus.PASSED;
+ int totalFiles = 0;
+ int failedFilesSize = 0;
+ int passedFilesSize = 0;
+ for (TaskState taskState : getTaskStates()) {
+ totalFiles++;
+ DataQualityStatus qualityResult = null;
+ String result =
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+ if (result != null) {
+ try {
+ qualityResult = DataQualityStatus.valueOf(result);
+ } catch (IllegalArgumentException e) {
+ log.warn("Unknown data quality status encountered " + result);
+ qualityResult = DataQualityStatus.UNKNOWN;
+ }
+ }
+ log.info("Data quality status of this task is: " + qualityResult);
+ if (DataQualityStatus.PASSED != qualityResult) {
+ failedFilesSize++;
+ log.warn("Data quality not passed: " + qualityResult);
+ jobDataQuality = DataQualityStatus.FAILED;
+ }
+ else {
+ passedFilesSize++;
+ }
+ }
+
+ super.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY,
jobDataQuality.name());
+
+ // Emit OTEL metrics for data quality
+ OpenTelemetryMetricsBase otelMetrics =
OpenTelemetryMetrics.getInstance(jobState);
+ if (otelMetrics != null) {
+ Attributes tags = getTagsForDataQualityMetrics(jobState);
+ // Emit data quality status (1 for PASSED, 0 for FAILED)
+ DataQualityStatus finalJobDataQuality = jobDataQuality;
Review Comment:
please use `final` since this should not change
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java:
##########
@@ -788,12 +800,108 @@ public int getJobFailures() {
return
Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY));
}
+ /**
+ * Computes and stores the overall data quality status based on task-level
policy results.
+ * The status will be "PASSED" if all tasks passed their quality checks,
"FAILED" otherwise.
+ */
+ public void computeAndStoreDatasetQualityStatus(JobState jobState) {
+ DataQualityStatus jobDataQuality = DataQualityStatus.PASSED;
+ int totalFiles = 0;
+ int failedFilesSize = 0;
+ int passedFilesSize = 0;
+ for (TaskState taskState : getTaskStates()) {
+ totalFiles++;
+ DataQualityStatus qualityResult = null;
+ String result =
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+ if (result != null) {
+ try {
+ qualityResult = DataQualityStatus.valueOf(result);
+ } catch (IllegalArgumentException e) {
+ log.warn("Unknown data quality status encountered " + result);
+ qualityResult = DataQualityStatus.UNKNOWN;
+ }
+ }
+ log.info("Data quality status of this task is: " + qualityResult);
+ if (DataQualityStatus.PASSED != qualityResult) {
+ failedFilesSize++;
+ log.warn("Data quality not passed: " + qualityResult);
+ jobDataQuality = DataQualityStatus.FAILED;
+ }
+ else {
+ passedFilesSize++;
+ }
+ }
+
+ super.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY,
jobDataQuality.name());
+
+ // Emit OTEL metrics for data quality
+ OpenTelemetryMetricsBase otelMetrics =
OpenTelemetryMetrics.getInstance(jobState);
+ if (otelMetrics != null) {
+ Attributes tags = getTagsForDataQualityMetrics(jobState);
+ // Emit data quality status (1 for PASSED, 0 for FAILED)
+ DataQualityStatus finalJobDataQuality = jobDataQuality;
+ log.info("Data quality status for this job is " + finalJobDataQuality);
+ otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
+ .gaugeBuilder(ServiceMetricNames.DATA_QUALITY_STATUS_METRIC_NAME)
+ .ofLongs()
+ .buildWithCallback(measurement -> {
+ log.info("Emitting metric for data quality");
+
measurement.record(DataQualityStatus.PASSED.equals(finalJobDataQuality) ? 1 :
0, tags);
+ });
+
+ otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
+ .counterBuilder(ServiceMetricNames.DATA_QUALITY_OVERALL_FILE_COUNT)
+ .build()
+ .add(totalFiles, tags);
+ // Emit passed files count
+ otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
+ .counterBuilder(ServiceMetricNames.DATA_QUALITY_SUCCESS_FILE_COUNT)
+ .build().add(passedFilesSize, tags);
+ // Emit failed files count
+ otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
+ .counterBuilder(ServiceMetricNames.DATA_QUALITY_FAILURE_FILE_COUNT)
+ .build().add(failedFilesSize, tags);
+ }
+ }
+
+ private Attributes getTagsForDataQualityMetrics(JobState jobState) {
+ Properties jobProperties = new Properties();
+ try {
+ jobProperties =
PropertiesUtils.deserialize(jobState.getProp("job.props", ""));
+ log.info("Job properties loaded: " + jobProperties);
+ } catch (IOException e) {
+ log.error("Could not deserialize job properties", e);
+ }
Review Comment:
consider logging a clearer warning or adding a fallback tag/metric to
indicate that job.props deserialization failed. Without this, OTEL metrics
would silently lack important context like source/destination, making debugging
or metric analysis harder.
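One possible shape for the fallback tag, sketched with `java.util.Properties` and a plain map instead of the OTEL `Attributes` builder. The tag name `jobPropsAvailable`, the keys `flowSource` and `flowDestination`, and the `Deserializer` interface are hypothetical and stand in for `PropertiesUtils.deserialize` and the real constants.

```java
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

final class JobPropsTags {

  // Hypothetical tag that records whether job.props could be deserialized.
  static final String JOB_PROPS_AVAILABLE = "jobPropsAvailable";

  // Stand-in for the deserialization call used in the PR.
  interface Deserializer {
    Properties deserialize(String serialized) throws IOException;
  }

  static Map<String, String> buildTags(String serialized, Deserializer deserializer) {
    Properties props = new Properties();
    boolean available = true;
    try {
      props = deserializer.deserialize(serialized);
    } catch (IOException e) {
      // Fall back to empty properties, but remember that context is missing.
      available = false;
    }
    Map<String, String> tags = new LinkedHashMap<>();
    tags.put("flowSource", props.getProperty("flowSource", ""));
    tags.put("flowDestination", props.getProperty("flowDestination", ""));
    tags.put(JOB_PROPS_AVAILABLE, String.valueOf(available));
    return tags;
  }
}
```

With such a tag, metrics that lack source/destination because of a deserialization failure can be filtered out or alerted on, instead of being mistaken for jobs that never had those properties.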
##########
gobblin-core/src/test/java/org/apache/gobblin/policies/size/FileSizePolicyTest.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class FileSizePolicyTest {
+
+ @Test
+ public void testPolicyPass() {
+ State state = new State();
+ state.setProp(FileSizePolicy.BYTES_READ_KEY, 1000L);
+ state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, 1000L);
+
+ FileSizePolicy policy = new FileSizePolicy(state,
TaskLevelPolicy.Type.FAIL);
+ Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.PASSED);
+ }
+
+ @Test
+ public void testPolicyFail() {
+ State state = new State();
+ state.setProp(FileSizePolicy.BYTES_READ_KEY, 1000L);
+ state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, 900L);
+
+ FileSizePolicy policy = new FileSizePolicy(state,
TaskLevelPolicy.Type.FAIL);
+ Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.FAILED);
+ }
+
+ @Test
+ public void testMissingProperties() {
+ State state = new State();
+ // No properties set at all
+ FileSizePolicy policy = new FileSizePolicy(state,
TaskLevelPolicy.Type.FAIL);
+ Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.FAILED);
+ }
+
+ @Test
+ public void testPartiallySetProperties() {
+ State state = new State();
+ // Only set bytes read, not bytes written
+ state.setProp(FileSizePolicy.BYTES_READ_KEY, 1000L);
+
+ FileSizePolicy policy = new FileSizePolicy(state,
TaskLevelPolicy.Type.FAIL);
+ Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.FAILED);
+
+ // Reset state and only set bytes written, not bytes read
+ state = new State();
+ state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, 1000L);
+
+ policy = new FileSizePolicy(state, TaskLevelPolicy.Type.FAIL);
+ Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.FAILED);
+ }
+
+}
Review Comment:
nit: add a newline at end of file
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/IncorrectSizeFileAwareInputStreamDataWriter.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.writer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.fs.Path;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.policies.size.FileSizePolicy;
+import org.apache.gobblin.writer.DataWriter;
+
+/**
+ * A {@link DataWriter} that extends {@link FileAwareInputStreamDataWriter} to
intentionally report incorrect file sizes.
+ * This is useful for testing data quality checks that verify file sizes.
+ *
+ * The writer actually writes the correct data to the destination, but reports
incorrect sizes in the bytesWritten() method.
+ * The size discrepancy can be configured through properties:
+ * - gobblin.copy.incorrect.size.ratio: Ratio to multiply actual size by
(default 1.0)
+ * - gobblin.copy.incorrect.size.offset: Fixed offset to add to actual size
(default 0)
+ */
+@Slf4j
+public class IncorrectSizeFileAwareInputStreamDataWriter extends
FileAwareInputStreamDataWriter {
+
+ public static final String INCORRECT_SIZE_RATIO_KEY =
CopyConfiguration.COPY_PREFIX + ".incorrect.size.ratio";
+ public static final String INCORRECT_SIZE_OFFSET_KEY =
CopyConfiguration.COPY_PREFIX + ".incorrect.size.offset";
+ public static final double DEFAULT_INCORRECT_SIZE_RATIO = 1.0;
Review Comment:
Should we default to an incorrect ratio such as `0.9`? Otherwise, we can
clarify in the Javadoc that the default config does not introduce a discrepancy
unless overridden.
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java:
##########
@@ -788,12 +800,108 @@ public int getJobFailures() {
return
Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY));
}
+ /**
+ * Computes and stores the overall data quality status based on task-level
policy results.
+ * The status will be "PASSED" if all tasks passed their quality checks,
"FAILED" otherwise.
+ */
+ public void computeAndStoreDatasetQualityStatus(JobState jobState) {
+ DataQualityStatus jobDataQuality = DataQualityStatus.PASSED;
+ int totalFiles = 0;
+ int failedFilesSize = 0;
+ int passedFilesSize = 0;
+ for (TaskState taskState : getTaskStates()) {
+ totalFiles++;
+ DataQualityStatus qualityResult = null;
+ String result =
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+ if (result != null) {
+ try {
+ qualityResult = DataQualityStatus.valueOf(result);
+ } catch (IllegalArgumentException e) {
+ log.warn("Unknown data quality status encountered " + result);
+ qualityResult = DataQualityStatus.UNKNOWN;
Review Comment:
consider adding a fromString() static method in DataQualityStatus to
centralize the parsing and fallback logic. This will simplify code and ensure
consistent handling of unknown/malformed values.
```
public static DataQualityStatus fromString(String value) {
  if (value == null) {
    return UNKNOWN;
  }
  try {
    return DataQualityStatus.valueOf(value.toUpperCase());
  } catch (IllegalArgumentException e) {
    log.error(...);
    return UNKNOWN;
  }
}
```
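For illustration, here is how the calling loop might shrink once the parsing lives in the enum. This is a self-contained sketch, not the PR's code: `QualityStatus` stands in for `DataQualityStatus`, `QualitySummary.overall` is a hypothetical helper, logging is omitted, and `Locale.ROOT` is added so upper-casing does not depend on the default locale.

```java
import java.util.List;
import java.util.Locale;

enum QualityStatus {
  PASSED, FAILED, NOT_EVALUATED, UNKNOWN;

  // Maps null, blank, or unrecognized input to UNKNOWN instead of throwing.
  static QualityStatus fromString(String value) {
    if (value == null) {
      return UNKNOWN;
    }
    try {
      return valueOf(value.trim().toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      return UNKNOWN;
    }
  }
}

final class QualitySummary {

  // Hypothetical aggregation: the job passes only if every task result parses to PASSED.
  static QualityStatus overall(List<String> rawTaskResults) {
    for (String raw : rawTaskResults) {
      if (QualityStatus.fromString(raw) != QualityStatus.PASSED) {
        return QualityStatus.FAILED;
      }
    }
    return QualityStatus.PASSED;
  }
}
```

The caller no longer needs its own null check or try/catch, and a missing result is handled the same way as a malformed one.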
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java:
##########
@@ -788,12 +800,108 @@ public int getJobFailures() {
return
Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY));
}
+ /**
+ * Computes and stores the overall data quality status based on task-level
policy results.
+ * The status will be "PASSED" if all tasks passed their quality checks,
"FAILED" otherwise.
+ */
+ public void computeAndStoreDatasetQualityStatus(JobState jobState) {
+ DataQualityStatus jobDataQuality = DataQualityStatus.PASSED;
+ int totalFiles = 0;
+ int failedFilesSize = 0;
+ int passedFilesSize = 0;
+ for (TaskState taskState : getTaskStates()) {
+ totalFiles++;
+ DataQualityStatus qualityResult = null;
+ String result =
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+ if (result != null) {
+ try {
+ qualityResult = DataQualityStatus.valueOf(result);
+ } catch (IllegalArgumentException e) {
+ log.warn("Unknown data quality status encountered " + result);
+ qualityResult = DataQualityStatus.UNKNOWN;
+ }
+ }
+ log.info("Data quality status of this task is: " + qualityResult);
+ if (DataQualityStatus.PASSED != qualityResult) {
+ failedFilesSize++;
+ log.warn("Data quality not passed: " + qualityResult);
+ jobDataQuality = DataQualityStatus.FAILED;
+ }
+ else {
+ passedFilesSize++;
+ }
+ }
+
+ super.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY,
jobDataQuality.name());
+
+ // Emit OTEL metrics for data quality
+ OpenTelemetryMetricsBase otelMetrics =
OpenTelemetryMetrics.getInstance(jobState);
+ if (otelMetrics != null) {
+ Attributes tags = getTagsForDataQualityMetrics(jobState);
+ // Emit data quality status (1 for PASSED, 0 for FAILED)
+ DataQualityStatus finalJobDataQuality = jobDataQuality;
+ log.info("Data quality status for this job is " + finalJobDataQuality);
+ otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
+ .gaugeBuilder(ServiceMetricNames.DATA_QUALITY_STATUS_METRIC_NAME)
+ .ofLongs()
+ .buildWithCallback(measurement -> {
+ log.info("Emitting metric for data quality");
+
measurement.record(DataQualityStatus.PASSED.equals(finalJobDataQuality) ? 1 :
0, tags);
+ });
+
+ otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
+ .counterBuilder(ServiceMetricNames.DATA_QUALITY_OVERALL_FILE_COUNT)
+ .build()
+ .add(totalFiles, tags);
+ // Emit passed files count
+ otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
+ .counterBuilder(ServiceMetricNames.DATA_QUALITY_SUCCESS_FILE_COUNT)
+ .build().add(passedFilesSize, tags);
+ // Emit failed files count
+ otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
+ .counterBuilder(ServiceMetricNames.DATA_QUALITY_FAILURE_FILE_COUNT)
+ .build().add(failedFilesSize, tags);
+ }
+ }
+
+ private Attributes getTagsForDataQualityMetrics(JobState jobState) {
+ Properties jobProperties = new Properties();
+ try {
+ jobProperties =
PropertiesUtils.deserialize(jobState.getProp("job.props", ""));
+ log.info("Job properties loaded: " + jobProperties);
+ } catch (IOException e) {
+ log.error("Could not deserialize job properties", e);
+ }
+
+ return Attributes.builder()
+ .put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
jobState.getJobName())
+ .put(TimingEvent.DATASET_URN, this.getDatasetUrn())
+ .put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
jobState.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD))
+ .put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
jobState.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD))
+ .put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
jobState.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD))
+ .put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD,
jobState.getProp(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD))
+ .put(TimingEvent.FlowEventConstants.FLOW_FABRIC
,jobState.getProp(ServiceConfigKeys.GOBBLIN_SERVICE_INSTANCE_NAME, null))
+ .put(TimingEvent.FlowEventConstants.FLOW_SOURCE
,jobProperties.getProperty(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, ""))
+ .put(TimingEvent.FlowEventConstants.FLOW_DESTINATION,
jobProperties.getProperty(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY,
""))
+ .put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD,
jobState.getProp(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, ""))
+ .build();
+ }
+
+ /**
+ * Gets the overall data quality status of the dataset.
+ * @return "PASSED" if all tasks passed their quality checks, "FAILED"
otherwise
+ */
+ public String getDataQualityStatus() {
Review Comment:
Can we return the enum from here to avoid string logic downstream? Callers
can use `.name()` if they need the string representation.
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java:
##########
@@ -788,12 +800,108 @@ public int getJobFailures() {
return
Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY));
}
+ /**
+ * Computes and stores the overall data quality status based on task-level
policy results.
+ * The status will be "PASSED" if all tasks passed their quality checks,
"FAILED" otherwise.
+ */
+ public void computeAndStoreDatasetQualityStatus(JobState jobState) {
+ DataQualityStatus jobDataQuality = DataQualityStatus.PASSED;
+ int totalFiles = 0;
+ int failedFilesSize = 0;
+ int passedFilesSize = 0;
+ for (TaskState taskState : getTaskStates()) {
+ totalFiles++;
+ DataQualityStatus qualityResult = null;
+ String result =
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+ if (result != null) {
+ try {
+ qualityResult = DataQualityStatus.valueOf(result);
+ } catch (IllegalArgumentException e) {
+ log.warn("Unknown data quality status encountered " + result);
+ qualityResult = DataQualityStatus.UNKNOWN;
+ }
+ }
+ log.info("Data quality status of this task is: " + qualityResult);
Review Comment:
This would log for each task. Can we use `log.debug` instead?
##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/IncorrectSizeFileAwareInputStreamDataWriter.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.writer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.fs.Path;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.policies.size.FileSizePolicy;
+import org.apache.gobblin.writer.DataWriter;
+
+/**
+ * A {@link DataWriter} that extends {@link FileAwareInputStreamDataWriter} to
intentionally report incorrect file sizes.
+ * This is useful for testing data quality checks that verify file sizes.
+ *
+ * The writer actually writes the correct data to the destination, but reports
incorrect sizes in the bytesWritten() method.
+ * The size discrepancy can be configured through properties:
+ * - gobblin.copy.incorrect.size.ratio: Ratio to multiply actual size by
(default 1.0)
+ * - gobblin.copy.incorrect.size.offset: Fixed offset to add to actual size
(default 0)
+ */
+@Slf4j
+public class IncorrectSizeFileAwareInputStreamDataWriter extends
FileAwareInputStreamDataWriter {
+
+ public static final String INCORRECT_SIZE_RATIO_KEY =
CopyConfiguration.COPY_PREFIX + ".incorrect.size.ratio";
+ public static final String INCORRECT_SIZE_OFFSET_KEY =
CopyConfiguration.COPY_PREFIX + ".incorrect.size.offset";
+ public static final double DEFAULT_INCORRECT_SIZE_RATIO = 1.0;
+ public static final long DEFAULT_INCORRECT_SIZE_OFFSET = 0L;
+
+ private final double sizeRatio;
+ private final long sizeOffset;
+
+ public IncorrectSizeFileAwareInputStreamDataWriter(State state, int
numBranches, int branchId)
+ throws IOException {
+ this(state, numBranches, branchId, null);
+ }
+
+ public IncorrectSizeFileAwareInputStreamDataWriter(State state, int
numBranches, int branchId, String writerAttemptId)
+ throws IOException {
+ super(state, numBranches, branchId, writerAttemptId);
+ this.sizeRatio = state.getPropAsDouble(INCORRECT_SIZE_RATIO_KEY,
DEFAULT_INCORRECT_SIZE_RATIO);
+ this.sizeOffset = state.getPropAsLong(INCORRECT_SIZE_OFFSET_KEY,
DEFAULT_INCORRECT_SIZE_OFFSET);
Review Comment:
should we check for negative values here?
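A possible validation, sketched under assumptions: the reported size is taken to be `actual * ratio + offset` (following the Javadoc wording), a negative ratio is rejected, and a negative offset is allowed but the result is clamped at zero. `SizeDistortion` and `reportedSize` are hypothetical names, and whether a negative offset should be rejected outright is a design decision for the PR author.

```java
final class SizeDistortion {

  private final double ratio;
  private final long offset;

  SizeDistortion(double ratio, long offset) {
    // A negative or NaN ratio cannot describe a meaningful size, so fail fast.
    if (Double.isNaN(ratio) || ratio < 0) {
      throw new IllegalArgumentException("size ratio must be non-negative, got " + ratio);
    }
    this.ratio = ratio;
    this.offset = offset;
  }

  // Applies the ratio, then the offset, and never reports a negative byte count.
  long reportedSize(long actualSize) {
    long distorted = Math.round(actualSize * ratio) + offset;
    return Math.max(0L, distorted);
  }
}
```

For example, `new SizeDistortion(0.9, 0L).reportedSize(1000L)` yields 900, while a ratio of 1.0 with an offset of -2000 is clamped to 0 rather than reporting a negative size.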
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java:
##########
@@ -788,12 +800,108 @@ public int getJobFailures() {
return
Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY));
}
+ /**
+ * Computes and stores the overall data quality status based on task-level
policy results.
+ * The status will be "PASSED" if all tasks passed their quality checks,
"FAILED" otherwise.
+ */
+ public void computeAndStoreDatasetQualityStatus(JobState jobState) {
+ DataQualityStatus jobDataQuality = DataQualityStatus.PASSED;
+ int totalFiles = 0;
+ int failedFilesSize = 0;
+ int passedFilesSize = 0;
+ for (TaskState taskState : getTaskStates()) {
+ totalFiles++;
+ DataQualityStatus qualityResult = null;
+ String result =
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+ if (result != null) {
+ try {
+ qualityResult = DataQualityStatus.valueOf(result);
+ } catch (IllegalArgumentException e) {
+ log.warn("Unknown data quality status encountered " + result);
+ qualityResult = DataQualityStatus.UNKNOWN;
+ }
+ }
+ log.info("Data quality status of this task is: " + qualityResult);
+ if (DataQualityStatus.PASSED != qualityResult) {
+ failedFilesSize++;
+ log.warn("Data quality not passed: " + qualityResult);
Review Comment:
please add a task identifier for failed DQ task in the log
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java:
##########
@@ -788,12 +800,108 @@ public int getJobFailures() {
return Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY));
}
+ /**
+ * Computes and stores the overall data quality status based on task-level policy results.
+ * The status will be "PASSED" if all tasks passed their quality checks, "FAILED" otherwise.
+ */
+ public void computeAndStoreDatasetQualityStatus(JobState jobState) {
+ DataQualityStatus jobDataQuality = DataQualityStatus.PASSED;
+ int totalFiles = 0;
+ int failedFilesSize = 0;
+ int passedFilesSize = 0;
+ for (TaskState taskState : getTaskStates()) {
+ totalFiles++;
+ DataQualityStatus qualityResult = null;
+ String result = taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+ if (result != null) {
+ try {
+ qualityResult = DataQualityStatus.valueOf(result);
+ } catch (IllegalArgumentException e) {
+ log.warn("Unknown data quality status encountered " + result);
+ qualityResult = DataQualityStatus.UNKNOWN;
+ }
+ }
+ log.info("Data quality status of this task is: " + qualityResult);
+ if (DataQualityStatus.PASSED != qualityResult) {
+ failedFilesSize++;
+ log.warn("Data quality not passed: " + qualityResult);
+ jobDataQuality = DataQualityStatus.FAILED;
+ }
+ else {
+ passedFilesSize++;
+ }
+ }
+
+ super.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, jobDataQuality.name());
+
+ // Emit OTEL metrics for data quality
+ OpenTelemetryMetricsBase otelMetrics = OpenTelemetryMetrics.getInstance(jobState);
+ if (otelMetrics != null) {
+ Attributes tags = getTagsForDataQualityMetrics(jobState);
+ // Emit data quality status (1 for PASSED, 0 for FAILED)
+ DataQualityStatus finalJobDataQuality = jobDataQuality;
+ log.info("Data quality status for this job is " + finalJobDataQuality);
+ otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
+ .gaugeBuilder(ServiceMetricNames.DATA_QUALITY_STATUS_METRIC_NAME)
+ .ofLongs()
+ .buildWithCallback(measurement -> {
+ log.info("Emitting metric for data quality");
+ measurement.record(DataQualityStatus.PASSED.equals(finalJobDataQuality) ? 1 : 0, tags);
+ });
+
+ otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
+ .counterBuilder(ServiceMetricNames.DATA_QUALITY_OVERALL_FILE_COUNT)
+ .build()
+ .add(totalFiles, tags);
+ // Emit passed files count
+ otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
+ .counterBuilder(ServiceMetricNames.DATA_QUALITY_SUCCESS_FILE_COUNT)
+ .build().add(passedFilesSize, tags);
+ // Emit failed files count
+ otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
+ .counterBuilder(ServiceMetricNames.DATA_QUALITY_FAILURE_FILE_COUNT)
+ .build().add(failedFilesSize, tags);
Review Comment:
We can combine the three counter emissions into a single `.batchCallback()`
to improve efficiency and avoid duplication.
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java:
##########
@@ -788,12 +800,108 @@ public int getJobFailures() {
return Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY));
}
+ /**
+ * Computes and stores the overall data quality status based on task-level policy results.
+ * The status will be "PASSED" if all tasks passed their quality checks, "FAILED" otherwise.
+ */
+ public void computeAndStoreDatasetQualityStatus(JobState jobState) {
Review Comment:
This method introduces metrics emission and data quality evaluation logic
directly into JobState, which is primarily a data container for job/task state.
To improve separation of concerns, please move this logic into a higher-level
component, e.g. a DataQualityEvaluator/JobResultHandler.
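
One possible shape for the extracted evaluation step, as a rough sketch only. It mirrors the counting logic in the diff (a missing or unrecognized result counts as not passed) but takes the raw per-task result strings as input so it does not depend on JobState. `DataQualityResult` and the `evaluate` signature are assumptions, and the enum is redeclared here just to keep the example self-contained.

```java
import java.util.List;

// Redeclared locally for the sketch; the PR has its own DataQualityStatus.
enum DataQualityStatus { PASSED, FAILED, UNKNOWN }

// Hypothetical value object carrying the outcome of one evaluation.
final class DataQualityResult {
  final DataQualityStatus status;
  final int totalFiles;
  final int passedFiles;
  final int failedFiles;

  DataQualityResult(DataQualityStatus status, int totalFiles, int passedFiles, int failedFiles) {
    this.status = status;
    this.totalFiles = totalFiles;
    this.passedFiles = passedFiles;
    this.failedFiles = failedFiles;
  }
}

// Hypothetical evaluator: pure computation, no metrics and no state mutation.
final class DataQualityEvaluator {
  private DataQualityEvaluator() {}

  static DataQualityResult evaluate(List<String> taskPolicyResults) {
    int passed = 0;
    int failed = 0;
    for (String raw : taskPolicyResults) {
      if (parse(raw) == DataQualityStatus.PASSED) {
        passed++;
      } else {
        failed++;  // null, UNKNOWN and FAILED all count as not passed
      }
    }
    DataQualityStatus overall = failed == 0 ? DataQualityStatus.PASSED : DataQualityStatus.FAILED;
    return new DataQualityResult(overall, passed + failed, passed, failed);
  }

  // Returns null for a missing result and UNKNOWN for an unrecognized one.
  private static DataQualityStatus parse(String raw) {
    if (raw == null) {
      return null;
    }
    try {
      return DataQualityStatus.valueOf(raw);
    } catch (IllegalArgumentException e) {
      return DataQualityStatus.UNKNOWN;
    }
  }
}
```

With this split, the caller stores `result.status` on the job state and a separate component emits the metrics from the counts, which also makes the evaluation easy to unit test.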
Issue Time Tracking
-------------------
Worklog Id: (was: 972219)
Time Spent: 40m (was: 0.5h)
> FileSize Data Quality implementation for FileBasedCopy
> ------------------------------------------------------
>
> Key: GOBBLIN-2204
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2204
> Project: Apache Gobblin
> Issue Type: Task
> Reporter: Vaibhav Singhal
> Priority: Major
> Time Spent: 40m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)