This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bada8110120 [HUDI-7494]: multi writer sync partition to glue will missing some partitions (#10841)
bada8110120 is described below

commit bada811012095232816aead7ba3f542bc3620b7d
Author: Nicolas Paris <nicolas.pa...@adevinta.com>
AuthorDate: Sat Jul 20 11:23:37 2024 +0200

    [HUDI-7494]: multi writer sync partition to glue will missing some partitions (#10841)
---
 .../hudi/aws/sync/AWSGlueCatalogSyncClient.java    | 43 +++++++++++++++++-----
 .../org/apache/hudi/hive/HoodieHiveSyncClient.java |  4 +-
 2 files changed, 37 insertions(+), 10 deletions(-)

diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
index 60d6a1e708c..520f4364837 100644
--- a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
+++ b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
@@ -23,6 +23,8 @@ import org.apache.hudi.aws.sync.util.GluePartitionFilterGenerator;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.CustomizedThreadFactory;
 import org.apache.hudi.common.util.HoodieTimer;
@@ -841,12 +843,23 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient {
   public Option<String> getLastCommitTimeSynced(String tableName) {
     try {
       Table table = getTable(awsGlue, databaseName, tableName);
-      return 
Option.ofNullable(table.parameters().get(HOODIE_LAST_COMMIT_TIME_SYNC));
+      return 
Option.ofNullable(table.parameters().getOrDefault(HOODIE_LAST_COMMIT_TIME_SYNC, 
null));
     } catch (Exception e) {
       throw new HoodieGlueSyncException("Fail to get last sync commit time for 
" + tableId(databaseName, tableName), e);
     }
   }
 
+  @Override
+  public Option<String> getLastCommitCompletionTimeSynced(String tableName) {
+    // Get the last commit completion time from the TBLproperties
+    try {
+      Table table = getTable(awsGlue, databaseName, tableName);
+      return 
Option.ofNullable(table.parameters().getOrDefault(HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC,
 null));
+    } catch (Exception e) {
+      throw new HoodieGlueSyncException("Failed to get the last commit 
completion time synced from the table " + tableName, e);
+    }
+  }
+
   @Override
   public void close() {
     awsGlue.close();
@@ -854,15 +867,27 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient {
 
   @Override
   public void updateLastCommitTimeSynced(String tableName) {
-    if (!getActiveTimeline().lastInstant().isPresent()) {
+    HoodieTimeline activeTimeline = getActiveTimeline();
+    Option<String> lastCommitSynced = 
activeTimeline.lastInstant().map(HoodieInstant::getTimestamp);
+    Option<String> lastCommitCompletionSynced = activeTimeline
+        .getInstantsOrderedByCompletionTime()
+        .skip(activeTimeline.countInstants() - 1)
+        .findFirst()
+        .map(i -> Option.of(i.getCompletionTime()))
+        .orElse(Option.empty());
+    if (lastCommitSynced.isPresent()) {
+      try {
+        HashMap<String, String> propertyMap = new HashMap<>();
+        propertyMap.put(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitSynced.get());
+        if (lastCommitCompletionSynced.isPresent()) {
+          propertyMap.put(HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC, 
lastCommitCompletionSynced.get());
+        }
+        updateTableParameters(awsGlue, databaseName, tableName, propertyMap, 
skipTableArchive);
+      } catch (Exception e) {
+        throw new HoodieGlueSyncException("Fail to update last sync commit 
time for " + tableId(databaseName, tableName), e);
+      }
+    } else {
       LOG.warn("No commit in active timeline.");
-      return;
-    }
-    final String lastCommitTimestamp = 
getActiveTimeline().lastInstant().get().getTimestamp();
-    try {
-      updateTableParameters(awsGlue, databaseName, tableName, 
Collections.singletonMap(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitTimestamp), 
skipTableArchive);
-    } catch (Exception e) {
-      throw new HoodieGlueSyncException("Fail to update last sync commit time 
for " + tableId(databaseName, tableName), e);
     }
     try {
       // as a side effect, we also refresh the partition indexes if needed
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
index ebf5dc7368e..ad96e511af6 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
@@ -418,7 +418,9 @@ public class HoodieHiveSyncClient extends HoodieSyncClient {
         SerDeInfo serdeInfo = sd.getSerdeInfo();
         serdeInfo.putToParameters(ConfigUtils.TABLE_SERDE_PATH, basePath);
         table.putToParameters(HOODIE_LAST_COMMIT_TIME_SYNC, 
lastCommitSynced.get());
-        table.putToParameters(HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC, 
lastCommitCompletionSynced.get());
+        if (lastCommitCompletionSynced.isPresent()) {
+          table.putToParameters(HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC, 
lastCommitCompletionSynced.get());
+        }
         client.alter_table(databaseName, tableName, table);
       } catch (Exception e) {
         throw new HoodieHiveSyncException("Failed to get update last commit 
time synced to " + lastCommitSynced, e);

Reply via email to