This is an automated email from the ASF dual-hosted git repository.

timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git


The following commit(s) were added to refs/heads/main by this push:
     new 30f3e544 [590] Add Iceberg HMS Catalog Sync implementation
30f3e544 is described below

commit 30f3e544158aad880e0ab094da15d303f90cd3d7
Author: Roushan Kumar <[email protected]>
AuthorDate: Wed Feb 5 00:05:42 2025 +0530

    [590] Add Iceberg HMS Catalog Sync implementation
---
 pom.xml                                            |   2 +
 .../apache/xtable/model/storage/CatalogType.java   |   1 +
 xtable-hive-metastore/pom.xml                      | 194 ++++++++
 .../org/apache/xtable/hms/HMSCatalogConfig.java    |  54 +++
 .../xtable/hms/HMSCatalogConversionSource.java     | 111 +++++
 .../apache/xtable/hms/HMSCatalogSyncClient.java    | 223 ++++++++++
 .../xtable/hms/HMSCatalogTableBuilderFactory.java  |  71 +++
 .../org/apache/xtable/hms/HMSClientProvider.java   |  65 +++
 .../org/apache/xtable/hms/HMSSchemaExtractor.java  | 143 ++++++
 .../hms/table/IcebergHMSCatalogTableBuilder.java   | 126 ++++++
 ...he.xtable.spi.extractor.CatalogConversionSource |  18 +
 .../org.apache.xtable.spi.sync.CatalogSyncClient   |  18 +
 .../xtable/hms/HMSCatalogSyncClientTestBase.java   |  98 ++++
 .../apache/xtable/hms/TestHMSCatalogConfig.java    |  64 +++
 .../xtable/hms/TestHMSCatalogConversionSource.java | 181 ++++++++
 .../xtable/hms/TestHMSCatalogSyncClient.java       | 389 ++++++++++++++++
 .../hms/TestHMSCatalogTableBuilderFactory.java     |  63 +++
 .../apache/xtable/hms/TestHMSSchemaExtractor.java  | 493 +++++++++++++++++++++
 .../table/TestIcebergHMSCatalogTableBuilder.java   | 197 ++++++++
 xtable-utilities/pom.xml                           |  20 +
 20 files changed, 2531 insertions(+)

diff --git a/pom.xml b/pom.xml
index 414133b0..3d2b44d9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,6 +52,7 @@
         <module>xtable-core</module>
         <module>xtable-utilities</module>
         <module>xtable-aws</module>
+        <module>xtable-hive-metastore</module>
     </modules>
 
     <properties>
@@ -65,6 +66,7 @@
         <hadoop.version>3.4.1</hadoop.version>
         <hudi.version>0.14.0</hudi.version>
         <aws.version>2.29.40</aws.version>
+        <hive.version>2.3.9</hive.version>
         <maven-source-plugin.version>3.3.1</maven-source-plugin.version>
         <maven-javadoc-plugin.version>3.8.0</maven-javadoc-plugin.version>
         <maven-gpg-plugin.version>3.2.4</maven-gpg-plugin.version>
diff --git 
a/xtable-api/src/main/java/org/apache/xtable/model/storage/CatalogType.java 
b/xtable-api/src/main/java/org/apache/xtable/model/storage/CatalogType.java
index f9d4a77e..e4b778d6 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/storage/CatalogType.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/CatalogType.java
@@ -26,4 +26,5 @@ package org.apache.xtable.model.storage;
 public class CatalogType {
   public static final String STORAGE = "STORAGE";
   public static final String GLUE = "GLUE";
+  public static final String HMS = "HMS";
 }
diff --git a/xtable-hive-metastore/pom.xml b/xtable-hive-metastore/pom.xml
new file mode 100644
index 00000000..c292c123
--- /dev/null
+++ b/xtable-hive-metastore/pom.xml
@@ -0,0 +1,194 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.xtable</groupId>
+        <artifactId>xtable</artifactId>
+        <version>0.2.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>xtable-hive-metastore</artifactId>
+    <name>XTable HMS</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.xtable</groupId>
+            <artifactId>xtable-core_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <!-- Hadoop dependencies -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Iceberg dependencies  -->
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-hive-runtime</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+
+        <!-- HMS dependencies -->
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-metastore</artifactId>
+            <version>${hive.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>jdk.tools</groupId>
+                    <artifactId>jdk.tools</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-jdbc</artifactId>
+            <version>${hive.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-common</artifactId>
+            <version>${hive.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <version>${hive.version}</version>
+            <classifier>core</classifier>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.pentaho</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.codehaus.janino</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- Junit -->
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-params</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Mockito -->
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.xtable</groupId>
+            <artifactId>xtable-core_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-deploy-plugin</artifactId>
+                <version>${maven-deploy-plugin.version}</version>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git 
a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogConfig.java
 
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogConfig.java
new file mode 100644
index 00000000..f6f9eabd
--- /dev/null
+++ 
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogConfig.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.hms;
+
+import java.util.Map;
+
+import lombok.AccessLevel;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.ToString;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+@Getter
+@EqualsAndHashCode
+@ToString
+@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
+public class HMSCatalogConfig {
+
+  private static final ObjectMapper OBJECT_MAPPER =
+      new 
ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
+
+  @JsonProperty("externalCatalog.hms.serverUrl")
+  private final String serverUrl;
+
+  protected static HMSCatalogConfig of(Map<String, String> properties) {
+    try {
+      return OBJECT_MAPPER.readValue(
+          OBJECT_MAPPER.writeValueAsString(properties), 
HMSCatalogConfig.class);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git 
a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogConversionSource.java
 
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogConversionSource.java
new file mode 100644
index 00000000..db4d3c68
--- /dev/null
+++ 
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogConversionSource.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.hms;
+
+import static 
org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifier;
+
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.thrift.TException;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.xtable.catalog.TableFormatUtils;
+import org.apache.xtable.conversion.ExternalCatalogConfig;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.exception.CatalogSyncException;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
+import org.apache.xtable.model.storage.CatalogType;
+import org.apache.xtable.spi.extractor.CatalogConversionSource;
+
+public class HMSCatalogConversionSource implements CatalogConversionSource {
+
+  private HMSCatalogConfig hmsCatalogConfig;
+  private IMetaStoreClient metaStoreClient;
+
+  // For loading the instance using ServiceLoader
+  public HMSCatalogConversionSource() {}
+
+  public HMSCatalogConversionSource(
+      ExternalCatalogConfig catalogConfig, Configuration configuration) {
+    _init(catalogConfig, configuration);
+  }
+
+  @VisibleForTesting
+  HMSCatalogConversionSource(HMSCatalogConfig hmsCatalogConfig, 
IMetaStoreClient metaStoreClient) {
+    this.hmsCatalogConfig = hmsCatalogConfig;
+    this.metaStoreClient = metaStoreClient;
+  }
+
+  @Override
+  public SourceTable getSourceTable(CatalogTableIdentifier tblIdentifier) {
+    HierarchicalTableIdentifier tableIdentifier = 
toHierarchicalTableIdentifier(tblIdentifier);
+    try {
+      Table table =
+          metaStoreClient.getTable(
+              tableIdentifier.getDatabaseName(), 
tableIdentifier.getTableName());
+      if (table == null) {
+        throw new IllegalStateException(
+            String.format("table: %s is null", tableIdentifier.getId()));
+      }
+
+      String tableFormat = 
TableFormatUtils.getTableFormat(table.getParameters());
+      String tableLocation = table.getSd().getLocation();
+      String dataPath =
+          TableFormatUtils.getTableDataLocation(tableFormat, tableLocation, 
table.getParameters());
+
+      Properties tableProperties = new Properties();
+      tableProperties.putAll(table.getParameters());
+      return SourceTable.builder()
+          .name(table.getTableName())
+          .basePath(tableLocation)
+          .dataPath(dataPath)
+          .formatName(tableFormat)
+          .additionalProperties(tableProperties)
+          .build();
+    } catch (TException e) {
+      throw new CatalogSyncException("Failed to get table: " + 
tableIdentifier.getId(), e);
+    }
+  }
+
+  @Override
+  public String getCatalogType() {
+    return CatalogType.HMS;
+  }
+
+  @Override
+  public void init(ExternalCatalogConfig catalogConfig, Configuration 
configuration) {
+    _init(catalogConfig, configuration);
+  }
+
+  private void _init(ExternalCatalogConfig catalogConfig, Configuration 
configuration) {
+    this.hmsCatalogConfig = 
HMSCatalogConfig.of(catalogConfig.getCatalogProperties());
+    try {
+      this.metaStoreClient = new HMSClientProvider(hmsCatalogConfig, 
configuration).getMSC();
+    } catch (MetaException | HiveException e) {
+      throw new CatalogSyncException("HiveMetastoreClient could not be 
created", e);
+    }
+  }
+}
diff --git 
a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogSyncClient.java
 
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogSyncClient.java
new file mode 100644
index 00000000..db86958d
--- /dev/null
+++ 
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogSyncClient.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.hms;
+
+import static 
org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifier;
+
+import java.time.ZonedDateTime;
+import java.util.Collections;
+
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.thrift.TException;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.xtable.catalog.CatalogTableBuilder;
+import org.apache.xtable.conversion.ExternalCatalogConfig;
+import org.apache.xtable.exception.CatalogSyncException;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
+import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier;
+import org.apache.xtable.model.storage.CatalogType;
+import org.apache.xtable.spi.sync.CatalogSyncClient;
+
+@Log4j2
+public class HMSCatalogSyncClient implements CatalogSyncClient<Table> {
+
+  private static final String TEMP_SUFFIX = "_temp";
+  private ExternalCatalogConfig catalogConfig;
+  private HMSCatalogConfig hmsCatalogConfig;
+  private Configuration configuration;
+  private IMetaStoreClient metaStoreClient;
+  private CatalogTableBuilder<Table, Table> tableBuilder;
+
+  // For loading the instance using ServiceLoader
+  public HMSCatalogSyncClient() {}
+
+  public HMSCatalogSyncClient(
+      ExternalCatalogConfig catalogConfig, String tableFormat, Configuration 
configuration) {
+    _init(catalogConfig, tableFormat, configuration);
+  }
+
+  @VisibleForTesting
+  HMSCatalogSyncClient(
+      ExternalCatalogConfig catalogConfig,
+      HMSCatalogConfig hmsCatalogConfig,
+      Configuration configuration,
+      IMetaStoreClient metaStoreClient,
+      CatalogTableBuilder tableBuilder) {
+    this.catalogConfig = catalogConfig;
+    this.hmsCatalogConfig = hmsCatalogConfig;
+    this.configuration = configuration;
+    this.metaStoreClient = metaStoreClient;
+    this.tableBuilder = tableBuilder;
+  }
+
+  @Override
+  public String getCatalogId() {
+    return catalogConfig.getCatalogId();
+  }
+
+  @Override
+  public String getCatalogType() {
+    return CatalogType.HMS;
+  }
+
+  @Override
+  public String getStorageLocation(Table table) {
+    if (table == null || table.getSd() == null) {
+      return null;
+    }
+    return table.getSd().getLocation();
+  }
+
+  @Override
+  public boolean hasDatabase(CatalogTableIdentifier tableIdentifier) {
+    String databaseName = 
toHierarchicalTableIdentifier(tableIdentifier).getDatabaseName();
+    try {
+      return metaStoreClient.getDatabase(databaseName) != null;
+    } catch (NoSuchObjectException e) {
+      return false;
+    } catch (TException e) {
+      throw new CatalogSyncException("Failed to get database: " + 
databaseName, e);
+    }
+  }
+
+  @Override
+  public void createDatabase(CatalogTableIdentifier tableIdentifier) {
+    String databaseName = 
toHierarchicalTableIdentifier(tableIdentifier).getDatabaseName();
+    try {
+      Database database =
+          new Database(
+              databaseName,
+              "Created by " + this.getClass().getName(),
+              null,
+              Collections.emptyMap());
+      metaStoreClient.createDatabase(database);
+    } catch (TException e) {
+      throw new CatalogSyncException("Failed to create database: " + 
databaseName, e);
+    }
+  }
+
+  @Override
+  public Table getTable(CatalogTableIdentifier tblIdentifier) {
+    HierarchicalTableIdentifier tableIdentifier = 
toHierarchicalTableIdentifier(tblIdentifier);
+    try {
+      return metaStoreClient.getTable(
+          tableIdentifier.getDatabaseName(), tableIdentifier.getTableName());
+    } catch (NoSuchObjectException e) {
+      return null;
+    } catch (TException e) {
+      throw new CatalogSyncException("Failed to get table: " + 
tableIdentifier.getId(), e);
+    }
+  }
+
+  @Override
+  public void createTable(InternalTable table, CatalogTableIdentifier 
tableIdentifier) {
+    Table hmsTable = tableBuilder.getCreateTableRequest(table, 
tableIdentifier);
+    try {
+      metaStoreClient.createTable(hmsTable);
+    } catch (TException e) {
+      throw new CatalogSyncException("Failed to create table: " + 
tableIdentifier.getId(), e);
+    }
+  }
+
+  @Override
+  public void refreshTable(
+      InternalTable table, Table catalogTable, CatalogTableIdentifier 
tblIdentifier) {
+    HierarchicalTableIdentifier tableIdentifier = 
toHierarchicalTableIdentifier(tblIdentifier);
+    catalogTable = tableBuilder.getUpdateTableRequest(table, catalogTable, 
tableIdentifier);
+    try {
+      metaStoreClient.alter_table(
+          tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(), 
catalogTable);
+    } catch (TException e) {
+      throw new CatalogSyncException("Failed to refresh table: " + 
tableIdentifier.getId(), e);
+    }
+  }
+
+  @Override
+  public void createOrReplaceTable(InternalTable table, CatalogTableIdentifier 
tableIdentifier) {
+    // validate before dropping the table
+    validateTempTableCreation(table, tableIdentifier);
+    dropTable(table, tableIdentifier);
+    createTable(table, tableIdentifier);
+  }
+
+  @Override
+  public void dropTable(InternalTable table, CatalogTableIdentifier 
tblIdentifier) {
+    HierarchicalTableIdentifier tableIdentifier = 
toHierarchicalTableIdentifier(tblIdentifier);
+    try {
+      metaStoreClient.dropTable(tableIdentifier.getDatabaseName(), 
tableIdentifier.getTableName());
+    } catch (TException e) {
+      throw new CatalogSyncException("Failed to drop table: " + 
tableIdentifier.getId(), e);
+    }
+  }
+
+  @Override
+  public void init(
+      ExternalCatalogConfig catalogConfig, String tableFormat, Configuration 
configuration) {
+    _init(catalogConfig, tableFormat, configuration);
+  }
+
+  private void _init(
+      ExternalCatalogConfig catalogConfig, String tableFormat, Configuration 
configuration) {
+    this.catalogConfig = catalogConfig;
+    this.hmsCatalogConfig = 
HMSCatalogConfig.of(catalogConfig.getCatalogProperties());
+    this.configuration = configuration;
+    try {
+      this.metaStoreClient = new HMSClientProvider(hmsCatalogConfig, 
configuration).getMSC();
+    } catch (MetaException | HiveException e) {
+      throw new CatalogSyncException("HiveMetastoreClient could not be 
created", e);
+    }
+    this.tableBuilder =
+        HMSCatalogTableBuilderFactory.getTableBuilder(tableFormat, 
this.configuration);
+  }
+
+  /**
+   * creates a temp table with new metadata and properties to ensure table 
creation succeeds before
+   * dropping the table and recreating it. This ensures that actual table is 
not dropped in case
+   * there are any issues
+   */
+  private void validateTempTableCreation(
+      InternalTable table, CatalogTableIdentifier tblIdentifier) {
+    HierarchicalTableIdentifier tableIdentifier = 
toHierarchicalTableIdentifier(tblIdentifier);
+    String tempTableName =
+        tableIdentifier.getTableName() + TEMP_SUFFIX + 
ZonedDateTime.now().toEpochSecond();
+    ThreePartHierarchicalTableIdentifier tempTableIdentifier =
+        new 
ThreePartHierarchicalTableIdentifier(tableIdentifier.getDatabaseName(), 
tempTableName);
+    createTable(table, tempTableIdentifier);
+    dropTable(table, tempTableIdentifier);
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (metaStoreClient != null) {
+      metaStoreClient.close();
+    }
+  }
+}
diff --git 
a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogTableBuilderFactory.java
 
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogTableBuilderFactory.java
new file mode 100644
index 00000000..e6a4eb5c
--- /dev/null
+++ 
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogTableBuilderFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.hms;
+
+import static 
org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifier;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.xtable.catalog.CatalogTableBuilder;
+import org.apache.xtable.exception.NotSupportedException;
+import org.apache.xtable.hms.table.IcebergHMSCatalogTableBuilder;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
+import org.apache.xtable.model.storage.TableFormat;
+
+public class HMSCatalogTableBuilderFactory {
+
+  public static CatalogTableBuilder<Table, Table> getTableBuilder(
+      String tableFormat, Configuration configuration) {
+    switch (tableFormat) {
+      case TableFormat.ICEBERG:
+        return new IcebergHMSCatalogTableBuilder(configuration);
+      default:
+        throw new NotSupportedException("Unsupported table format: " + 
tableFormat);
+    }
+  }
+
+  public static Table newHmsTable(
+      CatalogTableIdentifier tblIdentifier,
+      StorageDescriptor storageDescriptor,
+      Map<String, String> params) {
+    HierarchicalTableIdentifier tableIdentifier = 
toHierarchicalTableIdentifier(tblIdentifier);
+    try {
+      Table newTb = new Table();
+      newTb.setDbName(tableIdentifier.getDatabaseName());
+      newTb.setTableName(tableIdentifier.getTableName());
+      newTb.setOwner(UserGroupInformation.getCurrentUser().getShortUserName());
+      newTb.setCreateTime((int) Instant.now().getEpochSecond());
+      newTb.setSd(storageDescriptor);
+      newTb.setTableType(TableType.EXTERNAL_TABLE.toString());
+      newTb.setParameters(params);
+      return newTb;
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to set owner for hms table: " + 
tableIdentifier, e);
+    }
+  }
+}
diff --git 
a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSClientProvider.java
 
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSClientProvider.java
new file mode 100644
index 00000000..5230d687
--- /dev/null
+++ 
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSClientProvider.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.hms;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
+
+import java.lang.reflect.InvocationTargetException;
+
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+@Log4j2
+public class HMSClientProvider {
+
+  private final HMSCatalogConfig hmsCatalogConfig;
+  private final Configuration configuration;
+
+  public HMSClientProvider(HMSCatalogConfig hmsCatalogConfig, Configuration 
configuration) {
+    this.hmsCatalogConfig = hmsCatalogConfig;
+    this.configuration = configuration;
+  }
+
+  public IMetaStoreClient getMSC() throws MetaException, HiveException {
+    HiveConf hiveConf = new HiveConf(configuration, HiveConf.class);
+    hiveConf.set(METASTOREURIS.varname, hmsCatalogConfig.getServerUrl());
+    IMetaStoreClient metaStoreClient;
+    try {
+      metaStoreClient =
+          ((Hive)
+                  Hive.class
+                      .getMethod("getWithoutRegisterFns", HiveConf.class)
+                      .invoke(null, hiveConf))
+              .getMSC();
+    } catch (NoSuchMethodException
+        | IllegalAccessException
+        | IllegalArgumentException
+        | InvocationTargetException ex) {
+      metaStoreClient = Hive.get(hiveConf).getMSC();
+    }
+    log.debug("Connected to metastore with uri: {}", 
hmsCatalogConfig.getServerUrl());
+    return metaStoreClient;
+  }
+}
diff --git 
a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSSchemaExtractor.java
 
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSSchemaExtractor.java
new file mode 100644
index 00000000..740c7790
--- /dev/null
+++ 
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSSchemaExtractor.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.hms;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+
+import org.apache.xtable.exception.NotSupportedException;
+import org.apache.xtable.exception.SchemaExtractorException;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class HMSSchemaExtractor {
+
+  private static final HMSSchemaExtractor INSTANCE = new HMSSchemaExtractor();
+
+  public static HMSSchemaExtractor getInstance() {
+    return INSTANCE;
+  }
+
+  /**
+   * Extract HMS schema from InternalTable schema
+   *
+   * @param tableFormat tableFormat to handle format specific type conversion
+   * @param tableSchema InternalTable schema
+   * @return HMS Field schema list
+   */
+  public List<FieldSchema> toColumns(String tableFormat, InternalSchema 
tableSchema) {
+    return tableSchema.getFields().stream()
+        .map(
+            field ->
+                new FieldSchema(
+                    field.getName(),
+                    convertToTypeString(field.getSchema()),
+                    field.getSchema().getComment()))
+        .collect(Collectors.toList());
+  }
+
+  private String convertToTypeString(InternalSchema fieldSchema) {
+    switch (fieldSchema.getDataType()) {
+      case BOOLEAN:
+        return "boolean";
+      case INT:
+        return "int";
+      case LONG:
+        return "bigint";
+      case FLOAT:
+        return "float";
+      case DOUBLE:
+        return "double";
+      case DATE:
+        return "date";
+      case ENUM:
+      case STRING:
+        return "string";
+      case TIMESTAMP:
+      case TIMESTAMP_NTZ:
+        return "timestamp";
+      case FIXED:
+      case BYTES:
+        return "binary";
+      case DECIMAL:
+        Map<InternalSchema.MetadataKey, Object> metadata = 
fieldSchema.getMetadata();
+        if (metadata == null || metadata.isEmpty()) {
+          throw new NotSupportedException("Invalid decimal type, precision and 
scale is missing");
+        }
+        int precision =
+            (int)
+                metadata.computeIfAbsent(
+                    InternalSchema.MetadataKey.DECIMAL_PRECISION,
+                    k -> {
+                      throw new NotSupportedException("Invalid decimal type, 
precision is missing");
+                    });
+        int scale =
+            (int)
+                metadata.computeIfAbsent(
+                    InternalSchema.MetadataKey.DECIMAL_SCALE,
+                    k -> {
+                      throw new NotSupportedException("Invalid decimal type, 
scale is missing");
+                    });
+        return String.format("decimal(%s,%s)", precision, scale);
+      case RECORD:
+        final String nameToType =
+            fieldSchema.getFields().stream()
+                .map(f -> String.format("%s:%s", f.getName(), 
convertToTypeString(f.getSchema())))
+                .collect(Collectors.joining(","));
+        return String.format("struct<%s>", nameToType);
+      case LIST:
+        InternalField arrayElement =
+            fieldSchema.getFields().stream()
+                .filter(
+                    arrayField ->
+                        
InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME.equals(
+                            arrayField.getName()))
+                .findFirst()
+                .orElseThrow(() -> new SchemaExtractorException("Invalid array 
schema"));
+        return String.format("array<%s>", 
convertToTypeString(arrayElement.getSchema()));
+      case MAP:
+        InternalField key =
+            fieldSchema.getFields().stream()
+                .filter(
+                    mapField ->
+                        
InternalField.Constants.MAP_KEY_FIELD_NAME.equals(mapField.getName()))
+                .findFirst()
+                .orElseThrow(() -> new SchemaExtractorException("Invalid map 
schema"));
+        InternalField value =
+            fieldSchema.getFields().stream()
+                .filter(
+                    mapField ->
+                        
InternalField.Constants.MAP_VALUE_FIELD_NAME.equals(mapField.getName()))
+                .findFirst()
+                .orElseThrow(() -> new SchemaExtractorException("Invalid map 
schema"));
+        return String.format(
+            "map<%s,%s>",
+            convertToTypeString(key.getSchema()), 
convertToTypeString(value.getSchema()));
+      default:
+        throw new NotSupportedException("Unsupported type: " + 
fieldSchema.getDataType());
+    }
+  }
+}
diff --git 
a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/table/IcebergHMSCatalogTableBuilder.java
 
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/table/IcebergHMSCatalogTableBuilder.java
new file mode 100644
index 00000000..661caebe
--- /dev/null
+++ 
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/table/IcebergHMSCatalogTableBuilder.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.hms.table;
+
+import static 
org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
+import static 
org.apache.iceberg.BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP;
+import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
+import static org.apache.xtable.catalog.Constants.PROP_EXTERNAL;
+import static org.apache.xtable.hms.HMSCatalogTableBuilderFactory.newHmsTable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.mr.hive.HiveIcebergInputFormat;
+import org.apache.iceberg.mr.hive.HiveIcebergOutputFormat;
+import org.apache.iceberg.mr.hive.HiveIcebergSerDe;
+import org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.xtable.catalog.CatalogTableBuilder;
+import org.apache.xtable.hms.HMSSchemaExtractor;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+import org.apache.xtable.model.storage.TableFormat;
+
+public class IcebergHMSCatalogTableBuilder implements 
CatalogTableBuilder<Table, Table> {
+
+  private static final String ICEBERG_CATALOG_NAME_PROP = "iceberg.catalog";
+  private static final String ICEBERG_HADOOP_TABLE_NAME = 
"location_based_table";
+  private static final String tableFormat = TableFormat.ICEBERG;
+  private final HMSSchemaExtractor schemaExtractor;
+  private final HadoopTables hadoopTables;
+
+  public IcebergHMSCatalogTableBuilder(Configuration configuration) {
+    this.schemaExtractor = HMSSchemaExtractor.getInstance();
+    this.hadoopTables = new HadoopTables(configuration);
+  }
+
+  @VisibleForTesting
+  IcebergHMSCatalogTableBuilder(HMSSchemaExtractor schemaExtractor, 
HadoopTables hadoopTables) {
+    this.schemaExtractor = schemaExtractor;
+    this.hadoopTables = hadoopTables;
+  }
+
+  @Override
+  public Table getCreateTableRequest(InternalTable table, 
CatalogTableIdentifier tableIdentifier) {
+    return newHmsTable(
+        tableIdentifier,
+        getStorageDescriptor(table),
+        getTableParameters(loadTableFromFs(table.getBasePath())));
+  }
+
+  @Override
+  public Table getUpdateTableRequest(
+      InternalTable table, Table catalogTable, CatalogTableIdentifier 
tableIdentifier) {
+    BaseTable icebergTable = loadTableFromFs(table.getBasePath());
+    Table copyTb = new Table(catalogTable);
+    Map<String, String> parameters = copyTb.getParameters();
+    parameters.putAll(icebergTable.properties());
+    String currentMetadataLocation = parameters.get(METADATA_LOCATION_PROP);
+    parameters.put(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation);
+    parameters.put(METADATA_LOCATION_PROP, 
getMetadataFileLocation(icebergTable));
+    copyTb.setParameters(parameters);
+    copyTb.getSd().setCols(schemaExtractor.toColumns(tableFormat, 
table.getReadSchema()));
+    return copyTb;
+  }
+
+  @VisibleForTesting
+  StorageDescriptor getStorageDescriptor(InternalTable table) {
+    final StorageDescriptor storageDescriptor = new StorageDescriptor();
+    storageDescriptor.setCols(schemaExtractor.toColumns(tableFormat, 
table.getReadSchema()));
+    storageDescriptor.setLocation(table.getBasePath());
+    
storageDescriptor.setInputFormat(HiveIcebergInputFormat.class.getCanonicalName());
+    
storageDescriptor.setOutputFormat(HiveIcebergOutputFormat.class.getCanonicalName());
+    SerDeInfo serDeInfo = new SerDeInfo();
+    serDeInfo.setSerializationLib(HiveIcebergSerDe.class.getCanonicalName());
+    storageDescriptor.setSerdeInfo(serDeInfo);
+    return storageDescriptor;
+  }
+
+  @VisibleForTesting
+  Map<String, String> getTableParameters(BaseTable icebergTable) {
+    Map<String, String> parameters = new HashMap<>(icebergTable.properties());
+    parameters.put(PROP_EXTERNAL, "TRUE");
+    parameters.put(TABLE_TYPE_PROP, tableFormat);
+    parameters.put(METADATA_LOCATION_PROP, 
getMetadataFileLocation(icebergTable));
+    parameters.put(
+        hive_metastoreConstants.META_TABLE_STORAGE,
+        HiveIcebergStorageHandler.class.getCanonicalName());
+    parameters.put(ICEBERG_CATALOG_NAME_PROP, ICEBERG_HADOOP_TABLE_NAME);
+    return parameters;
+  }
+
+  private BaseTable loadTableFromFs(String tableBasePath) {
+    return (BaseTable) hadoopTables.load(tableBasePath);
+  }
+
+  private String getMetadataFileLocation(BaseTable table) {
+    return table.operations().current().metadataFileLocation();
+  }
+}
diff --git 
a/xtable-hive-metastore/src/main/resources/META-INF/services/org.apache.xtable.spi.extractor.CatalogConversionSource
 
b/xtable-hive-metastore/src/main/resources/META-INF/services/org.apache.xtable.spi.extractor.CatalogConversionSource
new file mode 100644
index 00000000..f69c81be
--- /dev/null
+++ 
b/xtable-hive-metastore/src/main/resources/META-INF/services/org.apache.xtable.spi.extractor.CatalogConversionSource
@@ -0,0 +1,18 @@
+##########################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##########################################################################
+org.apache.xtable.hms.HMSCatalogConversionSource
\ No newline at end of file
diff --git 
a/xtable-hive-metastore/src/main/resources/META-INF/services/org.apache.xtable.spi.sync.CatalogSyncClient
 
b/xtable-hive-metastore/src/main/resources/META-INF/services/org.apache.xtable.spi.sync.CatalogSyncClient
new file mode 100644
index 00000000..b0b45a5d
--- /dev/null
+++ 
b/xtable-hive-metastore/src/main/resources/META-INF/services/org.apache.xtable.spi.sync.CatalogSyncClient
@@ -0,0 +1,18 @@
+##########################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##########################################################################
+org.apache.xtable.hms.HMSCatalogSyncClient
\ No newline at end of file
diff --git 
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/HMSCatalogSyncClientTestBase.java
 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/HMSCatalogSyncClientTestBase.java
new file mode 100644
index 00000000..052a5ded
--- /dev/null
+++ 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/HMSCatalogSyncClientTestBase.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.hms;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.mockito.Mock;
+
+import org.apache.xtable.conversion.ExternalCatalogConfig;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.CatalogType;
+import org.apache.xtable.model.storage.TableFormat;
+
+public class HMSCatalogSyncClientTestBase {
+
+  @Mock protected IMetaStoreClient mockMetaStoreClient;
+  @Mock protected HMSCatalogConfig mockHMSCatalogConfig;
+  @Mock protected HMSSchemaExtractor mockHmsSchemaExtractor;
+  protected Configuration testConfiguration = new Configuration();
+
+  protected static final String TEST_HMS_DATABASE = "hms_db";
+  protected static final String TEST_HMS_TABLE = "hms_table";
+  protected static final String TEST_BASE_PATH = "base-path";
+  protected static final String TEST_CATALOG_NAME = "hms-1";
+  protected static final ExternalCatalogConfig TEST_CATALOG_CONFIG =
+      ExternalCatalogConfig.builder()
+          .catalogId(TEST_CATALOG_NAME)
+          .catalogType(CatalogType.HMS)
+          .catalogSyncClientImpl(HMSCatalogSyncClient.class.getCanonicalName())
+          .catalogProperties(Collections.emptyMap())
+          .build();
+
+  protected static final String ICEBERG_METADATA_FILE_LOCATION = 
"base-path/metadata";
+  protected static final String ICEBERG_METADATA_FILE_LOCATION_V2 = 
"base-path/v2-metadata";
+  protected static final InternalTable TEST_ICEBERG_INTERNAL_TABLE =
+      InternalTable.builder()
+          .basePath(TEST_BASE_PATH)
+          .tableFormat(TableFormat.ICEBERG)
+          
.readSchema(InternalSchema.builder().fields(Collections.emptyList()).build())
+          .build();
+  protected static final InternalTable TEST_HUDI_INTERNAL_TABLE =
+      InternalTable.builder()
+          .basePath(TEST_BASE_PATH)
+          .tableFormat(TableFormat.HUDI)
+          
.readSchema(InternalSchema.builder().fields(Collections.emptyList()).build())
+          .build();
+  protected static final ThreePartHierarchicalTableIdentifier 
TEST_CATALOG_TABLE_IDENTIFIER =
+      new ThreePartHierarchicalTableIdentifier(TEST_HMS_DATABASE, 
TEST_HMS_TABLE);
+
+  protected Table newTable(String dbName, String tableName) {
+    return newTable(dbName, tableName, new HashMap<>());
+  }
+
+  protected Table newTable(String dbName, String tableName, Map<String, 
String> params) {
+    Table table = new Table();
+    table.setDbName(dbName);
+    table.setTableName(tableName);
+    table.setParameters(params);
+    return table;
+  }
+
+  protected Table newTable(
+      String dbName, String tableName, Map<String, String> params, 
StorageDescriptor sd) {
+    Table table = newTable(dbName, tableName, params);
+    table.setSd(sd);
+    return table;
+  }
+
+  protected Database newDatabase(String dbName) {
+    return new Database(
+        dbName, "Created by " + HMSCatalogSyncClient.class.getName(), null, 
Collections.emptyMap());
+  }
+}
diff --git 
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogConfig.java
 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogConfig.java
new file mode 100644
index 00000000..bf05883a
--- /dev/null
+++ 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogConfig.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.hms;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.jupiter.api.Test;
+
+public class TestHMSCatalogConfig {
+
+  private static final String HMS_CATALOG_SERVER_URL_KEY = 
"externalCatalog.hms.serverUrl";
+  private static final String HMS_CATALOG_SERVER_URL_VALUE = 
"thrift://localhost:9083";
+
+  @Test
+  void testGetHmsCatalogConfig_withNoPropertiesSet() {
+    Map<String, String> props = new HashMap<>();
+    HMSCatalogConfig catalogConfig = HMSCatalogConfig.of(props);
+    assertNull(catalogConfig.getServerUrl());
+  }
+
+  @Test
+  void testGetHmsCatalogConfig_withUnknownProperty() {
+    Map<String, String> props =
+        createProps("externalCatalog.glue.unknownProperty", 
"unknown-property-value");
+    assertDoesNotThrow(() -> HMSCatalogConfig.of(props));
+  }
+
+  @Test
+  void testGetHmsCatalogConfig() {
+    Map<String, String> props =
+        createProps(HMS_CATALOG_SERVER_URL_KEY, HMS_CATALOG_SERVER_URL_VALUE);
+    HMSCatalogConfig catalogConfig = HMSCatalogConfig.of(props);
+    assertEquals(HMS_CATALOG_SERVER_URL_VALUE, catalogConfig.getServerUrl());
+  }
+
+  private Map<String, String> createProps(String... keyValues) {
+    Map<String, String> props = new HashMap<>();
+    for (int i = 0; i < keyValues.length; i += 2) {
+      props.put(keyValues[i], keyValues[i + 1]);
+    }
+    return props;
+  }
+}
diff --git 
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogConversionSource.java
 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogConversionSource.java
new file mode 100644
index 00000000..fc649800
--- /dev/null
+++ 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogConversionSource.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.hms;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.ServiceLoader;
+
+import lombok.SneakyThrows;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.exception.CatalogSyncException;
+import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier;
+import org.apache.xtable.model.storage.CatalogType;
+import org.apache.xtable.model.storage.TableFormat;
+import org.apache.xtable.spi.extractor.CatalogConversionSource;
+
+@ExtendWith(MockitoExtension.class)
+class TestHMSCatalogConversionSource {
+
+  @Mock private HMSCatalogConfig mockCatalogConfig;
+  @Mock private IMetaStoreClient mockMetaStoreClient;
+  private HMSCatalogConversionSource catalogConversionSource;
+  private static final String HMS_DB = "hms_db";
+  private static final String HMS_TABLE = "hms_tbl";
+  private static final String TABLE_BASE_PATH = "/var/data/table";
+  private final ThreePartHierarchicalTableIdentifier tableIdentifier =
+      new ThreePartHierarchicalTableIdentifier(HMS_DB, HMS_TABLE);
+
+  @BeforeEach
+  void init() {
+    catalogConversionSource =
+        new HMSCatalogConversionSource(mockCatalogConfig, mockMetaStoreClient);
+  }
+
+  @SneakyThrows
+  @Test
+  void testGetSourceTable_errorGettingTableFromHMS() {
+    // error getting table from hms
+    when(mockMetaStoreClient.getTable(HMS_DB, HMS_TABLE))
+        .thenThrow(new TException("something went wrong"));
+    CatalogSyncException exception =
+        assertThrows(
+            CatalogSyncException.class,
+            () -> catalogConversionSource.getSourceTable(tableIdentifier));
+    assertEquals(
+        String.format(
+            "Failed to get table: %s.%s",
+            tableIdentifier.getDatabaseName(), tableIdentifier.getTableName()),
+        exception.getMessage());
+
+    verify(mockMetaStoreClient, times(1)).getTable(HMS_DB, HMS_TABLE);
+  }
+
+  @SneakyThrows
+  @Test
+  void testGetSourceTable_tableNotFoundInHMS() {
+    // table not found in hms
+    when(mockMetaStoreClient.getTable(HMS_DB, HMS_TABLE))
+        .thenThrow(new NoSuchObjectException("table not found"));
+    CatalogSyncException exception =
+        assertThrows(
+            CatalogSyncException.class,
+            () -> catalogConversionSource.getSourceTable(tableIdentifier));
+    assertEquals(
+        String.format(
+            "Failed to get table: %s.%s",
+            tableIdentifier.getDatabaseName(), tableIdentifier.getTableName()),
+        exception.getMessage());
+
+    verify(mockMetaStoreClient, times(1)).getTable(HMS_DB, HMS_TABLE);
+  }
+
+  @SneakyThrows
+  @ParameterizedTest
+  @CsvSource(value = {"ICEBERG", "HUDI", "DELTA"})
+  void testGetSourceTable(String tableFormat) {
+    StorageDescriptor sd = new StorageDescriptor();
+    sd.setLocation(TABLE_BASE_PATH);
+    Map<String, String> tableParams = new HashMap<>();
+    if (Objects.equals(tableFormat, TableFormat.ICEBERG)) {
+      tableParams.put("write.data.path", String.format("%s/iceberg", 
TABLE_BASE_PATH));
+      tableParams.put("table_type", tableFormat);
+    } else {
+      tableParams.put("spark.sql.sources.provider", tableFormat);
+    }
+
+    String dataPath =
+        tableFormat.equals(TableFormat.ICEBERG)
+            ? String.format("%s/iceberg", TABLE_BASE_PATH)
+            : TABLE_BASE_PATH;
+    SourceTable expected =
+        newSourceTable(HMS_TABLE, TABLE_BASE_PATH, dataPath, tableFormat, 
tableParams);
+    when(mockMetaStoreClient.getTable(HMS_DB, HMS_TABLE))
+        .thenReturn(newHmsTable(HMS_DB, HMS_TABLE, tableParams, sd));
+    SourceTable output = 
catalogConversionSource.getSourceTable(tableIdentifier);
+    assertEquals(expected, output);
+  }
+
+  @Test
+  void testLoadInstanceByServiceLoader() {
+    ServiceLoader<CatalogConversionSource> loader =
+        ServiceLoader.load(CatalogConversionSource.class);
+    CatalogConversionSource catalogConversionSource = null;
+
+    for (CatalogConversionSource instance : loader) {
+      if (instance.getCatalogType().equals(CatalogType.HMS)) {
+        catalogConversionSource = instance;
+        break;
+      }
+    }
+    assertNotNull(catalogConversionSource);
+    assertEquals(
+        catalogConversionSource.getClass().getName(), 
HMSCatalogConversionSource.class.getName());
+  }
+
+  private Table newHmsTable(
+      String dbName, String tableName, Map<String, String> params, 
StorageDescriptor sd) {
+    Table table = new Table();
+    table.setDbName(dbName);
+    table.setTableName(tableName);
+    table.setParameters(params);
+    table.setSd(sd);
+    return table;
+  }
+
+  private SourceTable newSourceTable(
+      String tblName,
+      String basePath,
+      String dataPath,
+      String tblFormat,
+      Map<String, String> params) {
+    Properties tblProperties = new Properties();
+    tblProperties.putAll(params);
+    return SourceTable.builder()
+        .name(tblName)
+        .basePath(basePath)
+        .dataPath(dataPath)
+        .formatName(tblFormat)
+        .additionalProperties(tblProperties)
+        .build();
+  }
+}
diff --git 
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogSyncClient.java
 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogSyncClient.java
new file mode 100644
index 00000000..fad22ad2
--- /dev/null
+++ 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogSyncClient.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.hms;
+
+import static 
org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.Collections;
+import java.util.ServiceLoader;
+
+import lombok.SneakyThrows;
+
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import org.apache.xtable.catalog.CatalogTableBuilder;
+import org.apache.xtable.exception.CatalogSyncException;
+import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier;
+import org.apache.xtable.model.storage.CatalogType;
+import org.apache.xtable.spi.sync.CatalogSyncClient;
+
+@ExtendWith(MockitoExtension.class)
+public class TestHMSCatalogSyncClient extends HMSCatalogSyncClientTestBase {
+
+  @Mock private CatalogTableBuilder<Table, Table> mockTableBuilder;
+  private HMSCatalogSyncClient hmsCatalogSyncClient;
+
+  private HMSCatalogSyncClient createHMSCatalogSyncClient() {
+    return new HMSCatalogSyncClient(
+        TEST_CATALOG_CONFIG,
+        mockHMSCatalogConfig,
+        testConfiguration,
+        mockMetaStoreClient,
+        mockTableBuilder);
+  }
+
+  void setupCommonMocks() {
+    hmsCatalogSyncClient = createHMSCatalogSyncClient();
+  }
+
+  @SneakyThrows
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testHasDatabase(boolean isDbPresent) {
+    setupCommonMocks();
+    Database db = new Database(TEST_HMS_DATABASE, null, null, 
Collections.emptyMap());
+    if (isDbPresent) {
+      when(mockMetaStoreClient.getDatabase(TEST_HMS_DATABASE)).thenReturn(db);
+    } else {
+      when(mockMetaStoreClient.getDatabase(TEST_HMS_DATABASE))
+          .thenThrow(new NoSuchObjectException("db not found"));
+    }
+    boolean output = 
hmsCatalogSyncClient.hasDatabase(TEST_CATALOG_TABLE_IDENTIFIER);
+    if (isDbPresent) {
+      assertTrue(output);
+    } else {
+      assertFalse(output);
+    }
+    verify(mockMetaStoreClient, times(1)).getDatabase(TEST_HMS_DATABASE);
+  }
+
+  @SneakyThrows
+  @Test
+  void testHasDatabaseFailure() {
+    setupCommonMocks();
+    when(mockMetaStoreClient.getDatabase(TEST_HMS_DATABASE))
+        .thenThrow(new TException("something went wrong"));
+    CatalogSyncException exception =
+        assertThrows(
+            CatalogSyncException.class,
+            () -> 
hmsCatalogSyncClient.hasDatabase(TEST_CATALOG_TABLE_IDENTIFIER));
+    assertEquals(
+        String.format("Failed to get database: %s", TEST_HMS_DATABASE), 
exception.getMessage());
+    verify(mockMetaStoreClient, times(1)).getDatabase(TEST_HMS_DATABASE);
+  }
+
+  @SneakyThrows
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testGetTable(boolean isTablePresent) {
+    setupCommonMocks();
+    Table table = newTable(TEST_HMS_DATABASE, TEST_HMS_TABLE);
+    if (isTablePresent) {
+      when(mockMetaStoreClient.getTable(TEST_HMS_DATABASE, 
TEST_HMS_TABLE)).thenReturn(table);
+    } else {
+      when(mockMetaStoreClient.getTable(TEST_HMS_DATABASE, TEST_HMS_TABLE))
+          .thenThrow(new NoSuchObjectException("db not found"));
+    }
+    Table hmsTable = 
hmsCatalogSyncClient.getTable(TEST_CATALOG_TABLE_IDENTIFIER);
+    if (isTablePresent) {
+      assertEquals(table, hmsTable);
+    } else {
+      assertNull(hmsTable);
+    }
+    verify(mockMetaStoreClient, times(1)).getTable(TEST_HMS_DATABASE, 
TEST_HMS_TABLE);
+  }
+
+  @SneakyThrows
+  @Test
+  void testGetTableFailure() {
+    setupCommonMocks();
+    when(mockMetaStoreClient.getTable(TEST_HMS_DATABASE, TEST_HMS_TABLE))
+        .thenThrow(new TException("something went wrong"));
+    CatalogSyncException exception =
+        assertThrows(
+            CatalogSyncException.class,
+            () -> 
hmsCatalogSyncClient.getTable(TEST_CATALOG_TABLE_IDENTIFIER));
+    assertEquals(
+        String.format("Failed to get table: %s.%s", TEST_HMS_DATABASE, 
TEST_HMS_TABLE),
+        exception.getMessage());
+    verify(mockMetaStoreClient, times(1)).getTable(TEST_HMS_DATABASE, 
TEST_HMS_TABLE);
+  }
+
+  @SneakyThrows
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  void testCreateDatabase(boolean shouldFail) {
+    setupCommonMocks();
+    Database database = newDatabase(TEST_HMS_DATABASE);
+    if (shouldFail) {
+      Mockito.doThrow(new TException("something went wrong"))
+          .when(mockMetaStoreClient)
+          .createDatabase(database);
+      CatalogSyncException exception =
+          assertThrows(
+              CatalogSyncException.class,
+              () -> 
hmsCatalogSyncClient.createDatabase(TEST_CATALOG_TABLE_IDENTIFIER));
+      assertEquals(
+          String.format("Failed to create database: %s", TEST_HMS_DATABASE),
+          exception.getMessage());
+    } else {
+      hmsCatalogSyncClient.createDatabase(TEST_CATALOG_TABLE_IDENTIFIER);
+    }
+    verify(mockMetaStoreClient, times(1)).createDatabase(database);
+  }
+
+  @SneakyThrows
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  void testDropTable(boolean shouldFail) {
+    setupCommonMocks();
+    if (shouldFail) {
+      Mockito.doThrow(new TException("something went wrong"))
+          .when(mockMetaStoreClient)
+          .dropTable(TEST_HMS_DATABASE, TEST_HMS_TABLE);
+      CatalogSyncException exception =
+          assertThrows(
+              CatalogSyncException.class,
+              () ->
+                  hmsCatalogSyncClient.dropTable(
+                      TEST_ICEBERG_INTERNAL_TABLE, 
TEST_CATALOG_TABLE_IDENTIFIER));
+      assertEquals(
+          String.format("Failed to drop table: %s.%s", TEST_HMS_DATABASE, 
TEST_HMS_TABLE),
+          exception.getMessage());
+    } else {
+      hmsCatalogSyncClient.dropTable(TEST_ICEBERG_INTERNAL_TABLE, 
TEST_CATALOG_TABLE_IDENTIFIER);
+    }
+    verify(mockMetaStoreClient, times(1)).dropTable(TEST_HMS_DATABASE, 
TEST_HMS_TABLE);
+  }
+
+  @SneakyThrows
+  @Test
+  void testCreateTable_Success() {
+    setupCommonMocks();
+    Table testTable = new Table();
+    when(mockTableBuilder.getCreateTableRequest(
+            TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER))
+        .thenReturn(testTable);
+    hmsCatalogSyncClient.createTable(TEST_ICEBERG_INTERNAL_TABLE, 
TEST_CATALOG_TABLE_IDENTIFIER);
+    verify(mockMetaStoreClient, times(1)).createTable(testTable);
+    verify(mockTableBuilder, times(1))
+        .getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, 
TEST_CATALOG_TABLE_IDENTIFIER);
+  }
+
+  @SneakyThrows
+  @Test
+  void testCreateTable_ErrorGettingTableInput() {
+    setupCommonMocks();
+
+    // error when getting iceberg table input
+    doThrow(new RuntimeException("something went wrong"))
+        .when(mockTableBuilder)
+        .getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, 
TEST_CATALOG_TABLE_IDENTIFIER);
+    assertThrows(
+        RuntimeException.class,
+        () ->
+            hmsCatalogSyncClient.createTable(
+                TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER));
+    verify(mockTableBuilder, times(1))
+        .getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, 
TEST_CATALOG_TABLE_IDENTIFIER);
+    verify(mockMetaStoreClient, never()).createTable(any());
+  }
+
+  @SneakyThrows
+  @Test
+  void testCreateTable_ErrorCreatingTable() {
+    setupCommonMocks();
+
+    // error when creating table
+    Table testTable = new Table();
+    when(mockTableBuilder.getCreateTableRequest(
+            TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER))
+        .thenReturn(testTable);
+    doThrow(new TException("something went wrong"))
+        .when(mockMetaStoreClient)
+        .createTable(testTable);
+    CatalogSyncException exception =
+        assertThrows(
+            CatalogSyncException.class,
+            () ->
+                hmsCatalogSyncClient.createTable(
+                    TEST_ICEBERG_INTERNAL_TABLE, 
TEST_CATALOG_TABLE_IDENTIFIER));
+    assertEquals(
+        String.format("Failed to create table: %s.%s", TEST_HMS_DATABASE, 
TEST_HMS_TABLE),
+        exception.getMessage());
+    verify(mockTableBuilder, times(1))
+        .getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, 
TEST_CATALOG_TABLE_IDENTIFIER);
+    verify(mockMetaStoreClient, times(1)).createTable(testTable);
+  }
+
+  @SneakyThrows
+  @Test
+  void testRefreshTable_Success() {
+    setupCommonMocks();
+    Table origTable = new Table();
+    Table updatedTable = new Table(origTable);
+    updatedTable.putToParameters(METADATA_LOCATION_PROP, 
ICEBERG_METADATA_FILE_LOCATION_V2);
+    when(mockTableBuilder.getUpdateTableRequest(
+            TEST_ICEBERG_INTERNAL_TABLE, origTable, 
TEST_CATALOG_TABLE_IDENTIFIER))
+        .thenReturn(updatedTable);
+    hmsCatalogSyncClient.refreshTable(
+        TEST_ICEBERG_INTERNAL_TABLE, origTable, TEST_CATALOG_TABLE_IDENTIFIER);
+    verify(mockMetaStoreClient, times(1))
+        .alter_table(TEST_HMS_DATABASE, TEST_HMS_TABLE, updatedTable);
+    verify(mockTableBuilder, times(1))
+        .getUpdateTableRequest(
+            TEST_ICEBERG_INTERNAL_TABLE, origTable, 
TEST_CATALOG_TABLE_IDENTIFIER);
+  }
+
+  @SneakyThrows
+  @Test
+  void testRefreshTable_ErrorGettingUpdatedTable() {
+    setupCommonMocks();
+
+    // error when getting iceberg table input
+    Table testTable = new Table();
+    doThrow(new RuntimeException("something went wrong"))
+        .when(mockTableBuilder)
+        .getUpdateTableRequest(
+            TEST_ICEBERG_INTERNAL_TABLE, testTable, 
TEST_CATALOG_TABLE_IDENTIFIER);
+    assertThrows(
+        RuntimeException.class,
+        () ->
+            hmsCatalogSyncClient.refreshTable(
+                TEST_ICEBERG_INTERNAL_TABLE, testTable, 
TEST_CATALOG_TABLE_IDENTIFIER));
+    verify(mockTableBuilder, times(1))
+        .getUpdateTableRequest(
+            TEST_ICEBERG_INTERNAL_TABLE, testTable, 
TEST_CATALOG_TABLE_IDENTIFIER);
+    verify(mockMetaStoreClient, never()).alter_table(any(), any(), any());
+  }
+
+  @SneakyThrows
+  @Test
+  void testRefreshTable_ErrorRefreshingTable() {
+    setupCommonMocks();
+
+    // error when creating table
+    Table origTable = new Table();
+    Table updatedTable = new Table(origTable);
+    updatedTable.putToParameters(METADATA_LOCATION_PROP, 
ICEBERG_METADATA_FILE_LOCATION_V2);
+    when(mockTableBuilder.getUpdateTableRequest(
+            TEST_ICEBERG_INTERNAL_TABLE, origTable, 
TEST_CATALOG_TABLE_IDENTIFIER))
+        .thenReturn(updatedTable);
+    doThrow(new TException("something went wrong"))
+        .when(mockMetaStoreClient)
+        .alter_table(TEST_HMS_DATABASE, TEST_HMS_TABLE, updatedTable);
+    CatalogSyncException exception =
+        assertThrows(
+            CatalogSyncException.class,
+            () ->
+                hmsCatalogSyncClient.refreshTable(
+                    TEST_ICEBERG_INTERNAL_TABLE, origTable, 
TEST_CATALOG_TABLE_IDENTIFIER));
+    assertEquals(
+        String.format("Failed to refresh table: %s.%s", TEST_HMS_DATABASE, 
TEST_HMS_TABLE),
+        exception.getMessage());
+    verify(mockTableBuilder, times(1))
+        .getUpdateTableRequest(
+            TEST_ICEBERG_INTERNAL_TABLE, origTable, 
TEST_CATALOG_TABLE_IDENTIFIER);
+    verify(mockMetaStoreClient, times(1))
+        .alter_table(TEST_HMS_DATABASE, TEST_HMS_TABLE, updatedTable);
+  }
+
+  @SneakyThrows
+  @Test
+  void testCreateOrReplaceTable() {
+    setupCommonMocks();
+
+    ZonedDateTime zonedDateTime =
+        
Instant.ofEpochMilli(System.currentTimeMillis()).atZone(ZoneId.systemDefault());
+    try (MockedStatic<ZonedDateTime> mockZonedDateTime = 
mockStatic(ZonedDateTime.class)) {
+      mockZonedDateTime.when(ZonedDateTime::now).thenReturn(zonedDateTime);
+
+      String tempTableName = TEST_HMS_TABLE + "_temp" + 
ZonedDateTime.now().toEpochSecond();
+      final ThreePartHierarchicalTableIdentifier tempTableIdentifier =
+          new ThreePartHierarchicalTableIdentifier(
+              TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), tempTableName);
+
+      Table table = newTable(TEST_HMS_DATABASE, TEST_HMS_TABLE);
+      Table tempTable = newTable(TEST_HMS_DATABASE, tempTableName);
+
+      when(mockTableBuilder.getCreateTableRequest(
+              TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER))
+          .thenReturn(table);
+      when(mockTableBuilder.getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, 
tempTableIdentifier))
+          .thenReturn(tempTable);
+
+      hmsCatalogSyncClient.createOrReplaceTable(
+          TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER);
+
+      verify(mockMetaStoreClient, times(1)).createTable(table);
+      verify(mockMetaStoreClient, times(1))
+          .dropTable(TEST_HMS_DATABASE, 
TEST_CATALOG_TABLE_IDENTIFIER.getTableName());
+      verify(mockMetaStoreClient, times(1)).createTable(tempTable);
+      verify(mockMetaStoreClient, times(1))
+          .dropTable(TEST_HMS_DATABASE, tempTableIdentifier.getTableName());
+
+      verify(mockTableBuilder, times(1))
+          .getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, 
TEST_CATALOG_TABLE_IDENTIFIER);
+      verify(mockTableBuilder, times(1))
+          .getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, 
tempTableIdentifier);
+    }
+  }
+
+  @Test
+  void testLoadInstanceByServiceLoader() {
+    ServiceLoader<CatalogSyncClient> loader = 
ServiceLoader.load(CatalogSyncClient.class);
+    CatalogSyncClient catalogSyncClient = null;
+
+    for (CatalogSyncClient instance : loader) {
+      if (instance.getCatalogType().equals(CatalogType.HMS)) {
+        catalogSyncClient = instance;
+        break;
+      }
+    }
+    assertNotNull(catalogSyncClient);
+    assertEquals(catalogSyncClient.getClass().getName(), 
HMSCatalogSyncClient.class.getName());
+  }
+}
diff --git 
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogTableBuilderFactory.java
 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogTableBuilderFactory.java
new file mode 100644
index 00000000..6748b688
--- /dev/null
+++ 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogTableBuilderFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.hms;
+
+import static 
org.apache.xtable.hms.HMSCatalogSyncClientTestBase.TEST_CATALOG_TABLE_IDENTIFIER;
+import static 
org.apache.xtable.hms.HMSCatalogSyncClientTestBase.TEST_HMS_DATABASE;
+import static 
org.apache.xtable.hms.HMSCatalogSyncClientTestBase.TEST_HMS_TABLE;
+import static 
org.apache.xtable.hms.table.TestIcebergHMSCatalogTableBuilder.getTestHmsTableParameters;
+import static 
org.apache.xtable.hms.table.TestIcebergHMSCatalogTableBuilder.getTestHmsTableStorageDescriptor;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mockStatic;
+
+import java.time.Instant;
+
+import lombok.SneakyThrows;
+
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+
+public class TestHMSCatalogTableBuilderFactory {
+
+  @SneakyThrows
+  @Test
+  void testNewHmsTable() {
+    Instant createdTime = Instant.now();
+    try (MockedStatic<Instant> mockZonedDateTime = mockStatic(Instant.class)) {
+      mockZonedDateTime.when(Instant::now).thenReturn(createdTime);
+      Table expected = new Table();
+      expected.setDbName(TEST_HMS_DATABASE);
+      expected.setTableName(TEST_HMS_TABLE);
+      
expected.setOwner(UserGroupInformation.getCurrentUser().getShortUserName());
+      expected.setCreateTime((int) createdTime.getEpochSecond());
+      expected.setSd(getTestHmsTableStorageDescriptor());
+      expected.setTableType("EXTERNAL_TABLE");
+      expected.setParameters(getTestHmsTableParameters());
+
+      assertEquals(
+          expected,
+          HMSCatalogTableBuilderFactory.newHmsTable(
+              TEST_CATALOG_TABLE_IDENTIFIER,
+              getTestHmsTableStorageDescriptor(),
+              getTestHmsTableParameters()));
+    }
+  }
+}
diff --git 
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSSchemaExtractor.java
 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSSchemaExtractor.java
new file mode 100644
index 00000000..0a774a63
--- /dev/null
+++ 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSSchemaExtractor.java
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.hms;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.junit.jupiter.api.Test;
+
+import org.apache.xtable.catalog.TestSchemaExtractorBase;
+import org.apache.xtable.exception.NotSupportedException;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.model.storage.TableFormat;
+
+public class TestHMSSchemaExtractor extends TestSchemaExtractorBase {
+
+  private FieldSchema getFieldSchema(String name, String type) {
+    return new FieldSchema(name, type, null);
+  }
+
+  @Test
+  void testPrimitiveTypes() {
+    int precision = 10;
+    int scale = 5;
+    Map<InternalSchema.MetadataKey, Object> doubleMetadata = new HashMap<>();
+    doubleMetadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, 
precision);
+    doubleMetadata.put(InternalSchema.MetadataKey.DECIMAL_SCALE, scale);
+    String tableFormat = TableFormat.ICEBERG;
+
+    InternalSchema internalSchema =
+        InternalSchema.builder()
+            .dataType(InternalType.RECORD)
+            .isNullable(false)
+            .name("record")
+            .fields(
+                Arrays.asList(
+                    getPrimitiveInternalField(
+                        "requiredBoolean", "boolean", InternalType.BOOLEAN, 
false, 1),
+                    getPrimitiveInternalField(
+                        "optionalBoolean", "boolean", InternalType.BOOLEAN, 
true, 2),
+                    getPrimitiveInternalField("requiredInt", "integer", 
InternalType.INT, false, 3),
+                    getPrimitiveInternalField("requiredLong", "long", 
InternalType.LONG, false, 4),
+                    getPrimitiveInternalField(
+                        "requiredDouble", "double", InternalType.DOUBLE, 
false, 5),
+                    getPrimitiveInternalField(
+                        "requiredFloat", "float", InternalType.FLOAT, false, 
6),
+                    getPrimitiveInternalField(
+                        "requiredString", "string", InternalType.STRING, 
false, 7),
+                    getPrimitiveInternalField(
+                        "requiredBytes", "binary", InternalType.BYTES, false, 
8),
+                    getPrimitiveInternalField("requiredDate", "date", 
InternalType.DATE, false, 9),
+                    getPrimitiveInternalField(
+                        "requiredDecimal",
+                        "decimal",
+                        InternalType.DECIMAL,
+                        false,
+                        10,
+                        doubleMetadata),
+                    getPrimitiveInternalField(
+                        "requiredTimestamp", "timestamp", 
InternalType.TIMESTAMP, false, 11),
+                    getPrimitiveInternalField(
+                        "requiredTimestampNTZ",
+                        "timestamp_ntz",
+                        InternalType.TIMESTAMP_NTZ,
+                        false,
+                        12)))
+            .build();
+
+    List<FieldSchema> expected =
+        Arrays.asList(
+            getFieldSchema("requiredBoolean", "boolean"),
+            getFieldSchema("optionalBoolean", "boolean"),
+            getFieldSchema("requiredInt", "int"),
+            getFieldSchema("requiredLong", "bigint"),
+            getFieldSchema("requiredDouble", "double"),
+            getFieldSchema("requiredFloat", "float"),
+            getFieldSchema("requiredString", "string"),
+            getFieldSchema("requiredBytes", "binary"),
+            getFieldSchema("requiredDate", "date"),
+            getFieldSchema("requiredDecimal", String.format("decimal(%s,%s)", 
precision, scale)),
+            getFieldSchema("requiredTimestamp", "timestamp"),
+            getFieldSchema("requiredTimestampNTZ", "timestamp"));
+
+    assertEquals(expected, 
HMSSchemaExtractor.getInstance().toColumns(tableFormat, internalSchema));
+  }
+
+  @Test
+  void testTimestamps() {
+    String tableFormat = TableFormat.ICEBERG;
+    Map<InternalSchema.MetadataKey, Object> millisTimestamp =
+        Collections.singletonMap(
+            InternalSchema.MetadataKey.TIMESTAMP_PRECISION, 
InternalSchema.MetadataValue.MILLIS);
+
+    Map<InternalSchema.MetadataKey, Object> microsTimestamp =
+        Collections.singletonMap(
+            InternalSchema.MetadataKey.TIMESTAMP_PRECISION, 
InternalSchema.MetadataValue.MICROS);
+
+    InternalSchema internalSchema =
+        InternalSchema.builder()
+            .dataType(InternalType.RECORD)
+            .isNullable(false)
+            .name("record")
+            .fields(
+                Arrays.asList(
+                    getPrimitiveInternalField(
+                        "requiredTimestampMillis",
+                        "timestamp",
+                        InternalType.TIMESTAMP,
+                        false,
+                        1,
+                        millisTimestamp),
+                    getPrimitiveInternalField(
+                        "requiredTimestampMicros",
+                        "timestamp",
+                        InternalType.TIMESTAMP,
+                        false,
+                        2,
+                        microsTimestamp),
+                    getPrimitiveInternalField(
+                        "requiredTimestampNTZMillis",
+                        "timestamp_ntz",
+                        InternalType.TIMESTAMP_NTZ,
+                        false,
+                        3,
+                        millisTimestamp),
+                    getPrimitiveInternalField(
+                        "requiredTimestampNTZMicros",
+                        "timestamp_ntz",
+                        InternalType.TIMESTAMP_NTZ,
+                        false,
+                        4,
+                        microsTimestamp)))
+            .build();
+
+    List<FieldSchema> expected =
+        Arrays.asList(
+            getFieldSchema("requiredTimestampMillis", "timestamp"),
+            getFieldSchema("requiredTimestampMicros", "timestamp"),
+            getFieldSchema("requiredTimestampNTZMillis", "timestamp"),
+            getFieldSchema("requiredTimestampNTZMicros", "timestamp"));
+
+    assertEquals(expected, 
HMSSchemaExtractor.getInstance().toColumns(tableFormat, internalSchema));
+  }
+
+  @Test
+  void testMaps() {
+    String tableFormat = TableFormat.ICEBERG;
+    InternalSchema recordMapElementSchema =
+        InternalSchema.builder()
+            .name("struct")
+            .isNullable(true)
+            .fields(
+                Arrays.asList(
+                    getPrimitiveInternalField(
+                        "requiredDouble",
+                        "double",
+                        InternalType.DOUBLE,
+                        false,
+                        1,
+                        "recordMap._one_field_value"),
+                    getPrimitiveInternalField(
+                        "optionalString",
+                        "string",
+                        InternalType.STRING,
+                        true,
+                        2,
+                        "recordMap._one_field_value")))
+            .dataType(InternalType.RECORD)
+            .build();
+
+    InternalSchema internalSchema =
+        InternalSchema.builder()
+            .name("record")
+            .dataType(InternalType.RECORD)
+            .isNullable(false)
+            .fields(
+                Arrays.asList(
+                    InternalField.builder()
+                        .name("intMap")
+                        .fieldId(1)
+                        .schema(
+                            InternalSchema.builder()
+                                .name("map")
+                                .isNullable(false)
+                                .dataType(InternalType.MAP)
+                                .fields(
+                                    Arrays.asList(
+                                        getPrimitiveInternalField(
+                                            
InternalField.Constants.MAP_KEY_FIELD_NAME,
+                                            "string",
+                                            InternalType.STRING,
+                                            false,
+                                            3,
+                                            "intMap"),
+                                        getPrimitiveInternalField(
+                                            
InternalField.Constants.MAP_VALUE_FIELD_NAME,
+                                            "integer",
+                                            InternalType.INT,
+                                            false,
+                                            4,
+                                            "intMap")))
+                                .build())
+                        .build(),
+                    InternalField.builder()
+                        .name("recordMap")
+                        .fieldId(2)
+                        .schema(
+                            InternalSchema.builder()
+                                .name("map")
+                                .isNullable(true)
+                                .dataType(InternalType.MAP)
+                                .fields(
+                                    Arrays.asList(
+                                        getPrimitiveInternalField(
+                                            
InternalField.Constants.MAP_KEY_FIELD_NAME,
+                                            "integer",
+                                            InternalType.INT,
+                                            false,
+                                            5,
+                                            "recordMap"),
+                                        InternalField.builder()
+                                            
.name(InternalField.Constants.MAP_VALUE_FIELD_NAME)
+                                            .fieldId(6)
+                                            .parentPath("recordMap")
+                                            .schema(recordMapElementSchema)
+                                            .build()))
+                                .build())
+                        
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+                        .build()))
+            .build();
+
+    List<FieldSchema> expected =
+        Arrays.asList(
+            getFieldSchema("intMap", "map<string,int>"),
+            getFieldSchema(
+                "recordMap", 
"map<int,struct<requiredDouble:double,optionalString:string>>"));
+
+    assertEquals(expected, 
HMSSchemaExtractor.getInstance().toColumns(tableFormat, internalSchema));
+  }
+
+  @Test
+  void testLists() {
+    String tableFormat = TableFormat.ICEBERG;
+    InternalSchema recordListElementSchema =
+        InternalSchema.builder()
+            .name("struct")
+            .isNullable(true)
+            .fields(
+                Arrays.asList(
+                    getPrimitiveInternalField(
+                        "requiredDouble",
+                        "double",
+                        InternalType.DOUBLE,
+                        false,
+                        11,
+                        "recordMap._one_field_value"),
+                    getPrimitiveInternalField(
+                        "optionalString",
+                        "string",
+                        InternalType.STRING,
+                        true,
+                        12,
+                        "recordMap._one_field_value")))
+            .dataType(InternalType.RECORD)
+            .build();
+
+    InternalSchema internalSchema =
+        InternalSchema.builder()
+            .dataType(InternalType.RECORD)
+            .name("record")
+            .isNullable(false)
+            .fields(
+                Arrays.asList(
+                    InternalField.builder()
+                        .name("intList")
+                        .fieldId(1)
+                        .schema(
+                            InternalSchema.builder()
+                                .name("list")
+                                .isNullable(false)
+                                .dataType(InternalType.LIST)
+                                .fields(
+                                    Collections.singletonList(
+                                        getPrimitiveInternalField(
+                                            
InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME,
+                                            "integer",
+                                            InternalType.INT,
+                                            false,
+                                            13,
+                                            "intList")))
+                                .build())
+                        .build(),
+                    InternalField.builder()
+                        .name("recordList")
+                        .fieldId(2)
+                        .schema(
+                            InternalSchema.builder()
+                                .name("list")
+                                .isNullable(true)
+                                .dataType(InternalType.LIST)
+                                .fields(
+                                    Collections.singletonList(
+                                        InternalField.builder()
+                                            
.name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME)
+                                            .fieldId(14)
+                                            .parentPath("recordList")
+                                            .schema(recordListElementSchema)
+                                            .build()))
+                                .build())
+                        
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+                        .build()))
+            .build();
+
+    List<FieldSchema> expected =
+        Arrays.asList(
+            getFieldSchema("intList", "array<int>"),
+            getFieldSchema(
+                "recordList", 
"array<struct<requiredDouble:double,optionalString:string>>"));
+
+    assertEquals(expected, 
HMSSchemaExtractor.getInstance().toColumns(tableFormat, internalSchema));
+  }
+
+  @Test
+  void testNestedRecords() {
+    String tableFormat = TableFormat.ICEBERG;
+    InternalSchema internalSchema =
+        InternalSchema.builder()
+            .dataType(InternalType.RECORD)
+            .name("record")
+            .isNullable(false)
+            .fields(
+                Collections.singletonList(
+                    InternalField.builder()
+                        .name("nestedOne")
+                        
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+                        .fieldId(1)
+                        .schema(
+                            InternalSchema.builder()
+                                .name("struct")
+                                .dataType(InternalType.RECORD)
+                                .isNullable(true)
+                                .fields(
+                                    Arrays.asList(
+                                        getPrimitiveInternalField(
+                                            "nestedOptionalInt",
+                                            "integer",
+                                            InternalType.INT,
+                                            true,
+                                            11,
+                                            "nestedOne"),
+                                        getPrimitiveInternalField(
+                                            "nestedRequiredDouble",
+                                            "double",
+                                            InternalType.DOUBLE,
+                                            false,
+                                            12,
+                                            "nestedOne"),
+                                        InternalField.builder()
+                                            .name("nestedTwo")
+                                            .parentPath("nestedOne")
+                                            .fieldId(13)
+                                            .schema(
+                                                InternalSchema.builder()
+                                                    .name("struct")
+                                                    
.dataType(InternalType.RECORD)
+                                                    .isNullable(false)
+                                                    .fields(
+                                                        
Collections.singletonList(
+                                                            
getPrimitiveInternalField(
+                                                                
"doublyNestedString",
+                                                                "string",
+                                                                
InternalType.STRING,
+                                                                true,
+                                                                14,
+                                                                
"nestedOne.nestedTwo")))
+                                                    .build())
+                                            .build()))
+                                .build())
+                        .build()))
+            .build();
+
+    List<FieldSchema> expected =
+        Arrays.asList(
+            getFieldSchema(
+                "nestedOne",
+                
"struct<nestedOptionalInt:int,nestedRequiredDouble:double,nestedTwo:struct<doublyNestedString:string>>"));
+    assertEquals(expected, 
HMSSchemaExtractor.getInstance().toColumns(tableFormat, internalSchema));
+  }
+
+  @Test
+  void testUnsupportedType() {
+    String tableFormat = TableFormat.ICEBERG;
+    // Unknown "UNION" type
+    InternalSchema internalSchema =
+        InternalSchema.builder()
+            .dataType(InternalType.RECORD)
+            .isNullable(false)
+            .name("record")
+            .fields(
+                Arrays.asList(
+                    getPrimitiveInternalField(
+                        "optionalBoolean", "boolean", InternalType.BOOLEAN, 
true, 2),
+                    InternalField.builder()
+                        .name("unionField")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("unionSchema")
+                                .dataType(InternalType.UNION)
+                                .isNullable(true)
+                                .build())
+                        .fieldId(2)
+                        .build()))
+            .build();
+
+    NotSupportedException exception =
+        assertThrows(
+            NotSupportedException.class,
+            () -> HMSSchemaExtractor.getInstance().toColumns(tableFormat, 
internalSchema));
+    assertEquals("Unsupported type: InternalType.UNION(name=union)", 
exception.getMessage());
+
+    // Invalid decimal type (precision and scale metadata is missing)
+    InternalSchema internalSchema2 =
+        InternalSchema.builder()
+            .dataType(InternalType.RECORD)
+            .isNullable(false)
+            .name("record")
+            .fields(
+                Arrays.asList(
+                    getPrimitiveInternalField(
+                        "optionalBoolean", "boolean", InternalType.BOOLEAN, 
true, 1),
+                    getPrimitiveInternalField(
+                        "optionalDecimal", "decimal", InternalType.DECIMAL, 
true, 2)))
+            .build();
+
+    exception =
+        assertThrows(
+            NotSupportedException.class,
+            () -> HMSSchemaExtractor.getInstance().toColumns(tableFormat, 
internalSchema2));
+    assertEquals("Invalid decimal type, precision and scale is missing", 
exception.getMessage());
+
+    // Invalid decimal type (scale metadata is missing)
+    Map<InternalSchema.MetadataKey, Object> doubleMetadata = new HashMap<>();
+    doubleMetadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, 10);
+    InternalSchema internalSchema3 =
+        InternalSchema.builder()
+            .dataType(InternalType.RECORD)
+            .isNullable(false)
+            .name("record")
+            .fields(
+                Arrays.asList(
+                    getPrimitiveInternalField(
+                        "optionalBoolean", "boolean", InternalType.BOOLEAN, 
true, 1),
+                    getPrimitiveInternalField(
+                        "optionalDecimal",
+                        "decimal",
+                        InternalType.DECIMAL,
+                        true,
+                        2,
+                        doubleMetadata)))
+            .build();
+
+    exception =
+        assertThrows(
+            NotSupportedException.class,
+            () -> HMSSchemaExtractor.getInstance().toColumns(tableFormat, 
internalSchema3));
+    assertEquals("Invalid decimal type, scale is missing", 
exception.getMessage());
+  }
+}
diff --git 
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestIcebergHMSCatalogTableBuilder.java
 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestIcebergHMSCatalogTableBuilder.java
new file mode 100644
index 00000000..b0c09a94
--- /dev/null
+++ 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestIcebergHMSCatalogTableBuilder.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.hms.table;
+
+import static 
org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
+import static 
org.apache.iceberg.BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Instant;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import lombok.SneakyThrows;
+
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.hadoop.HadoopTables;
+
+import org.apache.xtable.hms.HMSCatalogSyncClientTestBase;
+import org.apache.xtable.model.storage.TableFormat;
+
+@ExtendWith(MockitoExtension.class)
+public class TestIcebergHMSCatalogTableBuilder extends 
HMSCatalogSyncClientTestBase {
+
+  @Mock private HadoopTables mockIcebergHadoopTables;
+  @Mock private BaseTable mockIcebergBaseTable;
+  @Mock private TableOperations mockIcebergTableOperations;
+  @Mock private TableMetadata mockIcebergTableMetadata;
+
+  private IcebergHMSCatalogTableBuilder 
mockIcebergHmsCatalogSyncRequestProvider;
+
+  private IcebergHMSCatalogTableBuilder createIcebergHMSHelper() {
+    return new IcebergHMSCatalogTableBuilder(mockHmsSchemaExtractor, 
mockIcebergHadoopTables);
+  }
+
+  void setupCommonMocks() {
+    mockIcebergHmsCatalogSyncRequestProvider = createIcebergHMSHelper();
+  }
+
+  void mockHadoopTables() {
+    
when(mockIcebergHadoopTables.load(TEST_BASE_PATH)).thenReturn(mockIcebergBaseTable);
+    mockMetadataFileLocation();
+  }
+
+  void mockMetadataFileLocation() {
+    
when(mockIcebergBaseTable.operations()).thenReturn(mockIcebergTableOperations);
+    
when(mockIcebergTableOperations.current()).thenReturn(mockIcebergTableMetadata);
+    when(mockIcebergTableMetadata.metadataFileLocation())
+        .thenReturn(ICEBERG_METADATA_FILE_LOCATION);
+  }
+
+  @SneakyThrows
+  @Test
+  void testGetCreateTableRequest() {
+    mockIcebergHmsCatalogSyncRequestProvider = createIcebergHMSHelper();
+    mockHadoopTables();
+    when(mockHmsSchemaExtractor.toColumns(
+            TableFormat.ICEBERG, TEST_ICEBERG_INTERNAL_TABLE.getReadSchema()))
+        .thenReturn(Collections.emptyList());
+    Instant createdTime = Instant.now();
+    try (MockedStatic<Instant> mockZonedDateTime = mockStatic(Instant.class)) {
+      mockZonedDateTime.when(Instant::now).thenReturn(createdTime);
+      Table expected = new Table();
+      expected.setDbName(TEST_HMS_DATABASE);
+      expected.setTableName(TEST_HMS_TABLE);
+      
expected.setOwner(UserGroupInformation.getCurrentUser().getShortUserName());
+      expected.setCreateTime((int) createdTime.getEpochSecond());
+      expected.setSd(getTestHmsTableStorageDescriptor());
+      expected.setTableType("EXTERNAL_TABLE");
+      expected.setParameters(getTestHmsTableParameters());
+
+      assertEquals(
+          expected,
+          mockIcebergHmsCatalogSyncRequestProvider.getCreateTableRequest(
+              TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER));
+      verify(mockHmsSchemaExtractor, times(1))
+          .toColumns(TableFormat.ICEBERG, 
TEST_ICEBERG_INTERNAL_TABLE.getReadSchema());
+      verify(mockIcebergBaseTable, times(1)).properties();
+      verify(mockIcebergHadoopTables, times(1)).load(TEST_BASE_PATH);
+    }
+  }
+
+  @SneakyThrows
+  @Test
+  void testGetUpdateTableRequest() {
+    setupCommonMocks();
+    mockHadoopTables();
+    when(mockHmsSchemaExtractor.toColumns(
+            TableFormat.ICEBERG, TEST_ICEBERG_INTERNAL_TABLE.getReadSchema()))
+        .thenReturn(Collections.emptyList());
+
+    Map<String, String> tableParams = new HashMap<>();
+    tableParams.put(METADATA_LOCATION_PROP, ICEBERG_METADATA_FILE_LOCATION);
+    Table hmsTable =
+        newTable(
+            TEST_HMS_DATABASE, TEST_HMS_TABLE, tableParams, 
getTestHmsTableStorageDescriptor());
+
+    when(mockIcebergTableMetadata.metadataFileLocation())
+        .thenReturn(ICEBERG_METADATA_FILE_LOCATION_V2);
+    when(mockIcebergBaseTable.properties()).thenReturn(Collections.emptyMap());
+    Table output =
+        mockIcebergHmsCatalogSyncRequestProvider.getUpdateTableRequest(
+            TEST_ICEBERG_INTERNAL_TABLE, hmsTable, 
TEST_CATALOG_TABLE_IDENTIFIER);
+    tableParams.put(PREVIOUS_METADATA_LOCATION_PROP, 
ICEBERG_METADATA_FILE_LOCATION);
+    tableParams.put(METADATA_LOCATION_PROP, ICEBERG_METADATA_FILE_LOCATION_V2);
+    Table expected =
+        newTable(
+            TEST_HMS_DATABASE, TEST_HMS_TABLE, tableParams, 
getTestHmsTableStorageDescriptor());
+    assertEquals(expected, output);
+    assertEquals(tableParams, hmsTable.getParameters());
+    verify(mockHmsSchemaExtractor, times(1))
+        .toColumns(TableFormat.ICEBERG, 
TEST_ICEBERG_INTERNAL_TABLE.getReadSchema());
+  }
+
+  @Test
+  void testGetStorageDescriptor() {
+    mockIcebergHmsCatalogSyncRequestProvider = createIcebergHMSHelper();
+    when(mockHmsSchemaExtractor.toColumns(
+            TableFormat.ICEBERG, TEST_ICEBERG_INTERNAL_TABLE.getReadSchema()))
+        .thenReturn(Collections.emptyList());
+    StorageDescriptor expected = getTestHmsTableStorageDescriptor();
+    assertEquals(
+        expected,
+        
mockIcebergHmsCatalogSyncRequestProvider.getStorageDescriptor(TEST_ICEBERG_INTERNAL_TABLE));
+    verify(mockHmsSchemaExtractor, times(1))
+        .toColumns(TableFormat.ICEBERG, 
TEST_ICEBERG_INTERNAL_TABLE.getReadSchema());
+  }
+
+  @Test
+  void testGetTableParameters() {
+    mockIcebergHmsCatalogSyncRequestProvider = createIcebergHMSHelper();
+    mockMetadataFileLocation();
+    when(mockIcebergBaseTable.properties()).thenReturn(Collections.emptyMap());
+    Map<String, String> expected = getTestHmsTableParameters();
+    assertEquals(
+        expected,
+        
mockIcebergHmsCatalogSyncRequestProvider.getTableParameters(mockIcebergBaseTable));
+    verify(mockIcebergBaseTable, times(1)).properties();
+    verify(mockIcebergHadoopTables, never()).load(any());
+  }
+
+  public static StorageDescriptor getTestHmsTableStorageDescriptor() {
+    StorageDescriptor storageDescriptor = new StorageDescriptor();
+    SerDeInfo serDeInfo = new SerDeInfo();
+    storageDescriptor.setCols(Collections.emptyList());
+    storageDescriptor.setLocation(TEST_BASE_PATH);
+    
storageDescriptor.setInputFormat("org.apache.iceberg.mr.hive.HiveIcebergInputFormat");
+    
storageDescriptor.setOutputFormat("org.apache.iceberg.mr.hive.HiveIcebergOutputFormat");
+    
serDeInfo.setSerializationLib("org.apache.iceberg.mr.hive.HiveIcebergSerDe");
+    storageDescriptor.setSerdeInfo(serDeInfo);
+    return storageDescriptor;
+  }
+
+  public static Map<String, String> getTestHmsTableParameters() {
+    Map<String, String> parameters = new HashMap<>();
+    parameters.put("EXTERNAL", "TRUE");
+    parameters.put("table_type", "ICEBERG");
+    parameters.put("metadata_location", ICEBERG_METADATA_FILE_LOCATION);
+    parameters.put("storage_handler", 
"org.apache.iceberg.mr.hive.HiveIcebergStorageHandler");
+    parameters.put("iceberg.catalog", "location_based_table");
+    return parameters;
+  }
+}
diff --git a/xtable-utilities/pom.xml b/xtable-utilities/pom.xml
index 75b26e0b..a550f73c 100644
--- a/xtable-utilities/pom.xml
+++ b/xtable-utilities/pom.xml
@@ -50,6 +50,26 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.xtable</groupId>
+            <artifactId>xtable-hive-metastore</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.mortbay.jetty</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.eclipse.jetty.aggregate</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.eclipse.jetty.orbit</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
         <!-- command line arg parsing -->
         <dependency>
             <groupId>commons-cli</groupId>

Reply via email to