This is an automated email from the ASF dual-hosted git repository.
timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git
The following commit(s) were added to refs/heads/main by this push:
new 93d4d27c [750] Fix schema sync for Iceberg tables (#749)
93d4d27c is described below
commit 93d4d27c9e508508400a8b7985dd3bf40c2488bd
Author: Tim Brown <[email protected]>
AuthorDate: Wed Oct 8 15:16:39 2025 -0400
[750] Fix schema sync for Iceberg tables (#749)
* fix initial schema sync
* update sync schema
* spotless
* update run sync to hit field ID case
* add basic unit test, fix test expectation
* style
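
    In short, when the incoming schema already carries field IDs, the target now commits the
    schema through Iceberg's TableOperations/TableMetadata API instead of the transaction-based
    UpdateSchema path (which reassigns IDs). A minimal sketch of that approach, using only the
    Iceberg APIs that appear in the diff below (the wrapper class name is illustrative):

    import org.apache.iceberg.BaseTable;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableMetadata;

    class ProvidedIdSchemaSyncSketch {
      // Swap in the new schema while keeping the field IDs supplied by the source,
      // bypassing UpdateSchema so the provided IDs are not remapped.
      void syncWithProvidedIds(Schema latest, Table table) {
        BaseTable baseTable = (BaseTable) table;
        TableMetadata current = baseTable.operations().current();
        TableMetadata updated =
            TableMetadata.buildFrom(current)
                .setCurrentSchema(latest, latest.highestFieldId())
                .build();
        // Atomically replace the current metadata with the version carrying the provided-ID schema.
        baseTable.operations().commit(current, updated);
      }
    }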
---
.../xtable/iceberg/IcebergConversionTarget.java | 17 +++++-
.../apache/xtable/iceberg/IcebergSchemaSync.java | 11 ++++
.../apache/xtable/iceberg/IcebergTableManager.java | 50 ++++++++++------
.../org/apache/xtable/TestAbstractHudiTable.java | 6 +-
.../java/org/apache/xtable/TestJavaHudiTable.java | 56 ++++++++++++++---
.../java/org/apache/xtable/TestSparkHudiTable.java | 2 +-
.../iceberg/TestIcebergConversionSource.java | 2 +-
.../xtable/iceberg/TestIcebergSchemaSync.java | 23 +++++++
.../org/apache/xtable/iceberg/TestIcebergSync.java | 48 ++++++++-------
.../xtable/iceberg/TestIcebergTableManager.java | 70 +++++++++++++++-------
xtable-utilities/pom.xml | 6 ++
.../apache/xtable/utilities/ITRunCatalogSync.java | 2 +-
.../org/apache/xtable/utilities/ITRunSync.java | 15 +++--
13 files changed, 228 insertions(+), 80 deletions(-)
diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
index a57ac4f6..b05089d0 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java
@@ -164,7 +164,22 @@ public class IcebergConversionTarget implements ConversionTarget {
@Override
public void syncSchema(InternalSchema schema) {
Schema latestSchema = schemaExtractor.toIceberg(schema);
- schemaSync.sync(transaction.table().schema(), latestSchema, transaction);
+ if (!transaction.table().schema().sameSchema(latestSchema)) {
+ boolean hasFieldIds =
+ schema.getAllFields().stream().anyMatch(field -> field.getFieldId() != null);
+ if (hasFieldIds) {
+ // There is no clean way to sync the schema with the provided field IDs using the
+ // transaction API so we commit the current transaction and interact directly with
+ // the operations API.
+ transaction.commitTransaction();
+ schemaSync.syncWithProvidedIds(latestSchema, table);
+ // Start a new transaction for remaining operations
+ table.refresh();
+ transaction = table.newTransaction();
+ } else {
+ schemaSync.sync(transaction.table().schema(), latestSchema, transaction);
+ }
+ }
}
@Override
diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaSync.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaSync.java
index 800938cb..4b570560 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaSync.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaSync.java
@@ -28,7 +28,10 @@ import java.util.function.Supplier;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
+import org.apache.iceberg.BaseTable;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.types.Types;
@@ -58,6 +61,14 @@ public class IcebergSchemaSync {
}
}
+ public void syncWithProvidedIds(Schema latest, Table table) {
+ BaseTable baseTable = ((BaseTable) table);
+ TableMetadata current = baseTable.operations().current();
+ TableMetadata updated =
+ TableMetadata.buildFrom(current).setCurrentSchema(latest, latest.highestFieldId()).build();
+ baseTable.operations().commit(current, updated);
+ }
+
/**
* Return a mapping of fieldId in the latest schema to an update action to perform. This allows
* updates to happen in the same order as the source system.
diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergTableManager.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergTableManager.java
index 431184ce..06b625c0 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergTableManager.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergTableManager.java
@@ -29,11 +29,13 @@ import lombok.extern.log4j.Log4j2;
import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -75,24 +77,34 @@ class IcebergTableManager {
return getTable(catalogConfig, tableIdentifier, basePath);
} else {
try {
- return getCatalog(catalogConfig)
- .map(
- catalog ->
- catalog.createTable(
- tableIdentifier,
- schema,
- partitionSpec,
- basePath,
- getDefaultMappingProperties(schema)))
- .orElseGet(
- () ->
- getHadoopTables()
- .create(
- schema,
- partitionSpec,
- SortOrder.unsorted(),
- getDefaultMappingProperties(schema),
- basePath));
+ // initialize the table with an empty schema, then manually set the schema to prevent the
+ // Iceberg API from remapping the field IDs.
+ Table tableWithEmptySchema =
+ getCatalog(catalogConfig)
+ .map(
+ catalog ->
+ catalog.createTable(
+ tableIdentifier,
+ new Schema(),
+ PartitionSpec.unpartitioned(),
+ basePath,
+ getDefaultMappingProperties(schema)))
+ .orElseGet(
+ () ->
+ getHadoopTables()
+ .create(
+ new Schema(),
+ PartitionSpec.unpartitioned(),
+ getDefaultMappingProperties(schema),
+ basePath));
+ // set the schema with the provided field IDs
+ TableOperations operations = ((BaseTable) tableWithEmptySchema).operations();
+ TableMetadata tableMetadata = operations.current();
+ TableMetadata.Builder builder = TableMetadata.buildFrom(tableMetadata);
+ builder.setCurrentSchema(schema, schema.highestFieldId());
+ builder.setDefaultPartitionSpec(partitionSpec);
+ operations.commit(tableMetadata, builder.build());
+ return getTable(catalogConfig, tableIdentifier, basePath);
} catch (AlreadyExistsException ex) {
log.info("Table {} not created since it already exists", tableIdentifier);
return getTable(catalogConfig, tableIdentifier, basePath);
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java b/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java
index 89460c40..8295ce51 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java
@@ -590,7 +590,10 @@ public abstract class TestAbstractHudiTable
@SneakyThrows
protected HoodieTableMetaClient getMetaClient(
- TypedProperties keyGenProperties, HoodieTableType hoodieTableType, Configuration conf) {
+ TypedProperties keyGenProperties,
+ HoodieTableType hoodieTableType,
+ Configuration conf,
+ boolean populateMetaFields) {
LocalFileSystem fs = (LocalFileSystem) FSUtils.getFs(basePath, conf);
// Enforce checksum such that fs.open() is consistent to DFS
fs.setVerifyChecksum(true);
@@ -614,6 +617,7 @@ public abstract class TestAbstractHudiTable
.setPayloadClass(OverwriteWithLatestAvroPayload.class)
.setCommitTimezone(HoodieTimelineTimeZone.UTC)
.setBaseFileFormat(HoodieFileFormat.PARQUET.toString())
+ .setPopulateMetaFields(populateMetaFields)
.build();
return HoodieTableMetaClient.initTableAndGetMetaClient(conf, this.basePath, properties);
}
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java b/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java
index abbe7fe6..2f5b73e4 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java
@@ -66,6 +66,7 @@ public class TestJavaHudiTable extends TestAbstractHudiTable {
private HoodieJavaWriteClient<HoodieAvroPayload> writeClient;
private final Configuration conf;
+ private final boolean addFieldIds;
/**
* Create a test table instance for general testing. The table is created with the schema defined
@@ -83,7 +84,13 @@ public class TestJavaHudiTable extends TestAbstractHudiTable {
public static TestJavaHudiTable forStandardSchema(
String tableName, Path tempDir, String partitionConfig, HoodieTableType tableType) {
return new TestJavaHudiTable(
- tableName, BASIC_SCHEMA, tempDir, partitionConfig, tableType, null);
+ tableName, BASIC_SCHEMA, tempDir, partitionConfig, tableType, null, false);
+ }
+
+ public static TestJavaHudiTable forStandardSchemaWithFieldIds(
+ String tableName, Path tempDir, String partitionConfig, HoodieTableType tableType) {
+ return new TestJavaHudiTable(
+ tableName, BASIC_SCHEMA, tempDir, partitionConfig, tableType, null, true);
}
public static TestJavaHudiTable forStandardSchema(
@@ -93,7 +100,7 @@ public class TestJavaHudiTable extends TestAbstractHudiTable {
HoodieTableType tableType,
HoodieArchivalConfig archivalConfig) {
return new TestJavaHudiTable(
- tableName, BASIC_SCHEMA, tempDir, partitionConfig, tableType, archivalConfig);
+ tableName, BASIC_SCHEMA, tempDir, partitionConfig, tableType, archivalConfig, false);
}
/**
@@ -119,7 +126,20 @@ public class TestJavaHudiTable extends TestAbstractHudiTable {
tempDir,
partitionConfig,
tableType,
- null);
+ null,
+ false);
+ }
+
+ public static TestJavaHudiTable withAdditionalColumnsAndFieldIds(
+ String tableName, Path tempDir, String partitionConfig, HoodieTableType tableType) {
+ return new TestJavaHudiTable(
+ tableName,
+ addSchemaEvolutionFieldsToBase(BASIC_SCHEMA),
+ tempDir,
+ partitionConfig,
+ tableType,
+ null,
+ true);
}
public static TestJavaHudiTable withAdditionalTopLevelField(
@@ -129,7 +149,13 @@ public class TestJavaHudiTable extends TestAbstractHudiTable {
HoodieTableType tableType,
Schema previousSchema) {
return new TestJavaHudiTable(
- tableName, addTopLevelField(previousSchema), tempDir, partitionConfig, tableType, null);
+ tableName,
+ addTopLevelField(previousSchema),
+ tempDir,
+ partitionConfig,
+ tableType,
+ null,
+ false);
}
public static TestJavaHudiTable withSchema(
@@ -138,7 +164,8 @@ public class TestJavaHudiTable extends TestAbstractHudiTable {
String partitionConfig,
HoodieTableType tableType,
Schema schema) {
- return new TestJavaHudiTable(tableName, schema, tempDir, partitionConfig, tableType, null);
+ return new TestJavaHudiTable(
+ tableName, schema, tempDir, partitionConfig, tableType, null, false);
}
private TestJavaHudiTable(
@@ -147,10 +174,12 @@ public class TestJavaHudiTable extends TestAbstractHudiTable {
Path tempDir,
String partitionConfig,
HoodieTableType hoodieTableType,
- HoodieArchivalConfig archivalConfig) {
+ HoodieArchivalConfig archivalConfig,
+ boolean addFieldIds) {
super(name, schema, tempDir, partitionConfig);
this.conf = new Configuration();
this.conf.set("parquet.avro.write-old-list-structure", "false");
+ this.addFieldIds = addFieldIds;
try {
this.metaClient = initMetaClient(hoodieTableType, typedProperties);
} catch (IOException ex) {
@@ -297,13 +326,14 @@ public class TestJavaHudiTable extends TestAbstractHudiTable {
private HoodieTableMetaClient initMetaClient(
HoodieTableType hoodieTableType, TypedProperties keyGenProperties) throws IOException {
- return getMetaClient(keyGenProperties, hoodieTableType, conf);
+ return getMetaClient(keyGenProperties, hoodieTableType, conf, !addFieldIds);
}
private HoodieJavaWriteClient<HoodieAvroPayload> initJavaWriteClient(
Schema schema, TypedProperties keyGenProperties, HoodieArchivalConfig archivalConfig) {
HoodieWriteConfig writeConfig =
HoodieWriteConfig.newBuilder()
+ .withPopulateMetaFields(!addFieldIds)
+ .withProperties(generateWriteConfig(schema, keyGenProperties).getProps())
.withClusteringConfig(
HoodieClusteringConfig.newBuilder()
@@ -321,6 +351,18 @@ public class TestJavaHudiTable extends TestAbstractHudiTable {
.withArchivalConfig(archivalConfig)
.build();
}
+ if (addFieldIds) {
+ writeConfig
+ .getProps()
+ .put(
+ "hoodie.avro.write.support.class",
+ "org.apache.xtable.hudi.extensions.HoodieAvroWriteSupportWithFieldIds");
+ writeConfig
+ .getProps()
+ .put(
+ "hoodie.client.init.callback.classes",
+ "org.apache.xtable.hudi.extensions.AddFieldIdsClientInitCallback");
+ }
HoodieEngineContext context = new HoodieJavaEngineContext(conf);
return new HoodieJavaWriteClient<>(context, writeConfig);
}
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestSparkHudiTable.java b/xtable-core/src/test/java/org/apache/xtable/TestSparkHudiTable.java
index 79316f5d..1aaf61f9 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestSparkHudiTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestSparkHudiTable.java
@@ -271,6 +271,6 @@ public class TestSparkHudiTable extends TestAbstractHudiTable {
private HoodieTableMetaClient initMetaClient(
JavaSparkContext jsc, HoodieTableType hoodieTableType, TypedProperties keyGenProperties) {
- return getMetaClient(keyGenProperties, hoodieTableType, jsc.hadoopConfiguration());
+ return getMetaClient(keyGenProperties, hoodieTableType, jsc.hadoopConfiguration(), true);
}
}
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java
index ab13ae2d..ffe6a217 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergConversionSource.java
@@ -364,7 +364,7 @@ class TestIcebergConversionSource {
.schema(csSchema)
.createWriterFunc(GenericParquetWriter::buildWriter)
.overwrite()
- .withSpec(csPartitionSpec)
+ .withSpec(table.spec())
.withPartition(partitionInfo)
.build();
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaSync.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaSync.java
index 98254591..b07fac4c 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaSync.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaSync.java
@@ -18,6 +18,7 @@
package org.apache.xtable.iceberg;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -29,9 +30,12 @@ import java.util.stream.Collectors;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;
+import org.mockito.MockedStatic;
import org.mockito.Mockito;
+import org.apache.iceberg.BaseTable;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.types.Type;
@@ -335,6 +339,25 @@ public class TestIcebergSchemaSync {
verify(mockUpdateSchema).commit();
}
+ @Test
+ void testSyncWithProvidedIds() {
+ BaseTable mockBaseTable = Mockito.mock(BaseTable.class, RETURNS_DEEP_STUBS);
+ TableMetadata mockCurrent = Mockito.mock(TableMetadata.class);
+ when(mockBaseTable.operations().current()).thenReturn(mockCurrent);
+ try (MockedStatic<TableMetadata> tableMetadataMockedStatic =
+ Mockito.mockStatic(TableMetadata.class)) {
+ TableMetadata.Builder mockBuilder = Mockito.mock(TableMetadata.Builder.class);
+ tableMetadataMockedStatic
+ .when(() -> TableMetadata.buildFrom(mockCurrent))
+ .thenReturn(mockBuilder);
+ when(mockBuilder.setCurrentSchema(SCHEMA, SCHEMA.highestFieldId())).thenReturn(mockBuilder);
+ TableMetadata mockUpdated = Mockito.mock(TableMetadata.class);
+ when(mockBuilder.build()).thenReturn(mockUpdated);
+ schemaSync.syncWithProvidedIds(SCHEMA, mockBaseTable);
+ verify(mockBaseTable.operations()).commit(mockCurrent, mockUpdated);
+ }
+ }
+
private Schema addColumnToDefault(Schema schema, Types.NestedField field, Integer parentId) {
List<Types.NestedField> fields = new ArrayList<>();
for (Types.NestedField existingField : schema.columns()) {
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
index c02d7f26..d5a25b02 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java
@@ -170,9 +170,9 @@ public class TestIcebergSync {
.build();
private final Schema icebergSchema =
new Schema(
- Types.NestedField.required(1, "timestamp_field", Types.TimestampType.withoutZone()),
+ Types.NestedField.required(3, "timestamp_field", Types.TimestampType.withoutZone()),
Types.NestedField.required(2, "date_field", Types.DateType.get()),
- Types.NestedField.required(3, "group_id", Types.IntegerType.get()),
+ Types.NestedField.required(1, "group_id", Types.IntegerType.get()),
Types.NestedField.required(
4,
"record",
@@ -244,11 +244,13 @@ public class TestIcebergSync {
TableFormatSync.getInstance()
.syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);
- validateIcebergTable(tableName, table1, Sets.newHashSet(dataFile1, dataFile2), null);
+ validateIcebergTable(
+ tableName, table1, Sets.newHashSet(dataFile1, dataFile2), null, icebergSchema);
TableFormatSync.getInstance()
.syncSnapshot(Collections.singletonList(conversionTarget), snapshot2);
- validateIcebergTable(tableName, table2, Sets.newHashSet(dataFile2, dataFile3), null);
+ validateIcebergTable(
+ tableName, table2, Sets.newHashSet(dataFile2, dataFile3), null, icebergSchema);
ArgumentCaptor<Transaction> transactionArgumentCaptor =
ArgumentCaptor.forClass(Transaction.class);
@@ -256,7 +258,7 @@ public class TestIcebergSync {
ArgumentCaptor<PartitionSpec> partitionSpecArgumentCaptor =
ArgumentCaptor.forClass(PartitionSpec.class);
- verify(mockSchemaSync, times(2))
+ verify(mockSchemaSync, times(1))
.sync(
schemaArgumentCaptor.capture(),
schemaArgumentCaptor.capture(),
@@ -274,13 +276,9 @@ public class TestIcebergSync {
assertTrue(
partitionSpecSchemaArgumentCaptor.getAllValues().stream()
.allMatch(capturedSchema -> capturedSchema.sameSchema(icebergSchema)));
- // schema sync args for first iteration
- assertTrue(
- schemaArgumentCaptor.getAllValues().subList(0, 2).stream()
- .allMatch(capturedSchema -> capturedSchema.sameSchema(icebergSchema)));
// second snapshot sync will evolve the schema
- assertTrue(schemaArgumentCaptor.getAllValues().get(2).sameSchema(icebergSchema));
- assertTrue(schemaArgumentCaptor.getAllValues().get(3).sameSchema(icebergSchema2));
+ assertTrue(schemaArgumentCaptor.getAllValues().get(0).sameSchema(icebergSchema));
+ assertTrue(schemaArgumentCaptor.getAllValues().get(1).sameSchema(icebergSchema2));
// check that the correct partition spec is used in calls to the mocks
assertTrue(
partitionSpecArgumentCaptor.getAllValues().stream()
@@ -292,9 +290,6 @@ public class TestIcebergSync {
assertSame(
transactionArgumentCaptor.getAllValues().get(0),
transactionArgumentCaptor.getAllValues().get(2));
- assertSame(
- transactionArgumentCaptor.getAllValues().get(1),
- transactionArgumentCaptor.getAllValues().get(3));
// validate that transactions are different between runs
assertNotSame(
transactionArgumentCaptor.getAllValues().get(1),
@@ -358,7 +353,8 @@ public class TestIcebergSync {
// get a new iceberg sync to make sure table is re-read from disk and no metadata is cached
TableFormatSync.getInstance()
.syncSnapshot(Collections.singletonList(conversionTarget), snapshot3);
- validateIcebergTable(tableName, table2, Sets.newHashSet(dataFile3, dataFile4), null);
+ validateIcebergTable(
+ tableName, table2, Sets.newHashSet(dataFile3, dataFile4), null, icebergSchema);
// Validate Iceberg table state
Table table = getTable(basePath);
assertEquals(4, table.history().size());
@@ -425,7 +421,8 @@ public class TestIcebergSync {
Expressions.and(
Expressions.greaterThanOrEqual(
partitionField.getSourceField().getName(), "2022-10-01T00:00"),
- Expressions.lessThan(partitionField.getSourceField().getName(), "2022-10-02T00:00")));
+ Expressions.lessThan(partitionField.getSourceField().getName(), "2022-10-02T00:00")),
+ icebergSchema);
}
@Test
@@ -485,7 +482,8 @@ public class TestIcebergSync {
Sets.newHashSet(dataFile1, dataFile2),
Expressions.and(
Expressions.greaterThanOrEqual(partitionField.getSourceField().getName(), "2022-10-01"),
- Expressions.lessThan(partitionField.getSourceField().getName(), "2022-10-02")));
+ Expressions.lessThan(partitionField.getSourceField().getName(), "2022-10-02")),
+ icebergSchema);
}
@Test
@@ -539,7 +537,8 @@ public class TestIcebergSync {
Sets.newHashSet(dataFile1, dataFile2),
Expressions.and(
Expressions.greaterThanOrEqual(partitionField.getSourceField().getName(), 1),
- Expressions.lessThan(partitionField.getSourceField().getName(), 2)));
+ Expressions.lessThan(partitionField.getSourceField().getName(), 2)),
+ icebergSchema);
}
@Test
@@ -619,7 +618,8 @@ public class TestIcebergSync {
Expressions.greaterThanOrEqual(
partitionField2.getSourceField().getName(), "2022-10-01T00:00"),
Expressions.lessThan(
- partitionField2.getSourceField().getName(), "2022-10-02T00:00"))));
+ partitionField2.getSourceField().getName(), "2022-10-02T00:00"))),
+ icebergSchema);
}
@Test
@@ -678,7 +678,8 @@ public class TestIcebergSync {
tableName,
table,
Sets.newHashSet(dataFile1, dataFile2),
- Expressions.equal(partitionField.getSourceField().getPath(), "value1"));
+ Expressions.equal(partitionField.getSourceField().getPath(), "value1"),
+ icebergSchema);
}
@Test
@@ -822,13 +823,16 @@ public class TestIcebergSync {
String tableName,
InternalTable table,
Set<InternalDataFile> expectedFiles,
- Expression filterExpression)
+ Expression filterExpression,
+ Schema expectedSchema)
throws IOException {
Path warehouseLocation = Paths.get(table.getBasePath()).getParent();
try (HadoopCatalog catalog = new HadoopCatalog(CONFIGURATION, warehouseLocation.toString())) {
TableIdentifier tableId = TableIdentifier.of(Namespace.empty(), tableName);
assertTrue(catalog.tableExists(tableId));
- TableScan scan = catalog.loadTable(tableId).newScan();
+ Table icebergTable = catalog.loadTable(tableId);
+ assertTrue(expectedSchema.sameSchema(icebergTable.schema()));
+ TableScan scan = icebergTable.newScan();
if (filterExpression != null) {
scan = scan.filter(filterExpression);
}
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergTableManager.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergTableManager.java
index f81f1336..f424e3a9 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergTableManager.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergTableManager.java
@@ -20,7 +20,9 @@ package org.apache.xtable.iceberg;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -30,10 +32,14 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.apache.iceberg.BaseTable;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -101,27 +107,46 @@ public class TestIcebergTableManager {
.catalogName(catalogName)
.catalogOptions(OPTIONS)
.build();
- Table mockTable = mock(Table.class);
+ BaseTable mockInitialTable = mock(BaseTable.class);
+ Table loadedTable = mock(Table.class);
when(mockCatalog.tableExists(IDENTIFIER)).thenReturn(false);
Schema schema = new Schema();
PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
when(mockCatalog.createTable(
- IDENTIFIER,
- schema,
- partitionSpec,
- BASE_PATH,
- Collections.singletonMap(
- TableProperties.DEFAULT_NAME_MAPPING,
- NameMappingParser.toJson(MappingUtil.create(schema)))))
- .thenReturn(mockTable);
+ eq(IDENTIFIER),
+ any(),
+ eq(PartitionSpec.unpartitioned()),
+ eq(BASE_PATH),
+ eq(
+ Collections.singletonMap(
+ TableProperties.DEFAULT_NAME_MAPPING,
+ NameMappingParser.toJson(MappingUtil.create(schema))))))
+ .thenReturn(mockInitialTable);
+ when(mockCatalog.loadTable(IDENTIFIER)).thenReturn(loadedTable);
- IcebergTableManager tableManager = IcebergTableManager.of(CONFIGURATION);
+ TableOperations tableOperations = mock(TableOperations.class);
+ when(mockInitialTable.operations()).thenReturn(tableOperations);
+ TableMetadata initialMetadata = mock(TableMetadata.class);
+ when(tableOperations.current()).thenReturn(initialMetadata);
+ try (MockedStatic<TableMetadata> tableMetadataMockedStatic = mockStatic(TableMetadata.class)) {
+ TableMetadata.Builder mockBuilder = mock(TableMetadata.Builder.class);
+ tableMetadataMockedStatic
+ .when(() -> TableMetadata.buildFrom(initialMetadata))
+ .thenReturn(mockBuilder);
+ TableMetadata updatedMetadata = mock(TableMetadata.class);
+ when(mockBuilder.build()).thenReturn(updatedMetadata);
- Table actual =
- tableManager.getOrCreateTable(catalogConfig, IDENTIFIER, BASE_PATH, schema, partitionSpec);
- assertEquals(mockTable, actual);
- verify(mockCatalog).initialize(catalogName, OPTIONS);
- verify(mockCatalog, never()).loadTable(any());
+ IcebergTableManager tableManager = IcebergTableManager.of(CONFIGURATION);
+
+ Table actual =
+ tableManager.getOrCreateTable(
+ catalogConfig, IDENTIFIER, BASE_PATH, schema, partitionSpec);
+ assertEquals(loadedTable, actual);
+ verify(mockCatalog).initialize(catalogName, OPTIONS);
+ verify(tableOperations).commit(initialMetadata, updatedMetadata);
+ verify(mockBuilder).setCurrentSchema(schema, schema.highestFieldId());
+ verify(mockBuilder).setDefaultPartitionSpec(partitionSpec);
+ }
}
@Test
@@ -139,13 +164,14 @@ public class TestIcebergTableManager {
Schema schema = new Schema();
PartitionSpec partitionSpec = PartitionSpec.unpartitioned();
when(mockCatalog.createTable(
- IDENTIFIER,
- schema,
- partitionSpec,
- BASE_PATH,
- Collections.singletonMap(
- TableProperties.DEFAULT_NAME_MAPPING,
- NameMappingParser.toJson(MappingUtil.create(schema)))))
+ eq(IDENTIFIER),
+ any(),
+ any(),
+ eq(BASE_PATH),
+ eq(
+ Collections.singletonMap(
+ TableProperties.DEFAULT_NAME_MAPPING,
+ NameMappingParser.toJson(MappingUtil.create(schema))))))
.thenThrow(new AlreadyExistsException("Table already exists"));
when(mockCatalog.loadTable(IDENTIFIER)).thenReturn(mockTable);
diff --git a/xtable-utilities/pom.xml b/xtable-utilities/pom.xml
index 5e37be6c..b0f09a3e 100644
--- a/xtable-utilities/pom.xml
+++ b/xtable-utilities/pom.xml
@@ -43,6 +43,12 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.xtable</groupId>
+ <artifactId>xtable-hudi-support-extensions_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.xtable</groupId>
diff --git a/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunCatalogSync.java b/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunCatalogSync.java
index 52cff85a..24aa5adf 100644
--- a/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunCatalogSync.java
+++ b/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunCatalogSync.java
@@ -133,7 +133,7 @@ public class ITRunCatalogSync {
Path icebergMetadataPath = Paths.get(URI.create(basePath + "/metadata"));
long icebergMetadataFiles =
Files.list(icebergMetadataPath).filter(p -> p.toString().endsWith("metadata.json")).count();
- Assertions.assertEquals(2, icebergMetadataFiles);
+ Assertions.assertEquals(3, icebergMetadataFiles);
Path deltaMetadataPath = Paths.get(URI.create(basePath + "/_delta_log"));
long deltaMetadataFiles =
Files.list(deltaMetadataPath).filter(p -> p.toString().endsWith(".json")).count();
diff --git a/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java b/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java
index 2294e16a..f18ce867 100644
--- a/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java
+++ b/xtable-utilities/src/test/java/org/apache/xtable/utilities/ITRunSync.java
@@ -55,7 +55,7 @@ class ITRunSync {
String[] args = new String[] {"--datasetConfig", configFile.getPath()};
RunSync.main(args);
Path icebergMetadataPath = Paths.get(URI.create(table.getBasePath() + "/metadata"));
- waitForNumIcebergCommits(icebergMetadataPath, 2);
+ waitForNumIcebergCommits(icebergMetadataPath, 3);
}
}
@@ -64,7 +64,7 @@ class ITRunSync {
ExecutorService runner = Executors.newSingleThreadExecutor();
String tableName = "test-table";
try (GenericTable table =
- TestJavaHudiTable.forStandardSchema(
+ TestJavaHudiTable.forStandardSchemaWithFieldIds(
tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
table.insertRows(20);
File configFile = writeConfigFile(tempDir, table, tableName);
@@ -78,11 +78,16 @@ class ITRunSync {
}
});
Path icebergMetadataPath = Paths.get(URI.create(table.getBasePath() + "/metadata"));
- waitForNumIcebergCommits(icebergMetadataPath, 2);
+ waitForNumIcebergCommits(icebergMetadataPath, 3);
+ }
+ try (GenericTable table =
+ TestJavaHudiTable.withAdditionalColumnsAndFieldIds(
+ tableName, tempDir, null, HoodieTableType.COPY_ON_WRITE)) {
// write more data now that table is initialized and data is synced
table.insertRows(20);
- waitForNumIcebergCommits(icebergMetadataPath, 3);
- assertEquals(3, numIcebergMetadataJsonFiles(icebergMetadataPath));
+ Path icebergMetadataPath = Paths.get(URI.create(table.getBasePath() + "/metadata"));
+ waitForNumIcebergCommits(icebergMetadataPath, 6);
+ assertEquals(6, numIcebergMetadataJsonFiles(icebergMetadataPath));
} finally {
runner.shutdownNow();
}