This is an automated email from the ASF dual-hosted git repository.

vinish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
     new 03e5b022  [590] Add Hudi Glue Catalog Sync Implementation
03e5b022 is described below

commit 03e5b02239db0008bd96b1b9f757e469a7b668d0
Author: Vamsi <va...@onehouse.ai>
AuthorDate: Thu Feb 13 17:14:34 2025 +0530

    [590] Add Hudi Glue Catalog Sync Implementation
---
 xtable-aws/pom.xml                                 |   8 +
 .../org/apache/xtable/glue/GlueCatalogConfig.java  |  11 +
 .../xtable/glue/GlueCatalogConversionSource.java   |  22 +-
 .../glue/GlueCatalogPartitionSyncOperations.java   | 331 +++++++++++++
 .../apache/xtable/glue/GlueCatalogSyncClient.java  |  38 +-
 .../glue/GlueCatalogTableBuilderFactory.java       |   5 +-
 .../apache/xtable/glue/GlueCatalogTableUtils.java  |  56 +++
 .../apache/xtable/glue/GlueSchemaExtractor.java    |   2 +-
 .../glue/table/HudiGlueCatalogTableBuilder.java    | 185 ++++++++
 .../xtable/glue/GlueCatalogSyncTestBase.java       |   7 +
 .../TestGlueCatalogPartitionSyncOperations.java    | 519 +++++++++++++++++++++
 .../xtable/glue/TestGlueCatalogSyncClient.java     |  87 +++-
 .../table/TestHudiGlueCatalogTableBuilder.java     |  94 ++--
 .../HudiCatalogTablePropertiesExtractor.java       |  36 +-
 .../TestHudiCatalogTablePropertiesExtractor.java   |  52 ++-
 .../hms/table/HudiHMSCatalogTableBuilder.java      |  35 +-
 .../hms/table/TestHudiHMSCatalogTableBuilder.java  |  40 +-
 17 files changed, 1340 insertions(+), 188 deletions(-)

diff --git a/xtable-aws/pom.xml b/xtable-aws/pom.xml
index 099e04d3..7ec24cf9 100644
--- a/xtable-aws/pom.xml
+++ b/xtable-aws/pom.xml
@@ -36,6 +36,14 @@
       <version>${project.version}</version>
     </dependency>
 
+    <!-- Hudi dependencies -->
+    <dependency>
+      <groupId>org.apache.hudi</groupId>
+      <artifactId>hudi-hive-sync</artifactId>
+      <version>${hudi.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
     <!-- Hadoop dependencies -->
     <dependency>
       <groupId>org.apache.hadoop</groupId>
diff --git a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogConfig.java b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogConfig.java
index 87443698..09e195f1 100644
--- a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogConfig.java
+++ b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogConfig.java
@@ -28,6 +28,8 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
 
+import org.apache.hudi.hive.MultiPartKeysValueExtractor;
+
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -54,6 +56,15 @@ public class GlueCatalogConfig {
   @JsonProperty("externalCatalog.glue.credentialsProviderClass")
   private final String clientCredentialsProviderClass;
 
+  @JsonProperty("externalCatalog.glue.schema_string_length_thresh")
+  private int schemaLengthThreshold = 4000;
+
+  @JsonProperty("externalCatalog.glue.partition_extractor_class")
+  private String partitionExtractorClass = MultiPartKeysValueExtractor.class.getName();
+
+  @JsonProperty("externalCatalog.glue.max_partitions_per_request")
+  private int maxPartitionsPerRequest = 1000;
+
   /**
    * In case a credentialsProviderClass is configured and require additional properties for
    * instantiation, those properties should start with {@link
diff --git a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogConversionSource.java b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogConversionSource.java
index 370ae44a..dc905cef 100644
--- a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogConversionSource.java
+++ 
b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogConversionSource.java @@ -18,8 +18,6 @@ package org.apache.xtable.glue; -import static org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifier; - import java.util.Properties; import org.apache.hadoop.conf.Configuration; @@ -31,13 +29,10 @@ import org.apache.xtable.conversion.ExternalCatalogConfig; import org.apache.xtable.conversion.SourceTable; import org.apache.xtable.exception.CatalogSyncException; import org.apache.xtable.model.catalog.CatalogTableIdentifier; -import org.apache.xtable.model.catalog.HierarchicalTableIdentifier; import org.apache.xtable.model.storage.CatalogType; import org.apache.xtable.spi.extractor.CatalogConversionSource; import software.amazon.awssdk.services.glue.GlueClient; -import software.amazon.awssdk.services.glue.model.GetTableRequest; -import software.amazon.awssdk.services.glue.model.GetTableResponse; import software.amazon.awssdk.services.glue.model.GlueException; import software.amazon.awssdk.services.glue.model.Table; @@ -61,19 +56,12 @@ public class GlueCatalogConversionSource implements CatalogConversionSource { @Override public SourceTable getSourceTable(CatalogTableIdentifier tblIdentifier) { - HierarchicalTableIdentifier tableIdentifier = toHierarchicalTableIdentifier(tblIdentifier); try { - GetTableResponse response = - glueClient.getTable( - GetTableRequest.builder() - .catalogId(glueCatalogConfig.getCatalogId()) - .databaseName(tableIdentifier.getDatabaseName()) - .name(tableIdentifier.getTableName()) - .build()); - Table table = response.table(); + Table table = + GlueCatalogTableUtils.getTable( + glueClient, glueCatalogConfig.getCatalogId(), tblIdentifier); if (table == null) { - throw new IllegalStateException( - String.format("table: %s is null", tableIdentifier.getId())); + throw new IllegalStateException(String.format("table: %s is null", tblIdentifier.getId())); } String tableFormat = TableFormatUtils.getTableFormat(table.parameters()); @@ -91,7 +79,7 @@ public class GlueCatalogConversionSource implements CatalogConversionSource { .additionalProperties(tableProperties) .build(); } catch (GlueException e) { - throw new CatalogSyncException("Failed to get table: " + tableIdentifier.getId(), e); + throw new CatalogSyncException("Failed to get table: " + tblIdentifier.getId(), e); } } diff --git a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogPartitionSyncOperations.java b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogPartitionSyncOperations.java new file mode 100644 index 00000000..8c2cff24 --- /dev/null +++ b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogPartitionSyncOperations.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.glue; + +import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty; +import static org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifier; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import lombok.extern.log4j.Log4j2; + +import org.apache.hudi.common.util.CollectionUtils; + +import org.apache.xtable.catalog.CatalogPartition; +import org.apache.xtable.catalog.CatalogPartitionSyncOperations; +import org.apache.xtable.exception.CatalogSyncException; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.model.catalog.HierarchicalTableIdentifier; + +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.BatchCreatePartitionRequest; +import software.amazon.awssdk.services.glue.model.BatchCreatePartitionResponse; +import software.amazon.awssdk.services.glue.model.BatchDeletePartitionRequest; +import software.amazon.awssdk.services.glue.model.BatchDeletePartitionResponse; +import software.amazon.awssdk.services.glue.model.BatchUpdatePartitionRequest; +import software.amazon.awssdk.services.glue.model.BatchUpdatePartitionRequestEntry; +import software.amazon.awssdk.services.glue.model.BatchUpdatePartitionResponse; +import software.amazon.awssdk.services.glue.model.GetPartitionsRequest; +import software.amazon.awssdk.services.glue.model.GetPartitionsResponse; +import software.amazon.awssdk.services.glue.model.PartitionInput; +import software.amazon.awssdk.services.glue.model.PartitionValueList; +import software.amazon.awssdk.services.glue.model.StorageDescriptor; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.TableInput; +import software.amazon.awssdk.services.glue.model.UpdateTableRequest; + +@Log4j2 +public class GlueCatalogPartitionSyncOperations implements CatalogPartitionSyncOperations { + + private final GlueClient glueClient; + private final GlueCatalogConfig glueCatalogConfig; + + public GlueCatalogPartitionSyncOperations( + GlueClient glueClient, GlueCatalogConfig glueCatalogConfig) { + this.glueClient = glueClient; + this.glueCatalogConfig = glueCatalogConfig; + } + + @Override + public List<CatalogPartition> getAllPartitions(CatalogTableIdentifier catalogTableIdentifier) { + HierarchicalTableIdentifier tableIdentifier = + toHierarchicalTableIdentifier(catalogTableIdentifier); + try { + List<CatalogPartition> partitions = new ArrayList<>(); + String nextToken = null; + do { + GetPartitionsResponse result = + glueClient.getPartitions( + GetPartitionsRequest.builder() + .databaseName(tableIdentifier.getDatabaseName()) + .tableName(tableIdentifier.getTableName()) + .nextToken(nextToken) + .build()); + partitions.addAll( + result.partitions().stream() + .map(p -> new CatalogPartition(p.values(), p.storageDescriptor().location())) + .collect(Collectors.toList())); + nextToken = result.nextToken(); + } while (nextToken != null); + return partitions; + } catch (Exception e) { + throw new CatalogSyncException( + "Failed to get all partitions for table " + tableIdentifier, e); + } + } + + @Override + public void addPartitionsToTable( + CatalogTableIdentifier catalogTableIdentifier, List<CatalogPartition> partitionsToAdd) { + HierarchicalTableIdentifier tableIdentifier = + toHierarchicalTableIdentifier(catalogTableIdentifier); + if (partitionsToAdd.isEmpty()) { + 
log.info("No partitions to add for {}", tableIdentifier); + return; + } + log.info("Adding {} CatalogPartition(s) in table {}", partitionsToAdd.size(), tableIdentifier); + try { + Table table = + GlueCatalogTableUtils.getTable( + glueClient, glueCatalogConfig.getCatalogId(), catalogTableIdentifier); + StorageDescriptor sd = table.storageDescriptor(); + List<PartitionInput> partitionInputs = + partitionsToAdd.stream() + .map(partition -> createPartitionInput(table, partition)) + .collect(Collectors.toList()); + + List<BatchCreatePartitionResponse> responses = new ArrayList<>(); + + CollectionUtils.batches(partitionInputs, glueCatalogConfig.getMaxPartitionsPerRequest()) + .forEach( + batch -> { + BatchCreatePartitionRequest request = + BatchCreatePartitionRequest.builder() + .databaseName(tableIdentifier.getDatabaseName()) + .tableName(tableIdentifier.getTableName()) + .partitionInputList(batch) + .build(); + responses.add(glueClient.batchCreatePartition(request)); + }); + + responses.forEach( + response -> { + if (CollectionUtils.nonEmpty(response.errors())) { + if (response.errors().stream() + .allMatch( + (error) -> + "AlreadyExistsException".equals(error.errorDetail().errorCode()))) { + log.warn("Partitions already exist in glue: {}", response.errors()); + } else { + throw new CatalogSyncException( + "Fail to add partitions to " + + tableIdentifier + + " with error(s): " + + response.errors()); + } + } + }); + } catch (Exception e) { + throw new CatalogSyncException("Fail to add partitions to " + tableIdentifier, e); + } + } + + @Override + public void updatePartitionsToTable( + CatalogTableIdentifier catalogTableIdentifier, List<CatalogPartition> changedPartitions) { + HierarchicalTableIdentifier tableIdentifier = + toHierarchicalTableIdentifier(catalogTableIdentifier); + if (changedPartitions.isEmpty()) { + log.info("No partitions to change for {}", tableIdentifier.getTableName()); + return; + } + log.info("Updating {} partition(s) in table {}", changedPartitions.size(), tableIdentifier); + try { + Table table = + GlueCatalogTableUtils.getTable( + glueClient, glueCatalogConfig.getCatalogId(), catalogTableIdentifier); + StorageDescriptor sd = table.storageDescriptor(); + List<BatchUpdatePartitionRequestEntry> updatePartitionEntries = + changedPartitions.stream() + .map( + partition -> + BatchUpdatePartitionRequestEntry.builder() + .partitionInput(createPartitionInput(table, partition)) + .partitionValueList(partition.getValues()) + .build()) + .collect(Collectors.toList()); + + List<BatchUpdatePartitionResponse> responses = new ArrayList<>(); + + CollectionUtils.batches( + updatePartitionEntries, glueCatalogConfig.getMaxPartitionsPerRequest()) + .forEach( + batch -> { + BatchUpdatePartitionRequest request = + BatchUpdatePartitionRequest.builder() + .databaseName(tableIdentifier.getDatabaseName()) + .tableName(tableIdentifier.getTableName()) + .entries(batch) + .build(); + responses.add(glueClient.batchUpdatePartition(request)); + }); + + responses.forEach( + response -> { + if (CollectionUtils.nonEmpty(response.errors())) { + throw new CatalogSyncException( + "Fail to update partitions to " + + tableIdentifier + + " with error(s): " + + response.errors()); + } + }); + } catch (Exception e) { + throw new CatalogSyncException("Fail to update partitions to " + tableIdentifier, e); + } + } + + @Override + public void dropPartitions( + CatalogTableIdentifier catalogTableIdentifier, List<CatalogPartition> partitionsToDrop) { + HierarchicalTableIdentifier tableIdentifier = + 
toHierarchicalTableIdentifier(catalogTableIdentifier); + if (isNullOrEmpty(partitionsToDrop)) { + log.info("No partitions to drop for {}", tableIdentifier); + return; + } + log.info("Drop {} CatalogPartition(s) in table {}", partitionsToDrop.size(), tableIdentifier); + try { + List<BatchDeletePartitionResponse> responses = new ArrayList<>(); + + CollectionUtils.batches(partitionsToDrop, glueCatalogConfig.getMaxPartitionsPerRequest()) + .forEach( + batch -> { + List<PartitionValueList> partitionValueLists = + batch.stream() + .map( + CatalogPartition -> + PartitionValueList.builder() + .values(CatalogPartition.getValues()) + .build()) + .collect(Collectors.toList()); + + BatchDeletePartitionRequest batchDeletePartitionRequest = + BatchDeletePartitionRequest.builder() + .databaseName(tableIdentifier.getDatabaseName()) + .tableName(tableIdentifier.getTableName()) + .partitionsToDelete(partitionValueLists) + .build(); + responses.add(glueClient.batchDeletePartition(batchDeletePartitionRequest)); + }); + + responses.forEach( + response -> { + if (CollectionUtils.nonEmpty(response.errors())) { + throw new CatalogSyncException( + "Fail to drop partitions to " + + tableIdentifier + + " with error(s): " + + response.errors()); + } + }); + } catch (Exception e) { + throw new CatalogSyncException("Fail to drop partitions to " + tableIdentifier, e); + } + } + + @Override + public Map<String, String> getTableProperties( + CatalogTableIdentifier tableIdentifier, List<String> keysToRetrieve) { + try { + Table table = + GlueCatalogTableUtils.getTable( + glueClient, glueCatalogConfig.getCatalogId(), tableIdentifier); + Map<String, String> tableParameters = table.parameters(); + + return keysToRetrieve.stream() + .filter(tableParameters::containsKey) + .collect(Collectors.toMap(key -> key, tableParameters::get)); + } catch (Exception e) { + throw new CatalogSyncException( + "failed to get last time synced properties for table " + tableIdentifier, e); + } + } + + @Override + public void updateTableProperties( + CatalogTableIdentifier catalogTableIdentifier, Map<String, String> propertiesToUpdate) { + HierarchicalTableIdentifier tableIdentifier = + toHierarchicalTableIdentifier(catalogTableIdentifier); + if (isNullOrEmpty(propertiesToUpdate)) { + return; + } + try { + Table table = + GlueCatalogTableUtils.getTable( + glueClient, glueCatalogConfig.getCatalogId(), catalogTableIdentifier); + + final Map<String, String> newParams = new HashMap<>(); + newParams.putAll(table.parameters()); + newParams.putAll(propertiesToUpdate); + + final Instant now = Instant.now(); + TableInput updatedTableInput = + TableInput.builder() + .name(tableIdentifier.getTableName()) + .tableType(table.tableType()) + .parameters(newParams) + .partitionKeys(table.partitionKeys()) + .storageDescriptor(table.storageDescriptor()) + .lastAccessTime(now) + .lastAnalyzedTime(now) + .build(); + + UpdateTableRequest request = + UpdateTableRequest.builder() + .databaseName(tableIdentifier.getDatabaseName()) + .tableInput(updatedTableInput) + .skipArchive(true) + .build(); + glueClient.updateTable(request); + } catch (Exception e) { + throw new CatalogSyncException( + "Fail to update last synced params for table " + + tableIdentifier + + ": " + + propertiesToUpdate, + e); + } + } + + private PartitionInput createPartitionInput(Table table, CatalogPartition partition) { + StorageDescriptor sd = table.storageDescriptor(); + StorageDescriptor partitionSD = + sd.copy(copySd -> copySd.location(partition.getStorageLocation())); + return 
PartitionInput.builder() + .values(partition.getValues()) + .storageDescriptor(partitionSD) + .build(); + } +} diff --git a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogSyncClient.java b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogSyncClient.java index d7de2dec..37b91753 100644 --- a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogSyncClient.java +++ b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogSyncClient.java @@ -21,6 +21,7 @@ package org.apache.xtable.glue; import static org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifier; import java.time.ZonedDateTime; +import java.util.Optional; import lombok.extern.log4j.Log4j2; @@ -28,7 +29,9 @@ import org.apache.hadoop.conf.Configuration; import com.google.common.annotations.VisibleForTesting; +import org.apache.xtable.catalog.CatalogPartitionSyncTool; import org.apache.xtable.catalog.CatalogTableBuilder; +import org.apache.xtable.catalog.CatalogUtils; import org.apache.xtable.conversion.ExternalCatalogConfig; import org.apache.xtable.exception.CatalogSyncException; import org.apache.xtable.model.InternalTable; @@ -45,8 +48,6 @@ import software.amazon.awssdk.services.glue.model.DatabaseInput; import software.amazon.awssdk.services.glue.model.DeleteTableRequest; import software.amazon.awssdk.services.glue.model.EntityNotFoundException; import software.amazon.awssdk.services.glue.model.GetDatabaseRequest; -import software.amazon.awssdk.services.glue.model.GetTableRequest; -import software.amazon.awssdk.services.glue.model.GetTableResponse; import software.amazon.awssdk.services.glue.model.Table; import software.amazon.awssdk.services.glue.model.TableInput; import software.amazon.awssdk.services.glue.model.UpdateTableRequest; @@ -63,6 +64,7 @@ public class GlueCatalogSyncClient implements CatalogSyncClient<Table> { private GlueCatalogConfig glueCatalogConfig; private Configuration configuration; private CatalogTableBuilder<TableInput, Table> tableBuilder; + private Optional<CatalogPartitionSyncTool> partitionSyncTool; // For loading the instance using ServiceLoader public GlueCatalogSyncClient() {} @@ -78,12 +80,14 @@ public class GlueCatalogSyncClient implements CatalogSyncClient<Table> { Configuration configuration, GlueCatalogConfig glueCatalogConfig, GlueClient glueClient, - CatalogTableBuilder tableBuilder) { + CatalogTableBuilder tableBuilder, + Optional<CatalogPartitionSyncTool> partitionSyncTool) { this.catalogConfig = catalogConfig; this.configuration = new Configuration(configuration); this.glueCatalogConfig = glueCatalogConfig; this.glueClient = glueClient; this.tableBuilder = tableBuilder; + this.partitionSyncTool = partitionSyncTool; } @Override @@ -143,20 +147,13 @@ public class GlueCatalogSyncClient implements CatalogSyncClient<Table> { @Override public Table getTable(CatalogTableIdentifier tableIdentifier) { - HierarchicalTableIdentifier tblIdentifier = toHierarchicalTableIdentifier(tableIdentifier); try { - GetTableResponse response = - glueClient.getTable( - GetTableRequest.builder() - .catalogId(glueCatalogConfig.getCatalogId()) - .databaseName(tblIdentifier.getDatabaseName()) - .name(tblIdentifier.getTableName()) - .build()); - return response.table(); + return GlueCatalogTableUtils.getTable( + glueClient, glueCatalogConfig.getCatalogId(), tableIdentifier); } catch (EntityNotFoundException e) { return null; } catch (Exception e) { - throw new CatalogSyncException("Failed to get table: " + tblIdentifier.getId(), e); + throw new CatalogSyncException("Failed to 
get table: " + tableIdentifier.getId(), e); } } @@ -174,6 +171,8 @@ public class GlueCatalogSyncClient implements CatalogSyncClient<Table> { } catch (Exception e) { throw new CatalogSyncException("Failed to create table: " + tblIdentifier.getId(), e); } + + partitionSyncTool.ifPresent(tool -> tool.syncPartitions(table, tableIdentifier)); } @Override @@ -193,6 +192,8 @@ public class GlueCatalogSyncClient implements CatalogSyncClient<Table> { } catch (Exception e) { throw new CatalogSyncException("Failed to refresh table: " + tblIdentifier.getId(), e); } + + partitionSyncTool.ifPresent(tool -> tool.syncPartitions(table, tableIdentifier)); } @Override @@ -230,7 +231,16 @@ public class GlueCatalogSyncClient implements CatalogSyncClient<Table> { this.glueCatalogConfig = GlueCatalogConfig.of(catalogConfig.getCatalogProperties()); this.glueClient = new DefaultGlueClientFactory(glueCatalogConfig).getGlueClient(); this.configuration = new Configuration(configuration); - this.tableBuilder = GlueCatalogTableBuilderFactory.getInstance(tableFormat, this.configuration); + this.tableBuilder = + GlueCatalogTableBuilderFactory.getInstance( + tableFormat, glueCatalogConfig, this.configuration); + this.partitionSyncTool = + CatalogUtils.getPartitionSyncTool( + tableFormat, + glueCatalogConfig.getPartitionExtractorClass(), + new GlueCatalogPartitionSyncOperations(glueClient, glueCatalogConfig), + configuration); + ; } @Override diff --git a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogTableBuilderFactory.java b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogTableBuilderFactory.java index cd0b1d08..7316aaf2 100644 --- a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogTableBuilderFactory.java +++ b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogTableBuilderFactory.java @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.xtable.catalog.CatalogTableBuilder; import org.apache.xtable.exception.NotSupportedException; import org.apache.xtable.glue.table.DeltaGlueCatalogTableBuilder; +import org.apache.xtable.glue.table.HudiGlueCatalogTableBuilder; import org.apache.xtable.glue.table.IcebergGlueCatalogTableBuilder; import org.apache.xtable.model.storage.TableFormat; @@ -32,12 +33,14 @@ import software.amazon.awssdk.services.glue.model.TableInput; class GlueCatalogTableBuilderFactory { static CatalogTableBuilder<TableInput, Table> getInstance( - String tableFormat, Configuration configuration) { + String tableFormat, GlueCatalogConfig glueCatalogConfig, Configuration configuration) { switch (tableFormat) { case TableFormat.ICEBERG: return new IcebergGlueCatalogTableBuilder(configuration); case TableFormat.DELTA: return new DeltaGlueCatalogTableBuilder(); + case TableFormat.HUDI: + return new HudiGlueCatalogTableBuilder(glueCatalogConfig, configuration); default: throw new NotSupportedException("Unsupported table format: " + tableFormat); } diff --git a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogTableUtils.java b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogTableUtils.java new file mode 100644 index 00000000..269f79a5 --- /dev/null +++ b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogTableUtils.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.glue; + +import static org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifier; + +import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.model.catalog.HierarchicalTableIdentifier; + +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.model.GetTableRequest; +import software.amazon.awssdk.services.glue.model.GetTableResponse; +import software.amazon.awssdk.services.glue.model.Table; + +/** Utils class to fetch details about Glue table */ +public class GlueCatalogTableUtils { + + /** + * Retrieves a table from AWS Glue based on the provided catalog and table identifiers. + * + * @param glueClient The AWS Glue client used to connect to AWS Glue. + * @param catalogId The ID of the AWS Glue Data Catalog where the table is located. + * @param catalogTableIdentifier The identifier for the table in the catalog. + * @return The retrieved {@link Table} object if found; otherwise, returns {@code null} if the + * table does not exist. + */ + public static Table getTable( + GlueClient glueClient, String catalogId, CatalogTableIdentifier catalogTableIdentifier) { + HierarchicalTableIdentifier tableIdentifier = + toHierarchicalTableIdentifier(catalogTableIdentifier); + GetTableResponse response = + glueClient.getTable( + GetTableRequest.builder() + .catalogId(catalogId) + .databaseName(tableIdentifier.getDatabaseName()) + .name(tableIdentifier.getTableName()) + .build()); + return response.table(); + } +} diff --git a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueSchemaExtractor.java b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueSchemaExtractor.java index 1bb51adc..24da6ea4 100644 --- a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueSchemaExtractor.java +++ b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueSchemaExtractor.java @@ -143,7 +143,7 @@ public class GlueSchemaExtractor { * @param fieldSchema InternalTable field schema * @return glue column type */ - protected String toTypeString(InternalSchema fieldSchema, String tableFormat) { + public String toTypeString(InternalSchema fieldSchema, String tableFormat) { switch (fieldSchema.getDataType()) { case BOOLEAN: return "boolean"; diff --git a/xtable-aws/src/main/java/org/apache/xtable/glue/table/HudiGlueCatalogTableBuilder.java b/xtable-aws/src/main/java/org/apache/xtable/glue/table/HudiGlueCatalogTableBuilder.java new file mode 100644 index 00000000..9618873c --- /dev/null +++ b/xtable-aws/src/main/java/org/apache/xtable/glue/table/HudiGlueCatalogTableBuilder.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.glue.table; + +import static org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifier; + +import java.time.Instant; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import org.apache.hadoop.conf.Configuration; + +import org.apache.hudi.common.model.HoodieFileFormat; +import org.apache.hudi.common.table.HoodieTableMetaClient; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.xtable.catalog.CatalogTableBuilder; +import org.apache.xtable.exception.CatalogSyncException; +import org.apache.xtable.glue.GlueCatalogConfig; +import org.apache.xtable.glue.GlueSchemaExtractor; +import org.apache.xtable.hudi.HudiTableManager; +import org.apache.xtable.hudi.catalog.HudiCatalogTablePropertiesExtractor; +import org.apache.xtable.hudi.catalog.HudiInputFormatUtils; +import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.catalog.CatalogTableIdentifier; +import org.apache.xtable.model.catalog.HierarchicalTableIdentifier; +import org.apache.xtable.model.schema.InternalPartitionField; +import org.apache.xtable.model.storage.TableFormat; + +import software.amazon.awssdk.services.glue.model.Column; +import software.amazon.awssdk.services.glue.model.SerDeInfo; +import software.amazon.awssdk.services.glue.model.StorageDescriptor; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.TableInput; + +public class HudiGlueCatalogTableBuilder implements CatalogTableBuilder<TableInput, Table> { + protected static final String GLUE_EXTERNAL_TABLE_TYPE = "EXTERNAL_TABLE"; + + private final GlueSchemaExtractor schemaExtractor; + private final HudiTableManager hudiTableManager; + private final GlueCatalogConfig glueCatalogConfig; + private HoodieTableMetaClient metaClient; + private final HudiCatalogTablePropertiesExtractor tablePropertiesExtractor; + + public HudiGlueCatalogTableBuilder( + GlueCatalogConfig glueCatalogConfig, Configuration configuration) { + this.glueCatalogConfig = glueCatalogConfig; + this.schemaExtractor = GlueSchemaExtractor.getInstance(); + this.hudiTableManager = HudiTableManager.of(configuration); + this.tablePropertiesExtractor = HudiCatalogTablePropertiesExtractor.getInstance(); + } + + @VisibleForTesting + HudiGlueCatalogTableBuilder( + GlueCatalogConfig glueCatalogConfig, + GlueSchemaExtractor schemaExtractor, + HudiTableManager hudiTableManager, + HoodieTableMetaClient metaClient, + HudiCatalogTablePropertiesExtractor propertiesExtractor) { + this.glueCatalogConfig = glueCatalogConfig; + this.hudiTableManager = hudiTableManager; + this.schemaExtractor = schemaExtractor; + this.metaClient = metaClient; + this.tablePropertiesExtractor = propertiesExtractor; + } + + HoodieTableMetaClient getMetaClient(String basePath) { + if (metaClient == null) { + 
Optional<HoodieTableMetaClient> metaClientOpt = + hudiTableManager.loadTableMetaClientIfExists(basePath); + + if (!metaClientOpt.isPresent()) { + throw new CatalogSyncException( + "Failed to get meta client since table is not present in the base path " + basePath); + } + + metaClient = metaClientOpt.get(); + } + return metaClient; + } + + @Override + public TableInput getCreateTableRequest( + InternalTable table, CatalogTableIdentifier catalogTableIdentifier) { + HierarchicalTableIdentifier tableIdentifier = + toHierarchicalTableIdentifier(catalogTableIdentifier); + final Instant now = Instant.now(); + return TableInput.builder() + .name(tableIdentifier.getTableName()) + .tableType(GLUE_EXTERNAL_TABLE_TYPE) + .parameters( + tablePropertiesExtractor.getTableProperties( + table, glueCatalogConfig.getSchemaLengthThreshold())) + .partitionKeys(getSchemaPartitionKeys(table.getPartitioningFields())) + .storageDescriptor(getStorageDescriptor(table)) + .lastAccessTime(now) + .lastAnalyzedTime(now) + .build(); + } + + @Override + public TableInput getUpdateTableRequest( + InternalTable table, Table glueTable, CatalogTableIdentifier catalogTableIdentifier) { + HierarchicalTableIdentifier tableIdentifier = + toHierarchicalTableIdentifier(catalogTableIdentifier); + Map<String, String> tableParameters = new HashMap<>(glueTable.parameters()); + tableParameters.putAll( + tablePropertiesExtractor.getTableProperties( + table, glueCatalogConfig.getSchemaLengthThreshold())); + List<Column> newColumns = getSchemaWithoutPartitionKeys(table); + StorageDescriptor sd = glueTable.storageDescriptor(); + StorageDescriptor partitionSD = sd.copy(copySd -> copySd.columns(newColumns)); + + final Instant now = Instant.now(); + return TableInput.builder() + .name(tableIdentifier.getTableName()) + .tableType(glueTable.tableType()) + .parameters(tableParameters) + .partitionKeys(getSchemaPartitionKeys(table.getPartitioningFields())) + .storageDescriptor(partitionSD) + .lastAccessTime(now) + .lastAnalyzedTime(now) + .build(); + } + + private StorageDescriptor getStorageDescriptor(InternalTable table) { + HoodieFileFormat baseFileFormat = + getMetaClient(table.getBasePath()).getTableConfig().getBaseFileFormat(); + SerDeInfo serDeInfo = + SerDeInfo.builder() + .serializationLibrary(HudiInputFormatUtils.getSerDeClassName(baseFileFormat)) + .parameters(tablePropertiesExtractor.getSerdeProperties(table.getBasePath())) + .build(); + return StorageDescriptor.builder() + .serdeInfo(serDeInfo) + .location(table.getBasePath()) + .inputFormat(HudiInputFormatUtils.getInputFormatClassName(baseFileFormat, false)) + .outputFormat(HudiInputFormatUtils.getOutputFormatClassName(baseFileFormat)) + .columns(getSchemaWithoutPartitionKeys(table)) + .build(); + } + + private List<Column> getSchemaWithoutPartitionKeys(InternalTable table) { + List<String> partitionKeys = + table.getPartitioningFields().stream() + .map(field -> field.getSourceField().getName()) + .collect(Collectors.toList()); + return schemaExtractor.toColumns(TableFormat.HUDI, table.getReadSchema()).stream() + .filter(c -> !partitionKeys.contains(c.name())) + .collect(Collectors.toList()); + } + + private List<Column> getSchemaPartitionKeys(List<InternalPartitionField> partitioningFields) { + return partitioningFields.stream() + .map( + field -> { + String fieldName = field.getSourceField().getName(); + String fieldType = + schemaExtractor.toTypeString( + field.getSourceField().getSchema(), TableFormat.HUDI); + return Column.builder().name(fieldName).type(fieldType).build(); + }) 
+ .collect(Collectors.toList()); + } +} diff --git a/xtable-aws/src/test/java/org/apache/xtable/glue/GlueCatalogSyncTestBase.java b/xtable-aws/src/test/java/org/apache/xtable/glue/GlueCatalogSyncTestBase.java index d6efeb19..a1f5e73c 100644 --- a/xtable-aws/src/test/java/org/apache/xtable/glue/GlueCatalogSyncTestBase.java +++ b/xtable-aws/src/test/java/org/apache/xtable/glue/GlueCatalogSyncTestBase.java @@ -138,6 +138,13 @@ public class GlueCatalogSyncTestBase { .readSchema(INTERNAL_SCHEMA) .partitioningFields(Collections.singletonList(PARTITION_FIELD)) .build(); + protected static final InternalTable TEST_EVOLVED_HUDI_INTERNAL_TABLE = + InternalTable.builder() + .basePath(TEST_BASE_PATH) + .tableFormat(TableFormat.HUDI) + .readSchema(UPDATED_INTERNAL_SCHEMA) + .partitioningFields(Collections.singletonList(PARTITION_FIELD)) + .build(); protected static final InternalTable TEST_DELTA_INTERNAL_TABLE = InternalTable.builder() .basePath(TEST_BASE_PATH) diff --git a/xtable-aws/src/test/java/org/apache/xtable/glue/TestGlueCatalogPartitionSyncOperations.java b/xtable-aws/src/test/java/org/apache/xtable/glue/TestGlueCatalogPartitionSyncOperations.java new file mode 100644 index 00000000..edf3a790 --- /dev/null +++ b/xtable-aws/src/test/java/org/apache/xtable/glue/TestGlueCatalogPartitionSyncOperations.java @@ -0,0 +1,519 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.xtable.glue; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.junit.jupiter.MockitoExtension; + +import org.apache.xtable.catalog.CatalogPartitionSyncOperations; +import org.apache.xtable.exception.CatalogSyncException; + +import software.amazon.awssdk.services.glue.model.BatchCreatePartitionRequest; +import software.amazon.awssdk.services.glue.model.BatchCreatePartitionResponse; +import software.amazon.awssdk.services.glue.model.BatchDeletePartitionRequest; +import software.amazon.awssdk.services.glue.model.BatchDeletePartitionResponse; +import software.amazon.awssdk.services.glue.model.BatchUpdatePartitionRequest; +import software.amazon.awssdk.services.glue.model.BatchUpdatePartitionResponse; +import software.amazon.awssdk.services.glue.model.GetPartitionsRequest; +import software.amazon.awssdk.services.glue.model.GetPartitionsResponse; +import software.amazon.awssdk.services.glue.model.GetTableRequest; +import software.amazon.awssdk.services.glue.model.GetTableResponse; +import software.amazon.awssdk.services.glue.model.Partition; +import software.amazon.awssdk.services.glue.model.StorageDescriptor; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.UpdateTableRequest; +import software.amazon.awssdk.services.glue.model.UpdateTableResponse; + +@ExtendWith(MockitoExtension.class) +public class TestGlueCatalogPartitionSyncOperations extends GlueCatalogSyncTestBase { + + private CatalogPartitionSyncOperations gluePartitionSyncOperations; + + @BeforeEach + public void setup() { + setupCommonMocks(); + } + + void setupCommonMocks() { + gluePartitionSyncOperations = + new GlueCatalogPartitionSyncOperations(mockGlueClient, mockGlueCatalogConfig); + } + + @Test + void testGetAllPartitionsSuccess() { + // Mock the first response with a next token + GetPartitionsResponse firstResponse = + GetPartitionsResponse.builder() + .partitions( + Arrays.asList( + Partition.builder() + .values(Collections.singletonList("value1")) + .storageDescriptor( + StorageDescriptor.builder().location("location1").build()) + .build(), + Partition.builder() + .values(Collections.singletonList("value2")) + .storageDescriptor( + StorageDescriptor.builder().location("location2").build()) + .build())) + .nextToken("token123") + .build(); + + // Mock the second response without a next token + GetPartitionsResponse secondResponse = + GetPartitionsResponse.builder() + .partitions( + Collections.singletonList( + Partition.builder() + .values(Collections.singletonList("value3")) + .storageDescriptor( + StorageDescriptor.builder().location("location3").build()) + .build())) + .nextToken(null) + .build(); + + when(mockGlueClient.getPartitions(any(GetPartitionsRequest.class))) + .thenReturn(firstResponse) + .thenReturn(secondResponse); + + 
List<org.apache.xtable.catalog.CatalogPartition> partitions = + gluePartitionSyncOperations.getAllPartitions(TEST_CATALOG_TABLE_IDENTIFIER); + + // Validate the result + assertEquals(3, partitions.size()); + + assertEquals(Collections.singletonList("value1"), partitions.get(0).getValues()); + assertEquals("location1", partitions.get(0).getStorageLocation()); + + assertEquals(Collections.singletonList("value2"), partitions.get(1).getValues()); + assertEquals("location2", partitions.get(1).getStorageLocation()); + + assertEquals(Collections.singletonList("value3"), partitions.get(2).getValues()); + assertEquals("location3", partitions.get(2).getStorageLocation()); + + // Verify GlueClient interactions + verify(mockGlueClient, times(2)).getPartitions(any(GetPartitionsRequest.class)); + } + + @Test + void testGetAllPartitionsHandlesException() { + // Mock GlueClient to throw an exception + when(mockGlueClient.getPartitions(any(GetPartitionsRequest.class))) + .thenThrow(new RuntimeException("Test exception")); + + // Execute and validate exception + + CatalogSyncException exception = + assertThrows( + CatalogSyncException.class, + () -> gluePartitionSyncOperations.getAllPartitions(TEST_CATALOG_TABLE_IDENTIFIER)); + + assertInstanceOf(RuntimeException.class, exception.getCause()); + + // Verify GlueClient interactions + verify(mockGlueClient, times(1)).getPartitions(any(GetPartitionsRequest.class)); + } + + @Test + void testAddPartitionsToTableSuccess() { + // Mock getTable to return a valid table + Table mockTable = + Table.builder() + .databaseName(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName()) + .name(TEST_CATALOG_TABLE_IDENTIFIER.getTableName()) + .storageDescriptor(StorageDescriptor.builder().build()) + .build(); + GetTableResponse tableResponse = GetTableResponse.builder().table(mockTable).build(); + when(mockGlueClient.getTable(any(GetTableRequest.class))).thenReturn(tableResponse); + when(mockGlueCatalogConfig.getMaxPartitionsPerRequest()).thenReturn(100); + + // Mock glueClient to return a valid response for batchCreatePartition + BatchCreatePartitionResponse mockResponse = mock(BatchCreatePartitionResponse.class); + when(mockGlueClient.batchCreatePartition(any(BatchCreatePartitionRequest.class))) + .thenReturn(mockResponse); + + // Execute the method + gluePartitionSyncOperations.addPartitionsToTable( + TEST_CATALOG_TABLE_IDENTIFIER, + Collections.singletonList( + new org.apache.xtable.catalog.CatalogPartition( + Collections.singletonList("value1"), "location1"))); + + // Verify that batchCreatePartition was called + ArgumentCaptor<BatchCreatePartitionRequest> createPartitionRequestArgumentCaptor = + ArgumentCaptor.forClass(BatchCreatePartitionRequest.class); + verify(mockGlueClient, times(1)) + .batchCreatePartition(createPartitionRequestArgumentCaptor.capture()); + + BatchCreatePartitionRequest request = createPartitionRequestArgumentCaptor.getValue(); + assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), request.databaseName()); + assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), request.tableName()); + assertEquals(1, request.partitionInputList().size()); + assertEquals("location1", request.partitionInputList().get(0).storageDescriptor().location()); + assertEquals(Collections.singletonList("value1"), request.partitionInputList().get(0).values()); + } + + @Test + void testAddPartitionsToTableHandlesException() { + // Mock getTable to return a valid table + Table mockTable = mock(Table.class); + StorageDescriptor mockSd = mock(StorageDescriptor.class); + 
when(mockTable.storageDescriptor()).thenReturn(mockSd); + GetTableResponse tableResponse = GetTableResponse.builder().table(mockTable).build(); + when(mockGlueClient.getTable(any(GetTableRequest.class))).thenReturn(tableResponse); + when(mockGlueCatalogConfig.getMaxPartitionsPerRequest()).thenReturn(100); + + // Mock glueClient to throw an exception when batchCreatePartition is called + when(mockGlueClient.batchCreatePartition(any(BatchCreatePartitionRequest.class))) + .thenThrow(new RuntimeException("Test exception")); + + // Test the exception handling + CatalogSyncException exception = + assertThrows( + CatalogSyncException.class, + () -> + gluePartitionSyncOperations.addPartitionsToTable( + TEST_CATALOG_TABLE_IDENTIFIER, + Collections.singletonList( + new org.apache.xtable.catalog.CatalogPartition( + Collections.singletonList("value1"), "location1")))); + + // Verify that the exception was caused by a RuntimeException + assertInstanceOf(RuntimeException.class, exception.getCause()); + + // Verify glueClient.batchCreatePartition was called + verify(mockGlueClient, times(1)).batchCreatePartition(any(BatchCreatePartitionRequest.class)); + } + + @Test + void testUpdatePartitionsToTableSuccess() { + // Mock getTable to return a valid table + Table mockTable = + Table.builder() + .databaseName(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName()) + .name(TEST_CATALOG_TABLE_IDENTIFIER.getTableName()) + .storageDescriptor(StorageDescriptor.builder().build()) + .build(); + GetTableResponse tableResponse = GetTableResponse.builder().table(mockTable).build(); + when(mockGlueClient.getTable(any(GetTableRequest.class))).thenReturn(tableResponse); + when(mockGlueCatalogConfig.getMaxPartitionsPerRequest()).thenReturn(100); + + // Prepare test partition to be updated + org.apache.xtable.catalog.CatalogPartition partitionToUpdate = + new org.apache.xtable.catalog.CatalogPartition( + Collections.singletonList("value1"), "new_location"); + + // Mock glueClient to return a valid response for batchUpdatePartition + BatchUpdatePartitionResponse mockResponse = mock(BatchUpdatePartitionResponse.class); + when(mockGlueClient.batchUpdatePartition(any(BatchUpdatePartitionRequest.class))) + .thenReturn(mockResponse); + + // Execute the method + gluePartitionSyncOperations.updatePartitionsToTable( + TEST_CATALOG_TABLE_IDENTIFIER, Collections.singletonList(partitionToUpdate)); + + // Verify that batchUpdatePartition was called + ArgumentCaptor<BatchUpdatePartitionRequest> updatePartitionRequestArgumentCaptor = + ArgumentCaptor.forClass(BatchUpdatePartitionRequest.class); + verify(mockGlueClient, times(1)) + .batchUpdatePartition(updatePartitionRequestArgumentCaptor.capture()); + + // Capture the request + BatchUpdatePartitionRequest request = updatePartitionRequestArgumentCaptor.getValue(); + + // Validate the request's properties + assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), request.databaseName()); + assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), request.tableName()); + assertEquals(1, request.entries().size()); + assertEquals( + "new_location", request.entries().get(0).partitionInput().storageDescriptor().location()); + assertEquals( + Collections.singletonList("value1"), request.entries().get(0).partitionInput().values()); + } + + @Test + void testUpdatePartitionsToTableHandlesException() { + // Mock getTable to return a valid table + Table mockTable = + Table.builder() + .databaseName(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName()) + .name(TEST_CATALOG_TABLE_IDENTIFIER.getTableName()) 
+ .storageDescriptor(StorageDescriptor.builder().build()) + .build(); + GetTableResponse tableResponse = GetTableResponse.builder().table(mockTable).build(); + when(mockGlueClient.getTable(any(GetTableRequest.class))).thenReturn(tableResponse); + when(mockGlueCatalogConfig.getMaxPartitionsPerRequest()).thenReturn(100); + + // Mock glueClient to throw an exception + when(mockGlueClient.batchUpdatePartition(any(BatchUpdatePartitionRequest.class))) + .thenThrow(new RuntimeException("Test exception")); + + // Execute and validate exception + CatalogSyncException exception = + assertThrows( + CatalogSyncException.class, + () -> + gluePartitionSyncOperations.updatePartitionsToTable( + TEST_CATALOG_TABLE_IDENTIFIER, + Collections.singletonList( + new org.apache.xtable.catalog.CatalogPartition( + Collections.singletonList("value1"), "new_location")))); + + // Verify the exception cause + assertInstanceOf(RuntimeException.class, exception.getCause()); + + // Verify glueClient.batchUpdatePartition was called + verify(mockGlueClient, times(1)).batchUpdatePartition(any(BatchUpdatePartitionRequest.class)); + } + + @Test + void testDropPartitionsSuccess() { + // Prepare a partition to drop + org.apache.xtable.catalog.CatalogPartition partitionToDrop = + new org.apache.xtable.catalog.CatalogPartition( + Collections.singletonList("value1"), "location1"); + + // Mock glueClient to return a valid response for batchDeletePartition + BatchDeletePartitionResponse mockResponse = mock(BatchDeletePartitionResponse.class); + when(mockGlueClient.batchDeletePartition(any(BatchDeletePartitionRequest.class))) + .thenReturn(mockResponse); + when(mockGlueCatalogConfig.getMaxPartitionsPerRequest()).thenReturn(100); + + gluePartitionSyncOperations.dropPartitions( + TEST_CATALOG_TABLE_IDENTIFIER, Collections.singletonList(partitionToDrop)); + + // Verify that batchDeletePartition was called + ArgumentCaptor<BatchDeletePartitionRequest> deletePartitionRequestArgumentCaptor = + ArgumentCaptor.forClass(BatchDeletePartitionRequest.class); + verify(mockGlueClient, times(1)) + .batchDeletePartition(deletePartitionRequestArgumentCaptor.capture()); + + // Capture the request + BatchDeletePartitionRequest request = deletePartitionRequestArgumentCaptor.getValue(); + + // Validate the request's properties + assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), request.databaseName()); + assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), request.tableName()); + assertEquals(1, request.partitionsToDelete().size()); + assertEquals(Collections.singletonList("value1"), request.partitionsToDelete().get(0).values()); + } + + @Test + void testDropPartitionsHandlesException() { + // Mock glueClient to throw an exception during batchDeletePartition + when(mockGlueClient.batchDeletePartition(any(BatchDeletePartitionRequest.class))) + .thenThrow(new RuntimeException("Test exception")); + when(mockGlueCatalogConfig.getMaxPartitionsPerRequest()).thenReturn(100); + + CatalogSyncException exception = + assertThrows( + CatalogSyncException.class, + () -> + gluePartitionSyncOperations.dropPartitions( + TEST_CATALOG_TABLE_IDENTIFIER, + Collections.singletonList( + new org.apache.xtable.catalog.CatalogPartition( + Collections.singletonList("value1"), "location1")))); + + // Verify the exception cause + assertInstanceOf(RuntimeException.class, exception.getCause()); + + // Verify glueClient.batchDeletePartition was called + verify(mockGlueClient, times(1)).batchDeletePartition(any(BatchDeletePartitionRequest.class)); + } + + @Test + void 
testDropPartitionsNoPartitionsToDrop() { + // Execute the method with an empty partition list + gluePartitionSyncOperations.dropPartitions( + TEST_CATALOG_TABLE_IDENTIFIER, Collections.emptyList()); + + // Verify that batchDeletePartition was not called + verify(mockGlueClient, times(0)).batchDeletePartition(any(BatchDeletePartitionRequest.class)); + } + + @Test + void testGetLastTimeSyncedPropertiesSuccess() { + // Mock table and parameters + Map<String, String> mockParameters = new HashMap<>(); + mockParameters.put("last_synced_time", "2023-12-01T12:00:00Z"); + mockParameters.put("last_modified_by", "user123"); + Table mockTable = + Table.builder() + .databaseName(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName()) + .name(TEST_CATALOG_TABLE_IDENTIFIER.getTableName()) + .storageDescriptor(StorageDescriptor.builder().build()) + .parameters(mockParameters) + .build(); + GetTableResponse tableResponse = GetTableResponse.builder().table(mockTable).build(); + when(mockGlueClient.getTable(any(GetTableRequest.class))).thenReturn(tableResponse); + + // List of keys to look for + List<String> lastSyncedKeys = Arrays.asList("last_synced_time", "last_modified_by"); + + // Execute the method + Map<String, String> result = + gluePartitionSyncOperations.getTableProperties( + TEST_CATALOG_TABLE_IDENTIFIER, lastSyncedKeys); + + // Assert the result + assertEquals(2, result.size()); + assertEquals("2023-12-01T12:00:00Z", result.get("last_synced_time")); + assertEquals("user123", result.get("last_modified_by")); + } + + @Test + void testGetLastTimeSyncedPropertiesKeyNotFound() { + // Mock table and parameters + Map<String, String> mockParameters = + Collections.singletonMap("last_synced_time", "2023-12-01T12:00:00Z"); + Table mockTable = + Table.builder() + .databaseName(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName()) + .name(TEST_CATALOG_TABLE_IDENTIFIER.getTableName()) + .storageDescriptor(StorageDescriptor.builder().build()) + .parameters(mockParameters) + .build(); + GetTableResponse tableResponse = GetTableResponse.builder().table(mockTable).build(); + when(mockGlueClient.getTable(any(GetTableRequest.class))).thenReturn(tableResponse); + + // List of keys to look for (including a non-existent key) + List<String> lastSyncedKeys = Arrays.asList("last_synced_time", "non_existent_key"); + + // Execute the method + Map<String, String> result = + gluePartitionSyncOperations.getTableProperties( + TEST_CATALOG_TABLE_IDENTIFIER, lastSyncedKeys); + + // Assert the result + assertEquals(1, result.size()); + assertEquals("2023-12-01T12:00:00Z", result.get("last_synced_time")); + } + + @Test + void testGetLastTimeSyncedPropertiesHandlesException() { + // Mock getTable to throw an exception + when(mockGlueClient.getTable(any(GetTableRequest.class))) + .thenThrow(new RuntimeException("Test exception")); + + // Execute and validate exception + CatalogSyncException exception = + assertThrows( + CatalogSyncException.class, + () -> + gluePartitionSyncOperations.getTableProperties( + TEST_CATALOG_TABLE_IDENTIFIER, Collections.singletonList("last_synced_time"))); + + // Verify the exception cause + assertInstanceOf(RuntimeException.class, exception.getCause()); + + // Verify that getTable was called + verify(mockGlueClient, times(1)).getTable(any(GetTableRequest.class)); + } + + @Test + void testUpdateLastTimeSyncedPropertiesSuccess() { + // Mock table and parameters + Map<String, String> mockParameters = new HashMap<>(); + mockParameters.put("last_synced_time", "2023-12-01T12:00:00Z"); + 
mockParameters.put("last_modified_by", "user123"); + Table mockTable = + Table.builder() + .databaseName(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName()) + .name(TEST_CATALOG_TABLE_IDENTIFIER.getTableName()) + .storageDescriptor(StorageDescriptor.builder().build()) + .parameters(mockParameters) + .build(); + GetTableResponse tableResponse = GetTableResponse.builder().table(mockTable).build(); + when(mockGlueClient.getTable(any(GetTableRequest.class))).thenReturn(tableResponse); + + // Mock the updateTable call to verify it's triggered + when(mockGlueClient.updateTable(any(UpdateTableRequest.class))) + .thenReturn(UpdateTableResponse.builder().build()); + + // New properties to update + Map<String, String> newProperties = new HashMap<>(); + newProperties.put("last_synced_time", "2023-12-02T14:00:00Z"); + newProperties.put("last_modified_by", "user456"); + + // Execute the method + gluePartitionSyncOperations.updateTableProperties(TEST_CATALOG_TABLE_IDENTIFIER, newProperties); + + // Capture the UpdateTableRequest sent to the GlueClient + ArgumentCaptor<UpdateTableRequest> captor = ArgumentCaptor.forClass(UpdateTableRequest.class); + verify(mockGlueClient, times(1)).updateTable(captor.capture()); + UpdateTableRequest capturedRequest = captor.getValue(); + + // Assert that the table update request contains the new parameters + assertEquals( + "2023-12-02T14:00:00Z", capturedRequest.tableInput().parameters().get("last_synced_time")); + assertEquals("user456", capturedRequest.tableInput().parameters().get("last_modified_by")); + } + + @Test + void testUpdateLastTimeSyncedPropertiesHandlesException() { + // Setup mocks to throw an exception + when(mockGlueClient.getTable(any(GetTableRequest.class))) + .thenThrow(new RuntimeException("Test exception")); + + // New properties to update + Map<String, String> newProperties = new HashMap<>(); + newProperties.put("last_synced_time", "2023-12-02T14:00:00Z"); + + // Execute and validate exception + CatalogSyncException exception = + assertThrows( + CatalogSyncException.class, + () -> + gluePartitionSyncOperations.updateTableProperties( + TEST_CATALOG_TABLE_IDENTIFIER, newProperties)); + + // Assert that the exception has the right cause + assertInstanceOf(RuntimeException.class, exception.getCause()); + verify(mockGlueClient, times(1)).getTable(any(GetTableRequest.class)); + } + + @Test + void testUpdateLastTimeSyncedPropertiesEmptyProperties() { + // Execute the method with empty properties (nothing to update) + gluePartitionSyncOperations.updateTableProperties( + TEST_CATALOG_TABLE_IDENTIFIER, new HashMap<>()); + + // Verify that updateTable was never called + verify(mockGlueClient, times(0)).updateTable(any(UpdateTableRequest.class)); + } +} diff --git a/xtable-aws/src/test/java/org/apache/xtable/glue/TestGlueCatalogSyncClient.java b/xtable-aws/src/test/java/org/apache/xtable/glue/TestGlueCatalogSyncClient.java index 0ec0d04b..2fa5a53c 100644 --- a/xtable-aws/src/test/java/org/apache/xtable/glue/TestGlueCatalogSyncClient.java +++ b/xtable-aws/src/test/java/org/apache/xtable/glue/TestGlueCatalogSyncClient.java @@ -25,6 +25,7 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.never; @@ -34,6 +35,7 @@ import 
static org.mockito.Mockito.when; import java.time.ZonedDateTime; import java.util.Collections; +import java.util.Optional; import java.util.ServiceLoader; import org.junit.jupiter.api.Test; @@ -44,8 +46,10 @@ import org.mockito.Mock; import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; +import org.apache.xtable.catalog.CatalogPartitionSyncTool; import org.apache.xtable.catalog.CatalogTableBuilder; import org.apache.xtable.exception.CatalogSyncException; +import org.apache.xtable.hudi.catalog.HudiCatalogPartitionSyncTool; import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier; import org.apache.xtable.model.storage.CatalogType; import org.apache.xtable.spi.sync.CatalogSyncClient; @@ -71,15 +75,22 @@ import software.amazon.awssdk.services.glue.model.UpdateTableResponse; public class TestGlueCatalogSyncClient extends GlueCatalogSyncTestBase { @Mock private CatalogTableBuilder<TableInput, Table> mockTableBuilder; + @Mock private HudiCatalogPartitionSyncTool mockPartitionSyncTool; private GlueCatalogSyncClient glueCatalogSyncClient; - private GlueCatalogSyncClient createGlueCatalogSyncClient() { + private GlueCatalogSyncClient createGlueCatalogSyncClient(boolean includePartitionSyncTool) { + Optional<CatalogPartitionSyncTool> partitionSyncToolOpt = + includePartitionSyncTool ? Optional.of(mockPartitionSyncTool) : Optional.empty(); return new GlueCatalogSyncClient( - catalogConfig, testConfiguration, mockGlueCatalogConfig, mockGlueClient, mockTableBuilder); + catalogConfig, + testConfiguration, + mockGlueCatalogConfig, + mockGlueClient, + mockTableBuilder, + partitionSyncToolOpt); } void setupCommonMocks() { - glueCatalogSyncClient = createGlueCatalogSyncClient(); when(mockGlueCatalogConfig.getCatalogId()).thenReturn(TEST_GLUE_CATALOG_ID); } @@ -87,6 +98,7 @@ public class TestGlueCatalogSyncClient extends GlueCatalogSyncTestBase { @ValueSource(booleans = {true, false}) void testHasDatabase(boolean isDbPresent) { setupCommonMocks(); + glueCatalogSyncClient = createGlueCatalogSyncClient(false); GetDatabaseRequest dbRequest = getDbRequest(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName()); GetDatabaseResponse dbResponse = GetDatabaseResponse.builder() @@ -111,6 +123,7 @@ public class TestGlueCatalogSyncClient extends GlueCatalogSyncTestBase { @Test void testHasDatabaseFailure() { setupCommonMocks(); + glueCatalogSyncClient = createGlueCatalogSyncClient(false); GetDatabaseRequest dbRequest = getDbRequest(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName()); when(mockGlueClient.getDatabase(dbRequest)).thenThrow(TEST_GLUE_EXCEPTION); CatalogSyncException exception = @@ -126,6 +139,7 @@ public class TestGlueCatalogSyncClient extends GlueCatalogSyncTestBase { @ValueSource(booleans = {true, false}) void testGetTable(boolean isTablePresent) { setupCommonMocks(); + glueCatalogSyncClient = createGlueCatalogSyncClient(false); GetTableRequest tableRequest = getTableRequest( TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), @@ -158,6 +172,7 @@ public class TestGlueCatalogSyncClient extends GlueCatalogSyncTestBase { @Test void testGetTableFailure() { setupCommonMocks(); + glueCatalogSyncClient = createGlueCatalogSyncClient(false); GetTableRequest tableRequest = getTableRequest( TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), @@ -177,6 +192,7 @@ public class TestGlueCatalogSyncClient extends GlueCatalogSyncTestBase { @ValueSource(booleans = {false, true}) void testCreateDatabase(boolean shouldFail) { setupCommonMocks(); + glueCatalogSyncClient = 
createGlueCatalogSyncClient(false); CreateDatabaseRequest dbRequest = createDbRequest(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName()); if (shouldFail) { @@ -200,6 +216,7 @@ public class TestGlueCatalogSyncClient extends GlueCatalogSyncTestBase { @ValueSource(booleans = {false, true}) void testDropTable(boolean shouldFail) { setupCommonMocks(); + glueCatalogSyncClient = createGlueCatalogSyncClient(false); DeleteTableRequest deleteRequest = deleteTableRequest( TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), @@ -223,9 +240,11 @@ public class TestGlueCatalogSyncClient extends GlueCatalogSyncTestBase { verify(mockGlueClient, times(1)).deleteTable(deleteRequest); } - @Test - void testCreateTable_Success() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testCreateTable_Success(boolean syncPartitions) { setupCommonMocks(); + glueCatalogSyncClient = createGlueCatalogSyncClient(syncPartitions); CreateTableRequest createTableRequest = createTableRequest(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), TEST_TABLE_INPUT); when(mockTableBuilder.getCreateTableRequest( @@ -237,12 +256,19 @@ public class TestGlueCatalogSyncClient extends GlueCatalogSyncTestBase { verify(mockGlueClient, times(1)).createTable(createTableRequest); verify(mockTableBuilder, times(1)) .getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER); + if (syncPartitions) { + verify(mockPartitionSyncTool, times(1)) + .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), eq(TEST_CATALOG_TABLE_IDENTIFIER)); + } else { + verify(mockPartitionSyncTool, never()) + .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), eq(TEST_CATALOG_TABLE_IDENTIFIER)); + } } - @Test - void testCreateTable_ErrorGettingTableInput() { - glueCatalogSyncClient = createGlueCatalogSyncClient(); - + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testCreateTable_ErrorGettingTableInput(boolean syncPartitions) { + glueCatalogSyncClient = createGlueCatalogSyncClient(syncPartitions); // error when getting iceberg table input doThrow(new RuntimeException("something went wrong")) .when(mockTableBuilder) @@ -255,12 +281,15 @@ public class TestGlueCatalogSyncClient extends GlueCatalogSyncTestBase { verify(mockTableBuilder, times(1)) .getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER); verify(mockGlueClient, never()).createTable(any(CreateTableRequest.class)); + verify(mockPartitionSyncTool, never()) + .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), eq(TEST_CATALOG_TABLE_IDENTIFIER)); } - @Test - void testCreateTable_ErrorCreatingTable() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testCreateTable_ErrorCreatingTable(boolean syncPartitions) { setupCommonMocks(); - + glueCatalogSyncClient = createGlueCatalogSyncClient(syncPartitions); // error when creating table CreateTableRequest createTableRequest = createTableRequest(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), TEST_TABLE_INPUT); @@ -280,11 +309,15 @@ public class TestGlueCatalogSyncClient extends GlueCatalogSyncTestBase { verify(mockTableBuilder, times(1)) .getCreateTableRequest(TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER); verify(mockGlueClient, times(1)).createTable(createTableRequest); + verify(mockPartitionSyncTool, never()) + .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), eq(TEST_CATALOG_TABLE_IDENTIFIER)); } - @Test - void testRefreshTable_Success() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testRefreshTable_Success(boolean syncPartitions) { 
setupCommonMocks(); + glueCatalogSyncClient = createGlueCatalogSyncClient(syncPartitions); UpdateTableRequest updateTableRequest = updateTableRequest(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), TEST_TABLE_INPUT); Table glueTable = Table.builder().parameters(Collections.emptyMap()).build(); @@ -299,11 +332,19 @@ public class TestGlueCatalogSyncClient extends GlueCatalogSyncTestBase { verify(mockTableBuilder, times(1)) .getUpdateTableRequest( TEST_ICEBERG_INTERNAL_TABLE, glueTable, TEST_CATALOG_TABLE_IDENTIFIER); + if (syncPartitions) { + verify(mockPartitionSyncTool, times(1)) + .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), eq(TEST_CATALOG_TABLE_IDENTIFIER)); + } else { + verify(mockPartitionSyncTool, never()) + .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), eq(TEST_CATALOG_TABLE_IDENTIFIER)); + } } - @Test - void testRefreshTable_ErrorCreatingTableInput() { - glueCatalogSyncClient = createGlueCatalogSyncClient(); + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testRefreshTable_ErrorCreatingTableInput(boolean syncPartitions) { + glueCatalogSyncClient = createGlueCatalogSyncClient(syncPartitions); Table glueTable = Table.builder().parameters(Collections.emptyMap()).build(); // error while refreshing table @@ -320,11 +361,15 @@ public class TestGlueCatalogSyncClient extends GlueCatalogSyncTestBase { .getUpdateTableRequest( TEST_ICEBERG_INTERNAL_TABLE, glueTable, TEST_CATALOG_TABLE_IDENTIFIER); verify(mockGlueClient, never()).updateTable(any(UpdateTableRequest.class)); + verify(mockPartitionSyncTool, never()) + .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), eq(TEST_CATALOG_TABLE_IDENTIFIER)); } - @Test - void testRefreshTable_ErrorRefreshingTable() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testRefreshTable_ErrorRefreshingTable(boolean syncPartitions) { setupCommonMocks(); + glueCatalogSyncClient = createGlueCatalogSyncClient(syncPartitions); Table glueTable = Table.builder().parameters(Collections.emptyMap()).build(); UpdateTableRequest updateTableRequest = @@ -348,12 +393,14 @@ public class TestGlueCatalogSyncClient extends GlueCatalogSyncTestBase { .getUpdateTableRequest( TEST_ICEBERG_INTERNAL_TABLE, glueTable, TEST_CATALOG_TABLE_IDENTIFIER); verify(mockGlueClient, times(1)).updateTable(updateTableRequest); + verify(mockPartitionSyncTool, never()) + .syncPartitions(eq(TEST_ICEBERG_INTERNAL_TABLE), eq(TEST_CATALOG_TABLE_IDENTIFIER)); } @Test void testCreateOrReplaceTable() { setupCommonMocks(); - + glueCatalogSyncClient = createGlueCatalogSyncClient(false); ZonedDateTime fixedDateTime = ZonedDateTime.parse("2024-10-25T10:15:30.00Z"); try (MockedStatic<ZonedDateTime> mockZonedDateTime = mockStatic(ZonedDateTime.class)) { mockZonedDateTime.when(ZonedDateTime::now).thenReturn(fixedDateTime); diff --git a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestHudiHMSCatalogTableBuilder.java b/xtable-aws/src/test/java/org/apache/xtable/glue/table/TestHudiGlueCatalogTableBuilder.java similarity index 51% copy from xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestHudiHMSCatalogTableBuilder.java copy to xtable-aws/src/test/java/org/apache/xtable/glue/table/TestHudiGlueCatalogTableBuilder.java index adf88c99..daeb3df9 100644 --- a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestHudiHMSCatalogTableBuilder.java +++ b/xtable-aws/src/test/java/org/apache/xtable/glue/table/TestHudiGlueCatalogTableBuilder.java @@ -16,11 +16,9 @@ * limitations under the License. 
*/ -package org.apache.xtable.hms.table; +package org.apache.xtable.glue.table; -import static org.apache.xtable.hms.table.HudiHMSCatalogTableBuilder.HUDI_METADATA_CONFIG; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -30,10 +28,8 @@ import static org.mockito.Mockito.when; import java.util.List; import java.util.stream.Collectors; -import org.apache.hadoop.hive.metastore.api.Table; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -41,33 +37,36 @@ import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.xtable.hms.HMSCatalogSyncTestBase; -import org.apache.xtable.hms.HMSSchemaExtractor; +import org.apache.xtable.glue.GlueCatalogSyncTestBase; +import org.apache.xtable.glue.GlueSchemaExtractor; import org.apache.xtable.hudi.HudiTableManager; import org.apache.xtable.hudi.catalog.HudiCatalogTablePropertiesExtractor; +import software.amazon.awssdk.services.glue.model.Table; +import software.amazon.awssdk.services.glue.model.TableInput; + @ExtendWith(MockitoExtension.class) -public class TestHudiHMSCatalogTableBuilder extends HMSCatalogSyncTestBase { +public class TestHudiGlueCatalogTableBuilder extends GlueCatalogSyncTestBase { @Mock private HoodieTableMetaClient mockMetaClient; @Mock private HudiTableManager mockHudiTableManager; @Mock private HoodieTableConfig mockTableConfig; @Mock private HudiCatalogTablePropertiesExtractor mockTablePropertiesExtractor; - private HudiHMSCatalogTableBuilder hudiHMSCatalogTableBuilder; + private HudiGlueCatalogTableBuilder hudiGlueCatalogTableBuilder; - private HudiHMSCatalogTableBuilder createMockHudiHMSCatalogSyncRequestProvider() { - return new HudiHMSCatalogTableBuilder( - mockHMSCatalogConfig, - HMSSchemaExtractor.getInstance(), + private HudiGlueCatalogTableBuilder createMockHudiGlueCatalogSyncRequestProvider() { + return new HudiGlueCatalogTableBuilder( + mockGlueCatalogConfig, + GlueSchemaExtractor.getInstance(), mockHudiTableManager, mockMetaClient, mockTablePropertiesExtractor); } void setupCommonMocks() { - hudiHMSCatalogTableBuilder = createMockHudiHMSCatalogSyncRequestProvider(); - when(mockHMSCatalogConfig.getSchemaLengthThreshold()).thenReturn(1000); + hudiGlueCatalogTableBuilder = createMockHudiGlueCatalogSyncRequestProvider(); + when(mockGlueCatalogConfig.getSchemaLengthThreshold()).thenReturn(1000); } void setupMetaClientMocks() { @@ -80,31 +79,16 @@ public class TestHudiHMSCatalogTableBuilder extends HMSCatalogSyncTestBase { setupCommonMocks(); setupMetaClientMocks(); - List<String> partitionFields = - TEST_HUDI_INTERNAL_TABLE.getPartitioningFields().stream() - .map(partitionField -> partitionField.getSourceField().getName()) - .collect(Collectors.toList()); - - Table table = - hudiHMSCatalogTableBuilder.getCreateTableRequest( + TableInput table = + hudiGlueCatalogTableBuilder.getCreateTableRequest( TEST_HUDI_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER); - ArgumentCaptor<List<String>> partitionsCaptor = ArgumentCaptor.forClass(List.class); verify(mockTablePropertiesExtractor) - .getSparkTableProperties( - 
partitionsCaptor.capture(), - eq(""), - any(Integer.class), - eq(TEST_HUDI_INTERNAL_TABLE.getReadSchema())); - assertEquals(partitionFields.size(), partitionsCaptor.getValue().size()); - assertEquals(partitionFields, partitionsCaptor.getValue()); - assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), table.getTableName()); - assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), table.getDbName()); - assertEquals(3, table.getSd().getCols().size()); - assertEquals(1, table.getPartitionKeys().size()); - assertNotNull(table.getParameters()); - assertFalse(table.getParameters().isEmpty()); - assertEquals(table.getParameters().get(HUDI_METADATA_CONFIG), "true"); + .getTableProperties(eq(TEST_HUDI_INTERNAL_TABLE), any(Integer.class)); + assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), table.name()); + assertEquals(2, table.storageDescriptor().columns().size()); + assertEquals(1, table.partitionKeys().size()); + assertNotNull(table.parameters()); } @Test @@ -116,28 +100,26 @@ public class TestHudiHMSCatalogTableBuilder extends HMSCatalogSyncTestBase { TEST_HUDI_INTERNAL_TABLE.getPartitioningFields().stream() .map(partitionField -> partitionField.getSourceField().getName()) .collect(Collectors.toList()); - Table table = - hudiHMSCatalogTableBuilder.getCreateTableRequest( + + TableInput tableInput = + hudiGlueCatalogTableBuilder.getCreateTableRequest( TEST_HUDI_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER); - Table updatedTable = - hudiHMSCatalogTableBuilder.getUpdateTableRequest( + Table table = + Table.builder() + .name(tableInput.name()) + .parameters(tableInput.parameters()) + .storageDescriptor(tableInput.storageDescriptor()) + .partitionKeys(tableInput.partitionKeys()) + .build(); + TableInput updatedTable = + hudiGlueCatalogTableBuilder.getUpdateTableRequest( TEST_EVOLVED_HUDI_INTERNAL_TABLE, table, TEST_CATALOG_TABLE_IDENTIFIER); - ArgumentCaptor<List<String>> partitionsCaptor = ArgumentCaptor.forClass(List.class); verify(mockTablePropertiesExtractor) - .getSparkTableProperties( - partitionsCaptor.capture(), - eq(""), - any(Integer.class), - eq(TEST_HUDI_INTERNAL_TABLE.getReadSchema())); - assertEquals(partitionFields.size(), partitionsCaptor.getValue().size()); - assertEquals(partitionFields, partitionsCaptor.getValue()); - assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), updatedTable.getTableName()); - assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), updatedTable.getDbName()); - assertEquals(4, updatedTable.getSd().getCols().size()); - assertEquals(1, updatedTable.getPartitionKeys().size()); - assertNotNull(updatedTable.getParameters()); - assertFalse(table.getParameters().isEmpty()); - assertEquals(table.getParameters().get(HUDI_METADATA_CONFIG), "true"); + .getTableProperties(eq(TEST_HUDI_INTERNAL_TABLE), any(Integer.class)); + assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), updatedTable.name()); + assertEquals(3, updatedTable.storageDescriptor().columns().size()); + assertEquals(1, updatedTable.partitionKeys().size()); + assertNotNull(updatedTable.parameters()); } } diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTablePropertiesExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTablePropertiesExtractor.java index ff26e600..07625e44 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTablePropertiesExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/catalog/HudiCatalogTablePropertiesExtractor.java @@ -22,14 +22,17 @@ 
import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import lombok.AccessLevel; import lombok.NoArgsConstructor; import org.apache.spark.sql.types.StructType; +import org.apache.hudi.common.util.ConfigUtils; import org.apache.hudi.common.util.StringUtils; +import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.schema.InternalField; import org.apache.xtable.model.schema.InternalSchema; import org.apache.xtable.model.schema.InternalType; @@ -41,17 +44,28 @@ public class HudiCatalogTablePropertiesExtractor { private static final HudiCatalogTablePropertiesExtractor INSTANCE = new HudiCatalogTablePropertiesExtractor(); + protected static final String HUDI_METADATA_CONFIG = "hudi.metadata-listing-enabled"; public static HudiCatalogTablePropertiesExtractor getInstance() { return INSTANCE; } - /** - * Get Spark Sql related table properties. This is used for spark datasource table. - * - * @param schema The schema to write to the table. - * @return A new parameters added the spark's table properties. - */ - public Map<String, String> getSparkTableProperties( + + /** Get Hudi table properties that needs to be synced with catalog table */ + public Map<String, String> getTableProperties(InternalTable table, int schemaLengthThreshold) { + Map<String, String> tableProperties = new HashMap<>(); + List<String> partitionFields = + table.getPartitioningFields().stream() + .map(field -> field.getSourceField().getName()) + .collect(Collectors.toList()); + tableProperties.put(HUDI_METADATA_CONFIG, "true"); + Map<String, String> sparkTableProperties = + getSparkTableProperties(partitionFields, "", schemaLengthThreshold, table.getReadSchema()); + tableProperties.putAll(sparkTableProperties); + return tableProperties; + } + + /** Get Spark Sql related table properties. This is used for spark datasource table. 
*/ + private Map<String, String> getSparkTableProperties( List<String> partitionNames, String sparkVersion, int schemaLengthThreshold, @@ -123,4 +137,12 @@ public class HudiCatalogTablePropertiesExtractor { } return sparkProperties; } + + /** Get Hudi serde properties that needs to be synced with catalog table */ + public Map<String, String> getSerdeProperties(String basePath) { + Map<String, String> serdeProperties = new HashMap<>(); + serdeProperties.put(ConfigUtils.TABLE_SERDE_PATH, basePath); + serdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(false)); + return serdeProperties; + } } diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTablePropertiesExtractor.java b/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTablePropertiesExtractor.java index e08674bf..202e84e2 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTablePropertiesExtractor.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/catalog/TestHudiCatalogTablePropertiesExtractor.java @@ -18,27 +18,39 @@ package org.apache.xtable.hudi.catalog; +import static org.apache.xtable.hudi.catalog.HudiCatalogTablePropertiesExtractor.HUDI_METADATA_CONFIG; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import java.util.Arrays; import java.util.Collections; -import java.util.List; import java.util.Map; import org.junit.jupiter.api.Test; +import org.apache.hudi.common.util.ConfigUtils; + +import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalPartitionField; import org.apache.xtable.model.schema.InternalSchema; import org.apache.xtable.model.schema.InternalType; +import org.apache.xtable.model.storage.TableFormat; public class TestHudiCatalogTablePropertiesExtractor { @Test void testGetSparkTableProperties() { - List<String> partitionNames = Arrays.asList("region", "category"); - String sparkVersion = "3.2.1"; + InternalPartitionField p1 = + InternalPartitionField.builder() + .sourceField(InternalField.builder().name("region").build()) + .build(); + InternalPartitionField p2 = + InternalPartitionField.builder() + .sourceField(InternalField.builder().name("category").build()) + .build(); int schemaLengthThreshold = 1000; InternalSchema schema = InternalSchema.builder() @@ -80,13 +92,20 @@ public class TestHudiCatalogTablePropertiesExtractor { .name("testSchema") .build(); + InternalTable table = + InternalTable.builder() + .name("test-table") + .tableFormat(TableFormat.HUDI) + .readSchema(schema) + .partitioningFields(Arrays.asList(p1, p2)) + .build(); + Map<String, String> result = HudiCatalogTablePropertiesExtractor.getInstance() - .getSparkTableProperties(partitionNames, sparkVersion, schemaLengthThreshold, schema); + .getTableProperties(table, schemaLengthThreshold); // Validate results assertEquals("hudi", result.get("spark.sql.sources.provider")); - assertEquals("3.2.1", result.get("spark.sql.create.version")); assertEquals("1", result.get("spark.sql.sources.schema.numParts")); assertEquals( 
"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}},{\"name\":\"region\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}},{\"name\":\"category\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}", @@ -94,12 +113,12 @@ public class TestHudiCatalogTablePropertiesExtractor { assertEquals("2", result.get("spark.sql.sources.schema.numPartCols")); assertEquals("region", result.get("spark.sql.sources.schema.partCol.0")); assertEquals("category", result.get("spark.sql.sources.schema.partCol.1")); + assertEquals("true", result.get(HUDI_METADATA_CONFIG)); } @Test void testGetSparkTablePropertiesEmptyPartitions() { // Setup input data with no partitions - List<String> partitionNames = Collections.emptyList(); int schemaLengthThreshold = 50; InternalSchema schema = InternalSchema.builder() @@ -125,10 +144,17 @@ public class TestHudiCatalogTablePropertiesExtractor { .name("testSchema") .build(); + InternalTable table = + InternalTable.builder() + .name("test-table") + .tableFormat(TableFormat.HUDI) + .readSchema(schema) + .partitioningFields(Collections.emptyList()) + .build(); // Call the method Map<String, String> result = HudiCatalogTablePropertiesExtractor.getInstance() - .getSparkTableProperties(partitionNames, "", schemaLengthThreshold, schema); + .getTableProperties(table, schemaLengthThreshold); assertEquals("hudi", result.get("spark.sql.sources.provider")); assertNull(result.get("spark.sql.create.version")); @@ -140,5 +166,17 @@ public class TestHudiCatalogTablePropertiesExtractor { + result.get("spark.sql.sources.schema.part.2") + result.get("spark.sql.sources.schema.part.3")); assertNull(result.get("spark.sql.sources.schema.numPartCols")); + assertEquals("true", result.get(HUDI_METADATA_CONFIG)); + } + + @Test + void testGetSerdeProperties() { + String basePath = "/test/base/path"; + Map<String, String> serdeProperties = + HudiCatalogTablePropertiesExtractor.getInstance().getSerdeProperties(basePath); + + assertNotNull(serdeProperties); + assertEquals(basePath, serdeProperties.get(ConfigUtils.TABLE_SERDE_PATH)); + assertEquals("false", serdeProperties.get(ConfigUtils.IS_QUERY_AS_RO_TABLE)); } } diff --git a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/table/HudiHMSCatalogTableBuilder.java b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/table/HudiHMSCatalogTableBuilder.java index cc4dc3df..b8b1ea49 100644 --- a/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/table/HudiHMSCatalogTableBuilder.java +++ b/xtable-hive-metastore/src/main/java/org/apache/xtable/hms/table/HudiHMSCatalogTableBuilder.java @@ -22,7 +22,6 @@ import static org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifi import java.io.IOException; import java.time.Instant; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -39,7 +38,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.util.ConfigUtils; import com.google.common.annotations.VisibleForTesting; @@ -54,7 +52,6 @@ import org.apache.xtable.model.InternalTable; import org.apache.xtable.model.catalog.CatalogTableIdentifier; import org.apache.xtable.model.catalog.HierarchicalTableIdentifier; import org.apache.xtable.model.schema.InternalPartitionField; -import 
org.apache.xtable.model.schema.InternalSchema; import org.apache.xtable.model.storage.TableFormat; @Log4j2 @@ -122,7 +119,9 @@ public class HudiHMSCatalogTableBuilder implements CatalogTableBuilder<Table, Ta } newTb.setCreateTime((int) Instant.now().toEpochMilli()); - Map<String, String> tableProperties = getTableProperties(table, table.getReadSchema()); + Map<String, String> tableProperties = + tablePropertiesExtractor.getTableProperties( + table, hmsCatalogConfig.getSchemaLengthThreshold()); newTb.setParameters(tableProperties); newTb.setSd(getStorageDescriptor(table)); newTb.setPartitionKeys(getSchemaPartitionKeys(table)); @@ -134,7 +133,9 @@ public class HudiHMSCatalogTableBuilder implements CatalogTableBuilder<Table, Ta InternalTable table, Table hmsTable, CatalogTableIdentifier tableIdentifier) { Map<String, String> parameters = hmsTable.getParameters(); Map<String, String> tableParameters = hmsTable.getParameters(); - tableParameters.putAll(getTableProperties(table, table.getReadSchema())); + tableParameters.putAll( + tablePropertiesExtractor.getTableProperties( + table, hmsCatalogConfig.getSchemaLengthThreshold())); hmsTable.setParameters(tableParameters); hmsTable.setSd(getStorageDescriptor(table)); @@ -143,20 +144,6 @@ public class HudiHMSCatalogTableBuilder implements CatalogTableBuilder<Table, Ta return hmsTable; } - private Map<String, String> getTableProperties(InternalTable table, InternalSchema schema) { - List<String> partitionFields = - table.getPartitioningFields().stream() - .map(field -> field.getSourceField().getName()) - .collect(Collectors.toList()); - Map<String, String> tableProperties = new HashMap<>(); - tableProperties.put(HUDI_METADATA_CONFIG, "true"); - Map<String, String> sparkTableProperties = - tablePropertiesExtractor.getSparkTableProperties( - partitionFields, "", hmsCatalogConfig.getSchemaLengthThreshold(), schema); - tableProperties.putAll(sparkTableProperties); - return tableProperties; - } - private StorageDescriptor getStorageDescriptor(InternalTable table) { final StorageDescriptor storageDescriptor = new StorageDescriptor(); storageDescriptor.setCols(schemaExtractor.toColumns(TableFormat.HUDI, table.getReadSchema())); @@ -168,7 +155,8 @@ public class HudiHMSCatalogTableBuilder implements CatalogTableBuilder<Table, Ta String serdeClassName = HudiInputFormatUtils.getSerDeClassName(fileFormat); storageDescriptor.setInputFormat(inputFormatClassName); storageDescriptor.setOutputFormat(outputFormatClassName); - Map<String, String> serdeProperties = getSerdeProperties(table.getBasePath()); + Map<String, String> serdeProperties = + tablePropertiesExtractor.getSerdeProperties(table.getBasePath()); SerDeInfo serDeInfo = new SerDeInfo(); serDeInfo.setSerializationLib(serdeClassName); serDeInfo.setParameters(serdeProperties); @@ -176,13 +164,6 @@ public class HudiHMSCatalogTableBuilder implements CatalogTableBuilder<Table, Ta return storageDescriptor; } - private static Map<String, String> getSerdeProperties(String basePath) { - Map<String, String> serdeProperties = new HashMap<>(); - serdeProperties.put(ConfigUtils.TABLE_SERDE_PATH, basePath); - serdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(false)); - return serdeProperties; - } - private List<FieldSchema> getSchemaPartitionKeys(InternalTable table) { List<InternalPartitionField> partitioningFields = table.getPartitioningFields(); diff --git a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestHudiHMSCatalogTableBuilder.java 
b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestHudiHMSCatalogTableBuilder.java index adf88c99..4df2ab85 100644 --- a/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestHudiHMSCatalogTableBuilder.java +++ b/xtable-hive-metastore/src/test/java/org/apache/xtable/hms/table/TestHudiHMSCatalogTableBuilder.java @@ -18,22 +18,15 @@ package org.apache.xtable.hms.table; -import static org.apache.xtable.hms.table.HudiHMSCatalogTableBuilder.HUDI_METADATA_CONFIG; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import java.util.List; -import java.util.stream.Collectors; - import org.apache.hadoop.hive.metastore.api.Table; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -80,31 +73,16 @@ public class TestHudiHMSCatalogTableBuilder extends HMSCatalogSyncTestBase { setupCommonMocks(); setupMetaClientMocks(); - List<String> partitionFields = - TEST_HUDI_INTERNAL_TABLE.getPartitioningFields().stream() - .map(partitionField -> partitionField.getSourceField().getName()) - .collect(Collectors.toList()); - Table table = hudiHMSCatalogTableBuilder.getCreateTableRequest( TEST_HUDI_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER); - ArgumentCaptor<List<String>> partitionsCaptor = ArgumentCaptor.forClass(List.class); verify(mockTablePropertiesExtractor) - .getSparkTableProperties( - partitionsCaptor.capture(), - eq(""), - any(Integer.class), - eq(TEST_HUDI_INTERNAL_TABLE.getReadSchema())); - assertEquals(partitionFields.size(), partitionsCaptor.getValue().size()); - assertEquals(partitionFields, partitionsCaptor.getValue()); + .getTableProperties(eq(TEST_HUDI_INTERNAL_TABLE), any(Integer.class)); assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), table.getTableName()); assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), table.getDbName()); assertEquals(3, table.getSd().getCols().size()); assertEquals(1, table.getPartitionKeys().size()); - assertNotNull(table.getParameters()); - assertFalse(table.getParameters().isEmpty()); - assertEquals(table.getParameters().get(HUDI_METADATA_CONFIG), "true"); } @Test @@ -112,10 +90,6 @@ public class TestHudiHMSCatalogTableBuilder extends HMSCatalogSyncTestBase { setupCommonMocks(); setupMetaClientMocks(); - List<String> partitionFields = - TEST_HUDI_INTERNAL_TABLE.getPartitioningFields().stream() - .map(partitionField -> partitionField.getSourceField().getName()) - .collect(Collectors.toList()); Table table = hudiHMSCatalogTableBuilder.getCreateTableRequest( TEST_HUDI_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER); @@ -123,21 +97,11 @@ public class TestHudiHMSCatalogTableBuilder extends HMSCatalogSyncTestBase { hudiHMSCatalogTableBuilder.getUpdateTableRequest( TEST_EVOLVED_HUDI_INTERNAL_TABLE, table, TEST_CATALOG_TABLE_IDENTIFIER); - ArgumentCaptor<List<String>> partitionsCaptor = ArgumentCaptor.forClass(List.class); verify(mockTablePropertiesExtractor) - .getSparkTableProperties( - partitionsCaptor.capture(), - eq(""), - any(Integer.class), - eq(TEST_HUDI_INTERNAL_TABLE.getReadSchema())); - assertEquals(partitionFields.size(), partitionsCaptor.getValue().size()); - 
assertEquals(partitionFields, partitionsCaptor.getValue()); + .getTableProperties(eq(TEST_HUDI_INTERNAL_TABLE), any(Integer.class)); assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), updatedTable.getTableName()); assertEquals(TEST_CATALOG_TABLE_IDENTIFIER.getDatabaseName(), updatedTable.getDbName()); assertEquals(4, updatedTable.getSd().getCols().size()); assertEquals(1, updatedTable.getPartitionKeys().size()); - assertNotNull(updatedTable.getParameters()); - assertFalse(table.getParameters().isEmpty()); - assertEquals(table.getParameters().get(HUDI_METADATA_CONFIG), "true"); } }
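
For reference, a minimal sketch of how the consolidated HudiCatalogTablePropertiesExtractor API introduced in this commit is expected to be wired by the HMS and Glue table builders. The wrapper class and method names below are illustrative only and not part of the commit; the extractor calls (getInstance, getTableProperties, getSerdeProperties) and InternalTable.getBasePath() are the ones exercised in the diff above, with schemaLengthThreshold supplied by the respective catalog config.

import java.util.HashMap;
import java.util.Map;

import org.apache.xtable.hudi.catalog.HudiCatalogTablePropertiesExtractor;
import org.apache.xtable.model.InternalTable;

class TablePropertiesWiringSketch {

  // Catalog-level table parameters: the spark.sql.sources.* schema parts, partition
  // columns, and hudi.metadata-listing-enabled=true, previously assembled inside each
  // builder and now produced by the shared extractor.
  static Map<String, String> buildTableParameters(InternalTable table, int schemaLengthThreshold) {
    return new HashMap<>(
        HudiCatalogTablePropertiesExtractor.getInstance()
            .getTableProperties(table, schemaLengthThreshold));
  }

  // SerDe parameters: the table base path plus the query-as-read-optimized flag,
  // shared by HudiHMSCatalogTableBuilder and HudiGlueCatalogTableBuilder.
  static Map<String, String> buildSerdeParameters(InternalTable table) {
    return HudiCatalogTablePropertiesExtractor.getInstance()
        .getSerdeProperties(table.getBasePath());
  }
}

The updated TestGlueCatalogSyncClient likewise shows that GlueCatalogSyncClient now accepts an Optional<CatalogPartitionSyncTool>: when the tool is present, syncPartitions is invoked after a successful createTable or refreshTable, and it is skipped when building the table input, creating the table, or refreshing the table fails.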