This is an automated email from the ASF dual-hosted git repository.

vinish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git


The following commit(s) were added to refs/heads/main by this push:
     new 03e5b022 [590] Add Hudi Glue Catalog Sync Implementation
03e5b022 is described below

commit 03e5b02239db0008bd96b1b9f757e469a7b668d0
Author: Vamsi <va...@onehouse.ai>
AuthorDate: Thu Feb 13 17:14:34 2025 +0530

    [590] Add Hudi Glue Catalog Sync Implementation
---
 xtable-aws/pom.xml                                 |   8 +
 .../org/apache/xtable/glue/GlueCatalogConfig.java  |  11 +
 .../xtable/glue/GlueCatalogConversionSource.java   |  22 +-
 .../glue/GlueCatalogPartitionSyncOperations.java   | 331 +++++++++++++
 .../apache/xtable/glue/GlueCatalogSyncClient.java  |  38 +-
 .../glue/GlueCatalogTableBuilderFactory.java       |   5 +-
 .../apache/xtable/glue/GlueCatalogTableUtils.java  |  56 +++
 .../apache/xtable/glue/GlueSchemaExtractor.java    |   2 +-
 .../glue/table/HudiGlueCatalogTableBuilder.java    | 185 ++++++++
 .../xtable/glue/GlueCatalogSyncTestBase.java       |   7 +
 .../TestGlueCatalogPartitionSyncOperations.java    | 519 +++++++++++++++++++++
 .../xtable/glue/TestGlueCatalogSyncClient.java     |  87 +++-
 .../table/TestHudiGlueCatalogTableBuilder.java     |  94 ++--
 .../HudiCatalogTablePropertiesExtractor.java       |  36 +-
 .../TestHudiCatalogTablePropertiesExtractor.java   |  52 ++-
 .../hms/table/HudiHMSCatalogTableBuilder.java      |  35 +-
 .../hms/table/TestHudiHMSCatalogTableBuilder.java  |  40 +-
 17 files changed, 1340 insertions(+), 188 deletions(-)

diff --git a/xtable-aws/pom.xml b/xtable-aws/pom.xml
index 099e04d3..7ec24cf9 100644
--- a/xtable-aws/pom.xml
+++ b/xtable-aws/pom.xml
@@ -36,6 +36,14 @@
             <version>${project.version}</version>
         </dependency>
 
+        <!-- Hudi dependencies -->
+        <dependency>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-hive-sync</artifactId>
+            <version>${hudi.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <!-- Hadoop dependencies -->
         <dependency>
             <groupId>org.apache.hadoop</groupId>
diff --git 
a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogConfig.java 
b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogConfig.java
index 87443698..09e195f1 100644
--- a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogConfig.java
+++ b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogConfig.java
@@ -28,6 +28,8 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 
+import org.apache.hudi.hive.MultiPartKeysValueExtractor;
+
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -54,6 +56,15 @@ public class GlueCatalogConfig {
   @JsonProperty("externalCatalog.glue.credentialsProviderClass")
   private final String clientCredentialsProviderClass;
 
+  @JsonProperty("externalCatalog.glue.schema_string_length_thresh")
+  private int schemaLengthThreshold = 4000;
+
+  @JsonProperty("externalCatalog.glue.partition_extractor_class")
+  private String partitionExtractorClass = 
MultiPartKeysValueExtractor.class.getName();
+
+  @JsonProperty("externalCatalog.glue.max_partitions_per_request")
+  private int maxPartitionsPerRequest = 1000;
+
   /**
    * In case a credentialsProviderClass is configured and require additional 
properties for
    * instantiation, those properties should start with {@link
diff --git 
a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogConversionSource.java
 
b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogConversionSource.java
index 370ae44a..dc905cef 100644
--- 
a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogConversionSource.java
+++ 
b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogConversionSource.java
@@ -18,8 +18,6 @@
  
 package org.apache.xtable.glue;
 
-import static 
org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifier;
-
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
@@ -31,13 +29,10 @@ import org.apache.xtable.conversion.ExternalCatalogConfig;
 import org.apache.xtable.conversion.SourceTable;
 import org.apache.xtable.exception.CatalogSyncException;
 import org.apache.xtable.model.catalog.CatalogTableIdentifier;
-import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
 import org.apache.xtable.model.storage.CatalogType;
 import org.apache.xtable.spi.extractor.CatalogConversionSource;
 
 import software.amazon.awssdk.services.glue.GlueClient;
-import software.amazon.awssdk.services.glue.model.GetTableRequest;
-import software.amazon.awssdk.services.glue.model.GetTableResponse;
 import software.amazon.awssdk.services.glue.model.GlueException;
 import software.amazon.awssdk.services.glue.model.Table;
 
@@ -61,19 +56,12 @@ public class GlueCatalogConversionSource implements 
CatalogConversionSource {
 
   @Override
   public SourceTable getSourceTable(CatalogTableIdentifier tblIdentifier) {
-    HierarchicalTableIdentifier tableIdentifier = 
toHierarchicalTableIdentifier(tblIdentifier);
     try {
-      GetTableResponse response =
-          glueClient.getTable(
-              GetTableRequest.builder()
-                  .catalogId(glueCatalogConfig.getCatalogId())
-                  .databaseName(tableIdentifier.getDatabaseName())
-                  .name(tableIdentifier.getTableName())
-                  .build());
-      Table table = response.table();
+      Table table =
+          GlueCatalogTableUtils.getTable(
+              glueClient, glueCatalogConfig.getCatalogId(), tblIdentifier);
       if (table == null) {
-        throw new IllegalStateException(
-            String.format("table: %s is null", tableIdentifier.getId()));
+        throw new IllegalStateException(String.format("table: %s is null", 
tblIdentifier.getId()));
       }
 
       String tableFormat = TableFormatUtils.getTableFormat(table.parameters());
@@ -91,7 +79,7 @@ public class GlueCatalogConversionSource implements 
CatalogConversionSource {
           .additionalProperties(tableProperties)
           .build();
     } catch (GlueException e) {
-      throw new CatalogSyncException("Failed to get table: " + 
tableIdentifier.getId(), e);
+      throw new CatalogSyncException("Failed to get table: " + 
tblIdentifier.getId(), e);
     }
   }
 
diff --git 
a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogPartitionSyncOperations.java
 
b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogPartitionSyncOperations.java
new file mode 100644
index 00000000..8c2cff24
--- /dev/null
+++ 
b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogPartitionSyncOperations.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.glue;
+
+import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
+import static 
org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifier;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.hudi.common.util.CollectionUtils;
+
+import org.apache.xtable.catalog.CatalogPartition;
+import org.apache.xtable.catalog.CatalogPartitionSyncOperations;
+import org.apache.xtable.exception.CatalogSyncException;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
+
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.BatchCreatePartitionRequest;
+import software.amazon.awssdk.services.glue.model.BatchCreatePartitionResponse;
+import software.amazon.awssdk.services.glue.model.BatchDeletePartitionRequest;
+import software.amazon.awssdk.services.glue.model.BatchDeletePartitionResponse;
+import software.amazon.awssdk.services.glue.model.BatchUpdatePartitionRequest;
+import 
software.amazon.awssdk.services.glue.model.BatchUpdatePartitionRequestEntry;
+import software.amazon.awssdk.services.glue.model.BatchUpdatePartitionResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionsRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionsResponse;
+import software.amazon.awssdk.services.glue.model.PartitionInput;
+import software.amazon.awssdk.services.glue.model.PartitionValueList;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
+
+@Log4j2
+public class GlueCatalogPartitionSyncOperations implements 
CatalogPartitionSyncOperations {
+
+  private final GlueClient glueClient;
+  private final GlueCatalogConfig glueCatalogConfig;
+
+  public GlueCatalogPartitionSyncOperations(
+      GlueClient glueClient, GlueCatalogConfig glueCatalogConfig) {
+    this.glueClient = glueClient;
+    this.glueCatalogConfig = glueCatalogConfig;
+  }
+
+  @Override
+  public List<CatalogPartition> getAllPartitions(CatalogTableIdentifier 
catalogTableIdentifier) {
+    HierarchicalTableIdentifier tableIdentifier =
+        toHierarchicalTableIdentifier(catalogTableIdentifier);
+    try {
+      List<CatalogPartition> partitions = new ArrayList<>();
+      String nextToken = null;
+      do {
+        GetPartitionsResponse result =
+            glueClient.getPartitions(
+                GetPartitionsRequest.builder()
+                    .databaseName(tableIdentifier.getDatabaseName())
+                    .tableName(tableIdentifier.getTableName())
+                    .nextToken(nextToken)
+                    .build());
+        partitions.addAll(
+            result.partitions().stream()
+                .map(p -> new CatalogPartition(p.values(), 
p.storageDescriptor().location()))
+                .collect(Collectors.toList()));
+        nextToken = result.nextToken();
+      } while (nextToken != null);
+      return partitions;
+    } catch (Exception e) {
+      throw new CatalogSyncException(
+          "Failed to get all partitions for table " + tableIdentifier, e);
+    }
+  }
+
+  @Override
+  public void addPartitionsToTable(
+      CatalogTableIdentifier catalogTableIdentifier, List<CatalogPartition> 
partitionsToAdd) {
+    HierarchicalTableIdentifier tableIdentifier =
+        toHierarchicalTableIdentifier(catalogTableIdentifier);
+    if (partitionsToAdd.isEmpty()) {
+      log.info("No partitions to add for {}", tableIdentifier);
+      return;
+    }
+    log.info("Adding {} CatalogPartition(s) in table {}", 
partitionsToAdd.size(), tableIdentifier);
+    try {
+      Table table =
+          GlueCatalogTableUtils.getTable(
+              glueClient, glueCatalogConfig.getCatalogId(), 
catalogTableIdentifier);
+      StorageDescriptor sd = table.storageDescriptor();
+      List<PartitionInput> partitionInputs =
+          partitionsToAdd.stream()
+              .map(partition -> createPartitionInput(table, partition))
+              .collect(Collectors.toList());
+
+      List<BatchCreatePartitionResponse> responses = new ArrayList<>();
+
+      CollectionUtils.batches(partitionInputs, 
glueCatalogConfig.getMaxPartitionsPerRequest())
+          .forEach(
+              batch -> {
+                BatchCreatePartitionRequest request =
+                    BatchCreatePartitionRequest.builder()
+                        .databaseName(tableIdentifier.getDatabaseName())
+                        .tableName(tableIdentifier.getTableName())
+                        .partitionInputList(batch)
+                        .build();
+                responses.add(glueClient.batchCreatePartition(request));
+              });
+
+      responses.forEach(
+          response -> {
+            if (CollectionUtils.nonEmpty(response.errors())) {
+              if (response.errors().stream()
+                  .allMatch(
+                      (error) ->
+                          
"AlreadyExistsException".equals(error.errorDetail().errorCode()))) {
+                log.warn("Partitions already exist in glue: {}", 
response.errors());
+              } else {
+                throw new CatalogSyncException(
+                    "Fail to add partitions to "
+                        + tableIdentifier
+                        + " with error(s): "
+                        + response.errors());
+              }
+            }
+          });
+    } catch (Exception e) {
+      throw new CatalogSyncException("Fail to add partitions to " + 
tableIdentifier, e);
+    }
+  }
+
+  @Override
+  public void updatePartitionsToTable(
+      CatalogTableIdentifier catalogTableIdentifier, List<CatalogPartition> 
changedPartitions) {
+    HierarchicalTableIdentifier tableIdentifier =
+        toHierarchicalTableIdentifier(catalogTableIdentifier);
+    if (changedPartitions.isEmpty()) {
+      log.info("No partitions to change for {}", 
tableIdentifier.getTableName());
+      return;
+    }
+    log.info("Updating {} partition(s) in table {}", changedPartitions.size(), 
tableIdentifier);
+    try {
+      Table table =
+          GlueCatalogTableUtils.getTable(
+              glueClient, glueCatalogConfig.getCatalogId(), 
catalogTableIdentifier);
+      StorageDescriptor sd = table.storageDescriptor();
+      List<BatchUpdatePartitionRequestEntry> updatePartitionEntries =
+          changedPartitions.stream()
+              .map(
+                  partition ->
+                      BatchUpdatePartitionRequestEntry.builder()
+                          .partitionInput(createPartitionInput(table, 
partition))
+                          .partitionValueList(partition.getValues())
+                          .build())
+              .collect(Collectors.toList());
+
+      List<BatchUpdatePartitionResponse> responses = new ArrayList<>();
+
+      CollectionUtils.batches(
+              updatePartitionEntries, 
glueCatalogConfig.getMaxPartitionsPerRequest())
+          .forEach(
+              batch -> {
+                BatchUpdatePartitionRequest request =
+                    BatchUpdatePartitionRequest.builder()
+                        .databaseName(tableIdentifier.getDatabaseName())
+                        .tableName(tableIdentifier.getTableName())
+                        .entries(batch)
+                        .build();
+                responses.add(glueClient.batchUpdatePartition(request));
+              });
+
+      responses.forEach(
+          response -> {
+            if (CollectionUtils.nonEmpty(response.errors())) {
+              throw new CatalogSyncException(
+                  "Fail to update partitions to "
+                      + tableIdentifier
+                      + " with error(s): "
+                      + response.errors());
+            }
+          });
+    } catch (Exception e) {
+      throw new CatalogSyncException("Fail to update partitions to " + 
tableIdentifier, e);
+    }
+  }
+
+  @Override
+  public void dropPartitions(
+      CatalogTableIdentifier catalogTableIdentifier, List<CatalogPartition> 
partitionsToDrop) {
+    HierarchicalTableIdentifier tableIdentifier =
+        toHierarchicalTableIdentifier(catalogTableIdentifier);
+    if (isNullOrEmpty(partitionsToDrop)) {
+      log.info("No partitions to drop for {}", tableIdentifier);
+      return;
+    }
+    log.info("Drop {} CatalogPartition(s) in table {}", 
partitionsToDrop.size(), tableIdentifier);
+    try {
+      List<BatchDeletePartitionResponse> responses = new ArrayList<>();
+
+      CollectionUtils.batches(partitionsToDrop, 
glueCatalogConfig.getMaxPartitionsPerRequest())
+          .forEach(
+              batch -> {
+                List<PartitionValueList> partitionValueLists =
+                    batch.stream()
+                        .map(
+                            CatalogPartition ->
+                                PartitionValueList.builder()
+                                    .values(CatalogPartition.getValues())
+                                    .build())
+                        .collect(Collectors.toList());
+
+                BatchDeletePartitionRequest batchDeletePartitionRequest =
+                    BatchDeletePartitionRequest.builder()
+                        .databaseName(tableIdentifier.getDatabaseName())
+                        .tableName(tableIdentifier.getTableName())
+                        .partitionsToDelete(partitionValueLists)
+                        .build();
+                
responses.add(glueClient.batchDeletePartition(batchDeletePartitionRequest));
+              });
+
+      responses.forEach(
+          response -> {
+            if (CollectionUtils.nonEmpty(response.errors())) {
+              throw new CatalogSyncException(
+                  "Fail to drop partitions to "
+                      + tableIdentifier
+                      + " with error(s): "
+                      + response.errors());
+            }
+          });
+    } catch (Exception e) {
+      throw new CatalogSyncException("Fail to drop partitions to " + 
tableIdentifier, e);
+    }
+  }
+
+  @Override
+  public Map<String, String> getTableProperties(
+      CatalogTableIdentifier tableIdentifier, List<String> keysToRetrieve) {
+    try {
+      Table table =
+          GlueCatalogTableUtils.getTable(
+              glueClient, glueCatalogConfig.getCatalogId(), tableIdentifier);
+      Map<String, String> tableParameters = table.parameters();
+
+      return keysToRetrieve.stream()
+          .filter(tableParameters::containsKey)
+          .collect(Collectors.toMap(key -> key, tableParameters::get));
+    } catch (Exception e) {
+      throw new CatalogSyncException(
+          "failed to get last time synced properties for table " + 
tableIdentifier, e);
+    }
+  }
+
+  @Override
+  public void updateTableProperties(
+      CatalogTableIdentifier catalogTableIdentifier, Map<String, String> 
propertiesToUpdate) {
+    HierarchicalTableIdentifier tableIdentifier =
+        toHierarchicalTableIdentifier(catalogTableIdentifier);
+    if (isNullOrEmpty(propertiesToUpdate)) {
+      return;
+    }
+    try {
+      Table table =
+          GlueCatalogTableUtils.getTable(
+              glueClient, glueCatalogConfig.getCatalogId(), 
catalogTableIdentifier);
+
+      final Map<String, String> newParams = new HashMap<>();
+      newParams.putAll(table.parameters());
+      newParams.putAll(propertiesToUpdate);
+
+      final Instant now = Instant.now();
+      TableInput updatedTableInput =
+          TableInput.builder()
+              .name(tableIdentifier.getTableName())
+              .tableType(table.tableType())
+              .parameters(newParams)
+              .partitionKeys(table.partitionKeys())
+              .storageDescriptor(table.storageDescriptor())
+              .lastAccessTime(now)
+              .lastAnalyzedTime(now)
+              .build();
+
+      UpdateTableRequest request =
+          UpdateTableRequest.builder()
+              .databaseName(tableIdentifier.getDatabaseName())
+              .tableInput(updatedTableInput)
+              .skipArchive(true)
+              .build();
+      glueClient.updateTable(request);
+    } catch (Exception e) {
+      throw new CatalogSyncException(
+          "Fail to update last synced params for table "
+              + tableIdentifier
+              + ": "
+              + propertiesToUpdate,
+          e);
+    }
+  }
+
+  private PartitionInput createPartitionInput(Table table, CatalogPartition 
partition) {
+    StorageDescriptor sd = table.storageDescriptor();
+    StorageDescriptor partitionSD =
+        sd.copy(copySd -> copySd.location(partition.getStorageLocation()));
+    return PartitionInput.builder()
+        .values(partition.getValues())
+        .storageDescriptor(partitionSD)
+        .build();
+  }
+}
diff --git 
a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogSyncClient.java 
b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogSyncClient.java
index d7de2dec..37b91753 100644
--- a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogSyncClient.java
+++ b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogSyncClient.java
@@ -21,6 +21,7 @@ package org.apache.xtable.glue;
 import static 
org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifier;
 
 import java.time.ZonedDateTime;
+import java.util.Optional;
 
 import lombok.extern.log4j.Log4j2;
 
@@ -28,7 +29,9 @@ import org.apache.hadoop.conf.Configuration;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import org.apache.xtable.catalog.CatalogPartitionSyncTool;
 import org.apache.xtable.catalog.CatalogTableBuilder;
+import org.apache.xtable.catalog.CatalogUtils;
 import org.apache.xtable.conversion.ExternalCatalogConfig;
 import org.apache.xtable.exception.CatalogSyncException;
 import org.apache.xtable.model.InternalTable;
@@ -45,8 +48,6 @@ import 
software.amazon.awssdk.services.glue.model.DatabaseInput;
 import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
 import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
 import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
-import software.amazon.awssdk.services.glue.model.GetTableRequest;
-import software.amazon.awssdk.services.glue.model.GetTableResponse;
 import software.amazon.awssdk.services.glue.model.Table;
 import software.amazon.awssdk.services.glue.model.TableInput;
 import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
@@ -63,6 +64,7 @@ public class GlueCatalogSyncClient implements 
CatalogSyncClient<Table> {
   private GlueCatalogConfig glueCatalogConfig;
   private Configuration configuration;
   private CatalogTableBuilder<TableInput, Table> tableBuilder;
+  private Optional<CatalogPartitionSyncTool> partitionSyncTool;
 
   // For loading the instance using ServiceLoader
   public GlueCatalogSyncClient() {}
@@ -78,12 +80,14 @@ public class GlueCatalogSyncClient implements 
CatalogSyncClient<Table> {
       Configuration configuration,
       GlueCatalogConfig glueCatalogConfig,
       GlueClient glueClient,
-      CatalogTableBuilder tableBuilder) {
+      CatalogTableBuilder tableBuilder,
+      Optional<CatalogPartitionSyncTool> partitionSyncTool) {
     this.catalogConfig = catalogConfig;
     this.configuration = new Configuration(configuration);
     this.glueCatalogConfig = glueCatalogConfig;
     this.glueClient = glueClient;
     this.tableBuilder = tableBuilder;
+    this.partitionSyncTool = partitionSyncTool;
   }
 
   @Override
@@ -143,20 +147,13 @@ public class GlueCatalogSyncClient implements 
CatalogSyncClient<Table> {
 
   @Override
   public Table getTable(CatalogTableIdentifier tableIdentifier) {
-    HierarchicalTableIdentifier tblIdentifier = 
toHierarchicalTableIdentifier(tableIdentifier);
     try {
-      GetTableResponse response =
-          glueClient.getTable(
-              GetTableRequest.builder()
-                  .catalogId(glueCatalogConfig.getCatalogId())
-                  .databaseName(tblIdentifier.getDatabaseName())
-                  .name(tblIdentifier.getTableName())
-                  .build());
-      return response.table();
+      return GlueCatalogTableUtils.getTable(
+          glueClient, glueCatalogConfig.getCatalogId(), tableIdentifier);
     } catch (EntityNotFoundException e) {
       return null;
     } catch (Exception e) {
-      throw new CatalogSyncException("Failed to get table: " + 
tblIdentifier.getId(), e);
+      throw new CatalogSyncException("Failed to get table: " + 
tableIdentifier.getId(), e);
     }
   }
 
@@ -174,6 +171,8 @@ public class GlueCatalogSyncClient implements 
CatalogSyncClient<Table> {
     } catch (Exception e) {
       throw new CatalogSyncException("Failed to create table: " + 
tblIdentifier.getId(), e);
     }
+
+    partitionSyncTool.ifPresent(tool -> tool.syncPartitions(table, 
tableIdentifier));
   }
 
   @Override
@@ -193,6 +192,8 @@ public class GlueCatalogSyncClient implements 
CatalogSyncClient<Table> {
     } catch (Exception e) {
       throw new CatalogSyncException("Failed to refresh table: " + 
tblIdentifier.getId(), e);
     }
+
+    partitionSyncTool.ifPresent(tool -> tool.syncPartitions(table, 
tableIdentifier));
   }
 
   @Override
@@ -230,7 +231,16 @@ public class GlueCatalogSyncClient implements 
CatalogSyncClient<Table> {
     this.glueCatalogConfig = 
GlueCatalogConfig.of(catalogConfig.getCatalogProperties());
     this.glueClient = new 
DefaultGlueClientFactory(glueCatalogConfig).getGlueClient();
     this.configuration = new Configuration(configuration);
-    this.tableBuilder = 
GlueCatalogTableBuilderFactory.getInstance(tableFormat, this.configuration);
+    this.tableBuilder =
+        GlueCatalogTableBuilderFactory.getInstance(
+            tableFormat, glueCatalogConfig, this.configuration);
+    this.partitionSyncTool =
+        CatalogUtils.getPartitionSyncTool(
+            tableFormat,
+            glueCatalogConfig.getPartitionExtractorClass(),
+            new GlueCatalogPartitionSyncOperations(glueClient, 
glueCatalogConfig),
+            configuration);
+    ;
   }
 
   @Override
diff --git 
a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogTableBuilderFactory.java
 
b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogTableBuilderFactory.java
index cd0b1d08..7316aaf2 100644
--- 
a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogTableBuilderFactory.java
+++ 
b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogTableBuilderFactory.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.xtable.catalog.CatalogTableBuilder;
 import org.apache.xtable.exception.NotSupportedException;
 import org.apache.xtable.glue.table.DeltaGlueCatalogTableBuilder;
+import org.apache.xtable.glue.table.HudiGlueCatalogTableBuilder;
 import org.apache.xtable.glue.table.IcebergGlueCatalogTableBuilder;
 import org.apache.xtable.model.storage.TableFormat;
 
@@ -32,12 +33,14 @@ import 
software.amazon.awssdk.services.glue.model.TableInput;
 class GlueCatalogTableBuilderFactory {
 
   static CatalogTableBuilder<TableInput, Table> getInstance(
-      String tableFormat, Configuration configuration) {
+      String tableFormat, GlueCatalogConfig glueCatalogConfig, Configuration 
configuration) {
     switch (tableFormat) {
       case TableFormat.ICEBERG:
         return new IcebergGlueCatalogTableBuilder(configuration);
       case TableFormat.DELTA:
         return new DeltaGlueCatalogTableBuilder();
+      case TableFormat.HUDI:
+        return new HudiGlueCatalogTableBuilder(glueCatalogConfig, 
configuration);
       default:
         throw new NotSupportedException("Unsupported table format: " + 
tableFormat);
     }
diff --git 
a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogTableUtils.java 
b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogTableUtils.java
new file mode 100644
index 00000000..269f79a5
--- /dev/null
+++ b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogTableUtils.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.glue;
+
+import static 
org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifier;
+
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
+
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTableResponse;
+import software.amazon.awssdk.services.glue.model.Table;
+
+/** Utils class to fetch details about Glue table */
+public class GlueCatalogTableUtils {
+
+  /**
+   * Retrieves a table from AWS Glue based on the provided catalog and table 
identifiers.
+   *
+   * @param glueClient The AWS Glue client used to connect to AWS Glue.
+   * @param catalogId The ID of the AWS Glue Data Catalog where the table is 
located.
+   * @param catalogTableIdentifier The identifier for the table in the catalog.
+   * @return The retrieved {@link Table} object if found; otherwise, returns 
{@code null} if the
+   *     table does not exist.
+   */
+  public static Table getTable(
+      GlueClient glueClient, String catalogId, CatalogTableIdentifier 
catalogTableIdentifier) {
+    HierarchicalTableIdentifier tableIdentifier =
+        toHierarchicalTableIdentifier(catalogTableIdentifier);
+    GetTableResponse response =
+        glueClient.getTable(
+            GetTableRequest.builder()
+                .catalogId(catalogId)
+                .databaseName(tableIdentifier.getDatabaseName())
+                .name(tableIdentifier.getTableName())
+                .build());
+    return response.table();
+  }
+}
diff --git 
a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueSchemaExtractor.java 
b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueSchemaExtractor.java
index 1bb51adc..24da6ea4 100644
--- a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueSchemaExtractor.java
+++ b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueSchemaExtractor.java
@@ -143,7 +143,7 @@ public class GlueSchemaExtractor {
    * @param fieldSchema InternalTable field schema
    * @return glue column type
    */
-  protected String toTypeString(InternalSchema fieldSchema, String 
tableFormat) {
+  public String toTypeString(InternalSchema fieldSchema, String tableFormat) {
     switch (fieldSchema.getDataType()) {
       case BOOLEAN:
         return "boolean";
diff --git 
a/xtable-aws/src/main/java/org/apache/xtable/glue/table/HudiGlueCatalogTableBuilder.java
 
b/xtable-aws/src/main/java/org/apache/xtable/glue/table/HudiGlueCatalogTableBuilder.java
new file mode 100644
index 00000000..9618873c
--- /dev/null
+++ 
b/xtable-aws/src/main/java/org/apache/xtable/glue/table/HudiGlueCatalogTableBuilder.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.glue.table;
+
+import static 
org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifier;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.xtable.catalog.CatalogTableBuilder;
+import org.apache.xtable.exception.CatalogSyncException;
+import org.apache.xtable.glue.GlueCatalogConfig;
+import org.apache.xtable.glue.GlueSchemaExtractor;
+import org.apache.xtable.hudi.HudiTableManager;
+import org.apache.xtable.hudi.catalog.HudiCatalogTablePropertiesExtractor;
+import org.apache.xtable.hudi.catalog.HudiInputFormatUtils;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.storage.TableFormat;
+
+import software.amazon.awssdk.services.glue.model.Column;
+import software.amazon.awssdk.services.glue.model.SerDeInfo;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+
+public class HudiGlueCatalogTableBuilder implements 
CatalogTableBuilder<TableInput, Table> {
+  protected static final String GLUE_EXTERNAL_TABLE_TYPE = "EXTERNAL_TABLE";
+
+  private final GlueSchemaExtractor schemaExtractor;
+  private final HudiTableManager hudiTableManager;
+  private final GlueCatalogConfig glueCatalogConfig;
+  private HoodieTableMetaClient metaClient;
+  private final HudiCatalogTablePropertiesExtractor tablePropertiesExtractor;
+
+  public HudiGlueCatalogTableBuilder(
+      GlueCatalogConfig glueCatalogConfig, Configuration configuration) {
+    this.glueCatalogConfig = glueCatalogConfig;
+    this.schemaExtractor = GlueSchemaExtractor.getInstance();
+    this.hudiTableManager = HudiTableManager.of(configuration);
+    this.tablePropertiesExtractor = 
HudiCatalogTablePropertiesExtractor.getInstance();
+  }
+
+  @VisibleForTesting
+  HudiGlueCatalogTableBuilder(
+      GlueCatalogConfig glueCatalogConfig,
+      GlueSchemaExtractor schemaExtractor,
+      HudiTableManager hudiTableManager,
+      HoodieTableMetaClient metaClient,
+      HudiCatalogTablePropertiesExtractor propertiesExtractor) {
+    this.glueCatalogConfig = glueCatalogConfig;
+    this.hudiTableManager = hudiTableManager;
+    this.schemaExtractor = schemaExtractor;
+    this.metaClient = metaClient;
+    this.tablePropertiesExtractor = propertiesExtractor;
+  }
+
+  HoodieTableMetaClient getMetaClient(String basePath) {
+    if (metaClient == null) {
+      Optional<HoodieTableMetaClient> metaClientOpt =
+          hudiTableManager.loadTableMetaClientIfExists(basePath);
+
+      if (!metaClientOpt.isPresent()) {
+        throw new CatalogSyncException(
+            "Failed to get meta client since table is not present in the base 
path " + basePath);
+      }
+
+      metaClient = metaClientOpt.get();
+    }
+    return metaClient;
+  }
+
+  @Override
+  public TableInput getCreateTableRequest(
+      InternalTable table, CatalogTableIdentifier catalogTableIdentifier) {
+    HierarchicalTableIdentifier tableIdentifier =
+        toHierarchicalTableIdentifier(catalogTableIdentifier);
+    final Instant now = Instant.now();
+    return TableInput.builder()
+        .name(tableIdentifier.getTableName())
+        .tableType(GLUE_EXTERNAL_TABLE_TYPE)
+        .parameters(
+            tablePropertiesExtractor.getTableProperties(
+                table, glueCatalogConfig.getSchemaLengthThreshold()))
+        .partitionKeys(getSchemaPartitionKeys(table.getPartitioningFields()))
+        .storageDescriptor(getStorageDescriptor(table))
+        .lastAccessTime(now)
+        .lastAnalyzedTime(now)
+        .build();
+  }
+
+  @Override
+  public TableInput getUpdateTableRequest(
+      InternalTable table, Table glueTable, CatalogTableIdentifier 
catalogTableIdentifier) {
+    HierarchicalTableIdentifier tableIdentifier =
+        toHierarchicalTableIdentifier(catalogTableIdentifier);
+    Map<String, String> tableParameters = new 
HashMap<>(glueTable.parameters());
+    tableParameters.putAll(
+        tablePropertiesExtractor.getTableProperties(
+            table, glueCatalogConfig.getSchemaLengthThreshold()));
+    List<Column> newColumns = getSchemaWithoutPartitionKeys(table);
+    StorageDescriptor sd = glueTable.storageDescriptor();
+    StorageDescriptor partitionSD = sd.copy(copySd -> 
copySd.columns(newColumns));
+
+    final Instant now = Instant.now();
+    return TableInput.builder()
+        .name(tableIdentifier.getTableName())
+        .tableType(glueTable.tableType())
+        .parameters(tableParameters)
+        .partitionKeys(getSchemaPartitionKeys(table.getPartitioningFields()))
+        .storageDescriptor(partitionSD)
+        .lastAccessTime(now)
+        .lastAnalyzedTime(now)
+        .build();
+  }
+
+  private StorageDescriptor getStorageDescriptor(InternalTable table) {
+    HoodieFileFormat baseFileFormat =
+        
getMetaClient(table.getBasePath()).getTableConfig().getBaseFileFormat();
+    SerDeInfo serDeInfo =
+        SerDeInfo.builder()
+            
.serializationLibrary(HudiInputFormatUtils.getSerDeClassName(baseFileFormat))
+            
.parameters(tablePropertiesExtractor.getSerdeProperties(table.getBasePath()))
+            .build();
+    return StorageDescriptor.builder()
+        .serdeInfo(serDeInfo)
+        .location(table.getBasePath())
+        
.inputFormat(HudiInputFormatUtils.getInputFormatClassName(baseFileFormat, 
false))
+        
.outputFormat(HudiInputFormatUtils.getOutputFormatClassName(baseFileFormat))
+        .columns(getSchemaWithoutPartitionKeys(table))
+        .build();
+  }
+
+  private List<Column> getSchemaWithoutPartitionKeys(InternalTable table) {
+    List<String> partitionKeys =
+        table.getPartitioningFields().stream()
+            .map(field -> field.getSourceField().getName())
+            .collect(Collectors.toList());
+    return schemaExtractor.toColumns(TableFormat.HUDI, 
table.getReadSchema()).stream()
+        .filter(c -> !partitionKeys.contains(c.name()))
+        .collect(Collectors.toList());
+  }
+
+  private List<Column> getSchemaPartitionKeys(List<InternalPartitionField> 
partitioningFields) {
+    return partitioningFields.stream()
+        .map(
+            field -> {
+              String fieldName = field.getSourceField().getName();
+              String fieldType =
+                  schemaExtractor.toTypeString(
+                      field.getSourceField().getSchema(), TableFormat.HUDI);
+              return Column.builder().name(fieldName).type(fieldType).build();
+            })
+        .collect(Collectors.toList());
+  }
+}
diff --git 
a/xtable-aws/src/test/java/org/apache/xtable/glue/GlueCatalogSyncTestBase.java 
b/xtable-aws/src/test/java/org/apache/xtable/glue/GlueCatalogSyncTestBase.java
index d6efeb19..a1f5e73c 100644
--- 
a/xtable-aws/src/test/java/org/apache/xtable/glue/GlueCatalogSyncTestBase.java
+++ 
b/xtable-aws/src/test/java/org/apache/xtable/glue/GlueCatalogSyncTestBase.java
@@ -138,6 +138,13 @@ public class GlueCatalogSyncTestBase {
           .readSchema(INTERNAL_SCHEMA)
           .partitioningFields(Collections.singletonList(PARTITION_FIELD))
           .build();
+  protected static final InternalTable TEST_EVOLVED_HUDI_INTERNAL_TABLE =
+      InternalTable.builder()
+          .basePath(TEST_BASE_PATH)
+          .tableFormat(TableFormat.HUDI)
+          .readSchema(UPDATED_INTERNAL_SCHEMA)
+          .partitioningFields(Collections.singletonList(PARTITION_FIELD))
+          .build();
   protected static final InternalTable TEST_DELTA_INTERNAL_TABLE =
       InternalTable.builder()
           .basePath(TEST_BASE_PATH)
diff --git 
a/xtable-aws/src/test/java/org/apache/xtable/glue/TestGlueCatalogPartitionSyncOperations.java
 
b/xtable-aws/src/test/java/org/apache/xtable/glue/TestGlueCatalogPartitionSyncOperations.java
new file mode 100644
index 00000000..edf3a790
--- /dev/null
+++ 
b/xtable-aws/src/test/java/org/apache/xtable/glue/TestGlueCatalogPartitionSyncOperations.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.glue;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import org.apache.xtable.catalog.CatalogPartitionSyncOperations;
+import org.apache.xtable.exception.CatalogSyncException;
+
+import software.amazon.awssdk.services.glue.model.BatchCreatePartitionRequest;
+import software.amazon.awssdk.services.glue.model.BatchCreatePartitionResponse;
+import software.amazon.awssdk.services.glue.model.BatchDeletePartitionRequest;
+import software.amazon.awssdk.services.glue.model.BatchDeletePartitionResponse;
+import software.amazon.awssdk.services.glue.model.BatchUpdatePartitionRequest;
+import software.amazon.awssdk.services.glue.model.BatchUpdatePartitionResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionsRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionsResponse;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTableResponse;
+import software.amazon.awssdk.services.glue.model.Partition;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
+import software.amazon.awssdk.services.glue.model.UpdateTableResponse;
+
+@ExtendWith(MockitoExtension.class)
+public class TestGlueCatalogPartitionSyncOperations extends 
GlueCatalogSyncTestBase {
+
+  private CatalogPartitionSyncOperations gluePartitionSyncOperations;
+
+  @BeforeEach
+  public void setup() {
+    setupCommonMocks();
+  }
+
+  void setupCommonMocks() {
+    gluePartitionSyncOperations =
+        new GlueCatalogPartitionSyncOperations(mockGlueClient, 
mockGlueCatalogConfig);
+  }
+
+  @Test
+  void testGetAllPartitionsSuccess() {
+    // Mock the first response with a next token
+    GetPartitionsResponse firstResponse =
+        GetPartitionsResponse.builder()
+            .partitions(
+                Arrays.asList(
+                    Partition.builder()
+                        .values(Collections.singletonList("value1"))
+                        .storageDescriptor(
+                            
StorageDescriptor.builder().location("location1").build())
+                        .build(),
+                    Partition.builder()
+                        .values(Collections.singletonList("value2"))
+                        .storageDescriptor(
+                            
StorageDescriptor.builder().location("location2").build())
+                        .build()))
+            .nextToken("token123")
+            .build();
+
+    // Mock the second response without a next token
+    GetPartitionsResponse secondResponse =
+        GetPartitionsResponse.builder()
+            .partitions(
+                Collections.singletonList(
+                    Partition.builder()
+                        .values(Collections.singletonList("value3"))
+                        .storageDescriptor(
+                            
StorageDescriptor.builder().location("location3").build())
+                        .build()))
+            .nextToken(null)
+            .build();
+
+    when(mockGlueClient.getPartitions(any(GetPartitionsRequest.class)))
+        .thenReturn(firstResponse)
+        .thenReturn(secondResponse);
+
+    List<org.apache.xtable.catalog.CatalogPartition> partitions =
+        
gluePartitionSyncOperations.getAllPartitions(TEST_CATALOG_TABLE_IDENTIFIER);
+
+    // Validate the result
+    assertEquals(3, partitions.size());
+
+    assertEquals(Collections.singletonList("value1"), 
partitions.get(0).getValues());
+    assertEquals("location1", partitions.get(0).getStorageLocation());
+
+    assertEquals(Collections.singletonList("value2"), 
partitions.get(1).getValues());
+    assertEquals("location2", partitions.get(1).getStorageLocation());
+
+    assertEquals(Collections.singletonList("value3"), 
partitions.get(2).getValues());
+    assertEquals("location3", partitions.get(2).getStorageLocation());
+
+    // Verify GlueClient interactions
+    verify(mockGlueClient, 
times(2)).getPartitions(any(GetPartitionsRequest.class));
+  }
+
+  @Test
+  void testGetAllPartitionsHandlesException() {
+    // Mock GlueClient to throw an exception
+    when(mockGlueClient.getPartitions(any(GetPartitionsRequest.class)))
+        .thenThrow(new RuntimeException("Test exception"));
+
+    // Execute and validate exception
+
+    CatalogSyncException exception =
+        assertThrows(
+            CatalogSyncException.class,
+            () -> 
gluePartitionSyncOperations.getAllPartitions(TEST_CATALOG_TABLE_IDENTIFIER));
+
+    assertInstanceOf(RuntimeException.class, exception.getCause());
+
+    // Verify GlueClient interactions
+    verify(mockGlueClient, 
times(1)).getPartitions(any(GetPartitionsRequest.class));
+  }
+
+  @Test
+  void testAddPartitionsToTableSuccess() {
+    // Mock getTable to return a valid table
+    Table mockTable =
+        Table.builder()
+            .databaseName(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName())
+            .name(TEST_CATALOG_TABLE_IDENTIFIER.getTableName())
+            .storageDescriptor(StorageDescriptor.builder().build())
+            .build();
+    GetTableResponse tableResponse = 
GetTableResponse.builder().table(mockTable).build();
+    
when(mockGlueClient.getTable(any(GetTableRequest.class))).thenReturn(tableResponse);
+    when(mockGlueCatalogConfig.getMaxPartitionsPerRequest()).thenReturn(100);
+
+    // Mock glueClient to return a valid response for batchCreatePartition
+    BatchCreatePartitionResponse mockResponse = 
mock(BatchCreatePartitionResponse.class);
+    
when(mockGlueClient.batchCreatePartition(any(BatchCreatePartitionRequest.class)))
+        .thenReturn(mockResponse);
+
+    // Execute the method
+    gluePartitionSyncOperations.addPartitionsToTable(
+        TEST_CATALOG_TABLE_IDENTIFIER,
+        Collections.singletonList(
+            new org.apache.xtable.catalog.CatalogPartition(
+                Collections.singletonList("value1"), "location1")));
+
+    // Verify that batchCreatePartition was called
+    ArgumentCaptor<BatchCreatePartitionRequest> 
createPartitionRequestArgumentCaptor =
+        ArgumentCaptor.forClass(BatchCreatePartitionRequest.class);
+    verify(mockGlueClient, times(1))
+        .batchCreatePartition(createPartitionRequestArgumentCaptor.capture());
+
+    BatchCreatePartitionRequest request = 
createPartitionRequestArgumentCaptor.getValue();
+    assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), 
request.databaseName());
+    assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), 
request.tableName());
+    assertEquals(1, request.partitionInputList().size());
+    assertEquals("location1", 
request.partitionInputList().get(0).storageDescriptor().location());
+    assertEquals(Collections.singletonList("value1"), 
request.partitionInputList().get(0).values());
+  }
+
+  @Test
+  void testAddPartitionsToTableHandlesException() {
+    // Mock getTable to return a valid table
+    Table mockTable = mock(Table.class);
+    StorageDescriptor mockSd = mock(StorageDescriptor.class);
+    when(mockTable.storageDescriptor()).thenReturn(mockSd);
+    GetTableResponse tableResponse = 
GetTableResponse.builder().table(mockTable).build();
+    
when(mockGlueClient.getTable(any(GetTableRequest.class))).thenReturn(tableResponse);
+    when(mockGlueCatalogConfig.getMaxPartitionsPerRequest()).thenReturn(100);
+
+    // Mock glueClient to throw an exception when batchCreatePartition is 
called
+    
when(mockGlueClient.batchCreatePartition(any(BatchCreatePartitionRequest.class)))
+        .thenThrow(new RuntimeException("Test exception"));
+
+    // Test the exception handling
+    CatalogSyncException exception =
+        assertThrows(
+            CatalogSyncException.class,
+            () ->
+                gluePartitionSyncOperations.addPartitionsToTable(
+                    TEST_CATALOG_TABLE_IDENTIFIER,
+                    Collections.singletonList(
+                        new org.apache.xtable.catalog.CatalogPartition(
+                            Collections.singletonList("value1"), 
"location1"))));
+
+    // Verify that the exception was caused by a RuntimeException
+    assertInstanceOf(RuntimeException.class, exception.getCause());
+
+    // Verify glueClient.batchCreatePartition was called
+    verify(mockGlueClient, 
times(1)).batchCreatePartition(any(BatchCreatePartitionRequest.class));
+  }
+
+  @Test
+  void testUpdatePartitionsToTableSuccess() {
+    // Mock getTable to return a valid table
+    Table mockTable =
+        Table.builder()
+            .databaseName(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName())
+            .name(TEST_CATALOG_TABLE_IDENTIFIER.getTableName())
+            .storageDescriptor(StorageDescriptor.builder().build())
+            .build();
+    GetTableResponse tableResponse = 
GetTableResponse.builder().table(mockTable).build();
+    
when(mockGlueClient.getTable(any(GetTableRequest.class))).thenReturn(tableResponse);
+    when(mockGlueCatalogConfig.getMaxPartitionsPerRequest()).thenReturn(100);
+
+    // Prepare test partition to be updated
+    org.apache.xtable.catalog.CatalogPartition partitionToUpdate =
+        new org.apache.xtable.catalog.CatalogPartition(
+            Collections.singletonList("value1"), "new_location");
+
+    // Mock glueClient to return a valid response for batchUpdatePartition
+    BatchUpdatePartitionResponse mockResponse = 
mock(BatchUpdatePartitionResponse.class);
+    
when(mockGlueClient.batchUpdatePartition(any(BatchUpdatePartitionRequest.class)))
+        .thenReturn(mockResponse);
+
+    // Execute the method
+    gluePartitionSyncOperations.updatePartitionsToTable(
+        TEST_CATALOG_TABLE_IDENTIFIER, 
Collections.singletonList(partitionToUpdate));
+
+    // Verify that batchUpdatePartition was called
+    ArgumentCaptor<BatchUpdatePartitionRequest> 
updatePartitionRequestArgumentCaptor =
+        ArgumentCaptor.forClass(BatchUpdatePartitionRequest.class);
+    verify(mockGlueClient, times(1))
+        .batchUpdatePartition(updatePartitionRequestArgumentCaptor.capture());
+
+    // Capture the request
+    BatchUpdatePartitionRequest request = 
updatePartitionRequestArgumentCaptor.getValue();
+
+    // Validate the request's properties
+    assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), 
request.databaseName());
+    assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), 
request.tableName());
+    assertEquals(1, request.entries().size());
+    assertEquals(
+        "new_location", 
request.entries().get(0).partitionInput().storageDescriptor().location());
+    assertEquals(
+        Collections.singletonList("value1"), 
request.entries().get(0).partitionInput().values());
+  }
+
+  @Test
+  void testUpdatePartitionsToTableHandlesException() {
+    // Mock getTable to return a valid table
+    Table mockTable =
+        Table.builder()
+            .databaseName(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName())
+            .name(TEST_CATALOG_TABLE_IDENTIFIER.getTableName())
+            .storageDescriptor(StorageDescriptor.builder().build())
+            .build();
+    GetTableResponse tableResponse = 
GetTableResponse.builder().table(mockTable).build();
+    
when(mockGlueClient.getTable(any(GetTableRequest.class))).thenReturn(tableResponse);
+    when(mockGlueCatalogConfig.getMaxPartitionsPerRequest()).thenReturn(100);
+
+    // Mock glueClient to throw an exception
+    
when(mockGlueClient.batchUpdatePartition(any(BatchUpdatePartitionRequest.class)))
+        .thenThrow(new RuntimeException("Test exception"));
+
+    // Execute and validate exception
+    CatalogSyncException exception =
+        assertThrows(
+            CatalogSyncException.class,
+            () ->
+                gluePartitionSyncOperations.updatePartitionsToTable(
+                    TEST_CATALOG_TABLE_IDENTIFIER,
+                    Collections.singletonList(
+                        new org.apache.xtable.catalog.CatalogPartition(
+                            Collections.singletonList("value1"), 
"new_location"))));
+
+    // Verify the exception cause
+    assertInstanceOf(RuntimeException.class, exception.getCause());
+
+    // Verify glueClient.batchUpdatePartition was called
+    verify(mockGlueClient, 
times(1)).batchUpdatePartition(any(BatchUpdatePartitionRequest.class));
+  }
+
+  @Test
+  void testDropPartitionsSuccess() {
+    // Prepare a partition to drop
+    org.apache.xtable.catalog.CatalogPartition partitionToDrop =
+        new org.apache.xtable.catalog.CatalogPartition(
+            Collections.singletonList("value1"), "location1");
+
+    // Mock glueClient to return a valid response for batchDeletePartition
+    BatchDeletePartitionResponse mockResponse = 
mock(BatchDeletePartitionResponse.class);
+    
when(mockGlueClient.batchDeletePartition(any(BatchDeletePartitionRequest.class)))
+        .thenReturn(mockResponse);
+    when(mockGlueCatalogConfig.getMaxPartitionsPerRequest()).thenReturn(100);
+
+    gluePartitionSyncOperations.dropPartitions(
+        TEST_CATALOG_TABLE_IDENTIFIER, 
Collections.singletonList(partitionToDrop));
+
+    // Verify that batchDeletePartition was called
+    ArgumentCaptor<BatchDeletePartitionRequest> 
deletePartitionRequestArgumentCaptor =
+        ArgumentCaptor.forClass(BatchDeletePartitionRequest.class);
+    verify(mockGlueClient, times(1))
+        .batchDeletePartition(deletePartitionRequestArgumentCaptor.capture());
+
+    // Capture the request
+    BatchDeletePartitionRequest request = 
deletePartitionRequestArgumentCaptor.getValue();
+
+    // Validate the request's properties
+    assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), 
request.databaseName());
+    assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), 
request.tableName());
+    assertEquals(1, request.partitionsToDelete().size());
+    assertEquals(Collections.singletonList("value1"), 
request.partitionsToDelete().get(0).values());
+  }
+
+  @Test
+  void testDropPartitionsHandlesException() {
+    // Mock glueClient to throw an exception during batchDeletePartition
+    
when(mockGlueClient.batchDeletePartition(any(BatchDeletePartitionRequest.class)))
+        .thenThrow(new RuntimeException("Test exception"));
+    when(mockGlueCatalogConfig.getMaxPartitionsPerRequest()).thenReturn(100);
+
+    CatalogSyncException exception =
+        assertThrows(
+            CatalogSyncException.class,
+            () ->
+                gluePartitionSyncOperations.dropPartitions(
+                    TEST_CATALOG_TABLE_IDENTIFIER,
+                    Collections.singletonList(
+                        new org.apache.xtable.catalog.CatalogPartition(
+                            Collections.singletonList("value1"), 
"location1"))));
+
+    // Verify the exception cause
+    assertInstanceOf(RuntimeException.class, exception.getCause());
+
+    // Verify glueClient.batchDeletePartition was called
+    verify(mockGlueClient, 
times(1)).batchDeletePartition(any(BatchDeletePartitionRequest.class));
+  }
+
+  @Test
+  void testDropPartitionsNoPartitionsToDrop() {
+    // Execute the method with an empty partition list
+    gluePartitionSyncOperations.dropPartitions(
+        TEST_CATALOG_TABLE_IDENTIFIER, Collections.emptyList());
+
+    // Verify that batchDeletePartition was not called
+    verify(mockGlueClient, 
times(0)).batchDeletePartition(any(BatchDeletePartitionRequest.class));
+  }
+
+  @Test
+  void testGetLastTimeSyncedPropertiesSuccess() {
+    // Mock table and parameters
+    Map<String, String> mockParameters = new HashMap<>();
+    mockParameters.put("last_synced_time", "2023-12-01T12:00:00Z");
+    mockParameters.put("last_modified_by", "user123");
+    Table mockTable =
+        Table.builder()
+            .databaseName(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName())
+            .name(TEST_CATALOG_TABLE_IDENTIFIER.getTableName())
+            .storageDescriptor(StorageDescriptor.builder().build())
+            .parameters(mockParameters)
+            .build();
+    GetTableResponse tableResponse = 
GetTableResponse.builder().table(mockTable).build();
+    
when(mockGlueClient.getTable(any(GetTableRequest.class))).thenReturn(tableResponse);
+
+    // List of keys to look for
+    List<String> lastSyncedKeys = Arrays.asList("last_synced_time", 
"last_modified_by");
+
+    // Execute the method
+    Map<String, String> result =
+        gluePartitionSyncOperations.getTableProperties(
+            TEST_CATALOG_TABLE_IDENTIFIER, lastSyncedKeys);
+
+    // Assert the result
+    assertEquals(2, result.size());
+    assertEquals("2023-12-01T12:00:00Z", result.get("last_synced_time"));
+    assertEquals("user123", result.get("last_modified_by"));
+  }
+
+  @Test
+  void testGetLastTimeSyncedPropertiesKeyNotFound() {
+    // Mock table and parameters
+    Map<String, String> mockParameters =
+        Collections.singletonMap("last_synced_time", "2023-12-01T12:00:00Z");
+    Table mockTable =
+        Table.builder()
+            .databaseName(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName())
+            .name(TEST_CATALOG_TABLE_IDENTIFIER.getTableName())
+            .storageDescriptor(StorageDescriptor.builder().build())
+            .parameters(mockParameters)
+            .build();
+    GetTableResponse tableResponse = 
GetTableResponse.builder().table(mockTable).build();
+    
when(mockGlueClient.getTable(any(GetTableRequest.class))).thenReturn(tableResponse);
+
+    // List of keys to look for (including a non-existent key)
+    List<String> lastSyncedKeys = Arrays.asList("last_synced_time", 
"non_existent_key");
+
+    // Execute the method
+    Map<String, String> result =
+        gluePartitionSyncOperations.getTableProperties(
+            TEST_CATALOG_TABLE_IDENTIFIER, lastSyncedKeys);
+
+    // Assert the result
+    assertEquals(1, result.size());
+    assertEquals("2023-12-01T12:00:00Z", result.get("last_synced_time"));
+  }
+
+  @Test
+  void testGetLastTimeSyncedPropertiesHandlesException() {
+    // Mock getTable to throw an exception
+    when(mockGlueClient.getTable(any(GetTableRequest.class)))
+        .thenThrow(new RuntimeException("Test exception"));
+
+    // Execute and validate exception
+    CatalogSyncException exception =
+        assertThrows(
+            CatalogSyncException.class,
+            () ->
+                gluePartitionSyncOperations.getTableProperties(
+                    TEST_CATALOG_TABLE_IDENTIFIER, 
Collections.singletonList("last_synced_time")));
+
+    // Verify the exception cause
+    assertInstanceOf(RuntimeException.class, exception.getCause());
+
+    // Verify that getTable was called
+    verify(mockGlueClient, times(1)).getTable(any(GetTableRequest.class));
+  }
+
+  @Test
+  void testUpdateLastTimeSyncedPropertiesSuccess() {
+    // Mock table and parameters
+    Map<String, String> mockParameters = new HashMap<>();
+    mockParameters.put("last_synced_time", "2023-12-01T12:00:00Z");
+    mockParameters.put("last_modified_by", "user123");
+    Table mockTable =
+        Table.builder()
+            .databaseName(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName())
+            .name(TEST_CATALOG_TABLE_IDENTIFIER.getTableName())
+            .storageDescriptor(StorageDescriptor.builder().build())
+            .parameters(mockParameters)
+            .build();
+    GetTableResponse tableResponse = 
GetTableResponse.builder().table(mockTable).build();
+    
when(mockGlueClient.getTable(any(GetTableRequest.class))).thenReturn(tableResponse);
+
+    // Mock the updateTable call to verify it's triggered
+    when(mockGlueClient.updateTable(any(UpdateTableRequest.class)))
+        .thenReturn(UpdateTableResponse.builder().build());
+
+    // New properties to update
+    Map<String, String> newProperties = new HashMap<>();
+    newProperties.put("last_synced_time", "2023-12-02T14:00:00Z");
+    newProperties.put("last_modified_by", "user456");
+
+    // Execute the method
+    
gluePartitionSyncOperations.updateTableProperties(TEST_CATALOG_TABLE_IDENTIFIER,
 newProperties);
+
+    // Capture the UpdateTableRequest sent to the GlueClient
+    ArgumentCaptor<UpdateTableRequest> captor = 
ArgumentCaptor.forClass(UpdateTableRequest.class);
+    verify(mockGlueClient, times(1)).updateTable(captor.capture());
+    UpdateTableRequest capturedRequest = captor.getValue();
+
+    // Assert that the table update request contains the new parameters
+    assertEquals(
+        "2023-12-02T14:00:00Z", 
capturedRequest.tableInput().parameters().get("last_synced_time"));
+    assertEquals("user456", 
capturedRequest.tableInput().parameters().get("last_modified_by"));
+  }
+
+  @Test
+  void testUpdateLastTimeSyncedPropertiesHandlesException() {
+    // Setup mocks to throw an exception
+    when(mockGlueClient.getTable(any(GetTableRequest.class)))
+        .thenThrow(new RuntimeException("Test exception"));
+
+    // New properties to update
+    Map<String, String> newProperties = new HashMap<>();
+    newProperties.put("last_synced_time", "2023-12-02T14:00:00Z");
+
+    // Execute and validate exception
+    CatalogSyncException exception =
+        assertThrows(
+            CatalogSyncException.class,
+            () ->
+                gluePartitionSyncOperations.updateTableProperties(
+                    TEST_CATALOG_TABLE_IDENTIFIER, newProperties));
+
+    // Assert that the exception has the right cause
+    assertInstanceOf(RuntimeException.class, exception.getCause());
+    verify(mockGlueClient, times(1)).getTable(any(GetTableRequest.class));
+  }
+
+  @Test
+  void testUpdateLastTimeSyncedPropertiesEmptyProperties() {
+    // Execute the method with empty properties (nothing to update)
+    gluePartitionSyncOperations.updateTableProperties(
+        TEST_CATALOG_TABLE_IDENTIFIER, new HashMap<>());
+
+    // Verify that updateTable was never called
+    verify(mockGlueClient, 
times(0)).updateTable(any(UpdateTableRequest.class));
+  }
+}
diff --git 
a/xtable-aws/src/test/java/org/apache/xtable/glue/TestGlueCatalogSyncClient.java
 
b/xtable-aws/src/test/java/org/apache/xtable/glue/TestGlueCatalogSyncClient.java
index 0ec0d04b..2fa5a53c 100644
--- 
a/xtable-aws/src/test/java/org/apache/xtable/glue/TestGlueCatalogSyncClient.java
+++ 
b/xtable-aws/src/test/java/org/apache/xtable/glue/TestGlueCatalogSyncClient.java
@@ -25,6 +25,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.never;
@@ -34,6 +35,7 @@ import static org.mockito.Mockito.when;
 
 import java.time.ZonedDateTime;
 import java.util.Collections;
+import java.util.Optional;
 import java.util.ServiceLoader;
 
 import org.junit.jupiter.api.Test;
@@ -44,8 +46,10 @@ import org.mockito.Mock;
 import org.mockito.MockedStatic;
 import org.mockito.junit.jupiter.MockitoExtension;
 
+import org.apache.xtable.catalog.CatalogPartitionSyncTool;
 import org.apache.xtable.catalog.CatalogTableBuilder;
 import org.apache.xtable.exception.CatalogSyncException;
+import org.apache.xtable.hudi.catalog.HudiCatalogPartitionSyncTool;
 import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier;
 import org.apache.xtable.model.storage.CatalogType;
 import org.apache.xtable.spi.sync.CatalogSyncClient;
@@ -71,15 +75,22 @@ import 
software.amazon.awssdk.services.glue.model.UpdateTableResponse;
 public class TestGlueCatalogSyncClient extends GlueCatalogSyncTestBase {
 
   @Mock private CatalogTableBuilder<TableInput, Table> mockTableBuilder;
+  @Mock private HudiCatalogPartitionSyncTool mockPartitionSyncTool;
   private GlueCatalogSyncClient glueCatalogSyncClient;
 
-  private GlueCatalogSyncClient createGlueCatalogSyncClient() {
+  private GlueCatalogSyncClient createGlueCatalogSyncClient(boolean 
includePartitionSyncTool) {
+    Optional<CatalogPartitionSyncTool> partitionSyncToolOpt =
+        includePartitionSyncTool ? Optional.of(mockPartitionSyncTool) : 
Optional.empty();
     return new GlueCatalogSyncClient(
-        catalogConfig, testConfiguration, mockGlueCatalogConfig, 
mockGlueClient, mockTableBuilder);
+        catalogConfig,
+        testConfiguration,
+        mockGlueCatalogConfig,
+        mockGlueClient,
+        mockTableBuilder,
+        partitionSyncToolOpt);
   }
 
   void setupCommonMocks() {
-    glueCatalogSyncClient = createGlueCatalogSyncClient();
     
when(mockGlueCatalogConfig.getCatalogId()).thenReturn(TEST_GLUE_CATALOG_ID);
   }
 
@@ -87,6 +98,7 @@ public class TestGlueCatalogSyncClient extends 
GlueCatalogSyncTestBase {
   @ValueSource(booleans = {true, false})
   void testHasDatabase(boolean isDbPresent) {
     setupCommonMocks();
+    glueCatalogSyncClient = createGlueCatalogSyncClient(false);
     GetDatabaseRequest dbRequest = 
getDbRequest(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName());
     GetDatabaseResponse dbResponse =
         GetDatabaseResponse.builder()
@@ -111,6 +123,7 @@ public class TestGlueCatalogSyncClient extends 
GlueCatalogSyncTestBase {
   @Test
   void testHasDatabaseFailure() {
     setupCommonMocks();
+    glueCatalogSyncClient = createGlueCatalogSyncClient(false);
     GetDatabaseRequest dbRequest = 
getDbRequest(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName());
     when(mockGlueClient.getDatabase(dbRequest)).thenThrow(TEST_GLUE_EXCEPTION);
     CatalogSyncException exception =
@@ -126,6 +139,7 @@ public class TestGlueCatalogSyncClient extends 
GlueCatalogSyncTestBase {
   @ValueSource(booleans = {true, false})
   void testGetTable(boolean isTablePresent) {
     setupCommonMocks();
+    glueCatalogSyncClient = createGlueCatalogSyncClient(false);
     GetTableRequest tableRequest =
         getTableRequest(
             TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
@@ -158,6 +172,7 @@ public class TestGlueCatalogSyncClient extends 
GlueCatalogSyncTestBase {
   @Test
   void testGetTableFailure() {
     setupCommonMocks();
+    glueCatalogSyncClient = createGlueCatalogSyncClient(false);
     GetTableRequest tableRequest =
         getTableRequest(
             TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
@@ -177,6 +192,7 @@ public class TestGlueCatalogSyncClient extends 
GlueCatalogSyncTestBase {
   @ValueSource(booleans = {false, true})
   void testCreateDatabase(boolean shouldFail) {
     setupCommonMocks();
+    glueCatalogSyncClient = createGlueCatalogSyncClient(false);
     CreateDatabaseRequest dbRequest =
         createDbRequest(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName());
     if (shouldFail) {
@@ -200,6 +216,7 @@ public class TestGlueCatalogSyncClient extends 
GlueCatalogSyncTestBase {
   @ValueSource(booleans = {false, true})
   void testDropTable(boolean shouldFail) {
     setupCommonMocks();
+    glueCatalogSyncClient = createGlueCatalogSyncClient(false);
     DeleteTableRequest deleteRequest =
         deleteTableRequest(
             TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
@@ -223,9 +240,11 @@ public class TestGlueCatalogSyncClient extends 
GlueCatalogSyncTestBase {
     verify(mockGlueClient, times(1)).deleteTable(deleteRequest);
   }
 
-  @Test
-  void testCreateTable_Success() {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testCreateTable_Success(boolean syncPartitions) {
     setupCommonMocks();
+    glueCatalogSyncClient = createGlueCatalogSyncClient(syncPartitions);
     CreateTableRequest createTableRequest =
         createTableRequest(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), 
TEST_TABLE_INPUT);
     when(mockTableBuilder.getCreateTableRequest(
@@ -237,12 +256,19 @@ public class TestGlueCatalogSyncClient extends 
GlueCatalogSyncTestBase {
     verify(mockGlueClient, times(1)).createTable(createTableRequest);
     verify(mockTableBuilder, times(1))
         .getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, 
TEST_CATALOG_TABLE_IDENTIFIER);
+    if (syncPartitions) {
+      verify(mockPartitionSyncTool, times(1))
+          .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), 
eq(TEST_CATALOG_TABLE_IDENTIFIER));
+    } else {
+      verify(mockPartitionSyncTool, never())
+          .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), 
eq(TEST_CATALOG_TABLE_IDENTIFIER));
+    }
   }
 
-  @Test
-  void testCreateTable_ErrorGettingTableInput() {
-    glueCatalogSyncClient = createGlueCatalogSyncClient();
-
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testCreateTable_ErrorGettingTableInput(boolean syncPartitions) {
+    glueCatalogSyncClient = createGlueCatalogSyncClient(syncPartitions);
     // error when getting iceberg table input
     doThrow(new RuntimeException("something went wrong"))
         .when(mockTableBuilder)
@@ -255,12 +281,15 @@ public class TestGlueCatalogSyncClient extends 
GlueCatalogSyncTestBase {
     verify(mockTableBuilder, times(1))
         .getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, 
TEST_CATALOG_TABLE_IDENTIFIER);
     verify(mockGlueClient, never()).createTable(any(CreateTableRequest.class));
+    verify(mockPartitionSyncTool, never())
+        .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), 
eq(TEST_CATALOG_TABLE_IDENTIFIER));
   }
 
-  @Test
-  void testCreateTable_ErrorCreatingTable() {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testCreateTable_ErrorCreatingTable(boolean syncPartitions) {
     setupCommonMocks();
-
+    glueCatalogSyncClient = createGlueCatalogSyncClient(syncPartitions);
     // error when creating table
     CreateTableRequest createTableRequest =
         createTableRequest(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), 
TEST_TABLE_INPUT);
@@ -280,11 +309,15 @@ public class TestGlueCatalogSyncClient extends 
GlueCatalogSyncTestBase {
     verify(mockTableBuilder, times(1))
         .getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, 
TEST_CATALOG_TABLE_IDENTIFIER);
     verify(mockGlueClient, times(1)).createTable(createTableRequest);
+    verify(mockPartitionSyncTool, never())
+        .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), 
eq(TEST_CATALOG_TABLE_IDENTIFIER));
   }
 
-  @Test
-  void testRefreshTable_Success() {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testRefreshTable_Success(boolean syncPartitions) {
     setupCommonMocks();
+    glueCatalogSyncClient = createGlueCatalogSyncClient(syncPartitions);
     UpdateTableRequest updateTableRequest =
         updateTableRequest(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), 
TEST_TABLE_INPUT);
     Table glueTable = 
Table.builder().parameters(Collections.emptyMap()).build();
@@ -299,11 +332,19 @@ public class TestGlueCatalogSyncClient extends 
GlueCatalogSyncTestBase {
     verify(mockTableBuilder, times(1))
         .getUpdateTableRequest(
             TEST_ICEBERG_INTERNAL_TABLE, glueTable, 
TEST_CATALOG_TABLE_IDENTIFIER);
+    if (syncPartitions) {
+      verify(mockPartitionSyncTool, times(1))
+          .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), 
eq(TEST_CATALOG_TABLE_IDENTIFIER));
+    } else {
+      verify(mockPartitionSyncTool, never())
+          .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), 
eq(TEST_CATALOG_TABLE_IDENTIFIER));
+    }
   }
 
-  @Test
-  void testRefreshTable_ErrorCreatingTableInput() {
-    glueCatalogSyncClient = createGlueCatalogSyncClient();
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testRefreshTable_ErrorCreatingTableInput(boolean syncPartitions) {
+    glueCatalogSyncClient = createGlueCatalogSyncClient(syncPartitions);
     Table glueTable = 
Table.builder().parameters(Collections.emptyMap()).build();
 
     // error while refreshing table
@@ -320,11 +361,15 @@ public class TestGlueCatalogSyncClient extends 
GlueCatalogSyncTestBase {
         .getUpdateTableRequest(
             TEST_ICEBERG_INTERNAL_TABLE, glueTable, 
TEST_CATALOG_TABLE_IDENTIFIER);
     verify(mockGlueClient, never()).updateTable(any(UpdateTableRequest.class));
+    verify(mockPartitionSyncTool, never())
+        .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), 
eq(TEST_CATALOG_TABLE_IDENTIFIER));
   }
 
-  @Test
-  void testRefreshTable_ErrorRefreshingTable() {
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testRefreshTable_ErrorRefreshingTable(boolean syncPartitions) {
     setupCommonMocks();
+    glueCatalogSyncClient = createGlueCatalogSyncClient(syncPartitions);
     Table glueTable = 
Table.builder().parameters(Collections.emptyMap()).build();
 
     UpdateTableRequest updateTableRequest =
@@ -348,12 +393,14 @@ public class TestGlueCatalogSyncClient extends 
GlueCatalogSyncTestBase {
         .getUpdateTableRequest(
             TEST_ICEBERG_INTERNAL_TABLE, glueTable, 
TEST_CATALOG_TABLE_IDENTIFIER);
     verify(mockGlueClient, times(1)).updateTable(updateTableRequest);
+    verify(mockPartitionSyncTool, never())
+        .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), 
eq(TEST_CATALOG_TABLE_IDENTIFIER));
   }
 
   @Test
   void testCreateOrReplaceTable() {
     setupCommonMocks();
-
+    glueCatalogSyncClient = createGlueCatalogSyncClient(false);
     ZonedDateTime fixedDateTime = 
ZonedDateTime.parse("2024-10-25T10:15:30.00Z");
     try (MockedStatic<ZonedDateTime> mockZonedDateTime = 
mockStatic(ZonedDateTime.class)) {
       mockZonedDateTime.when(ZonedDateTime::now).thenReturn(fixedDateTime);
diff --git 
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestHudiHMSCatalogTableBuilder.java
 
b/xtable-aws/src/test/java/org/apache/xtable/glue/table/TestHudiGlueCatalogTableBuilder.java
similarity index 51%
copy from 
xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestHudiHMSCatalogTableBuilder.java
copy to 
xtable-aws/src/test/java/org/apache/xtable/glue/table/TestHudiGlueCatalogTableBuilder.java
index adf88c99..daeb3df9 100644
--- 
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestHudiHMSCatalogTableBuilder.java
+++ 
b/xtable-aws/src/test/java/org/apache/xtable/glue/table/TestHudiGlueCatalogTableBuilder.java
@@ -16,11 +16,9 @@
  * limitations under the License.
  */
  
-package org.apache.xtable.hms.table;
+package org.apache.xtable.glue.table;
 
-import static 
org.apache.xtable.hms.table.HudiHMSCatalogTableBuilder.HUDI_METADATA_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
@@ -30,10 +28,8 @@ import static org.mockito.Mockito.when;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import org.apache.hadoop.hive.metastore.api.Table;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
@@ -41,33 +37,36 @@ import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 
-import org.apache.xtable.hms.HMSCatalogSyncTestBase;
-import org.apache.xtable.hms.HMSSchemaExtractor;
+import org.apache.xtable.glue.GlueCatalogSyncTestBase;
+import org.apache.xtable.glue.GlueSchemaExtractor;
 import org.apache.xtable.hudi.HudiTableManager;
 import org.apache.xtable.hudi.catalog.HudiCatalogTablePropertiesExtractor;
 
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+
 @ExtendWith(MockitoExtension.class)
-public class TestHudiHMSCatalogTableBuilder extends HMSCatalogSyncTestBase {
+public class TestHudiGlueCatalogTableBuilder extends GlueCatalogSyncTestBase {
 
   @Mock private HoodieTableMetaClient mockMetaClient;
   @Mock private HudiTableManager mockHudiTableManager;
   @Mock private HoodieTableConfig mockTableConfig;
   @Mock private HudiCatalogTablePropertiesExtractor 
mockTablePropertiesExtractor;
 
-  private HudiHMSCatalogTableBuilder hudiHMSCatalogTableBuilder;
+  private HudiGlueCatalogTableBuilder hudiGlueCatalogTableBuilder;
 
-  private HudiHMSCatalogTableBuilder 
createMockHudiHMSCatalogSyncRequestProvider() {
-    return new HudiHMSCatalogTableBuilder(
-        mockHMSCatalogConfig,
-        HMSSchemaExtractor.getInstance(),
+  private HudiGlueCatalogTableBuilder 
createMockHudiGlueCatalogSyncRequestProvider() {
+    return new HudiGlueCatalogTableBuilder(
+        mockGlueCatalogConfig,
+        GlueSchemaExtractor.getInstance(),
         mockHudiTableManager,
         mockMetaClient,
         mockTablePropertiesExtractor);
   }
 
   void setupCommonMocks() {
-    hudiHMSCatalogTableBuilder = createMockHudiHMSCatalogSyncRequestProvider();
-    when(mockHMSCatalogConfig.getSchemaLengthThreshold()).thenReturn(1000);
+    hudiGlueCatalogTableBuilder = 
createMockHudiGlueCatalogSyncRequestProvider();
+    when(mockGlueCatalogConfig.getSchemaLengthThreshold()).thenReturn(1000);
   }
 
   void setupMetaClientMocks() {
@@ -80,31 +79,16 @@ public class TestHudiHMSCatalogTableBuilder extends 
HMSCatalogSyncTestBase {
     setupCommonMocks();
     setupMetaClientMocks();
 
-    List<String> partitionFields =
-        TEST_HUDI_INTERNAL_TABLE.getPartitioningFields().stream()
-            .map(partitionField -> partitionField.getSourceField().getName())
-            .collect(Collectors.toList());
-
-    Table table =
-        hudiHMSCatalogTableBuilder.getCreateTableRequest(
+    TableInput table =
+        hudiGlueCatalogTableBuilder.getCreateTableRequest(
             TEST_HUDI_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER);
 
-    ArgumentCaptor<List<String>> partitionsCaptor = 
ArgumentCaptor.forClass(List.class);
     verify(mockTablePropertiesExtractor)
-        .getSparkTableProperties(
-            partitionsCaptor.capture(),
-            eq(""),
-            any(Integer.class),
-            eq(TEST_HUDI_INTERNAL_TABLE.getReadSchema()));
-    assertEquals(partitionFields.size(), partitionsCaptor.getValue().size());
-    assertEquals(partitionFields, partitionsCaptor.getValue());
-    assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), 
table.getTableName());
-    assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), 
table.getDbName());
-    assertEquals(3, table.getSd().getCols().size());
-    assertEquals(1, table.getPartitionKeys().size());
-    assertNotNull(table.getParameters());
-    assertFalse(table.getParameters().isEmpty());
-    assertEquals(table.getParameters().get(HUDI_METADATA_CONFIG), "true");
+        .getTableProperties(eq(TEST_HUDI_INTERNAL_TABLE), any(Integer.class));
+    assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), table.name());
+    assertEquals(2, table.storageDescriptor().columns().size());
+    assertEquals(1, table.partitionKeys().size());
+    assertNotNull(table.parameters());
   }
 
   @Test
@@ -116,28 +100,26 @@ public class TestHudiHMSCatalogTableBuilder extends 
HMSCatalogSyncTestBase {
         TEST_HUDI_INTERNAL_TABLE.getPartitioningFields().stream()
             .map(partitionField -> partitionField.getSourceField().getName())
             .collect(Collectors.toList());
-    Table table =
-        hudiHMSCatalogTableBuilder.getCreateTableRequest(
+
+    TableInput tableInput =
+        hudiGlueCatalogTableBuilder.getCreateTableRequest(
             TEST_HUDI_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER);
-    Table updatedTable =
-        hudiHMSCatalogTableBuilder.getUpdateTableRequest(
+    Table table =
+        Table.builder()
+            .name(tableInput.name())
+            .parameters(tableInput.parameters())
+            .storageDescriptor(tableInput.storageDescriptor())
+            .partitionKeys(tableInput.partitionKeys())
+            .build();
+    TableInput updatedTable =
+        hudiGlueCatalogTableBuilder.getUpdateTableRequest(
             TEST_EVOLVED_HUDI_INTERNAL_TABLE, table, 
TEST_CATALOG_TABLE_IDENTIFIER);
 
-    ArgumentCaptor<List<String>> partitionsCaptor = 
ArgumentCaptor.forClass(List.class);
     verify(mockTablePropertiesExtractor)
-        .getSparkTableProperties(
-            partitionsCaptor.capture(),
-            eq(""),
-            any(Integer.class),
-            eq(TEST_HUDI_INTERNAL_TABLE.getReadSchema()));
-    assertEquals(partitionFields.size(), partitionsCaptor.getValue().size());
-    assertEquals(partitionFields, partitionsCaptor.getValue());
-    assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), 
updatedTable.getTableName());
-    assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), 
updatedTable.getDbName());
-    assertEquals(4, updatedTable.getSd().getCols().size());
-    assertEquals(1, updatedTable.getPartitionKeys().size());
-    assertNotNull(updatedTable.getParameters());
-    assertFalse(table.getParameters().isEmpty());
-    assertEquals(table.getParameters().get(HUDI_METADATA_CONFIG), "true");
+        .getTableProperties(eq(TEST_HUDI_INTERNAL_TABLE), any(Integer.class));
+    assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), 
updatedTable.name());
+    assertEquals(3, updatedTable.storageDescriptor().columns().size());
+    assertEquals(1, updatedTable.partitionKeys().size());
+    assertNotNull(updatedTable.parameters());
   }
 }
diff --git 
a/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTablePropertiesExtractor.java
 
b/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTablePropertiesExtractor.java
index ff26e600..07625e44 100644
--- 
a/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTablePropertiesExtractor.java
+++ 
b/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTablePropertiesExtractor.java
@@ -22,14 +22,17 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 
 import org.apache.spark.sql.types.StructType;
 
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.StringUtils;
 
+import org.apache.xtable.model.InternalTable;
 import org.apache.xtable.model.schema.InternalField;
 import org.apache.xtable.model.schema.InternalSchema;
 import org.apache.xtable.model.schema.InternalType;
@@ -41,17 +44,28 @@ public class HudiCatalogTablePropertiesExtractor {
 
   private static final HudiCatalogTablePropertiesExtractor INSTANCE =
       new HudiCatalogTablePropertiesExtractor();
+  protected static final String HUDI_METADATA_CONFIG = 
"hudi.metadata-listing-enabled";
 
   public static HudiCatalogTablePropertiesExtractor getInstance() {
     return INSTANCE;
   }
-  /**
-   * Get Spark Sql related table properties. This is used for spark datasource 
table.
-   *
-   * @param schema The schema to write to the table.
-   * @return A new parameters added the spark's table properties.
-   */
-  public Map<String, String> getSparkTableProperties(
+
+  /** Get Hudi table properties that needs to be synced with catalog table */
+  public Map<String, String> getTableProperties(InternalTable table, int 
schemaLengthThreshold) {
+    Map<String, String> tableProperties = new HashMap<>();
+    List<String> partitionFields =
+        table.getPartitioningFields().stream()
+            .map(field -> field.getSourceField().getName())
+            .collect(Collectors.toList());
+    tableProperties.put(HUDI_METADATA_CONFIG, "true");
+    Map<String, String> sparkTableProperties =
+        getSparkTableProperties(partitionFields, "", schemaLengthThreshold, 
table.getReadSchema());
+    tableProperties.putAll(sparkTableProperties);
+    return tableProperties;
+  }
+
+  /** Get Spark Sql related table properties. This is used for spark 
datasource table. */
+  private Map<String, String> getSparkTableProperties(
       List<String> partitionNames,
       String sparkVersion,
       int schemaLengthThreshold,
@@ -123,4 +137,12 @@ public class HudiCatalogTablePropertiesExtractor {
     }
     return sparkProperties;
   }
+
+  /** Get Hudi serde properties that needs to be synced with catalog table */
+  public Map<String, String> getSerdeProperties(String basePath) {
+    Map<String, String> serdeProperties = new HashMap<>();
+    serdeProperties.put(ConfigUtils.TABLE_SERDE_PATH, basePath);
+    serdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, 
String.valueOf(false));
+    return serdeProperties;
+  }
 }
diff --git 
a/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTablePropertiesExtractor.java
 
b/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTablePropertiesExtractor.java
index e08674bf..202e84e2 100644
--- 
a/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTablePropertiesExtractor.java
+++ 
b/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTablePropertiesExtractor.java
@@ -18,27 +18,39 @@
  
 package org.apache.xtable.hudi.catalog;
 
+import static 
org.apache.xtable.hudi.catalog.HudiCatalogTablePropertiesExtractor.HUDI_METADATA_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 
 import org.junit.jupiter.api.Test;
 
+import org.apache.hudi.common.util.ConfigUtils;
+
+import org.apache.xtable.model.InternalTable;
 import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalPartitionField;
 import org.apache.xtable.model.schema.InternalSchema;
 import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.model.storage.TableFormat;
 
 public class TestHudiCatalogTablePropertiesExtractor {
 
   @Test
   void testGetSparkTableProperties() {
 
-    List<String> partitionNames = Arrays.asList("region", "category");
-    String sparkVersion = "3.2.1";
+    InternalPartitionField p1 =
+        InternalPartitionField.builder()
+            .sourceField(InternalField.builder().name("region").build())
+            .build();
+    InternalPartitionField p2 =
+        InternalPartitionField.builder()
+            .sourceField(InternalField.builder().name("category").build())
+            .build();
     int schemaLengthThreshold = 1000;
     InternalSchema schema =
         InternalSchema.builder()
@@ -80,13 +92,20 @@ public class TestHudiCatalogTablePropertiesExtractor {
             .name("testSchema")
             .build();
 
+    InternalTable table =
+        InternalTable.builder()
+            .name("test-table")
+            .tableFormat(TableFormat.HUDI)
+            .readSchema(schema)
+            .partitioningFields(Arrays.asList(p1, p2))
+            .build();
+
     Map<String, String> result =
         HudiCatalogTablePropertiesExtractor.getInstance()
-            .getSparkTableProperties(partitionNames, sparkVersion, 
schemaLengthThreshold, schema);
+            .getTableProperties(table, schemaLengthThreshold);
 
     // Validate results
     assertEquals("hudi", result.get("spark.sql.sources.provider"));
-    assertEquals("3.2.1", result.get("spark.sql.create.version"));
     assertEquals("1", result.get("spark.sql.sources.schema.numParts"));
     assertEquals(
         
"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}},{\"name\":\"region\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}},{\"name\":\"category\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}",
@@ -94,12 +113,12 @@ public class TestHudiCatalogTablePropertiesExtractor {
     assertEquals("2", result.get("spark.sql.sources.schema.numPartCols"));
     assertEquals("region", result.get("spark.sql.sources.schema.partCol.0"));
     assertEquals("category", result.get("spark.sql.sources.schema.partCol.1"));
+    assertEquals("true", result.get(HUDI_METADATA_CONFIG));
   }
 
   @Test
   void testGetSparkTablePropertiesEmptyPartitions() {
     // Setup input data with no partitions
-    List<String> partitionNames = Collections.emptyList();
     int schemaLengthThreshold = 50;
     InternalSchema schema =
         InternalSchema.builder()
@@ -125,10 +144,17 @@ public class TestHudiCatalogTablePropertiesExtractor {
             .name("testSchema")
             .build();
 
+    InternalTable table =
+        InternalTable.builder()
+            .name("test-table")
+            .tableFormat(TableFormat.HUDI)
+            .readSchema(schema)
+            .partitioningFields(Collections.emptyList())
+            .build();
     // Call the method
     Map<String, String> result =
         HudiCatalogTablePropertiesExtractor.getInstance()
-            .getSparkTableProperties(partitionNames, "", 
schemaLengthThreshold, schema);
+            .getTableProperties(table, schemaLengthThreshold);
 
     assertEquals("hudi", result.get("spark.sql.sources.provider"));
     assertNull(result.get("spark.sql.create.version"));
@@ -140,5 +166,17 @@ public class TestHudiCatalogTablePropertiesExtractor {
             + result.get("spark.sql.sources.schema.part.2")
             + result.get("spark.sql.sources.schema.part.3"));
     assertNull(result.get("spark.sql.sources.schema.numPartCols"));
+    assertEquals("true", result.get(HUDI_METADATA_CONFIG));
+  }
+
+  @Test
+  void testGetSerdeProperties() {
+    String basePath = "/test/base/path";
+    Map<String, String> serdeProperties =
+        
HudiCatalogTablePropertiesExtractor.getInstance().getSerdeProperties(basePath);
+
+    assertNotNull(serdeProperties);
+    assertEquals(basePath, serdeProperties.get(ConfigUtils.TABLE_SERDE_PATH));
+    assertEquals("false", 
serdeProperties.get(ConfigUtils.IS_QUERY_AS_RO_TABLE));
   }
 }
diff --git 
a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/table/HudiHMSCatalogTableBuilder.java
 
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/table/HudiHMSCatalogTableBuilder.java
index cc4dc3df..b8b1ea49 100644
--- 
a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/table/HudiHMSCatalogTableBuilder.java
+++ 
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/table/HudiHMSCatalogTableBuilder.java
@@ -22,7 +22,6 @@ import static 
org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifi
 
 import java.io.IOException;
 import java.time.Instant;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -39,7 +38,6 @@ import org.apache.hadoop.security.UserGroupInformation;
 
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.ConfigUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -54,7 +52,6 @@ import org.apache.xtable.model.InternalTable;
 import org.apache.xtable.model.catalog.CatalogTableIdentifier;
 import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
 import org.apache.xtable.model.schema.InternalPartitionField;
-import org.apache.xtable.model.schema.InternalSchema;
 import org.apache.xtable.model.storage.TableFormat;
 
 @Log4j2
@@ -122,7 +119,9 @@ public class HudiHMSCatalogTableBuilder implements 
CatalogTableBuilder<Table, Ta
     }
 
     newTb.setCreateTime((int) Instant.now().toEpochMilli());
-    Map<String, String> tableProperties = getTableProperties(table, 
table.getReadSchema());
+    Map<String, String> tableProperties =
+        tablePropertiesExtractor.getTableProperties(
+            table, hmsCatalogConfig.getSchemaLengthThreshold());
     newTb.setParameters(tableProperties);
     newTb.setSd(getStorageDescriptor(table));
     newTb.setPartitionKeys(getSchemaPartitionKeys(table));
@@ -134,7 +133,9 @@ public class HudiHMSCatalogTableBuilder implements 
CatalogTableBuilder<Table, Ta
       InternalTable table, Table hmsTable, CatalogTableIdentifier 
tableIdentifier) {
     Map<String, String> parameters = hmsTable.getParameters();
     Map<String, String> tableParameters = hmsTable.getParameters();
-    tableParameters.putAll(getTableProperties(table, table.getReadSchema()));
+    tableParameters.putAll(
+        tablePropertiesExtractor.getTableProperties(
+            table, hmsCatalogConfig.getSchemaLengthThreshold()));
     hmsTable.setParameters(tableParameters);
     hmsTable.setSd(getStorageDescriptor(table));
 
@@ -143,20 +144,6 @@ public class HudiHMSCatalogTableBuilder implements 
CatalogTableBuilder<Table, Ta
     return hmsTable;
   }
 
-  private Map<String, String> getTableProperties(InternalTable table, 
InternalSchema schema) {
-    List<String> partitionFields =
-        table.getPartitioningFields().stream()
-            .map(field -> field.getSourceField().getName())
-            .collect(Collectors.toList());
-    Map<String, String> tableProperties = new HashMap<>();
-    tableProperties.put(HUDI_METADATA_CONFIG, "true");
-    Map<String, String> sparkTableProperties =
-        tablePropertiesExtractor.getSparkTableProperties(
-            partitionFields, "", hmsCatalogConfig.getSchemaLengthThreshold(), 
schema);
-    tableProperties.putAll(sparkTableProperties);
-    return tableProperties;
-  }
-
   private StorageDescriptor getStorageDescriptor(InternalTable table) {
     final StorageDescriptor storageDescriptor = new StorageDescriptor();
     storageDescriptor.setCols(schemaExtractor.toColumns(TableFormat.HUDI, 
table.getReadSchema()));
@@ -168,7 +155,8 @@ public class HudiHMSCatalogTableBuilder implements 
CatalogTableBuilder<Table, Ta
     String serdeClassName = HudiInputFormatUtils.getSerDeClassName(fileFormat);
     storageDescriptor.setInputFormat(inputFormatClassName);
     storageDescriptor.setOutputFormat(outputFormatClassName);
-    Map<String, String> serdeProperties = 
getSerdeProperties(table.getBasePath());
+    Map<String, String> serdeProperties =
+        tablePropertiesExtractor.getSerdeProperties(table.getBasePath());
     SerDeInfo serDeInfo = new SerDeInfo();
     serDeInfo.setSerializationLib(serdeClassName);
     serDeInfo.setParameters(serdeProperties);
@@ -176,13 +164,6 @@ public class HudiHMSCatalogTableBuilder implements 
CatalogTableBuilder<Table, Ta
     return storageDescriptor;
   }
 
-  private static Map<String, String> getSerdeProperties(String basePath) {
-    Map<String, String> serdeProperties = new HashMap<>();
-    serdeProperties.put(ConfigUtils.TABLE_SERDE_PATH, basePath);
-    serdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, 
String.valueOf(false));
-    return serdeProperties;
-  }
-
   private List<FieldSchema> getSchemaPartitionKeys(InternalTable table) {
 
     List<InternalPartitionField> partitioningFields = 
table.getPartitioningFields();
diff --git 
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestHudiHMSCatalogTableBuilder.java
 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestHudiHMSCatalogTableBuilder.java
index adf88c99..4df2ab85 100644
--- 
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestHudiHMSCatalogTableBuilder.java
+++ 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestHudiHMSCatalogTableBuilder.java
@@ -18,22 +18,15 @@
  
 package org.apache.xtable.hms.table;
 
-import static 
org.apache.xtable.hms.table.HudiHMSCatalogTableBuilder.HUDI_METADATA_CONFIG;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.util.List;
-import java.util.stream.Collectors;
-
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
@@ -80,31 +73,16 @@ public class TestHudiHMSCatalogTableBuilder extends 
HMSCatalogSyncTestBase {
     setupCommonMocks();
     setupMetaClientMocks();
 
-    List<String> partitionFields =
-        TEST_HUDI_INTERNAL_TABLE.getPartitioningFields().stream()
-            .map(partitionField -> partitionField.getSourceField().getName())
-            .collect(Collectors.toList());
-
     Table table =
         hudiHMSCatalogTableBuilder.getCreateTableRequest(
             TEST_HUDI_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER);
 
-    ArgumentCaptor<List<String>> partitionsCaptor = 
ArgumentCaptor.forClass(List.class);
     verify(mockTablePropertiesExtractor)
-        .getSparkTableProperties(
-            partitionsCaptor.capture(),
-            eq(""),
-            any(Integer.class),
-            eq(TEST_HUDI_INTERNAL_TABLE.getReadSchema()));
-    assertEquals(partitionFields.size(), partitionsCaptor.getValue().size());
-    assertEquals(partitionFields, partitionsCaptor.getValue());
+        .getTableProperties(eq(TEST_HUDI_INTERNAL_TABLE), any(Integer.class));
     assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), 
table.getTableName());
     assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), 
table.getDbName());
     assertEquals(3, table.getSd().getCols().size());
     assertEquals(1, table.getPartitionKeys().size());
-    assertNotNull(table.getParameters());
-    assertFalse(table.getParameters().isEmpty());
-    assertEquals(table.getParameters().get(HUDI_METADATA_CONFIG), "true");
   }
 
   @Test
@@ -112,10 +90,6 @@ public class TestHudiHMSCatalogTableBuilder extends 
HMSCatalogSyncTestBase {
     setupCommonMocks();
     setupMetaClientMocks();
 
-    List<String> partitionFields =
-        TEST_HUDI_INTERNAL_TABLE.getPartitioningFields().stream()
-            .map(partitionField -> partitionField.getSourceField().getName())
-            .collect(Collectors.toList());
     Table table =
         hudiHMSCatalogTableBuilder.getCreateTableRequest(
             TEST_HUDI_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER);
@@ -123,21 +97,11 @@ public class TestHudiHMSCatalogTableBuilder extends 
HMSCatalogSyncTestBase {
         hudiHMSCatalogTableBuilder.getUpdateTableRequest(
             TEST_EVOLVED_HUDI_INTERNAL_TABLE, table, 
TEST_CATALOG_TABLE_IDENTIFIER);
 
-    ArgumentCaptor<List<String>> partitionsCaptor = 
ArgumentCaptor.forClass(List.class);
     verify(mockTablePropertiesExtractor)
-        .getSparkTableProperties(
-            partitionsCaptor.capture(),
-            eq(""),
-            any(Integer.class),
-            eq(TEST_HUDI_INTERNAL_TABLE.getReadSchema()));
-    assertEquals(partitionFields.size(), partitionsCaptor.getValue().size());
-    assertEquals(partitionFields, partitionsCaptor.getValue());
+        .getTableProperties(eq(TEST_HUDI_INTERNAL_TABLE), any(Integer.class));
     assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), 
updatedTable.getTableName());
     assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), 
updatedTable.getDbName());
     assertEquals(4, updatedTable.getSd().getCols().size());
     assertEquals(1, updatedTable.getPartitionKeys().size());
-    assertNotNull(updatedTable.getParameters());
-    assertFalse(table.getParameters().isEmpty());
-    assertEquals(table.getParameters().get(HUDI_METADATA_CONFIG), "true");
   }
 }

Reply via email to