This is an automated email from the ASF dual-hosted git repository.
vinish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
new f194f4cb [590] Add Hudi HMS Catalog Sync Implementation
f194f4cb is described below
commit f194f4cbc8a54a2cfd49c18b14d656abb046baf2
Author: Vamsi <[email protected]>
AuthorDate: Tue Feb 11 20:00:00 2025 +0530
[590] Add Hudi HMS Catalog Sync Implementation
---
...va => HudiCatalogTablePropertiesExtractor.java} | 14 +-
...> TestHudiCatalogTablePropertiesExtractor.java} | 10 +-
xtable-hive-metastore/pom.xml | 8 +
.../org/apache/xtable/hms/HMSCatalogConfig.java | 11 +
.../hms/HMSCatalogPartitionSyncOperations.java | 221 ++++++++++
.../apache/xtable/hms/HMSCatalogSyncClient.java | 22 +-
.../xtable/hms/HMSCatalogTableBuilderFactory.java | 7 +-
.../hms/table/HudiHMSCatalogTableBuilder.java | 204 +++++++++
...ntTestBase.java => HMSCatalogSyncTestBase.java} | 10 +-
.../hms/TestHMSCatalogPartitionSyncOperations.java | 470 +++++++++++++++++++++
.../xtable/hms/TestHMSCatalogSyncClient.java | 95 +++--
.../hms/TestHMSCatalogTableBuilderFactory.java | 8 +-
.../hms/table/TestDeltaHMSCatalogTableBuilder.java | 4 +-
.../hms/table/TestHudiHMSCatalogTableBuilder.java | 143 +++++++
.../table/TestIcebergHMSCatalogTableBuilder.java | 4 +-
.../apache/xtable/utilities/RunCatalogSync.java | 25 +-
.../src/test/resources/catalogConfig.yaml | 1 +
17 files changed, 1200 insertions(+), 57 deletions(-)
diff --git
a/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTableUtils.java
b/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTablePropertiesExtractor.java
similarity index 91%
rename from
xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTableUtils.java
rename to
xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTablePropertiesExtractor.java
index e6d99706..ff26e600 100644
---
a/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTableUtils.java
+++
b/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTablePropertiesExtractor.java
@@ -23,6 +23,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
import org.apache.spark.sql.types.StructType;
import org.apache.hudi.common.util.StringUtils;
@@ -33,15 +36,22 @@ import org.apache.xtable.model.schema.InternalType;
import org.apache.xtable.schema.SparkSchemaExtractor;
/** Util class to fetch details about Hudi table */
-public class HudiCatalogTableUtils {
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class HudiCatalogTablePropertiesExtractor {
+
+ private static final HudiCatalogTablePropertiesExtractor INSTANCE =
+ new HudiCatalogTablePropertiesExtractor();
+ public static HudiCatalogTablePropertiesExtractor getInstance() {
+ return INSTANCE;
+ }
/**
* Get Spark Sql related table properties. This is used for spark datasource
table.
*
* @param schema The schema to write to the table.
* @return A new parameters added the spark's table properties.
*/
- public static Map<String, String> getSparkTableProperties(
+ public Map<String, String> getSparkTableProperties(
List<String> partitionNames,
String sparkVersion,
int schemaLengthThreshold,
diff --git
a/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTableUtils.java
b/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTablePropertiesExtractor.java
similarity index 94%
rename from
xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTableUtils.java
rename to
xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTablePropertiesExtractor.java
index e39d63e5..e08674bf 100644
---
a/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTableUtils.java
+++
b/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTablePropertiesExtractor.java
@@ -32,7 +32,7 @@ import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.InternalType;
-public class TestHudiCatalogTableUtils {
+public class TestHudiCatalogTablePropertiesExtractor {
@Test
void testGetSparkTableProperties() {
@@ -81,8 +81,8 @@ public class TestHudiCatalogTableUtils {
.build();
Map<String, String> result =
- HudiCatalogTableUtils.getSparkTableProperties(
- partitionNames, sparkVersion, schemaLengthThreshold, schema);
+ HudiCatalogTablePropertiesExtractor.getInstance()
+ .getSparkTableProperties(partitionNames, sparkVersion,
schemaLengthThreshold, schema);
// Validate results
assertEquals("hudi", result.get("spark.sql.sources.provider"));
@@ -127,8 +127,8 @@ public class TestHudiCatalogTableUtils {
// Call the method
Map<String, String> result =
- HudiCatalogTableUtils.getSparkTableProperties(
- partitionNames, "", schemaLengthThreshold, schema);
+ HudiCatalogTablePropertiesExtractor.getInstance()
+ .getSparkTableProperties(partitionNames, "",
schemaLengthThreshold, schema);
assertEquals("hudi", result.get("spark.sql.sources.provider"));
assertNull(result.get("spark.sql.create.version"));
diff --git a/xtable-hive-metastore/pom.xml b/xtable-hive-metastore/pom.xml
index 01037a1f..20520f13 100644
--- a/xtable-hive-metastore/pom.xml
+++ b/xtable-hive-metastore/pom.xml
@@ -42,6 +42,14 @@
<scope>provided</scope>
</dependency>
+ <!-- Hudi dependencies -->
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-hive-sync</artifactId>
+ <version>${hudi.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<!-- Iceberg dependencies -->
<dependency>
<groupId>org.apache.iceberg</groupId>
diff --git
a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogConfig.java
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogConfig.java
index f6f9eabd..36fffdd9 100644
---
a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogConfig.java
+++
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogConfig.java
@@ -26,6 +26,8 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
+import org.apache.hudi.hive.MultiPartKeysValueExtractor;
+
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
@@ -43,6 +45,15 @@ public class HMSCatalogConfig {
@JsonProperty("externalCatalog.hms.serverUrl")
private final String serverUrl;
+ @JsonProperty("externalCatalog.hms.schema_string_length_thresh")
+ private final int schemaLengthThreshold = 4000;
+
+ @JsonProperty("externalCatalog.hms.partition_extractor_class")
+ private final String partitionExtractorClass =
MultiPartKeysValueExtractor.class.getName();
+
+ @JsonProperty("externalCatalog.hms.max_partitions_per_request")
+ private final int maxPartitionsPerRequest = 1000;
+
protected static HMSCatalogConfig of(Map<String, String> properties) {
try {
return OBJECT_MAPPER.readValue(
diff --git
a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogPartitionSyncOperations.java
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogPartitionSyncOperations.java
new file mode 100644
index 00000000..89b53ca5
--- /dev/null
+++
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogPartitionSyncOperations.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.hms;
+
+import static
org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifier;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.exception.TableNotFoundException;
+
+import org.apache.xtable.catalog.CatalogPartition;
+import org.apache.xtable.catalog.CatalogPartitionSyncOperations;
+import org.apache.xtable.exception.CatalogSyncException;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
+
+@Log4j2
+public class HMSCatalogPartitionSyncOperations implements
CatalogPartitionSyncOperations {
+
+ private final IMetaStoreClient metaStoreClient;
+ private final HMSCatalogConfig catalogConfig;
+
+  /**
+   * Creates partition sync operations backed by a Hive Metastore client.
+   *
+   * @param metaStoreClient client used for every metastore call issued by this class
+   * @param hmsCatalogConfig HMS catalog settings, e.g. the partition batch size used when adding
+   *     partitions
+   */
+  public HMSCatalogPartitionSyncOperations(
+      IMetaStoreClient metaStoreClient, HMSCatalogConfig hmsCatalogConfig) {
+    this.catalogConfig = hmsCatalogConfig;
+    this.metaStoreClient = metaStoreClient;
+  }
+
+  /**
+   * Lists the partitions registered in the metastore for the given table.
+   *
+   * @param catalogTableIdentifier identifier of the table; converted to a hierarchical identifier
+   * @return one {@link CatalogPartition} per metastore partition, carrying its values and the
+   *     location of its storage descriptor
+   * @throws CatalogSyncException if the metastore call fails
+   */
+  @Override
+  public List<CatalogPartition> getAllPartitions(CatalogTableIdentifier catalogTableIdentifier) {
+    HierarchicalTableIdentifier tableIdentifier =
+        toHierarchicalTableIdentifier(catalogTableIdentifier);
+    try {
+      // A max count of -1 asks the metastore for every partition rather than a capped page.
+      List<Partition> hmsPartitions =
+          metaStoreClient.listPartitions(
+              tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(), (short) -1);
+      List<CatalogPartition> catalogPartitions = new ArrayList<>(hmsPartitions.size());
+      for (Partition hmsPartition : hmsPartitions) {
+        catalogPartitions.add(
+            new CatalogPartition(hmsPartition.getValues(), hmsPartition.getSd().getLocation()));
+      }
+      return catalogPartitions;
+    } catch (TException e) {
+      throw new CatalogSyncException(
+          "Failed to get all partitions for table " + tableIdentifier, e);
+    }
+  }
+
+  /**
+   * Registers new partitions on the table, sending them to the metastore in batches whose size is
+   * taken from {@code catalogConfig.getMaxPartitionsPerRequest()}.
+   *
+   * <p>Does nothing (besides logging) when {@code partitionsToAdd} is empty.
+   *
+   * @param catalogTableIdentifier identifier of the table receiving the partitions
+   * @param partitionsToAdd partitions to register
+   * @throws CatalogSyncException if the table lookup or any batch fails
+   */
+  @Override
+  public void addPartitionsToTable(
+      CatalogTableIdentifier catalogTableIdentifier, List<CatalogPartition> partitionsToAdd) {
+    HierarchicalTableIdentifier tableIdentifier =
+        toHierarchicalTableIdentifier(catalogTableIdentifier);
+    if (partitionsToAdd.isEmpty()) {
+      log.info("No partitions to add for {}", tableIdentifier);
+      return;
+    }
+    log.info("Adding partitions {} to table {}", partitionsToAdd.size(), tableIdentifier);
+    try {
+      // The table-level storage descriptor is the template for every new partition.
+      Table hmsTable =
+          metaStoreClient.getTable(
+              tableIdentifier.getDatabaseName(), tableIdentifier.getTableName());
+      StorageDescriptor tableSd = hmsTable.getSd();
+
+      CollectionUtils.batches(partitionsToAdd, catalogConfig.getMaxPartitionsPerRequest())
+          .forEach(
+              batch -> {
+                List<Partition> hmsPartitions = new ArrayList<>();
+                for (CatalogPartition catalogPartition : batch) {
+                  hmsPartitions.add(createPartition(tableIdentifier, catalogPartition, tableSd));
+                }
+                try {
+                  // Flags: ifNotExists = true, needResults = false.
+                  metaStoreClient.add_partitions(hmsPartitions, true, false);
+                } catch (TException e) {
+                  log.error("{} add partition failed", tableIdentifier, e);
+                  throw new CatalogSyncException(tableIdentifier + " add partition failed", e);
+                }
+                log.info("Add batch partitions done: {}", hmsPartitions.size());
+              });
+    } catch (TException e) {
+      log.error("Failed to add partitions to table {}", tableIdentifier, e);
+      throw new CatalogSyncException(tableIdentifier + " add partition failed", e);
+    }
+  }
+
+  /**
+   * Rewrites the metastore entries of partitions whose storage location changed.
+   *
+   * <p>Returns without contacting the metastore when {@code changedPartitions} is empty, mirroring
+   * the guard in {@code addPartitionsToTable}.
+   *
+   * @param catalogTableIdentifier identifier of the table owning the partitions
+   * @param changedPartitions partitions to alter, each carrying its values and new location
+   * @throws CatalogSyncException if the table lookup or the alter call fails
+   */
+  @Override
+  public void updatePartitionsToTable(
+      CatalogTableIdentifier catalogTableIdentifier, List<CatalogPartition> changedPartitions) {
+    HierarchicalTableIdentifier tableIdentifier =
+        toHierarchicalTableIdentifier(catalogTableIdentifier);
+    if (changedPartitions.isEmpty()) {
+      // Avoid a getTable + alter_partitions round trip that would change nothing.
+      log.info("No partitions to change for {}", tableIdentifier);
+      return;
+    }
+    try {
+      Table table =
+          metaStoreClient.getTable(
+              tableIdentifier.getDatabaseName(), tableIdentifier.getTableName());
+      StorageDescriptor tableSd = table.getSd();
+
+      List<Partition> updatedPartitions = new ArrayList<>(changedPartitions.size());
+      for (CatalogPartition partition : changedPartitions) {
+        updatedPartitions.add(createPartition(tableIdentifier, partition, tableSd));
+      }
+
+      metaStoreClient.alter_partitions(
+          tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(), updatedPartitions);
+    } catch (TException e) {
+      throw new CatalogSyncException(
+          "Failed to update partitions for the table " + tableIdentifier, e);
+    }
+  }
+
+  /**
+   * Removes the given partitions from the table, one metastore call per partition.
+   *
+   * @param catalogTableIdentifier identifier of the table owning the partitions
+   * @param partitionsToDrop partitions to remove, identified by their values
+   * @throws CatalogSyncException if any drop call fails; partitions dropped before the failure
+   *     stay dropped
+   */
+  @Override
+  public void dropPartitions(
+      CatalogTableIdentifier catalogTableIdentifier, List<CatalogPartition> partitionsToDrop) {
+    HierarchicalTableIdentifier tableIdentifier =
+        toHierarchicalTableIdentifier(catalogTableIdentifier);
+    String databaseName = tableIdentifier.getDatabaseName();
+    String tableName = tableIdentifier.getTableName();
+    // Only the metastore entry is removed; the underlying data is not deleted.
+    boolean deleteData = false;
+    try {
+      for (CatalogPartition partitionToDrop : partitionsToDrop) {
+        List<String> partitionValues = partitionToDrop.getValues();
+        metaStoreClient.dropPartition(databaseName, tableName, partitionValues, deleteData);
+      }
+    } catch (TException e) {
+      throw new CatalogSyncException("Failed to drop partitions for table " + tableIdentifier, e);
+    }
+  }
+
+  /**
+   * Reads a subset of the table parameters stored in the metastore.
+   *
+   * <p>Keys that are absent from the table (or mapped to {@code null}) are left out of the result,
+   * and a key requested more than once is returned once.
+   *
+   * @param catalogTableIdentifier identifier of the table to read
+   * @param keysToRetrieve parameter names to look up
+   * @return the requested parameters that exist on the table; empty if the table has none
+   * @throws CatalogSyncException if the table cannot be fetched
+   */
+  @Override
+  public Map<String, String> getTableProperties(
+      CatalogTableIdentifier catalogTableIdentifier, List<String> keysToRetrieve) {
+    HierarchicalTableIdentifier tableIdentifier =
+        toHierarchicalTableIdentifier(catalogTableIdentifier);
+    try {
+      Table table =
+          metaStoreClient.getTable(
+              tableIdentifier.getDatabaseName(), tableIdentifier.getTableName());
+      Map<String, String> tableParameters = table.getParameters();
+
+      // distinct(): Collectors.toMap rejects duplicate keys with an IllegalStateException.
+      // The null checks cover a table without parameters and parameters holding null values,
+      // both of which would otherwise surface as a NullPointerException.
+      return keysToRetrieve.stream()
+          .distinct()
+          .filter(key -> tableParameters != null && tableParameters.get(key) != null)
+          .collect(Collectors.toMap(key -> key, key -> tableParameters.get(key)));
+    } catch (TableNotFoundException | TException e) {
+      throw new CatalogSyncException(
+          "failed to fetch last time synced properties for table " + tableIdentifier, e);
+    }
+  }
+
+  /**
+   * Merges the given properties into the table parameters stored in the metastore.
+   *
+   * <p>Existing parameters with the same key are overwritten; all others are kept. Nothing is
+   * sent to the metastore when {@code propertiesToUpdate} is {@code null} or empty.
+   *
+   * @param catalogTableIdentifier identifier of the table to alter
+   * @param propertiesToUpdate parameters to add or overwrite
+   * @throws CatalogSyncException if the table cannot be fetched or altered
+   */
+  @Override
+  public void updateTableProperties(
+      CatalogTableIdentifier catalogTableIdentifier, Map<String, String> propertiesToUpdate) {
+    HierarchicalTableIdentifier tableIdentifier =
+        toHierarchicalTableIdentifier(catalogTableIdentifier);
+    if (propertiesToUpdate == null || propertiesToUpdate.isEmpty()) {
+      return;
+    }
+    try {
+      Table table =
+          metaStoreClient.getTable(
+              tableIdentifier.getDatabaseName(), tableIdentifier.getTableName());
+      Map<String, String> tableParameters = table.getParameters();
+      if (tableParameters == null) {
+        // No parameters on the table yet: the update set becomes the full parameter map.
+        table.setParameters(propertiesToUpdate);
+      } else {
+        tableParameters.putAll(propertiesToUpdate);
+        table.setParameters(tableParameters);
+      }
+      metaStoreClient.alter_table(
+          tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(), table);
+    } catch (TableNotFoundException | TException e) {
+      throw new CatalogSyncException(
+          "failed to update last time synced properties for table " + tableIdentifier, e);
+    }
+  }
+
+  /**
+   * Builds a metastore {@link Partition} for the given catalog partition.
+   *
+   * <p>Columns, input/output formats and serde info are taken from {@code sd}; only the location
+   * comes from the partition itself.
+   *
+   * @param tableIdentifier table the partition belongs to
+   * @param partition partition values and storage location
+   * @param sd storage descriptor used as the template (callers pass the table's descriptor)
+   * @return the partition with create/last-access time 0 and no parameters
+   */
+  private Partition createPartition(
+      HierarchicalTableIdentifier tableIdentifier,
+      CatalogPartition partition,
+      StorageDescriptor sd) {
+    StorageDescriptor partitionSd = new StorageDescriptor();
+    partitionSd.setLocation(partition.getStorageLocation());
+    partitionSd.setSerdeInfo(sd.getSerdeInfo());
+    partitionSd.setInputFormat(sd.getInputFormat());
+    partitionSd.setOutputFormat(sd.getOutputFormat());
+    partitionSd.setCols(sd.getCols());
+
+    int createTime = 0;
+    int lastAccessTime = 0;
+    String databaseName = tableIdentifier.getDatabaseName();
+    String tableName = tableIdentifier.getTableName();
+    return new Partition(
+        partition.getValues(),
+        databaseName,
+        tableName,
+        createTime,
+        lastAccessTime,
+        partitionSd,
+        null);
+  }
+}
diff --git
a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogSyncClient.java
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogSyncClient.java
index 537de834..acfdfbd9 100644
---
a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogSyncClient.java
+++
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogSyncClient.java
@@ -22,6 +22,7 @@ import static
org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifi
import java.time.ZonedDateTime;
import java.util.Collections;
+import java.util.Optional;
import lombok.extern.log4j.Log4j2;
@@ -36,7 +37,9 @@ import org.apache.thrift.TException;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.xtable.catalog.CatalogPartitionSyncTool;
import org.apache.xtable.catalog.CatalogTableBuilder;
+import org.apache.xtable.catalog.CatalogUtils;
import org.apache.xtable.conversion.ExternalCatalogConfig;
import org.apache.xtable.exception.CatalogSyncException;
import org.apache.xtable.model.InternalTable;
@@ -55,6 +58,7 @@ public class HMSCatalogSyncClient implements
CatalogSyncClient<Table> {
private Configuration configuration;
private IMetaStoreClient metaStoreClient;
private CatalogTableBuilder<Table, Table> tableBuilder;
+ private Optional<CatalogPartitionSyncTool> partitionSyncTool;
// For loading the instance using ServiceLoader
public HMSCatalogSyncClient() {}
@@ -70,12 +74,14 @@ public class HMSCatalogSyncClient implements
CatalogSyncClient<Table> {
HMSCatalogConfig hmsCatalogConfig,
Configuration configuration,
IMetaStoreClient metaStoreClient,
- CatalogTableBuilder tableBuilder) {
+ CatalogTableBuilder tableBuilder,
+ Optional<CatalogPartitionSyncTool> partitionSyncTool) {
this.catalogConfig = catalogConfig;
this.hmsCatalogConfig = hmsCatalogConfig;
this.configuration = configuration;
this.metaStoreClient = metaStoreClient;
this.tableBuilder = tableBuilder;
+ this.partitionSyncTool = partitionSyncTool;
}
@Override
@@ -145,6 +151,8 @@ public class HMSCatalogSyncClient implements
CatalogSyncClient<Table> {
} catch (TException e) {
throw new CatalogSyncException("Failed to create table: " +
tableIdentifier.getId(), e);
}
+
+ partitionSyncTool.ifPresent(tool -> tool.syncPartitions(table,
tableIdentifier));
}
@Override
@@ -158,6 +166,8 @@ public class HMSCatalogSyncClient implements
CatalogSyncClient<Table> {
} catch (TException e) {
throw new CatalogSyncException("Failed to refresh table: " +
tableIdentifier.getId(), e);
}
+
+ partitionSyncTool.ifPresent(tool -> tool.syncPartitions(table,
tableIdentifier));
}
@Override
@@ -194,7 +204,15 @@ public class HMSCatalogSyncClient implements
CatalogSyncClient<Table> {
} catch (MetaException | HiveException e) {
throw new CatalogSyncException("HiveMetastoreClient could not be
created", e);
}
- this.tableBuilder = HMSCatalogTableBuilderFactory.getInstance(tableFormat,
this.configuration);
+ this.tableBuilder =
+ HMSCatalogTableBuilderFactory.getTableBuilder(
+ tableFormat, hmsCatalogConfig, this.configuration);
+ this.partitionSyncTool =
+ CatalogUtils.getPartitionSyncTool(
+ tableFormat,
+ hmsCatalogConfig.getPartitionExtractorClass(),
+ new HMSCatalogPartitionSyncOperations(metaStoreClient,
hmsCatalogConfig),
+ configuration);
}
/**
diff --git
a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogTableBuilderFactory.java
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogTableBuilderFactory.java
index 56c18279..43a77700 100644
---
a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogTableBuilderFactory.java
+++
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/HMSCatalogTableBuilderFactory.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.xtable.catalog.CatalogTableBuilder;
import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.hms.table.DeltaHMSCatalogTableBuilder;
+import org.apache.xtable.hms.table.HudiHMSCatalogTableBuilder;
import org.apache.xtable.hms.table.IcebergHMSCatalogTableBuilder;
import org.apache.xtable.model.catalog.CatalogTableIdentifier;
import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
@@ -40,13 +41,15 @@ import org.apache.xtable.model.storage.TableFormat;
public class HMSCatalogTableBuilderFactory {
- static CatalogTableBuilder<Table, Table> getInstance(
- String tableFormat, Configuration configuration) {
+ public static CatalogTableBuilder<Table, Table> getTableBuilder(
+ String tableFormat, HMSCatalogConfig hmsCatalogConfig, Configuration
configuration) {
switch (tableFormat) {
case TableFormat.ICEBERG:
return new IcebergHMSCatalogTableBuilder(configuration);
case TableFormat.DELTA:
return new DeltaHMSCatalogTableBuilder();
+ case TableFormat.HUDI:
+ return new HudiHMSCatalogTableBuilder(hmsCatalogConfig, configuration);
default:
throw new NotSupportedException("Unsupported table format: " +
tableFormat);
}
diff --git
a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/table/HudiHMSCatalogTableBuilder.java
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/table/HudiHMSCatalogTableBuilder.java
new file mode 100644
index 00000000..cc4dc3df
--- /dev/null
+++
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/table/HudiHMSCatalogTableBuilder.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.hms.table;
+
+import static
org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifier;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ConfigUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.xtable.catalog.CatalogTableBuilder;
+import org.apache.xtable.exception.CatalogSyncException;
+import org.apache.xtable.hms.HMSCatalogConfig;
+import org.apache.xtable.hms.HMSSchemaExtractor;
+import org.apache.xtable.hudi.HudiTableManager;
+import org.apache.xtable.hudi.catalog.HudiCatalogTablePropertiesExtractor;
+import org.apache.xtable.hudi.catalog.HudiInputFormatUtils;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.TableFormat;
+
+@Log4j2
+public class HudiHMSCatalogTableBuilder implements CatalogTableBuilder<Table,
Table> {
+
+ private final HudiTableManager hudiTableManager;
+ private final HMSSchemaExtractor schemaExtractor;
+ private final HMSCatalogConfig hmsCatalogConfig;
+ private final HudiCatalogTablePropertiesExtractor tablePropertiesExtractor;
+
+ private HoodieTableMetaClient metaClient;
+
+ protected static final String HUDI_METADATA_CONFIG =
"hudi.metadata-listing-enabled";
+
+  /**
+   * Creates a builder that resolves its collaborators itself.
+   *
+   * @param hmsCatalogConfig HMS catalog settings (the schema length threshold is read from it)
+   * @param configuration Hadoop configuration handed to {@link HudiTableManager}
+   */
+  public HudiHMSCatalogTableBuilder(
+      HMSCatalogConfig hmsCatalogConfig, Configuration configuration) {
+    this.hmsCatalogConfig = hmsCatalogConfig;
+    this.schemaExtractor = HMSSchemaExtractor.getInstance();
+    this.tablePropertiesExtractor = HudiCatalogTablePropertiesExtractor.getInstance();
+    this.hudiTableManager = HudiTableManager.of(configuration);
+  }
+
+  /**
+   * Test-only constructor that accepts every collaborator, including a pre-loaded meta client.
+   *
+   * @param hmsCatalogConfig HMS catalog settings
+   * @param schemaExtractor converter from the internal schema to metastore columns
+   * @param hudiTableManager loader for Hudi meta clients
+   * @param metaClient meta client returned by {@code getMetaClient} instead of loading one
+   * @param propertiesExtractor source of the Spark table properties
+   */
+  @VisibleForTesting
+  HudiHMSCatalogTableBuilder(
+      HMSCatalogConfig hmsCatalogConfig,
+      HMSSchemaExtractor schemaExtractor,
+      HudiTableManager hudiTableManager,
+      HoodieTableMetaClient metaClient,
+      HudiCatalogTablePropertiesExtractor propertiesExtractor) {
+    this.hmsCatalogConfig = hmsCatalogConfig;
+    this.schemaExtractor = schemaExtractor;
+    this.hudiTableManager = hudiTableManager;
+    this.tablePropertiesExtractor = propertiesExtractor;
+    this.metaClient = metaClient;
+  }
+
+  /**
+   * Returns the Hudi meta client, loading it from {@code basePath} on first use.
+   *
+   * <p>Once a client is held it is returned as-is; {@code basePath} is only consulted while no
+   * client has been loaded yet.
+   *
+   * @param basePath base path of the Hudi table
+   * @return the cached or freshly loaded meta client
+   * @throws CatalogSyncException if no Hudi table exists at {@code basePath}
+   */
+  HoodieTableMetaClient getMetaClient(String basePath) {
+    if (metaClient != null) {
+      return metaClient;
+    }
+
+    Optional<HoodieTableMetaClient> loadedClient =
+        hudiTableManager.loadTableMetaClientIfExists(basePath);
+    metaClient =
+        loadedClient.orElseThrow(
+            () ->
+                new CatalogSyncException(
+                    "Failed to get meta client since table is not present in the base path "
+                        + basePath));
+    return metaClient;
+  }
+
+  /**
+   * Builds the metastore {@link Table} used to register a new Hudi table.
+   *
+   * <p>Populates the database and table name, the current user as owner, the creation time, the
+   * table parameters, the storage descriptor and the partition keys.
+   *
+   * @param table internal representation of the table being synced
+   * @param catalogTableIdentifier identifier of the target catalog table
+   * @return a new, populated {@link Table} request
+   * @throws CatalogSyncException if the current user cannot be resolved or the Hudi meta client
+   *     cannot be loaded for the table's base path
+   */
+  @Override
+  public Table getCreateTableRequest(
+      InternalTable table, CatalogTableIdentifier catalogTableIdentifier) {
+    HierarchicalTableIdentifier tableIdentifier =
+        toHierarchicalTableIdentifier(catalogTableIdentifier);
+    Table newTb = new Table();
+    newTb.setDbName(tableIdentifier.getDatabaseName());
+    newTb.setTableName(tableIdentifier.getTableName());
+    try {
+      newTb.setOwner(UserGroupInformation.getCurrentUser().getShortUserName());
+    } catch (IOException e) {
+      throw new CatalogSyncException(
+          "Failed to set owner for hms table: " + tableIdentifier.getTableName(), e);
+    }
+
+    // createTime is an int holding seconds since the epoch. Casting epoch milliseconds to int
+    // overflows and stores a meaningless (often negative) timestamp.
+    newTb.setCreateTime((int) Instant.now().getEpochSecond());
+    Map<String, String> tableProperties = getTableProperties(table, table.getReadSchema());
+    newTb.setParameters(tableProperties);
+    newTb.setSd(getStorageDescriptor(table));
+    newTb.setPartitionKeys(getSchemaPartitionKeys(table));
+    return newTb;
+  }
+
+  /**
+   * Refreshes an existing metastore table so it matches the current state of the Hudi table.
+   *
+   * <p>The Hudi/Spark table properties are merged into the existing parameters (overwriting
+   * entries with the same key) and the storage descriptor is rebuilt from the table's read
+   * schema. {@code hmsTable} is modified in place and returned.
+   *
+   * @param table internal representation of the table being synced
+   * @param hmsTable table definition currently stored in the metastore
+   * @param tableIdentifier identifier of the catalog table (not used by this implementation)
+   * @return {@code hmsTable}, updated
+   * @throws CatalogSyncException if the Hudi meta client cannot be loaded
+   */
+  @Override
+  public Table getUpdateTableRequest(
+      InternalTable table, Table hmsTable, CatalogTableIdentifier tableIdentifier) {
+    // Merge into a single map and set it once. The previous code kept two aliases of the same
+    // map and re-set the "original" one afterwards, which only worked because of the aliasing.
+    Map<String, String> tableParameters = hmsTable.getParameters();
+    if (tableParameters == null) {
+      tableParameters = new HashMap<>();
+    }
+    tableParameters.putAll(getTableProperties(table, table.getReadSchema()));
+    hmsTable.setParameters(tableParameters);
+
+    // getStorageDescriptor already fills in the columns derived from the read schema, so they
+    // do not need to be computed and set a second time.
+    hmsTable.setSd(getStorageDescriptor(table));
+    return hmsTable;
+  }
+
+  /**
+   * Assembles the table parameters: the Hudi metadata-listing flag plus the Spark datasource
+   * properties derived from the schema and the partition field names.
+   *
+   * @param table internal table whose partitioning fields are used
+   * @param schema schema passed to the Spark properties extractor
+   * @return a new mutable map of table parameters
+   */
+  private Map<String, String> getTableProperties(InternalTable table, InternalSchema schema) {
+    List<String> partitionFieldNames =
+        table.getPartitioningFields().stream()
+            .map(partitionField -> partitionField.getSourceField())
+            .map(sourceField -> sourceField.getName())
+            .collect(Collectors.toList());
+
+    Map<String, String> properties = new HashMap<>();
+    properties.put(HUDI_METADATA_CONFIG, "true");
+    // Spark properties are added last, so they win over the flag on a key clash.
+    properties.putAll(
+        tablePropertiesExtractor.getSparkTableProperties(
+            partitionFieldNames, "", hmsCatalogConfig.getSchemaLengthThreshold(), schema));
+    return properties;
+  }
+
+  /**
+   * Builds the storage descriptor for the Hudi table: columns from the read schema, the base
+   * path as location, and input/output format plus serde chosen from the table's base file
+   * format.
+   *
+   * @param table internal table providing the base path and read schema
+   * @return a new storage descriptor
+   * @throws CatalogSyncException if the Hudi meta client cannot be loaded
+   */
+  private StorageDescriptor getStorageDescriptor(InternalTable table) {
+    String basePath = table.getBasePath();
+
+    final StorageDescriptor descriptor = new StorageDescriptor();
+    descriptor.setCols(schemaExtractor.toColumns(TableFormat.HUDI, table.getReadSchema()));
+    descriptor.setLocation(basePath);
+
+    HoodieFileFormat baseFileFormat = getMetaClient(basePath).getTableConfig().getBaseFileFormat();
+    descriptor.setInputFormat(HudiInputFormatUtils.getInputFormatClassName(baseFileFormat, false));
+    descriptor.setOutputFormat(HudiInputFormatUtils.getOutputFormatClassName(baseFileFormat));
+
+    SerDeInfo serDeInfo = new SerDeInfo();
+    serDeInfo.setSerializationLib(HudiInputFormatUtils.getSerDeClassName(baseFileFormat));
+    serDeInfo.setParameters(getSerdeProperties(basePath));
+    descriptor.setSerdeInfo(serDeInfo);
+    return descriptor;
+  }
+
+  /**
+   * Builds the serde parameters for the table: its path and the read-optimized-query flag set to
+   * {@code false}.
+   *
+   * @param basePath base path of the Hudi table
+   * @return a new mutable map with the two serde parameters
+   */
+  private static Map<String, String> getSerdeProperties(String basePath) {
+    Map<String, String> properties = new HashMap<>();
+    properties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, Boolean.toString(false));
+    properties.put(ConfigUtils.TABLE_SERDE_PATH, basePath);
+    return properties;
+  }
+
+  /**
+   * Derives the metastore partition keys from the table's partitioning fields.
+   *
+   * <p>A partition field whose source field name matches a schema column reuses that column's
+   * {@link FieldSchema}; any other field falls back to a {@code string} column with an empty
+   * comment.
+   *
+   * @param table internal table providing the partitioning fields and read schema
+   * @return one {@link FieldSchema} per partitioning field, in partitioning order
+   */
+  private List<FieldSchema> getSchemaPartitionKeys(InternalTable table) {
+    List<InternalPartitionField> partitionFields = table.getPartitioningFields();
+    Map<String, FieldSchema> columnsByName =
+        schemaExtractor.toColumns(TableFormat.HUDI, table.getReadSchema()).stream()
+            .collect(Collectors.toMap(FieldSchema::getName, column -> column));
+
+    return partitionFields.stream()
+        .map(partitionField -> partitionField.getSourceField().getName())
+        .map(
+            fieldName -> {
+              FieldSchema schemaColumn = columnsByName.get(fieldName);
+              return schemaColumn != null
+                  ? schemaColumn
+                  : new FieldSchema(fieldName, "string", "");
+            })
+        .collect(Collectors.toList());
+  }
+}
diff --git
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/HMSCatalogSyncClientTestBase.java
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/HMSCatalogSyncTestBase.java
similarity index 95%
rename from
xtable-hive-metastore/src/test/java/org/apache/xtable/hms/HMSCatalogSyncClientTestBase.java
rename to
xtable-hive-metastore/src/test/java/org/apache/xtable/hms/HMSCatalogSyncTestBase.java
index cd76c6be..62531e75 100644
---
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/HMSCatalogSyncClientTestBase.java
+++
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/HMSCatalogSyncTestBase.java
@@ -43,7 +43,7 @@ import org.apache.xtable.model.schema.PartitionTransformType;
import org.apache.xtable.model.storage.CatalogType;
import org.apache.xtable.model.storage.TableFormat;
-public class HMSCatalogSyncClientTestBase {
+public class HMSCatalogSyncTestBase {
@Mock protected IMetaStoreClient mockMetaStoreClient;
@Mock protected HMSCatalogConfig mockHMSCatalogConfig;
@@ -139,6 +139,14 @@ public class HMSCatalogSyncClientTestBase {
.readSchema(INTERNAL_SCHEMA)
.partitioningFields(Collections.singletonList(PARTITION_FIELD))
.build();
+ protected static final InternalTable TEST_EVOLVED_HUDI_INTERNAL_TABLE =
+ InternalTable.builder()
+ .basePath(TEST_BASE_PATH)
+ .tableFormat(TableFormat.HUDI)
+ .readSchema(UPDATED_INTERNAL_SCHEMA)
+ .partitioningFields(Collections.singletonList(PARTITION_FIELD))
+ .build();
+
protected static final ThreePartHierarchicalTableIdentifier
TEST_CATALOG_TABLE_IDENTIFIER =
new ThreePartHierarchicalTableIdentifier(TEST_HMS_DATABASE,
TEST_HMS_TABLE);
diff --git
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogPartitionSyncOperations.java
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogPartitionSyncOperations.java
new file mode 100644
index 00000000..b1422d94
--- /dev/null
+++
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogPartitionSyncOperations.java
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.hms;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import lombok.SneakyThrows;
+
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import org.apache.xtable.catalog.CatalogPartition;
+import org.apache.xtable.catalog.CatalogPartitionSyncOperations;
+import org.apache.xtable.exception.CatalogSyncException;
+
+@ExtendWith(MockitoExtension.class)
+public class TestHMSCatalogPartitionSyncOperations extends
HMSCatalogSyncTestBase {
+
+ private CatalogPartitionSyncOperations hmsPartitionSyncOperations;
+
+ void setupCommonMocks() {
+ hmsPartitionSyncOperations =
+ new HMSCatalogPartitionSyncOperations(mockMetaStoreClient,
mockHMSCatalogConfig);
+ }
+
+ @SneakyThrows
+ @Test
+ void testGetAllPartitions() {
+ setupCommonMocks();
+
+ Partition hivePartition1 = new Partition();
+ hivePartition1.setValues(Collections.singletonList("value1"));
+ StorageDescriptor sd1 = new StorageDescriptor();
+ sd1.setLocation("location1");
+ hivePartition1.setSd(sd1);
+
+ Partition hivePartition2 = new Partition();
+ hivePartition2.setValues(Collections.singletonList("value2"));
+ StorageDescriptor sd2 = new StorageDescriptor();
+ sd2.setLocation("location2");
+ hivePartition2.setSd(sd2);
+
+ List<Partition> hivePartitions = Arrays.asList(hivePartition1,
hivePartition2);
+ when(mockMetaStoreClient.listPartitions(
+ TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
+ TEST_CATALOG_TABLE_IDENTIFIER.getTableName(),
+ (short) -1))
+ .thenReturn(hivePartitions);
+ List<CatalogPartition> partitions =
+
hmsPartitionSyncOperations.getAllPartitions(TEST_CATALOG_TABLE_IDENTIFIER);
+
+ assertEquals(2, partitions.size());
+ assertEquals("location1", partitions.get(0).getStorageLocation());
+ assertEquals(1, partitions.get(0).getValues().size());
+ assertEquals("value1", partitions.get(0).getValues().get(0));
+ assertEquals("location2", partitions.get(1).getStorageLocation());
+ assertEquals(1, partitions.get(1).getValues().size());
+ assertEquals("value2", partitions.get(1).getValues().get(0));
+
+ verify(mockMetaStoreClient, times(1))
+ .listPartitions(
+ TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
+ TEST_CATALOG_TABLE_IDENTIFIER.getTableName(),
+ (short) -1);
+ }
+
+ @Test
+ void testAddPartitionsToTableSuccess() throws Exception {
+ setupCommonMocks();
+ when(mockHMSCatalogConfig.getMaxPartitionsPerRequest()).thenReturn(100);
+ CatalogPartition partition1 =
+ new CatalogPartition(Collections.singletonList("value1"), "location1");
+ CatalogPartition partition2 =
+ new CatalogPartition(Collections.singletonList("value2"), "location2");
+ List<CatalogPartition> partitionsToAdd = Arrays.asList(partition1,
partition2);
+
+ StorageDescriptor tableSd = new StorageDescriptor();
+ tableSd.setCols(Collections.emptyList());
+ tableSd.setInputFormat("inputFormat");
+ tableSd.setOutputFormat("outputFormat");
+ tableSd.setSerdeInfo(new SerDeInfo());
+
+ Table table = new Table();
+ table.setSd(tableSd);
+
+ when(mockMetaStoreClient.getTable(
+ TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
+ TEST_CATALOG_TABLE_IDENTIFIER.getTableName()))
+ .thenReturn(table);
+
+ // Execute the method
+
hmsPartitionSyncOperations.addPartitionsToTable(TEST_CATALOG_TABLE_IDENTIFIER,
partitionsToAdd);
+
+ // Verify behavior
+ ArgumentCaptor<List<Partition>> partitionCaptor =
ArgumentCaptor.forClass(List.class);
+
+ verify(mockMetaStoreClient, times(1))
+ .add_partitions(partitionCaptor.capture(), eq(true), eq(false));
+
+ // Validate the captured partitions
+ List<Partition> capturedPartitions = partitionCaptor.getValue();
+ assertEquals(2, capturedPartitions.size());
+
+ Partition capturedPartition1 = capturedPartitions.get(0);
+ assertEquals(partition1.getValues(), capturedPartition1.getValues());
+ assertEquals(partition1.getStorageLocation(),
capturedPartition1.getSd().getLocation());
+
+ Partition capturedPartition2 = capturedPartitions.get(1);
+ assertEquals(partition2.getValues(), capturedPartition2.getValues());
+ assertEquals(partition2.getStorageLocation(),
capturedPartition2.getSd().getLocation());
+ }
+
+ @Test
+ void testAddPartitionsToTableThrowsException() throws Exception {
+ setupCommonMocks();
+ List<CatalogPartition> partitionsToAdd =
+ Collections.singletonList(
+ new CatalogPartition(Collections.singletonList("value1"),
"location1"));
+
+ when(mockMetaStoreClient.getTable(
+ TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
+ TEST_CATALOG_TABLE_IDENTIFIER.getTableName()))
+ .thenThrow(new TException("Test exception"));
+
+ // Execute and validate exception
+ CatalogSyncException exception =
+ assertThrows(
+ CatalogSyncException.class,
+ () ->
+ hmsPartitionSyncOperations.addPartitionsToTable(
+ TEST_CATALOG_TABLE_IDENTIFIER, partitionsToAdd));
+
+ assertInstanceOf(TException.class, exception.getCause());
+ verify(mockMetaStoreClient, times(1))
+ .getTable(
+ TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
+ TEST_CATALOG_TABLE_IDENTIFIER.getTableName());
+ verify(mockMetaStoreClient, never()).add_partitions(anyList(),
anyBoolean(), anyBoolean());
+ }
+
+ @Test
+ void testUpdatePartitionsToTableSuccess() throws Exception {
+ setupCommonMocks();
+
+ CatalogPartition changedPartition1 =
+ new CatalogPartition(Collections.singletonList("value1"), "location1");
+ CatalogPartition changedPartition2 =
+ new CatalogPartition(Collections.singletonList("value2"), "location2");
+ List<CatalogPartition> changedPartitions =
Arrays.asList(changedPartition1, changedPartition2);
+
+ StorageDescriptor tableSd = new StorageDescriptor();
+ tableSd.setCols(Collections.emptyList());
+ tableSd.setInputFormat("inputFormat");
+ tableSd.setOutputFormat("outputFormat");
+ tableSd.setSerdeInfo(new SerDeInfo());
+
+ Table table = new Table();
+ table.setSd(tableSd);
+
+ when(mockMetaStoreClient.getTable(
+ TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
+ TEST_CATALOG_TABLE_IDENTIFIER.getTableName()))
+ .thenReturn(table);
+
+ // Execute the method
+ hmsPartitionSyncOperations.updatePartitionsToTable(
+ TEST_CATALOG_TABLE_IDENTIFIER, changedPartitions);
+
+ // Capture the partitions passed to alter_partitions
+ ArgumentCaptor<List<Partition>> partitionCaptor =
ArgumentCaptor.forClass(List.class);
+ verify(mockMetaStoreClient, times(1))
+ .alter_partitions(
+ eq(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName()),
+ eq(TEST_CATALOG_TABLE_IDENTIFIER.getTableName()),
+ partitionCaptor.capture());
+ List<Partition> capturedPartitions = partitionCaptor.getValue();
+
+ assertEquals(2, capturedPartitions.size());
+
+ Partition capturedPartition1 = capturedPartitions.get(0);
+ assertEquals(changedPartition1.getValues(),
capturedPartition1.getValues());
+ assertEquals(changedPartition1.getStorageLocation(),
capturedPartition1.getSd().getLocation());
+
+ Partition capturedPartition2 = capturedPartitions.get(1);
+ assertEquals(changedPartition2.getValues(),
capturedPartition2.getValues());
+ assertEquals(changedPartition2.getStorageLocation(),
capturedPartition2.getSd().getLocation());
+ }
+
+ @Test
+ void testUpdatePartitionsToTableThrowsException() throws Exception {
+ setupCommonMocks();
+ CatalogPartition changedPartition =
+ new CatalogPartition(Collections.singletonList("value1"), "location1");
+
+ when(mockMetaStoreClient.getTable(
+ TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
+ TEST_CATALOG_TABLE_IDENTIFIER.getTableName()))
+ .thenThrow(new TException("Test exception"));
+
+ // Execute and validate exception
+ CatalogSyncException exception =
+ assertThrows(
+ CatalogSyncException.class,
+ () ->
+ hmsPartitionSyncOperations.updatePartitionsToTable(
+ TEST_CATALOG_TABLE_IDENTIFIER,
Collections.singletonList(changedPartition)));
+
+ assertInstanceOf(TException.class, exception.getCause());
+
+ verify(mockMetaStoreClient, times(1))
+ .getTable(
+ TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
+ TEST_CATALOG_TABLE_IDENTIFIER.getTableName());
+ verify(mockMetaStoreClient, never()).alter_partitions(anyString(),
anyString(), anyList());
+ }
+
+ @Test
+ void testDropPartitionsSuccess() throws Exception {
+ setupCommonMocks();
+
+ CatalogPartition partition1 =
+ new CatalogPartition(Collections.singletonList("value1"), "location1");
+ CatalogPartition partition2 =
+ new CatalogPartition(Collections.singletonList("value2"), "location2");
+ List<CatalogPartition> partitionsToDrop = Arrays.asList(partition1,
partition2);
+
+ // Execute the method
+ hmsPartitionSyncOperations.dropPartitions(TEST_CATALOG_TABLE_IDENTIFIER,
partitionsToDrop);
+
+ // Capture calls to dropPartition
+ ArgumentCaptor<List<String>> partitionValuesCaptor =
ArgumentCaptor.forClass(List.class);
+
+ verify(mockMetaStoreClient, times(2))
+ .dropPartition(
+ eq(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName()),
+ eq(TEST_CATALOG_TABLE_IDENTIFIER.getTableName()),
+ partitionValuesCaptor.capture(),
+ eq(false));
+
+ // Validate captured arguments
+ List<List<String>> capturedPartitionValues =
partitionValuesCaptor.getAllValues();
+ assertEquals(2, capturedPartitionValues.size());
+ assertEquals(partition1.getValues(), capturedPartitionValues.get(0));
+ assertEquals(partition2.getValues(), capturedPartitionValues.get(1));
+ }
+
+ @Test
+ void testDropPartitionsEmptyList() throws Exception {
+ setupCommonMocks();
+ List<CatalogPartition> partitionsToDrop = Collections.emptyList();
+
+ hmsPartitionSyncOperations.dropPartitions(TEST_CATALOG_TABLE_IDENTIFIER,
partitionsToDrop);
+
+ // Verify no calls to dropPartition
+ verify(mockMetaStoreClient, never())
+ .dropPartition(anyString(), anyString(), anyList(), anyBoolean());
+ }
+
+ @Test
+ void testDropPartitionsThrowsException() throws Exception {
+ setupCommonMocks();
+
+ CatalogPartition partition1 =
+ new CatalogPartition(Collections.singletonList("value1"), "location1");
+ List<CatalogPartition> partitionsToDrop =
Collections.singletonList(partition1);
+
+ doThrow(new TException("Test exception"))
+ .when(mockMetaStoreClient)
+ .dropPartition(
+ TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
+ TEST_CATALOG_TABLE_IDENTIFIER.getTableName(),
+ partition1.getValues(),
+ false);
+
+ // Execute and validate exception
+ CatalogSyncException exception =
+ assertThrows(
+ CatalogSyncException.class,
+ () ->
+ hmsPartitionSyncOperations.dropPartitions(
+ TEST_CATALOG_TABLE_IDENTIFIER, partitionsToDrop));
+
+ assertInstanceOf(TException.class, exception.getCause());
+
+ // Verify dropPartition call is made once
+ verify(mockMetaStoreClient, times(1))
+ .dropPartition(
+ TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
+ TEST_CATALOG_TABLE_IDENTIFIER.getTableName(),
+ partition1.getValues(),
+ false);
+ }
+
+ @Test
+ void testGetTablePropertiesSuccess() throws Exception {
+ setupCommonMocks();
+
+ List<String> lastSyncedKeys = Arrays.asList("key1", "key2", "key3");
+
+ Map<String, String> mockParameters = new HashMap<>();
+ mockParameters.put("key1", "value1");
+ mockParameters.put("key2", "value2");
+ mockParameters.put("irrelevantKey", "irrelevantKey");
+
+ Table mockTable = new Table();
+ mockTable.setParameters(mockParameters);
+
+ when(mockMetaStoreClient.getTable(
+ TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
+ TEST_CATALOG_TABLE_IDENTIFIER.getTableName()))
+ .thenReturn(mockTable);
+
+ // Execute the method
+ Map<String, String> result =
+ hmsPartitionSyncOperations.getTableProperties(
+ TEST_CATALOG_TABLE_IDENTIFIER, lastSyncedKeys);
+
+ // Validate the result
+ assertEquals(2, result.size());
+ assertEquals("value1", result.get("key1"));
+ assertEquals("value2", result.get("key2"));
+ assertNull(result.get("key3")); // key3 is not in mockParameters
+ }
+
+ @Test
+ void testGetTablePropertiesThrowsException() throws Exception {
+ setupCommonMocks();
+
+ List<String> lastSyncedKeys = Arrays.asList("key1", "key2");
+
+ when(mockMetaStoreClient.getTable(
+ TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
+ TEST_CATALOG_TABLE_IDENTIFIER.getTableName()))
+ .thenThrow(new TException("Test exception"));
+
+ CatalogSyncException exception =
+ assertThrows(
+ CatalogSyncException.class,
+ () ->
+ hmsPartitionSyncOperations.getTableProperties(
+ TEST_CATALOG_TABLE_IDENTIFIER, lastSyncedKeys));
+
+ assertInstanceOf(TException.class, exception.getCause());
+ }
+
+ @Test
+ void testUpdateTablePropertiesSuccess() throws Exception {
+ setupCommonMocks();
+
+ Map<String, String> lastTimeSyncedProperties = new HashMap<>();
+ lastTimeSyncedProperties.put("last_synced_time", "2023-12-01T12:00:00Z");
+ lastTimeSyncedProperties.put("last_modified_by", "user123");
+
+ Map<String, String> existingParameters = new HashMap<>();
+ existingParameters.put("existing_key", "existing_value");
+
+ Table mockTable = new Table();
+ mockTable.setParameters(existingParameters);
+
+ when(mockMetaStoreClient.getTable(
+ TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
+ TEST_CATALOG_TABLE_IDENTIFIER.getTableName()))
+ .thenReturn(mockTable);
+
+ // Execute the method
+ hmsPartitionSyncOperations.updateTableProperties(
+ TEST_CATALOG_TABLE_IDENTIFIER, lastTimeSyncedProperties);
+
+ // Verify behavior
+ verify(mockMetaStoreClient, times(1))
+ .alter_table(
+ eq(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName()),
+ eq(TEST_CATALOG_TABLE_IDENTIFIER.getTableName()),
+ eq(mockTable));
+
+ // Validate updated parameters
+ Map<String, String> updatedParameters = mockTable.getParameters();
+ assertEquals(3, updatedParameters.size());
+ assertEquals("2023-12-01T12:00:00Z",
updatedParameters.get("last_synced_time"));
+ assertEquals("user123", updatedParameters.get("last_modified_by"));
+ assertEquals("existing_value", updatedParameters.get("existing_key"));
+ }
+
+ @Test
+ void testUpdateTablePropertiesNoChanges() throws Exception {
+ setupCommonMocks();
+
+ // Empty properties map
+ Map<String, String> lastTimeSyncedProperties = Collections.emptyMap();
+
+ // Execute the method
+ hmsPartitionSyncOperations.updateTableProperties(
+ TEST_CATALOG_TABLE_IDENTIFIER, lastTimeSyncedProperties);
+
+ // Verify no calls to MetaStoreClient
+ verify(mockMetaStoreClient, never()).getTable(anyString(), anyString());
+ verify(mockMetaStoreClient, never()).alter_table(anyString(), anyString(),
any());
+ }
+
+ @Test
+ void testUpdateTablePropertiesThrowsException() throws Exception {
+ setupCommonMocks();
+
+ Map<String, String> lastTimeSyncedProperties =
+ Collections.singletonMap("last_synced_time", "2023-12-01T12:00:00Z");
+
+ when(mockMetaStoreClient.getTable(
+ TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
+ TEST_CATALOG_TABLE_IDENTIFIER.getTableName()))
+ .thenThrow(new TException("Test exception"));
+
+ CatalogSyncException exception =
+ assertThrows(
+ CatalogSyncException.class,
+ () ->
+ hmsPartitionSyncOperations.updateTableProperties(
+ TEST_CATALOG_TABLE_IDENTIFIER, lastTimeSyncedProperties));
+
+ assertInstanceOf(TException.class, exception.getCause());
+
+ // Verify getTable is called once and no alter table calls are made
+ verify(mockMetaStoreClient, times(1))
+ .getTable(
+ TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
+ TEST_CATALOG_TABLE_IDENTIFIER.getTableName());
+ verify(mockMetaStoreClient, never()).alter_table(anyString(), anyString(),
any());
+ }
+}
diff --git
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogSyncClient.java
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogSyncClient.java
index fad22ad2..9fae7eb2 100644
---
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogSyncClient.java
+++
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogSyncClient.java
@@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
@@ -37,6 +38,7 @@ import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Collections;
+import java.util.Optional;
import java.util.ServiceLoader;
import lombok.SneakyThrows;
@@ -54,6 +56,7 @@ import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
+import org.apache.xtable.catalog.CatalogPartitionSyncTool;
import org.apache.xtable.catalog.CatalogTableBuilder;
import org.apache.xtable.exception.CatalogSyncException;
import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier;
@@ -61,29 +64,29 @@ import org.apache.xtable.model.storage.CatalogType;
import org.apache.xtable.spi.sync.CatalogSyncClient;
@ExtendWith(MockitoExtension.class)
-public class TestHMSCatalogSyncClient extends HMSCatalogSyncClientTestBase {
+public class TestHMSCatalogSyncClient extends HMSCatalogSyncTestBase {
@Mock private CatalogTableBuilder<Table, Table> mockTableBuilder;
+ @Mock private CatalogPartitionSyncTool mockPartitionSyncTool;
private HMSCatalogSyncClient hmsCatalogSyncClient;
- private HMSCatalogSyncClient createHMSCatalogSyncClient() {
+ private HMSCatalogSyncClient createHMSCatalogSyncClient(boolean
includePartitionSyncTool) {
+ Optional<CatalogPartitionSyncTool> partitionSyncToolOpt =
+ includePartitionSyncTool ? Optional.of(mockPartitionSyncTool) :
Optional.empty();
return new HMSCatalogSyncClient(
TEST_CATALOG_CONFIG,
mockHMSCatalogConfig,
testConfiguration,
mockMetaStoreClient,
- mockTableBuilder);
- }
-
- void setupCommonMocks() {
- hmsCatalogSyncClient = createHMSCatalogSyncClient();
+ mockTableBuilder,
+ partitionSyncToolOpt);
}
@SneakyThrows
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testHasDatabase(boolean isDbPresent) {
- setupCommonMocks();
+ hmsCatalogSyncClient = createHMSCatalogSyncClient(false);
Database db = new Database(TEST_HMS_DATABASE, null, null,
Collections.emptyMap());
if (isDbPresent) {
when(mockMetaStoreClient.getDatabase(TEST_HMS_DATABASE)).thenReturn(db);
@@ -103,7 +106,7 @@ public class TestHMSCatalogSyncClient extends
HMSCatalogSyncClientTestBase {
@SneakyThrows
@Test
void testHasDatabaseFailure() {
- setupCommonMocks();
+ hmsCatalogSyncClient = createHMSCatalogSyncClient(false);
when(mockMetaStoreClient.getDatabase(TEST_HMS_DATABASE))
.thenThrow(new TException("something went wrong"));
CatalogSyncException exception =
@@ -119,7 +122,7 @@ public class TestHMSCatalogSyncClient extends
HMSCatalogSyncClientTestBase {
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testGetTable(boolean isTablePresent) {
- setupCommonMocks();
+ hmsCatalogSyncClient = createHMSCatalogSyncClient(false);
Table table = newTable(TEST_HMS_DATABASE, TEST_HMS_TABLE);
if (isTablePresent) {
when(mockMetaStoreClient.getTable(TEST_HMS_DATABASE,
TEST_HMS_TABLE)).thenReturn(table);
@@ -139,7 +142,7 @@ public class TestHMSCatalogSyncClient extends
HMSCatalogSyncClientTestBase {
@SneakyThrows
@Test
void testGetTableFailure() {
- setupCommonMocks();
+ hmsCatalogSyncClient = createHMSCatalogSyncClient(false);
when(mockMetaStoreClient.getTable(TEST_HMS_DATABASE, TEST_HMS_TABLE))
.thenThrow(new TException("something went wrong"));
CatalogSyncException exception =
@@ -156,7 +159,7 @@ public class TestHMSCatalogSyncClient extends
HMSCatalogSyncClientTestBase {
@ParameterizedTest
@ValueSource(booleans = {false, true})
void testCreateDatabase(boolean shouldFail) {
- setupCommonMocks();
+ hmsCatalogSyncClient = createHMSCatalogSyncClient(false);
Database database = newDatabase(TEST_HMS_DATABASE);
if (shouldFail) {
Mockito.doThrow(new TException("something went wrong"))
@@ -179,7 +182,7 @@ public class TestHMSCatalogSyncClient extends
HMSCatalogSyncClientTestBase {
@ParameterizedTest
@ValueSource(booleans = {false, true})
void testDropTable(boolean shouldFail) {
- setupCommonMocks();
+ hmsCatalogSyncClient = createHMSCatalogSyncClient(false);
if (shouldFail) {
Mockito.doThrow(new TException("something went wrong"))
.when(mockMetaStoreClient)
@@ -200,9 +203,10 @@ public class TestHMSCatalogSyncClient extends
HMSCatalogSyncClientTestBase {
}
@SneakyThrows
- @Test
- void testCreateTable_Success() {
- setupCommonMocks();
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testCreateTable_Success(boolean syncPartitions) {
+ hmsCatalogSyncClient = createHMSCatalogSyncClient(syncPartitions);
Table testTable = new Table();
when(mockTableBuilder.getCreateTableRequest(
TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER))
@@ -211,12 +215,20 @@ public class TestHMSCatalogSyncClient extends
HMSCatalogSyncClientTestBase {
verify(mockMetaStoreClient, times(1)).createTable(testTable);
verify(mockTableBuilder, times(1))
.getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE,
TEST_CATALOG_TABLE_IDENTIFIER);
+ if (syncPartitions) {
+ verify(mockPartitionSyncTool, times(1))
+ .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE),
eq(TEST_CATALOG_TABLE_IDENTIFIER));
+ } else {
+ verify(mockPartitionSyncTool, never())
+ .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE),
eq(TEST_CATALOG_TABLE_IDENTIFIER));
+ }
}
@SneakyThrows
- @Test
- void testCreateTable_ErrorGettingTableInput() {
- setupCommonMocks();
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testCreateTable_ErrorGettingTableInput(boolean syncPartitions) {
+ hmsCatalogSyncClient = createHMSCatalogSyncClient(syncPartitions);
// error when getting iceberg table input
doThrow(new RuntimeException("something went wrong"))
@@ -230,12 +242,15 @@ public class TestHMSCatalogSyncClient extends
HMSCatalogSyncClientTestBase {
verify(mockTableBuilder, times(1))
.getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE,
TEST_CATALOG_TABLE_IDENTIFIER);
verify(mockMetaStoreClient, never()).createTable(any());
+ verify(mockPartitionSyncTool, never())
+ .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE),
eq(TEST_CATALOG_TABLE_IDENTIFIER));
}
@SneakyThrows
- @Test
- void testCreateTable_ErrorCreatingTable() {
- setupCommonMocks();
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testCreateTable_ErrorCreatingTable(boolean syncPartitions) {
+ hmsCatalogSyncClient = createHMSCatalogSyncClient(syncPartitions);
// error when creating table
Table testTable = new Table();
@@ -257,12 +272,15 @@ public class TestHMSCatalogSyncClient extends
HMSCatalogSyncClientTestBase {
verify(mockTableBuilder, times(1))
.getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE,
TEST_CATALOG_TABLE_IDENTIFIER);
verify(mockMetaStoreClient, times(1)).createTable(testTable);
+ verify(mockPartitionSyncTool, never())
+ .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE),
eq(TEST_CATALOG_TABLE_IDENTIFIER));
}
@SneakyThrows
- @Test
- void testRefreshTable_Success() {
- setupCommonMocks();
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testRefreshTable_Success(boolean syncPartitions) {
+ hmsCatalogSyncClient = createHMSCatalogSyncClient(syncPartitions);
Table origTable = new Table();
Table updatedTable = new Table(origTable);
updatedTable.putToParameters(METADATA_LOCATION_PROP,
ICEBERG_METADATA_FILE_LOCATION_V2);
@@ -276,12 +294,20 @@ public class TestHMSCatalogSyncClient extends
HMSCatalogSyncClientTestBase {
verify(mockTableBuilder, times(1))
.getUpdateTableRequest(
TEST_ICEBERG_INTERNAL_TABLE, origTable,
TEST_CATALOG_TABLE_IDENTIFIER);
+ if (syncPartitions) {
+ verify(mockPartitionSyncTool, times(1))
+ .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE),
eq(TEST_CATALOG_TABLE_IDENTIFIER));
+ } else {
+ verify(mockPartitionSyncTool, never())
+ .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE),
eq(TEST_CATALOG_TABLE_IDENTIFIER));
+ }
}
@SneakyThrows
- @Test
- void testRefreshTable_ErrorGettingUpdatedTable() {
- setupCommonMocks();
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testRefreshTable_ErrorGettingUpdatedTable(boolean syncPartitions) {
+ hmsCatalogSyncClient = createHMSCatalogSyncClient(syncPartitions);
// error when getting iceberg table input
Table testTable = new Table();
@@ -298,12 +324,15 @@ public class TestHMSCatalogSyncClient extends
HMSCatalogSyncClientTestBase {
.getUpdateTableRequest(
TEST_ICEBERG_INTERNAL_TABLE, testTable,
TEST_CATALOG_TABLE_IDENTIFIER);
verify(mockMetaStoreClient, never()).alter_table(any(), any(), any());
+ verify(mockPartitionSyncTool, never())
+ .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE),
eq(TEST_CATALOG_TABLE_IDENTIFIER));
}
@SneakyThrows
- @Test
- void testRefreshTable_ErrorRefreshingTable() {
- setupCommonMocks();
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testRefreshTable_ErrorRefreshingTable(boolean syncPartitions) {
+ hmsCatalogSyncClient = createHMSCatalogSyncClient(syncPartitions);
// error when creating table
Table origTable = new Table();
@@ -329,12 +358,14 @@ public class TestHMSCatalogSyncClient extends
HMSCatalogSyncClientTestBase {
TEST_ICEBERG_INTERNAL_TABLE, origTable,
TEST_CATALOG_TABLE_IDENTIFIER);
verify(mockMetaStoreClient, times(1))
.alter_table(TEST_HMS_DATABASE, TEST_HMS_TABLE, updatedTable);
+ verify(mockPartitionSyncTool, never())
+ .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE),
eq(TEST_CATALOG_TABLE_IDENTIFIER));
}
@SneakyThrows
@Test
void testCreateOrReplaceTable() {
- setupCommonMocks();
+ hmsCatalogSyncClient = createHMSCatalogSyncClient(false);
ZonedDateTime zonedDateTime =
Instant.ofEpochMilli(System.currentTimeMillis()).atZone(ZoneId.systemDefault());
diff --git
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogTableBuilderFactory.java
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogTableBuilderFactory.java
index 45d45a2d..31ea33c9 100644
---
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogTableBuilderFactory.java
+++
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/TestHMSCatalogTableBuilderFactory.java
@@ -18,10 +18,10 @@
package org.apache.xtable.hms;
-import static org.apache.xtable.hms.HMSCatalogSyncClientTestBase.FIELD_SCHEMA;
-import static
org.apache.xtable.hms.HMSCatalogSyncClientTestBase.TEST_CATALOG_TABLE_IDENTIFIER;
-import static
org.apache.xtable.hms.HMSCatalogSyncClientTestBase.TEST_HMS_DATABASE;
-import static
org.apache.xtable.hms.HMSCatalogSyncClientTestBase.TEST_HMS_TABLE;
+import static org.apache.xtable.hms.HMSCatalogSyncTestBase.FIELD_SCHEMA;
+import static
org.apache.xtable.hms.HMSCatalogSyncTestBase.TEST_CATALOG_TABLE_IDENTIFIER;
+import static org.apache.xtable.hms.HMSCatalogSyncTestBase.TEST_HMS_DATABASE;
+import static org.apache.xtable.hms.HMSCatalogSyncTestBase.TEST_HMS_TABLE;
import static
org.apache.xtable.hms.table.TestIcebergHMSCatalogTableBuilder.getTestHmsTableParameters;
import static
org.apache.xtable.hms.table.TestIcebergHMSCatalogTableBuilder.getTestHmsTableStorageDescriptor;
import static org.junit.jupiter.api.Assertions.assertEquals;
diff --git
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestDeltaHMSCatalogTableBuilder.java
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestDeltaHMSCatalogTableBuilder.java
index 1a7d0cb4..4e799915 100644
---
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestDeltaHMSCatalogTableBuilder.java
+++
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestDeltaHMSCatalogTableBuilder.java
@@ -38,11 +38,11 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
-import org.apache.xtable.hms.HMSCatalogSyncClientTestBase;
+import org.apache.xtable.hms.HMSCatalogSyncTestBase;
import org.apache.xtable.model.storage.TableFormat;
@ExtendWith(MockitoExtension.class)
-public class TestDeltaHMSCatalogTableBuilder extends
HMSCatalogSyncClientTestBase {
+public class TestDeltaHMSCatalogTableBuilder extends HMSCatalogSyncTestBase {
private DeltaHMSCatalogTableBuilder mockDeltaHmsCatalogSyncRequestProvider;
diff --git
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestHudiHMSCatalogTableBuilder.java
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestHudiHMSCatalogTableBuilder.java
new file mode 100644
index 00000000..adf88c99
--- /dev/null
+++
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestHudiHMSCatalogTableBuilder.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.hms.table;
+
+import static
org.apache.xtable.hms.table.HudiHMSCatalogTableBuilder.HUDI_METADATA_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+
+import org.apache.xtable.hms.HMSCatalogSyncTestBase;
+import org.apache.xtable.hms.HMSSchemaExtractor;
+import org.apache.xtable.hudi.HudiTableManager;
+import org.apache.xtable.hudi.catalog.HudiCatalogTablePropertiesExtractor;
+
+/**
+ * Unit tests for {@link HudiHMSCatalogTableBuilder}.
+ *
+ * <p>The builder under test is constructed with mocked Hudi collaborators (meta client, table
+ * manager, table-properties extractor) and the real {@link HMSSchemaExtractor} singleton. Shared
+ * fixtures such as {@code TEST_HUDI_INTERNAL_TABLE}, {@code TEST_CATALOG_TABLE_IDENTIFIER} and
+ * {@code mockHMSCatalogConfig} are not declared here; presumably they are inherited from {@link
+ * HMSCatalogSyncTestBase}.
+ */
+@ExtendWith(MockitoExtension.class)
+public class TestHudiHMSCatalogTableBuilder extends HMSCatalogSyncTestBase {
+
+  @Mock private HoodieTableMetaClient mockMetaClient;
+  @Mock private HudiTableManager mockHudiTableManager;
+  @Mock private HoodieTableConfig mockTableConfig;
+  @Mock private HudiCatalogTablePropertiesExtractor mockTablePropertiesExtractor;
+
+  private HudiHMSCatalogTableBuilder hudiHMSCatalogTableBuilder;
+
+  /** Creates the builder under test, wired to this class's mocks. */
+  private HudiHMSCatalogTableBuilder createMockHudiHMSCatalogSyncRequestProvider() {
+    return new HudiHMSCatalogTableBuilder(
+        mockHMSCatalogConfig,
+        HMSSchemaExtractor.getInstance(),
+        mockHudiTableManager,
+        mockMetaClient,
+        mockTablePropertiesExtractor);
+  }
+
+  /** Instantiates the builder and stubs the schema length threshold on the catalog config. */
+  void setupCommonMocks() {
+    hudiHMSCatalogTableBuilder = createMockHudiHMSCatalogSyncRequestProvider();
+    when(mockHMSCatalogConfig.getSchemaLengthThreshold()).thenReturn(1000);
+  }
+
+  /** Stubs the meta client to expose a table config whose base file format is PARQUET. */
+  void setupMetaClientMocks() {
+    when(mockTableConfig.getBaseFileFormat()).thenReturn(HoodieFileFormat.PARQUET);
+    when(mockMetaClient.getTableConfig()).thenReturn(mockTableConfig);
+  }
+
+  /** Collects the source-field names of the partition fields of the Hudi test table. */
+  private List<String> getTestPartitionFieldNames() {
+    return TEST_HUDI_INTERNAL_TABLE.getPartitioningFields().stream()
+        .map(partitionField -> partitionField.getSourceField().getName())
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Verifies that spark table properties were requested from the extractor for the read schema of
+   * the Hudi test table, and that the partition fields passed along match the expected ones.
+   *
+   * @param expectedPartitionFields partition field names the builder is expected to pass
+   */
+  private void verifySparkTablePropertiesRequested(List<String> expectedPartitionFields) {
+    // forClass only accepts the raw List class, so the generic element type is unchecked here.
+    @SuppressWarnings("unchecked")
+    ArgumentCaptor<List<String>> partitionsCaptor = ArgumentCaptor.forClass(List.class);
+    verify(mockTablePropertiesExtractor)
+        .getSparkTableProperties(
+            partitionsCaptor.capture(),
+            eq(""),
+            any(Integer.class),
+            eq(TEST_HUDI_INTERNAL_TABLE.getReadSchema()));
+    assertEquals(expectedPartitionFields.size(), partitionsCaptor.getValue().size());
+    assertEquals(expectedPartitionFields, partitionsCaptor.getValue());
+  }
+
+  /**
+   * The create request must carry the identifier's table and database names, three columns, one
+   * partition key, and non-empty parameters with the Hudi metadata flag set to "true".
+   */
+  @Test
+  void testGetCreateTableInput() {
+    setupCommonMocks();
+    setupMetaClientMocks();
+
+    List<String> partitionFields = getTestPartitionFieldNames();
+
+    Table table =
+        hudiHMSCatalogTableBuilder.getCreateTableRequest(
+            TEST_HUDI_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER);
+
+    verifySparkTablePropertiesRequested(partitionFields);
+    assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), table.getTableName());
+    assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), table.getDbName());
+    assertEquals(3, table.getSd().getCols().size());
+    assertEquals(1, table.getPartitionKeys().size());
+    assertNotNull(table.getParameters());
+    assertFalse(table.getParameters().isEmpty());
+    assertEquals("true", table.getParameters().get(HUDI_METADATA_CONFIG));
+  }
+
+  /**
+   * The update request built from the evolved test table must carry the identifier's table and
+   * database names, four columns, one partition key, and non-empty parameters with the Hudi
+   * metadata flag set to "true".
+   */
+  @Test
+  void testGetUpdateTableInput() {
+    setupCommonMocks();
+    setupMetaClientMocks();
+
+    List<String> partitionFields = getTestPartitionFieldNames();
+
+    Table table =
+        hudiHMSCatalogTableBuilder.getCreateTableRequest(
+            TEST_HUDI_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER);
+    Table updatedTable =
+        hudiHMSCatalogTableBuilder.getUpdateTableRequest(
+            TEST_EVOLVED_HUDI_INTERNAL_TABLE, table, TEST_CATALOG_TABLE_IDENTIFIER);
+
+    verifySparkTablePropertiesRequested(partitionFields);
+    assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), updatedTable.getTableName());
+    assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), updatedTable.getDbName());
+    assertEquals(4, updatedTable.getSd().getCols().size());
+    assertEquals(1, updatedTable.getPartitionKeys().size());
+    // Assert on the update result, not on the table returned by the create request.
+    assertNotNull(updatedTable.getParameters());
+    assertFalse(updatedTable.getParameters().isEmpty());
+    assertEquals("true", updatedTable.getParameters().get(HUDI_METADATA_CONFIG));
+  }
+}
diff --git
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestIcebergHMSCatalogTableBuilder.java
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestIcebergHMSCatalogTableBuilder.java
index 14d39c44..b1e8df77 100644
---
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestIcebergHMSCatalogTableBuilder.java
+++
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestIcebergHMSCatalogTableBuilder.java
@@ -52,11 +52,11 @@ import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.hadoop.HadoopTables;
-import org.apache.xtable.hms.HMSCatalogSyncClientTestBase;
+import org.apache.xtable.hms.HMSCatalogSyncTestBase;
import org.apache.xtable.hms.HMSSchemaExtractor;
@ExtendWith(MockitoExtension.class)
-public class TestIcebergHMSCatalogTableBuilder extends
HMSCatalogSyncClientTestBase {
+public class TestIcebergHMSCatalogTableBuilder extends HMSCatalogSyncTestBase {
@Mock private HadoopTables mockIcebergHadoopTables;
@Mock private BaseTable mockIcebergBaseTable;
diff --git
a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
index 349b5ca9..899365e4 100644
---
a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
+++
b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
@@ -64,6 +64,7 @@ import org.apache.xtable.model.catalog.CatalogTableIdentifier;
import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier;
import org.apache.xtable.model.storage.CatalogType;
+import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.model.sync.SyncMode;
import org.apache.xtable.reflection.ReflectionUtils;
import org.apache.xtable.spi.extractor.CatalogConversionSource;
@@ -153,7 +154,9 @@ public class RunCatalogSync {
TargetTable targetTable =
TargetTable.builder()
.name(sourceTable.getName())
- .basePath(sourceTable.getBasePath())
+ .basePath(
+ getSourceTableLocation(
+ targetCatalogTableIdentifier.getTableFormat(),
sourceTable))
.namespace(sourceTable.getNamespace())
.formatName(targetCatalogTableIdentifier.getTableFormat())
.additionalProperties(sourceTable.getAdditionalProperties())
@@ -228,15 +231,25 @@ public class RunCatalogSync {
.additionalProperties(sourceProperties)
.build();
} else if (catalogConversionSource.isPresent()) {
+ TableIdentifier tableIdentifier =
sourceTableIdentifier.getTableIdentifier();
sourceTable =
- catalogConversionSource
- .get()
- .getSourceTable(
-
getCatalogTableIdentifier(sourceTableIdentifier.getTableIdentifier()));
+
catalogConversionSource.get().getSourceTable(getCatalogTableIdentifier(tableIdentifier));
+ if (tableIdentifier.getPartitionSpec() != null) {
+ sourceTable
+ .getAdditionalProperties()
+ .put(HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG,
tableIdentifier.getPartitionSpec());
+ }
}
return sourceTable;
}
+  /**
+   * Picks the location that is used as the base path of the target table.
+   *
+   * @param targetTableFormat format name of the target table, compared against {@link TableFormat}
+   * @param sourceTable source table supplying both candidate locations
+   * @return {@code sourceTable.getDataPath()} when the source format is ICEBERG and the target
+   *     format is HUDI, otherwise {@code sourceTable.getBasePath()}
+   */
+  static String getSourceTableLocation(String targetTableFormat, SourceTable sourceTable) {
+    boolean icebergToHudi =
+        sourceTable.getFormatName().equals(TableFormat.ICEBERG)
+            && targetTableFormat.equals(TableFormat.HUDI);
+    if (icebergToHudi) {
+      return sourceTable.getDataPath();
+    }
+    return sourceTable.getBasePath();
+  }
+
static Map<String, ConversionSourceProvider> getConversionSourceProviders(
List<String> tableFormats,
TableFormatConverters tableFormatConverters,
@@ -345,6 +358,8 @@ public class RunCatalogSync {
* HierarchicalTableIdentifier}
*/
String hierarchicalId;
+ /** Partition spec of the table, e.g. {@code cs_sold_date_sk:VALUE}; null when not configured. */
+ String partitionSpec;
}
/**
diff --git a/xtable-utilities/src/test/resources/catalogConfig.yaml
b/xtable-utilities/src/test/resources/catalogConfig.yaml
index 05b2df4b..88cff2b6 100644
--- a/xtable-utilities/src/test/resources/catalogConfig.yaml
+++ b/xtable-utilities/src/test/resources/catalogConfig.yaml
@@ -45,6 +45,7 @@ datasets:
- sourceCatalogTableIdentifier:
tableIdentifier:
hierarchicalId: "source-database-1.source-1"
+ partitionSpec: "cs_sold_date_sk:VALUE"
targetCatalogTableIdentifiers:
- catalogId: "target-1"
tableFormat: "DELTA"