This is an automated email from the ASF dual-hosted git repository. codope pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 6ef00d147c1 [HUDI-5816] List all partitions as the fallback mechanism in Hive and Glue Sync (#8388) 6ef00d147c1 is described below commit 6ef00d147c12e41f71958299a6cb54eae1dce2f6 Author: Y Ethan Guo <ethan.guoyi...@gmail.com> AuthorDate: Sun May 7 21:44:22 2023 -0700 [HUDI-5816] List all partitions as the fallback mechanism in Hive and Glue Sync (#8388) - Avoid loading the archived timeline during Hive and Glue Sync. - Add a fallback mechanism in Hive and Glue catalog sync: if the last synced commit time falls before the start of the Hudi table's active timeline, the sync lists all partition paths on storage and reconciles them against what is in the metastore, instead of reading the archived timeline. - Enhance the tests to cover the new logic. --- .../java/org/apache/hudi/common/fs/FSUtils.java | 3 +- .../java/org/apache/hudi/hive/HiveSyncTool.java | 112 +++++++++++++++------ .../org/apache/hudi/hive/TestHiveSyncTool.java | 77 +++++++++++++- .../apache/hudi/hive/testutils/HiveTestUtil.java | 58 +++++++++-- .../apache/hudi/sync/common/HoodieSyncClient.java | 106 ++++++++++++++++--- 5 files changed, 297 insertions(+), 59 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index b84298e80d8..a7a32ae527e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -235,7 +235,8 @@ public class FSUtils { String fullPartitionPathStr = fullPartitionPath.toString(); if (!fullPartitionPathStr.startsWith(basePath.toString())) { - throw new IllegalArgumentException("Partition path does not belong to base-path"); + throw new IllegalArgumentException("Partition path \"" + fullPartitionPathStr + + "\" does not belong to base-path \"" + basePath + "\""); } int partitionStartIndex = 
fullPartitionPathStr.indexOf(basePath.getName(), diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java index 95048dd8a71..5d35eeef87e 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java @@ -258,13 +258,28 @@ public class HiveSyncTool extends HoodieSyncTool implements AutoCloseable { lastCommitTimeSynced = syncClient.getLastCommitTimeSynced(tableName); } LOG.info("Last commit time synced was found to be " + lastCommitTimeSynced.orElse("null")); - List<String> writtenPartitionsSince = syncClient.getWrittenPartitionsSince(lastCommitTimeSynced); - LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size()); - // Sync the partitions if needed - // find dropped partitions, if any, in the latest commit - Set<String> droppedPartitions = syncClient.getDroppedPartitionsSince(lastCommitTimeSynced); - boolean partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, droppedPartitions); + boolean partitionsChanged; + if (!lastCommitTimeSynced.isPresent() + || syncClient.getActiveTimeline().isBeforeTimelineStarts(lastCommitTimeSynced.get())) { + // If the last commit time synced is before the start of the active timeline, + // the Hive sync falls back to list all partitions on storage, instead of + // reading active and archived timelines for written partitions. + LOG.info("Sync all partitions given the last commit time synced is empty or " + + "before the start of the active timeline. 
Listing all partitions in " + + config.getString(META_SYNC_BASE_PATH) + + ", file system: " + config.getHadoopFileSystem()); + partitionsChanged = syncAllPartitions(tableName); + } else { + List<String> writtenPartitionsSince = syncClient.getWrittenPartitionsSince(lastCommitTimeSynced); + LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size()); + + // Sync the partitions if needed + // find dropped partitions, if any, in the latest commit + Set<String> droppedPartitions = syncClient.getDroppedPartitionsSince(lastCommitTimeSynced); + partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, droppedPartitions); + } + boolean meetSyncConditions = schemaChanged || partitionsChanged; if (!config.getBoolean(META_SYNC_CONDITIONAL_SYNC) || meetSyncConditions) { syncClient.updateLastCommitTimeSynced(tableName); @@ -366,46 +381,83 @@ public class HiveSyncTool extends HoodieSyncTool implements AutoCloseable { PartitionFilterGenerator.generatePushDownFilter(writtenPartitions, partitionFields, config)); } + /** + * Syncs all partitions on storage to the metastore, by only making incremental changes. + * + * @param tableName The table name in the metastore. + * @return {@code true} if one or more partition(s) are changed in the metastore; + * {@code false} otherwise. 
+ */ + private boolean syncAllPartitions(String tableName) { + try { + if (config.getSplitStrings(META_SYNC_PARTITION_FIELDS).isEmpty()) { + return false; + } + + List<Partition> allPartitionsInMetastore = syncClient.getAllPartitions(tableName); + List<String> allPartitionsOnStorage = syncClient.getAllPartitionPathsOnStorage(); + return syncPartitions( + tableName, + syncClient.getPartitionEvents(allPartitionsInMetastore, allPartitionsOnStorage)); + } catch (Exception e) { + throw new HoodieHiveSyncException("Failed to sync partitions for table " + tableName, e); + } + } + /** * Syncs the list of storage partitions passed in (checks if the partition is in hive, if not adds it or if the * partition path does not match, it updates the partition path). * - * @param writtenPartitionsSince partitions has been added, updated, or dropped since last synced. + * @param tableName The table name in the metastore. + * @param writtenPartitionsSince Partitions has been added, updated, or dropped since last synced. + * @param droppedPartitions Partitions that are dropped since last sync. + * @return {@code true} if one or more partition(s) are changed in the metastore; + * {@code false} otherwise. 
*/ private boolean syncPartitions(String tableName, List<String> writtenPartitionsSince, Set<String> droppedPartitions) { - boolean partitionsChanged; try { if (writtenPartitionsSince.isEmpty() || config.getSplitStrings(META_SYNC_PARTITION_FIELDS).isEmpty()) { return false; } List<Partition> hivePartitions = getTablePartitions(tableName, writtenPartitionsSince); - List<PartitionEvent> partitionEvents = - syncClient.getPartitionEvents(hivePartitions, writtenPartitionsSince, droppedPartitions); - - List<String> newPartitions = filterPartitions(partitionEvents, PartitionEventType.ADD); - if (!newPartitions.isEmpty()) { - LOG.info("New Partitions " + newPartitions); - syncClient.addPartitionsToTable(tableName, newPartitions); - } + return syncPartitions( + tableName, + syncClient.getPartitionEvents( + hivePartitions, writtenPartitionsSince, droppedPartitions)); + } catch (Exception e) { + throw new HoodieHiveSyncException("Failed to sync partitions for table " + tableName, e); + } + } - List<String> updatePartitions = filterPartitions(partitionEvents, PartitionEventType.UPDATE); - if (!updatePartitions.isEmpty()) { - LOG.info("Changed Partitions " + updatePartitions); - syncClient.updatePartitionsToTable(tableName, updatePartitions); - } + /** + * Syncs added, updated, and dropped partitions to the metastore. + * + * @param tableName The table name in the metastore. + * @param partitionEventList The partition change event list. + * @return {@code true} if one or more partition(s) are changed in the metastore; + * {@code false} otherwise. 
+ */ + private boolean syncPartitions(String tableName, List<PartitionEvent> partitionEventList) { + List<String> newPartitions = filterPartitions(partitionEventList, PartitionEventType.ADD); + if (!newPartitions.isEmpty()) { + LOG.info("New Partitions " + newPartitions); + syncClient.addPartitionsToTable(tableName, newPartitions); + } - List<String> dropPartitions = filterPartitions(partitionEvents, PartitionEventType.DROP); - if (!dropPartitions.isEmpty()) { - LOG.info("Drop Partitions " + dropPartitions); - syncClient.dropPartitions(tableName, dropPartitions); - } + List<String> updatePartitions = filterPartitions(partitionEventList, PartitionEventType.UPDATE); + if (!updatePartitions.isEmpty()) { + LOG.info("Changed Partitions " + updatePartitions); + syncClient.updatePartitionsToTable(tableName, updatePartitions); + } - partitionsChanged = !updatePartitions.isEmpty() || !newPartitions.isEmpty() || !dropPartitions.isEmpty(); - } catch (Exception e) { - throw new HoodieHiveSyncException("Failed to sync partitions for table " + tableName, e); + List<String> dropPartitions = filterPartitions(partitionEventList, PartitionEventType.DROP); + if (!dropPartitions.isEmpty()) { + LOG.info("Drop Partitions " + dropPartitions); + syncClient.dropPartitions(tableName, dropPartitions); } - return partitionsChanged; + + return !updatePartitions.isEmpty() || !newPartitions.isEmpty() || !dropPartitions.isEmpty(); } private List<String> filterPartitions(List<PartitionEvent> events, PartitionEventType eventType) { diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java index d1aaf024403..9a8ada9b65b 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java @@ -18,6 +18,7 @@ package org.apache.hudi.hive; +import 
org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieSyncTableStrategy; @@ -71,6 +72,8 @@ import java.util.Set; import java.util.stream.Collectors; import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION; import static org.apache.hudi.hive.HiveSyncConfig.HIVE_SYNC_FILTER_PUSHDOWN_ENABLED; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE; @@ -282,7 +285,8 @@ public class TestHiveSyncTool { // Manually change a hive partition location to check if the sync will detect // it and generate a partition update event for it. ddlExecutor.runSQL("ALTER TABLE `" + HiveTestUtil.TABLE_NAME - + "` PARTITION (`datestr`='2050-01-01') SET LOCATION '/some/new/location'"); + + "` PARTITION (`datestr`='2050-01-01') SET LOCATION '" + + FSUtils.getPartitionPath(basePath, "2050/1/1").toString() + "'"); hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME); List<String> writtenPartitionsSince = hiveClient.getWrittenPartitionsSince(Option.empty()); @@ -291,14 +295,66 @@ public class TestHiveSyncTool { assertEquals(PartitionEventType.UPDATE, partitionEvents.iterator().next().eventType, "The one partition event must of type UPDATE"); + // Add a partition that does not belong to the table, i.e., not in the same base path + // This should not happen in production. However, if this happens, when doing fallback + // to list all partitions in the metastore and we find such a partition, we simply ignore + // it without dropping it from the metastore and notify the user with an error message, + // so the user may manually fix it. 
+ + String dummyBasePath = new Path(basePath).getParent().toString() + "/dummy_basepath"; + ddlExecutor.runSQL("ALTER TABLE `" + HiveTestUtil.TABLE_NAME + + "` ADD PARTITION (`datestr`='xyz') LOCATION '" + dummyBasePath + "/xyz'"); + // Lets do the sync reSyncHiveTable(); // Sync should update the changed partition to correct path List<Partition> tablePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME); - assertEquals(7, tablePartitions.size(), "The one partition we wrote should be added to hive"); + assertEquals(8, tablePartitions.size(), "The two partitions we wrote should be added to hive"); assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(), "The last commit that was synced should be 100"); + + // Verify that there is one ADD, UPDATE, and DROP event for each type + hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME); + List<String> allPartitionPathsOnStorage = hiveClient.getAllPartitionPathsOnStorage() + .stream().sorted().collect(Collectors.toList()); + String dropPartition = allPartitionPathsOnStorage.remove(0); + allPartitionPathsOnStorage.add("2050/01/02"); + partitionEvents = hiveClient.getPartitionEvents(hivePartitions, allPartitionPathsOnStorage); + assertEquals(3, partitionEvents.size(), "There should be only one partition event"); + assertEquals( + "2050/01/02", + partitionEvents.stream().filter(e -> e.eventType == PartitionEventType.ADD) + .findFirst().get().storagePartition, + "There should be only one partition event of type ADD"); + assertEquals( + "2050/01/01", + partitionEvents.stream().filter(e -> e.eventType == PartitionEventType.UPDATE) + .findFirst().get().storagePartition, + "There should be only one partition event of type UPDATE"); + assertEquals( + dropPartition, + partitionEvents.stream().filter(e -> e.eventType == PartitionEventType.DROP) + .findFirst().get().storagePartition, + "There should be only one partition event of type DROP"); + + // Simulate the case 
where the last sync timestamp is before the start of the active timeline, + // by overwriting the same table with some partitions deleted and new partitions added + HiveTestUtil.createCOWTable("200", 6, useSchemaFromCommitMetadata); + reInitHiveSyncClient(); + reSyncHiveTable(); + tablePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME); + assertEquals(Option.of("200"), hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME)); + assertEquals(7, tablePartitions.size()); + + // Trigger the fallback of listing all partitions again. There is no partition change. + HiveTestUtil.commitToTable("300", 1, useSchemaFromCommitMetadata); + HiveTestUtil.removeCommitFromActiveTimeline("200", COMMIT_ACTION); + reInitHiveSyncClient(); + reSyncHiveTable(); + tablePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME); + assertEquals(Option.of("300"), hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME)); + assertEquals(7, tablePartitions.size()); } @ParameterizedTest @@ -1336,6 +1392,9 @@ public class TestHiveSyncTool { String commitTime0 = "100"; String commitTime1 = "101"; String commitTime2 = "102"; + String commitTime3 = "103"; + String commitTime4 = "104"; + String commitTime5 = "105"; HiveTestUtil.createMORTable(commitTime0, commitTime1, 2, true, true); reInitHiveSyncClient(); @@ -1344,10 +1403,22 @@ public class TestHiveSyncTool { assertTrue(hiveClient.tableExists(tableName)); assertEquals(commitTime1, hiveClient.getLastCommitTimeSynced(tableName).get()); - HiveTestUtil.addMORPartitions(0, true, true, true, ZonedDateTime.now().plusDays(2), commitTime1, commitTime2); + HiveTestUtil.addMORPartitions(0, true, true, true, ZonedDateTime.now().plusDays(2), commitTime2, commitTime3); reSyncHiveTable(); assertEquals(commitTime1, hiveClient.getLastCommitTimeSynced(tableName).get()); + + // Let the last commit time synced to be before the start of the active timeline, + // to trigger the fallback of listing all partitions. 
There is no partition change + // and the last commit time synced should still be the same. + HiveTestUtil.addMORPartitions(0, true, true, true, ZonedDateTime.now().plusDays(2), commitTime4, commitTime5); + HiveTestUtil.removeCommitFromActiveTimeline(commitTime0, COMMIT_ACTION); + HiveTestUtil.removeCommitFromActiveTimeline(commitTime1, DELTA_COMMIT_ACTION); + HiveTestUtil.removeCommitFromActiveTimeline(commitTime2, COMMIT_ACTION); + HiveTestUtil.removeCommitFromActiveTimeline(commitTime3, DELTA_COMMIT_ACTION); + reInitHiveSyncClient(); + reSyncHiveTable(); + assertEquals(commitTime1, hiveClient.getLastCommitTimeSynced(tableName).get()); } private void reSyncHiveTable() { diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java index 4e2943be8d5..ad61d6e0d30 100644 --- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java +++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java @@ -68,6 +68,8 @@ import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.zookeeper.server.ZooKeeperServer; import org.junit.platform.commons.JUnitException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; @@ -88,6 +90,7 @@ import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; +import static org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_PASS; import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL; @@ -103,6 +106,7 @@ import static org.junit.jupiter.api.Assertions.fail; @SuppressWarnings("SameParameterValue") public class HiveTestUtil { + private static 
final Logger LOG = LoggerFactory.getLogger(HiveTestUtil.class); public static final String DB_NAME = "testdb"; public static final String TABLE_NAME = "test1"; @@ -190,22 +194,56 @@ public class HiveTestUtil { public static void createCOWTable(String instantTime, int numberOfPartitions, boolean useSchemaFromCommitMetadata, String basePath, String databaseName, String tableName) throws IOException, URISyntaxException { Path path = new Path(basePath); - FileIOUtils.deleteDirectory(new File(basePath)); + if (fileSystem.exists(path)) { + fileSystem.delete(path, true); + } HoodieTableMetaClient.withPropertyBuilder() - .setTableType(HoodieTableType.COPY_ON_WRITE) - .setTableName(tableName) - .setPayloadClass(HoodieAvroPayload.class) - .initTable(configuration, basePath); + .setTableType(HoodieTableType.COPY_ON_WRITE) + .setTableName(tableName) + .setPayloadClass(HoodieAvroPayload.class) + .initTable(configuration, basePath); boolean result = fileSystem.mkdirs(path); checkResult(result); + commitToTable(instantTime, numberOfPartitions, useSchemaFromCommitMetadata, + basePath, databaseName, tableName); + } + + public static void commitToTable( + String instantTime, int numberOfPartitions, boolean useSchemaFromCommitMetadata) + throws IOException, URISyntaxException { + commitToTable(instantTime, numberOfPartitions, useSchemaFromCommitMetadata, + basePath, DB_NAME, TABLE_NAME); + } + + public static void commitToTable( + String instantTime, int numberOfPartitions, boolean useSchemaFromCommitMetadata, + String basePath, String databaseName, String tableName) throws IOException, URISyntaxException { ZonedDateTime dateTime = ZonedDateTime.now(); HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, - useSchemaFromCommitMetadata, dateTime, instantTime, basePath); + useSchemaFromCommitMetadata, dateTime, instantTime, basePath); createdTablesSet.add(databaseName + "." 
+ tableName); createCommitFile(commitMetadata, instantTime, basePath); } + public static void removeCommitFromActiveTimeline(String instantTime, String actionType) { + List<Path> pathsToDelete = new ArrayList<>(); + Path metaFolderPath = new Path(basePath, METAFOLDER_NAME); + String actionSuffix = "." + actionType; + pathsToDelete.add(new Path(metaFolderPath, instantTime + actionSuffix)); + pathsToDelete.add(new Path(metaFolderPath, instantTime + actionSuffix + ".requested")); + pathsToDelete.add(new Path(metaFolderPath, instantTime + actionSuffix + ".inflight")); + pathsToDelete.forEach(path -> { + try { + if (fileSystem.exists(path)) { + fileSystem.delete(path, false); + } + } catch (IOException e) { + LOG.warn("Error deleting file: ", e); + } + }); + } + public static void createCOWTable(String instantTime, int numberOfPartitions, boolean useSchemaFromCommitMetadata) throws IOException, URISyntaxException { createCOWTable(instantTime, numberOfPartitions, useSchemaFromCommitMetadata, basePath, DB_NAME, TABLE_NAME); @@ -481,7 +519,7 @@ public class HiveTestUtil { public static void createCommitFile(HoodieCommitMetadata commitMetadata, String instantTime, String basePath) throws IOException { byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8); - Path fullPath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + Path fullPath = new Path(basePath + "/" + METAFOLDER_NAME + "/" + HoodieTimeline.makeCommitFileName(instantTime)); FSDataOutputStream fsout = fileSystem.create(fullPath, true); fsout.write(bytes); @@ -490,7 +528,7 @@ public class HiveTestUtil { public static void createReplaceCommitFile(HoodieReplaceCommitMetadata commitMetadata, String instantTime) throws IOException { byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8); - Path fullPath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + Path fullPath = new Path(basePath + "/" + METAFOLDER_NAME + "/" + 
HoodieTimeline.makeReplaceFileName(instantTime)); FSDataOutputStream fsout = fileSystem.create(fullPath, true); fsout.write(bytes); @@ -505,7 +543,7 @@ public class HiveTestUtil { private static void createCompactionCommitFile(HoodieCommitMetadata commitMetadata, String instantTime) throws IOException { byte[] bytes = commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8); - Path fullPath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + Path fullPath = new Path(basePath + "/" + METAFOLDER_NAME + "/" + HoodieTimeline.makeCommitFileName(instantTime)); FSDataOutputStream fsout = fileSystem.create(fullPath, true); fsout.write(bytes); @@ -515,7 +553,7 @@ public class HiveTestUtil { private static void createDeltaCommitFile(HoodieCommitMetadata deltaCommitMetadata, String deltaCommitTime) throws IOException { byte[] bytes = deltaCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8); - Path fullPath = new Path(basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + Path fullPath = new Path(basePath + "/" + METAFOLDER_NAME + "/" + HoodieTimeline.makeDeltaFileName(deltaCommitTime)); FSDataOutputStream fsout = fileSystem.create(fullPath, true); fsout.write(bytes); diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java index 7f73be33070..b6f1d0bdfa2 100644 --- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java +++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java @@ -27,6 +27,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.hadoop.CachingPath; import org.apache.hudi.sync.common.model.Partition; import 
org.apache.hudi.sync.common.model.PartitionEvent; import org.apache.hudi.sync.common.model.PartitionValueExtractor; @@ -112,16 +113,25 @@ public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, Auto } } + /** + * Gets all relative partitions paths in the Hudi table on storage. + * + * @return All relative partitions paths. + */ + public List<String> getAllPartitionPathsOnStorage() { + HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf()); + return FSUtils.getAllPartitionPaths(engineContext, + config.getString(META_SYNC_BASE_PATH), + config.getBoolean(META_SYNC_USE_FILE_LISTING_FROM_METADATA), + config.getBoolean(META_SYNC_ASSUME_DATE_PARTITION)); + } + public List<String> getWrittenPartitionsSince(Option<String> lastCommitTimeSynced) { if (!lastCommitTimeSynced.isPresent()) { LOG.info("Last commit time synced is not known, listing all partitions in " + config.getString(META_SYNC_BASE_PATH) + ",FS :" + config.getHadoopFileSystem()); - HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf()); - return FSUtils.getAllPartitionPaths(engineContext, - config.getString(META_SYNC_BASE_PATH), - config.getBoolean(META_SYNC_USE_FILE_LISTING_FROM_METADATA), - config.getBoolean(META_SYNC_ASSUME_DATE_PARTITION)); + return getAllPartitionPathsOnStorage(); } else { LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then"); return TimelineUtils.getWrittenPartitions( @@ -129,27 +139,75 @@ public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, Auto } } + /** + * Gets the partition events for changed partitions. 
+ * <p> + * This compares the list of all partitions of a table stored in the metastore and + * on the storage: + * (1) Partitions exist in the metastore, but NOT the storage: drops them in the metastore; + * (2) Partitions exist on the storage, but NOT the metastore: adds them to the metastore; + * (3) Partitions exist in both, but the partition path is different: update them in the metastore. + * + * @param allPartitionsInMetastore All partitions of a table stored in the metastore. + * @param allPartitionsOnStorage All partitions of a table stored on the storage. + * @return partition events for changed partitions. + */ + public List<PartitionEvent> getPartitionEvents(List<Partition> allPartitionsInMetastore, + List<String> allPartitionsOnStorage) { + Map<String, String> paths = getPartitionValuesToPathMapping(allPartitionsInMetastore); + Set<String> partitionsToDrop = new HashSet<>(paths.keySet()); + + List<PartitionEvent> events = new ArrayList<>(); + for (String storagePartition : allPartitionsOnStorage) { + Path storagePartitionPath = FSUtils.getPartitionPath(config.getString(META_SYNC_BASE_PATH), storagePartition); + String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath(); + // Check if the partition values or if hdfs path is the same + List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition); + + if (!storagePartitionValues.isEmpty()) { + String storageValue = String.join(", ", storagePartitionValues); + // Remove partitions that exist on storage from the `partitionsToDrop` set, + // so the remaining partitions that exist in the metastore should be dropped + partitionsToDrop.remove(storageValue); + if (!paths.containsKey(storageValue)) { + events.add(PartitionEvent.newPartitionAddEvent(storagePartition)); + } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) { + events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition)); + 
} + } + } + + partitionsToDrop.forEach(storageValue -> { + String storagePath = paths.get(storageValue); + try { + String relativePath = FSUtils.getRelativePartitionPath( + metaClient.getBasePathV2(), new CachingPath(storagePath)); + events.add(PartitionEvent.newPartitionDropEvent(relativePath)); + } catch (IllegalArgumentException e) { + LOG.error("Cannot parse the path stored in the metastore, ignoring it for " + + "generating DROP partition event: \"" + storagePath + "\".", e); + } + }); + return events; + } + /** * Iterate over the storage partitions and find if there are any new partitions that need to be added or updated. * Generate a list of PartitionEvent based on the changes required. */ - public List<PartitionEvent> getPartitionEvents(List<Partition> tablePartitions, List<String> partitionStoragePartitions, Set<String> droppedPartitions) { - Map<String, String> paths = new HashMap<>(); - for (Partition tablePartition : tablePartitions) { - List<String> hivePartitionValues = tablePartition.getValues(); - String fullTablePartitionPath = - Path.getPathWithoutSchemeAndAuthority(new Path(tablePartition.getStorageLocation())).toUri().getPath(); - paths.put(String.join(", ", hivePartitionValues), fullTablePartitionPath); - } + public List<PartitionEvent> getPartitionEvents(List<Partition> partitionsInMetastore, + List<String> writtenPartitionsOnStorage, + Set<String> droppedPartitionsOnStorage) { + Map<String, String> paths = getPartitionValuesToPathMapping(partitionsInMetastore); List<PartitionEvent> events = new ArrayList<>(); - for (String storagePartition : partitionStoragePartitions) { + for (String storagePartition : writtenPartitionsOnStorage) { Path storagePartitionPath = FSUtils.getPartitionPath(config.getString(META_SYNC_BASE_PATH), storagePartition); String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath(); // Check if the partition values or if hdfs path is the same List<String> 
storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition); - if (droppedPartitions.contains(storagePartition)) { + if (droppedPartitionsOnStorage.contains(storagePartition)) { events.add(PartitionEvent.newPartitionDropEvent(storagePartition)); } else { if (!storagePartitionValues.isEmpty()) { @@ -164,4 +222,22 @@ public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, Auto } return events; } + + /** + * Gets the partition values to the absolute path mapping based on the + * partition information from the metastore. + * + * @param partitionsInMetastore Partitions in the metastore. + * @return The partition values to the absolute path mapping. + */ + private Map<String, String> getPartitionValuesToPathMapping(List<Partition> partitionsInMetastore) { + Map<String, String> paths = new HashMap<>(); + for (Partition tablePartition : partitionsInMetastore) { + List<String> hivePartitionValues = tablePartition.getValues(); + String fullTablePartitionPath = + Path.getPathWithoutSchemeAndAuthority(new Path(tablePartition.getStorageLocation())).toUri().getPath(); + paths.put(String.join(", ", hivePartitionValues), fullTablePartitionPath); + } + return paths; + } }