This is an automated email from the ASF dual-hosted git repository. codope pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new bada8110120 [HUDI-7494]: multi writer sync partition to glue will missing some partitions (#10841) bada8110120 is described below commit bada811012095232816aead7ba3f542bc3620b7d Author: Nicolas Paris <nicolas.pa...@adevinta.com> AuthorDate: Sat Jul 20 11:23:37 2024 +0200 [HUDI-7494]: multi writer sync partition to glue will missing some partitions (#10841) --- .../hudi/aws/sync/AWSGlueCatalogSyncClient.java | 43 +++++++++++++++++----- .../org/apache/hudi/hive/HoodieHiveSyncClient.java | 4 +- 2 files changed, 37 insertions(+), 10 deletions(-) diff --git a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java index 60d6a1e708c..520f4364837 100644 --- a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java +++ b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java @@ -23,6 +23,8 @@ import org.apache.hudi.aws.sync.util.GluePartitionFilterGenerator; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.table.TableSchemaResolver; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.CustomizedThreadFactory; import org.apache.hudi.common.util.HoodieTimer; @@ -841,12 +843,23 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient { public Option<String> getLastCommitTimeSynced(String tableName) { try { Table table = getTable(awsGlue, databaseName, tableName); - return Option.ofNullable(table.parameters().get(HOODIE_LAST_COMMIT_TIME_SYNC)); + return Option.ofNullable(table.parameters().getOrDefault(HOODIE_LAST_COMMIT_TIME_SYNC, null)); } catch (Exception e) { throw new HoodieGlueSyncException("Fail to get last sync commit time for " + tableId(databaseName, tableName), e); } } + @Override + public Option<String> getLastCommitCompletionTimeSynced(String tableName) { + // Get the last commit completion time from the TBLproperties + try { + Table table = getTable(awsGlue, databaseName, tableName); + return Option.ofNullable(table.parameters().getOrDefault(HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC, null)); + } catch (Exception e) { + throw new HoodieGlueSyncException("Failed to get the last commit completion time synced from the table " + tableName, e); + } + } + @Override public void close() { awsGlue.close(); @@ -854,15 +867,27 @@ public class AWSGlueCatalogSyncClient extends HoodieSyncClient { @Override public void updateLastCommitTimeSynced(String tableName) { - if (!getActiveTimeline().lastInstant().isPresent()) { + HoodieTimeline activeTimeline = getActiveTimeline(); + Option<String> lastCommitSynced = activeTimeline.lastInstant().map(HoodieInstant::getTimestamp); + Option<String> lastCommitCompletionSynced = activeTimeline + .getInstantsOrderedByCompletionTime() + .skip(activeTimeline.countInstants() - 1) + .findFirst() + .map(i -> Option.of(i.getCompletionTime())) + .orElse(Option.empty()); + if (lastCommitSynced.isPresent()) { + try { + HashMap<String, String> propertyMap = new HashMap<>(); + propertyMap.put(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitSynced.get()); + if (lastCommitCompletionSynced.isPresent()) { + propertyMap.put(HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC, lastCommitCompletionSynced.get()); + } + updateTableParameters(awsGlue, databaseName, tableName, propertyMap, skipTableArchive); + } catch (Exception e) { + throw new HoodieGlueSyncException("Fail to update last sync commit time for " + tableId(databaseName, tableName), e); + } + } else { LOG.warn("No commit in active timeline."); - return; - } - final String lastCommitTimestamp = getActiveTimeline().lastInstant().get().getTimestamp(); - try { - updateTableParameters(awsGlue, databaseName, tableName, Collections.singletonMap(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitTimestamp), skipTableArchive); - } catch (Exception e) { - throw new HoodieGlueSyncException("Fail to update last sync commit time for " + tableId(databaseName, tableName), e); } try { // as a side effect, we also refresh the partition indexes if needed diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java index ebf5dc7368e..ad96e511af6 100644 --- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java +++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java @@ -418,7 +418,9 @@ public class HoodieHiveSyncClient extends HoodieSyncClient { SerDeInfo serdeInfo = sd.getSerdeInfo(); serdeInfo.putToParameters(ConfigUtils.TABLE_SERDE_PATH, basePath); table.putToParameters(HOODIE_LAST_COMMIT_TIME_SYNC, lastCommitSynced.get()); - table.putToParameters(HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC, lastCommitCompletionSynced.get()); + if (lastCommitCompletionSynced.isPresent()) { + table.putToParameters(HOODIE_LAST_COMMIT_COMPLETION_TIME_SYNC, lastCommitCompletionSynced.get()); + } client.alter_table(databaseName, tableName, table); } catch (Exception e) { throw new HoodieHiveSyncException("Failed to get update last commit time synced to " + lastCommitSynced, e);