This is an automated email from the ASF dual-hosted git repository.

timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git


The following commit(s) were added to refs/heads/main by this push:
     new 81af6e29 Fix Delta to Iceberg not working on column mapping enabled Delta source table (#766)
81af6e29 is described below

commit 81af6e294041ec46cd30cd2796b8ab20647b541a
Author: Xingrong Chen <[email protected]>
AuthorDate: Mon Dec 29 07:58:49 2025 -0800

    Fix Delta to Iceberg not working on column mapping enabled Delta source table (#766)
    
    * Fix Iceberg duplicate field id in converted schema
    
    * Traverse the whole schema before converting, add test for the whole conversion process
    
    * update name mapping in the target Iceberg to read values from correct parquet columns
    
    * Add comments for physical field name and add more fields for schema extraction test
    
    * reset id to name mapping before extraction
    
    * Update name mapping in one place
    
    * update comments
    
    * Create schema extractor per conversion, add test cases for schema evolution
    
    * Add adding columns case for schema evolution test
    
    * recreate the name mapping when source schema contains field IDs
    
    * Wrap single-line if statement, add default case for switch, update name mapping only when schema changed
    
    * update comments
    
    * add comments for setNameMapping
    
    * Update name mapping setting function name and comments
---
 .../apache/xtable/model/schema/InternalField.java  |   3 +
 .../apache/xtable/delta/DeltaSchemaExtractor.java  |   6 +
 .../xtable/iceberg/IcebergConversionTarget.java    |  53 ++++++++
 .../xtable/iceberg/IcebergSchemaExtractor.java     |  59 ++++++++-
 .../apache/xtable/iceberg/IcebergTableManager.java |  12 +-
 .../org/apache/xtable/ITConversionController.java  |  42 ++++++
 .../org/apache/xtable/TestSparkDeltaTable.java     |  34 ++++-
 .../org/apache/xtable/delta/TestDeltaHelper.java   |  37 ++++++
 .../xtable/iceberg/TestIcebergSchemaExtractor.java | 144 +++++++++++++++++++++
 .../xtable/iceberg/TestIcebergTableManager.java    |  17 +--
 10 files changed, 379 insertions(+), 28 deletions(-)
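
For context on the change set below: a column-mapping-enabled Delta table writes
parquet data under physical column names rather than logical ones, so the target
Iceberg table needs a name mapping that lists the physical name as an alias for
each field ID. A minimal sketch using the Iceberg mapping API already used in
this commit (the physical name "col-9b2e7f3a" is an invented example, not a
value from this commit):

    import java.util.Arrays;

    import org.apache.iceberg.mapping.MappedField;
    import org.apache.iceberg.mapping.MappedFields;
    import org.apache.iceberg.mapping.NameMapping;
    import org.apache.iceberg.mapping.NameMappingParser;

    public class NameMappingAliasSketch {
      public static void main(String[] args) {
        // Field ID 2 resolves to the logical name "scores" plus the physical
        // parquet column name written by Delta column mapping.
        MappedFields fields =
            MappedFields.of(
                Arrays.asList(
                    MappedField.of(1, Arrays.asList("name"), null),
                    MappedField.of(2, Arrays.asList("scores", "col-9b2e7f3a"), null)));
        // The JSON form is what ends up in the table property
        // TableProperties.DEFAULT_NAME_MAPPING ("schema.name-mapping.default").
        System.out.println(NameMappingParser.toJson(NameMapping.of(fields)));
      }
    }
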

diff --git a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java
index 31eb0ed4..7f209761 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java
@@ -43,6 +43,9 @@ public class InternalField {
   // The id field for the field. This is used to identify the field in the schema even after
   // renames.
   Integer fieldId;
+  // The name of the column in the data file used to store this field if it differs from the name in
+  // the table's definition; otherwise, null
+  @Getter String storageName;
 
   // represents the fully qualified path to the field (dot separated)
   @Getter(lazy = true)
diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
index 1376f884..5119f5f5 100644
--- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
@@ -55,6 +55,7 @@ import org.apache.xtable.schema.SchemaUtils;
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public class DeltaSchemaExtractor {
   private static final String DELTA_COLUMN_MAPPING_ID = "delta.columnMapping.id";
+  private static final String DELTA_COLUMN_MAPPING_NAME = "delta.columnMapping.physicalName";
   private static final DeltaSchemaExtractor INSTANCE = new DeltaSchemaExtractor();
   // Timestamps in Delta are microsecond precision by default
   private static final Map<InternalSchema.MetadataKey, Object>
@@ -136,6 +137,10 @@ public class DeltaSchemaExtractor {
                           field.metadata().contains(DELTA_COLUMN_MAPPING_ID)
                               ? (int) field.metadata().getLong(DELTA_COLUMN_MAPPING_ID)
                               : null;
+                      String storageName =
+                          field.metadata().contains(DELTA_COLUMN_MAPPING_NAME)
+                              ? field.metadata().getString(DELTA_COLUMN_MAPPING_NAME)
+                              : null;
                       String fieldComment =
                           field.getComment().isDefined() ? field.getComment().get() : null;
                       InternalSchema schema =
@@ -148,6 +153,7 @@ public class DeltaSchemaExtractor {
                       return InternalField.builder()
                           .name(field.name())
                           .fieldId(fieldId)
+                          .storageName(storageName)
                           .parentPath(parentPath)
                           .schema(schema)
                           .defaultValue(
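
The metadata keys read above are Delta's per-field column-mapping entries. A
minimal sketch of what the extractor sees on a Spark StructField (the id and
physical-name values are invented for illustration):

    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.MetadataBuilder;
    import org.apache.spark.sql.types.StructField;

    public class DeltaFieldMetadataSketch {
      public static void main(String[] args) {
        // Per-field metadata as Delta attaches it when delta.columnMapping.mode
        // is "name".
        Metadata metadata =
            new MetadataBuilder()
                .putLong("delta.columnMapping.id", 2L)
                .putString("delta.columnMapping.physicalName", "col-9b2e7f3a")
                .build();
        StructField field = new StructField("scores", DataTypes.LongType, true, metadata);

        // Mirrors the reads in DeltaSchemaExtractor: fieldId becomes the Iceberg
        // field ID, storageName becomes the name-mapping alias.
        int fieldId = (int) field.metadata().getLong("delta.columnMapping.id");
        String storageName = field.metadata().getString("delta.columnMapping.physicalName");
        System.out.println(fieldId + " -> " + storageName);
      }
    }
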
diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
index b05089d0..bac7b510 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
@@ -20,9 +20,12 @@ package org.apache.xtable.iceberg;
 
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 
 import lombok.extern.log4j.Log4j2;
 
@@ -39,6 +42,11 @@ import org.apache.iceberg.UpdateProperties;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.mapping.MappedField;
+import org.apache.iceberg.mapping.MappedFields;
+import org.apache.iceberg.mapping.MappingUtil;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
 
 import org.apache.xtable.conversion.TargetTable;
 import org.apache.xtable.model.InternalTable;
@@ -161,13 +169,58 @@ public class IcebergConversionTarget implements ConversionTarget {
     }
   }
 
+  /**
+   * Create a name mapping from the given schema, add storage names in {@link
+   * IcebergSchemaExtractor#getIdToStorageName()}, if any exist, to the name mapping, and apply the
+   * updated mapping to the table.
+   *
+   * <p>This method should only be called when the name mapping is not set, or when field IDs are
+   * provided in the source schema.
+   *
+   * @param schema the {@link Schema} from which to create the name mapping
+   */
+  private void createAndSetNameMapping(Schema schema) {
+    NameMapping mapping = MappingUtil.create(schema);
+    MappedFields updatedMappedFields =
+        updateNameMapping(mapping.asMappedFields(), schemaExtractor.getIdToStorageName());
+    transaction
+        .updateProperties()
+        .set(
+            TableProperties.DEFAULT_NAME_MAPPING,
+            NameMappingParser.toJson(NameMapping.of(updatedMappedFields)))
+        .commit();
+  }
+
+  private MappedFields updateNameMapping(
+      MappedFields mapping, Map<Integer, String> idToStorageName) {
+    if (mapping == null) {
+      return null;
+    }
+    List<MappedField> fieldResults = new ArrayList<>();
+    for (MappedField field : mapping.fields()) {
+      Set<String> fieldNames = new HashSet<>(field.names());
+      if (idToStorageName.containsKey(field.id())) {
+        fieldNames.add(idToStorageName.get(field.id()));
+      }
+      MappedFields nestedMapping = updateNameMapping(field.nestedMapping(), idToStorageName);
+      fieldResults.add(MappedField.of(field.id(), fieldNames, nestedMapping));
+    }
+    return MappedFields.of(fieldResults);
+  }
+
   @Override
   public void syncSchema(InternalSchema schema) {
     Schema latestSchema = schemaExtractor.toIceberg(schema);
+    if (!transaction.table().properties().containsKey(TableProperties.DEFAULT_NAME_MAPPING)) {
+      createAndSetNameMapping(latestSchema);
+    }
     if (!transaction.table().schema().sameSchema(latestSchema)) {
       boolean hasFieldIds =
           schema.getAllFields().stream().anyMatch(field -> field.getFieldId() != null);
       if (hasFieldIds) {
+        // If field IDs are provided in the source schema, manually update the name mapping to
+        // ensure the IDs match the correct fields.
+        createAndSetNameMapping(latestSchema);
        // There is no clean way to sync the schema with the provided field IDs using the
        // transaction API so we commit the current transaction and interact directly with
         // the operations API.
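
Once createAndSetNameMapping has committed the property, readers can recover the
aliases by field ID. A small sketch (it assumes an org.apache.iceberg.Table
handle, and field ID 2 is an arbitrary example):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableProperties;
    import org.apache.iceberg.mapping.MappedField;
    import org.apache.iceberg.mapping.NameMapping;
    import org.apache.iceberg.mapping.NameMappingParser;

    public class NameMappingLookupSketch {
      // Returns the mapped field for ID 2; after the sync its names should
      // include both the logical column name and the Delta physical name.
      static MappedField lookupField(Table table) {
        String json = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
        NameMapping mapping = NameMappingParser.fromJson(json);
        return mapping.find(2);
      }
    }
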
diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java
index 4366bc02..3ae41561 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import lombok.AccessLevel;
+import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.extern.log4j.Log4j2;
 
@@ -53,18 +54,69 @@ import org.apache.xtable.model.schema.InternalType;
 @Log4j2
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public class IcebergSchemaExtractor {
-  private static final IcebergSchemaExtractor INSTANCE = new IcebergSchemaExtractor();
   private static final String MAP_KEY_FIELD_NAME = "key";
   private static final String MAP_VALUE_FIELD_NAME = "value";
   private static final String LIST_ELEMENT_FIELD_NAME = "element";
+  @Getter private final Map<Integer, String> idToStorageName = new HashMap<>();
 
   public static IcebergSchemaExtractor getInstance() {
-    return INSTANCE;
+    return new IcebergSchemaExtractor();
+  }
+
+  private void initializeFieldIdTracker(InternalSchema schema, AtomicInteger fieldIdTracker) {
+    schema.getFields().stream()
+        .forEach(
+            field -> {
+              if (field.getFieldId() != null) {
+                fieldIdTracker.accumulateAndGet(field.getFieldId(), Math::max);
+              }
+              initializeFieldIdTracker(field, fieldIdTracker);
+            });
+  }
+
+  private void initializeFieldIdTracker(InternalField field, AtomicInteger fieldIdTracker) {
+    switch (field.getSchema().getDataType()) {
+      case RECORD:
+        initializeFieldIdTracker(field.getSchema(), fieldIdTracker);
+        return;
+      case MAP:
+        field.getSchema().getFields().stream()
+            .filter(
+                mapField ->
+                    InternalField.Constants.MAP_KEY_FIELD_NAME.equals(mapField.getName())
+                        || InternalField.Constants.MAP_VALUE_FIELD_NAME.equals(mapField.getName()))
+            .forEach(
+                mapField -> {
+                  if (mapField.getFieldId() != null) {
+                    fieldIdTracker.accumulateAndGet(mapField.getFieldId(), Math::max);
+                  }
+                  initializeFieldIdTracker(mapField, fieldIdTracker);
+                });
+        return;
+      case LIST:
+        field.getSchema().getFields().stream()
+            .filter(
+                arrayField ->
+                    InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME.equals(arrayField.getName()))
+            .forEach(
+                arrayField -> {
+                  if (arrayField.getFieldId() != null) {
+                    fieldIdTracker.accumulateAndGet(arrayField.getFieldId(), Math::max);
+                  }
+                  initializeFieldIdTracker(arrayField, fieldIdTracker);
+                });
+        return;
+      default:
+        return;
+    }
   }
 
   public Schema toIceberg(InternalSchema internalSchema) {
     // if field IDs are not assigned in the source, just use an incrementing integer
     AtomicInteger fieldIdTracker = new AtomicInteger(0);
+    // traverse the schema before conversion to ensure fieldIdTracker won't return any
+    // fieldIds that are already present in the schema
+    initializeFieldIdTracker(internalSchema, fieldIdTracker);
     List<Types.NestedField> nestedFields = convertFields(internalSchema, fieldIdTracker);
     List<InternalField> recordKeyFields = internalSchema.getRecordKeyFields();
     boolean recordKeyFieldsAreNotRequired =
@@ -154,6 +206,9 @@ public class IcebergSchemaExtractor {
     List<Types.NestedField> nestedFields = new ArrayList<>(schema.getFields().size());
     for (int i = 0; i < schema.getFields().size(); i++) {
       InternalField field = schema.getFields().get(i);
+      if (field.getStorageName() != null) {
+        idToStorageName.put(ids.get(i), field.getStorageName());
+      }
       nestedFields.add(
           Types.NestedField.of(
               ids.get(i),
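
The pre-pass above exists to avoid ID collisions: without seeding, fieldIdTracker
would hand out IDs starting at 1 even when the source schema already uses them,
producing the duplicate-field-id bug this commit fixes. A minimal sketch of the
seeding behavior (the ID value 5 is illustrative):

    import java.util.concurrent.atomic.AtomicInteger;

    public class FieldIdTrackerSketch {
      public static void main(String[] args) {
        AtomicInteger tracker = new AtomicInteger(0);
        // Seed with every field ID already present in the source schema, as
        // initializeFieldIdTracker does; suppose the highest existing ID is 5.
        tracker.accumulateAndGet(5, Math::max);
        // Freshly assigned IDs now start past the existing range.
        System.out.println(tracker.incrementAndGet()); // prints 6 -- no collision
      }
    }
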
diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergTableManager.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergTableManager.java
index 06b625c0..19f162a6 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergTableManager.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergTableManager.java
@@ -36,13 +36,10 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
-import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.hadoop.HadoopTables;
-import org.apache.iceberg.mapping.MappingUtil;
-import org.apache.iceberg.mapping.NameMappingParser;
 
 @AllArgsConstructor(staticName = "of")
 @Log4j2
@@ -88,14 +85,14 @@ class IcebergTableManager {
                             new Schema(),
                             PartitionSpec.unpartitioned(),
                             basePath,
-                            getDefaultMappingProperties(schema)))
+                            Collections.emptyMap()))
                 .orElseGet(
                     () ->
                         getHadoopTables()
                             .create(
                                 new Schema(),
                                 PartitionSpec.unpartitioned(),
-                                getDefaultMappingProperties(schema),
+                                Collections.emptyMap(),
                                 basePath));
         // set the schema with the provided field IDs
        TableOperations operations = ((BaseTable) tableWithEmptySchema).operations();
@@ -112,11 +109,6 @@ class IcebergTableManager {
     }
   }
 
-  private Map<String, String> getDefaultMappingProperties(Schema schema) {
-    return Collections.singletonMap(
-        TableProperties.DEFAULT_NAME_MAPPING, NameMappingParser.toJson(MappingUtil.create(schema)));
-  }
-
   private Optional<Catalog> getCatalog(IcebergCatalogConfig catalogConfig) {
     if (catalogConfig == null) {
       return Optional.empty();
diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
index b8ea413b..019f69ea 100644
--- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
+++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
@@ -746,6 +746,48 @@ public class ITConversionController {
     }
   }
 
+  @Test
+  public void testColumnMappingEnabledDeltaToIceberg() {
+    String tableName = getTableName();
+    ConversionSourceProvider<?> conversionSourceProvider = getConversionSourceProvider(DELTA);
+    try (TestSparkDeltaTable table =
+        TestSparkDeltaTable.forColumnMappingEnabled(tableName, tempDir, sparkSession, null)) {
+      table.insertRows(20);
+      ConversionController conversionController =
+          new ConversionController(jsc.hadoopConfiguration());
+      ConversionConfig conversionConfig =
+          getTableSyncConfig(
+              DELTA,
+              SyncMode.INCREMENTAL,
+              tableName,
+              table,
+              Collections.singletonList(ICEBERG),
+              null,
+              null);
+      conversionController.sync(conversionConfig, conversionSourceProvider);
+      table.insertRows(10);
+      conversionController.sync(conversionConfig, conversionSourceProvider);
+      table.insertRows(10);
+      conversionController.sync(conversionConfig, conversionSourceProvider);
+      checkDatasetEquivalence(DELTA, table, Collections.singletonList(ICEBERG), 40);
+
+      table.dropColumn("long_field");
+      table.insertRows(10);
+      conversionController.sync(conversionConfig, conversionSourceProvider);
+      checkDatasetEquivalence(DELTA, table, Collections.singletonList(ICEBERG), 50);
+
+      table.renameColumn("double_field", "scores");
+      table.insertRows(10);
+      conversionController.sync(conversionConfig, conversionSourceProvider);
+      checkDatasetEquivalence(DELTA, table, Collections.singletonList(ICEBERG), 60);
+
+      table.addColumn();
+      table.insertRows(10);
+      conversionController.sync(conversionConfig, conversionSourceProvider);
+      checkDatasetEquivalence(DELTA, table, Collections.singletonList(ICEBERG), 70);
+    }
+  }
+
   @Test
   public void testMetadataRetention() throws Exception {
     String tableName = getTableName();
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java b/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java
index 028eca1b..743206aa 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestSparkDeltaTable.java
@@ -69,12 +69,27 @@ public class TestSparkDeltaTable implements GenericTable<Row, Object>, Closeable
     return new TestSparkDeltaTable(tableName, tempDir, sparkSession, partitionField, true);
   }
 
+  public static TestSparkDeltaTable forColumnMappingEnabled(
+      String tableName, Path tempDir, SparkSession sparkSession, String partitionField) {
+    return new TestSparkDeltaTable(tableName, tempDir, sparkSession, partitionField, true, true);
+  }
+
   public TestSparkDeltaTable(
       String name,
       Path tempDir,
       SparkSession sparkSession,
       String partitionField,
       boolean includeAdditionalColumns) {
+    this(name, tempDir, sparkSession, partitionField, includeAdditionalColumns, false);
+  }
+
+  public TestSparkDeltaTable(
+      String name,
+      Path tempDir,
+      SparkSession sparkSession,
+      String partitionField,
+      boolean includeAdditionalColumns,
+      boolean enableColumnMapping) {
     try {
       this.tableName = name;
       this.basePath = initBasePath(tempDir, tableName);
@@ -82,7 +97,8 @@ public class TestSparkDeltaTable implements GenericTable<Row, Object>, Closeable
       this.partitionField = partitionField;
       this.includeAdditionalColumns = includeAdditionalColumns;
       this.testDeltaHelper =
-          TestDeltaHelper.createTestDataHelper(partitionField, includeAdditionalColumns);
+          TestDeltaHelper.createTestDataHelper(
+              partitionField, includeAdditionalColumns, enableColumnMapping);
       testDeltaHelper.createTable(sparkSession, tableName, basePath);
       this.deltaLog = DeltaLog.forTable(sparkSession, basePath);
       this.deltaTable = DeltaTable.forPath(sparkSession, basePath);
@@ -260,4 +276,20 @@ public class TestSparkDeltaTable implements GenericTable<Row, Object>, Closeable
         .filter(columnName -> !columnName.equals("yearOfBirth"))
         .collect(Collectors.toList());
   }
+
+  public void dropColumn(String colName) {
+    testDeltaHelper.dropColumn(colName);
+    sparkSession.sql(String.format("ALTER TABLE delta.`%s` DROP COLUMN %s", basePath, colName));
+  }
+
+  public void renameColumn(String colName, String newColName) {
+    testDeltaHelper.renameColumn(colName, newColName);
+    sparkSession.sql(
+        String.format(
+            "ALTER TABLE delta.`%s` RENAME COLUMN %s TO %s", basePath, 
colName, newColName));
+  }
+
+  public void addColumn() {
+    testDeltaHelper.addColumn();
+  }
 }
diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaHelper.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaHelper.java
index a677f57e..2e95e89b 100644
--- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaHelper.java
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaHelper.java
@@ -117,14 +117,21 @@ public class TestDeltaHelper {
   StructType tableStructSchema;
   String partitionField;
   boolean includeAdditionalColumns;
+  boolean enableColumnMapping;
 
   public static TestDeltaHelper createTestDataHelper(
       String partitionField, boolean includeAdditionalColumns) {
+    return createTestDataHelper(partitionField, includeAdditionalColumns, false);
+  }
+
+  public static TestDeltaHelper createTestDataHelper(
+      String partitionField, boolean includeAdditionalColumns, boolean enableColumnMapping) {
     StructType tableSchema = generateDynamicSchema(partitionField, includeAdditionalColumns);
     return TestDeltaHelper.builder()
         .tableStructSchema(tableSchema)
         .partitionField(partitionField)
         .includeAdditionalColumns(includeAdditionalColumns)
+        .enableColumnMapping(enableColumnMapping)
         .build();
   }
 
@@ -160,6 +167,11 @@ public class TestDeltaHelper {
     if (includeAdditionalColumns) {
       tableBuilder.addColumn("street", StringType);
     }
+    if (enableColumnMapping) {
+      tableBuilder.property("delta.minReaderVersion", "2");
+      tableBuilder.property("delta.minWriterVersion", "5");
+      tableBuilder.property("delta.columnMapping.mode", "name");
+    }
     tableBuilder.execute();
   }
 
@@ -303,4 +315,29 @@ public class TestDeltaHelper {
         .mapToObj(i -> generateRandomRowForGivenYearAndLevel(partitionValue, level))
         .collect(Collectors.toList());
   }
+
+  public void dropColumn(String colName) {
+    this.tableStructSchema =
+        new StructType(
+            Arrays.stream(tableStructSchema.fields())
+                .filter(field -> !field.name().equals(colName))
+                .toArray(StructField[]::new));
+  }
+
+  public void renameColumn(String colName, String newColName) {
+    this.tableStructSchema =
+        new StructType(
+            Arrays.stream(tableStructSchema.fields())
+                .map(
+                    field ->
+                        field.name().equals(colName)
+                            ? new StructField(
+                                newColName, field.dataType(), field.nullable(), field.metadata())
+                            : field)
+                .toArray(StructField[]::new));
+  }
+
+  public void addColumn() {
+    this.tableStructSchema = tableStructSchema.add("city", StringType, true);
+  }
 }
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java
index 824e2285..f842f99c 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java
@@ -1044,4 +1044,148 @@ public class TestIcebergSchemaExtractor {
     Assertions.assertTrue(
         icebergRepresentation.sameSchema(SCHEMA_EXTRACTOR.toIceberg(internalSchema)));
   }
+
+  @Test
+  public void testToIcebergWithPartialFieldIdsSet() {
+    InternalSchema internalSchema =
+        InternalSchema.builder()
+            .name("testRecord")
+            .dataType(InternalType.RECORD)
+            .isNullable(false)
+            .fields(
+                Arrays.asList(
+                    InternalField.builder()
+                        .name("name")
+                        .fieldId(1)
+                        .schema(
+                            InternalSchema.builder()
+                                .name("string")
+                                .dataType(InternalType.STRING)
+                                .isNullable(true)
+                                .build())
+                        .build(),
+                    InternalField.builder()
+                        .name("scores")
+                        .fieldId(2)
+                        .schema(
+                            InternalSchema.builder()
+                                .name("array")
+                                .dataType(InternalType.LIST)
+                                .isNullable(true)
+                                .fields(
+                                    Arrays.asList(
+                                        InternalField.builder()
+                                            .name("_one_field_element")
+                                            .parentPath("scores")
+                                            .schema(
+                                                InternalSchema.builder()
+                                                    .name("long")
+                                                    .dataType(InternalType.LONG)
+                                                    .isNullable(true)
+                                                    .build())
+                                            .fieldId(null)
+                                            .build()))
+                                .build())
+                        .build(),
+                    InternalField.builder()
+                        .name("record_map")
+                        .fieldId(3)
+                        .schema(
+                            InternalSchema.builder()
+                                .name("map")
+                                .dataType(InternalType.MAP)
+                                .isNullable(true)
+                                .fields(
+                                    Arrays.asList(
+                                        InternalField.builder()
+                                            .name("_one_field_key")
+                                            .parentPath("record_map")
+                                            .schema(
+                                                InternalSchema.builder()
+                                                    .name("string")
+                                                    .dataType(InternalType.STRING)
+                                                    .isNullable(false)
+                                                    .build())
+                                            .build(),
+                                        InternalField.builder()
+                                            .name("_one_field_value")
+                                            .parentPath("record_map")
+                                            .schema(
+                                                InternalSchema.builder()
+                                                    .name("struct")
+                                                    .dataType(InternalType.RECORD)
+                                                    .isNullable(true)
+                                                    .fields(
+                                                        Arrays.asList(
+                                                            InternalField.builder()
+                                                                .name("nested_int")
+                                                                .fieldId(5)
+                                                                .parentPath(
+                                                                    "record_map._one_field_value")
+                                                                .schema(
+                                                                    InternalSchema.builder()
+                                                                        .name("integer")
+                                                                        .dataType(InternalType.INT)
+                                                                        .isNullable(true)
+                                                                        .build())
+                                                                .build()))
+                                                    .build())
+                                            .build()))
+                                .build())
+                        .build(),
+                    InternalField.builder()
+                        .name("primitive_map")
+                        .fieldId(4)
+                        .schema(
+                            InternalSchema.builder()
+                                .name("map")
+                                .dataType(InternalType.MAP)
+                                .isNullable(false)
+                                .fields(
+                                    Arrays.asList(
+                                        InternalField.builder()
+                                            .name("_one_field_key")
+                                            .parentPath("primitive_map")
+                                            .schema(
+                                                InternalSchema.builder()
+                                                    .name("string")
+                                                    .dataType(InternalType.STRING)
+                                                    .isNullable(false)
+                                                    .build())
+                                            .build(),
+                                        InternalField.builder()
+                                            .name("_one_field_value")
+                                            .parentPath("primitive_map")
+                                            .schema(
+                                                InternalSchema.builder()
+                                                    .name("integer")
+                                                    .dataType(InternalType.INT)
+                                                    .isNullable(false)
+                                                    .build())
+                                            .build()))
+                                .build())
+                        .build()))
+            .build();
+    Schema icebergRepresentation =
+        new Schema(
+            Types.NestedField.optional(1, "name", Types.StringType.get()),
+            Types.NestedField.optional(
+                2, "scores", Types.ListType.ofOptional(6, Types.LongType.get())),
+            Types.NestedField.optional(
+                3,
+                "record_map",
+                Types.MapType.ofOptional(
+                    7,
+                    8,
+                    Types.StringType.get(),
+                    Types.StructType.of(
+                        Arrays.asList(
+                            Types.NestedField.optional(
+                                5, "nested_int", Types.IntegerType.get()))))),
+            Types.NestedField.required(
+                4,
+                "primitive_map",
+                Types.MapType.ofRequired(9, 10, Types.StringType.get(), Types.IntegerType.get())));
+    assertTrue(icebergRepresentation.sameSchema(SCHEMA_EXTRACTOR.toIceberg(internalSchema)));
+  }
 }
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergTableManager.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergTableManager.java
index f424e3a9..413a16d2 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergTableManager.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergTableManager.java
@@ -40,12 +40,9 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
-import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
-import org.apache.iceberg.mapping.MappingUtil;
-import org.apache.iceberg.mapping.NameMappingParser;
 
 public class TestIcebergTableManager {
   private static final String BASE_PATH = "file:///basePath/";
@@ -117,10 +114,7 @@ public class TestIcebergTableManager {
             any(),
             eq(PartitionSpec.unpartitioned()),
             eq(BASE_PATH),
-            eq(
-                Collections.singletonMap(
-                    TableProperties.DEFAULT_NAME_MAPPING,
-                    NameMappingParser.toJson(MappingUtil.create(schema))))))
+            eq(Collections.emptyMap())))
         .thenReturn(mockInitialTable);
     when(mockCatalog.loadTable(IDENTIFIER)).thenReturn(loadedTable);
 
@@ -164,14 +158,7 @@ public class TestIcebergTableManager {
     Schema schema = new Schema();
     PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
     when(mockCatalog.createTable(
-            eq(IDENTIFIER),
-            any(),
-            any(),
-            eq(BASE_PATH),
-            eq(
-                Collections.singletonMap(
-                    TableProperties.DEFAULT_NAME_MAPPING,
-                    NameMappingParser.toJson(MappingUtil.create(schema))))))
+            eq(IDENTIFIER), any(), any(), eq(BASE_PATH), eq(Collections.emptyMap())))
         .thenThrow(new AlreadyExistsException("Table already exists"));
     when(mockCatalog.loadTable(IDENTIFIER)).thenReturn(mockTable);
 
