lokeshj1703 commented on code in PR #9422:
URL: https://github.com/apache/hudi/pull/9422#discussion_r1296104426


##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestMORColstats.java:
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional;
+
+import org.apache.hudi.DataSourceReadOptions;
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieSparkClientTestBase;
+
+import org.apache.spark.SparkException;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.common.testutils.RawTripTestPayload.recordToString;
+import static org.apache.spark.sql.SaveMode.Append;
+import static org.apache.spark.sql.SaveMode.Overwrite;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Test mor with colstats enabled in scenarios to ensure that files
+ * are being appropriately read or not read.
+ * The strategy employed is to corrupt targeted base files. If we want
+ * to prove the file is read, we assert that an exception will be thrown.
+ * If we want to prove the file is not read, we expect the read to
+ * successfully execute.
+ */
+public class TestMORColstats extends HoodieSparkClientTestBase {
+
+  private static String matchCond = "trip_type = 'UBERX'";
+  private static String nonMatchCond = "trip_type = 'BLACK'";
+  private static String[] dropColumns = {"_hoodie_commit_time", 
"_hoodie_commit_seqno",
+      "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name"};
+
+  private Boolean shouldOverwrite;
+  Map<String, String> options;
+  @TempDir
+  public java.nio.file.Path basePath;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initSparkContexts();
+    dataGen = new HoodieTestDataGenerator();
+    shouldOverwrite = true;
+    options = getOptions();
+    Properties props = new Properties();
+    props.putAll(options);
+    try {
+      metaClient = HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, 
basePath.toString(), props);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @AfterEach
+  public void tearDown() throws IOException {
+    cleanupSparkContexts();
+    cleanupTestDataGenerator();
+    metaClient = null;
+  }
+
+  /**
+   * Create two files, one should be excluded by colstats
+   */
+  @Test
+  public void testBaseFileOnly() {
+    Dataset<Row> inserts = makeInsertDf("000", 100);
+    Dataset<Row> batch1 = inserts.where(matchCond);
+    Dataset<Row> batch2 = inserts.where(nonMatchCond);
+    doWrite(batch1);
+    doWrite(batch2);
+    List<Path> filesToCorrupt = getFilesToCorrupt();
+    assertEquals(1, filesToCorrupt.size());
+    filesToCorrupt.forEach(TestMORColstats::corruptFile);
+    assertEquals(0, readMatchingRecords().except(batch1).count());
+    //No options so data skipping is false
+    assertThrows(SparkException.class, () -> 
readMatchingRecords(false).count());
+  }
+
+  /**
+   * Create two base files, One base file doesn't match the condition
+   * Then add a log file so that both file groups match
+   * both file groups must be read
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatches() {
+    testBaseFileAndLogFileUpdateMatchesHelper(false, false, false);
+  }
+
+  /**
+   * Create two base files, One base file doesn't match the condition
+   * Then add a log file for each filegroup that contains exactly the same 
records as the base file
+   * Then schedule an async compaction
+   * Then add a log file so that both file groups match the condition
+   * The new log file is a member of a newer file slice
+   * both file groups must be read
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatchesAsyncCompact() {
+    testBaseFileAndLogFileUpdateMatchesHelper(true, false, false);
+  }
+
+  /**
+   * Create two base files, One base file doesn't match the condition
+   * Then add a log file so that both file groups match the condition
+   * Then add a delete for that record so that the file group no longer 
matches the condition
+   * both file groups must still be read
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatchesDeleteBlock() {
+    testBaseFileAndLogFileUpdateMatchesHelper(false, true, false);
+  }
+
+  /**
+   * Create two base files, One base file doesn't match the condition
+   * Then add a log file so that both file groups match the condition
+   * Then delete the deltacommit and write the original value for the
+   *    record so that a rollback is triggered and the file group no
+   *    longer matches the condition
+   * Rollback should still happen because
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatchesAndRollBack() {
+    testBaseFileAndLogFileUpdateMatchesHelper(false, false, true);
+  }
+  
+  private void testBaseFileAndLogFileUpdateMatchesHelper(Boolean 
shouldAsyncCompact,
+                                                         Boolean shouldDelete,
+                                                         Boolean 
shouldRollback) {
+    Dataset<Row> inserts = makeInsertDf("000", 100);
+    Dataset<Row> batch1 = inserts.where(matchCond);
+    Dataset<Row> batch2 = inserts.where(nonMatchCond);
+    doWrite(batch1);
+    doWrite(batch2);
+    if (shouldAsyncCompact) {
+      doWrite(inserts);
+      scheduleAsyncCompaction();
+    }
+    List<Path> filesToCorrupt = getFilesToCorrupt();
+    assertEquals(1, filesToCorrupt.size());
+    Dataset<Row> recordToUpdate = batch2.limit(1);
+    Dataset<Row> updatedRecord = makeRecordMatch(recordToUpdate);
+    doWrite(updatedRecord);
+    if (shouldRollback) {
+      deleteLatestDeltacommit();
+      doWrite(recordToUpdate);
+      assertEquals(0, readMatchingRecords().except(batch1).count());
+    } else if (shouldDelete) {
+      doDelete(updatedRecord);
+      assertEquals(0, readMatchingRecords().except(batch1).count());
+    } else {
+      assertEquals(0, 
readMatchingRecords().except(batch1.union(updatedRecord)).count());
+    }
+
+    //Corrupt to prove that colstats does not exclude filegroup
+    filesToCorrupt.forEach(TestMORColstats::corruptFile);
+    assertThrows(SparkException.class, () -> readMatchingRecords().count());
+  }
+
+  /**
+   * Create two base files, One base file all records match the condition.
+   * The other base file has one record that matches the condition.
+   * Then add a log file that makes that one matching record not match anymore.
+   * both file groups must be read even though no records from the second file 
slice
+   * will pass the condition after mor merging
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateUnmatches() {
+    testBaseFileAndLogFileUpdateUnmatchesHelper(false);
+  }
+
+  /**
+   * Create two base files, One base file all records match the condition.
+   * The other base file has one record that matches the condition.
+   * Then add a log file for each filegroup that contains exactly the same 
records as the base file
+   * Then schedule an async compaction
+   * Then add a log file that makes that one matching record not match anymore.
+   * The new log file is a member of a newer file slice
+   * both file groups must be read even though no records from the second file 
slice
+   * will pass the condition after mor merging
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateUnmatchesAsyncCompact() {
+    testBaseFileAndLogFileUpdateUnmatchesHelper(true);
+  }
+
+  private void testBaseFileAndLogFileUpdateUnmatchesHelper(Boolean 
shouldAsyncCompact) {
+    Dataset<Row> inserts = makeInsertDf("000", 100);
+    Dataset<Row> batch1 = inserts.where(matchCond);
+    doWrite(batch1);
+    //no matches in batch2
+    Dataset<Row> batch2 = inserts.where(nonMatchCond);
+    //make 1 record match
+    Dataset<Row> recordToMod = batch2.limit(1);
+    Dataset<Row> initialRecordToMod = makeRecordMatch(recordToMod);
+    Dataset<Row> modBatch2 = removeRecord(batch2, 
recordToMod).union(initialRecordToMod);
+    doWrite(modBatch2);
+    if (shouldAsyncCompact) {
+      doWrite(batch1.union(modBatch2));
+      scheduleAsyncCompaction();
+    }
+
+    //update batch2 so no matching records in the filegroup
+    doWrite(recordToMod);

Review Comment:
   It would be good to add an assertion that compaction was actually completed 
at this point. You can refer to 
`org.apache.hudi.functional.RecordLevelIndexTestBase#getLatestCompactionInstant`
 for it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to