[ 
https://issues.apache.org/jira/browse/GOBBLIN-2204?focusedWorklogId=972219&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-972219
 ]

ASF GitHub Bot logged work on GOBBLIN-2204:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Jun/25 06:11
            Start Date: 09/Jun/25 06:11
    Worklog Time Spent: 10m 
      Work Description: khandelwal-prateek commented on code in PR #4113:
URL: https://github.com/apache/gobblin/pull/4113#discussion_r2135036625


##########
gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written 
for a file copy operation.
+ */
+@Slf4j
+public class FileSizePolicy extends TaskLevelPolicy {
+
+  public static final String COPY_PREFIX = "gobblin.copy";
+  public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead";
+  public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten";
+
+  private final Long bytesRead;
+  private final Long bytesWritten;
+
+  public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+    super(state, type);
+    String bytesReadString = state.getProp(BYTES_READ_KEY);
+    String bytesWrittenString = state.getProp(BYTES_WRITTEN_KEY);
+    this.bytesRead = bytesReadString == null ? null : 
Long.parseLong(bytesReadString);
+    this.bytesWritten = bytesWrittenString == null ? null : 
Long.parseLong(bytesWrittenString);
+  }
+
+  @Override
+  public Result executePolicy() {
+    if(this.bytesRead == null || this.bytesWritten == null) {
+      log.error("No bytes read or bytes written for this request");

Review Comment:
   It would be useful to log both values to pinpoint which one (if only 
one) was missing.
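
   A minimal sketch of such a message, assuming a small helper (the class and 
method names below are hypothetical and not part of the PR). Passing both 
values through means a `null` in the output shows which property was absent:

```java
// Hypothetical helper, not part of the PR: builds a log message naming both values.
final class MissingBytesMessage {
  private MissingBytesMessage() {}

  // A null argument is rendered as "null", which identifies the missing property.
  static String describe(Long bytesRead, Long bytesWritten) {
    return String.format("Missing file size value(s): bytesRead=%s, bytesWritten=%s",
        bytesRead, bytesWritten);
  }
}
```

   The resulting string could be handed to `log.error` in place of the current 
fixed message.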



##########
gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written 
for a file copy operation.
+ */
+@Slf4j
+public class FileSizePolicy extends TaskLevelPolicy {
+
+  public static final String COPY_PREFIX = "gobblin.copy";
+  public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead";
+  public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten";
+
+  private final Long bytesRead;
+  private final Long bytesWritten;
+
+  public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+    super(state, type);
+    String bytesReadString = state.getProp(BYTES_READ_KEY);
+    String bytesWrittenString = state.getProp(BYTES_WRITTEN_KEY);
+    this.bytesRead = bytesReadString == null ? null : 
Long.parseLong(bytesReadString);

Review Comment:
   `Long.parseLong` doesn't guard against non-numeric values and can throw 
`NumberFormatException`. We can move the parsing to `executePolicy`, which avoids 
an instantiation failure due to `NumberFormatException` and improves error locality.
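
   One way this could look, as a sketch only: a hypothetical `tryParse` helper 
(not in the PR) that `executePolicy` could call, returning an empty 
`OptionalLong` for missing or malformed input instead of throwing. The use of 
`OptionalLong` is an assumption of this sketch.

```java
import java.util.OptionalLong;

// Hypothetical helper, not part of the PR: parses a property value defensively.
final class SafeLongParser {
  private SafeLongParser() {}

  // Returns an empty OptionalLong when the value is null or not a valid long.
  static OptionalLong tryParse(String raw) {
    if (raw == null) {
      return OptionalLong.empty();
    }
    try {
      return OptionalLong.of(Long.parseLong(raw.trim()));
    } catch (NumberFormatException e) {
      return OptionalLong.empty();
    }
  }
}
```

   With this shape, the policy can return `Result.FAILED` for an unparsable 
value rather than failing during construction.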



##########
gobblin-core/src/main/java/org/apache/gobblin/qualitychecker/DataQualityStatus.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.qualitychecker;
+
+/**
+ * An enumeration for possible statuses for Data quality checks.
+ * Its values will be:
+ * - PASSED: When all data quality checks pass
+ * - FAILED: When any data quality check fails
+ * - NOT_EVALUATED: When data quality check evaluation is not performed
+ */
+public enum DataQualityStatus {
+  PASSED,
+  FAILED,
+  NOT_EVALUATED,
+  UNKNOWN

Review Comment:
   how/when are we using this status?



##########
gobblin-core/src/main/java/org/apache/gobblin/policies/size/FileSizePolicy.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+
+/**
+ * A task-level policy that checks if the bytes read matches the bytes written 
for a file copy operation.
+ */
+@Slf4j
+public class FileSizePolicy extends TaskLevelPolicy {
+
+  public static final String COPY_PREFIX = "gobblin.copy";
+  public static final String BYTES_READ_KEY = COPY_PREFIX + ".bytesRead";
+  public static final String BYTES_WRITTEN_KEY = COPY_PREFIX + ".bytesWritten";
+
+  private final Long bytesRead;
+  private final Long bytesWritten;
+
+  public FileSizePolicy(State state, TaskLevelPolicy.Type type) {
+    super(state, type);
+    String bytesReadString = state.getProp(BYTES_READ_KEY);
+    String bytesWrittenString = state.getProp(BYTES_WRITTEN_KEY);
+    this.bytesRead = bytesReadString == null ? null : 
Long.parseLong(bytesReadString);
+    this.bytesWritten = bytesWrittenString == null ? null : 
Long.parseLong(bytesWrittenString);
+  }
+
+  @Override
+  public Result executePolicy() {
+    if(this.bytesRead == null || this.bytesWritten == null) {
+      log.error("No bytes read or bytes written for this request");
+      return Result.FAILED;
+    }
+    double sizeDifference = Math.abs(this.bytesRead - this.bytesWritten);

Review Comment:
   Please use long instead of double for `sizeDifference`, since both bytesRead 
and bytesWritten are long values
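
   For illustration, a sketch with a hypothetical helper (not part of the PR) 
that keeps the whole computation in `long`. A `double` cannot represent every 
`long` above 2^53 exactly, so staying in `long` avoids any rounding:

```java
// Hypothetical helper, not part of the PR: size difference computed entirely in long.
final class SizeDifference {
  private SizeDifference() {}

  // Absolute difference between the two byte counts, with no floating-point conversion.
  static long absoluteDifference(long bytesRead, long bytesWritten) {
    return Math.abs(bytesRead - bytesWritten);
  }
}
```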



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java:
##########
@@ -788,12 +800,108 @@ public int getJobFailures() {
       return 
Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY));
     }
 
+    /**
+     * Computes and stores the overall data quality status based on task-level 
policy results.
+     * The status will be "PASSED" if all tasks passed their quality checks, 
"FAILED" otherwise.
+     */
+    public void computeAndStoreDatasetQualityStatus(JobState jobState) {

Review Comment:
   Since we are emitting metrics, we can rename this to use `emit`/`report` in 
place of `store`.



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java:
##########
@@ -788,12 +800,108 @@ public int getJobFailures() {
       return 
Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY));
     }
 
+    /**
+     * Computes and stores the overall data quality status based on task-level 
policy results.
+     * The status will be "PASSED" if all tasks passed their quality checks, 
"FAILED" otherwise.
+     */
+    public void computeAndStoreDatasetQualityStatus(JobState jobState) {
+      DataQualityStatus jobDataQuality = DataQualityStatus.PASSED;
+      int totalFiles = 0;
+      int failedFilesSize = 0;
+      int passedFilesSize = 0;
+      for (TaskState taskState : getTaskStates()) {
+        totalFiles++;
+        DataQualityStatus qualityResult = null;
+        String result = 
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+        if (result != null) {
+          try {
+            qualityResult = DataQualityStatus.valueOf(result);
+          } catch (IllegalArgumentException e) {
+            log.warn("Unknown data quality status encountered " + result);
+            qualityResult = DataQualityStatus.UNKNOWN;
+          }
+        }
+        log.info("Data quality status of this task is: " + qualityResult);
+        if (DataQualityStatus.PASSED != qualityResult) {
+          failedFilesSize++;
+          log.warn("Data quality not passed: " + qualityResult);
+          jobDataQuality = DataQualityStatus.FAILED;
+        }
+        else {
+          passedFilesSize++;
+        }
+      }
+
+      super.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, 
jobDataQuality.name());
+
+      // Emit OTEL metrics for data quality
+      OpenTelemetryMetricsBase otelMetrics = 
OpenTelemetryMetrics.getInstance(jobState);
+      if (otelMetrics != null) {
+        Attributes tags = getTagsForDataQualityMetrics(jobState);
+        // Emit data quality status (1 for PASSED, 0 for FAILED)
+        DataQualityStatus finalJobDataQuality = jobDataQuality;

Review Comment:
   please use `final` since this should not change
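
   A small sketch of the idea, using stand-in names that are not from the PR. A 
local captured by a lambda must be effectively final, and since the original 
variable is reassigned in the loop, the copy is what the callback captures. 
Declaring the copy `final` makes that intent explicit:

```java
import java.util.function.LongSupplier;

// Stand-in types for illustration only; not the PR's classes.
final class FinalCaptureSketch {
  enum Status { PASSED, FAILED }

  private FinalCaptureSketch() {}

  // Returns a callback that reports 1 for PASSED and 0 otherwise.
  static LongSupplier gaugeValue(Status computed) {
    final Status finalStatus = computed; // explicit: never reassigned after this point
    return () -> Status.PASSED == finalStatus ? 1L : 0L;
  }
}
```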



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java:
##########
@@ -788,12 +800,108 @@ public int getJobFailures() {
       return 
Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY));
     }
 
+    /**
+     * Computes and stores the overall data quality status based on task-level 
policy results.
+     * The status will be "PASSED" if all tasks passed their quality checks, 
"FAILED" otherwise.
+     */
+    public void computeAndStoreDatasetQualityStatus(JobState jobState) {
+      DataQualityStatus jobDataQuality = DataQualityStatus.PASSED;
+      int totalFiles = 0;
+      int failedFilesSize = 0;
+      int passedFilesSize = 0;
+      for (TaskState taskState : getTaskStates()) {
+        totalFiles++;
+        DataQualityStatus qualityResult = null;
+        String result = 
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+        if (result != null) {
+          try {
+            qualityResult = DataQualityStatus.valueOf(result);
+          } catch (IllegalArgumentException e) {
+            log.warn("Unknown data quality status encountered " + result);
+            qualityResult = DataQualityStatus.UNKNOWN;
+          }
+        }
+        log.info("Data quality status of this task is: " + qualityResult);
+        if (DataQualityStatus.PASSED != qualityResult) {
+          failedFilesSize++;
+          log.warn("Data quality not passed: " + qualityResult);
+          jobDataQuality = DataQualityStatus.FAILED;
+        }
+        else {
+          passedFilesSize++;
+        }
+      }
+
+      super.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, 
jobDataQuality.name());
+
+      // Emit OTEL metrics for data quality
+      OpenTelemetryMetricsBase otelMetrics = 
OpenTelemetryMetrics.getInstance(jobState);
+      if (otelMetrics != null) {
+        Attributes tags = getTagsForDataQualityMetrics(jobState);
+        // Emit data quality status (1 for PASSED, 0 for FAILED)
+        DataQualityStatus finalJobDataQuality = jobDataQuality;
+        log.info("Data quality status for this job is " + finalJobDataQuality);
+        otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
+            .gaugeBuilder(ServiceMetricNames.DATA_QUALITY_STATUS_METRIC_NAME)
+            .ofLongs()
+            .buildWithCallback(measurement -> {
+              log.info("Emitting metric for data quality");
+              
measurement.record(DataQualityStatus.PASSED.equals(finalJobDataQuality) ? 1 : 
0, tags);
+            });
+
+        otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
+            .counterBuilder(ServiceMetricNames.DATA_QUALITY_OVERALL_FILE_COUNT)
+            .build()
+            .add(totalFiles, tags);
+        // Emit passed files count
+        otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
+            .counterBuilder(ServiceMetricNames.DATA_QUALITY_SUCCESS_FILE_COUNT)
+            .build().add(passedFilesSize, tags);
+        // Emit failed files count
+        otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
+            .counterBuilder(ServiceMetricNames.DATA_QUALITY_FAILURE_FILE_COUNT)
+            .build().add(failedFilesSize, tags);
+      }
+    }
+
+    private Attributes getTagsForDataQualityMetrics(JobState jobState) {
+      Properties jobProperties = new Properties();
+      try {
+        jobProperties = 
PropertiesUtils.deserialize(jobState.getProp("job.props", ""));
+        log.info("Job properties loaded: " + jobProperties);
+      } catch (IOException e) {
+        log.error("Could not deserialize job properties", e);
+      }

Review Comment:
   consider logging a clearer warning or adding a fallback tag/metric to 
indicate that job.props deserialization failed. Without this, OTEL metrics 
would silently lack important context like source/destination, making debugging 
or metric analysis harder.
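
   A possible shape for the fallback, sketched with hypothetical names. The 
`Deserializer` interface stands in for `PropertiesUtils.deserialize`, and the 
tag name is an assumption. On failure the returned properties carry a marker 
that could be added to the metric attributes:

```java
import java.io.IOException;
import java.util.Properties;

// Hypothetical sketch, not part of the PR: marks the result when deserialization fails.
final class JobPropsTagSketch {
  // Assumed tag name, chosen for illustration.
  static final String DESERIALIZATION_FAILED_TAG = "jobPropsDeserializationFailed";

  // Stand-in for the real deserialization call.
  interface Deserializer {
    Properties deserialize(String serialized) throws IOException;
  }

  private JobPropsTagSketch() {}

  // Returns the deserialized properties, or a marker-only Properties on failure.
  static Properties loadWithFallback(String serialized, Deserializer deserializer) {
    try {
      return deserializer.deserialize(serialized);
    } catch (IOException e) {
      Properties fallback = new Properties();
      fallback.setProperty(DESERIALIZATION_FAILED_TAG, "true");
      return fallback;
    }
  }
}
```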



##########
gobblin-core/src/test/java/org/apache/gobblin/policies/size/FileSizePolicyTest.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.policies.size;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.qualitychecker.task.TaskLevelPolicy;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class FileSizePolicyTest {
+
+  @Test
+  public void testPolicyPass() {
+    State state = new State();
+    state.setProp(FileSizePolicy.BYTES_READ_KEY, 1000L);
+    state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, 1000L);
+
+    FileSizePolicy policy = new FileSizePolicy(state, 
TaskLevelPolicy.Type.FAIL);
+    Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.PASSED);
+  }
+
+  @Test
+  public void testPolicyFail() {
+    State state = new State();
+    state.setProp(FileSizePolicy.BYTES_READ_KEY, 1000L);
+    state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, 900L);
+
+    FileSizePolicy policy = new FileSizePolicy(state, 
TaskLevelPolicy.Type.FAIL);
+    Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.FAILED);
+  }
+
+  @Test
+  public void testMissingProperties() {
+    State state = new State();
+    // No properties set at all
+    FileSizePolicy policy = new FileSizePolicy(state, 
TaskLevelPolicy.Type.FAIL);
+    Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.FAILED);
+  }
+
+  @Test
+  public void testPartiallySetProperties() {
+    State state = new State();
+    // Only set bytes read, not bytes written
+    state.setProp(FileSizePolicy.BYTES_READ_KEY, 1000L);
+
+    FileSizePolicy policy = new FileSizePolicy(state, 
TaskLevelPolicy.Type.FAIL);
+    Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.FAILED);
+
+    // Reset state and only set bytes written, not bytes read
+    state = new State();
+    state.setProp(FileSizePolicy.BYTES_WRITTEN_KEY, 1000L);
+
+    policy = new FileSizePolicy(state, TaskLevelPolicy.Type.FAIL);
+    Assert.assertEquals(policy.executePolicy(), TaskLevelPolicy.Result.FAILED);
+  }
+
+}

Review Comment:
   nit: add a newline at end of file



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/IncorrectSizeFileAwareInputStreamDataWriter.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.writer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.fs.Path;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.policies.size.FileSizePolicy;
+import org.apache.gobblin.writer.DataWriter;
+
+/**
+ * A {@link DataWriter} that extends {@link FileAwareInputStreamDataWriter} to 
intentionally report incorrect file sizes.
+ * This is useful for testing data quality checks that verify file sizes.
+ *
+ * The writer actually writes the correct data to the destination, but reports 
incorrect sizes in the bytesWritten() method.
+ * The size discrepancy can be configured through properties:
+ * - gobblin.copy.incorrect.size.ratio: Ratio to multiply actual size by 
(default 1.0)
+ * - gobblin.copy.incorrect.size.offset: Fixed offset to add to actual size 
(default 0)
+ */
+@Slf4j
+public class IncorrectSizeFileAwareInputStreamDataWriter extends 
FileAwareInputStreamDataWriter {
+
+  public static final String INCORRECT_SIZE_RATIO_KEY = 
CopyConfiguration.COPY_PREFIX + ".incorrect.size.ratio";
+  public static final String INCORRECT_SIZE_OFFSET_KEY = 
CopyConfiguration.COPY_PREFIX + ".incorrect.size.offset";
+  public static final double DEFAULT_INCORRECT_SIZE_RATIO = 1.0;

Review Comment:
   Should the default be an actually incorrect ratio, such as `0.9`? Otherwise, 
we can clarify in the Javadoc that the default config does not introduce a 
discrepancy unless overridden.
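
   To make the concern concrete, a sketch under an assumption: the diff context 
does not show how the writer combines the two settings, so the formula below 
(`actual * ratio + offset`) is a guess based on the Javadoc, and the names are 
hypothetical. With the defaults of `1.0` and `0`, the reported size equals the 
actual size:

```java
// Hypothetical sketch; the combination of ratio and offset is an assumption.
final class ReportedSizeSketch {
  private ReportedSizeSketch() {}

  // Scales the actual size by the ratio, rounds to the nearest long, then adds the offset.
  static long reportedSize(long actualSize, double ratio, long offset) {
    return Math.round(actualSize * ratio) + offset;
  }
}
```

   Under this assumption, `reportedSize(1000, 1.0, 0)` leaves the size 
unchanged, so a test relying on the defaults would not exercise a failure.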



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java:
##########
@@ -788,12 +800,108 @@ public int getJobFailures() {
       return 
Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY));
     }
 
+    /**
+     * Computes and stores the overall data quality status based on task-level 
policy results.
+     * The status will be "PASSED" if all tasks passed their quality checks, 
"FAILED" otherwise.
+     */
+    public void computeAndStoreDatasetQualityStatus(JobState jobState) {
+      DataQualityStatus jobDataQuality = DataQualityStatus.PASSED;
+      int totalFiles = 0;
+      int failedFilesSize = 0;
+      int passedFilesSize = 0;
+      for (TaskState taskState : getTaskStates()) {
+        totalFiles++;
+        DataQualityStatus qualityResult = null;
+        String result = 
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+        if (result != null) {
+          try {
+            qualityResult = DataQualityStatus.valueOf(result);
+          } catch (IllegalArgumentException e) {
+            log.warn("Unknown data quality status encountered " + result);
+            qualityResult = DataQualityStatus.UNKNOWN;

Review Comment:
   consider adding a fromString() static method in DataQualityStatus to 
centralize the parsing and fallback logic. This will simplify code and ensure 
consistent handling of unknown/malformed values.
   
   ```
     public static DataQualityStatus fromString(String value) {
       if (value == null) {
         return UNKNOWN;
       }
       try {
         return DataQualityStatus.valueOf(value.toUpperCase());
       } catch (IllegalArgumentException e) {
         log.error(...);
         return UNKNOWN;
       }
     }
   ```
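
   A self-contained variant of the same idea, as a sketch only. The enum below 
is a stand-in so the example compiles on its own, and the logging call is left 
out. It uses `toUpperCase(Locale.ROOT)` because the no-argument `toUpperCase()` 
depends on the default locale (for example, `i` is upper-cased differently 
under a Turkish locale):

```java
import java.util.Locale;

// Stand-in for the PR's enum so the sketch compiles on its own.
enum DataQualityStatusSketch {
  PASSED, FAILED, NOT_EVALUATED, UNKNOWN;

  // Parses a status name case-insensitively; null or unrecognized input maps to UNKNOWN.
  static DataQualityStatusSketch fromString(String value) {
    if (value == null) {
      return UNKNOWN;
    }
    try {
      return valueOf(value.trim().toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      return UNKNOWN;
    }
  }
}
```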



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java:
##########
@@ -788,12 +800,108 @@ public int getJobFailures() {
       return 
Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY));
     }
 
+    /**
+     * Computes and stores the overall data quality status based on task-level 
policy results.
+     * The status will be "PASSED" if all tasks passed their quality checks, 
"FAILED" otherwise.
+     */
+    public void computeAndStoreDatasetQualityStatus(JobState jobState) {
+      DataQualityStatus jobDataQuality = DataQualityStatus.PASSED;
+      int totalFiles = 0;
+      int failedFilesSize = 0;
+      int passedFilesSize = 0;
+      for (TaskState taskState : getTaskStates()) {
+        totalFiles++;
+        DataQualityStatus qualityResult = null;
+        String result = 
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+        if (result != null) {
+          try {
+            qualityResult = DataQualityStatus.valueOf(result);
+          } catch (IllegalArgumentException e) {
+            log.warn("Unknown data quality status encountered " + result);
+            qualityResult = DataQualityStatus.UNKNOWN;
+          }
+        }
+        log.info("Data quality status of this task is: " + qualityResult);
+        if (DataQualityStatus.PASSED != qualityResult) {
+          failedFilesSize++;
+          log.warn("Data quality not passed: " + qualityResult);
+          jobDataQuality = DataQualityStatus.FAILED;
+        }
+        else {
+          passedFilesSize++;
+        }
+      }
+
+      super.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, 
jobDataQuality.name());
+
+      // Emit OTEL metrics for data quality
+      OpenTelemetryMetricsBase otelMetrics = 
OpenTelemetryMetrics.getInstance(jobState);
+      if (otelMetrics != null) {
+        Attributes tags = getTagsForDataQualityMetrics(jobState);
+        // Emit data quality status (1 for PASSED, 0 for FAILED)
+        DataQualityStatus finalJobDataQuality = jobDataQuality;
+        log.info("Data quality status for this job is " + finalJobDataQuality);
+        otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
+            .gaugeBuilder(ServiceMetricNames.DATA_QUALITY_STATUS_METRIC_NAME)
+            .ofLongs()
+            .buildWithCallback(measurement -> {
+              log.info("Emitting metric for data quality");
+              
measurement.record(DataQualityStatus.PASSED.equals(finalJobDataQuality) ? 1 : 
0, tags);
+            });
+
+        otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
+            .counterBuilder(ServiceMetricNames.DATA_QUALITY_OVERALL_FILE_COUNT)
+            .build()
+            .add(totalFiles, tags);
+        // Emit passed files count
+        otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
+            .counterBuilder(ServiceMetricNames.DATA_QUALITY_SUCCESS_FILE_COUNT)
+            .build().add(passedFilesSize, tags);
+        // Emit failed files count
+        otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
+            .counterBuilder(ServiceMetricNames.DATA_QUALITY_FAILURE_FILE_COUNT)
+            .build().add(failedFilesSize, tags);
+      }
+    }
+
+    private Attributes getTagsForDataQualityMetrics(JobState jobState) {
+      Properties jobProperties = new Properties();
+      try {
+        jobProperties = 
PropertiesUtils.deserialize(jobState.getProp("job.props", ""));
+        log.info("Job properties loaded: " + jobProperties);
+      } catch (IOException e) {
+        log.error("Could not deserialize job properties", e);
+      }
+
+      return Attributes.builder()
+          .put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobState.getJobName())
+          .put(TimingEvent.DATASET_URN, this.getDatasetUrn())
+          .put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
jobState.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD))
+          .put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
jobState.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD))
+          .put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
jobState.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD))
+          .put(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD, 
jobState.getProp(TimingEvent.FlowEventConstants.FLOW_EDGE_FIELD))
+          .put(TimingEvent.FlowEventConstants.FLOW_FABRIC 
,jobState.getProp(ServiceConfigKeys.GOBBLIN_SERVICE_INSTANCE_NAME, null))
+          .put(TimingEvent.FlowEventConstants.FLOW_SOURCE 
,jobProperties.getProperty(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, ""))
+          .put(TimingEvent.FlowEventConstants.FLOW_DESTINATION, 
jobProperties.getProperty(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, 
""))
+          .put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
jobState.getProp(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, ""))
+          .build();
+    }
+
+    /**
+     * Gets the overall data quality status of the dataset.
+     * @return "PASSED" if all tasks passed their quality checks, "FAILED" 
otherwise
+     */
+    public String getDataQualityStatus() {

Review Comment:
   Can we return the enum from here to avoid string logic downstream? Callers 
can use `.name()` if they need the string representation.
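
   Roughly what this could look like, as a sketch with stand-in types. The key 
name and the choice of `NOT_EVALUATED` for a missing value are assumptions, not 
taken from the PR:

```java
import java.util.Properties;

// Stand-in sketch, not the PR's JobState: stores the name, exposes the enum.
final class QualityStatusAccessorSketch {
  enum Status { PASSED, FAILED, NOT_EVALUATED, UNKNOWN }

  // Hypothetical property key used only in this sketch.
  static final String STATUS_KEY = "dataset.quality.status";

  private final Properties props = new Properties();

  void setStatus(Status status) {
    props.setProperty(STATUS_KEY, status.name());
  }

  // Returns the enum; callers that need a string can call name() on the result.
  Status getStatus() {
    String raw = props.getProperty(STATUS_KEY);
    if (raw == null) {
      return Status.NOT_EVALUATED; // assumed default when nothing was stored
    }
    try {
      return Status.valueOf(raw);
    } catch (IllegalArgumentException e) {
      return Status.UNKNOWN;
    }
  }
}
```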



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java:
##########
@@ -788,12 +800,108 @@ public int getJobFailures() {
       return 
Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY));
     }
 
+    /**
+     * Computes and stores the overall data quality status based on task-level 
policy results.
+     * The status will be "PASSED" if all tasks passed their quality checks, 
"FAILED" otherwise.
+     */
+    public void computeAndStoreDatasetQualityStatus(JobState jobState) {
+      DataQualityStatus jobDataQuality = DataQualityStatus.PASSED;
+      int totalFiles = 0;
+      int failedFilesSize = 0;
+      int passedFilesSize = 0;
+      for (TaskState taskState : getTaskStates()) {
+        totalFiles++;
+        DataQualityStatus qualityResult = null;
+        String result = 
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+        if (result != null) {
+          try {
+            qualityResult = DataQualityStatus.valueOf(result);
+          } catch (IllegalArgumentException e) {
+            log.warn("Unknown data quality status encountered " + result);
+            qualityResult = DataQualityStatus.UNKNOWN;
+          }
+        }
+        log.info("Data quality status of this task is: " + qualityResult);

Review Comment:
   This would log once per task. Can we use `log.debug` instead?



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/IncorrectSizeFileAwareInputStreamDataWriter.java:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.writer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.fs.Path;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.policies.size.FileSizePolicy;
+import org.apache.gobblin.writer.DataWriter;
+
+/**
+ * A {@link DataWriter} that extends {@link FileAwareInputStreamDataWriter} to 
intentionally report incorrect file sizes.
+ * This is useful for testing data quality checks that verify file sizes.
+ *
+ * The writer actually writes the correct data to the destination, but reports 
incorrect sizes in the bytesWritten() method.
+ * The size discrepancy can be configured through properties:
+ * - gobblin.copy.incorrect.size.ratio: Ratio to multiply actual size by 
(default 1.0)
+ * - gobblin.copy.incorrect.size.offset: Fixed offset to add to actual size 
(default 0)
+ */
+@Slf4j
+public class IncorrectSizeFileAwareInputStreamDataWriter extends 
FileAwareInputStreamDataWriter {
+
+  public static final String INCORRECT_SIZE_RATIO_KEY = 
CopyConfiguration.COPY_PREFIX + ".incorrect.size.ratio";
+  public static final String INCORRECT_SIZE_OFFSET_KEY = 
CopyConfiguration.COPY_PREFIX + ".incorrect.size.offset";
+  public static final double DEFAULT_INCORRECT_SIZE_RATIO = 1.0;
+  public static final long DEFAULT_INCORRECT_SIZE_OFFSET = 0L;
+
+  private final double sizeRatio;
+  private final long sizeOffset;
+
+  public IncorrectSizeFileAwareInputStreamDataWriter(State state, int 
numBranches, int branchId)
+      throws IOException {
+    this(state, numBranches, branchId, null);
+  }
+
+  public IncorrectSizeFileAwareInputStreamDataWriter(State state, int 
numBranches, int branchId, String writerAttemptId)
+      throws IOException {
+    super(state, numBranches, branchId, writerAttemptId);
+    this.sizeRatio = state.getPropAsDouble(INCORRECT_SIZE_RATIO_KEY, 
DEFAULT_INCORRECT_SIZE_RATIO);
+    this.sizeOffset = state.getPropAsLong(INCORRECT_SIZE_OFFSET_KEY, 
DEFAULT_INCORRECT_SIZE_OFFSET);

Review Comment:
   Should we validate `sizeRatio` and `sizeOffset` here, e.g. check for
   negative values?
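
   One possible shape for that check, as a minimal sketch rather than the
   PR's code: `SizeSkewConfig` and `reportedSize` are hypothetical names. It
   assumes a negative or non-finite ratio is always a configuration mistake,
   while a negative offset is a legitimate way to under-report, so only the
   final reported size is clamped at zero.

```java
// Hypothetical helper, not part of the PR: validates the skew settings once,
// then derives the size the test writer would report.
final class SizeSkewConfig {
  final double ratio;
  final long offset;

  SizeSkewConfig(double ratio, long offset) {
    // Assumption: a negative, NaN or infinite ratio is never intended.
    if (Double.isNaN(ratio) || Double.isInfinite(ratio) || ratio < 0.0) {
      throw new IllegalArgumentException(
          "size ratio must be a finite, non-negative number, got " + ratio);
    }
    this.ratio = ratio;
    this.offset = offset; // negative offsets are allowed (under-reporting)
  }

  /** Applies ratio and offset to the real byte count, never returning a negative size. */
  long reportedSize(long actualBytes) {
    long skewed = Math.round(actualBytes * ratio) + offset;
    return Math.max(0L, skewed);
  }
}
```

   Failing fast in the constructor keeps a bad test configuration from
   producing a confusing size mismatch later in the quality check.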



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java:
##########
@@ -788,12 +800,108 @@ public int getJobFailures() {
       return 
Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY));
     }
 
+    /**
+     * Computes and stores the overall data quality status based on task-level 
policy results.
+     * The status will be "PASSED" if all tasks passed their quality checks, 
"FAILED" otherwise.
+     */
+    public void computeAndStoreDatasetQualityStatus(JobState jobState) {
+      DataQualityStatus jobDataQuality = DataQualityStatus.PASSED;
+      int totalFiles = 0;
+      int failedFilesSize = 0;
+      int passedFilesSize = 0;
+      for (TaskState taskState : getTaskStates()) {
+        totalFiles++;
+        DataQualityStatus qualityResult = null;
+        String result = 
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+        if (result != null) {
+          try {
+            qualityResult = DataQualityStatus.valueOf(result);
+          } catch (IllegalArgumentException e) {
+            log.warn("Unknown data quality status encountered " + result);
+            qualityResult = DataQualityStatus.UNKNOWN;
+          }
+        }
+        log.info("Data quality status of this task is: " + qualityResult);
+        if (DataQualityStatus.PASSED != qualityResult) {
+          failedFilesSize++;
+          log.warn("Data quality not passed: " + qualityResult);

Review Comment:
   Please include a task identifier in this log message so the task that
   failed its data quality check can be traced.



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java:
##########
@@ -788,12 +800,108 @@ public int getJobFailures() {
       return 
Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY));
     }
 
+    /**
+     * Computes and stores the overall data quality status based on task-level 
policy results.
+     * The status will be "PASSED" if all tasks passed their quality checks, 
"FAILED" otherwise.
+     */
+    public void computeAndStoreDatasetQualityStatus(JobState jobState) {
+      DataQualityStatus jobDataQuality = DataQualityStatus.PASSED;
+      int totalFiles = 0;
+      int failedFilesSize = 0;
+      int passedFilesSize = 0;
+      for (TaskState taskState : getTaskStates()) {
+        totalFiles++;
+        DataQualityStatus qualityResult = null;
+        String result = 
taskState.getProp(ConfigurationKeys.TASK_LEVEL_POLICY_RESULT_KEY);
+        if (result != null) {
+          try {
+            qualityResult = DataQualityStatus.valueOf(result);
+          } catch (IllegalArgumentException e) {
+            log.warn("Unknown data quality status encountered " + result);
+            qualityResult = DataQualityStatus.UNKNOWN;
+          }
+        }
+        log.info("Data quality status of this task is: " + qualityResult);
+        if (DataQualityStatus.PASSED != qualityResult) {
+          failedFilesSize++;
+          log.warn("Data quality not passed: " + qualityResult);
+          jobDataQuality = DataQualityStatus.FAILED;
+        }
+        else {
+          passedFilesSize++;
+        }
+      }
+
+      super.setProp(ConfigurationKeys.DATASET_QUALITY_STATUS_KEY, 
jobDataQuality.name());
+
+      // Emit OTEL metrics for data quality
+      OpenTelemetryMetricsBase otelMetrics = 
OpenTelemetryMetrics.getInstance(jobState);
+      if (otelMetrics != null) {
+        Attributes tags = getTagsForDataQualityMetrics(jobState);
+        // Emit data quality status (1 for PASSED, 0 for FAILED)
+        DataQualityStatus finalJobDataQuality = jobDataQuality;
+        log.info("Data quality status for this job is " + finalJobDataQuality);
+        otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
+            .gaugeBuilder(ServiceMetricNames.DATA_QUALITY_STATUS_METRIC_NAME)
+            .ofLongs()
+            .buildWithCallback(measurement -> {
+              log.info("Emitting metric for data quality");
+              
measurement.record(DataQualityStatus.PASSED.equals(finalJobDataQuality) ? 1 : 
0, tags);
+            });
+
+        otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
+            .counterBuilder(ServiceMetricNames.DATA_QUALITY_OVERALL_FILE_COUNT)
+            .build()
+            .add(totalFiles, tags);
+        // Emit passed files count
+        otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
+            .counterBuilder(ServiceMetricNames.DATA_QUALITY_SUCCESS_FILE_COUNT)
+            .build().add(passedFilesSize, tags);
+        // Emit failed files count
+        otelMetrics.getMeter(GAAS_OBSERVABILITY_METRICS_GROUPNAME)
+            .counterBuilder(ServiceMetricNames.DATA_QUALITY_FAILURE_FILE_COUNT)
+            .build().add(failedFilesSize, tags);

Review Comment:
   We can combine the three counter emissions into a single `.batchCallback()` 
to improve efficiency and avoid duplication.



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobState.java:
##########
@@ -788,12 +800,108 @@ public int getJobFailures() {
       return 
Integer.parseInt(super.getProp(ConfigurationKeys.JOB_FAILURES_KEY));
     }
 
+    /**
+     * Computes and stores the overall data quality status based on task-level 
policy results.
+     * The status will be "PASSED" if all tasks passed their quality checks, 
"FAILED" otherwise.
+     */
+    public void computeAndStoreDatasetQualityStatus(JobState jobState) {

Review Comment:
   This method introduces metrics emission and data quality evaluation logic 
directly into JobState, which is primarily a data container for job/task state. 
To improve separation of concerns, please move this logic into a higher-level 
component, e.g. a DataQualityEvaluator or JobResultHandler.
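
   A rough sketch of what such a component could look like, under stated
   assumptions: the class, its `Result` type and the map-based input are
   hypothetical, and the enum is a local stand-in for Gobblin's
   `DataQualityStatus`. It only computes the outcome. Storing it on the
   JobState and emitting metrics would stay with the caller. Unlike the diff,
   it maps a missing result to UNKNOWN instead of leaving it null, and it puts
   the task id in the warning.

```java
import java.util.Map;
import java.util.logging.Logger;

// Stand-in for Gobblin's DataQualityStatus so the sketch compiles on its own.
enum DataQualityStatus { PASSED, FAILED, UNKNOWN }

// Hypothetical class: name and shape are illustrative, not taken from the PR.
final class DataQualityEvaluator {
  private static final Logger LOG =
      Logger.getLogger(DataQualityEvaluator.class.getName());

  /** Immutable summary that a caller can store or hand to a metrics emitter. */
  static final class Result {
    final DataQualityStatus status;
    final int total;
    final int passed;
    final int failed;

    Result(DataQualityStatus status, int total, int passed, int failed) {
      this.status = status;
      this.total = total;
      this.passed = passed;
      this.failed = failed;
    }
  }

  /**
   * @param policyResultByTaskId task id mapped to its raw policy result
   *                             (the value may be null or unrecognized)
   */
  static Result evaluate(Map<String, String> policyResultByTaskId) {
    int passed = 0;
    int failed = 0;
    for (Map.Entry<String, String> entry : policyResultByTaskId.entrySet()) {
      DataQualityStatus status = parse(entry.getValue());
      if (status == DataQualityStatus.PASSED) {
        passed++;
      } else {
        failed++;
        LOG.warning("Data quality not passed for task " + entry.getKey() + ": " + status);
      }
    }
    // The job passes only if no task failed; an empty job counts as PASSED.
    DataQualityStatus overall =
        failed == 0 ? DataQualityStatus.PASSED : DataQualityStatus.FAILED;
    return new Result(overall, passed + failed, passed, failed);
  }

  /** Missing or unrecognized values are treated as UNKNOWN, which counts as not passed. */
  private static DataQualityStatus parse(String raw) {
    if (raw == null) {
      return DataQualityStatus.UNKNOWN;
    }
    try {
      return DataQualityStatus.valueOf(raw);
    } catch (IllegalArgumentException e) {
      return DataQualityStatus.UNKNOWN;
    }
  }
}
```

   Because the evaluator is a pure function over task results, it can be unit
   tested without a JobState or an OpenTelemetry instance.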





Issue Time Tracking
-------------------

    Worklog Id:     (was: 972219)
    Time Spent: 40m  (was: 0.5h)

> FileSize Data Quality implementation for FileBasedCopy
> ------------------------------------------------------
>
>                 Key: GOBBLIN-2204
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2204
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Vaibhav Singhal
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

