jonvex commented on code in PR #9422:
URL: https://github.com/apache/hudi/pull/9422#discussion_r1300201256


##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestMORColstats.java:
##########
@@ -0,0 +1,481 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional;
+
+import org.apache.hudi.DataSourceReadOptions;
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieSparkClientTestBase;
+
+import org.apache.spark.SparkException;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.common.testutils.RawTripTestPayload.recordToString;
+import static 
org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS;
+import static org.apache.spark.sql.SaveMode.Append;
+import static org.apache.spark.sql.SaveMode.Overwrite;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Test mor with colstats enabled in scenarios to ensure that files
+ * are being appropriately read or not read.
+ * The strategy employed is to corrupt targeted base files. If we want
+ * to prove the file is read, we assert that an exception will be thrown.
+ * If we want to prove the file is not read, we expect the read to
+ * successfully execute.
+ */
+public class TestMORColstats extends HoodieSparkClientTestBase {
+
+  private static String matchCond = "trip_type = 'UBERX'";
+  private static String nonMatchCond = "trip_type = 'BLACK'";
+  private static String[] dropColumns = {"_hoodie_commit_time", 
"_hoodie_commit_seqno",
+      "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name"};
+
+  private Boolean shouldOverwrite;
+  Map<String, String> options;
+  @TempDir
+  public java.nio.file.Path basePath;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initSparkContexts();
+    dataGen = new HoodieTestDataGenerator();
+    shouldOverwrite = true;
+    options = getOptions();
+    Properties props = new Properties();
+    props.putAll(options);
+    try {
+      metaClient = HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, 
basePath.toString(), props);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @AfterEach
+  public void tearDown() throws IOException {
+    cleanupSparkContexts();
+    cleanupTestDataGenerator();
+    metaClient = null;
+  }
+
+  /**
+   * Create two files, one should be excluded by colstats
+   */
+  @Test
+  public void testBaseFileOnly() {
+    Dataset<Row> inserts = makeInsertDf("000", 100);
+    Dataset<Row> batch1 = inserts.where(matchCond);
+    Dataset<Row> batch2 = inserts.where(nonMatchCond);
+    doWrite(batch1);
+    doWrite(batch2);
+    List<Path> filesToCorrupt = getFilesToCorrupt();
+    assertEquals(1, filesToCorrupt.size());
+    filesToCorrupt.forEach(TestMORColstats::corruptFile);
+    assertEquals(0, readMatchingRecords().except(batch1).count());
+    //Read without data skipping to show that it will fail
+    //Reading with data skipping succeeded so that means that data skipping is 
working and the corrupted
+    //file was not read
+    assertThrows(SparkException.class, () -> 
readMatchingRecords(false).count());
+  }
+
+  /**
+   * Create two base files, One base file doesn't match the condition
+   * Then add a log file so that both file groups match
+   * both file groups must be read
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatches() {
+    testBaseFileAndLogFileUpdateMatchesHelper(false, false,false, false);
+  }
+
+  /**
+   * Create two base files, One base file doesn't match the condition
+   * Then add a log file so that both file groups match
+   * Then do a compaction
+   * Now you have two base files that match
+   * both file groups must be read
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatchesDoCompaction() {
+    testBaseFileAndLogFileUpdateMatchesHelper(false, true,false, false);
+  }
+
+  /**
+   * Create two base files, One base file doesn't match the condition
+   * Then add a log file for each filegroup that contains exactly the same 
records as the base file
+   * Then schedule an async compaction
+   * Then add a log file so that both file groups match the condition
+   * The new log file is a member of a newer file slice
+   * both file groups must be read
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatchesAsyncCompact() {
+    testBaseFileAndLogFileUpdateMatchesHelper(true, false,false, false);
+  }
+
+  /**
+   * Create two base files, One base file doesn't match the condition
+   * Then add a log file so that both file groups match the condition
+   * Then add a delete for that record so that the file group no longer 
matches the condition
+   * both file groups must still be read
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatchesDeleteBlock() {
+    testBaseFileAndLogFileUpdateMatchesHelper(false, false,true, false);
+  }
+
+  /**
+   * Create two base files, One base file doesn't match the condition
+   * Then add a log file so that both file groups match the condition
+   * Then add a delete for that record so that the file group no longer 
matches the condition
+   * Then compact
+   * Only the first file group needs to be read
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatchesDeleteBlockCompact() {
+    testBaseFileAndLogFileUpdateMatchesHelper(false, true,true, false);
+  }
+
+  /**
+   * Create two base files, One base file doesn't match the condition
+   * Then add a log file so that both file groups match the condition
+   * Then delete the deltacommit and write the original value for the
+   *    record so that a rollback is triggered and the file group no
+   *    longer matches the condition
+   * both filegroups should be read
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatchesAndRollBack() {
+    testBaseFileAndLogFileUpdateMatchesHelper(false, false,false, true);
+  }
+
+
+  /**
+   * Create two base files, One base file doesn't match the condition
+   * Then add a log file so that both file groups match the condition
+   * Then delete the deltacommit and write the original value for the
+   *    record so that a rollback is triggered and the file group no
+   *    longer matches the condition
+   * Do Compaction
+   * Only 1 filegroup should be read
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatchesAndRollBackCompact() {
+    testBaseFileAndLogFileUpdateMatchesHelper(false, true,false, true);
+  }
+
+  /**
+   * Test where one filegroup doesn't match the condition, then update so both 
filegroups match
+   */
+  private void testBaseFileAndLogFileUpdateMatchesHelper(Boolean 
shouldScheduleCompaction,
+                                                         Boolean 
shouldInlineCompact,
+                                                         Boolean shouldDelete,
+                                                         Boolean 
shouldRollback) {
+    Dataset<Row> inserts = makeInsertDf("000", 100);
+    Dataset<Row> batch1 = inserts.where(matchCond);
+    Dataset<Row> batch2 = inserts.where(nonMatchCond);
+    doWrite(batch1);
+    doWrite(batch2);
+    if (shouldScheduleCompaction) {
+      doWrite(inserts);
+      scheduleCompaction();
+    }
+    List<Path> filesToCorrupt = getFilesToCorrupt();
+    assertEquals(1, filesToCorrupt.size());
+    Dataset<Row> recordToUpdate = batch2.limit(1);
+    Dataset<Row> updatedRecord = makeRecordMatch(recordToUpdate);
+    doWrite(updatedRecord);
+    if (shouldRollback) {
+      deleteLatestDeltacommit();
+      enableInlineCompaction(shouldInlineCompact);
+      doWrite(recordToUpdate);
+      assertEquals(0, readMatchingRecords().except(batch1).count());
+    } else if (shouldDelete) {
+      enableInlineCompaction(shouldInlineCompact);
+      doDelete(updatedRecord);
+      assertEquals(0, readMatchingRecords().except(batch1).count());
+    } else {
+      assertEquals(0, 
readMatchingRecords().except(batch1.union(updatedRecord)).count());
+    }
+
+    if (shouldInlineCompact) {
+      filesToCorrupt = getFilesToCorrupt();
+      filesToCorrupt.forEach(TestMORColstats::corruptFile);
+      if (shouldDelete || shouldRollback) {
+        //filesToCorrupt includes all base files in the filegroup
+        assertEquals(2, filesToCorrupt.size());
+        assertEquals(0, readMatchingRecords().except(batch1).count());
+      } else {
+        enableInlineCompaction(true);
+        doWrite(updatedRecord);
+        assertEquals(0, filesToCorrupt.size());
+      }
+    } else {
+      //Corrupt to prove that colstats does not exclude filegroup
+      filesToCorrupt.forEach(TestMORColstats::corruptFile);
+      assertEquals(1, filesToCorrupt.size());
+      assertThrows(SparkException.class, () -> readMatchingRecords().count());

Review Comment:
   
https://github.com/apache/hudi/blob/ad1f7173631b7bb7ae0f413929f70420ef245d6e/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java#L57
   
   According to this config description, if inline compaction is not enabled, 
then it will not execute a scheduled compaction. Additionally we got the file 
to corrupt before the write on line 226. If a compaction did happen, a snapshot 
read would read from the latest base file, so an exception would not be thrown.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to