This is an automated email from the ASF dual-hosted git repository.
timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
new 81af6e29 Fix Delta to Iceberg not working on column mapping enabled
Delta source table (#766)
81af6e29 is described below
commit 81af6e294041ec46cd30cd2796b8ab20647b541a
Author: Xingrong Chen <[email protected]>
AuthorDate: Mon Dec 29 07:58:49 2025 -0800
Fix Delta to Iceberg not working on column mapping enabled Delta source
table (#766)
* Fix Iceberg duplicate field id in converted schema
* Traverse the whole schema before converting, add test for the whole
conversion process
* update name mapping in the target Iceberg to read values from correct
parquet columns
* Add comments for physical field name and add more fields for schema
extraction test
* reset id to name mapping before extraction
* Update name mapping in one place
* update comments
* Create schema extractor per conversion, add test cases for schema
evolution
* Add adding columns case for schema evolution test
* recreate the name mapping when source schema contains field IDs
* Wrap single line if statement, add default case for switch, update name
mapping only when schema changed
* update comments
* add comments for setNameMapping
* Update name mapping setting function name and comments
---
.../apache/xtable/model/schema/InternalField.java | 3 +
.../apache/xtable/delta/DeltaSchemaExtractor.java | 6 +
.../xtable/iceberg/IcebergConversionTarget.java | 53 ++++++++
.../xtable/iceberg/IcebergSchemaExtractor.java | 59 ++++++++-
.../apache/xtable/iceberg/IcebergTableManager.java | 12 +-
.../org/apache/xtable/ITConversionController.java | 42 ++++++
.../org/apache/xtable/TestSparkDeltaTable.java | 34 ++++-
.../org/apache/xtable/delta/TestDeltaHelper.java | 37 ++++++
.../xtable/iceberg/TestIcebergSchemaExtractor.java | 144 +++++++++++++++++++++
.../xtable/iceberg/TestIcebergTableManager.java | 17 +--
10 files changed, 379 insertions(+), 28 deletions(-)
diff --git
a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java
b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java
index 31eb0ed4..7f209761 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java
@@ -43,6 +43,9 @@ public class InternalField {
// The id field for the field. This is used to identify the field in the
schema even after
// renames.
Integer fieldId;
+ // The name of the column in the data file used to store this field if it
differs from the name in
+ // the table's definition; otherwise, null
+ @Getter String storageName;
// represents the fully qualified path to the field (dot separated)
@Getter(lazy = true)
diff --git
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
index 1376f884..5119f5f5 100644
---
a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
+++
b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
@@ -55,6 +55,7 @@ import org.apache.xtable.schema.SchemaUtils;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class DeltaSchemaExtractor {
private static final String DELTA_COLUMN_MAPPING_ID =
"delta.columnMapping.id";
+ private static final String DELTA_COLUMN_MAPPING_NAME =
"delta.columnMapping.physicalName";
private static final DeltaSchemaExtractor INSTANCE = new
DeltaSchemaExtractor();
// Timestamps in Delta are microsecond precision by default
private static final Map<InternalSchema.MetadataKey, Object>
@@ -136,6 +137,10 @@ public class DeltaSchemaExtractor {
field.metadata().contains(DELTA_COLUMN_MAPPING_ID)
? (int)
field.metadata().getLong(DELTA_COLUMN_MAPPING_ID)
: null;
+ String storageName =
+ field.metadata().contains(DELTA_COLUMN_MAPPING_NAME)
+ ?
field.metadata().getString(DELTA_COLUMN_MAPPING_NAME)
+ : null;
String fieldComment =
field.getComment().isDefined() ?
field.getComment().get() : null;
InternalSchema schema =
@@ -148,6 +153,7 @@ public class DeltaSchemaExtractor {
return InternalField.builder()
.name(field.name())
.fieldId(fieldId)
+ .storageName(storageName)
.parentPath(parentPath)
.schema(schema)
.defaultValue(
diff --git
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
index b05089d0..bac7b510 100644
---
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
+++
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
@@ -20,9 +20,12 @@ package org.apache.xtable.iceberg;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import lombok.extern.log4j.Log4j2;
@@ -39,6 +42,11 @@ import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.mapping.MappedField;
+import org.apache.iceberg.mapping.MappedFields;
+import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.xtable.conversion.TargetTable;
import org.apache.xtable.model.InternalTable;
@@ -161,13 +169,58 @@ public class IcebergConversionTarget implements
ConversionTarget {
}
}
+ /**
+ * Create a name mapping from the given schema, add storage names in {@link
+ * IcebergSchemaExtractor#getIdToStorageName()}, if any exist, to the name
mapping, and apply the
+ * updated mapping to the table.
+ *
+ * <p>This method should only be called when the name mapping is not set, or
when field IDs are
+ * provided in the source schema.
+ *
+ * @param schema the {@link Schema} from which to create the name mapping
+ */
+ private void createAndSetNameMapping(Schema schema) {
+ NameMapping mapping = MappingUtil.create(schema);
+ MappedFields updatedMappedFields =
+ updateNameMapping(mapping.asMappedFields(),
schemaExtractor.getIdToStorageName());
+ transaction
+ .updateProperties()
+ .set(
+ TableProperties.DEFAULT_NAME_MAPPING,
+ NameMappingParser.toJson(NameMapping.of(updatedMappedFields)))
+ .commit();
+ }
+
+ private MappedFields updateNameMapping(
+ MappedFields mapping, Map<Integer, String> idToStorageName) {
+ if (mapping == null) {
+ return null;
+ }
+ List<MappedField> fieldResults = new ArrayList<>();
+ for (MappedField field : mapping.fields()) {
+ Set<String> fieldNames = new HashSet<>(field.names());
+ if (idToStorageName.containsKey(field.id())) {
+ fieldNames.add(idToStorageName.get(field.id()));
+ }
+ MappedFields nestedMapping = updateNameMapping(field.nestedMapping(),
idToStorageName);
+ fieldResults.add(MappedField.of(field.id(), fieldNames, nestedMapping));
+ }
+ return MappedFields.of(fieldResults);
+ }
+
@Override
public void syncSchema(InternalSchema schema) {
Schema latestSchema = schemaExtractor.toIceberg(schema);
+ if
(!transaction.table().properties().containsKey(TableProperties.DEFAULT_NAME_MAPPING))
{
+ createAndSetNameMapping(latestSchema);
+ }
if (!transaction.table().schema().sameSchema(latestSchema)) {
boolean hasFieldIds =
schema.getAllFields().stream().anyMatch(field -> field.getFieldId()
!= null);
if (hasFieldIds) {
+ // If field IDs are provided in the source schema, manually update the
name mapping to
+ // ensure the IDs match the correct fields.
+ createAndSetNameMapping(latestSchema);
// There is no clean way to sync the schema with the provided field
IDs using the
// transaction API so we commit the current transaction and interact
directly with
// the operations API.
diff --git
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java
index 4366bc02..3ae41561 100644
---
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java
+++
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import lombok.AccessLevel;
+import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;
@@ -53,18 +54,69 @@ import org.apache.xtable.model.schema.InternalType;
@Log4j2
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class IcebergSchemaExtractor {
- private static final IcebergSchemaExtractor INSTANCE = new
IcebergSchemaExtractor();
private static final String MAP_KEY_FIELD_NAME = "key";
private static final String MAP_VALUE_FIELD_NAME = "value";
private static final String LIST_ELEMENT_FIELD_NAME = "element";
+ @Getter private final Map<Integer, String> idToStorageName = new HashMap<>();
public static IcebergSchemaExtractor getInstance() {
- return INSTANCE;
+ return new IcebergSchemaExtractor();
+ }
+
+ private void initializeFieldIdTracker(InternalSchema schema, AtomicInteger
fieldIdTracker) {
+ schema.getFields().stream()
+ .forEach(
+ field -> {
+ if (field.getFieldId() != null) {
+ fieldIdTracker.accumulateAndGet(field.getFieldId(), Math::max);
+ }
+ initializeFieldIdTracker(field, fieldIdTracker);
+ });
+ }
+
+ private void initializeFieldIdTracker(InternalField field, AtomicInteger
fieldIdTracker) {
+ switch (field.getSchema().getDataType()) {
+ case RECORD:
+ initializeFieldIdTracker(field.getSchema(), fieldIdTracker);
+ return;
+ case MAP:
+ field.getSchema().getFields().stream()
+ .filter(
+ mapField ->
+
InternalField.Constants.MAP_KEY_FIELD_NAME.equals(mapField.getName())
+ ||
InternalField.Constants.MAP_VALUE_FIELD_NAME.equals(mapField.getName()))
+ .forEach(
+ mapField -> {
+ if (mapField.getFieldId() != null) {
+ fieldIdTracker.accumulateAndGet(mapField.getFieldId(),
Math::max);
+ }
+ initializeFieldIdTracker(mapField, fieldIdTracker);
+ });
+ return;
+ case LIST:
+ field.getSchema().getFields().stream()
+ .filter(
+ arrayField ->
+
InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME.equals(arrayField.getName()))
+ .forEach(
+ arrayField -> {
+ if (arrayField.getFieldId() != null) {
+ fieldIdTracker.accumulateAndGet(arrayField.getFieldId(),
Math::max);
+ }
+ initializeFieldIdTracker(arrayField, fieldIdTracker);
+ });
+ return;
+ default:
+ return;
+ }
}
public Schema toIceberg(InternalSchema internalSchema) {
// if field IDs are not assigned in the source, just use an incrementing
integer
AtomicInteger fieldIdTracker = new AtomicInteger(0);
+ // traverse the schema before conversion to ensure fieldIdTracker won't
return any
+ // fieldIds that are already present in the schema
+ initializeFieldIdTracker(internalSchema, fieldIdTracker);
List<Types.NestedField> nestedFields = convertFields(internalSchema,
fieldIdTracker);
List<InternalField> recordKeyFields = internalSchema.getRecordKeyFields();
boolean recordKeyFieldsAreNotRequired =
@@ -154,6 +206,9 @@ public class IcebergSchemaExtractor {
List<Types.NestedField> nestedFields = new
ArrayList<>(schema.getFields().size());
for (int i = 0; i < schema.getFields().size(); i++) {
InternalField field = schema.getFields().get(i);
+ if (field.getStorageName() != null) {
+ idToStorageName.put(ids.get(i), field.getStorageName());
+ }
nestedFields.add(
Types.NestedField.of(
ids.get(i),
diff --git
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergTableManager.java
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergTableManager.java
index 06b625c0..19f162a6 100644
---
a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergTableManager.java
+++
b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergTableManager.java
@@ -36,13 +36,10 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
-import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.hadoop.HadoopTables;
-import org.apache.iceberg.mapping.MappingUtil;
-import org.apache.iceberg.mapping.NameMappingParser;
@AllArgsConstructor(staticName = "of")
@Log4j2
@@ -88,14 +85,14 @@ class IcebergTableManager {
new Schema(),
PartitionSpec.unpartitioned(),
basePath,
- getDefaultMappingProperties(schema)))
+ Collections.emptyMap()))
.orElseGet(
() ->
getHadoopTables()
.create(
new Schema(),
PartitionSpec.unpartitioned(),
- getDefaultMappingProperties(schema),
+ Collections.emptyMap(),
basePath));
// set the schema with the provided field IDs
TableOperations operations = ((BaseTable)
tableWithEmptySchema).operations();
@@ -112,11 +109,6 @@ class IcebergTableManager {
}
}
- private Map<String, String> getDefaultMappingProperties(Schema schema) {
- return Collections.singletonMap(
- TableProperties.DEFAULT_NAME_MAPPING,
NameMappingParser.toJson(MappingUtil.create(schema)));
- }
-
private Optional<Catalog> getCatalog(IcebergCatalogConfig catalogConfig) {
if (catalogConfig == null) {
return Optional.empty();
diff --git
a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
index b8ea413b..019f69ea 100644
--- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
+++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
@@ -746,6 +746,48 @@ public class ITConversionController {
}
}
+ @Test
+ public void testColumnMappingEnabledDeltaToIceberg() {
+ String tableName = getTableName();
+ ConversionSourceProvider<?> conversionSourceProvider =
getConversionSourceProvider(DELTA);
+ try (TestSparkDeltaTable table =
+ TestSparkDeltaTable.forColumnMappingEnabled(tableName, tempDir,
sparkSession, null)) {
+ table.insertRows(20);
+ ConversionController conversionController =
+ new ConversionController(jsc.hadoopConfiguration());
+ ConversionConfig conversionConfig =
+ getTableSyncConfig(
+ DELTA,
+ SyncMode.INCREMENTAL,
+ tableName,
+ table,
+ Collections.singletonList(ICEBERG),
+ null,
+ null);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
+ table.insertRows(10);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
+ table.insertRows(10);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
+ checkDatasetEquivalence(DELTA, table,
Collections.singletonList(ICEBERG), 40);
+
+ table.dropColumn("long_field");
+ table.insertRows(10);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
+ checkDatasetEquivalence(DELTA, table,
Collections.singletonList(ICEBERG), 50);
+
+ table.renameColumn("double_field", "scores");
+ table.insertRows(10);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
+ checkDatasetEquivalence(DELTA, table,
Collections.singletonList(ICEBERG), 60);
+
+ table.addColumn();
+ table.insertRows(10);
+ conversionController.sync(conversionConfig, conversionSourceProvider);
+ checkDatasetEquivalence(DELTA, table,
Collections.singletonList(ICEBERG), 70);
+ }
+ }
+
@Test
public void testMetadataRetention() throws Exception {
String tableName = getTableName();
diff --git
a/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java
b/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java
index 028eca1b..743206aa 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java
@@ -69,12 +69,27 @@ public class TestSparkDeltaTable implements
GenericTable<Row, Object>, Closeable
return new TestSparkDeltaTable(tableName, tempDir, sparkSession,
partitionField, true);
}
+ public static TestSparkDeltaTable forColumnMappingEnabled(
+ String tableName, Path tempDir, SparkSession sparkSession, String
partitionField) {
+ return new TestSparkDeltaTable(tableName, tempDir, sparkSession,
partitionField, true, true);
+ }
+
public TestSparkDeltaTable(
String name,
Path tempDir,
SparkSession sparkSession,
String partitionField,
boolean includeAdditionalColumns) {
+ this(name, tempDir, sparkSession, partitionField,
includeAdditionalColumns, false);
+ }
+
+ public TestSparkDeltaTable(
+ String name,
+ Path tempDir,
+ SparkSession sparkSession,
+ String partitionField,
+ boolean includeAdditionalColumns,
+ boolean enableColumnMapping) {
try {
this.tableName = name;
this.basePath = initBasePath(tempDir, tableName);
@@ -82,7 +97,8 @@ public class TestSparkDeltaTable implements GenericTable<Row,
Object>, Closeable
this.partitionField = partitionField;
this.includeAdditionalColumns = includeAdditionalColumns;
this.testDeltaHelper =
- TestDeltaHelper.createTestDataHelper(partitionField,
includeAdditionalColumns);
+ TestDeltaHelper.createTestDataHelper(
+ partitionField, includeAdditionalColumns, enableColumnMapping);
testDeltaHelper.createTable(sparkSession, tableName, basePath);
this.deltaLog = DeltaLog.forTable(sparkSession, basePath);
this.deltaTable = DeltaTable.forPath(sparkSession, basePath);
@@ -260,4 +276,20 @@ public class TestSparkDeltaTable implements
GenericTable<Row, Object>, Closeable
.filter(columnName -> !columnName.equals("yearOfBirth"))
.collect(Collectors.toList());
}
+
+ public void dropColumn(String colName) {
+ testDeltaHelper.dropColumn(colName);
+ sparkSession.sql(String.format("ALTER TABLE delta.`%s` DROP COLUMN %s",
basePath, colName));
+ }
+
+ public void renameColumn(String colName, String newColName) {
+ testDeltaHelper.renameColumn(colName, newColName);
+ sparkSession.sql(
+ String.format(
+ "ALTER TABLE delta.`%s` RENAME COLUMN %s TO %s", basePath,
colName, newColName));
+ }
+
+ public void addColumn() {
+ testDeltaHelper.addColumn();
+ }
}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaHelper.java
b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaHelper.java
index a677f57e..2e95e89b 100644
--- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaHelper.java
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaHelper.java
@@ -117,14 +117,21 @@ public class TestDeltaHelper {
StructType tableStructSchema;
String partitionField;
boolean includeAdditionalColumns;
+ boolean enableColumnMapping;
public static TestDeltaHelper createTestDataHelper(
String partitionField, boolean includeAdditionalColumns) {
+ return createTestDataHelper(partitionField, includeAdditionalColumns,
false);
+ }
+
+ public static TestDeltaHelper createTestDataHelper(
+ String partitionField, boolean includeAdditionalColumns, boolean
enableColumnMapping) {
StructType tableSchema = generateDynamicSchema(partitionField,
includeAdditionalColumns);
return TestDeltaHelper.builder()
.tableStructSchema(tableSchema)
.partitionField(partitionField)
.includeAdditionalColumns(includeAdditionalColumns)
+ .enableColumnMapping(enableColumnMapping)
.build();
}
@@ -160,6 +167,11 @@ public class TestDeltaHelper {
if (includeAdditionalColumns) {
tableBuilder.addColumn("street", StringType);
}
+ if (enableColumnMapping) {
+ tableBuilder.property("delta.minReaderVersion", "2");
+ tableBuilder.property("delta.minWriterVersion", "5");
+ tableBuilder.property("delta.columnMapping.mode", "name");
+ }
tableBuilder.execute();
}
@@ -303,4 +315,29 @@ public class TestDeltaHelper {
.mapToObj(i -> generateRandomRowForGivenYearAndLevel(partitionValue,
level))
.collect(Collectors.toList());
}
+
+ public void dropColumn(String colName) {
+ this.tableStructSchema =
+ new StructType(
+ Arrays.stream(tableStructSchema.fields())
+                  .filter(field -> !field.name().equals(colName))
+ .toArray(StructField[]::new));
+ }
+
+ public void renameColumn(String colName, String newColName) {
+ this.tableStructSchema =
+ new StructType(
+ Arrays.stream(tableStructSchema.fields())
+ .map(
+ field ->
+ field.name().equals(colName)
+ ? new StructField(
+ newColName, field.dataType(),
field.nullable(), field.metadata())
+ : field)
+ .toArray(StructField[]::new));
+ }
+
+ public void addColumn() {
+ this.tableStructSchema = tableStructSchema.add("city", StringType, true);
+ }
}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java
index 824e2285..f842f99c 100644
---
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java
+++
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java
@@ -1044,4 +1044,148 @@ public class TestIcebergSchemaExtractor {
Assertions.assertTrue(
icebergRepresentation.sameSchema(SCHEMA_EXTRACTOR.toIceberg(internalSchema)));
}
+
+ @Test
+ public void testToIcebergWithPartialFieldIdsSet() {
+ InternalSchema internalSchema =
+ InternalSchema.builder()
+ .name("testRecord")
+ .dataType(InternalType.RECORD)
+ .isNullable(false)
+ .fields(
+ Arrays.asList(
+ InternalField.builder()
+ .name("name")
+ .fieldId(1)
+ .schema(
+ InternalSchema.builder()
+ .name("string")
+ .dataType(InternalType.STRING)
+ .isNullable(true)
+ .build())
+ .build(),
+ InternalField.builder()
+ .name("scores")
+ .fieldId(2)
+ .schema(
+ InternalSchema.builder()
+ .name("array")
+ .dataType(InternalType.LIST)
+ .isNullable(true)
+ .fields(
+ Arrays.asList(
+ InternalField.builder()
+ .name("_one_field_element")
+ .parentPath("scores")
+ .schema(
+ InternalSchema.builder()
+ .name("long")
+
.dataType(InternalType.LONG)
+ .isNullable(true)
+ .build())
+ .fieldId(null)
+ .build()))
+ .build())
+ .build(),
+ InternalField.builder()
+ .name("record_map")
+ .fieldId(3)
+ .schema(
+ InternalSchema.builder()
+ .name("map")
+ .dataType(InternalType.MAP)
+ .isNullable(true)
+ .fields(
+ Arrays.asList(
+ InternalField.builder()
+ .name("_one_field_key")
+ .parentPath("record_map")
+ .schema(
+ InternalSchema.builder()
+ .name("string")
+
.dataType(InternalType.STRING)
+ .isNullable(false)
+ .build())
+ .build(),
+ InternalField.builder()
+ .name("_one_field_value")
+ .parentPath("record_map")
+ .schema(
+ InternalSchema.builder()
+ .name("struct")
+
.dataType(InternalType.RECORD)
+ .isNullable(true)
+ .fields(
+ Arrays.asList(
+
InternalField.builder()
+
.name("nested_int")
+ .fieldId(5)
+ .parentPath(
+
"record_map._one_field_value")
+ .schema(
+
InternalSchema.builder()
+
.name("integer")
+
.dataType(InternalType.INT)
+
.isNullable(true)
+
.build())
+ .build()))
+ .build())
+ .build()))
+ .build())
+ .build(),
+ InternalField.builder()
+ .name("primitive_map")
+ .fieldId(4)
+ .schema(
+ InternalSchema.builder()
+ .name("map")
+ .dataType(InternalType.MAP)
+ .isNullable(false)
+ .fields(
+ Arrays.asList(
+ InternalField.builder()
+ .name("_one_field_key")
+ .parentPath("primitive_map")
+ .schema(
+ InternalSchema.builder()
+ .name("string")
+
.dataType(InternalType.STRING)
+ .isNullable(false)
+ .build())
+ .build(),
+ InternalField.builder()
+ .name("_one_field_value")
+ .parentPath("primitive_map")
+ .schema(
+ InternalSchema.builder()
+ .name("integer")
+ .dataType(InternalType.INT)
+ .isNullable(false)
+ .build())
+ .build()))
+ .build())
+ .build()))
+ .build();
+ Schema icebergRepresentation =
+ new Schema(
+ Types.NestedField.optional(1, "name", Types.StringType.get()),
+ Types.NestedField.optional(
+ 2, "scores", Types.ListType.ofOptional(6,
Types.LongType.get())),
+ Types.NestedField.optional(
+ 3,
+ "record_map",
+ Types.MapType.ofOptional(
+ 7,
+ 8,
+ Types.StringType.get(),
+ Types.StructType.of(
+ Arrays.asList(
+ Types.NestedField.optional(
+ 5, "nested_int", Types.IntegerType.get()))))),
+ Types.NestedField.required(
+ 4,
+ "primitive_map",
+ Types.MapType.ofRequired(9, 10, Types.StringType.get(),
Types.IntegerType.get())));
+
assertTrue(icebergRepresentation.sameSchema(SCHEMA_EXTRACTOR.toIceberg(internalSchema)));
+ }
}
diff --git
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergTableManager.java
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergTableManager.java
index f424e3a9..413a16d2 100644
---
a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergTableManager.java
+++
b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergTableManager.java
@@ -40,12 +40,9 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
-import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
-import org.apache.iceberg.mapping.MappingUtil;
-import org.apache.iceberg.mapping.NameMappingParser;
public class TestIcebergTableManager {
private static final String BASE_PATH = "file:///basePath/";
@@ -117,10 +114,7 @@ public class TestIcebergTableManager {
any(),
eq(PartitionSpec.unpartitioned()),
eq(BASE_PATH),
- eq(
- Collections.singletonMap(
- TableProperties.DEFAULT_NAME_MAPPING,
- NameMappingParser.toJson(MappingUtil.create(schema))))))
+ eq(Collections.emptyMap())))
.thenReturn(mockInitialTable);
when(mockCatalog.loadTable(IDENTIFIER)).thenReturn(loadedTable);
@@ -164,14 +158,7 @@ public class TestIcebergTableManager {
Schema schema = new Schema();
PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
when(mockCatalog.createTable(
- eq(IDENTIFIER),
- any(),
- any(),
- eq(BASE_PATH),
- eq(
- Collections.singletonMap(
- TableProperties.DEFAULT_NAME_MAPPING,
- NameMappingParser.toJson(MappingUtil.create(schema))))))
+ eq(IDENTIFIER), any(), any(), eq(BASE_PATH),
eq(Collections.emptyMap())))
.thenThrow(new AlreadyExistsException("Table already exists"));
when(mockCatalog.loadTable(IDENTIFIER)).thenReturn(mockTable);