This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ef00d147c1 [HUDI-5816] List all partitions as the fallback mechanism 
in Hive and Glue Sync (#8388)
6ef00d147c1 is described below

commit 6ef00d147c12e41f71958299a6cb54eae1dce2f6
Author: Y Ethan Guo <ethan.guoyi...@gmail.com>
AuthorDate: Sun May 7 21:44:22 2023 -0700

    [HUDI-5816] List all partitions as the fallback mechanism in Hive and Glue 
Sync (#8388)
    
    - Avoid loading the archived timeline during Hive and Glue sync.
    - Add a fallback mechanism in Hive and Glue catalog sync: if the last
       synced commit time is earlier than the start of the Hudi table's
       active timeline, the sync lists all partition paths on storage and
       reconciles them against what is in the metastore, instead of
       reading the archived timeline.
    - Enhance the tests to cover the new logic.
---
 .../java/org/apache/hudi/common/fs/FSUtils.java    |   3 +-
 .../java/org/apache/hudi/hive/HiveSyncTool.java    | 112 +++++++++++++++------
 .../org/apache/hudi/hive/TestHiveSyncTool.java     |  77 +++++++++++++-
 .../apache/hudi/hive/testutils/HiveTestUtil.java   |  58 +++++++++--
 .../apache/hudi/sync/common/HoodieSyncClient.java  | 106 ++++++++++++++++---
 5 files changed, 297 insertions(+), 59 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index b84298e80d8..a7a32ae527e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -235,7 +235,8 @@ public class FSUtils {
     String fullPartitionPathStr = fullPartitionPath.toString();
 
     if (!fullPartitionPathStr.startsWith(basePath.toString())) {
-      throw new IllegalArgumentException("Partition path does not belong to 
base-path");
+      throw new IllegalArgumentException("Partition path \"" + 
fullPartitionPathStr
+          + "\" does not belong to base-path \"" + basePath + "\"");
     }
 
     int partitionStartIndex = fullPartitionPathStr.indexOf(basePath.getName(),
diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 95048dd8a71..5d35eeef87e 100644
--- 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -258,13 +258,28 @@ public class HiveSyncTool extends HoodieSyncTool 
implements AutoCloseable {
       lastCommitTimeSynced = syncClient.getLastCommitTimeSynced(tableName);
     }
     LOG.info("Last commit time synced was found to be " + 
lastCommitTimeSynced.orElse("null"));
-    List<String> writtenPartitionsSince = 
syncClient.getWrittenPartitionsSince(lastCommitTimeSynced);
-    LOG.info("Storage partitions scan complete. Found " + 
writtenPartitionsSince.size());
 
-    // Sync the partitions if needed
-    // find dropped partitions, if any, in the latest commit
-    Set<String> droppedPartitions = 
syncClient.getDroppedPartitionsSince(lastCommitTimeSynced);
-    boolean partitionsChanged = syncPartitions(tableName, 
writtenPartitionsSince, droppedPartitions);
+    boolean partitionsChanged;
+    if (!lastCommitTimeSynced.isPresent()
+        || 
syncClient.getActiveTimeline().isBeforeTimelineStarts(lastCommitTimeSynced.get()))
 {
+      // If the last commit time synced is before the start of the active 
timeline,
+      // the Hive sync falls back to list all partitions on storage, instead of
+      // reading active and archived timelines for written partitions.
+      LOG.info("Sync all partitions given the last commit time synced is empty 
or "
+          + "before the start of the active timeline. Listing all partitions 
in "
+          + config.getString(META_SYNC_BASE_PATH)
+          + ", file system: " + config.getHadoopFileSystem());
+      partitionsChanged = syncAllPartitions(tableName);
+    } else {
+      List<String> writtenPartitionsSince = 
syncClient.getWrittenPartitionsSince(lastCommitTimeSynced);
+      LOG.info("Storage partitions scan complete. Found " + 
writtenPartitionsSince.size());
+
+      // Sync the partitions if needed
+      // find dropped partitions, if any, in the latest commit
+      Set<String> droppedPartitions = 
syncClient.getDroppedPartitionsSince(lastCommitTimeSynced);
+      partitionsChanged = syncPartitions(tableName, writtenPartitionsSince, 
droppedPartitions);
+    }
+
     boolean meetSyncConditions = schemaChanged || partitionsChanged;
     if (!config.getBoolean(META_SYNC_CONDITIONAL_SYNC) || meetSyncConditions) {
       syncClient.updateLastCommitTimeSynced(tableName);
@@ -366,46 +381,83 @@ public class HiveSyncTool extends HoodieSyncTool 
implements AutoCloseable {
         PartitionFilterGenerator.generatePushDownFilter(writtenPartitions, 
partitionFields, config));
   }
 
+  /**
+   * Syncs all partitions on storage to the metastore, by only making 
incremental changes.
+   *
+   * @param tableName The table name in the metastore.
+   * @return {@code true} if one or more partition(s) are changed in the 
metastore;
+   * {@code false} otherwise.
+   */
+  private boolean syncAllPartitions(String tableName) {
+    try {
+      if (config.getSplitStrings(META_SYNC_PARTITION_FIELDS).isEmpty()) {
+        return false;
+      }
+
+      List<Partition> allPartitionsInMetastore = 
syncClient.getAllPartitions(tableName);
+      List<String> allPartitionsOnStorage = 
syncClient.getAllPartitionPathsOnStorage();
+      return syncPartitions(
+          tableName,
+          syncClient.getPartitionEvents(allPartitionsInMetastore, 
allPartitionsOnStorage));
+    } catch (Exception e) {
+      throw new HoodieHiveSyncException("Failed to sync partitions for table " 
+ tableName, e);
+    }
+  }
+
   /**
    * Syncs the list of storage partitions passed in (checks if the partition 
is in hive, if not adds it or if the
    * partition path does not match, it updates the partition path).
    *
-   * @param writtenPartitionsSince partitions has been added, updated, or 
dropped since last synced.
+   * @param tableName              The table name in the metastore.
+   * @param writtenPartitionsSince Partitions has been added, updated, or 
dropped since last synced.
+   * @param droppedPartitions      Partitions that are dropped since last sync.
+   * @return {@code true} if one or more partition(s) are changed in the 
metastore;
+   * {@code false} otherwise.
    */
   private boolean syncPartitions(String tableName, List<String> 
writtenPartitionsSince, Set<String> droppedPartitions) {
-    boolean partitionsChanged;
     try {
       if (writtenPartitionsSince.isEmpty() || 
config.getSplitStrings(META_SYNC_PARTITION_FIELDS).isEmpty()) {
         return false;
       }
 
       List<Partition> hivePartitions = getTablePartitions(tableName, 
writtenPartitionsSince);
-      List<PartitionEvent> partitionEvents =
-          syncClient.getPartitionEvents(hivePartitions, 
writtenPartitionsSince, droppedPartitions);
-
-      List<String> newPartitions = filterPartitions(partitionEvents, 
PartitionEventType.ADD);
-      if (!newPartitions.isEmpty()) {
-        LOG.info("New Partitions " + newPartitions);
-        syncClient.addPartitionsToTable(tableName, newPartitions);
-      }
+      return syncPartitions(
+          tableName,
+          syncClient.getPartitionEvents(
+              hivePartitions, writtenPartitionsSince, droppedPartitions));
+    } catch (Exception e) {
+      throw new HoodieHiveSyncException("Failed to sync partitions for table " 
+ tableName, e);
+    }
+  }
 
-      List<String> updatePartitions = filterPartitions(partitionEvents, 
PartitionEventType.UPDATE);
-      if (!updatePartitions.isEmpty()) {
-        LOG.info("Changed Partitions " + updatePartitions);
-        syncClient.updatePartitionsToTable(tableName, updatePartitions);
-      }
+  /**
+   * Syncs added, updated, and dropped partitions to the metastore.
+   *
+   * @param tableName          The table name in the metastore.
+   * @param partitionEventList The partition change event list.
+   * @return {@code true} if one or more partition(s) are changed in the 
metastore;
+   * {@code false} otherwise.
+   */
+  private boolean syncPartitions(String tableName, List<PartitionEvent> 
partitionEventList) {
+    List<String> newPartitions = filterPartitions(partitionEventList, 
PartitionEventType.ADD);
+    if (!newPartitions.isEmpty()) {
+      LOG.info("New Partitions " + newPartitions);
+      syncClient.addPartitionsToTable(tableName, newPartitions);
+    }
 
-      List<String> dropPartitions = filterPartitions(partitionEvents, 
PartitionEventType.DROP);
-      if (!dropPartitions.isEmpty()) {
-        LOG.info("Drop Partitions " + dropPartitions);
-        syncClient.dropPartitions(tableName, dropPartitions);
-      }
+    List<String> updatePartitions = filterPartitions(partitionEventList, 
PartitionEventType.UPDATE);
+    if (!updatePartitions.isEmpty()) {
+      LOG.info("Changed Partitions " + updatePartitions);
+      syncClient.updatePartitionsToTable(tableName, updatePartitions);
+    }
 
-      partitionsChanged = !updatePartitions.isEmpty() || 
!newPartitions.isEmpty() || !dropPartitions.isEmpty();
-    } catch (Exception e) {
-      throw new HoodieHiveSyncException("Failed to sync partitions for table " 
+ tableName, e);
+    List<String> dropPartitions = filterPartitions(partitionEventList, 
PartitionEventType.DROP);
+    if (!dropPartitions.isEmpty()) {
+      LOG.info("Drop Partitions " + dropPartitions);
+      syncClient.dropPartitions(tableName, dropPartitions);
     }
-    return partitionsChanged;
+
+    return !updatePartitions.isEmpty() || !newPartitions.isEmpty() || 
!dropPartitions.isEmpty();
   }
 
   private List<String> filterPartitions(List<PartitionEvent> events, 
PartitionEventType eventType) {
diff --git 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index d1aaf024403..9a8ada9b65b 100644
--- 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.hive;
 
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieSyncTableStrategy;
@@ -71,6 +72,8 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
 import static 
org.apache.hudi.hive.HiveSyncConfig.HIVE_SYNC_FILTER_PUSHDOWN_ENABLED;
 import static 
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_AUTO_CREATE_DATABASE;
 import static 
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE;
@@ -282,7 +285,8 @@ public class TestHiveSyncTool {
     // Manually change a hive partition location to check if the sync will 
detect
     // it and generate a partition update event for it.
     ddlExecutor.runSQL("ALTER TABLE `" + HiveTestUtil.TABLE_NAME
-        + "` PARTITION (`datestr`='2050-01-01') SET LOCATION 
'/some/new/location'");
+        + "` PARTITION (`datestr`='2050-01-01') SET LOCATION '"
+        + FSUtils.getPartitionPath(basePath, "2050/1/1").toString() + "'");
 
     hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
     List<String> writtenPartitionsSince = 
hiveClient.getWrittenPartitionsSince(Option.empty());
@@ -291,14 +295,66 @@ public class TestHiveSyncTool {
     assertEquals(PartitionEventType.UPDATE, 
partitionEvents.iterator().next().eventType,
         "The one partition event must of type UPDATE");
 
+    // Add a partition that does not belong to the table, i.e., not in the 
same base path
+    // This should not happen in production.  However, if this happens, when 
doing fallback
+    // to list all partitions in the metastore and we find such a partition, 
we simply ignore
+    // it without dropping it from the metastore and notify the user with an 
error message,
+    // so the user may manually fix it.
+
+    String dummyBasePath = new Path(basePath).getParent().toString() + 
"/dummy_basepath";
+    ddlExecutor.runSQL("ALTER TABLE `" + HiveTestUtil.TABLE_NAME
+        + "` ADD PARTITION (`datestr`='xyz') LOCATION '" + dummyBasePath + 
"/xyz'");
+
     // Lets do the sync
     reSyncHiveTable();
 
     // Sync should update the changed partition to correct path
     List<Partition> tablePartitions = 
hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
-    assertEquals(7, tablePartitions.size(), "The one partition we wrote should 
be added to hive");
+    assertEquals(8, tablePartitions.size(), "The two partitions we wrote 
should be added to hive");
     assertEquals(instantTime, 
hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME).get(),
         "The last commit that was synced should be 100");
+
+    // Verify that there is one ADD, UPDATE, and DROP event for each type
+    hivePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
+    List<String> allPartitionPathsOnStorage = 
hiveClient.getAllPartitionPathsOnStorage()
+        .stream().sorted().collect(Collectors.toList());
+    String dropPartition = allPartitionPathsOnStorage.remove(0);
+    allPartitionPathsOnStorage.add("2050/01/02");
+    partitionEvents = hiveClient.getPartitionEvents(hivePartitions, 
allPartitionPathsOnStorage);
+    assertEquals(3, partitionEvents.size(), "There should be only one 
partition event");
+    assertEquals(
+        "2050/01/02",
+        partitionEvents.stream().filter(e -> e.eventType == 
PartitionEventType.ADD)
+            .findFirst().get().storagePartition,
+        "There should be only one partition event of type ADD");
+    assertEquals(
+        "2050/01/01",
+        partitionEvents.stream().filter(e -> e.eventType == 
PartitionEventType.UPDATE)
+            .findFirst().get().storagePartition,
+        "There should be only one partition event of type UPDATE");
+    assertEquals(
+        dropPartition,
+        partitionEvents.stream().filter(e -> e.eventType == 
PartitionEventType.DROP)
+            .findFirst().get().storagePartition,
+        "There should be only one partition event of type DROP");
+
+    // Simulate the case where the last sync timestamp is before the start of 
the active timeline,
+    // by overwriting the same table with some partitions deleted and new 
partitions added
+    HiveTestUtil.createCOWTable("200", 6, useSchemaFromCommitMetadata);
+    reInitHiveSyncClient();
+    reSyncHiveTable();
+    tablePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
+    assertEquals(Option.of("200"), 
hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME));
+    assertEquals(7, tablePartitions.size());
+
+    // Trigger the fallback of listing all partitions again.  There is no 
partition change.
+    HiveTestUtil.commitToTable("300", 1, useSchemaFromCommitMetadata);
+    HiveTestUtil.removeCommitFromActiveTimeline("200", COMMIT_ACTION);
+    reInitHiveSyncClient();
+    reSyncHiveTable();
+    tablePartitions = hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
+    assertEquals(Option.of("300"), 
hiveClient.getLastCommitTimeSynced(HiveTestUtil.TABLE_NAME));
+    assertEquals(7, tablePartitions.size());
   }
 
   @ParameterizedTest
@@ -1336,6 +1392,9 @@ public class TestHiveSyncTool {
     String commitTime0 = "100";
     String commitTime1 = "101";
     String commitTime2 = "102";
+    String commitTime3 = "103";
+    String commitTime4 = "104";
+    String commitTime5 = "105";
     HiveTestUtil.createMORTable(commitTime0, commitTime1, 2, true, true);
 
     reInitHiveSyncClient();
@@ -1344,10 +1403,22 @@ public class TestHiveSyncTool {
     assertTrue(hiveClient.tableExists(tableName));
     assertEquals(commitTime1, 
hiveClient.getLastCommitTimeSynced(tableName).get());
 
-    HiveTestUtil.addMORPartitions(0, true, true, true, 
ZonedDateTime.now().plusDays(2), commitTime1, commitTime2);
+    HiveTestUtil.addMORPartitions(0, true, true, true, 
ZonedDateTime.now().plusDays(2), commitTime2, commitTime3);
 
     reSyncHiveTable();
     assertEquals(commitTime1, 
hiveClient.getLastCommitTimeSynced(tableName).get());
+
+    // Let the last commit time synced to be before the start of the active 
timeline,
+    // to trigger the fallback of listing all partitions. There is no 
partition change
+    // and the last commit time synced should still be the same.
+    HiveTestUtil.addMORPartitions(0, true, true, true, 
ZonedDateTime.now().plusDays(2), commitTime4, commitTime5);
+    HiveTestUtil.removeCommitFromActiveTimeline(commitTime0, COMMIT_ACTION);
+    HiveTestUtil.removeCommitFromActiveTimeline(commitTime1, 
DELTA_COMMIT_ACTION);
+    HiveTestUtil.removeCommitFromActiveTimeline(commitTime2, COMMIT_ACTION);
+    HiveTestUtil.removeCommitFromActiveTimeline(commitTime3, 
DELTA_COMMIT_ACTION);
+    reInitHiveSyncClient();
+    reSyncHiveTable();
+    assertEquals(commitTime1, 
hiveClient.getLastCommitTimeSynced(tableName).get());
   }
 
   private void reSyncHiveTable() {
diff --git 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index 4e2943be8d5..ad61d6e0d30 100644
--- 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++ 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -68,6 +68,8 @@ import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.junit.platform.commons.JUnitException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -88,6 +90,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
+import static 
org.apache.hudi.common.table.HoodieTableMetaClient.METAFOLDER_NAME;
 import static 
org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_BATCH_SYNC_PARTITION_NUM;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_PASS;
 import static org.apache.hudi.hive.HiveSyncConfigHolder.HIVE_URL;
@@ -103,6 +106,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 @SuppressWarnings("SameParameterValue")
 public class HiveTestUtil {
+  private static final Logger LOG = 
LoggerFactory.getLogger(HiveTestUtil.class);
 
   public static final String DB_NAME = "testdb";
   public static final String TABLE_NAME = "test1";
@@ -190,22 +194,56 @@ public class HiveTestUtil {
   public static void createCOWTable(String instantTime, int 
numberOfPartitions, boolean useSchemaFromCommitMetadata,
                                     String basePath, String databaseName, 
String tableName) throws IOException, URISyntaxException {
     Path path = new Path(basePath);
-    FileIOUtils.deleteDirectory(new File(basePath));
+    if (fileSystem.exists(path)) {
+      fileSystem.delete(path, true);
+    }
     HoodieTableMetaClient.withPropertyBuilder()
-            .setTableType(HoodieTableType.COPY_ON_WRITE)
-            .setTableName(tableName)
-            .setPayloadClass(HoodieAvroPayload.class)
-            .initTable(configuration, basePath);
+        .setTableType(HoodieTableType.COPY_ON_WRITE)
+        .setTableName(tableName)
+        .setPayloadClass(HoodieAvroPayload.class)
+        .initTable(configuration, basePath);
 
     boolean result = fileSystem.mkdirs(path);
     checkResult(result);
+    commitToTable(instantTime, numberOfPartitions, useSchemaFromCommitMetadata,
+        basePath, databaseName, tableName);
+  }
+
+  public static void commitToTable(
+      String instantTime, int numberOfPartitions, boolean 
useSchemaFromCommitMetadata)
+      throws IOException, URISyntaxException {
+    commitToTable(instantTime, numberOfPartitions, useSchemaFromCommitMetadata,
+        basePath, DB_NAME, TABLE_NAME);
+  }
+
+  public static void commitToTable(
+      String instantTime, int numberOfPartitions, boolean 
useSchemaFromCommitMetadata,
+      String basePath, String databaseName, String tableName) throws 
IOException, URISyntaxException {
     ZonedDateTime dateTime = ZonedDateTime.now();
     HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, 
true,
-            useSchemaFromCommitMetadata, dateTime, instantTime, basePath);
+        useSchemaFromCommitMetadata, dateTime, instantTime, basePath);
     createdTablesSet.add(databaseName + "." + tableName);
     createCommitFile(commitMetadata, instantTime, basePath);
   }
 
+  public static void removeCommitFromActiveTimeline(String instantTime, String 
actionType) {
+    List<Path> pathsToDelete = new ArrayList<>();
+    Path metaFolderPath = new Path(basePath, METAFOLDER_NAME);
+    String actionSuffix = "." + actionType;
+    pathsToDelete.add(new Path(metaFolderPath, instantTime + actionSuffix));
+    pathsToDelete.add(new Path(metaFolderPath, instantTime + actionSuffix + 
".requested"));
+    pathsToDelete.add(new Path(metaFolderPath, instantTime + actionSuffix + 
".inflight"));
+    pathsToDelete.forEach(path -> {
+      try {
+        if (fileSystem.exists(path)) {
+          fileSystem.delete(path, false);
+        }
+      } catch (IOException e) {
+        LOG.warn("Error deleting file: ", e);
+      }
+    });
+  }
+
   public static void createCOWTable(String instantTime, int 
numberOfPartitions, boolean useSchemaFromCommitMetadata)
       throws IOException, URISyntaxException {
     createCOWTable(instantTime, numberOfPartitions, 
useSchemaFromCommitMetadata, basePath, DB_NAME, TABLE_NAME);
@@ -481,7 +519,7 @@ public class HiveTestUtil {
 
   public static void createCommitFile(HoodieCommitMetadata commitMetadata, 
String instantTime, String basePath) throws IOException {
     byte[] bytes = 
commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
-    Path fullPath = new Path(basePath + "/" + 
HoodieTableMetaClient.METAFOLDER_NAME + "/"
+    Path fullPath = new Path(basePath + "/" + METAFOLDER_NAME + "/"
         + HoodieTimeline.makeCommitFileName(instantTime));
     FSDataOutputStream fsout = fileSystem.create(fullPath, true);
     fsout.write(bytes);
@@ -490,7 +528,7 @@ public class HiveTestUtil {
 
   public static void createReplaceCommitFile(HoodieReplaceCommitMetadata 
commitMetadata, String instantTime) throws IOException {
     byte[] bytes = 
commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
-    Path fullPath = new Path(basePath + "/" + 
HoodieTableMetaClient.METAFOLDER_NAME + "/"
+    Path fullPath = new Path(basePath + "/" + METAFOLDER_NAME + "/"
         + HoodieTimeline.makeReplaceFileName(instantTime));
     FSDataOutputStream fsout = fileSystem.create(fullPath, true);
     fsout.write(bytes);
@@ -505,7 +543,7 @@ public class HiveTestUtil {
   private static void createCompactionCommitFile(HoodieCommitMetadata 
commitMetadata, String instantTime)
       throws IOException {
     byte[] bytes = 
commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
-    Path fullPath = new Path(basePath + "/" + 
HoodieTableMetaClient.METAFOLDER_NAME + "/"
+    Path fullPath = new Path(basePath + "/" + METAFOLDER_NAME + "/"
         + HoodieTimeline.makeCommitFileName(instantTime));
     FSDataOutputStream fsout = fileSystem.create(fullPath, true);
     fsout.write(bytes);
@@ -515,7 +553,7 @@ public class HiveTestUtil {
   private static void createDeltaCommitFile(HoodieCommitMetadata 
deltaCommitMetadata, String deltaCommitTime)
       throws IOException {
     byte[] bytes = 
deltaCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8);
-    Path fullPath = new Path(basePath + "/" + 
HoodieTableMetaClient.METAFOLDER_NAME + "/"
+    Path fullPath = new Path(basePath + "/" + METAFOLDER_NAME + "/"
         + HoodieTimeline.makeDeltaFileName(deltaCommitTime));
     FSDataOutputStream fsout = fileSystem.create(fullPath, true);
     fsout.write(bytes);
diff --git 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
index 7f73be33070..b6f1d0bdfa2 100644
--- 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
+++ 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.hadoop.CachingPath;
 import org.apache.hudi.sync.common.model.Partition;
 import org.apache.hudi.sync.common.model.PartitionEvent;
 import org.apache.hudi.sync.common.model.PartitionValueExtractor;
@@ -112,16 +113,25 @@ public abstract class HoodieSyncClient implements 
HoodieMetaSyncOperations, Auto
     }
   }
 
+  /**
+   * Gets all relative partitions paths in the Hudi table on storage.
+   *
+   * @return All relative partitions paths.
+   */
+  public List<String> getAllPartitionPathsOnStorage() {
+    HoodieLocalEngineContext engineContext = new 
HoodieLocalEngineContext(metaClient.getHadoopConf());
+    return FSUtils.getAllPartitionPaths(engineContext,
+        config.getString(META_SYNC_BASE_PATH),
+        config.getBoolean(META_SYNC_USE_FILE_LISTING_FROM_METADATA),
+        config.getBoolean(META_SYNC_ASSUME_DATE_PARTITION));
+  }
+
   public List<String> getWrittenPartitionsSince(Option<String> 
lastCommitTimeSynced) {
     if (!lastCommitTimeSynced.isPresent()) {
       LOG.info("Last commit time synced is not known, listing all partitions 
in "
           + config.getString(META_SYNC_BASE_PATH)
           + ",FS :" + config.getHadoopFileSystem());
-      HoodieLocalEngineContext engineContext = new 
HoodieLocalEngineContext(metaClient.getHadoopConf());
-      return FSUtils.getAllPartitionPaths(engineContext,
-          config.getString(META_SYNC_BASE_PATH),
-          config.getBoolean(META_SYNC_USE_FILE_LISTING_FROM_METADATA),
-          config.getBoolean(META_SYNC_ASSUME_DATE_PARTITION));
+      return getAllPartitionPathsOnStorage();
     } else {
       LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", 
Getting commits since then");
       return TimelineUtils.getWrittenPartitions(
@@ -129,27 +139,75 @@ public abstract class HoodieSyncClient implements 
HoodieMetaSyncOperations, Auto
     }
   }
 
+  /**
+   * Gets the partition events for changed partitions.
+   * <p>
+   * This compares the list of all partitions of a table stored in the 
metastore and
+   * on the storage:
+   * (1) Partitions exist in the metastore, but NOT the storage: drops them in 
the metastore;
+   * (2) Partitions exist on the storage, but NOT the metastore: adds them to 
the metastore;
+   * (3) Partitions exist in both, but the partition path is different: update 
them in the metastore.
+   *
+   * @param allPartitionsInMetastore All partitions of a table stored in the 
metastore.
+   * @param allPartitionsOnStorage   All partitions of a table stored on the 
storage.
+   * @return partition events for changed partitions.
+   */
+  public List<PartitionEvent> getPartitionEvents(List<Partition> 
allPartitionsInMetastore,
+                                                 List<String> 
allPartitionsOnStorage) {
+    Map<String, String> paths = 
getPartitionValuesToPathMapping(allPartitionsInMetastore);
+    Set<String> partitionsToDrop = new HashSet<>(paths.keySet());
+
+    List<PartitionEvent> events = new ArrayList<>();
+    for (String storagePartition : allPartitionsOnStorage) {
+      Path storagePartitionPath = 
FSUtils.getPartitionPath(config.getString(META_SYNC_BASE_PATH), 
storagePartition);
+      String fullStoragePartitionPath = 
Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
+      // Check if the partition values or if hdfs path is the same
+      List<String> storagePartitionValues = 
partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
+
+      if (!storagePartitionValues.isEmpty()) {
+        String storageValue = String.join(", ", storagePartitionValues);
+        // Remove partitions that exist on storage from the `partitionsToDrop` 
set,
+        // so the remaining partitions that exist in the metastore should be 
dropped
+        partitionsToDrop.remove(storageValue);
+        if (!paths.containsKey(storageValue)) {
+          events.add(PartitionEvent.newPartitionAddEvent(storagePartition));
+        } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) {
+          events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition));
+        }
+      }
+    }
+
+    partitionsToDrop.forEach(storageValue -> {
+      String storagePath = paths.get(storageValue);
+      try {
+        String relativePath = FSUtils.getRelativePartitionPath(
+            metaClient.getBasePathV2(), new CachingPath(storagePath));
+        events.add(PartitionEvent.newPartitionDropEvent(relativePath));
+      } catch (IllegalArgumentException e) {
+        LOG.error("Cannot parse the path stored in the metastore, ignoring it 
for "
+            + "generating DROP partition event: \"" + storagePath + "\".", e);
+      }
+    });
+    return events;
+  }
+
   /**
    * Iterate over the storage partitions and find if there are any new 
partitions that need to be added or updated.
    * Generate a list of PartitionEvent based on the changes required.
    */
-  public List<PartitionEvent> getPartitionEvents(List<Partition> 
tablePartitions, List<String> partitionStoragePartitions, Set<String> 
droppedPartitions) {
-    Map<String, String> paths = new HashMap<>();
-    for (Partition tablePartition : tablePartitions) {
-      List<String> hivePartitionValues = tablePartition.getValues();
-      String fullTablePartitionPath =
-          Path.getPathWithoutSchemeAndAuthority(new 
Path(tablePartition.getStorageLocation())).toUri().getPath();
-      paths.put(String.join(", ", hivePartitionValues), 
fullTablePartitionPath);
-    }
+  public List<PartitionEvent> getPartitionEvents(List<Partition> 
partitionsInMetastore,
+                                                 List<String> 
writtenPartitionsOnStorage,
+                                                 Set<String> 
droppedPartitionsOnStorage) {
+    Map<String, String> paths = 
getPartitionValuesToPathMapping(partitionsInMetastore);
 
     List<PartitionEvent> events = new ArrayList<>();
-    for (String storagePartition : partitionStoragePartitions) {
+    for (String storagePartition : writtenPartitionsOnStorage) {
       Path storagePartitionPath = 
FSUtils.getPartitionPath(config.getString(META_SYNC_BASE_PATH), 
storagePartition);
       String fullStoragePartitionPath = 
Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
       // Check if the partition values or if hdfs path is the same
       List<String> storagePartitionValues = 
partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
 
-      if (droppedPartitions.contains(storagePartition)) {
+      if (droppedPartitionsOnStorage.contains(storagePartition)) {
         events.add(PartitionEvent.newPartitionDropEvent(storagePartition));
       } else {
         if (!storagePartitionValues.isEmpty()) {
@@ -164,4 +222,22 @@ public abstract class HoodieSyncClient implements 
HoodieMetaSyncOperations, Auto
     }
     return events;
   }
+
+  /**
+   * Gets the partition values to the absolute path mapping based on the
+   * partition information from the metastore.
+   *
+   * @param partitionsInMetastore Partitions in the metastore.
+   * @return The partition values to the absolute path mapping.
+   */
+  private Map<String, String> getPartitionValuesToPathMapping(List<Partition> 
partitionsInMetastore) {
+    Map<String, String> paths = new HashMap<>();
+    for (Partition tablePartition : partitionsInMetastore) {
+      List<String> hivePartitionValues = tablePartition.getValues();
+      String fullTablePartitionPath =
+          Path.getPathWithoutSchemeAndAuthority(new 
Path(tablePartition.getStorageLocation())).toUri().getPath();
+      paths.put(String.join(", ", hivePartitionValues), 
fullTablePartitionPath);
+    }
+    return paths;
+  }
 }

Reply via email to