lokeshj1703 commented on code in PR #9422:
URL: https://github.com/apache/hudi/pull/9422#discussion_r1298194906


##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestMORColstats.java:
##########
@@ -202,10 +246,23 @@ private void testBaseFileAndLogFileUpdateMatchesHelper(Boolean shouldAsyncCompac
       assertEquals(0, readMatchingRecords().except(batch1.union(updatedRecord)).count());
     }
 
-    //Corrupt to prove that colstats does not exclude filegroup
-    filesToCorrupt.forEach(TestMORColstats::corruptFile);
-    assertEquals(1, filesToCorrupt.size());
-    assertThrows(SparkException.class, () -> readMatchingRecords().count());
+    if (shouldExecuteCompaction) {
+      doCompaction();
+      filesToCorrupt = getFilesToCorrupt();
+      filesToCorrupt.forEach(TestMORColstats::corruptFile);
+      if (shouldDelete || shouldRollback) {
+        //we corrupt both files in the fg
+        assertEquals(2, filesToCorrupt.size());

Review Comment:
   The first file group should match the condition, right? So how are we able to
corrupt both files?



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestMORColstats.java:
##########
@@ -237,24 +232,27 @@ private void testBaseFileAndLogFileUpdateMatchesHelper(Boolean shouldAsyncCompac
     doWrite(updatedRecord);
     if (shouldRollback) {
       deleteLatestDeltacommit();
+      enableInlineCompaction(shouldInlineCompact);

Review Comment:
   Is the `enableInlineCompaction` call required here, or can it be moved into
the `if (shouldInlineCompact) {` block?



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestMORColstats.java:
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional;
+
+import org.apache.hudi.DataSourceReadOptions;
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieSparkClientTestBase;
+
+import org.apache.spark.SparkException;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.testutils.RawTripTestPayload.recordToString;
+import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS;
+import static org.apache.spark.sql.SaveMode.Append;
+import static org.apache.spark.sql.SaveMode.Overwrite;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Test mor with colstats enabled in scenarios to ensure that files
+ * are being appropriately read or not read.
+ * The strategy employed is to corrupt targeted base files. If we want
+ * to prove the file is read, we assert that an exception will be thrown.
+ * If we want to prove the file is not read, we expect the read to
+ * successfully execute.
+ */
+public class TestMORColstats extends HoodieSparkClientTestBase {

Review Comment:
   Rename to `TestDataSkippingWithMORColstats`?



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestMORColstats.java:
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional;
+
+import org.apache.hudi.DataSourceReadOptions;
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieSparkClientTestBase;
+
+import org.apache.spark.SparkException;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.testutils.RawTripTestPayload.recordToString;
+import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS;
+import static org.apache.spark.sql.SaveMode.Append;
+import static org.apache.spark.sql.SaveMode.Overwrite;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Test mor with colstats enabled in scenarios to ensure that files
+ * are being appropriately read or not read.
+ * The strategy employed is to corrupt targeted base files. If we want
+ * to prove the file is read, we assert that an exception will be thrown.
+ * If we want to prove the file is not read, we expect the read to
+ * successfully execute.
+ */
+public class TestMORColstats extends HoodieSparkClientTestBase {
+
+  private static String matchCond = "trip_type = 'UBERX'";
+  private static String nonMatchCond = "trip_type = 'BLACK'";
+  private static String[] dropColumns = {"_hoodie_commit_time", "_hoodie_commit_seqno",
+      "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name"};
+
+  private Boolean shouldOverwrite;
+  Map<String, String> options;
+  @TempDir
+  public java.nio.file.Path basePath;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initSparkContexts();
+    dataGen = new HoodieTestDataGenerator();
+    shouldOverwrite = true;
+    options = getOptions();
+    Properties props = new Properties();
+    props.putAll(options);
+    try {
+      metaClient = HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath.toString(), props);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @AfterEach
+  public void tearDown() throws IOException {
+    cleanupSparkContexts();
+    cleanupTestDataGenerator();
+    metaClient = null;
+  }
+
+  /**
+   * Create two files, one should be excluded by colstats
+   */
+  @Test
+  public void testBaseFileOnly() {
+    Dataset<Row> inserts = makeInsertDf("000", 100);
+    Dataset<Row> batch1 = inserts.where(matchCond);
+    Dataset<Row> batch2 = inserts.where(nonMatchCond);
+    doWrite(batch1);
+    doWrite(batch2);
+    List<Path> filesToCorrupt = getFilesToCorrupt();
+    assertEquals(1, filesToCorrupt.size());
+    filesToCorrupt.forEach(TestMORColstats::corruptFile);
+    assertEquals(0, readMatchingRecords().except(batch1).count());
+    //Read without data skipping to show that it will fail
+    //Reading with data skipping succeeded so that means that data skipping is working and the corrupted
+    //file was not read
+    assertThrows(SparkException.class, () -> readMatchingRecords(false).count());
+  }
+
+  /**
+   * Create two base files, One base file doesn't match the condition
+   * Then add a log file so that both file groups match
+   * both file groups must be read
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatches() {
+    testBaseFileAndLogFileUpdateMatchesHelper(false, false,false, false);
+  }
+
+  /**
+   * Create two base files, One base file doesn't match the condition
+   * Then add a log file so that both file groups match
+   * Then do a compaction
+   * Now you have two base files that match
+   * both file groups must be read
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatchesDoCompaction() {
+    testBaseFileAndLogFileUpdateMatchesHelper(false, true,false, false);
+  }
+
+  /**
+   * Create two base files, One base file doesn't match the condition
+   * Then add a log file for each filegroup that contains exactly the same records as the base file
+   * Then schedule an async compaction
+   * Then add a log file so that both file groups match the condition
+   * The new log file is a member of a newer file slice
+   * both file groups must be read
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatchesAsyncCompact() {
+    testBaseFileAndLogFileUpdateMatchesHelper(true, false,false, false);
+  }
+
+  /**
+   * Create two base files, One base file doesn't match the condition
+   * Then add a log file so that both file groups match the condition
+   * Then add a delete for that record so that the file group no longer matches the condition
+   * both file groups must still be read
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatchesDeleteBlock() {
+    testBaseFileAndLogFileUpdateMatchesHelper(false, false,true, false);
+  }
+
+  /**
+   * Create two base files, One base file doesn't match the condition
+   * Then add a log file so that both file groups match the condition
+   * Then add a delete for that record so that the file group no longer matches the condition
+   * Then compact
+   * Only the first file group needs to be read
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatchesDeleteBlockCompact() {
+    testBaseFileAndLogFileUpdateMatchesHelper(false, true,true, false);
+  }
+
+  /**
+   * Create two base files, One base file doesn't match the condition
+   * Then add a log file so that both file groups match the condition
+   * Then delete the deltacommit and write the original value for the
+   *    record so that a rollback is triggered and the file group no
+   *    longer matches the condition
+   * both filegroups should be read
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatchesAndRollBack() {
+    testBaseFileAndLogFileUpdateMatchesHelper(false, false,false, true);
+  }
+
+
+  /**
+   * Create two base files, One base file doesn't match the condition
+   * Then add a log file so that both file groups match the condition
+   * Then delete the deltacommit and write the original value for the
+   *    record so that a rollback is triggered and the file group no
+   *    longer matches the condition
+   * Do Compaction
+   * Only 1 filegroup should be read
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatchesAndRollBackCompact() {
+    testBaseFileAndLogFileUpdateMatchesHelper(false, true,false, true);
+  }
+
+  /**
+   * Test where one filegroup doesn't match the condition, then update so both filegroups match
+   */
+  private void testBaseFileAndLogFileUpdateMatchesHelper(Boolean shouldScheduleCompaction,

Review Comment:
   Also, can we reduce the length of this function by splitting it into smaller
helper functions where possible? I think the compaction-related assertions could
move to a separate function.
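
   A rough, dependency-free sketch of what such a split might look like. It is
not the PR's code: the class `CompactionAssertionSketch`, the helper
`compactAndCorrupt`, the file names, and the `String` stand-ins for `Path` are
all hypothetical. The expected file count is passed in by the caller, since the
right value for each scenario is not settled in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

/**
 * Hypothetical stand-in for the test class. It uses no Hudi or Spark types, so
 * only the shape of the extracted helper is meant to carry over.
 */
public class CompactionAssertionSketch {

  // Records each step so the helper can be exercised without a real table.
  public final List<String> steps = new ArrayList<>();

  // Stand-in for the test's doCompaction(); assumed to take no arguments.
  public void doCompaction() {
    steps.add("compact");
  }

  // Stand-in for corruptFile(Path); a String replaces the Path here.
  public void corruptFile(String file) {
    steps.add("corrupt:" + file);
  }

  /**
   * Extracted helper: compact, look up the files to corrupt, corrupt them, and
   * check how many were selected. The supplier plays the role of
   * getFilesToCorrupt(), which must run after compaction.
   */
  public void compactAndCorrupt(Supplier<List<String>> filesToCorrupt, int expectedCount) {
    doCompaction();
    List<String> files = filesToCorrupt.get();
    files.forEach(this::corruptFile);
    if (files.size() != expectedCount) {
      throw new AssertionError(
          "expected " + expectedCount + " files to corrupt but found " + files.size());
    }
  }

  public static void main(String[] args) {
    CompactionAssertionSketch sketch = new CompactionAssertionSketch();
    // Illustrative file names only.
    sketch.compactAndCorrupt(() -> Arrays.asList("fg1-base", "fg2-base"), 2);
    System.out.println(sketch.steps);
  }
}
```

With a helper along these lines, the main test body would only decide which
count to expect for each flag combination and delegate the rest.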



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestMORColstats.java:
##########
@@ -162,19 +192,33 @@ public void testBaseFileAndLogFileUpdateMatchesDeleteBlock() {
    * Then delete the deltacommit and write the original value for the
    *    record so that a rollback is triggered and the file group no
    *    longer matches the condition
-   * Rollback should still happen because
+   * both filegroups should be read
    */
   @Test
   public void testBaseFileAndLogFileUpdateMatchesAndRollBack() {
-    testBaseFileAndLogFileUpdateMatchesHelper(false, false, true);
+    testBaseFileAndLogFileUpdateMatchesHelper(false, false,false, true);
   }
 
+
   /**
-   * Test where one filegroup doesn't match the condition, then update so both filegroups match,
-   * finally, add another update so that the file group doesn't match again
-   * In all three cases, dataskipping should not exclude the filegroup
+   * Create two base files, One base file doesn't match the condition
+   * Then add a log file so that both file groups match the condition
+   * Then delete the deltacommit and write the original value for the
+   *    record so that a rollback is triggered and the file group no
+   *    longer matches the condition
+   * Do Compaction
+   * Only 1 filegroup should be read
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatchesAndRollBackCompact() {

Review Comment:
   We should probably keep only the variant with compaction. Wouldn't the
no-compaction case already be covered by it?
   The same applies to the delete test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.