This is an automated email from the ASF dual-hosted git repository.
vinish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
new 03e5b022 [590] Add Hudi Glue Catalog Sync Implementation
03e5b022 is described below
commit 03e5b02239db0008bd96b1b9f757e469a7b668d0
Author: Vamsi <[email protected]>
AuthorDate: Thu Feb 13 17:14:34 2025 +0530
[590] Add Hudi Glue Catalog Sync Implementation
---
xtable-aws/pom.xml | 8 +
.../org/apache/xtable/glue/GlueCatalogConfig.java | 11 +
.../xtable/glue/GlueCatalogConversionSource.java | 22 +-
.../glue/GlueCatalogPartitionSyncOperations.java | 331 +++++++++++++
.../apache/xtable/glue/GlueCatalogSyncClient.java | 38 +-
.../glue/GlueCatalogTableBuilderFactory.java | 5 +-
.../apache/xtable/glue/GlueCatalogTableUtils.java | 56 +++
.../apache/xtable/glue/GlueSchemaExtractor.java | 2 +-
.../glue/table/HudiGlueCatalogTableBuilder.java | 185 ++++++++
.../xtable/glue/GlueCatalogSyncTestBase.java | 7 +
.../TestGlueCatalogPartitionSyncOperations.java | 519 +++++++++++++++++++++
.../xtable/glue/TestGlueCatalogSyncClient.java | 87 +++-
.../table/TestHudiGlueCatalogTableBuilder.java | 94 ++--
.../HudiCatalogTablePropertiesExtractor.java | 36 +-
.../TestHudiCatalogTablePropertiesExtractor.java | 52 ++-
.../hms/table/HudiHMSCatalogTableBuilder.java | 35 +-
.../hms/table/TestHudiHMSCatalogTableBuilder.java | 40 +-
17 files changed, 1340 insertions(+), 188 deletions(-)
diff --git a/xtable-aws/pom.xml b/xtable-aws/pom.xml
index 099e04d3..7ec24cf9 100644
--- a/xtable-aws/pom.xml
+++ b/xtable-aws/pom.xml
@@ -36,6 +36,14 @@
<version>${project.version}</version>
</dependency>
+ <!-- Hudi dependencies -->
+ <dependency>
+ <groupId>org.apache.hudi</groupId>
+ <artifactId>hudi-hive-sync</artifactId>
+ <version>${hudi.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<!-- Hadoop dependencies -->
<dependency>
<groupId>org.apache.hadoop</groupId>
diff --git a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogConfig.java b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogConfig.java
index 87443698..09e195f1 100644
--- a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogConfig.java
+++ b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogConfig.java
@@ -28,6 +28,8 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
+import org.apache.hudi.hive.MultiPartKeysValueExtractor;
+
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -54,6 +56,15 @@ public class GlueCatalogConfig {
@JsonProperty("externalCatalog.glue.credentialsProviderClass")
private final String clientCredentialsProviderClass;
+ @JsonProperty("externalCatalog.glue.schema_string_length_thresh")
+ private int schemaLengthThreshold = 4000;
+
+ @JsonProperty("externalCatalog.glue.partition_extractor_class")
+ private String partitionExtractorClass = MultiPartKeysValueExtractor.class.getName();
+
+ @JsonProperty("externalCatalog.glue.max_partitions_per_request")
+ private int maxPartitionsPerRequest = 1000;
+
/**
* In case a credentialsProviderClass is configured and require additional properties for
* instantiation, those properties should start with {@link
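For reviewers, the three new options above are plain catalog properties; a minimal sketch (not part of this commit) of how they might be supplied to GlueCatalogConfig.of(...), assuming the same property map that GlueCatalogSyncClient already passes in. Key names come from the @JsonProperty annotations above; the values are only examples.

    import java.util.HashMap;
    import java.util.Map;

    // Sketch: populating the new Glue options; defaults shown here mirror the field initializers.
    Map<String, String> glueProperties = new HashMap<>();
    glueProperties.put("externalCatalog.glue.schema_string_length_thresh", "4000");
    glueProperties.put("externalCatalog.glue.partition_extractor_class",
        "org.apache.hudi.hive.MultiPartKeysValueExtractor");
    glueProperties.put("externalCatalog.glue.max_partitions_per_request", "1000");
    GlueCatalogConfig glueCatalogConfig = GlueCatalogConfig.of(glueProperties);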
diff --git a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogConversionSource.java b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogConversionSource.java
index 370ae44a..dc905cef 100644
--- a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogConversionSource.java
+++ b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogConversionSource.java
@@ -18,8 +18,6 @@
package org.apache.xtable.glue;
-import static org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifier;
-
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
@@ -31,13 +29,10 @@ import org.apache.xtable.conversion.ExternalCatalogConfig;
import org.apache.xtable.conversion.SourceTable;
import org.apache.xtable.exception.CatalogSyncException;
import org.apache.xtable.model.catalog.CatalogTableIdentifier;
-import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
import org.apache.xtable.model.storage.CatalogType;
import org.apache.xtable.spi.extractor.CatalogConversionSource;
import software.amazon.awssdk.services.glue.GlueClient;
-import software.amazon.awssdk.services.glue.model.GetTableRequest;
-import software.amazon.awssdk.services.glue.model.GetTableResponse;
import software.amazon.awssdk.services.glue.model.GlueException;
import software.amazon.awssdk.services.glue.model.Table;
@@ -61,19 +56,12 @@ public class GlueCatalogConversionSource implements CatalogConversionSource {
@Override
public SourceTable getSourceTable(CatalogTableIdentifier tblIdentifier) {
- HierarchicalTableIdentifier tableIdentifier = toHierarchicalTableIdentifier(tblIdentifier);
try {
- GetTableResponse response =
- glueClient.getTable(
- GetTableRequest.builder()
- .catalogId(glueCatalogConfig.getCatalogId())
- .databaseName(tableIdentifier.getDatabaseName())
- .name(tableIdentifier.getTableName())
- .build());
- Table table = response.table();
+ Table table =
+ GlueCatalogTableUtils.getTable(
+ glueClient, glueCatalogConfig.getCatalogId(), tblIdentifier);
if (table == null) {
- throw new IllegalStateException(
- String.format("table: %s is null", tableIdentifier.getId()));
+ throw new IllegalStateException(String.format("table: %s is null", tblIdentifier.getId()));
}
String tableFormat = TableFormatUtils.getTableFormat(table.parameters());
@@ -91,7 +79,7 @@ public class GlueCatalogConversionSource implements CatalogConversionSource {
.additionalProperties(tableProperties)
.build();
} catch (GlueException e) {
- throw new CatalogSyncException("Failed to get table: " + tableIdentifier.getId(), e);
+ throw new CatalogSyncException("Failed to get table: " + tblIdentifier.getId(), e);
}
}
diff --git a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogPartitionSyncOperations.java b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogPartitionSyncOperations.java
new file mode 100644
index 00000000..8c2cff24
--- /dev/null
+++ b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogPartitionSyncOperations.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.glue;
+
+import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
+import static org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifier;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.hudi.common.util.CollectionUtils;
+
+import org.apache.xtable.catalog.CatalogPartition;
+import org.apache.xtable.catalog.CatalogPartitionSyncOperations;
+import org.apache.xtable.exception.CatalogSyncException;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
+
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.BatchCreatePartitionRequest;
+import software.amazon.awssdk.services.glue.model.BatchCreatePartitionResponse;
+import software.amazon.awssdk.services.glue.model.BatchDeletePartitionRequest;
+import software.amazon.awssdk.services.glue.model.BatchDeletePartitionResponse;
+import software.amazon.awssdk.services.glue.model.BatchUpdatePartitionRequest;
+import software.amazon.awssdk.services.glue.model.BatchUpdatePartitionRequestEntry;
+import software.amazon.awssdk.services.glue.model.BatchUpdatePartitionResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionsRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionsResponse;
+import software.amazon.awssdk.services.glue.model.PartitionInput;
+import software.amazon.awssdk.services.glue.model.PartitionValueList;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
+
+@Log4j2
+public class GlueCatalogPartitionSyncOperations implements CatalogPartitionSyncOperations {
+
+ private final GlueClient glueClient;
+ private final GlueCatalogConfig glueCatalogConfig;
+
+ public GlueCatalogPartitionSyncOperations(
+ GlueClient glueClient, GlueCatalogConfig glueCatalogConfig) {
+ this.glueClient = glueClient;
+ this.glueCatalogConfig = glueCatalogConfig;
+ }
+
+ @Override
+ public List<CatalogPartition> getAllPartitions(CatalogTableIdentifier catalogTableIdentifier) {
+ HierarchicalTableIdentifier tableIdentifier =
+ toHierarchicalTableIdentifier(catalogTableIdentifier);
+ try {
+ List<CatalogPartition> partitions = new ArrayList<>();
+ String nextToken = null;
+ do {
+ GetPartitionsResponse result =
+ glueClient.getPartitions(
+ GetPartitionsRequest.builder()
+ .databaseName(tableIdentifier.getDatabaseName())
+ .tableName(tableIdentifier.getTableName())
+ .nextToken(nextToken)
+ .build());
+ partitions.addAll(
+ result.partitions().stream()
+ .map(p -> new CatalogPartition(p.values(), p.storageDescriptor().location()))
+ .collect(Collectors.toList()));
+ nextToken = result.nextToken();
+ } while (nextToken != null);
+ return partitions;
+ } catch (Exception e) {
+ throw new CatalogSyncException(
+ "Failed to get all partitions for table " + tableIdentifier, e);
+ }
+ }
+
+ @Override
+ public void addPartitionsToTable(
+ CatalogTableIdentifier catalogTableIdentifier, List<CatalogPartition> partitionsToAdd) {
+ HierarchicalTableIdentifier tableIdentifier =
+ toHierarchicalTableIdentifier(catalogTableIdentifier);
+ if (partitionsToAdd.isEmpty()) {
+ log.info("No partitions to add for {}", tableIdentifier);
+ return;
+ }
+ log.info("Adding {} CatalogPartition(s) in table {}",
partitionsToAdd.size(), tableIdentifier);
+ try {
+ Table table =
+ GlueCatalogTableUtils.getTable(
+ glueClient, glueCatalogConfig.getCatalogId(), catalogTableIdentifier);
+ StorageDescriptor sd = table.storageDescriptor();
+ List<PartitionInput> partitionInputs =
+ partitionsToAdd.stream()
+ .map(partition -> createPartitionInput(table, partition))
+ .collect(Collectors.toList());
+
+ List<BatchCreatePartitionResponse> responses = new ArrayList<>();
+
+ CollectionUtils.batches(partitionInputs, glueCatalogConfig.getMaxPartitionsPerRequest())
+ .forEach(
+ batch -> {
+ BatchCreatePartitionRequest request =
+ BatchCreatePartitionRequest.builder()
+ .databaseName(tableIdentifier.getDatabaseName())
+ .tableName(tableIdentifier.getTableName())
+ .partitionInputList(batch)
+ .build();
+ responses.add(glueClient.batchCreatePartition(request));
+ });
+
+ responses.forEach(
+ response -> {
+ if (CollectionUtils.nonEmpty(response.errors())) {
+ if (response.errors().stream()
+ .allMatch(
+ (error) ->
+ "AlreadyExistsException".equals(error.errorDetail().errorCode()))) {
+ log.warn("Partitions already exist in glue: {}", response.errors());
+ } else {
+ throw new CatalogSyncException(
+ "Fail to add partitions to "
+ + tableIdentifier
+ + " with error(s): "
+ + response.errors());
+ }
+ }
+ });
+ } catch (Exception e) {
+ throw new CatalogSyncException("Fail to add partitions to " +
tableIdentifier, e);
+ }
+ }
+
+ @Override
+ public void updatePartitionsToTable(
+ CatalogTableIdentifier catalogTableIdentifier, List<CatalogPartition> changedPartitions) {
+ HierarchicalTableIdentifier tableIdentifier =
+ toHierarchicalTableIdentifier(catalogTableIdentifier);
+ if (changedPartitions.isEmpty()) {
+ log.info("No partitions to change for {}",
tableIdentifier.getTableName());
+ return;
+ }
+ log.info("Updating {} partition(s) in table {}", changedPartitions.size(),
tableIdentifier);
+ try {
+ Table table =
+ GlueCatalogTableUtils.getTable(
+ glueClient, glueCatalogConfig.getCatalogId(), catalogTableIdentifier);
+ StorageDescriptor sd = table.storageDescriptor();
+ List<BatchUpdatePartitionRequestEntry> updatePartitionEntries =
+ changedPartitions.stream()
+ .map(
+ partition ->
+ BatchUpdatePartitionRequestEntry.builder()
+ .partitionInput(createPartitionInput(table, partition))
+ .partitionValueList(partition.getValues())
+ .build())
+ .collect(Collectors.toList());
+
+ List<BatchUpdatePartitionResponse> responses = new ArrayList<>();
+
+ CollectionUtils.batches(
+ updatePartitionEntries, glueCatalogConfig.getMaxPartitionsPerRequest())
+ .forEach(
+ batch -> {
+ BatchUpdatePartitionRequest request =
+ BatchUpdatePartitionRequest.builder()
+ .databaseName(tableIdentifier.getDatabaseName())
+ .tableName(tableIdentifier.getTableName())
+ .entries(batch)
+ .build();
+ responses.add(glueClient.batchUpdatePartition(request));
+ });
+
+ responses.forEach(
+ response -> {
+ if (CollectionUtils.nonEmpty(response.errors())) {
+ throw new CatalogSyncException(
+ "Fail to update partitions to "
+ + tableIdentifier
+ + " with error(s): "
+ + response.errors());
+ }
+ });
+ } catch (Exception e) {
+ throw new CatalogSyncException("Fail to update partitions to " +
tableIdentifier, e);
+ }
+ }
+
+ @Override
+ public void dropPartitions(
+ CatalogTableIdentifier catalogTableIdentifier, List<CatalogPartition> partitionsToDrop) {
+ HierarchicalTableIdentifier tableIdentifier =
+ toHierarchicalTableIdentifier(catalogTableIdentifier);
+ if (isNullOrEmpty(partitionsToDrop)) {
+ log.info("No partitions to drop for {}", tableIdentifier);
+ return;
+ }
+ log.info("Drop {} CatalogPartition(s) in table {}",
partitionsToDrop.size(), tableIdentifier);
+ try {
+ List<BatchDeletePartitionResponse> responses = new ArrayList<>();
+
+ CollectionUtils.batches(partitionsToDrop, glueCatalogConfig.getMaxPartitionsPerRequest())
+ .forEach(
+ batch -> {
+ List<PartitionValueList> partitionValueLists =
+ batch.stream()
+ .map(
+ CatalogPartition ->
+ PartitionValueList.builder()
+ .values(CatalogPartition.getValues())
+ .build())
+ .collect(Collectors.toList());
+
+ BatchDeletePartitionRequest batchDeletePartitionRequest =
+ BatchDeletePartitionRequest.builder()
+ .databaseName(tableIdentifier.getDatabaseName())
+ .tableName(tableIdentifier.getTableName())
+ .partitionsToDelete(partitionValueLists)
+ .build();
+ responses.add(glueClient.batchDeletePartition(batchDeletePartitionRequest));
+ });
+
+ responses.forEach(
+ response -> {
+ if (CollectionUtils.nonEmpty(response.errors())) {
+ throw new CatalogSyncException(
+ "Fail to drop partitions to "
+ + tableIdentifier
+ + " with error(s): "
+ + response.errors());
+ }
+ });
+ } catch (Exception e) {
+ throw new CatalogSyncException("Fail to drop partitions to " +
tableIdentifier, e);
+ }
+ }
+
+ @Override
+ public Map<String, String> getTableProperties(
+ CatalogTableIdentifier tableIdentifier, List<String> keysToRetrieve) {
+ try {
+ Table table =
+ GlueCatalogTableUtils.getTable(
+ glueClient, glueCatalogConfig.getCatalogId(), tableIdentifier);
+ Map<String, String> tableParameters = table.parameters();
+
+ return keysToRetrieve.stream()
+ .filter(tableParameters::containsKey)
+ .collect(Collectors.toMap(key -> key, tableParameters::get));
+ } catch (Exception e) {
+ throw new CatalogSyncException(
+ "failed to get last time synced properties for table " +
tableIdentifier, e);
+ }
+ }
+
+ @Override
+ public void updateTableProperties(
+ CatalogTableIdentifier catalogTableIdentifier, Map<String, String> propertiesToUpdate) {
+ HierarchicalTableIdentifier tableIdentifier =
+ toHierarchicalTableIdentifier(catalogTableIdentifier);
+ if (isNullOrEmpty(propertiesToUpdate)) {
+ return;
+ }
+ try {
+ Table table =
+ GlueCatalogTableUtils.getTable(
+ glueClient, glueCatalogConfig.getCatalogId(), catalogTableIdentifier);
+
+ final Map<String, String> newParams = new HashMap<>();
+ newParams.putAll(table.parameters());
+ newParams.putAll(propertiesToUpdate);
+
+ final Instant now = Instant.now();
+ TableInput updatedTableInput =
+ TableInput.builder()
+ .name(tableIdentifier.getTableName())
+ .tableType(table.tableType())
+ .parameters(newParams)
+ .partitionKeys(table.partitionKeys())
+ .storageDescriptor(table.storageDescriptor())
+ .lastAccessTime(now)
+ .lastAnalyzedTime(now)
+ .build();
+
+ UpdateTableRequest request =
+ UpdateTableRequest.builder()
+ .databaseName(tableIdentifier.getDatabaseName())
+ .tableInput(updatedTableInput)
+ .skipArchive(true)
+ .build();
+ glueClient.updateTable(request);
+ } catch (Exception e) {
+ throw new CatalogSyncException(
+ "Fail to update last synced params for table "
+ + tableIdentifier
+ + ": "
+ + propertiesToUpdate,
+ e);
+ }
+ }
+
+ private PartitionInput createPartitionInput(Table table, CatalogPartition partition) {
+ StorageDescriptor sd = table.storageDescriptor();
+ StorageDescriptor partitionSD =
+ sd.copy(copySd -> copySd.location(partition.getStorageLocation()));
+ return PartitionInput.builder()
+ .values(partition.getValues())
+ .storageDescriptor(partitionSD)
+ .build();
+ }
+}
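For context, a rough usage sketch of the new operations class; the GlueClient construction, the free variables glueCatalogConfig and tableIdentifier, and the partition values below are illustrative assumptions, not part of this commit.

    import java.util.Collections;
    import java.util.List;
    import org.apache.xtable.catalog.CatalogPartition;
    import org.apache.xtable.catalog.CatalogPartitionSyncOperations;
    import software.amazon.awssdk.services.glue.GlueClient;

    // Sketch: driving the partition operations directly. The sync flow normally reaches this
    // class through CatalogUtils.getPartitionSyncTool as wired in GlueCatalogSyncClient#init.
    try (GlueClient glueClient = GlueClient.create()) { // xtable itself uses DefaultGlueClientFactory
      CatalogPartitionSyncOperations ops =
          new GlueCatalogPartitionSyncOperations(glueClient, glueCatalogConfig);
      List<CatalogPartition> existing = ops.getAllPartitions(tableIdentifier);
      // Register one partition; the value and storage location are placeholders.
      ops.addPartitionsToTable(
          tableIdentifier,
          Collections.singletonList(
              new CatalogPartition(
                  Collections.singletonList("2024-01-01"), "s3://bucket/table/2024-01-01")));
    }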
diff --git a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogSyncClient.java b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogSyncClient.java
index d7de2dec..37b91753 100644
--- a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogSyncClient.java
+++ b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogSyncClient.java
@@ -21,6 +21,7 @@ package org.apache.xtable.glue;
import static org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifier;
import java.time.ZonedDateTime;
+import java.util.Optional;
import lombok.extern.log4j.Log4j2;
@@ -28,7 +29,9 @@ import org.apache.hadoop.conf.Configuration;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.xtable.catalog.CatalogPartitionSyncTool;
import org.apache.xtable.catalog.CatalogTableBuilder;
+import org.apache.xtable.catalog.CatalogUtils;
import org.apache.xtable.conversion.ExternalCatalogConfig;
import org.apache.xtable.exception.CatalogSyncException;
import org.apache.xtable.model.InternalTable;
@@ -45,8 +48,6 @@ import software.amazon.awssdk.services.glue.model.DatabaseInput;
import software.amazon.awssdk.services.glue.model.DeleteTableRequest;
import software.amazon.awssdk.services.glue.model.EntityNotFoundException;
import software.amazon.awssdk.services.glue.model.GetDatabaseRequest;
-import software.amazon.awssdk.services.glue.model.GetTableRequest;
-import software.amazon.awssdk.services.glue.model.GetTableResponse;
import software.amazon.awssdk.services.glue.model.Table;
import software.amazon.awssdk.services.glue.model.TableInput;
import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
@@ -63,6 +64,7 @@ public class GlueCatalogSyncClient implements CatalogSyncClient<Table> {
private GlueCatalogConfig glueCatalogConfig;
private Configuration configuration;
private CatalogTableBuilder<TableInput, Table> tableBuilder;
+ private Optional<CatalogPartitionSyncTool> partitionSyncTool;
// For loading the instance using ServiceLoader
public GlueCatalogSyncClient() {}
@@ -78,12 +80,14 @@ public class GlueCatalogSyncClient implements CatalogSyncClient<Table> {
Configuration configuration,
GlueCatalogConfig glueCatalogConfig,
GlueClient glueClient,
- CatalogTableBuilder tableBuilder) {
+ CatalogTableBuilder tableBuilder,
+ Optional<CatalogPartitionSyncTool> partitionSyncTool) {
this.catalogConfig = catalogConfig;
this.configuration = new Configuration(configuration);
this.glueCatalogConfig = glueCatalogConfig;
this.glueClient = glueClient;
this.tableBuilder = tableBuilder;
+ this.partitionSyncTool = partitionSyncTool;
}
@Override
@@ -143,20 +147,13 @@ public class GlueCatalogSyncClient implements CatalogSyncClient<Table> {
@Override
public Table getTable(CatalogTableIdentifier tableIdentifier) {
- HierarchicalTableIdentifier tblIdentifier = toHierarchicalTableIdentifier(tableIdentifier);
try {
- GetTableResponse response =
- glueClient.getTable(
- GetTableRequest.builder()
- .catalogId(glueCatalogConfig.getCatalogId())
- .databaseName(tblIdentifier.getDatabaseName())
- .name(tblIdentifier.getTableName())
- .build());
- return response.table();
+ return GlueCatalogTableUtils.getTable(
+ glueClient, glueCatalogConfig.getCatalogId(), tableIdentifier);
} catch (EntityNotFoundException e) {
return null;
} catch (Exception e) {
- throw new CatalogSyncException("Failed to get table: " +
tblIdentifier.getId(), e);
+ throw new CatalogSyncException("Failed to get table: " +
tableIdentifier.getId(), e);
}
}
@@ -174,6 +171,8 @@ public class GlueCatalogSyncClient implements CatalogSyncClient<Table> {
} catch (Exception e) {
throw new CatalogSyncException("Failed to create table: " +
tblIdentifier.getId(), e);
}
+
+ partitionSyncTool.ifPresent(tool -> tool.syncPartitions(table, tableIdentifier));
}
@Override
@@ -193,6 +192,8 @@ public class GlueCatalogSyncClient implements CatalogSyncClient<Table> {
} catch (Exception e) {
throw new CatalogSyncException("Failed to refresh table: " +
tblIdentifier.getId(), e);
}
+
+ partitionSyncTool.ifPresent(tool -> tool.syncPartitions(table, tableIdentifier));
}
@Override
@@ -230,7 +231,16 @@ public class GlueCatalogSyncClient implements CatalogSyncClient<Table> {
this.glueCatalogConfig = GlueCatalogConfig.of(catalogConfig.getCatalogProperties());
this.glueClient = new DefaultGlueClientFactory(glueCatalogConfig).getGlueClient();
this.configuration = new Configuration(configuration);
- this.tableBuilder = GlueCatalogTableBuilderFactory.getInstance(tableFormat, this.configuration);
+ this.tableBuilder =
+ GlueCatalogTableBuilderFactory.getInstance(
+ tableFormat, glueCatalogConfig, this.configuration);
+ this.partitionSyncTool =
+ CatalogUtils.getPartitionSyncTool(
+ tableFormat,
+ glueCatalogConfig.getPartitionExtractorClass(),
+ new GlueCatalogPartitionSyncOperations(glueClient, glueCatalogConfig),
+ configuration);
+ ;
}
@Override
diff --git a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogTableBuilderFactory.java b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogTableBuilderFactory.java
index cd0b1d08..7316aaf2 100644
--- a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogTableBuilderFactory.java
+++ b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogTableBuilderFactory.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.xtable.catalog.CatalogTableBuilder;
import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.glue.table.DeltaGlueCatalogTableBuilder;
+import org.apache.xtable.glue.table.HudiGlueCatalogTableBuilder;
import org.apache.xtable.glue.table.IcebergGlueCatalogTableBuilder;
import org.apache.xtable.model.storage.TableFormat;
@@ -32,12 +33,14 @@ import software.amazon.awssdk.services.glue.model.TableInput;
class GlueCatalogTableBuilderFactory {
static CatalogTableBuilder<TableInput, Table> getInstance(
- String tableFormat, Configuration configuration) {
+ String tableFormat, GlueCatalogConfig glueCatalogConfig, Configuration configuration) {
switch (tableFormat) {
case TableFormat.ICEBERG:
return new IcebergGlueCatalogTableBuilder(configuration);
case TableFormat.DELTA:
return new DeltaGlueCatalogTableBuilder();
+ case TableFormat.HUDI:
+ return new HudiGlueCatalogTableBuilder(glueCatalogConfig, configuration);
default:
throw new NotSupportedException("Unsupported table format: " + tableFormat);
}
diff --git a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogTableUtils.java b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogTableUtils.java
new file mode 100644
index 00000000..269f79a5
--- /dev/null
+++ b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogTableUtils.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.glue;
+
+import static org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifier;
+
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
+
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTableResponse;
+import software.amazon.awssdk.services.glue.model.Table;
+
+/** Utils class to fetch details about Glue table */
+public class GlueCatalogTableUtils {
+
+ /**
+ * Retrieves a table from AWS Glue based on the provided catalog and table identifiers.
+ *
+ * @param glueClient The AWS Glue client used to connect to AWS Glue.
+ * @param catalogId The ID of the AWS Glue Data Catalog where the table is located.
+ * @param catalogTableIdentifier The identifier for the table in the catalog.
+ * @return The retrieved {@link Table} object if found; otherwise, returns {@code null} if the
+ * table does not exist.
+ */
+ public static Table getTable(
+ GlueClient glueClient, String catalogId, CatalogTableIdentifier catalogTableIdentifier) {
+ HierarchicalTableIdentifier tableIdentifier =
+ toHierarchicalTableIdentifier(catalogTableIdentifier);
+ GetTableResponse response =
+ glueClient.getTable(
+ GetTableRequest.builder()
+ .catalogId(catalogId)
+ .databaseName(tableIdentifier.getDatabaseName())
+ .name(tableIdentifier.getTableName())
+ .build());
+ return response.table();
+ }
+}
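A short, hedged usage sketch of the helper above (the client, config, and identifier variables are placeholders):

    // Sketch: callers such as GlueCatalogSyncClient#getTable and the partition operations fetch
    // the Glue table through this helper and handle the not-found case themselves.
    Table table =
        GlueCatalogTableUtils.getTable(glueClient, glueCatalogConfig.getCatalogId(), tableIdentifier);
    if (table == null) {
      // table does not exist in Glue; each caller decides whether that is an error
    }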
diff --git a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueSchemaExtractor.java b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueSchemaExtractor.java
index 1bb51adc..24da6ea4 100644
--- a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueSchemaExtractor.java
+++ b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueSchemaExtractor.java
@@ -143,7 +143,7 @@ public class GlueSchemaExtractor {
* @param fieldSchema InternalTable field schema
* @return glue column type
*/
- protected String toTypeString(InternalSchema fieldSchema, String tableFormat) {
+ public String toTypeString(InternalSchema fieldSchema, String tableFormat) {
switch (fieldSchema.getDataType()) {
case BOOLEAN:
return "boolean";
diff --git a/xtable-aws/src/main/java/org/apache/xtable/glue/table/HudiGlueCatalogTableBuilder.java b/xtable-aws/src/main/java/org/apache/xtable/glue/table/HudiGlueCatalogTableBuilder.java
new file mode 100644
index 00000000..9618873c
--- /dev/null
+++ b/xtable-aws/src/main/java/org/apache/xtable/glue/table/HudiGlueCatalogTableBuilder.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.glue.table;
+
+import static org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifier;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.xtable.catalog.CatalogTableBuilder;
+import org.apache.xtable.exception.CatalogSyncException;
+import org.apache.xtable.glue.GlueCatalogConfig;
+import org.apache.xtable.glue.GlueSchemaExtractor;
+import org.apache.xtable.hudi.HudiTableManager;
+import org.apache.xtable.hudi.catalog.HudiCatalogTablePropertiesExtractor;
+import org.apache.xtable.hudi.catalog.HudiInputFormatUtils;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.storage.TableFormat;
+
+import software.amazon.awssdk.services.glue.model.Column;
+import software.amazon.awssdk.services.glue.model.SerDeInfo;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+
+public class HudiGlueCatalogTableBuilder implements CatalogTableBuilder<TableInput, Table> {
+ protected static final String GLUE_EXTERNAL_TABLE_TYPE = "EXTERNAL_TABLE";
+
+ private final GlueSchemaExtractor schemaExtractor;
+ private final HudiTableManager hudiTableManager;
+ private final GlueCatalogConfig glueCatalogConfig;
+ private HoodieTableMetaClient metaClient;
+ private final HudiCatalogTablePropertiesExtractor tablePropertiesExtractor;
+
+ public HudiGlueCatalogTableBuilder(
+ GlueCatalogConfig glueCatalogConfig, Configuration configuration) {
+ this.glueCatalogConfig = glueCatalogConfig;
+ this.schemaExtractor = GlueSchemaExtractor.getInstance();
+ this.hudiTableManager = HudiTableManager.of(configuration);
+ this.tablePropertiesExtractor = HudiCatalogTablePropertiesExtractor.getInstance();
+ }
+
+ @VisibleForTesting
+ HudiGlueCatalogTableBuilder(
+ GlueCatalogConfig glueCatalogConfig,
+ GlueSchemaExtractor schemaExtractor,
+ HudiTableManager hudiTableManager,
+ HoodieTableMetaClient metaClient,
+ HudiCatalogTablePropertiesExtractor propertiesExtractor) {
+ this.glueCatalogConfig = glueCatalogConfig;
+ this.hudiTableManager = hudiTableManager;
+ this.schemaExtractor = schemaExtractor;
+ this.metaClient = metaClient;
+ this.tablePropertiesExtractor = propertiesExtractor;
+ }
+
+ HoodieTableMetaClient getMetaClient(String basePath) {
+ if (metaClient == null) {
+ Optional<HoodieTableMetaClient> metaClientOpt =
+ hudiTableManager.loadTableMetaClientIfExists(basePath);
+
+ if (!metaClientOpt.isPresent()) {
+ throw new CatalogSyncException(
+ "Failed to get meta client since table is not present in the base
path " + basePath);
+ }
+
+ metaClient = metaClientOpt.get();
+ }
+ return metaClient;
+ }
+
+ @Override
+ public TableInput getCreateTableRequest(
+ InternalTable table, CatalogTableIdentifier catalogTableIdentifier) {
+ HierarchicalTableIdentifier tableIdentifier =
+ toHierarchicalTableIdentifier(catalogTableIdentifier);
+ final Instant now = Instant.now();
+ return TableInput.builder()
+ .name(tableIdentifier.getTableName())
+ .tableType(GLUE_EXTERNAL_TABLE_TYPE)
+ .parameters(
+ tablePropertiesExtractor.getTableProperties(
+ table, glueCatalogConfig.getSchemaLengthThreshold()))
+ .partitionKeys(getSchemaPartitionKeys(table.getPartitioningFields()))
+ .storageDescriptor(getStorageDescriptor(table))
+ .lastAccessTime(now)
+ .lastAnalyzedTime(now)
+ .build();
+ }
+
+ @Override
+ public TableInput getUpdateTableRequest(
+ InternalTable table, Table glueTable, CatalogTableIdentifier catalogTableIdentifier) {
+ HierarchicalTableIdentifier tableIdentifier =
+ toHierarchicalTableIdentifier(catalogTableIdentifier);
+ Map<String, String> tableParameters = new HashMap<>(glueTable.parameters());
+ tableParameters.putAll(
+ tablePropertiesExtractor.getTableProperties(
+ table, glueCatalogConfig.getSchemaLengthThreshold()));
+ List<Column> newColumns = getSchemaWithoutPartitionKeys(table);
+ StorageDescriptor sd = glueTable.storageDescriptor();
+ StorageDescriptor partitionSD = sd.copy(copySd -> copySd.columns(newColumns));
+
+ final Instant now = Instant.now();
+ return TableInput.builder()
+ .name(tableIdentifier.getTableName())
+ .tableType(glueTable.tableType())
+ .parameters(tableParameters)
+ .partitionKeys(getSchemaPartitionKeys(table.getPartitioningFields()))
+ .storageDescriptor(partitionSD)
+ .lastAccessTime(now)
+ .lastAnalyzedTime(now)
+ .build();
+ }
+
+ private StorageDescriptor getStorageDescriptor(InternalTable table) {
+ HoodieFileFormat baseFileFormat =
+ getMetaClient(table.getBasePath()).getTableConfig().getBaseFileFormat();
+ SerDeInfo serDeInfo =
+ SerDeInfo.builder()
+ .serializationLibrary(HudiInputFormatUtils.getSerDeClassName(baseFileFormat))
+ .parameters(tablePropertiesExtractor.getSerdeProperties(table.getBasePath()))
+ .build();
+ return StorageDescriptor.builder()
+ .serdeInfo(serDeInfo)
+ .location(table.getBasePath())
+ .inputFormat(HudiInputFormatUtils.getInputFormatClassName(baseFileFormat, false))
+ .outputFormat(HudiInputFormatUtils.getOutputFormatClassName(baseFileFormat))
+ .columns(getSchemaWithoutPartitionKeys(table))
+ .build();
+ }
+
+ private List<Column> getSchemaWithoutPartitionKeys(InternalTable table) {
+ List<String> partitionKeys =
+ table.getPartitioningFields().stream()
+ .map(field -> field.getSourceField().getName())
+ .collect(Collectors.toList());
+ return schemaExtractor.toColumns(TableFormat.HUDI, table.getReadSchema()).stream()
+ .filter(c -> !partitionKeys.contains(c.name()))
+ .collect(Collectors.toList());
+ }
+
+ private List<Column> getSchemaPartitionKeys(List<InternalPartitionField> partitioningFields) {
+ return partitioningFields.stream()
+ .map(
+ field -> {
+ String fieldName = field.getSourceField().getName();
+ String fieldType =
+ schemaExtractor.toTypeString(
+ field.getSourceField().getSchema(), TableFormat.HUDI);
+ return Column.builder().name(fieldName).type(fieldType).build();
+ })
+ .collect(Collectors.toList());
+ }
+}
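For orientation, a sketch of how this builder is used by the Glue sync path; the internal table, existing Glue table, Hadoop configuration, and identifier below are placeholders rather than part of the commit.

    // Sketch: GlueCatalogTableBuilderFactory returns this builder for TableFormat.HUDI, and
    // GlueCatalogSyncClient then asks it for create/update requests.
    CatalogTableBuilder<TableInput, Table> builder =
        new HudiGlueCatalogTableBuilder(glueCatalogConfig, hadoopConfiguration);
    TableInput createRequest = builder.getCreateTableRequest(internalTable, tableIdentifier);
    TableInput updateRequest =
        builder.getUpdateTableRequest(internalTable, existingGlueTable, tableIdentifier);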
diff --git a/xtable-aws/src/test/java/org/apache/xtable/glue/GlueCatalogSyncTestBase.java b/xtable-aws/src/test/java/org/apache/xtable/glue/GlueCatalogSyncTestBase.java
index d6efeb19..a1f5e73c 100644
--- a/xtable-aws/src/test/java/org/apache/xtable/glue/GlueCatalogSyncTestBase.java
+++ b/xtable-aws/src/test/java/org/apache/xtable/glue/GlueCatalogSyncTestBase.java
@@ -138,6 +138,13 @@ public class GlueCatalogSyncTestBase {
.readSchema(INTERNAL_SCHEMA)
.partitioningFields(Collections.singletonList(PARTITION_FIELD))
.build();
+ protected static final InternalTable TEST_EVOLVED_HUDI_INTERNAL_TABLE =
+ InternalTable.builder()
+ .basePath(TEST_BASE_PATH)
+ .tableFormat(TableFormat.HUDI)
+ .readSchema(UPDATED_INTERNAL_SCHEMA)
+ .partitioningFields(Collections.singletonList(PARTITION_FIELD))
+ .build();
protected static final InternalTable TEST_DELTA_INTERNAL_TABLE =
InternalTable.builder()
.basePath(TEST_BASE_PATH)
diff --git a/xtable-aws/src/test/java/org/apache/xtable/glue/TestGlueCatalogPartitionSyncOperations.java b/xtable-aws/src/test/java/org/apache/xtable/glue/TestGlueCatalogPartitionSyncOperations.java
new file mode 100644
index 00000000..edf3a790
--- /dev/null
+++ b/xtable-aws/src/test/java/org/apache/xtable/glue/TestGlueCatalogPartitionSyncOperations.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.glue;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import org.apache.xtable.catalog.CatalogPartitionSyncOperations;
+import org.apache.xtable.exception.CatalogSyncException;
+
+import software.amazon.awssdk.services.glue.model.BatchCreatePartitionRequest;
+import software.amazon.awssdk.services.glue.model.BatchCreatePartitionResponse;
+import software.amazon.awssdk.services.glue.model.BatchDeletePartitionRequest;
+import software.amazon.awssdk.services.glue.model.BatchDeletePartitionResponse;
+import software.amazon.awssdk.services.glue.model.BatchUpdatePartitionRequest;
+import software.amazon.awssdk.services.glue.model.BatchUpdatePartitionResponse;
+import software.amazon.awssdk.services.glue.model.GetPartitionsRequest;
+import software.amazon.awssdk.services.glue.model.GetPartitionsResponse;
+import software.amazon.awssdk.services.glue.model.GetTableRequest;
+import software.amazon.awssdk.services.glue.model.GetTableResponse;
+import software.amazon.awssdk.services.glue.model.Partition;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.UpdateTableRequest;
+import software.amazon.awssdk.services.glue.model.UpdateTableResponse;
+
+@ExtendWith(MockitoExtension.class)
+public class TestGlueCatalogPartitionSyncOperations extends GlueCatalogSyncTestBase {
+
+ private CatalogPartitionSyncOperations gluePartitionSyncOperations;
+
+ @BeforeEach
+ public void setup() {
+ setupCommonMocks();
+ }
+
+ void setupCommonMocks() {
+ gluePartitionSyncOperations =
+ new GlueCatalogPartitionSyncOperations(mockGlueClient, mockGlueCatalogConfig);
+ }
+
+ @Test
+ void testGetAllPartitionsSuccess() {
+ // Mock the first response with a next token
+ GetPartitionsResponse firstResponse =
+ GetPartitionsResponse.builder()
+ .partitions(
+ Arrays.asList(
+ Partition.builder()
+ .values(Collections.singletonList("value1"))
+ .storageDescriptor(
+ StorageDescriptor.builder().location("location1").build())
+ .build(),
+ Partition.builder()
+ .values(Collections.singletonList("value2"))
+ .storageDescriptor(
+ StorageDescriptor.builder().location("location2").build())
+ .build()))
+ .nextToken("token123")
+ .build();
+
+ // Mock the second response without a next token
+ GetPartitionsResponse secondResponse =
+ GetPartitionsResponse.builder()
+ .partitions(
+ Collections.singletonList(
+ Partition.builder()
+ .values(Collections.singletonList("value3"))
+ .storageDescriptor(
+ StorageDescriptor.builder().location("location3").build())
+ .build()))
+ .nextToken(null)
+ .build();
+
+ when(mockGlueClient.getPartitions(any(GetPartitionsRequest.class)))
+ .thenReturn(firstResponse)
+ .thenReturn(secondResponse);
+
+ List<org.apache.xtable.catalog.CatalogPartition> partitions =
+ gluePartitionSyncOperations.getAllPartitions(TEST_CATALOG_TABLE_IDENTIFIER);
+
+ // Validate the result
+ assertEquals(3, partitions.size());
+
+ assertEquals(Collections.singletonList("value1"),
partitions.get(0).getValues());
+ assertEquals("location1", partitions.get(0).getStorageLocation());
+
+ assertEquals(Collections.singletonList("value2"),
partitions.get(1).getValues());
+ assertEquals("location2", partitions.get(1).getStorageLocation());
+
+ assertEquals(Collections.singletonList("value3"),
partitions.get(2).getValues());
+ assertEquals("location3", partitions.get(2).getStorageLocation());
+
+ // Verify GlueClient interactions
+ verify(mockGlueClient, times(2)).getPartitions(any(GetPartitionsRequest.class));
+ }
+
+ @Test
+ void testGetAllPartitionsHandlesException() {
+ // Mock GlueClient to throw an exception
+ when(mockGlueClient.getPartitions(any(GetPartitionsRequest.class)))
+ .thenThrow(new RuntimeException("Test exception"));
+
+ // Execute and validate exception
+
+ CatalogSyncException exception =
+ assertThrows(
+ CatalogSyncException.class,
+ () -> gluePartitionSyncOperations.getAllPartitions(TEST_CATALOG_TABLE_IDENTIFIER));
+
+ assertInstanceOf(RuntimeException.class, exception.getCause());
+
+ // Verify GlueClient interactions
+ verify(mockGlueClient, times(1)).getPartitions(any(GetPartitionsRequest.class));
+ }
+
+ @Test
+ void testAddPartitionsToTableSuccess() {
+ // Mock getTable to return a valid table
+ Table mockTable =
+ Table.builder()
+ .databaseName(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName())
+ .name(TEST_CATALOG_TABLE_IDENTIFIER.getTableName())
+ .storageDescriptor(StorageDescriptor.builder().build())
+ .build();
+ GetTableResponse tableResponse = GetTableResponse.builder().table(mockTable).build();
+ when(mockGlueClient.getTable(any(GetTableRequest.class))).thenReturn(tableResponse);
+ when(mockGlueCatalogConfig.getMaxPartitionsPerRequest()).thenReturn(100);
+
+ // Mock glueClient to return a valid response for batchCreatePartition
+ BatchCreatePartitionResponse mockResponse = mock(BatchCreatePartitionResponse.class);
+ when(mockGlueClient.batchCreatePartition(any(BatchCreatePartitionRequest.class)))
+ .thenReturn(mockResponse);
+
+ // Execute the method
+ gluePartitionSyncOperations.addPartitionsToTable(
+ TEST_CATALOG_TABLE_IDENTIFIER,
+ Collections.singletonList(
+ new org.apache.xtable.catalog.CatalogPartition(
+ Collections.singletonList("value1"), "location1")));
+
+ // Verify that batchCreatePartition was called
+ ArgumentCaptor<BatchCreatePartitionRequest> createPartitionRequestArgumentCaptor =
+ ArgumentCaptor.forClass(BatchCreatePartitionRequest.class);
+ verify(mockGlueClient, times(1))
+ .batchCreatePartition(createPartitionRequestArgumentCaptor.capture());
+
+ BatchCreatePartitionRequest request = createPartitionRequestArgumentCaptor.getValue();
+ assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), request.databaseName());
+ assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), request.tableName());
+ assertEquals(1, request.partitionInputList().size());
+ assertEquals("location1", request.partitionInputList().get(0).storageDescriptor().location());
+ assertEquals(Collections.singletonList("value1"), request.partitionInputList().get(0).values());
+ }
+
+ @Test
+ void testAddPartitionsToTableHandlesException() {
+ // Mock getTable to return a valid table
+ Table mockTable = mock(Table.class);
+ StorageDescriptor mockSd = mock(StorageDescriptor.class);
+ when(mockTable.storageDescriptor()).thenReturn(mockSd);
+ GetTableResponse tableResponse = GetTableResponse.builder().table(mockTable).build();
+ when(mockGlueClient.getTable(any(GetTableRequest.class))).thenReturn(tableResponse);
+ when(mockGlueCatalogConfig.getMaxPartitionsPerRequest()).thenReturn(100);
+
+ // Mock glueClient to throw an exception when batchCreatePartition is called
+ when(mockGlueClient.batchCreatePartition(any(BatchCreatePartitionRequest.class)))
+ .thenThrow(new RuntimeException("Test exception"));
+
+ // Test the exception handling
+ CatalogSyncException exception =
+ assertThrows(
+ CatalogSyncException.class,
+ () ->
+ gluePartitionSyncOperations.addPartitionsToTable(
+ TEST_CATALOG_TABLE_IDENTIFIER,
+ Collections.singletonList(
+ new org.apache.xtable.catalog.CatalogPartition(
+ Collections.singletonList("value1"),
"location1"))));
+
+ // Verify that the exception was caused by a RuntimeException
+ assertInstanceOf(RuntimeException.class, exception.getCause());
+
+ // Verify glueClient.batchCreatePartition was called
+ verify(mockGlueClient, times(1)).batchCreatePartition(any(BatchCreatePartitionRequest.class));
+ }
+
+ @Test
+ void testUpdatePartitionsToTableSuccess() {
+ // Mock getTable to return a valid table
+ Table mockTable =
+ Table.builder()
+ .databaseName(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName())
+ .name(TEST_CATALOG_TABLE_IDENTIFIER.getTableName())
+ .storageDescriptor(StorageDescriptor.builder().build())
+ .build();
+ GetTableResponse tableResponse = GetTableResponse.builder().table(mockTable).build();
+ when(mockGlueClient.getTable(any(GetTableRequest.class))).thenReturn(tableResponse);
+ when(mockGlueCatalogConfig.getMaxPartitionsPerRequest()).thenReturn(100);
+
+ // Prepare test partition to be updated
+ org.apache.xtable.catalog.CatalogPartition partitionToUpdate =
+ new org.apache.xtable.catalog.CatalogPartition(
+ Collections.singletonList("value1"), "new_location");
+
+ // Mock glueClient to return a valid response for batchUpdatePartition
+ BatchUpdatePartitionResponse mockResponse = mock(BatchUpdatePartitionResponse.class);
+ when(mockGlueClient.batchUpdatePartition(any(BatchUpdatePartitionRequest.class)))
+ .thenReturn(mockResponse);
+
+ // Execute the method
+ gluePartitionSyncOperations.updatePartitionsToTable(
+ TEST_CATALOG_TABLE_IDENTIFIER, Collections.singletonList(partitionToUpdate));
+
+ // Verify that batchUpdatePartition was called
+ ArgumentCaptor<BatchUpdatePartitionRequest> updatePartitionRequestArgumentCaptor =
+ ArgumentCaptor.forClass(BatchUpdatePartitionRequest.class);
+ verify(mockGlueClient, times(1))
+ .batchUpdatePartition(updatePartitionRequestArgumentCaptor.capture());
+
+ // Capture the request
+ BatchUpdatePartitionRequest request = updatePartitionRequestArgumentCaptor.getValue();
+
+ // Validate the request's properties
+ assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), request.databaseName());
+ assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), request.tableName());
+ assertEquals(1, request.entries().size());
+ assertEquals(
+ "new_location", request.entries().get(0).partitionInput().storageDescriptor().location());
+ assertEquals(
+ Collections.singletonList("value1"), request.entries().get(0).partitionInput().values());
+ }
+
+ @Test
+ void testUpdatePartitionsToTableHandlesException() {
+ // Mock getTable to return a valid table
+ Table mockTable =
+ Table.builder()
+ .databaseName(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName())
+ .name(TEST_CATALOG_TABLE_IDENTIFIER.getTableName())
+ .storageDescriptor(StorageDescriptor.builder().build())
+ .build();
+ GetTableResponse tableResponse = GetTableResponse.builder().table(mockTable).build();
+ when(mockGlueClient.getTable(any(GetTableRequest.class))).thenReturn(tableResponse);
+ when(mockGlueCatalogConfig.getMaxPartitionsPerRequest()).thenReturn(100);
+
+ // Mock glueClient to throw an exception
+ when(mockGlueClient.batchUpdatePartition(any(BatchUpdatePartitionRequest.class)))
+ .thenThrow(new RuntimeException("Test exception"));
+
+ // Execute and validate exception
+ CatalogSyncException exception =
+ assertThrows(
+ CatalogSyncException.class,
+ () ->
+ gluePartitionSyncOperations.updatePartitionsToTable(
+ TEST_CATALOG_TABLE_IDENTIFIER,
+ Collections.singletonList(
+ new org.apache.xtable.catalog.CatalogPartition(
+ Collections.singletonList("value1"),
"new_location"))));
+
+ // Verify the exception cause
+ assertInstanceOf(RuntimeException.class, exception.getCause());
+
+ // Verify glueClient.batchUpdatePartition was called
+ verify(mockGlueClient, times(1)).batchUpdatePartition(any(BatchUpdatePartitionRequest.class));
+ }
+
+ @Test
+ void testDropPartitionsSuccess() {
+ // Prepare a partition to drop
+ org.apache.xtable.catalog.CatalogPartition partitionToDrop =
+ new org.apache.xtable.catalog.CatalogPartition(
+ Collections.singletonList("value1"), "location1");
+
+ // Mock glueClient to return a valid response for batchDeletePartition
+ BatchDeletePartitionResponse mockResponse = mock(BatchDeletePartitionResponse.class);
+ when(mockGlueClient.batchDeletePartition(any(BatchDeletePartitionRequest.class)))
+ .thenReturn(mockResponse);
+ when(mockGlueCatalogConfig.getMaxPartitionsPerRequest()).thenReturn(100);
+
+ gluePartitionSyncOperations.dropPartitions(
+ TEST_CATALOG_TABLE_IDENTIFIER, Collections.singletonList(partitionToDrop));
+
+ // Verify that batchDeletePartition was called
+ ArgumentCaptor<BatchDeletePartitionRequest> deletePartitionRequestArgumentCaptor =
+ ArgumentCaptor.forClass(BatchDeletePartitionRequest.class);
+ verify(mockGlueClient, times(1))
+ .batchDeletePartition(deletePartitionRequestArgumentCaptor.capture());
+
+ // Capture the request
+ BatchDeletePartitionRequest request = deletePartitionRequestArgumentCaptor.getValue();
+
+ // Validate the request's properties
+ assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), request.databaseName());
+ assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), request.tableName());
+ assertEquals(1, request.partitionsToDelete().size());
+ assertEquals(Collections.singletonList("value1"), request.partitionsToDelete().get(0).values());
+ }
+
+ @Test
+ void testDropPartitionsHandlesException() {
+ // Mock glueClient to throw an exception during batchDeletePartition
+ when(mockGlueClient.batchDeletePartition(any(BatchDeletePartitionRequest.class)))
+ .thenThrow(new RuntimeException("Test exception"));
+ when(mockGlueCatalogConfig.getMaxPartitionsPerRequest()).thenReturn(100);
+
+ CatalogSyncException exception =
+ assertThrows(
+ CatalogSyncException.class,
+ () ->
+ gluePartitionSyncOperations.dropPartitions(
+ TEST_CATALOG_TABLE_IDENTIFIER,
+ Collections.singletonList(
+ new org.apache.xtable.catalog.CatalogPartition(
+ Collections.singletonList("value1"),
"location1"))));
+
+ // Verify the exception cause
+ assertInstanceOf(RuntimeException.class, exception.getCause());
+
+ // Verify glueClient.batchDeletePartition was called
+ verify(mockGlueClient, times(1)).batchDeletePartition(any(BatchDeletePartitionRequest.class));
+ }
+
+ @Test
+ void testDropPartitionsNoPartitionsToDrop() {
+ // Execute the method with an empty partition list
+ gluePartitionSyncOperations.dropPartitions(
+ TEST_CATALOG_TABLE_IDENTIFIER, Collections.emptyList());
+
+ // Verify that batchDeletePartition was not called
+ verify(mockGlueClient, times(0)).batchDeletePartition(any(BatchDeletePartitionRequest.class));
+ }
+
+ @Test
+ void testGetLastTimeSyncedPropertiesSuccess() {
+ // Mock table and parameters
+ Map<String, String> mockParameters = new HashMap<>();
+ mockParameters.put("last_synced_time", "2023-12-01T12:00:00Z");
+ mockParameters.put("last_modified_by", "user123");
+ Table mockTable =
+ Table.builder()
+ .databaseName(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName())
+ .name(TEST_CATALOG_TABLE_IDENTIFIER.getTableName())
+ .storageDescriptor(StorageDescriptor.builder().build())
+ .parameters(mockParameters)
+ .build();
+ GetTableResponse tableResponse = GetTableResponse.builder().table(mockTable).build();
+ when(mockGlueClient.getTable(any(GetTableRequest.class))).thenReturn(tableResponse);
+
+ // List of keys to look for
+ List<String> lastSyncedKeys = Arrays.asList("last_synced_time",
"last_modified_by");
+
+ // Execute the method
+ Map<String, String> result =
+ gluePartitionSyncOperations.getTableProperties(
+ TEST_CATALOG_TABLE_IDENTIFIER, lastSyncedKeys);
+
+ // Assert the result
+ assertEquals(2, result.size());
+ assertEquals("2023-12-01T12:00:00Z", result.get("last_synced_time"));
+ assertEquals("user123", result.get("last_modified_by"));
+ }
+
+ @Test
+ void testGetLastTimeSyncedPropertiesKeyNotFound() {
+ // Mock table and parameters
+ Map<String, String> mockParameters =
+ Collections.singletonMap("last_synced_time", "2023-12-01T12:00:00Z");
+ Table mockTable =
+ Table.builder()
+ .databaseName(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName())
+ .name(TEST_CATALOG_TABLE_IDENTIFIER.getTableName())
+ .storageDescriptor(StorageDescriptor.builder().build())
+ .parameters(mockParameters)
+ .build();
+ GetTableResponse tableResponse = GetTableResponse.builder().table(mockTable).build();
+ when(mockGlueClient.getTable(any(GetTableRequest.class))).thenReturn(tableResponse);
+
+ // List of keys to look for (including a non-existent key)
+ List<String> lastSyncedKeys = Arrays.asList("last_synced_time",
"non_existent_key");
+
+ // Execute the method
+ Map<String, String> result =
+ gluePartitionSyncOperations.getTableProperties(
+ TEST_CATALOG_TABLE_IDENTIFIER, lastSyncedKeys);
+
+ // Assert the result
+ assertEquals(1, result.size());
+ assertEquals("2023-12-01T12:00:00Z", result.get("last_synced_time"));
+ }
+
+ @Test
+ void testGetLastTimeSyncedPropertiesHandlesException() {
+ // Mock getTable to throw an exception
+ when(mockGlueClient.getTable(any(GetTableRequest.class)))
+ .thenThrow(new RuntimeException("Test exception"));
+
+ // Execute and validate exception
+ CatalogSyncException exception =
+ assertThrows(
+ CatalogSyncException.class,
+ () ->
+ gluePartitionSyncOperations.getTableProperties(
+ TEST_CATALOG_TABLE_IDENTIFIER, Collections.singletonList("last_synced_time")));
+
+ // Verify the exception cause
+ assertInstanceOf(RuntimeException.class, exception.getCause());
+
+ // Verify that getTable was called
+ verify(mockGlueClient, times(1)).getTable(any(GetTableRequest.class));
+ }
+
+ @Test
+ void testUpdateLastTimeSyncedPropertiesSuccess() {
+ // Mock table and parameters
+ Map<String, String> mockParameters = new HashMap<>();
+ mockParameters.put("last_synced_time", "2023-12-01T12:00:00Z");
+ mockParameters.put("last_modified_by", "user123");
+ Table mockTable =
+ Table.builder()
+ .databaseName(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName())
+ .name(TEST_CATALOG_TABLE_IDENTIFIER.getTableName())
+ .storageDescriptor(StorageDescriptor.builder().build())
+ .parameters(mockParameters)
+ .build();
+ GetTableResponse tableResponse = GetTableResponse.builder().table(mockTable).build();
+ when(mockGlueClient.getTable(any(GetTableRequest.class))).thenReturn(tableResponse);
+
+ // Mock the updateTable call to verify it's triggered
+ when(mockGlueClient.updateTable(any(UpdateTableRequest.class)))
+ .thenReturn(UpdateTableResponse.builder().build());
+
+ // New properties to update
+ Map<String, String> newProperties = new HashMap<>();
+ newProperties.put("last_synced_time", "2023-12-02T14:00:00Z");
+ newProperties.put("last_modified_by", "user456");
+
+ // Execute the method
+ gluePartitionSyncOperations.updateTableProperties(TEST_CATALOG_TABLE_IDENTIFIER, newProperties);
+
+ // Capture the UpdateTableRequest sent to the GlueClient
+ ArgumentCaptor<UpdateTableRequest> captor = ArgumentCaptor.forClass(UpdateTableRequest.class);
+ verify(mockGlueClient, times(1)).updateTable(captor.capture());
+ UpdateTableRequest capturedRequest = captor.getValue();
+
+ // Assert that the table update request contains the new parameters
+ assertEquals(
+ "2023-12-02T14:00:00Z",
capturedRequest.tableInput().parameters().get("last_synced_time"));
+ assertEquals("user456",
capturedRequest.tableInput().parameters().get("last_modified_by"));
+ }
+
+ @Test
+ void testUpdateLastTimeSyncedPropertiesHandlesException() {
+ // Setup mocks to throw an exception
+ when(mockGlueClient.getTable(any(GetTableRequest.class)))
+ .thenThrow(new RuntimeException("Test exception"));
+
+ // New properties to update
+ Map<String, String> newProperties = new HashMap<>();
+ newProperties.put("last_synced_time", "2023-12-02T14:00:00Z");
+
+ // Execute and validate exception
+ CatalogSyncException exception =
+ assertThrows(
+ CatalogSyncException.class,
+ () ->
+ gluePartitionSyncOperations.updateTableProperties(
+ TEST_CATALOG_TABLE_IDENTIFIER, newProperties));
+
+ // Assert that the exception has the right cause
+ assertInstanceOf(RuntimeException.class, exception.getCause());
+ verify(mockGlueClient, times(1)).getTable(any(GetTableRequest.class));
+ }
+
+ @Test
+ void testUpdateLastTimeSyncedPropertiesEmptyProperties() {
+ // Execute the method with empty properties (nothing to update)
+ gluePartitionSyncOperations.updateTableProperties(
+ TEST_CATALOG_TABLE_IDENTIFIER, new HashMap<>());
+
+ // Verify that updateTable was never called
+ verify(mockGlueClient,
times(0)).updateTable(any(UpdateTableRequest.class));
+ }
+}
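
[editor's note] For readers following the property tests above, here is a minimal usage sketch of the operations they exercise. It assumes an already constructed GlueCatalogPartitionSyncOperations instance (the class this commit adds; its GlueClient/GlueCatalogConfig wiring is not shown in this hunk) and the identifier type imported by the tests; the printed output is purely illustrative.

    package org.apache.xtable.glue;

    import java.time.Instant;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier;

    class LastSyncedTimeSketch {
      static void touchLastSyncedTime(
          GlueCatalogPartitionSyncOperations operations,
          ThreePartHierarchicalTableIdentifier tableId) {
        // Keys missing from the Glue table parameters are simply omitted from the
        // returned map (see testGetLastTimeSyncedProperties* above).
        Map<String, String> existing =
            operations.getTableProperties(
                tableId, Collections.singletonList("last_synced_time"));
        String previous = existing.get("last_synced_time"); // null when never synced
        System.out.println("previous sync instant: " + previous);

        // An empty map is a no-op; otherwise the merged parameters are written back
        // through glueClient.updateTable(...), as verified by the tests above.
        Map<String, String> updates = new HashMap<>();
        updates.put("last_synced_time", Instant.now().toString());
        operations.updateTableProperties(tableId, updates);
      }
    }
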
diff --git
a/xtable-aws/src/test/java/org/apache/xtable/glue/TestGlueCatalogSyncClient.java
b/xtable-aws/src/test/java/org/apache/xtable/glue/TestGlueCatalogSyncClient.java
index 0ec0d04b..2fa5a53c 100644
---
a/xtable-aws/src/test/java/org/apache/xtable/glue/TestGlueCatalogSyncClient.java
+++
b/xtable-aws/src/test/java/org/apache/xtable/glue/TestGlueCatalogSyncClient.java
@@ -25,6 +25,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
@@ -34,6 +35,7 @@ import static org.mockito.Mockito.when;
import java.time.ZonedDateTime;
import java.util.Collections;
+import java.util.Optional;
import java.util.ServiceLoader;
import org.junit.jupiter.api.Test;
@@ -44,8 +46,10 @@ import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
+import org.apache.xtable.catalog.CatalogPartitionSyncTool;
import org.apache.xtable.catalog.CatalogTableBuilder;
import org.apache.xtable.exception.CatalogSyncException;
+import org.apache.xtable.hudi.catalog.HudiCatalogPartitionSyncTool;
import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier;
import org.apache.xtable.model.storage.CatalogType;
import org.apache.xtable.spi.sync.CatalogSyncClient;
@@ -71,15 +75,22 @@ import
software.amazon.awssdk.services.glue.model.UpdateTableResponse;
public class TestGlueCatalogSyncClient extends GlueCatalogSyncTestBase {
@Mock private CatalogTableBuilder<TableInput, Table> mockTableBuilder;
+ @Mock private HudiCatalogPartitionSyncTool mockPartitionSyncTool;
private GlueCatalogSyncClient glueCatalogSyncClient;
- private GlueCatalogSyncClient createGlueCatalogSyncClient() {
+ private GlueCatalogSyncClient createGlueCatalogSyncClient(boolean
includePartitionSyncTool) {
+ Optional<CatalogPartitionSyncTool> partitionSyncToolOpt =
+ includePartitionSyncTool ? Optional.of(mockPartitionSyncTool) :
Optional.empty();
return new GlueCatalogSyncClient(
- catalogConfig, testConfiguration, mockGlueCatalogConfig,
mockGlueClient, mockTableBuilder);
+ catalogConfig,
+ testConfiguration,
+ mockGlueCatalogConfig,
+ mockGlueClient,
+ mockTableBuilder,
+ partitionSyncToolOpt);
}
void setupCommonMocks() {
- glueCatalogSyncClient = createGlueCatalogSyncClient();
when(mockGlueCatalogConfig.getCatalogId()).thenReturn(TEST_GLUE_CATALOG_ID);
}
@@ -87,6 +98,7 @@ public class TestGlueCatalogSyncClient extends
GlueCatalogSyncTestBase {
@ValueSource(booleans = {true, false})
void testHasDatabase(boolean isDbPresent) {
setupCommonMocks();
+ glueCatalogSyncClient = createGlueCatalogSyncClient(false);
GetDatabaseRequest dbRequest =
getDbRequest(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName());
GetDatabaseResponse dbResponse =
GetDatabaseResponse.builder()
@@ -111,6 +123,7 @@ public class TestGlueCatalogSyncClient extends
GlueCatalogSyncTestBase {
@Test
void testHasDatabaseFailure() {
setupCommonMocks();
+ glueCatalogSyncClient = createGlueCatalogSyncClient(false);
GetDatabaseRequest dbRequest =
getDbRequest(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName());
when(mockGlueClient.getDatabase(dbRequest)).thenThrow(TEST_GLUE_EXCEPTION);
CatalogSyncException exception =
@@ -126,6 +139,7 @@ public class TestGlueCatalogSyncClient extends
GlueCatalogSyncTestBase {
@ValueSource(booleans = {true, false})
void testGetTable(boolean isTablePresent) {
setupCommonMocks();
+ glueCatalogSyncClient = createGlueCatalogSyncClient(false);
GetTableRequest tableRequest =
getTableRequest(
TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
@@ -158,6 +172,7 @@ public class TestGlueCatalogSyncClient extends
GlueCatalogSyncTestBase {
@Test
void testGetTableFailure() {
setupCommonMocks();
+ glueCatalogSyncClient = createGlueCatalogSyncClient(false);
GetTableRequest tableRequest =
getTableRequest(
TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
@@ -177,6 +192,7 @@ public class TestGlueCatalogSyncClient extends
GlueCatalogSyncTestBase {
@ValueSource(booleans = {false, true})
void testCreateDatabase(boolean shouldFail) {
setupCommonMocks();
+ glueCatalogSyncClient = createGlueCatalogSyncClient(false);
CreateDatabaseRequest dbRequest =
createDbRequest(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName());
if (shouldFail) {
@@ -200,6 +216,7 @@ public class TestGlueCatalogSyncClient extends
GlueCatalogSyncTestBase {
@ValueSource(booleans = {false, true})
void testDropTable(boolean shouldFail) {
setupCommonMocks();
+ glueCatalogSyncClient = createGlueCatalogSyncClient(false);
DeleteTableRequest deleteRequest =
deleteTableRequest(
TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
@@ -223,9 +240,11 @@ public class TestGlueCatalogSyncClient extends
GlueCatalogSyncTestBase {
verify(mockGlueClient, times(1)).deleteTable(deleteRequest);
}
- @Test
- void testCreateTable_Success() {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testCreateTable_Success(boolean syncPartitions) {
setupCommonMocks();
+ glueCatalogSyncClient = createGlueCatalogSyncClient(syncPartitions);
CreateTableRequest createTableRequest =
createTableRequest(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
TEST_TABLE_INPUT);
when(mockTableBuilder.getCreateTableRequest(
@@ -237,12 +256,19 @@ public class TestGlueCatalogSyncClient extends
GlueCatalogSyncTestBase {
verify(mockGlueClient, times(1)).createTable(createTableRequest);
verify(mockTableBuilder, times(1))
.getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE,
TEST_CATALOG_TABLE_IDENTIFIER);
+ if (syncPartitions) {
+ verify(mockPartitionSyncTool, times(1))
+ .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE),
eq(TEST_CATALOG_TABLE_IDENTIFIER));
+ } else {
+ verify(mockPartitionSyncTool, never())
+ .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE),
eq(TEST_CATALOG_TABLE_IDENTIFIER));
+ }
}
- @Test
- void testCreateTable_ErrorGettingTableInput() {
- glueCatalogSyncClient = createGlueCatalogSyncClient();
-
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testCreateTable_ErrorGettingTableInput(boolean syncPartitions) {
+ glueCatalogSyncClient = createGlueCatalogSyncClient(syncPartitions);
// error when getting iceberg table input
doThrow(new RuntimeException("something went wrong"))
.when(mockTableBuilder)
@@ -255,12 +281,15 @@ public class TestGlueCatalogSyncClient extends
GlueCatalogSyncTestBase {
verify(mockTableBuilder, times(1))
.getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE,
TEST_CATALOG_TABLE_IDENTIFIER);
verify(mockGlueClient, never()).createTable(any(CreateTableRequest.class));
+ verify(mockPartitionSyncTool, never())
+ .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE),
eq(TEST_CATALOG_TABLE_IDENTIFIER));
}
- @Test
- void testCreateTable_ErrorCreatingTable() {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testCreateTable_ErrorCreatingTable(boolean syncPartitions) {
setupCommonMocks();
-
+ glueCatalogSyncClient = createGlueCatalogSyncClient(syncPartitions);
// error when creating table
CreateTableRequest createTableRequest =
createTableRequest(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
TEST_TABLE_INPUT);
@@ -280,11 +309,15 @@ public class TestGlueCatalogSyncClient extends
GlueCatalogSyncTestBase {
verify(mockTableBuilder, times(1))
.getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE,
TEST_CATALOG_TABLE_IDENTIFIER);
verify(mockGlueClient, times(1)).createTable(createTableRequest);
+ verify(mockPartitionSyncTool, never())
+ .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE),
eq(TEST_CATALOG_TABLE_IDENTIFIER));
}
- @Test
- void testRefreshTable_Success() {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testRefreshTable_Success(boolean syncPartitions) {
setupCommonMocks();
+ glueCatalogSyncClient = createGlueCatalogSyncClient(syncPartitions);
UpdateTableRequest updateTableRequest =
updateTableRequest(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
TEST_TABLE_INPUT);
Table glueTable =
Table.builder().parameters(Collections.emptyMap()).build();
@@ -299,11 +332,19 @@ public class TestGlueCatalogSyncClient extends
GlueCatalogSyncTestBase {
verify(mockTableBuilder, times(1))
.getUpdateTableRequest(
TEST_ICEBERG_INTERNAL_TABLE, glueTable,
TEST_CATALOG_TABLE_IDENTIFIER);
+ if (syncPartitions) {
+ verify(mockPartitionSyncTool, times(1))
+ .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE),
eq(TEST_CATALOG_TABLE_IDENTIFIER));
+ } else {
+ verify(mockPartitionSyncTool, never())
+ .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE),
eq(TEST_CATALOG_TABLE_IDENTIFIER));
+ }
}
- @Test
- void testRefreshTable_ErrorCreatingTableInput() {
- glueCatalogSyncClient = createGlueCatalogSyncClient();
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testRefreshTable_ErrorCreatingTableInput(boolean syncPartitions) {
+ glueCatalogSyncClient = createGlueCatalogSyncClient(syncPartitions);
Table glueTable =
Table.builder().parameters(Collections.emptyMap()).build();
// error while refreshing table
@@ -320,11 +361,15 @@ public class TestGlueCatalogSyncClient extends
GlueCatalogSyncTestBase {
.getUpdateTableRequest(
TEST_ICEBERG_INTERNAL_TABLE, glueTable,
TEST_CATALOG_TABLE_IDENTIFIER);
verify(mockGlueClient, never()).updateTable(any(UpdateTableRequest.class));
+ verify(mockPartitionSyncTool, never())
+ .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE),
eq(TEST_CATALOG_TABLE_IDENTIFIER));
}
- @Test
- void testRefreshTable_ErrorRefreshingTable() {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testRefreshTable_ErrorRefreshingTable(boolean syncPartitions) {
setupCommonMocks();
+ glueCatalogSyncClient = createGlueCatalogSyncClient(syncPartitions);
Table glueTable =
Table.builder().parameters(Collections.emptyMap()).build();
UpdateTableRequest updateTableRequest =
@@ -348,12 +393,14 @@ public class TestGlueCatalogSyncClient extends
GlueCatalogSyncTestBase {
.getUpdateTableRequest(
TEST_ICEBERG_INTERNAL_TABLE, glueTable,
TEST_CATALOG_TABLE_IDENTIFIER);
verify(mockGlueClient, times(1)).updateTable(updateTableRequest);
+ verify(mockPartitionSyncTool, never())
+ .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE),
eq(TEST_CATALOG_TABLE_IDENTIFIER));
}
@Test
void testCreateOrReplaceTable() {
setupCommonMocks();
-
+ glueCatalogSyncClient = createGlueCatalogSyncClient(false);
ZonedDateTime fixedDateTime =
ZonedDateTime.parse("2024-10-25T10:15:30.00Z");
try (MockedStatic<ZonedDateTime> mockZonedDateTime =
mockStatic(ZonedDateTime.class)) {
mockZonedDateTime.when(ZonedDateTime::now).thenReturn(fixedDateTime);
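
[editor's note] The parameterized tests above toggle partition syncing through the new constructor argument. The sketch below shows that wiring under stated assumptions: the first two parameter types (ExternalCatalogConfig, Hadoop Configuration) are inferred from the test base class and may differ from the actual signature; the point is only that partition syncing is enabled by passing a populated Optional.

    package org.apache.xtable.glue;

    import java.util.Optional;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.xtable.catalog.CatalogPartitionSyncTool;
    import org.apache.xtable.catalog.CatalogTableBuilder;
    import org.apache.xtable.conversion.ExternalCatalogConfig; // assumed package

    import software.amazon.awssdk.services.glue.GlueClient;
    import software.amazon.awssdk.services.glue.model.Table;
    import software.amazon.awssdk.services.glue.model.TableInput;

    class GlueSyncClientWiringSketch {
      static GlueCatalogSyncClient build(
          ExternalCatalogConfig catalogConfig,   // assumed type of the first argument
          Configuration hadoopConf,              // assumed type of `testConfiguration`
          GlueCatalogConfig glueCatalogConfig,
          GlueClient glueClient,
          CatalogTableBuilder<TableInput, Table> tableBuilder,
          CatalogPartitionSyncTool partitionSyncTool /* null to disable */) {
        // createTable/refreshTable invoke partitionSyncTool.syncPartitions(...) only
        // when this Optional is present, mirroring the syncPartitions branches above.
        Optional<CatalogPartitionSyncTool> toolOpt = Optional.ofNullable(partitionSyncTool);
        return new GlueCatalogSyncClient(
            catalogConfig, hadoopConf, glueCatalogConfig, glueClient, tableBuilder, toolOpt);
      }
    }
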
diff --git
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestHudiHMSCatalogTableBuilder.java
b/xtable-aws/src/test/java/org/apache/xtable/glue/table/TestHudiGlueCatalogTableBuilder.java
similarity index 51%
copy from
xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestHudiHMSCatalogTableBuilder.java
copy to
xtable-aws/src/test/java/org/apache/xtable/glue/table/TestHudiGlueCatalogTableBuilder.java
index adf88c99..daeb3df9 100644
---
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestHudiHMSCatalogTableBuilder.java
+++
b/xtable-aws/src/test/java/org/apache/xtable/glue/table/TestHudiGlueCatalogTableBuilder.java
@@ -16,11 +16,9 @@
* limitations under the License.
*/
-package org.apache.xtable.hms.table;
+package org.apache.xtable.glue.table;
-import static
org.apache.xtable.hms.table.HudiHMSCatalogTableBuilder.HUDI_METADATA_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
@@ -30,10 +28,8 @@ import static org.mockito.Mockito.when;
import java.util.List;
import java.util.stream.Collectors;
-import org.apache.hadoop.hive.metastore.api.Table;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -41,33 +37,36 @@ import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.xtable.hms.HMSCatalogSyncTestBase;
-import org.apache.xtable.hms.HMSSchemaExtractor;
+import org.apache.xtable.glue.GlueCatalogSyncTestBase;
+import org.apache.xtable.glue.GlueSchemaExtractor;
import org.apache.xtable.hudi.HudiTableManager;
import org.apache.xtable.hudi.catalog.HudiCatalogTablePropertiesExtractor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+
@ExtendWith(MockitoExtension.class)
-public class TestHudiHMSCatalogTableBuilder extends HMSCatalogSyncTestBase {
+public class TestHudiGlueCatalogTableBuilder extends GlueCatalogSyncTestBase {
@Mock private HoodieTableMetaClient mockMetaClient;
@Mock private HudiTableManager mockHudiTableManager;
@Mock private HoodieTableConfig mockTableConfig;
@Mock private HudiCatalogTablePropertiesExtractor
mockTablePropertiesExtractor;
- private HudiHMSCatalogTableBuilder hudiHMSCatalogTableBuilder;
+ private HudiGlueCatalogTableBuilder hudiGlueCatalogTableBuilder;
- private HudiHMSCatalogTableBuilder
createMockHudiHMSCatalogSyncRequestProvider() {
- return new HudiHMSCatalogTableBuilder(
- mockHMSCatalogConfig,
- HMSSchemaExtractor.getInstance(),
+ private HudiGlueCatalogTableBuilder
createMockHudiGlueCatalogSyncRequestProvider() {
+ return new HudiGlueCatalogTableBuilder(
+ mockGlueCatalogConfig,
+ GlueSchemaExtractor.getInstance(),
mockHudiTableManager,
mockMetaClient,
mockTablePropertiesExtractor);
}
void setupCommonMocks() {
- hudiHMSCatalogTableBuilder = createMockHudiHMSCatalogSyncRequestProvider();
- when(mockHMSCatalogConfig.getSchemaLengthThreshold()).thenReturn(1000);
+ hudiGlueCatalogTableBuilder =
createMockHudiGlueCatalogSyncRequestProvider();
+ when(mockGlueCatalogConfig.getSchemaLengthThreshold()).thenReturn(1000);
}
void setupMetaClientMocks() {
@@ -80,31 +79,16 @@ public class TestHudiHMSCatalogTableBuilder extends
HMSCatalogSyncTestBase {
setupCommonMocks();
setupMetaClientMocks();
- List<String> partitionFields =
- TEST_HUDI_INTERNAL_TABLE.getPartitioningFields().stream()
- .map(partitionField -> partitionField.getSourceField().getName())
- .collect(Collectors.toList());
-
- Table table =
- hudiHMSCatalogTableBuilder.getCreateTableRequest(
+ TableInput table =
+ hudiGlueCatalogTableBuilder.getCreateTableRequest(
TEST_HUDI_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER);
- ArgumentCaptor<List<String>> partitionsCaptor =
ArgumentCaptor.forClass(List.class);
verify(mockTablePropertiesExtractor)
- .getSparkTableProperties(
- partitionsCaptor.capture(),
- eq(""),
- any(Integer.class),
- eq(TEST_HUDI_INTERNAL_TABLE.getReadSchema()));
- assertEquals(partitionFields.size(), partitionsCaptor.getValue().size());
- assertEquals(partitionFields, partitionsCaptor.getValue());
- assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(),
table.getTableName());
- assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
table.getDbName());
- assertEquals(3, table.getSd().getCols().size());
- assertEquals(1, table.getPartitionKeys().size());
- assertNotNull(table.getParameters());
- assertFalse(table.getParameters().isEmpty());
- assertEquals(table.getParameters().get(HUDI_METADATA_CONFIG), "true");
+ .getTableProperties(eq(TEST_HUDI_INTERNAL_TABLE), any(Integer.class));
+ assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), table.name());
+ assertEquals(2, table.storageDescriptor().columns().size());
+ assertEquals(1, table.partitionKeys().size());
+ assertNotNull(table.parameters());
}
@Test
@@ -116,28 +100,26 @@ public class TestHudiHMSCatalogTableBuilder extends
HMSCatalogSyncTestBase {
TEST_HUDI_INTERNAL_TABLE.getPartitioningFields().stream()
.map(partitionField -> partitionField.getSourceField().getName())
.collect(Collectors.toList());
- Table table =
- hudiHMSCatalogTableBuilder.getCreateTableRequest(
+
+ TableInput tableInput =
+ hudiGlueCatalogTableBuilder.getCreateTableRequest(
TEST_HUDI_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER);
- Table updatedTable =
- hudiHMSCatalogTableBuilder.getUpdateTableRequest(
+ Table table =
+ Table.builder()
+ .name(tableInput.name())
+ .parameters(tableInput.parameters())
+ .storageDescriptor(tableInput.storageDescriptor())
+ .partitionKeys(tableInput.partitionKeys())
+ .build();
+ TableInput updatedTable =
+ hudiGlueCatalogTableBuilder.getUpdateTableRequest(
TEST_EVOLVED_HUDI_INTERNAL_TABLE, table,
TEST_CATALOG_TABLE_IDENTIFIER);
- ArgumentCaptor<List<String>> partitionsCaptor =
ArgumentCaptor.forClass(List.class);
verify(mockTablePropertiesExtractor)
- .getSparkTableProperties(
- partitionsCaptor.capture(),
- eq(""),
- any(Integer.class),
- eq(TEST_HUDI_INTERNAL_TABLE.getReadSchema()));
- assertEquals(partitionFields.size(), partitionsCaptor.getValue().size());
- assertEquals(partitionFields, partitionsCaptor.getValue());
- assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(),
updatedTable.getTableName());
- assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
updatedTable.getDbName());
- assertEquals(4, updatedTable.getSd().getCols().size());
- assertEquals(1, updatedTable.getPartitionKeys().size());
- assertNotNull(updatedTable.getParameters());
- assertFalse(table.getParameters().isEmpty());
- assertEquals(table.getParameters().get(HUDI_METADATA_CONFIG), "true");
+ .getTableProperties(eq(TEST_HUDI_INTERNAL_TABLE), any(Integer.class));
+ assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(),
updatedTable.name());
+ assertEquals(3, updatedTable.storageDescriptor().columns().size());
+ assertEquals(1, updatedTable.partitionKeys().size());
+ assertNotNull(updatedTable.parameters());
}
}
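
[editor's note] Unlike the HMS builder, the new Glue builder returns a TableInput rather than a metastore Table, and the sync client wraps it in the AWS SDK request. A compact sketch of that hand-off follows; the request construction is assumed to match what GlueCatalogSyncClient does, and catalogId handling is omitted for brevity.

    package org.apache.xtable.glue.table;

    import org.apache.xtable.model.InternalTable;
    import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier;

    import software.amazon.awssdk.services.glue.GlueClient;
    import software.amazon.awssdk.services.glue.model.CreateTableRequest;
    import software.amazon.awssdk.services.glue.model.TableInput;

    class HudiGlueCreateSketch {
      static void createHudiTable(
          HudiGlueCatalogTableBuilder builder,
          GlueClient glueClient,
          InternalTable internalTable,
          ThreePartHierarchicalTableIdentifier tableId) {
        // TableInput carries the Hudi columns, partition keys and table parameters
        // assembled by the builder (see the assertions in the test above).
        TableInput input = builder.getCreateTableRequest(internalTable, tableId);
        glueClient.createTable(
            CreateTableRequest.builder()
                .databaseName(tableId.getDatabaseName())
                .tableInput(input)
                .build());
      }
    }
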
diff --git
a/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTablePropertiesExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTablePropertiesExtractor.java
index ff26e600..07625e44 100644
---
a/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTablePropertiesExtractor.java
+++
b/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTablePropertiesExtractor.java
@@ -22,14 +22,17 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.spark.sql.types.StructType;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.InternalType;
@@ -41,17 +44,28 @@ public class HudiCatalogTablePropertiesExtractor {
private static final HudiCatalogTablePropertiesExtractor INSTANCE =
new HudiCatalogTablePropertiesExtractor();
+ protected static final String HUDI_METADATA_CONFIG =
"hudi.metadata-listing-enabled";
public static HudiCatalogTablePropertiesExtractor getInstance() {
return INSTANCE;
}
- /**
- * Get Spark Sql related table properties. This is used for spark datasource
table.
- *
- * @param schema The schema to write to the table.
- * @return A new parameters added the spark's table properties.
- */
- public Map<String, String> getSparkTableProperties(
+
+ /** Get Hudi table properties that need to be synced with the catalog table */
+ public Map<String, String> getTableProperties(InternalTable table, int
schemaLengthThreshold) {
+ Map<String, String> tableProperties = new HashMap<>();
+ List<String> partitionFields =
+ table.getPartitioningFields().stream()
+ .map(field -> field.getSourceField().getName())
+ .collect(Collectors.toList());
+ tableProperties.put(HUDI_METADATA_CONFIG, "true");
+ Map<String, String> sparkTableProperties =
+ getSparkTableProperties(partitionFields, "", schemaLengthThreshold,
table.getReadSchema());
+ tableProperties.putAll(sparkTableProperties);
+ return tableProperties;
+ }
+
+ /** Get Spark SQL related table properties. This is used for Spark
datasource tables. */
+ private Map<String, String> getSparkTableProperties(
List<String> partitionNames,
String sparkVersion,
int schemaLengthThreshold,
@@ -123,4 +137,12 @@ public class HudiCatalogTablePropertiesExtractor {
}
return sparkProperties;
}
+
+ /** Get Hudi serde properties that need to be synced with the catalog table */
+ public Map<String, String> getSerdeProperties(String basePath) {
+ Map<String, String> serdeProperties = new HashMap<>();
+ serdeProperties.put(ConfigUtils.TABLE_SERDE_PATH, basePath);
+ serdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE,
String.valueOf(false));
+ return serdeProperties;
+ }
}
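
[editor's note] The two helpers consolidated into the shared extractor above are now the single source of both parameter maps; the HMS and Glue Hudi builders in later hunks of this commit delegate to them. A minimal usage sketch, assuming an InternalTable and a schema length threshold taken from the catalog config:

    import java.util.Map;

    import org.apache.xtable.hudi.catalog.HudiCatalogTablePropertiesExtractor;
    import org.apache.xtable.model.InternalTable;

    class TablePropertiesSketch {
      static void collect(InternalTable table, int schemaLengthThreshold) {
        HudiCatalogTablePropertiesExtractor extractor =
            HudiCatalogTablePropertiesExtractor.getInstance();

        // Table parameters: hudi.metadata-listing-enabled=true plus the Spark
        // datasource schema properties, split into numbered parts no longer than
        // schemaLengthThreshold.
        Map<String, String> tableProperties =
            extractor.getTableProperties(table, schemaLengthThreshold);

        // SerDe parameters: the table base path plus IS_QUERY_AS_RO_TABLE=false.
        Map<String, String> serdeProperties =
            extractor.getSerdeProperties(table.getBasePath());
        // (Both maps would then be handed to the catalog-specific table builder.)
      }
    }
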
diff --git
a/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTablePropertiesExtractor.java
b/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTablePropertiesExtractor.java
index e08674bf..202e84e2 100644
---
a/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTablePropertiesExtractor.java
+++
b/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTablePropertiesExtractor.java
@@ -18,27 +18,39 @@
package org.apache.xtable.hudi.catalog;
+import static
org.apache.xtable.hudi.catalog.HudiCatalogTablePropertiesExtractor.HUDI_METADATA_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import java.util.Arrays;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.Test;
+import org.apache.hudi.common.util.ConfigUtils;
+
+import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.model.storage.TableFormat;
public class TestHudiCatalogTablePropertiesExtractor {
@Test
void testGetSparkTableProperties() {
- List<String> partitionNames = Arrays.asList("region", "category");
- String sparkVersion = "3.2.1";
+ InternalPartitionField p1 =
+ InternalPartitionField.builder()
+ .sourceField(InternalField.builder().name("region").build())
+ .build();
+ InternalPartitionField p2 =
+ InternalPartitionField.builder()
+ .sourceField(InternalField.builder().name("category").build())
+ .build();
int schemaLengthThreshold = 1000;
InternalSchema schema =
InternalSchema.builder()
@@ -80,13 +92,20 @@ public class TestHudiCatalogTablePropertiesExtractor {
.name("testSchema")
.build();
+ InternalTable table =
+ InternalTable.builder()
+ .name("test-table")
+ .tableFormat(TableFormat.HUDI)
+ .readSchema(schema)
+ .partitioningFields(Arrays.asList(p1, p2))
+ .build();
+
Map<String, String> result =
HudiCatalogTablePropertiesExtractor.getInstance()
- .getSparkTableProperties(partitionNames, sparkVersion,
schemaLengthThreshold, schema);
+ .getTableProperties(table, schemaLengthThreshold);
// Validate results
assertEquals("hudi", result.get("spark.sql.sources.provider"));
- assertEquals("3.2.1", result.get("spark.sql.create.version"));
assertEquals("1", result.get("spark.sql.sources.schema.numParts"));
assertEquals(
"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}},{\"name\":\"region\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}},{\"name\":\"category\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}",
@@ -94,12 +113,12 @@ public class TestHudiCatalogTablePropertiesExtractor {
assertEquals("2", result.get("spark.sql.sources.schema.numPartCols"));
assertEquals("region", result.get("spark.sql.sources.schema.partCol.0"));
assertEquals("category", result.get("spark.sql.sources.schema.partCol.1"));
+ assertEquals("true", result.get(HUDI_METADATA_CONFIG));
}
@Test
void testGetSparkTablePropertiesEmptyPartitions() {
// Setup input data with no partitions
- List<String> partitionNames = Collections.emptyList();
int schemaLengthThreshold = 50;
InternalSchema schema =
InternalSchema.builder()
@@ -125,10 +144,17 @@ public class TestHudiCatalogTablePropertiesExtractor {
.name("testSchema")
.build();
+ InternalTable table =
+ InternalTable.builder()
+ .name("test-table")
+ .tableFormat(TableFormat.HUDI)
+ .readSchema(schema)
+ .partitioningFields(Collections.emptyList())
+ .build();
// Call the method
Map<String, String> result =
HudiCatalogTablePropertiesExtractor.getInstance()
- .getSparkTableProperties(partitionNames, "",
schemaLengthThreshold, schema);
+ .getTableProperties(table, schemaLengthThreshold);
assertEquals("hudi", result.get("spark.sql.sources.provider"));
assertNull(result.get("spark.sql.create.version"));
@@ -140,5 +166,17 @@ public class TestHudiCatalogTablePropertiesExtractor {
+ result.get("spark.sql.sources.schema.part.2")
+ result.get("spark.sql.sources.schema.part.3"));
assertNull(result.get("spark.sql.sources.schema.numPartCols"));
+ assertEquals("true", result.get(HUDI_METADATA_CONFIG));
+ }
+
+ @Test
+ void testGetSerdeProperties() {
+ String basePath = "/test/base/path";
+ Map<String, String> serdeProperties =
+
HudiCatalogTablePropertiesExtractor.getInstance().getSerdeProperties(basePath);
+
+ assertNotNull(serdeProperties);
+ assertEquals(basePath, serdeProperties.get(ConfigUtils.TABLE_SERDE_PATH));
+ assertEquals("false",
serdeProperties.get(ConfigUtils.IS_QUERY_AS_RO_TABLE));
}
}
diff --git
a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/table/HudiHMSCatalogTableBuilder.java
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/table/HudiHMSCatalogTableBuilder.java
index cc4dc3df..b8b1ea49 100644
---
a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/table/HudiHMSCatalogTableBuilder.java
+++
b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/table/HudiHMSCatalogTableBuilder.java
@@ -22,7 +22,6 @@ import static
org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifi
import java.io.IOException;
import java.time.Instant;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -39,7 +38,6 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.util.ConfigUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -54,7 +52,6 @@ import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.catalog.CatalogTableIdentifier;
import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
import org.apache.xtable.model.schema.InternalPartitionField;
-import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.storage.TableFormat;
@Log4j2
@@ -122,7 +119,9 @@ public class HudiHMSCatalogTableBuilder implements
CatalogTableBuilder<Table, Ta
}
newTb.setCreateTime((int) Instant.now().toEpochMilli());
- Map<String, String> tableProperties = getTableProperties(table,
table.getReadSchema());
+ Map<String, String> tableProperties =
+ tablePropertiesExtractor.getTableProperties(
+ table, hmsCatalogConfig.getSchemaLengthThreshold());
newTb.setParameters(tableProperties);
newTb.setSd(getStorageDescriptor(table));
newTb.setPartitionKeys(getSchemaPartitionKeys(table));
@@ -134,7 +133,9 @@ public class HudiHMSCatalogTableBuilder implements
CatalogTableBuilder<Table, Ta
InternalTable table, Table hmsTable, CatalogTableIdentifier
tableIdentifier) {
Map<String, String> parameters = hmsTable.getParameters();
Map<String, String> tableParameters = hmsTable.getParameters();
- tableParameters.putAll(getTableProperties(table, table.getReadSchema()));
+ tableParameters.putAll(
+ tablePropertiesExtractor.getTableProperties(
+ table, hmsCatalogConfig.getSchemaLengthThreshold()));
hmsTable.setParameters(tableParameters);
hmsTable.setSd(getStorageDescriptor(table));
@@ -143,20 +144,6 @@ public class HudiHMSCatalogTableBuilder implements
CatalogTableBuilder<Table, Ta
return hmsTable;
}
- private Map<String, String> getTableProperties(InternalTable table,
InternalSchema schema) {
- List<String> partitionFields =
- table.getPartitioningFields().stream()
- .map(field -> field.getSourceField().getName())
- .collect(Collectors.toList());
- Map<String, String> tableProperties = new HashMap<>();
- tableProperties.put(HUDI_METADATA_CONFIG, "true");
- Map<String, String> sparkTableProperties =
- tablePropertiesExtractor.getSparkTableProperties(
- partitionFields, "", hmsCatalogConfig.getSchemaLengthThreshold(),
schema);
- tableProperties.putAll(sparkTableProperties);
- return tableProperties;
- }
-
private StorageDescriptor getStorageDescriptor(InternalTable table) {
final StorageDescriptor storageDescriptor = new StorageDescriptor();
storageDescriptor.setCols(schemaExtractor.toColumns(TableFormat.HUDI,
table.getReadSchema()));
@@ -168,7 +155,8 @@ public class HudiHMSCatalogTableBuilder implements
CatalogTableBuilder<Table, Ta
String serdeClassName = HudiInputFormatUtils.getSerDeClassName(fileFormat);
storageDescriptor.setInputFormat(inputFormatClassName);
storageDescriptor.setOutputFormat(outputFormatClassName);
- Map<String, String> serdeProperties =
getSerdeProperties(table.getBasePath());
+ Map<String, String> serdeProperties =
+ tablePropertiesExtractor.getSerdeProperties(table.getBasePath());
SerDeInfo serDeInfo = new SerDeInfo();
serDeInfo.setSerializationLib(serdeClassName);
serDeInfo.setParameters(serdeProperties);
@@ -176,13 +164,6 @@ public class HudiHMSCatalogTableBuilder implements
CatalogTableBuilder<Table, Ta
return storageDescriptor;
}
- private static Map<String, String> getSerdeProperties(String basePath) {
- Map<String, String> serdeProperties = new HashMap<>();
- serdeProperties.put(ConfigUtils.TABLE_SERDE_PATH, basePath);
- serdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE,
String.valueOf(false));
- return serdeProperties;
- }
-
private List<FieldSchema> getSchemaPartitionKeys(InternalTable table) {
List<InternalPartitionField> partitioningFields =
table.getPartitioningFields();
diff --git
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestHudiHMSCatalogTableBuilder.java
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestHudiHMSCatalogTableBuilder.java
index adf88c99..4df2ab85 100644
---
a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestHudiHMSCatalogTableBuilder.java
+++
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestHudiHMSCatalogTableBuilder.java
@@ -18,22 +18,15 @@
package org.apache.xtable.hms.table;
-import static
org.apache.xtable.hms.table.HudiHMSCatalogTableBuilder.HUDI_METADATA_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.util.List;
-import java.util.stream.Collectors;
-
import org.apache.hadoop.hive.metastore.api.Table;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -80,31 +73,16 @@ public class TestHudiHMSCatalogTableBuilder extends
HMSCatalogSyncTestBase {
setupCommonMocks();
setupMetaClientMocks();
- List<String> partitionFields =
- TEST_HUDI_INTERNAL_TABLE.getPartitioningFields().stream()
- .map(partitionField -> partitionField.getSourceField().getName())
- .collect(Collectors.toList());
-
Table table =
hudiHMSCatalogTableBuilder.getCreateTableRequest(
TEST_HUDI_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER);
- ArgumentCaptor<List<String>> partitionsCaptor =
ArgumentCaptor.forClass(List.class);
verify(mockTablePropertiesExtractor)
- .getSparkTableProperties(
- partitionsCaptor.capture(),
- eq(""),
- any(Integer.class),
- eq(TEST_HUDI_INTERNAL_TABLE.getReadSchema()));
- assertEquals(partitionFields.size(), partitionsCaptor.getValue().size());
- assertEquals(partitionFields, partitionsCaptor.getValue());
+ .getTableProperties(eq(TEST_HUDI_INTERNAL_TABLE), any(Integer.class));
assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(),
table.getTableName());
assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
table.getDbName());
assertEquals(3, table.getSd().getCols().size());
assertEquals(1, table.getPartitionKeys().size());
- assertNotNull(table.getParameters());
- assertFalse(table.getParameters().isEmpty());
- assertEquals(table.getParameters().get(HUDI_METADATA_CONFIG), "true");
}
@Test
@@ -112,10 +90,6 @@ public class TestHudiHMSCatalogTableBuilder extends
HMSCatalogSyncTestBase {
setupCommonMocks();
setupMetaClientMocks();
- List<String> partitionFields =
- TEST_HUDI_INTERNAL_TABLE.getPartitioningFields().stream()
- .map(partitionField -> partitionField.getSourceField().getName())
- .collect(Collectors.toList());
Table table =
hudiHMSCatalogTableBuilder.getCreateTableRequest(
TEST_HUDI_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER);
@@ -123,21 +97,11 @@ public class TestHudiHMSCatalogTableBuilder extends
HMSCatalogSyncTestBase {
hudiHMSCatalogTableBuilder.getUpdateTableRequest(
TEST_EVOLVED_HUDI_INTERNAL_TABLE, table,
TEST_CATALOG_TABLE_IDENTIFIER);
- ArgumentCaptor<List<String>> partitionsCaptor =
ArgumentCaptor.forClass(List.class);
verify(mockTablePropertiesExtractor)
- .getSparkTableProperties(
- partitionsCaptor.capture(),
- eq(""),
- any(Integer.class),
- eq(TEST_HUDI_INTERNAL_TABLE.getReadSchema()));
- assertEquals(partitionFields.size(), partitionsCaptor.getValue().size());
- assertEquals(partitionFields, partitionsCaptor.getValue());
+ .getTableProperties(eq(TEST_HUDI_INTERNAL_TABLE), any(Integer.class));
assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(),
updatedTable.getTableName());
assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(),
updatedTable.getDbName());
assertEquals(4, updatedTable.getSd().getCols().size());
assertEquals(1, updatedTable.getPartitionKeys().size());
- assertNotNull(updatedTable.getParameters());
- assertFalse(table.getParameters().isEmpty());
- assertEquals(table.getParameters().get(HUDI_METADATA_CONFIG), "true");
}
}