This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 256957a689e088dcb1b54ced68b742e3aa4221ae
Author: Jon Vexler <jbvex...@gmail.com>
AuthorDate: Sat Aug 26 14:01:02 2023 -0400

    [HUDI-6681] Ensure MOR Column Stats Index skips reading filegroups correctly (#9422)

    - Create tests for MOR col stats index to ensure that filegroups are read as expected

    Co-authored-by: Jonathan Vexler <=>
---
 .../TestDataSkippingWithMORColstats.java | 483 +++++++++++++++++++++
 1 file changed, 483 insertions(+)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestDataSkippingWithMORColstats.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestDataSkippingWithMORColstats.java
new file mode 100644
index 00000000000..64d6c31c2fa
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestDataSkippingWithMORColstats.java
@@ -0,0 +1,483 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional;
+
+import org.apache.hudi.DataSourceReadOptions;
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.HoodieSparkClientTestBase;
+
+import org.apache.spark.SparkException;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.testutils.RawTripTestPayload.recordToString;
+import static org.apache.hudi.config.HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS;
+import static org.apache.spark.sql.SaveMode.Append;
+import static org.apache.spark.sql.SaveMode.Overwrite;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Test MOR with col stats enabled, in scenarios designed to ensure that files
+ * are read, or not read, as appropriate.
+ * The strategy employed is to corrupt targeted base files. If we want
+ * to prove that a file is read, we assert that an exception is thrown.
+ * If we want to prove that a file is not read, we expect the read to
+ * execute successfully.
+ */
+public class TestDataSkippingWithMORColstats extends HoodieSparkClientTestBase {
+
+  private static String matchCond = "trip_type = 'UBERX'";
+  private static String nonMatchCond = "trip_type = 'BLACK'";
+  private static String[] dropColumns = {"_hoodie_commit_time", "_hoodie_commit_seqno",
+      "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name"};
+
+  private Boolean shouldOverwrite;
+  Map<String, String> options;
+  @TempDir
+  public java.nio.file.Path basePath;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initSparkContexts();
+    dataGen = new HoodieTestDataGenerator();
+    shouldOverwrite = true;
+    options = getOptions();
+    Properties props = new Properties();
+    props.putAll(options);
+    try {
+      metaClient = HoodieTableMetaClient.initTableAndGetMetaClient(hadoopConf, basePath.toString(), props);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @AfterEach
+  public void tearDown() throws IOException {
+    cleanupSparkContexts();
+    cleanupTestDataGenerator();
+    metaClient = null;
+  }
+
+  /**
+   * Create two files; one should be excluded by col stats.
+   */
+  @Test
+  public void testBaseFileOnly() {
+    Dataset<Row> inserts = makeInsertDf("000", 100);
+    Dataset<Row> batch1 = inserts.where(matchCond);
+    Dataset<Row> batch2 = inserts.where(nonMatchCond);
+    doWrite(batch1);
+    doWrite(batch2);
+    List<Path> filesToCorrupt = getFilesToCorrupt();
+    assertEquals(1, filesToCorrupt.size());
+    filesToCorrupt.forEach(TestDataSkippingWithMORColstats::corruptFile);
+    assertEquals(0, readMatchingRecords().except(batch1).count());
+    //Read without data skipping to show that it fails.
+    //Reading with data skipping succeeded above, which means data skipping is working and the
+    //corrupted file was not read.
+    assertThrows(SparkException.class, () -> readMatchingRecords(false).count());
+  }
+
+  /**
+   * Create two base files; one base file doesn't match the condition.
+   * Then add a log file so that both file groups match.
+   * Both file groups must be read.
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatches() {
+    testBaseFileAndLogFileUpdateMatchesHelper(false, false, false, false);
+  }
+
+  /**
+   * Create two base files; one base file doesn't match the condition.
+   * Then add a log file so that both file groups match.
+   * Then do a compaction.
+   * Now there are two base files that match.
+   * Both file groups must be read.
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatchesDoCompaction() {
+    testBaseFileAndLogFileUpdateMatchesHelper(false, true, false, false);
+  }
+
+  /**
+   * Create two base files; one base file doesn't match the condition.
+   * Then add a log file for each file group that contains exactly the same records as the base file.
+   * Then schedule an async compaction.
+   * Then add a log file so that both file groups match the condition.
+   * The new log file is a member of a newer file slice.
+   * Both file groups must be read.
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatchesScheduleCompaction() {
+    testBaseFileAndLogFileUpdateMatchesHelper(true, false, false, false);
+  }
+
+  /**
+   * Create two base files; one base file doesn't match the condition.
+   * Then add a log file so that both file groups match the condition.
+   * Then add a delete for that record so that the file group no longer matches the condition.
+   * Both file groups must still be read.
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatchesDeleteBlock() {
+    testBaseFileAndLogFileUpdateMatchesHelper(false, false, true, false);
+  }
+
+  /**
+   * Create two base files; one base file doesn't match the condition.
+   * Then add a log file so that both file groups match the condition.
+   * Then add a delete for that record so that the file group no longer matches the condition.
+   * Then compact.
+   * Only the first file group needs to be read.
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatchesDeleteBlockCompact() {
+    testBaseFileAndLogFileUpdateMatchesHelper(false, true, true, false);
+  }
+
+  /**
+   * Create two base files; one base file doesn't match the condition.
+   * Then add a log file so that both file groups match the condition.
+   * Then delete the deltacommit and write the original value for the
+   * record so that a rollback is triggered and the file group no
+   * longer matches the condition.
+   * Both file groups should be read.
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateMatchesAndRollBack() {
+    testBaseFileAndLogFileUpdateMatchesHelper(false, false, false, true);
+  }
+
+  /**
+   * Test where one file group doesn't match the condition, then update so that both file groups match.
+   */
+  private void testBaseFileAndLogFileUpdateMatchesHelper(Boolean shouldScheduleCompaction,
+                                                         Boolean shouldInlineCompact,
+                                                         Boolean shouldDelete,
+                                                         Boolean shouldRollback) {
+    Dataset<Row> inserts = makeInsertDf("000", 100);
+    Dataset<Row> batch1 = inserts.where(matchCond);
+    Dataset<Row> batch2 = inserts.where(nonMatchCond);
+    doWrite(batch1);
+    doWrite(batch2);
+    if (shouldScheduleCompaction) {
+      doWrite(inserts);
+      scheduleCompaction();
+    }
+    List<Path> filesToCorrupt = getFilesToCorrupt();
+    assertEquals(1, filesToCorrupt.size());
+    Dataset<Row> recordToUpdate = batch2.limit(1);
+    Dataset<Row> updatedRecord = makeRecordMatch(recordToUpdate);
+    doWrite(updatedRecord);
+    if (shouldRollback) {
+      deleteLatestDeltacommit();
+      enableInlineCompaction(shouldInlineCompact);
+      doWrite(recordToUpdate);
+      assertEquals(0, readMatchingRecords().except(batch1).count());
+    } else if (shouldDelete) {
+      enableInlineCompaction(shouldInlineCompact);
+      doDelete(updatedRecord);
+      assertEquals(0, readMatchingRecords().except(batch1).count());
+    } else {
+      assertEquals(0, readMatchingRecords().except(batch1.union(updatedRecord)).count());
+    }
+
+    if (shouldInlineCompact) {
+      filesToCorrupt = getFilesToCorrupt();
+      filesToCorrupt.forEach(TestDataSkippingWithMORColstats::corruptFile);
+      if (shouldDelete || shouldRollback) {
+        assertEquals(1, filesToCorrupt.size());
+        assertEquals(0, readMatchingRecords().except(batch1).count());
+      } else {
+        enableInlineCompaction(true);
+        doWrite(updatedRecord);
+        assertEquals(0, filesToCorrupt.size());
+      }
+    } else {
+      //Corrupt to prove that col stats does not exclude the file group
+      filesToCorrupt.forEach(TestDataSkippingWithMORColstats::corruptFile);
+      assertEquals(1, filesToCorrupt.size());
+      assertThrows(SparkException.class, () -> readMatchingRecords().count());
+    }
+  }
+
+  /**
+   * Create two base files; in one base file all records match the condition.
+   * The other base file has one record that matches the condition.
+   * Then add a log file that makes that one matching record no longer match.
+   * Both file groups must be read even though no records from the second file slice
+   * will pass the condition after MOR merging.
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateUnmatches() {
+    testBaseFileAndLogFileUpdateUnmatchesHelper(false);
+  }
+
+  /**
+   * Create two base files; in one base file all records match the condition.
+   * The other base file has one record that matches the condition.
+   * Then add a log file for each file group that contains exactly the same records as the base file.
+   * Then schedule a compaction.
+   * Then add a log file that makes that one matching record no longer match.
+   * The new log file is a member of a newer file slice.
+   * Both file groups must be read even though no records from the second file slice
+   * will pass the condition after MOR merging.
+   */
+  @Test
+  public void testBaseFileAndLogFileUpdateUnmatchesScheduleCompaction() {
+    testBaseFileAndLogFileUpdateUnmatchesHelper(true);
+  }
+
+  /**
+   * Test where all records in one file group match the condition and the other file group has only
+   * a single record that matches. An update is then added that makes the second file group no longer
+   * match. Data skipping should not exclude the second file group.
+   */
+  private void testBaseFileAndLogFileUpdateUnmatchesHelper(Boolean shouldScheduleCompaction) {
+    Dataset<Row> inserts = makeInsertDf("000", 100);
+    Dataset<Row> batch1 = inserts.where(matchCond);
+    doWrite(batch1);
+    //no matches in batch2
+    Dataset<Row> batch2 = inserts.where(nonMatchCond);
+    //make 1 record match
+    Dataset<Row> recordToMod = batch2.limit(1);
+    Dataset<Row> initialRecordToMod = makeRecordMatch(recordToMod);
+    Dataset<Row> modBatch2 = removeRecord(batch2, recordToMod).union(initialRecordToMod);
+    doWrite(modBatch2);
+    if (shouldScheduleCompaction) {
+      doWrite(batch1.union(modBatch2));
+      scheduleCompaction();
+    }
+
+    //update batch2 so that there are no matching records in the file group
+    doWrite(recordToMod);
+    assertEquals(0, readMatchingRecords().except(batch1).count());
+
+    //Corrupt to prove that col stats does not exclude the file group
+    List<Path> filesToCorrupt = getFilesToCorrupt();
+    assertEquals(1, filesToCorrupt.size());
+    filesToCorrupt.forEach(TestDataSkippingWithMORColstats::corruptFile);
+    assertThrows(SparkException.class, () -> readMatchingRecords().count());
+  }
+
+  private Map<String, String> getOptions() {
+    Map<String, String> options = new HashMap<>();
+    options.put(HoodieMetadataConfig.ENABLE.key(), "true");
+    options.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true");
+    options.put(HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key(), "trip_type");
+    options.put(DataSourceReadOptions.ENABLE_DATA_SKIPPING().key(), "true");
+    options.put(DataSourceWriteOptions.TABLE_TYPE().key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL());
+    options.put(HoodieWriteConfig.TBL_NAME.key(), "testTable");
+    options.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp");
+    options.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
+    options.put("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator");
+    options.put(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0");
+    options.put(HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.key(), "false");
+    options.put(HoodieCompactionConfig.INLINE_COMPACT.key(), "false");
+    return options;
+  }
+
+  private void scheduleCompaction() {
+    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(basePath.toString())
+        .withRollbackUsingMarkers(false)
+        .withAutoCommit(false)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withMetadataIndexColumnStats(true)
+            .withColumnStatsIndexForColumns("trip_type").build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(0)
+            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
+        .forTable("testTable")
+        .withKeyGenerator("org.apache.hudi.keygen.NonpartitionedKeyGenerator")
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+      client.scheduleCompactionAtInstant(HoodieActiveTimeline.createNewInstantTime(), Option.empty());
+    }
+  }
+
+  /**
+   * Remove recordToRemove from batch.
+   * recordToRemove is expected to have only 1 row.
+   */
+  private Dataset<Row> removeRecord(Dataset<Row> batch, Dataset<Row> recordToRemove) {
+    return batch.where("_row_key != '" + recordToRemove.first().getString(1) + "'");
+  }
+
+  /**
+   * Returns a list of the base parquet files for the latest file slice in its file group where
+   * no records match the condition.
+   */
+  private List<Path> getFilesToCorrupt() {
+    Set<String> fileNames = new HashSet<>();
+    sparkSession.read().format("hudi").load(basePath.toString())
+        .where(matchCond)
+        .select("_hoodie_file_name").distinct()
+        .collectAsList().forEach(row -> {
+          String fileName = row.getString(0);
+          if (fileName.contains(".parquet")) {
+            fileNames.add(FSUtils.getFileId(fileName));
+          } else {
+            fileNames.add(fileName);
+          }
+        });
+
+    try (Stream<Path> stream = Files.list(basePath)) {
+      Map<String, Path> latestBaseFiles = new HashMap<>();
+      List<Path> files = stream
+          .filter(file -> !Files.isDirectory(file))
+          .filter(file -> file.toString().contains(".parquet"))
+          .filter(file -> !file.toString().contains(".crc"))
+          .filter(file -> !fileNames.contains(FSUtils.getFileId(file.getFileName().toString())))
+          .collect(Collectors.toList());
+      files.forEach(f -> {
+        String fileID = FSUtils.getFileId(f.getFileName().toString());
+        if (!latestBaseFiles.containsKey(fileID) || FSUtils.getCommitTime(f.getFileName().toString())
+            .compareTo(FSUtils.getCommitTime(latestBaseFiles.get(fileID).getFileName().toString())) > 0) {
+          latestBaseFiles.put(fileID, f);
+        }
+      });
+      return new ArrayList<>(latestBaseFiles.values());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void doWrite(Dataset<Row> df) {
+    if (shouldOverwrite) {
+      shouldOverwrite = false;
+      df.write().format("hudi").options(options).mode(Overwrite).save(basePath.toString());
+    } else {
+      df.write().format("hudi").options(options).mode(Append).save(basePath.toString());
+    }
+  }
+
+  private void doDelete(Dataset<Row> df) {
+    df.write().format("hudi").options(options).option(DataSourceWriteOptions.OPERATION().key(),
+        DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL()).mode(Append).save(basePath.toString());
+  }
+
+  /**
+   * Update rowToMod to make it match the condition.
+   * rowToMod is expected to have only 1 row.
+   */
+  private Dataset<Row> makeRecordMatch(Dataset<Row> rowToMod) {
+    return updateTripType(rowToMod, "UBERX");
+  }
+
+  private Dataset<Row> updateTripType(Dataset<Row> rowToMod, String value) {
+    rowToMod.createOrReplaceTempView("rowToMod");
+    return sparkSession.sqlContext().createDataFrame(sparkSession.sql("select _hoodie_is_deleted, _row_key, "
+        + "begin_lat, begin_lon, current_date, current_ts, distance_in_meters, driver, end_lat, end_lon, fare, height, "
+        + "nation, partition, partition_path, rider, seconds_since_epoch, timestamp, tip_history, '" + value
+        + "' as trip_type, weight from rowToMod").rdd(), rowToMod.schema());
+  }
+
+  /**
+   * Read records from Hudi that match the condition
+   * and drop the meta cols.
+   */
+  private Dataset<Row> readMatchingRecords() {
+    return readMatchingRecords(true);
+  }
+
+  public Dataset<Row> readMatchingRecords(Boolean useDataSkipping) {
+    if (useDataSkipping) {
+      return sparkSession.read().format("hudi").options(options)
+          .load(basePath.toString()).where(matchCond).drop(dropColumns);
+    } else {
+      return sparkSession.read().format("hudi")
+          .option(DataSourceReadOptions.ENABLE_DATA_SKIPPING().key(), "false")
+          .load(basePath.toString()).where(matchCond).drop(dropColumns);
+    }
+  }
+
+  /**
+   * Corrupt a parquet file by deleting it and replacing
+   * it with an empty file.
+   */
+  protected static void corruptFile(Path path) {
+    File fileToCorrupt = path.toFile();
+    fileToCorrupt.delete();
+    try {
+      fileToCorrupt.createNewFile();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  protected Dataset<Row> makeInsertDf(String instantTime, Integer n) {
+    List<String> records = dataGen.generateInserts(instantTime, n).stream()
+        .map(r -> recordToString(r).get()).collect(Collectors.toList());
+    JavaRDD<String> rdd = jsc.parallelize(records);
+    //can't do df.except with city_to_state, and our testing is for the
+    //col stats index, so it is ok to just drop this column here
+    return sparkSession.read().json(rdd).drop("city_to_state");
+  }
+
+  public void deleteLatestDeltacommit() {
+    String filename = metaClient.getActiveTimeline().lastInstant().get().getFileName();
+    File deltacommit = new File(metaClient.getBasePathV2() + "/.hoodie/" + filename);
+    deltacommit.delete();
+  }
+
+  /**
+   * We need to enable inline compaction before the final write, rather than setting a
+   * num-delta-commits threshold up front, because in the rollback case we do 3 updates,
+   * then roll back and do another update, and we only want to compact the second time
+   * we reach 3 delta commits.
+   */
+  public void enableInlineCompaction(Boolean shouldEnable) {
+    if (shouldEnable) {
+      this.options.put(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
+      this.options.put(INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1");
+    }
+  }
+}
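For readers following along, a minimal sketch (not part of this commit) of how the same column stats / data skipping configuration exercised by this test would be used outside the test harness; the SparkSession `spark`, input DataFrame `df`, and table path `tablePath` below are placeholders:

  // Write a MOR table with the column stats index maintained for trip_type,
  // mirroring the options map built in getOptions() above.
  Map<String, String> opts = new HashMap<>();
  opts.put(HoodieMetadataConfig.ENABLE.key(), "true");
  opts.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true");
  opts.put(HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key(), "trip_type");
  opts.put(DataSourceWriteOptions.TABLE_TYPE().key(), DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL());
  opts.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
  opts.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp");
  opts.put(HoodieWriteConfig.TBL_NAME.key(), "testTable");
  df.write().format("hudi").options(opts).mode(SaveMode.Append).save(tablePath);

  // Read back with data skipping enabled; file groups whose trip_type ranges cannot
  // satisfy the predicate should be pruned via the column stats index.
  Dataset<Row> matching = spark.read().format("hudi")
      .option(DataSourceReadOptions.ENABLE_DATA_SKIPPING().key(), "true")
      .load(tablePath)
      .where("trip_type = 'UBERX'");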