This is an automated email from the ASF dual-hosted git repository.
timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
new f2045be9 [590] Add Delta Glue Catalog Sync implementation
f2045be9 is described below
commit f2045be951fa1af4c1c736ac524e6a3e50acb870
Author: Roushan Kumar <[email protected]>
AuthorDate: Fri Feb 7 00:07:23 2025 +0530
[590] Add Delta Glue Catalog Sync implementation
---
.../glue/GlueCatalogTableBuilderFactory.java | 3 +
.../apache/xtable/glue/GlueSchemaExtractor.java | 38 +++++++
.../glue/table/DeltaGlueCatalogTableBuilder.java | 116 ++++++++++++++++++++
.../xtable/glue/GlueCatalogSyncTestBase.java | 110 +++++++++++++++----
.../xtable/glue/TestGlueSchemaExtractor.java | 119 ++++++++++++++++++++-
.../table/TestDeltaGlueCatalogTableBuilder.java | 106 ++++++++++++++++++
.../table/TestIcebergGlueCatalogTableBuilder.java | 50 +++++----
7 files changed, 501 insertions(+), 41 deletions(-)
diff --git
a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogTableBuilderFactory.java
b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogTableBuilderFactory.java
index 77f9464a..cd0b1d08 100644
---
a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogTableBuilderFactory.java
+++
b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogTableBuilderFactory.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.xtable.catalog.CatalogTableBuilder;
import org.apache.xtable.exception.NotSupportedException;
+import org.apache.xtable.glue.table.DeltaGlueCatalogTableBuilder;
import org.apache.xtable.glue.table.IcebergGlueCatalogTableBuilder;
import org.apache.xtable.model.storage.TableFormat;
@@ -35,6 +36,8 @@ class GlueCatalogTableBuilderFactory {
switch (tableFormat) {
case TableFormat.ICEBERG:
return new IcebergGlueCatalogTableBuilder(configuration);
+ case TableFormat.DELTA:
+ return new DeltaGlueCatalogTableBuilder();
default:
throw new NotSupportedException("Unsupported table format: " +
tableFormat);
}
diff --git
a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueSchemaExtractor.java
b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueSchemaExtractor.java
index 97287cee..1bb51adc 100644
--- a/xtable-aws/src/main/java/org/apache/xtable/glue/GlueSchemaExtractor.java
+++ b/xtable-aws/src/main/java/org/apache/xtable/glue/GlueSchemaExtractor.java
@@ -18,6 +18,7 @@
package org.apache.xtable.glue;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -39,6 +40,7 @@ import com.google.common.collect.Sets;
import org.apache.xtable.exception.NotSupportedException;
import org.apache.xtable.exception.SchemaExtractorException;
+import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.schema.InternalField;
import org.apache.xtable.model.schema.InternalSchema;
@@ -231,4 +233,40 @@ public class GlueSchemaExtractor {
protected static String getColumnProperty(String tableFormat, String
property) {
return String.format("%s.%s", tableFormat.toLowerCase(Locale.ENGLISH),
property);
}
+
+  /**
+   * Returns the Glue columns of {@code table} that are not partition keys.
+   *
+   * <p>Columns are visited in the iteration order of {@code columnsMap.values()}; every column whose
+   * name is one of the table's partition keys is left out.
+   *
+   * @param table internal table whose partitioning fields define the partition keys
+   * @param columnsMap Glue columns keyed by column name
+   * @return the data (non-partition) columns
+   */
+  public List<Column> getNonPartitionColumns(InternalTable table, Map<String, Column> columnsMap) {
+    List<String> keys = getPartitionKeys(table);
+    List<Column> dataColumns = new ArrayList<>();
+    for (Column column : columnsMap.values()) {
+      if (!keys.contains(column.name())) {
+        dataColumns.add(column);
+      }
+    }
+    return dataColumns;
+  }
+
+  /**
+   * Returns one Glue column per partition key of {@code table}, in partition-key order.
+   *
+   * <p>When a Delta schema is converted to {@link InternalSchema}, generated columns are excluded
+   * (see {@code org.apache.xtable.delta.DeltaSchemaExtractor#toInternalSchema}). A partition key
+   * that is a generated column is therefore absent from {@code columnsMap}; until generated columns
+   * are supported, such a key falls back to a plain {@code string}-typed column.
+   *
+   * @param table internal table whose partitioning fields define the partition keys
+   * @param columnsMap Glue columns keyed by column name
+   * @return the partition-key columns
+   */
+  public List<Column> getPartitionColumns(InternalTable table, Map<String, Column> columnsMap) {
+    return getPartitionKeys(table).stream()
+        .map(
+            pKey ->
+                columnsMap.getOrDefault(pKey, Column.builder().name(pKey).type("string").build()))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Collects the partition-key names of {@code table}, in partitioning-field order.
+   *
+   * <p>A partitioning field contributes its explicit partition field names when it has any;
+   * otherwise it contributes the name of its source field.
+   *
+   * @param table internal table whose partitioning fields are read
+   * @return partition-key names
+   */
+  private List<String> getPartitionKeys(InternalTable table) {
+    return table.getPartitioningFields().stream()
+        .flatMap(
+            field ->
+                field.getPartitionFieldNames().isEmpty()
+                    ? Collections.singletonList(field.getSourceField().getName()).stream()
+                    : field.getPartitionFieldNames().stream())
+        .collect(Collectors.toList());
+  }
}
diff --git
a/xtable-aws/src/main/java/org/apache/xtable/glue/table/DeltaGlueCatalogTableBuilder.java
b/xtable-aws/src/main/java/org/apache/xtable/glue/table/DeltaGlueCatalogTableBuilder.java
new file mode 100644
index 00000000..015bde7b
--- /dev/null
+++
b/xtable-aws/src/main/java/org/apache/xtable/glue/table/DeltaGlueCatalogTableBuilder.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.glue.table;
+
+import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
+import static
org.apache.xtable.catalog.CatalogUtils.toHierarchicalTableIdentifier;
+import static org.apache.xtable.catalog.Constants.PROP_EXTERNAL;
+import static org.apache.xtable.catalog.Constants.PROP_PATH;
+import static org.apache.xtable.catalog.Constants.PROP_SERIALIZATION_FORMAT;
+import static
org.apache.xtable.catalog.Constants.PROP_SPARK_SQL_SOURCES_PROVIDER;
+import static
org.apache.xtable.glue.GlueCatalogSyncClient.GLUE_EXTERNAL_TABLE_TYPE;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.xtable.catalog.CatalogTableBuilder;
+import org.apache.xtable.glue.GlueSchemaExtractor;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.catalog.CatalogTableIdentifier;
+import org.apache.xtable.model.catalog.HierarchicalTableIdentifier;
+import org.apache.xtable.model.storage.TableFormat;
+
+import software.amazon.awssdk.services.glue.model.Column;
+import software.amazon.awssdk.services.glue.model.SerDeInfo;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+
+/**
+ * Delta specific table operations for Glue catalog sync.
+ *
+ * <p>Builds the Glue {@link TableInput} used to create or update an external Glue table for a
+ * Delta table. Read-schema fields become Glue columns; partition keys are split out into {@code
+ * partitionKeys}.
+ */
+public class DeltaGlueCatalogTableBuilder implements CatalogTableBuilder<TableInput, Table> {
+
+  private static final String TABLE_FORMAT = TableFormat.DELTA;
+
+  private final GlueSchemaExtractor schemaExtractor;
+
+  public DeltaGlueCatalogTableBuilder() {
+    this.schemaExtractor = GlueSchemaExtractor.getInstance();
+  }
+
+  /**
+   * Builds the request that registers {@code table} as a new external Glue table.
+   *
+   * @param table internal representation of the Delta table
+   * @param tblIdentifier catalog identifier; converted via {@code toHierarchicalTableIdentifier}
+   * @return table input with Delta table parameters, a storage descriptor at the table's base path
+   *     holding the non-partition columns, and the partition keys
+   */
+  @Override
+  public TableInput getCreateTableRequest(
+      InternalTable table, CatalogTableIdentifier tblIdentifier) {
+    HierarchicalTableIdentifier tableIdentifier = toHierarchicalTableIdentifier(tblIdentifier);
+    // NOTE(review): Collectors.toMap makes no guarantee about the Map type it returns (in practice a
+    // HashMap), so Glue column order follows hash order rather than read-schema order. Collecting
+    // into a LinkedHashMap would keep schema order but would likely change the column order the
+    // Delta update test expects — TODO decide and align the tests.
+    Map<String, Column> columnsMap =
+        schemaExtractor.toColumns(TABLE_FORMAT, table.getReadSchema()).stream()
+            .collect(Collectors.toMap(Column::name, c -> c));
+
+    return TableInput.builder()
+        .name(tableIdentifier.getTableName())
+        .tableType(GLUE_EXTERNAL_TABLE_TYPE)
+        .parameters(getTableParameters())
+        .storageDescriptor(
+            newStorageDescriptorBuilder(table)
+                .columns(schemaExtractor.getNonPartitionColumns(table, columnsMap))
+                .build())
+        .partitionKeys(schemaExtractor.getPartitionColumns(table, columnsMap))
+        .build();
+  }
+
+  /**
+   * Builds the request that refreshes an existing Glue table from {@code table}.
+   *
+   * <p>The existing Glue table parameters are carried over unchanged. Columns are recomputed from
+   * the current read schema (the existing Glue table is passed to the schema extractor) and written
+   * into the existing storage descriptor, so its other settings are kept.
+   *
+   * @param table internal representation of the Delta table
+   * @param catalogTable current Glue table definition
+   * @param tblIdentifier catalog identifier; converted via {@code toHierarchicalTableIdentifier}
+   * @return table input for the Glue update call
+   */
+  @Override
+  public TableInput getUpdateTableRequest(
+      InternalTable table, Table catalogTable, CatalogTableIdentifier tblIdentifier) {
+    HierarchicalTableIdentifier tableIdentifier = toHierarchicalTableIdentifier(tblIdentifier);
+    Map<String, String> parameters = new HashMap<>(catalogTable.parameters());
+    // Same column-ordering caveat as in getCreateTableRequest.
+    Map<String, Column> columnsMap =
+        schemaExtractor.toColumns(TABLE_FORMAT, table.getReadSchema(), catalogTable).stream()
+            .collect(Collectors.toMap(Column::name, c -> c));
+    // A Glue table's StorageDescriptor is optional; when the existing table has none, start from
+    // the descriptor a create request would use instead of dereferencing null here.
+    StorageDescriptor existingDescriptor = catalogTable.storageDescriptor();
+    StorageDescriptor.Builder storageDescriptor =
+        existingDescriptor != null
+            ? existingDescriptor.toBuilder()
+            : newStorageDescriptorBuilder(table);
+
+    return TableInput.builder()
+        .name(tableIdentifier.getTableName())
+        .tableType(GLUE_EXTERNAL_TABLE_TYPE)
+        .parameters(parameters)
+        .storageDescriptor(
+            storageDescriptor
+                .columns(schemaExtractor.getNonPartitionColumns(table, columnsMap))
+                .build())
+        .partitionKeys(schemaExtractor.getPartitionColumns(table, columnsMap))
+        .build();
+  }
+
+  /**
+   * Returns the parameters set on a newly created Glue table: {@code TABLE_TYPE_PROP} and {@code
+   * PROP_SPARK_SQL_SOURCES_PROVIDER} both set to the Delta format name, and {@code PROP_EXTERNAL}
+   * set to {@code "TRUE"}.
+   */
+  @VisibleForTesting
+  Map<String, String> getTableParameters() {
+    Map<String, String> parameters = new HashMap<>();
+    // TABLE_TYPE_PROP is the table-type key from Iceberg's BaseMetastoreTableOperations, reused
+    // here to tag the Glue table's format.
+    parameters.put(TABLE_TYPE_PROP, TABLE_FORMAT);
+    parameters.put(PROP_SPARK_SQL_SOURCES_PROVIDER, TABLE_FORMAT);
+    parameters.put(PROP_EXTERNAL, "TRUE");
+    return parameters;
+  }
+
+  /** Returns the SerDe parameters: serialization format {@code "1"} and the table's base path. */
+  @VisibleForTesting
+  Map<String, String> getSerDeParameters(InternalTable table) {
+    Map<String, String> parameters = new HashMap<>();
+    parameters.put(PROP_SERIALIZATION_FORMAT, "1");
+    parameters.put(PROP_PATH, table.getBasePath());
+    return parameters;
+  }
+
+  /** Starts a storage descriptor located at the table's base path with the Delta SerDe params. */
+  private StorageDescriptor.Builder newStorageDescriptorBuilder(InternalTable table) {
+    return StorageDescriptor.builder()
+        .location(table.getBasePath())
+        .serdeInfo(SerDeInfo.builder().parameters(getSerDeParameters(table)).build());
+  }
+}
diff --git
a/xtable-aws/src/test/java/org/apache/xtable/glue/GlueCatalogSyncTestBase.java
b/xtable-aws/src/test/java/org/apache/xtable/glue/GlueCatalogSyncTestBase.java
index eb8ee76e..d6efeb19 100644
---
a/xtable-aws/src/test/java/org/apache/xtable/glue/GlueCatalogSyncTestBase.java
+++
b/xtable-aws/src/test/java/org/apache/xtable/glue/GlueCatalogSyncTestBase.java
@@ -18,10 +18,11 @@
package org.apache.xtable.glue;
-import static
org.apache.xtable.glue.GlueCatalogSyncClient.GLUE_EXTERNAL_TABLE_TYPE;
+import static org.apache.xtable.glue.TestGlueSchemaExtractor.getColumn;
+import java.util.Arrays;
import java.util.Collections;
-import java.util.Map;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.mockito.Mock;
@@ -29,11 +30,16 @@ import org.mockito.Mock;
import org.apache.xtable.conversion.ExternalCatalogConfig;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.catalog.ThreePartHierarchicalTableIdentifier;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.model.schema.PartitionTransformType;
import org.apache.xtable.model.storage.CatalogType;
import org.apache.xtable.model.storage.TableFormat;
import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.Column;
import software.amazon.awssdk.services.glue.model.CreateDatabaseRequest;
import software.amazon.awssdk.services.glue.model.CreateTableRequest;
import software.amazon.awssdk.services.glue.model.DatabaseInput;
@@ -60,17 +66,91 @@ public class GlueCatalogSyncTestBase {
protected static final String TEST_CATALOG_NAME = "aws-glue-1";
protected static final String ICEBERG_METADATA_FILE_LOCATION =
"base-path/metadata";
protected static final String ICEBERG_METADATA_FILE_LOCATION_v2 =
"base-path/v2-metadata";
+ // Identity (VALUE) partition on the string column "partitionField", shared by the test tables.
+ protected static final InternalPartitionField PARTITION_FIELD =
+ InternalPartitionField.builder()
+ .sourceField(
+ InternalField.builder()
+ .name("partitionField")
+ .schema(
+
InternalSchema.builder().name("string").dataType(InternalType.STRING).build())
+ .build())
+ .transformType(PartitionTransformType.VALUE)
+ .build();
+ // Base read schema: intField, stringField and the partition column, in that order.
+ protected static final InternalSchema INTERNAL_SCHEMA =
+ InternalSchema.builder()
+ .dataType(InternalType.RECORD)
+ .fields(
+ Arrays.asList(
+ getInternalField("intField", "int", InternalType.INT),
+ getInternalField("stringField", "string",
InternalType.STRING),
+ getInternalField("partitionField", "string",
InternalType.STRING)))
+ .build();
+ // INTERNAL_SCHEMA plus a trailing booleanField; used to exercise update requests.
+ protected static final InternalSchema UPDATED_INTERNAL_SCHEMA =
+ InternalSchema.builder()
+ .dataType(InternalType.RECORD)
+ .fields(
+ Arrays.asList(
+ getInternalField("intField", "int", InternalType.INT),
+ getInternalField("stringField", "string",
InternalType.STRING),
+ getInternalField("partitionField", "string",
InternalType.STRING),
+ getInternalField("booleanField", "boolean",
InternalType.BOOLEAN)))
+ .build();
+ // Expected Glue partition keys for the Delta test tables.
+ protected static final List<Column> PARTITION_KEYS =
+ Collections.singletonList(getColumn(TableFormat.DELTA, "partitionField",
"string"));
+ // Expected Delta data columns; the partition column is carried in PARTITION_KEYS instead.
+ protected static final List<Column> DELTA_GLUE_SCHEMA =
+ Arrays.asList(
+ getColumn(TableFormat.DELTA, "intField", "int"),
+ getColumn(TableFormat.DELTA, "stringField", "string"));
+ // NOTE(review): booleanField is listed first, which appears to mirror the Delta builder's
+ // hash-map iteration order rather than schema order — confirm this ordering is intended.
+ protected static final List<Column> UPDATED_DELTA_GLUE_SCHEMA =
+ Arrays.asList(
+ getColumn(TableFormat.DELTA, "booleanField", "boolean"),
+ getColumn(TableFormat.DELTA, "intField", "int"),
+ getColumn(TableFormat.DELTA, "stringField", "string"));
+ // Expected Iceberg columns: every schema field in schema order, partition column included.
+ protected static final List<Column> ICEBERG_GLUE_SCHEMA =
+ Arrays.asList(
+ getColumn(TableFormat.ICEBERG, "intField", "int"),
+ getColumn(TableFormat.ICEBERG, "stringField", "string"),
+ getColumn(TableFormat.ICEBERG, "partitionField", "string"));
+ protected static final List<Column> UPDATED_ICEBERG_GLUE_SCHEMA =
+ Arrays.asList(
+ getColumn(TableFormat.ICEBERG, "intField", "int"),
+ getColumn(TableFormat.ICEBERG, "stringField", "string"),
+ getColumn(TableFormat.ICEBERG, "partitionField", "string"),
+ getColumn(TableFormat.ICEBERG, "booleanField", "boolean"));
+ // Table fixtures: all share TEST_BASE_PATH and PARTITION_FIELD; the UPDATED_ variants differ
+ // only in using UPDATED_INTERNAL_SCHEMA as the read schema.
protected static final InternalTable TEST_ICEBERG_INTERNAL_TABLE =
InternalTable.builder()
.basePath(TEST_BASE_PATH)
.tableFormat(TableFormat.ICEBERG)
-
.readSchema(InternalSchema.builder().fields(Collections.emptyList()).build())
+ .readSchema(INTERNAL_SCHEMA)
+ .partitioningFields(Collections.singletonList(PARTITION_FIELD))
+ .build();
+ protected static final InternalTable TEST_UPDATED_ICEBERG_INTERNAL_TABLE =
+ InternalTable.builder()
+ .basePath(TEST_BASE_PATH)
+ .tableFormat(TableFormat.ICEBERG)
+ .readSchema(UPDATED_INTERNAL_SCHEMA)
+ .partitioningFields(Collections.singletonList(PARTITION_FIELD))
.build();
protected static final InternalTable TEST_HUDI_INTERNAL_TABLE =
InternalTable.builder()
.basePath(TEST_BASE_PATH)
.tableFormat(TableFormat.HUDI)
-
.readSchema(InternalSchema.builder().fields(Collections.emptyList()).build())
+ .readSchema(INTERNAL_SCHEMA)
+ .partitioningFields(Collections.singletonList(PARTITION_FIELD))
+ .build();
+ // Delta fixtures consumed by the Delta Glue table builder tests.
+ protected static final InternalTable TEST_DELTA_INTERNAL_TABLE =
+ InternalTable.builder()
+ .basePath(TEST_BASE_PATH)
+ .tableFormat(TableFormat.DELTA)
+ .readSchema(INTERNAL_SCHEMA)
+ .partitioningFields(Collections.singletonList(PARTITION_FIELD))
+ .build();
+ protected static final InternalTable TEST_UPDATED_DELTA_INTERNAL_TABLE =
+ InternalTable.builder()
+ .basePath(TEST_BASE_PATH)
+ .tableFormat(TableFormat.DELTA)
+ .readSchema(UPDATED_INTERNAL_SCHEMA)
+ .partitioningFields(Collections.singletonList(PARTITION_FIELD))
.build();
protected static final ThreePartHierarchicalTableIdentifier
TEST_CATALOG_TABLE_IDENTIFIER =
new ThreePartHierarchicalTableIdentifier(TEST_GLUE_DATABASE,
TEST_GLUE_TABLE);
@@ -108,20 +188,6 @@ public class GlueCatalogSyncTestBase {
.build();
}
- protected TableInput getCreateOrUpdateTableInput(
- String tableName, Map<String, String> params, InternalTable
internalTable) {
- return TableInput.builder()
- .name(tableName)
- .tableType(GLUE_EXTERNAL_TABLE_TYPE)
- .parameters(params)
- .storageDescriptor(
- StorageDescriptor.builder()
- .location(internalTable.getBasePath())
- .columns(Collections.emptyList())
- .build())
- .build();
- }
-
protected CreateTableRequest createTableRequest(String dbName, TableInput
tableInput) {
return CreateTableRequest.builder()
.catalogId(TEST_GLUE_CATALOG_ID)
@@ -154,4 +220,12 @@ public class GlueCatalogSyncTestBase {
.storageDescriptor(StorageDescriptor.builder().location(location).build())
.build();
}
+
+  /** Builds a field named {@code fieldName} whose schema has the given name and data type. */
+  private static InternalField getInternalField(
+      String fieldName, String schemaName, InternalType dataType) {
+    InternalSchema fieldSchema =
+        InternalSchema.builder().name(schemaName).dataType(dataType).build();
+    return InternalField.builder().name(fieldName).schema(fieldSchema).build();
+  }
}
diff --git
a/xtable-aws/src/test/java/org/apache/xtable/glue/TestGlueSchemaExtractor.java
b/xtable-aws/src/test/java/org/apache/xtable/glue/TestGlueSchemaExtractor.java
index 0410d07c..fd084ee7 100644
---
a/xtable-aws/src/test/java/org/apache/xtable/glue/TestGlueSchemaExtractor.java
+++
b/xtable-aws/src/test/java/org/apache/xtable/glue/TestGlueSchemaExtractor.java
@@ -25,19 +25,28 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.xtable.catalog.TestSchemaExtractorBase;
import org.apache.xtable.exception.NotSupportedException;
+import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.model.schema.PartitionTransformType;
import org.apache.xtable.model.storage.TableFormat;
import software.amazon.awssdk.services.glue.model.Column;
@@ -46,13 +55,58 @@ import software.amazon.awssdk.services.glue.model.Table;
public class TestGlueSchemaExtractor extends TestSchemaExtractorBase {
- private Column getCurrentGlueTableColumn(
+ // Plain Glue columns (name and type only) used as the column map for the partition helpers.
+ private static final List<Column> testColumns =
+ Arrays.asList(
+ getColumn("id", "string"),
+ getColumn("firstName", "string"),
+ getColumn("gender", "string"),
+ getColumn("birthTs", "timestamp"),
+ getColumn("dateOfBirth", "int"));
+
+ private static final Map<String, Column> testColumnMap =
+ testColumns.stream().collect(Collectors.toMap(Column::name, c -> c));
+
+ // VALUE partition: its Glue key is the source field name, "gender".
+ private static final InternalPartitionField simplePartitionField =
+ InternalPartitionField.builder()
+ .sourceField(
+ InternalField.builder()
+ .name("gender")
+ .schema(
+
InternalSchema.builder().name("string").dataType(InternalType.STRING).build())
+ .build())
+ .transformType(PartitionTransformType.VALUE)
+ .build();
+
+ // DAY partition with explicit partitionFieldNames: its Glue key is "dateOfBirth", not the
+ // source field name "birthTimestamp".
+ private static final InternalPartitionField complexPartitionField =
+ InternalPartitionField.builder()
+ .sourceField(
+ InternalField.builder()
+ .name("birthTimestamp")
+ .schema(
+ InternalSchema.builder()
+ .name("timestamp")
+ .dataType(InternalType.TIMESTAMP)
+ .build())
+ .build())
+ .transformType(PartitionTransformType.DAY)
+ .partitionFieldNames(Collections.singletonList("dateOfBirth"))
+ .build();
+
+ /** Builds a Glue column with only a name and a type. */
+ private static Column getColumn(String name, String type) {
+ return Column.builder().name(name).type(type).build();
+ }
+
+ /**
+ * Builds a Glue column for {@code tableFormat} with no field id and isNullable=false; also used
+ * by GlueCatalogSyncTestBase to build expected columns.
+ */
+ public static Column getColumn(String tableFormat, String name, String type)
{
+ return getCurrentGlueTableColumn(tableFormat, name, type, null, false);
+ }
+
+ /** Overload without a comment: maps a null fieldId to -1 and delegates with a null comment. */
+ private static Column getCurrentGlueTableColumn(
String tableFormat, String colName, String colType, Integer fieldId,
boolean isNullable) {
fieldId = fieldId != null ? fieldId : -1;
return getCurrentGlueTableColumn(tableFormat, colName, colType, null,
fieldId, isNullable);
}
- private Column getCurrentGlueTableColumn(
+ private static Column getCurrentGlueTableColumn(
String tableFormat,
String colName,
String colType,
@@ -72,7 +126,8 @@ public class TestGlueSchemaExtractor extends
TestSchemaExtractorBase {
.build();
}
- private Column getPreviousGlueTableColumn(String tableFormat, String
colName, String colType) {
+ private static Column getPreviousGlueTableColumn(
+ String tableFormat, String colName, String colType) {
return Column.builder()
.name(colName)
.type(colType)
@@ -80,6 +135,10 @@ public class TestGlueSchemaExtractor extends
TestSchemaExtractorBase {
.build();
}
+  /** Creates a minimal table that carries only the given partitioning fields. */
+  private static InternalTable getInternalTable(List<InternalPartitionField> partitionFields) {
+    return InternalTable.builder()
+        .partitioningFields(partitionFields)
+        .build();
+  }
+
@Test
void testPrimitiveTypes_NoExistingTable() {
int precision = 10;
@@ -683,4 +742,58 @@ public class TestGlueSchemaExtractor extends
TestSchemaExtractorBase {
column = getCurrentGlueTableColumn(tableFormat, "booleanField", "boolean",
comment, 1, false);
assertEquals(column, GlueSchemaExtractor.getInstance().toColumn(field,
tableFormat));
}
+
+ /** (table, expected columns) cases: each case removes the table's partition keys from testColumns. */
+ static Stream<Arguments> getNonPartitionColumnsTestArgs() {
+ return Stream.of(
+ // table with no partition fields
+ Arguments.of(getInternalTable(Collections.emptyList()), testColumns),
+ // table with simple partition field
+ Arguments.of(
+ getInternalTable(Collections.singletonList(simplePartitionField)),
+ Arrays.asList(
+ testColumns.get(0), testColumns.get(1), testColumns.get(3),
testColumns.get(4))),
+ // table with complex partition field
+ Arguments.of(
+ getInternalTable(Collections.singletonList(complexPartitionField)),
+ Arrays.asList(
+ testColumns.get(0), testColumns.get(1), testColumns.get(2),
testColumns.get(3))),
+ // table with multiple partition fields
+ Arguments.of(
+ getInternalTable(Arrays.asList(simplePartitionField,
complexPartitionField)),
+ Arrays.asList(testColumns.get(0), testColumns.get(1),
testColumns.get(3))));
+ }
+
+ /** Verifies getNonPartitionColumns drops exactly the partition-key columns. */
+ @ParameterizedTest
+ @MethodSource("getNonPartitionColumnsTestArgs")
+ void testGetNonPartitionColumns(InternalTable table, List<Column> expected) {
+ List<Column> output =
+ GlueSchemaExtractor.getInstance().getNonPartitionColumns(table,
testColumnMap);
+ // Compared as sets: the output follows the column map's iteration order.
+ assertEquals(new HashSet<>(expected), new HashSet<>(output));
+ }
+
+ /** (table, expected partition-key columns) cases, looked up by key name in testColumnMap. */
+ static Stream<Arguments> getPartitionColumnsTestArgs() {
+ return Stream.of(
+ // table with no partition fields
+ Arguments.of(getInternalTable(Collections.emptyList()),
Collections.emptyList()),
+ // table with simple partition field
+ Arguments.of(
+ getInternalTable(Collections.singletonList(simplePartitionField)),
+ Collections.singletonList(testColumns.get(2))),
+ // table with complex partition field
+ Arguments.of(
+ getInternalTable(Collections.singletonList(complexPartitionField)),
+ Collections.singletonList(testColumns.get(4))),
+ // table with multiple partition fields
+ // NOTE(review): no case covers a partition key missing from the column map (the
+ // string-typed fallback in getPartitionColumns) — consider adding one.
+ Arguments.of(
+ getInternalTable(Arrays.asList(simplePartitionField,
complexPartitionField)),
+ Arrays.asList(testColumns.get(2), testColumns.get(4))));
+ }
+
+ /** Verifies getPartitionColumns returns one column per partition key. */
+ @ParameterizedTest
+ @MethodSource("getPartitionColumnsTestArgs")
+ void testGetPartitionColumns(InternalTable table, List<Column> expected) {
+ List<Column> output =
+ GlueSchemaExtractor.getInstance().getPartitionColumns(table,
testColumnMap);
+ // Set comparison checks membership only; partition-key order is not asserted here.
+ assertEquals(new HashSet<>(expected), new HashSet<>(output));
+ }
}
diff --git
a/xtable-aws/src/test/java/org/apache/xtable/glue/table/TestDeltaGlueCatalogTableBuilder.java
b/xtable-aws/src/test/java/org/apache/xtable/glue/table/TestDeltaGlueCatalogTableBuilder.java
new file mode 100644
index 00000000..59b3452d
--- /dev/null
+++
b/xtable-aws/src/test/java/org/apache/xtable/glue/table/TestDeltaGlueCatalogTableBuilder.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.glue.table;
+
+import static
org.apache.xtable.glue.GlueCatalogSyncClient.GLUE_EXTERNAL_TABLE_TYPE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.List;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import org.apache.xtable.glue.GlueCatalogSyncTestBase;
+
+import software.amazon.awssdk.services.glue.model.Column;
+import software.amazon.awssdk.services.glue.model.SerDeInfo;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
+import software.amazon.awssdk.services.glue.model.Table;
+import software.amazon.awssdk.services.glue.model.TableInput;
+
+@ExtendWith(MockitoExtension.class)
+public class TestDeltaGlueCatalogTableBuilder extends GlueCatalogSyncTestBase {
+
+  private DeltaGlueCatalogTableBuilder deltaGlueCatalogTableBuilder;
+
+  /** Creates the builder under test; it takes no collaborators. */
+  private DeltaGlueCatalogTableBuilder createDeltaGlueCatalogSyncHelper() {
+    return new DeltaGlueCatalogTableBuilder();
+  }
+
+  /** Per-test setup: despite its name nothing is stubbed; it only creates the builder. */
+  void setupCommonMocks() {
+    deltaGlueCatalogTableBuilder = createDeltaGlueCatalogSyncHelper();
+  }
+
+  /** A create request carries the base schema's data columns and the partition key. */
+  @Test
+  void testGetCreateTableRequest() {
+    setupCommonMocks();
+    TableInput actual =
+        deltaGlueCatalogTableBuilder.getCreateTableRequest(
+            TEST_DELTA_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER);
+    assertEquals(expectedTableInput(DELTA_GLUE_SCHEMA), actual);
+  }
+
+  /** An update request swaps in the data columns of the updated schema. */
+  @Test
+  void testGetUpdateTableInput() {
+    setupCommonMocks();
+    Table existingGlueTable =
+        Table.builder()
+            .parameters(deltaGlueCatalogTableBuilder.getTableParameters())
+            .storageDescriptor(getTestStorageDescriptor(DELTA_GLUE_SCHEMA))
+            .partitionKeys(PARTITION_KEYS)
+            .build();
+    TableInput actual =
+        deltaGlueCatalogTableBuilder.getUpdateTableRequest(
+            TEST_UPDATED_DELTA_INTERNAL_TABLE, existingGlueTable, TEST_CATALOG_TABLE_IDENTIFIER);
+    assertEquals(expectedTableInput(UPDATED_DELTA_GLUE_SCHEMA), actual);
+  }
+
+  /** Expected request for the test table identifier holding the given data columns. */
+  private TableInput expectedTableInput(List<Column> dataColumns) {
+    return TableInput.builder()
+        .name(TEST_CATALOG_TABLE_IDENTIFIER.getTableName())
+        .tableType(GLUE_EXTERNAL_TABLE_TYPE)
+        .parameters(deltaGlueCatalogTableBuilder.getTableParameters())
+        .storageDescriptor(getTestStorageDescriptor(dataColumns))
+        .partitionKeys(PARTITION_KEYS)
+        .build();
+  }
+
+  /** Storage descriptor at TEST_BASE_PATH with the Delta SerDe parameters and given columns. */
+  private StorageDescriptor getTestStorageDescriptor(List<Column> columns) {
+    SerDeInfo serDeInfo =
+        SerDeInfo.builder()
+            .parameters(deltaGlueCatalogTableBuilder.getSerDeParameters(TEST_DELTA_INTERNAL_TABLE))
+            .build();
+    return StorageDescriptor.builder()
+        .location(TEST_BASE_PATH)
+        .serdeInfo(serDeInfo)
+        .columns(columns)
+        .build();
+  }
+}
diff --git
a/xtable-aws/src/test/java/org/apache/xtable/glue/table/TestIcebergGlueCatalogTableBuilder.java
b/xtable-aws/src/test/java/org/apache/xtable/glue/table/TestIcebergGlueCatalogTableBuilder.java
index b8edf5fb..f670b6b7 100644
---
a/xtable-aws/src/test/java/org/apache/xtable/glue/table/TestIcebergGlueCatalogTableBuilder.java
+++
b/xtable-aws/src/test/java/org/apache/xtable/glue/table/TestIcebergGlueCatalogTableBuilder.java
@@ -21,13 +21,12 @@ package org.apache.xtable.glue.table;
import static
org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
import static
org.apache.iceberg.BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP;
import static org.apache.iceberg.BaseMetastoreTableOperations.TABLE_TYPE_PROP;
+import static
org.apache.xtable.glue.GlueCatalogSyncClient.GLUE_EXTERNAL_TABLE_TYPE;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.Test;
@@ -41,8 +40,11 @@ import org.apache.iceberg.TableOperations;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.xtable.glue.GlueCatalogSyncTestBase;
+import org.apache.xtable.glue.GlueSchemaExtractor;
import org.apache.xtable.model.storage.TableFormat;
+import software.amazon.awssdk.services.glue.model.Column;
+import software.amazon.awssdk.services.glue.model.StorageDescriptor;
import software.amazon.awssdk.services.glue.model.Table;
import software.amazon.awssdk.services.glue.model.TableInput;
@@ -56,7 +58,8 @@ public class TestIcebergGlueCatalogTableBuilder extends
GlueCatalogSyncTestBase
private IcebergGlueCatalogTableBuilder icebergGlueCatalogTableBuilder;
private IcebergGlueCatalogTableBuilder createIcebergGlueCatalogSyncHelper() {
- return new IcebergGlueCatalogTableBuilder(mockGlueSchemaExtractor,
mockIcebergHadoopTables);
+ return new IcebergGlueCatalogTableBuilder(
+ GlueSchemaExtractor.getInstance(), mockIcebergHadoopTables);
}
void setupCommonMocks() {
@@ -79,21 +82,16 @@ public class TestIcebergGlueCatalogTableBuilder extends
GlueCatalogSyncTestBase
void testGetCreateTableRequest() {
setupCommonMocks();
mockIcebergHadoopTables();
- when(mockGlueSchemaExtractor.toColumns(
- TableFormat.ICEBERG, TEST_ICEBERG_INTERNAL_TABLE.getReadSchema()))
- .thenReturn(Collections.emptyList());
TableInput expected =
getCreateOrUpdateTableInput(
- TEST_CATALOG_TABLE_IDENTIFIER.getTableName(),
+ TEST_GLUE_TABLE,
icebergGlueCatalogTableBuilder.getTableParameters(mockIcebergBaseTable),
- TEST_ICEBERG_INTERNAL_TABLE);
+ getTestStorageDescriptor(ICEBERG_GLUE_SCHEMA));
TableInput output =
icebergGlueCatalogTableBuilder.getCreateTableRequest(
TEST_ICEBERG_INTERNAL_TABLE, TEST_CATALOG_TABLE_IDENTIFIER);
assertEquals(expected, output);
- verify(mockGlueSchemaExtractor, times(1))
- .toColumns(TableFormat.ICEBERG,
TEST_ICEBERG_INTERNAL_TABLE.getReadSchema());
}
@Test
@@ -103,7 +101,11 @@ public class TestIcebergGlueCatalogTableBuilder extends
GlueCatalogSyncTestBase
Map<String, String> glueTableParams = new HashMap<>();
glueTableParams.put(METADATA_LOCATION_PROP,
ICEBERG_METADATA_FILE_LOCATION);
- Table glueTable = Table.builder().parameters(glueTableParams).build();
+ Table glueTable =
+ Table.builder()
+ .parameters(glueTableParams)
+ .storageDescriptor(getTestStorageDescriptor(ICEBERG_GLUE_SCHEMA))
+ .build();
Map<String, String> parameters = new HashMap<>();
parameters.put(PREVIOUS_METADATA_LOCATION_PROP,
glueTableParams.get(METADATA_LOCATION_PROP));
@@ -111,19 +113,13 @@ public class TestIcebergGlueCatalogTableBuilder extends
GlueCatalogSyncTestBase
.thenReturn(ICEBERG_METADATA_FILE_LOCATION_v2);
parameters.put(METADATA_LOCATION_PROP, ICEBERG_METADATA_FILE_LOCATION_v2);
- when(mockGlueSchemaExtractor.toColumns(
- TableFormat.ICEBERG, TEST_ICEBERG_INTERNAL_TABLE.getReadSchema(),
glueTable))
- .thenReturn(Collections.emptyList());
-
TableInput expected =
getCreateOrUpdateTableInput(
- TEST_CATALOG_TABLE_IDENTIFIER.getTableName(), parameters,
TEST_ICEBERG_INTERNAL_TABLE);
+ TEST_GLUE_TABLE, parameters,
getTestStorageDescriptor(UPDATED_ICEBERG_GLUE_SCHEMA));
TableInput output =
icebergGlueCatalogTableBuilder.getUpdateTableRequest(
- TEST_ICEBERG_INTERNAL_TABLE, glueTable,
TEST_CATALOG_TABLE_IDENTIFIER);
+ TEST_UPDATED_ICEBERG_INTERNAL_TABLE, glueTable,
TEST_CATALOG_TABLE_IDENTIFIER);
assertEquals(expected, output);
- verify(mockGlueSchemaExtractor, times(1))
- .toColumns(TableFormat.ICEBERG,
TEST_ICEBERG_INTERNAL_TABLE.getReadSchema(), glueTable);
}
@Test
@@ -137,4 +133,18 @@ public class TestIcebergGlueCatalogTableBuilder extends
GlueCatalogSyncTestBase
icebergGlueCatalogTableBuilder.getTableParameters(mockIcebergBaseTable);
assertEquals(expected, tableParameters);
}
+
+  /** Expected create/update request: an external Glue table with the given params and descriptor. */
+  private TableInput getCreateOrUpdateTableInput(
+      String tableName, Map<String, String> params, StorageDescriptor sd) {
+    TableInput.Builder input =
+        TableInput.builder().name(tableName).tableType(GLUE_EXTERNAL_TABLE_TYPE);
+    return input.parameters(params).storageDescriptor(sd).build();
+  }
+
+  /** Storage descriptor at TEST_BASE_PATH holding {@code columns}; no SerDe info is set. */
+  private StorageDescriptor getTestStorageDescriptor(List<Column> columns) {
+    return StorageDescriptor.builder()
+        .location(TEST_BASE_PATH)
+        .columns(columns)
+        .build();
+  }
}