This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 504640f78b Flink: Dynamic Sink: Add case-insensitive field matching
(#14729)
504640f78b is described below
commit 504640f78bfde7338666d3451ac43fadbbc5627a
Author: Maximilian Michels <[email protected]>
AuthorDate: Mon Jan 19 17:45:29 2026 +0100
Flink: Dynamic Sink: Add case-insensitive field matching (#14729)
---
.../flink/sink/dynamic/CompareSchemasVisitor.java | 47 ++++---
.../flink/sink/dynamic/DynamicIcebergSink.java | 20 ++-
.../flink/sink/dynamic/DynamicRecordProcessor.java | 20 ++-
.../sink/dynamic/DynamicTableUpdateOperator.java | 17 ++-
.../flink/sink/dynamic/EvolveSchemaVisitor.java | 24 +++-
.../flink/sink/dynamic/TableMetadataCache.java | 35 +++--
.../iceberg/flink/sink/dynamic/TableUpdater.java | 18 +--
.../sink/dynamic/TestCompareSchemasVisitor.java | 143 +++++++++++++++----
.../flink/sink/dynamic/TestDynamicIcebergSink.java | 85 ++++++++++++
.../dynamic/TestDynamicTableUpdateOperator.java | 82 +++++++++--
.../sink/dynamic/TestEvolveSchemaVisitor.java | 154 ++++++++++++++++++---
.../flink/sink/dynamic/TestTableMetadataCache.java | 104 +++++++++++---
.../flink/sink/dynamic/TestTableUpdater.java | 109 +++++++++++----
13 files changed, 698 insertions(+), 160 deletions(-)
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java
index 60561b0f56..cb4fc62c01 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/CompareSchemasVisitor.java
@@ -20,6 +20,8 @@ package org.apache.iceberg.flink.sink.dynamic;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.schema.SchemaWithPartnerVisitor;
@@ -43,26 +45,31 @@ public class CompareSchemasVisitor
extends SchemaWithPartnerVisitor<Integer, CompareSchemasVisitor.Result> {
private final Schema tableSchema;
+ private final boolean caseSensitive;
private final boolean dropUnusedColumns;
- private CompareSchemasVisitor(Schema tableSchema, boolean dropUnusedColumns)
{
+ private CompareSchemasVisitor(
+ Schema tableSchema, boolean caseSensitive, boolean dropUnusedColumns) {
this.tableSchema = tableSchema;
+ this.caseSensitive = caseSensitive;
this.dropUnusedColumns = dropUnusedColumns;
}
- public static Result visit(Schema dataSchema, Schema tableSchema) {
- return visit(dataSchema, tableSchema, true, false);
- }
-
public static Result visit(
Schema dataSchema, Schema tableSchema, boolean caseSensitive, boolean
dropUnusedColumns) {
return visit(
dataSchema,
-1,
- new CompareSchemasVisitor(tableSchema, dropUnusedColumns),
+ new CompareSchemasVisitor(tableSchema, caseSensitive,
dropUnusedColumns),
new PartnerIdByNameAccessors(tableSchema, caseSensitive));
}
+ @VisibleForTesting
+ @Deprecated
+ public static Result visit(Schema dataSchema, Schema tableSchema) {
+ return visit(dataSchema, tableSchema, true, false);
+ }
+
@Override
public Result schema(Schema dataSchema, Integer tableSchemaId, Result
downstream) {
if (tableSchemaId == null) {
@@ -92,7 +99,7 @@ public class CompareSchemasVisitor
}
for (Types.NestedField tableField :
tableSchemaType.asStructType().fields()) {
- if (struct.field(tableField.name()) == null
+ if (getFieldFromStruct(tableField.name(), struct, caseSensitive) == null
&& (tableField.isRequired() || dropUnusedColumns)) {
// If a field from the table schema does not exist in the input
schema, then we won't visit
// it. The only choice is to make the table field optional or drop it.
@@ -105,11 +112,10 @@ public class CompareSchemasVisitor
}
for (int i = 0; i < struct.fields().size(); ++i) {
- if (!struct
- .fields()
- .get(i)
- .name()
- .equals(tableSchemaType.asStructType().fields().get(i).name())) {
+ String fieldName = struct.fields().get(i).name();
+ String tableFieldName =
tableSchemaType.asStructType().fields().get(i).name();
+ if ((caseSensitive && !fieldName.equals(tableFieldName))
+ || (!caseSensitive && !fieldName.equalsIgnoreCase(tableFieldName))) {
return Result.DATA_CONVERSION_NEEDED;
}
}
@@ -117,6 +123,12 @@ public class CompareSchemasVisitor
return result;
}
+ @Nullable
+ static Types.NestedField getFieldFromStruct(
+ String fieldName, Types.StructType struct, boolean caseSensitive) {
+ return caseSensitive ? struct.field(fieldName) :
struct.caseInsensitiveField(fieldName);
+ }
+
@Override
public Result field(Types.NestedField field, Integer tableSchemaId, Result
typeResult) {
if (tableSchemaId == null) {
@@ -191,14 +203,10 @@ public class CompareSchemasVisitor
static class PartnerIdByNameAccessors implements PartnerAccessors<Integer> {
private final Schema tableSchema;
- private boolean caseSensitive = true;
+ private boolean caseSensitive;
- PartnerIdByNameAccessors(Schema tableSchema) {
+ PartnerIdByNameAccessors(Schema tableSchema, boolean caseSensitive) {
this.tableSchema = tableSchema;
- }
-
- private PartnerIdByNameAccessors(Schema tableSchema, boolean
caseSensitive) {
- this(tableSchema);
this.caseSensitive = caseSensitive;
}
@@ -211,8 +219,7 @@ public class CompareSchemasVisitor
struct =
tableSchema.findField(tableSchemaFieldId).type().asStructType();
}
- Types.NestedField field =
- caseSensitive ? struct.field(name) :
struct.caseInsensitiveField(name);
+ Types.NestedField field = getFieldFromStruct(name, struct,
caseSensitive);
if (field != null) {
return field.fieldId();
}
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
index e1bc8deb9d..61b1f84a43 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicIcebergSink.java
@@ -191,6 +191,7 @@ public class DynamicIcebergSink
private int cacheMaximumSize = 100;
private long cacheRefreshMs = 1_000;
private int inputSchemasPerTableCacheMaximumSize = 10;
+ private boolean caseSensitive = true;
Builder() {}
@@ -353,6 +354,15 @@ public class DynamicIcebergSink
return this;
}
+ /**
+ * Set whether schema field name matching should be case-sensitive. The
default is to match the
+ * field names case-sensitively.
+ */
+ public Builder<T> caseSensitive(boolean newCaseSensitive) {
+ this.caseSensitive = newCaseSensitive;
+ return this;
+ }
+
private String operatorName(String suffix) {
return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
}
@@ -399,11 +409,12 @@ public class DynamicIcebergSink
generator,
catalogLoader,
immediateUpdate,
- dropUnusedColumns,
cacheMaximumSize,
cacheRefreshMs,
inputSchemasPerTableCacheMaximumSize,
- tableCreator))
+ tableCreator,
+ caseSensitive,
+ dropUnusedColumns))
.uid(prefixIfNotNull(uidPrefix, "-generator"))
.name(operatorName("generator"))
.returns(type);
@@ -418,11 +429,12 @@ public class DynamicIcebergSink
.map(
new DynamicTableUpdateOperator(
catalogLoader,
- dropUnusedColumns,
cacheMaximumSize,
cacheRefreshMs,
inputSchemasPerTableCacheMaximumSize,
- tableCreator))
+ tableCreator,
+ caseSensitive,
+ dropUnusedColumns))
.uid(prefixIfNotNull(uidPrefix, "-updater"))
.name(operatorName("Updater"))
.returns(type)
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java
index 427aa6ceaf..07dfad2780 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordProcessor.java
@@ -45,6 +45,7 @@ class DynamicRecordProcessor<T> extends ProcessFunction<T,
DynamicRecordInternal
private final long cacheRefreshMs;
private final int inputSchemasPerTableCacheMaximumSize;
private final TableCreator tableCreator;
+ private final boolean caseSensitive;
private transient TableMetadataCache tableCache;
private transient HashKeyGenerator hashKeyGenerator;
@@ -57,19 +58,21 @@ class DynamicRecordProcessor<T> extends ProcessFunction<T,
DynamicRecordInternal
DynamicRecordGenerator<T> generator,
CatalogLoader catalogLoader,
boolean immediateUpdate,
- boolean dropUnusedColumns,
int cacheMaximumSize,
long cacheRefreshMs,
int inputSchemasPerTableCacheMaximumSize,
- TableCreator tableCreator) {
+ TableCreator tableCreator,
+ boolean caseSensitive,
+ boolean dropUnusedColumns) {
this.generator = generator;
this.catalogLoader = catalogLoader;
this.immediateUpdate = immediateUpdate;
- this.dropUnusedColumns = dropUnusedColumns;
this.cacheMaximumSize = cacheMaximumSize;
this.cacheRefreshMs = cacheRefreshMs;
this.inputSchemasPerTableCacheMaximumSize =
inputSchemasPerTableCacheMaximumSize;
this.tableCreator = tableCreator;
+ this.caseSensitive = caseSensitive;
+ this.dropUnusedColumns = dropUnusedColumns;
}
@Override
@@ -78,12 +81,17 @@ class DynamicRecordProcessor<T> extends ProcessFunction<T,
DynamicRecordInternal
Catalog catalog = catalogLoader.loadCatalog();
this.tableCache =
new TableMetadataCache(
- catalog, cacheMaximumSize, cacheRefreshMs,
inputSchemasPerTableCacheMaximumSize);
+ catalog,
+ cacheMaximumSize,
+ cacheRefreshMs,
+ inputSchemasPerTableCacheMaximumSize,
+ caseSensitive,
+ dropUnusedColumns);
this.hashKeyGenerator =
new HashKeyGenerator(
cacheMaximumSize,
getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks());
if (immediateUpdate) {
- updater = new TableUpdater(tableCache, catalog, dropUnusedColumns);
+ updater = new TableUpdater(tableCache, catalog, caseSensitive,
dropUnusedColumns);
} else {
updateStream =
new OutputTag<>(
@@ -109,7 +117,7 @@ class DynamicRecordProcessor<T> extends ProcessFunction<T,
DynamicRecordInternal
TableMetadataCache.ResolvedSchemaInfo foundSchema =
exists
- ? tableCache.schema(data.tableIdentifier(), data.schema(),
dropUnusedColumns)
+ ? tableCache.schema(data.tableIdentifier(), data.schema())
: TableMetadataCache.NOT_FOUND;
PartitionSpec foundSpec = exists ? tableCache.spec(data.tableIdentifier(),
data.spec()) : null;
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java
index 8f38d4f8be..456f20adf5 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicTableUpdateOperator.java
@@ -43,22 +43,25 @@ class DynamicTableUpdateOperator
private final long cacheRefreshMs;
private final int inputSchemasPerTableCacheMaximumSize;
private final TableCreator tableCreator;
+ private final boolean caseSensitive;
private transient TableUpdater updater;
DynamicTableUpdateOperator(
CatalogLoader catalogLoader,
- boolean dropUnusedColumns,
int cacheMaximumSize,
long cacheRefreshMs,
int inputSchemasPerTableCacheMaximumSize,
- TableCreator tableCreator) {
+ TableCreator tableCreator,
+ boolean caseSensitive,
+ boolean dropUnusedColumns) {
this.catalogLoader = catalogLoader;
- this.dropUnusedColumns = dropUnusedColumns;
this.cacheMaximumSize = cacheMaximumSize;
this.cacheRefreshMs = cacheRefreshMs;
this.inputSchemasPerTableCacheMaximumSize =
inputSchemasPerTableCacheMaximumSize;
this.tableCreator = tableCreator;
+ this.caseSensitive = caseSensitive;
+ this.dropUnusedColumns = dropUnusedColumns;
}
@Override
@@ -68,8 +71,14 @@ class DynamicTableUpdateOperator
this.updater =
new TableUpdater(
new TableMetadataCache(
- catalog, cacheMaximumSize, cacheRefreshMs,
inputSchemasPerTableCacheMaximumSize),
+ catalog,
+ cacheMaximumSize,
+ cacheRefreshMs,
+ inputSchemasPerTableCacheMaximumSize,
+ caseSensitive,
+ dropUnusedColumns),
catalog,
+ caseSensitive,
dropUnusedColumns);
}
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java
index e106cf5754..d9747d201e 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/EvolveSchemaVisitor.java
@@ -59,6 +59,7 @@ public class EvolveSchemaVisitor extends
SchemaWithPartnerVisitor<Integer, Boole
private final UpdateSchema api;
private final Schema existingSchema;
private final Schema targetSchema;
+ private final boolean caseSensitive;
private final boolean dropUnusedColumns;
private EvolveSchemaVisitor(
@@ -66,11 +67,13 @@ public class EvolveSchemaVisitor extends
SchemaWithPartnerVisitor<Integer, Boole
UpdateSchema api,
Schema existingSchema,
Schema targetSchema,
+ boolean caseSensitive,
boolean dropUnusedColumns) {
this.identifier = identifier;
- this.api = api;
+ this.api = api.caseSensitive(caseSensitive);
this.existingSchema = existingSchema;
this.targetSchema = targetSchema;
+ this.caseSensitive = caseSensitive;
this.dropUnusedColumns = dropUnusedColumns;
}
@@ -82,6 +85,7 @@ public class EvolveSchemaVisitor extends
SchemaWithPartnerVisitor<Integer, Boole
* @param api an UpdateSchema for adding changes
* @param existingSchema an existing schema
* @param targetSchema a new schema to compare with the existing
+ * @param caseSensitive whether field name matching should be case-sensitive
* @param dropUnusedColumns whether to drop columns not present in target
schema
*/
public static void visit(
@@ -89,12 +93,14 @@ public class EvolveSchemaVisitor extends
SchemaWithPartnerVisitor<Integer, Boole
UpdateSchema api,
Schema existingSchema,
Schema targetSchema,
+ boolean caseSensitive,
boolean dropUnusedColumns) {
visit(
targetSchema,
-1,
- new EvolveSchemaVisitor(identifier, api, existingSchema, targetSchema,
dropUnusedColumns),
- new CompareSchemasVisitor.PartnerIdByNameAccessors(existingSchema));
+ new EvolveSchemaVisitor(
+ identifier, api, existingSchema, targetSchema, caseSensitive,
dropUnusedColumns),
+ new CompareSchemasVisitor.PartnerIdByNameAccessors(existingSchema,
caseSensitive));
}
@Override
@@ -107,14 +113,16 @@ public class EvolveSchemaVisitor extends
SchemaWithPartnerVisitor<Integer, Boole
Types.StructType partnerStruct = findFieldType(partnerId).asStructType();
String after = null;
for (Types.NestedField targetField : struct.fields()) {
- Types.NestedField nestedField = partnerStruct.field(targetField.name());
+ Types.NestedField nestedField =
+ CompareSchemasVisitor.getFieldFromStruct(
+ targetField.name(), partnerStruct, caseSensitive);
final String columnName;
if (nestedField != null) {
updateColumn(nestedField, targetField);
columnName = this.existingSchema.findColumnName(nestedField.fieldId());
} else {
addColumn(partnerId, targetField);
- columnName = this.targetSchema.findColumnName(targetField.fieldId());
+ columnName = targetSchema.findColumnName(targetField.fieldId());
}
setPosition(columnName, after);
@@ -122,7 +130,11 @@ public class EvolveSchemaVisitor extends
SchemaWithPartnerVisitor<Integer, Boole
}
for (Types.NestedField existingField : partnerStruct.fields()) {
- if (struct.field(existingField.name()) == null) {
+ Types.NestedField targetField =
+ caseSensitive
+ ? struct.field(existingField.name())
+ : struct.caseInsensitiveField(existingField.name());
+ if (targetField == null) {
String columnName =
this.existingSchema.findColumnName(existingField.fieldId());
if (dropUnusedColumns) {
LOG.debug("{}: Dropping column: {}", identifier.name(), columnName);
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
index 3be8bbcd91..fdefc01402 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableMetadataCache.java
@@ -55,10 +55,24 @@ class TableMetadataCache {
private final Clock cacheRefreshClock;
private final int inputSchemasPerTableCacheMaximumSize;
private final Map<TableIdentifier, CacheItem> tableCache;
+ private final boolean caseSensitive;
+ private final boolean dropUnusedColumns;
TableMetadataCache(
- Catalog catalog, int maximumSize, long refreshMs, int
inputSchemasPerTableCacheMaximumSize) {
- this(catalog, maximumSize, refreshMs,
inputSchemasPerTableCacheMaximumSize, Clock.systemUTC());
+ Catalog catalog,
+ int maximumSize,
+ long refreshMs,
+ int inputSchemasPerTableCacheMaximumSize,
+ boolean caseSensitive,
+ boolean dropUnusedColumns) {
+ this(
+ catalog,
+ maximumSize,
+ refreshMs,
+ inputSchemasPerTableCacheMaximumSize,
+ caseSensitive,
+ dropUnusedColumns,
+ Clock.systemUTC());
}
@VisibleForTesting
@@ -67,12 +81,16 @@ class TableMetadataCache {
int maximumSize,
long refreshMs,
int inputSchemasPerTableCacheMaximumSize,
+ boolean caseSensitive,
+ boolean dropUnusedColumns,
Clock cacheRefreshClock) {
this.catalog = catalog;
this.refreshMs = refreshMs;
- this.cacheRefreshClock = cacheRefreshClock;
this.inputSchemasPerTableCacheMaximumSize =
inputSchemasPerTableCacheMaximumSize;
this.tableCache = new LRUCache<>(maximumSize);
+ this.caseSensitive = caseSensitive;
+ this.dropUnusedColumns = dropUnusedColumns;
+ this.cacheRefreshClock = cacheRefreshClock;
}
Tuple2<Boolean, Exception> exists(TableIdentifier identifier) {
@@ -90,8 +108,8 @@ class TableMetadataCache {
return branch(identifier, branch, true);
}
- ResolvedSchemaInfo schema(TableIdentifier identifier, Schema input, boolean
dropUnusedColumns) {
- return schema(identifier, input, true, dropUnusedColumns);
+ ResolvedSchemaInfo schema(TableIdentifier identifier, Schema input) {
+ return schema(identifier, input, true);
}
PartitionSpec spec(TableIdentifier identifier, PartitionSpec spec) {
@@ -125,7 +143,7 @@ class TableMetadataCache {
}
private ResolvedSchemaInfo schema(
- TableIdentifier identifier, Schema input, boolean allowRefresh, boolean
dropUnusedColumns) {
+ TableIdentifier identifier, Schema input, boolean allowRefresh) {
CacheItem cached = tableCache.get(identifier);
Schema compatible = null;
if (cached != null && cached.tableExists) {
@@ -140,7 +158,8 @@ class TableMetadataCache {
for (Map.Entry<Integer, Schema> tableSchema :
cached.tableSchemas.entrySet()) {
CompareSchemasVisitor.Result result =
- CompareSchemasVisitor.visit(input, tableSchema.getValue(), true,
dropUnusedColumns);
+ CompareSchemasVisitor.visit(
+ input, tableSchema.getValue(), caseSensitive,
dropUnusedColumns);
if (result == CompareSchemasVisitor.Result.SAME) {
ResolvedSchemaInfo newResult =
new ResolvedSchemaInfo(
@@ -158,7 +177,7 @@ class TableMetadataCache {
if (needsRefresh(identifier, cached, allowRefresh)) {
refreshTable(identifier);
- return schema(identifier, input, false, dropUnusedColumns);
+ return schema(identifier, input, false);
} else if (compatible != null) {
ResolvedSchemaInfo newResult =
new ResolvedSchemaInfo(
diff --git
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java
index d8809efbe5..b0bdad8ed1 100644
---
a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java
+++
b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableUpdater.java
@@ -43,11 +43,14 @@ class TableUpdater {
private static final Logger LOG =
LoggerFactory.getLogger(TableUpdater.class);
private final TableMetadataCache cache;
private final Catalog catalog;
+ private final boolean caseSensitive;
private final boolean dropUnusedColumns;
- TableUpdater(TableMetadataCache cache, Catalog catalog, boolean
dropUnusedColumns) {
+ TableUpdater(
+ TableMetadataCache cache, Catalog catalog, boolean caseSensitive,
boolean dropUnusedColumns) {
this.cache = cache;
this.catalog = catalog;
+ this.caseSensitive = caseSensitive;
this.dropUnusedColumns = dropUnusedColumns;
}
@@ -120,15 +123,14 @@ class TableUpdater {
private TableMetadataCache.ResolvedSchemaInfo findOrCreateSchema(
TableIdentifier identifier, Schema schema) {
- TableMetadataCache.ResolvedSchemaInfo fromCache =
- cache.schema(identifier, schema, dropUnusedColumns);
+ TableMetadataCache.ResolvedSchemaInfo fromCache = cache.schema(identifier,
schema);
if (fromCache.compareResult() !=
CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) {
return fromCache;
} else {
Table table = catalog.loadTable(identifier);
Schema tableSchema = table.schema();
CompareSchemasVisitor.Result result =
- CompareSchemasVisitor.visit(schema, tableSchema, true,
dropUnusedColumns);
+ CompareSchemasVisitor.visit(schema, tableSchema, caseSensitive,
dropUnusedColumns);
switch (result) {
case SAME:
cache.update(identifier, table);
@@ -145,20 +147,20 @@ class TableUpdater {
LOG.info(
"Triggering schema update for table {} {} to {}", identifier,
tableSchema, schema);
UpdateSchema updateApi = table.updateSchema();
- EvolveSchemaVisitor.visit(identifier, updateApi, tableSchema,
schema, dropUnusedColumns);
+ EvolveSchemaVisitor.visit(
+ identifier, updateApi, tableSchema, schema, caseSensitive,
dropUnusedColumns);
try {
updateApi.commit();
cache.update(identifier, table);
TableMetadataCache.ResolvedSchemaInfo comparisonAfterMigration =
- cache.schema(identifier, schema, dropUnusedColumns);
+ cache.schema(identifier, schema);
Schema newSchema = comparisonAfterMigration.resolvedTableSchema();
LOG.info("Table {} schema updated from {} to {}", identifier,
tableSchema, newSchema);
return comparisonAfterMigration;
} catch (CommitFailedException e) {
cache.invalidate(identifier);
- TableMetadataCache.ResolvedSchemaInfo newSchema =
- cache.schema(identifier, schema, dropUnusedColumns);
+ TableMetadataCache.ResolvedSchemaInfo newSchema =
cache.schema(identifier, schema);
if (newSchema.compareResult() !=
CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED) {
LOG.debug("Table {} schema updated concurrently to {}",
identifier, schema);
return newSchema;
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java
index cc8e6898d2..9e4d600f93 100644
---
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestCompareSchemasVisitor.java
@@ -33,6 +33,9 @@ import org.junit.jupiter.api.Test;
class TestCompareSchemasVisitor {
+ private static final boolean CASE_SENSITIVE = true;
+ private static final boolean CASE_INSENSITIVE = false;
+
private static final boolean DROP_COLUMNS = true;
private static final boolean PRESERVE_COLUMNS = false;
@@ -47,7 +50,9 @@ class TestCompareSchemasVisitor {
new Schema(
optional(1, "id", IntegerType.get(), "comment"),
optional(2, "data", StringType.get()),
- optional(3, "extra", StringType.get()))))
+ optional(3, "extra", StringType.get())),
+ CASE_SENSITIVE,
+ PRESERVE_COLUMNS))
.isEqualTo(CompareSchemasVisitor.Result.SAME);
}
@@ -62,7 +67,9 @@ class TestCompareSchemasVisitor {
new Schema(
optional(1, "id", IntegerType.get()),
optional(2, "data", StringType.get()),
- optional(3, "extra", StringType.get()))))
+ optional(3, "extra", StringType.get())),
+ CASE_SENSITIVE,
+ PRESERVE_COLUMNS))
.isEqualTo(CompareSchemasVisitor.Result.SAME);
}
@@ -75,7 +82,9 @@ class TestCompareSchemasVisitor {
optional(1, "data", StringType.get()),
optional(2, "extra", StringType.get())),
new Schema(
- optional(0, "id", IntegerType.get()), optional(1, "data",
StringType.get()))))
+ optional(0, "id", IntegerType.get()), optional(1, "data",
StringType.get())),
+ CASE_SENSITIVE,
+ PRESERVE_COLUMNS))
.isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
}
@@ -88,7 +97,9 @@ class TestCompareSchemasVisitor {
new Schema(
optional(0, "id", IntegerType.get()),
optional(1, "data", StringType.get()),
- optional(2, "extra", StringType.get()))))
+ optional(2, "extra", StringType.get())),
+ CASE_SENSITIVE,
+ PRESERVE_COLUMNS))
.isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED);
}
@@ -99,7 +110,9 @@ class TestCompareSchemasVisitor {
new Schema(
optional(1, "id", LongType.get()), optional(2, "extra",
StringType.get())),
new Schema(
- optional(1, "id", IntegerType.get()), optional(2, "extra",
StringType.get()))))
+ optional(1, "id", IntegerType.get()), optional(2, "extra",
StringType.get())),
+ CASE_SENSITIVE,
+ PRESERVE_COLUMNS))
.isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
}
@@ -110,7 +123,9 @@ class TestCompareSchemasVisitor {
new Schema(
optional(1, "id", IntegerType.get()), optional(2, "extra",
StringType.get())),
new Schema(
- optional(1, "id", LongType.get()), optional(2, "extra",
StringType.get()))))
+ optional(1, "id", LongType.get()), optional(2, "extra",
StringType.get())),
+ CASE_SENSITIVE,
+ PRESERVE_COLUMNS))
.isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED);
}
@@ -120,9 +135,11 @@ class TestCompareSchemasVisitor {
new Schema(optional(1, "id", IntegerType.get()), optional(2, "extra",
StringType.get()));
Schema tableSchema =
new Schema(required(1, "id", IntegerType.get()), optional(2, "extra",
StringType.get()));
- assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema))
+ assertThat(
+ CompareSchemasVisitor.visit(dataSchema, tableSchema,
CASE_SENSITIVE, PRESERVE_COLUMNS))
.isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
- assertThat(CompareSchemasVisitor.visit(tableSchema, dataSchema))
+ assertThat(
+ CompareSchemasVisitor.visit(tableSchema, dataSchema,
CASE_SENSITIVE, PRESERVE_COLUMNS))
.isEqualTo(CompareSchemasVisitor.Result.SAME);
}
@@ -131,9 +148,11 @@ class TestCompareSchemasVisitor {
Schema dataSchema = new Schema(optional(1, "id", IntegerType.get()));
Schema tableSchema =
new Schema(optional(1, "id", IntegerType.get()), required(2, "extra",
StringType.get()));
- assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema))
+ assertThat(
+ CompareSchemasVisitor.visit(dataSchema, tableSchema,
CASE_SENSITIVE, PRESERVE_COLUMNS))
.isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
- assertThat(CompareSchemasVisitor.visit(tableSchema, dataSchema))
+ assertThat(
+ CompareSchemasVisitor.visit(tableSchema, dataSchema,
CASE_SENSITIVE, PRESERVE_COLUMNS))
.isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
}
@@ -142,7 +161,8 @@ class TestCompareSchemasVisitor {
Schema dataSchema = new Schema(required(1, "id", IntegerType.get()));
Schema tableSchema =
new Schema(required(1, "id", IntegerType.get()), optional(2, "extra",
StringType.get()));
- assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema))
+ assertThat(
+ CompareSchemasVisitor.visit(dataSchema, tableSchema,
CASE_SENSITIVE, PRESERVE_COLUMNS))
.isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED);
}
@@ -155,8 +175,9 @@ class TestCompareSchemasVisitor {
optional(2, "struct1", StructType.of(optional(3, "extra",
IntegerType.get())))),
new Schema(
optional(0, "id", IntegerType.get()),
- optional(
- 1, "struct1", StructType.of(optional(2, "extra",
IntegerType.get()))))))
+ optional(1, "struct1", StructType.of(optional(2, "extra",
IntegerType.get())))),
+ CASE_SENSITIVE,
+ PRESERVE_COLUMNS))
.isEqualTo(CompareSchemasVisitor.Result.SAME);
}
@@ -169,8 +190,9 @@ class TestCompareSchemasVisitor {
optional(1, "struct1", StructType.of(optional(2, "extra",
LongType.get())))),
new Schema(
optional(1, "id", IntegerType.get()),
- optional(
- 2, "struct1", StructType.of(optional(3, "extra",
IntegerType.get()))))))
+ optional(2, "struct1", StructType.of(optional(3, "extra",
IntegerType.get())))),
+ CASE_SENSITIVE,
+ PRESERVE_COLUMNS))
.isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
}
@@ -185,7 +207,9 @@ class TestCompareSchemasVisitor {
new Schema(
optional(0, "id", IntegerType.get()),
optional(
- 1, "map1", MapType.ofOptional(2, 3, IntegerType.get(),
StringType.get())))))
+ 1, "map1", MapType.ofOptional(2, 3, IntegerType.get(),
StringType.get()))),
+ CASE_SENSITIVE,
+ PRESERVE_COLUMNS))
.isEqualTo(CompareSchemasVisitor.Result.SAME);
}
@@ -200,7 +224,9 @@ class TestCompareSchemasVisitor {
new Schema(
optional(1, "id", IntegerType.get()),
optional(
- 2, "map1", MapType.ofOptional(3, 4, IntegerType.get(),
StringType.get())))))
+ 2, "map1", MapType.ofOptional(3, 4, IntegerType.get(),
StringType.get()))),
+ CASE_SENSITIVE,
+ PRESERVE_COLUMNS))
.isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
}
@@ -213,7 +239,9 @@ class TestCompareSchemasVisitor {
optional(2, "list1", ListType.ofOptional(3,
IntegerType.get()))),
new Schema(
optional(0, "id", IntegerType.get()),
- optional(1, "list1", ListType.ofOptional(2,
IntegerType.get())))))
+ optional(1, "list1", ListType.ofOptional(2,
IntegerType.get()))),
+ CASE_SENSITIVE,
+ PRESERVE_COLUMNS))
.isEqualTo(CompareSchemasVisitor.Result.SAME);
}
@@ -226,10 +254,76 @@ class TestCompareSchemasVisitor {
optional(1, "list1", ListType.ofOptional(2,
LongType.get()))),
new Schema(
optional(1, "id", IntegerType.get()),
- optional(2, "list1", ListType.ofOptional(3,
IntegerType.get())))))
+ optional(2, "list1", ListType.ofOptional(3,
IntegerType.get()))),
+ CASE_SENSITIVE,
+ PRESERVE_COLUMNS))
+ .isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
+ }
+
+ @Test
+ void testCaseInsensitiveFieldMatching() {
+ assertThat(
+ CompareSchemasVisitor.visit(
+ new Schema(
+ optional(1, "ID", IntegerType.get()),
+ optional(2, "Data", StringType.get()),
+ optional(3, "EXTRA", StringType.get())),
+ new Schema(
+ optional(1, "id", IntegerType.get()),
+ optional(2, "data", StringType.get()),
+ optional(3, "extra", StringType.get())),
+ CASE_INSENSITIVE,
+ PRESERVE_COLUMNS))
+ .isEqualTo(CompareSchemasVisitor.Result.SAME);
+ }
+
+ @Test
+ void testCaseSensitiveFieldMatchingDefault() {
+ assertThat(
+ CompareSchemasVisitor.visit(
+ new Schema(
+ optional(1, "ID", IntegerType.get()),
+ optional(2, "Data", StringType.get()),
+ optional(3, "EXTRA", StringType.get())),
+ new Schema(
+ optional(1, "id", IntegerType.get()),
+ optional(2, "data", StringType.get()),
+ optional(3, "extra", StringType.get())),
+ CASE_SENSITIVE,
+ DROP_COLUMNS))
.isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
}
+ @Test
+ void testCaseInsensitiveNestedStruct() {
+ assertThat(
+ CompareSchemasVisitor.visit(
+ new Schema(
+ optional(1, "ID", IntegerType.get()),
+ optional(2, "STRUCT1", StructType.of(optional(3, "NESTED",
StringType.get())))),
+ new Schema(
+ optional(1, "id", IntegerType.get()),
+ optional(2, "struct1", StructType.of(optional(3, "nested",
StringType.get())))),
+ CASE_INSENSITIVE,
+ PRESERVE_COLUMNS))
+ .isEqualTo(CompareSchemasVisitor.Result.SAME);
+ }
+
+ @Test
+ void testCaseInsensitiveWithMoreColumns() {
+ assertThat(
+ CompareSchemasVisitor.visit(
+ new Schema(
+ optional(0, "ID", IntegerType.get()), optional(1, "DATA",
StringType.get())),
+ new Schema(
+ optional(0, "id", IntegerType.get()),
+ optional(1, "data", StringType.get()),
+ optional(2, "extra", StringType.get())),
+ CASE_INSENSITIVE,
+ PRESERVE_COLUMNS))
+ .isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED);
+ }
+
@Test
void testDropUnusedColumnsEnabled() {
Schema dataSchema = new Schema(optional(1, "id", IntegerType.get()));
@@ -239,7 +333,7 @@ class TestCompareSchemasVisitor {
optional(2, "data", StringType.get()),
optional(3, "extra", StringType.get()));
- assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, true,
DROP_COLUMNS))
+ assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema,
CASE_SENSITIVE, DROP_COLUMNS))
.isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
}
@@ -249,7 +343,7 @@ class TestCompareSchemasVisitor {
Schema tableSchema =
new Schema(optional(1, "id", IntegerType.get()), required(2, "data",
StringType.get()));
- assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, true,
DROP_COLUMNS))
+ assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema,
CASE_SENSITIVE, DROP_COLUMNS))
.isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
}
@@ -262,7 +356,7 @@ class TestCompareSchemasVisitor {
optional(3, "extra", StringType.get()));
Schema tableSchema = new Schema(optional(1, "id", IntegerType.get()));
- assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, true,
DROP_COLUMNS))
+ assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema,
CASE_SENSITIVE, DROP_COLUMNS))
.isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
}
@@ -282,10 +376,11 @@ class TestCompareSchemasVisitor {
optional(3, "field1", StringType.get()),
optional(4, "field2", IntegerType.get()))));
- assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, true,
DROP_COLUMNS))
+ assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema,
CASE_SENSITIVE, DROP_COLUMNS))
.isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
- assertThat(CompareSchemasVisitor.visit(dataSchema, tableSchema, true,
PRESERVE_COLUMNS))
+ assertThat(
+ CompareSchemasVisitor.visit(dataSchema, tableSchema,
CASE_SENSITIVE, PRESERVE_COLUMNS))
.isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED);
}
}
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index 0c07bc9461..d9602e12eb 100644
---
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -977,6 +977,91 @@ class TestDynamicIcebergSink extends
TestFlinkIcebergSinkBase {
assertThat(records).hasSize(2);
}
+ @Test
+ void testCaseInsensitiveSchemaMatching() throws Exception {
+ Schema lowerCaseSchema =
+ new Schema(
+ Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "data", Types.StringType.get()));
+
+ Schema upperCaseSchema =
+ new Schema(
+ Types.NestedField.optional(1, "ID", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "DATA", Types.StringType.get()));
+
+ Schema mixedCaseSchema =
+ new Schema(
+ Types.NestedField.optional(1, "Id", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "Data", Types.StringType.get()));
+
+ List<DynamicIcebergDataImpl> rows =
+ Lists.newArrayList(
+ new DynamicIcebergDataImpl(
+ lowerCaseSchema, "t1", "main", PartitionSpec.unpartitioned()),
+ new DynamicIcebergDataImpl(
+ upperCaseSchema, "t1", "main", PartitionSpec.unpartitioned()),
+ new DynamicIcebergDataImpl(
+ mixedCaseSchema, "t1", "main", PartitionSpec.unpartitioned()));
+
+ DataStream<DynamicIcebergDataImpl> dataStream =
+ env.fromData(rows, TypeInformation.of(new TypeHint<>() {}));
+ env.setParallelism(2);
+
+ DynamicIcebergSink.forInput(dataStream)
+ .generator(new Generator())
+ .catalogLoader(CATALOG_EXTENSION.catalogLoader())
+ .writeParallelism(2)
+ .immediateTableUpdate(true)
+ .caseSensitive(false)
+ .append();
+
+ env.execute("Test Case Insensitive Iceberg DataStream");
+
+ verifyResults(rows);
+ }
+
+ @Test
+ void testCaseSensitiveSchemaMatchingCreatesNewFields() throws Exception {
+ Schema lowerCaseSchema =
+ new Schema(
+ Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "data", Types.StringType.get()));
+
+ Schema upperCaseSchema =
+ new Schema(
+ Types.NestedField.optional(1, "ID", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "DATA", Types.StringType.get()));
+
+ List<DynamicIcebergDataImpl> rows =
+ Lists.newArrayList(
+ new DynamicIcebergDataImpl(
+ lowerCaseSchema, "t1", "main", PartitionSpec.unpartitioned()),
+ new DynamicIcebergDataImpl(
+ upperCaseSchema, "t1", "main", PartitionSpec.unpartitioned()));
+
+ DataStream<DynamicIcebergDataImpl> dataStream =
+ env.fromData(rows, TypeInformation.of(new TypeHint<>() {}));
+ env.setParallelism(2);
+
+ DynamicIcebergSink.forInput(dataStream)
+ .generator(new Generator())
+ .catalogLoader(CATALOG_EXTENSION.catalogLoader())
+ .writeParallelism(2)
+ .immediateTableUpdate(true)
+ .caseSensitive(true)
+ .append();
+
+ env.execute("Test Case Sensitive Iceberg DataStream");
+
+ Table table =
CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of(DATABASE, "t1"));
+ Schema resultSchema = table.schema();
+ assertThat(resultSchema.columns()).hasSize(4);
+ assertThat(resultSchema.findField("id")).isNotNull();
+ assertThat(resultSchema.findField("ID")).isNotNull();
+ assertThat(resultSchema.findField("data")).isNotNull();
+ assertThat(resultSchema.findField("DATA")).isNotNull();
+ }
+
/**
* Represents a concurrent duplicate commit during an ongoing commit
operation, which can happen
* in production scenarios when using REST catalog.
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java
index d68dd58c08..1c8e6df859 100644
---
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicTableUpdateOperator.java
@@ -32,9 +32,17 @@ import org.apache.iceberg.flink.HadoopCatalogExtension;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
class TestDynamicTableUpdateOperator {
+ private static final boolean CASE_SENSITIVE = true;
+ private static final boolean CASE_INSENSITIVE = false;
+
+ private static final boolean DROP_COLUMNS = true;
+ private static final boolean PRESERVE_COLUMNS = false;
+
@RegisterExtension
private static final HadoopCatalogExtension CATALOG_EXTENSION =
new HadoopCatalogExtension(DATABASE, TABLE);
@@ -59,11 +67,12 @@ class TestDynamicTableUpdateOperator {
DynamicTableUpdateOperator operator =
new DynamicTableUpdateOperator(
CATALOG_EXTENSION.catalogLoader(),
- false,
cacheMaximumSize,
cacheRefreshMs,
inputSchemaCacheMaximumSize,
- TableCreator.DEFAULT);
+ TableCreator.DEFAULT,
+ CASE_SENSITIVE,
+ PRESERVE_COLUMNS);
operator.open(null);
DynamicRecordInternal input =
@@ -93,11 +102,12 @@ class TestDynamicTableUpdateOperator {
DynamicTableUpdateOperator operator =
new DynamicTableUpdateOperator(
CATALOG_EXTENSION.catalogLoader(),
- false,
cacheMaximumSize,
cacheRefreshMs,
inputSchemaCacheMaximumSize,
- TableCreator.DEFAULT);
+ TableCreator.DEFAULT,
+ CASE_SENSITIVE,
+ PRESERVE_COLUMNS);
operator.open(null);
catalog.createTable(table, SCHEMA1);
@@ -122,6 +132,59 @@ class TestDynamicTableUpdateOperator {
assertThat(catalog.loadTable(table).schema().schemaId()).isEqualTo(output.schema().schemaId());
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testCaseInSensitivity(boolean caseSensitive) throws Exception {
+ int cacheMaximumSize = 10;
+ int cacheRefreshMs = 1000;
+ int inputSchemaCacheMaximumSize = 10;
+ Catalog catalog = CATALOG_EXTENSION.catalog();
+ TableIdentifier table = TableIdentifier.of(TABLE);
+
+ Schema initialSchema = new Schema(Types.NestedField.required(1, "id",
Types.IntegerType.get()));
+ Schema caseSensitiveSchema =
+ new Schema(Types.NestedField.required(1, "Id",
Types.IntegerType.get()));
+
+ DynamicTableUpdateOperator operator =
+ new DynamicTableUpdateOperator(
+ CATALOG_EXTENSION.catalogLoader(),
+ cacheMaximumSize,
+ cacheRefreshMs,
+ inputSchemaCacheMaximumSize,
+ TableCreator.DEFAULT,
+ caseSensitive,
+ PRESERVE_COLUMNS);
+ operator.open(null);
+
+ catalog.createTable(table, initialSchema);
+ DynamicRecordInternal input =
+ new DynamicRecordInternal(
+ TABLE,
+ "branch",
+ caseSensitiveSchema,
+ GenericRowData.of(1, "test"),
+ PartitionSpec.unpartitioned(),
+ 42,
+ false,
+ Collections.emptySet());
+ DynamicRecordInternal output = operator.map(input);
+
+ if (caseSensitive) {
+ // Schema changes due to case sensitivity
+ Schema expectedSchema =
+ new Schema(
+ Types.NestedField.optional(2, "Id", Types.IntegerType.get()),
+ Types.NestedField.optional(1, "id", Types.IntegerType.get()));
+ Schema tableSchema = catalog.loadTable(table).schema();
+ assertThat(tableSchema.sameSchema(expectedSchema)).isTrue();
+ assertThat(output.schema().sameSchema(expectedSchema)).isTrue();
+ } else {
+ // No schema change due to case insensitivity
+
assertThat(catalog.loadTable(table).schema().sameSchema(initialSchema)).isTrue();
+ assertThat(output.schema().sameSchema(initialSchema)).isTrue();
+ }
+ }
+
@Test
void testDynamicTableUpdateOperatorPreserveUnusedColumns() throws Exception {
int cacheMaximumSize = 10;
@@ -133,11 +196,12 @@ class TestDynamicTableUpdateOperator {
DynamicTableUpdateOperator operator =
new DynamicTableUpdateOperator(
CATALOG_EXTENSION.catalogLoader(),
- false, // dropUnusedColumns = false (default)
cacheMaximumSize,
cacheRefreshMs,
inputSchemaCacheMaximumSize,
- TableCreator.DEFAULT);
+ TableCreator.DEFAULT,
+ CASE_SENSITIVE,
+ PRESERVE_COLUMNS);
operator.open(null);
catalog.createTable(table, SCHEMA2);
@@ -173,12 +237,12 @@ class TestDynamicTableUpdateOperator {
DynamicTableUpdateOperator operator =
new DynamicTableUpdateOperator(
CATALOG_EXTENSION.catalogLoader(),
- // Drop unused columns
- true,
cacheMaximumSize,
cacheRefreshMs,
inputSchemaCacheMaximumSize,
- TableCreator.DEFAULT);
+ TableCreator.DEFAULT,
+ CASE_INSENSITIVE,
+ DROP_COLUMNS);
operator.open(null);
catalog.createTable(table, SCHEMA2);
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java
index 027adc4031..d2da73c669 100644
---
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestEvolveSchemaVisitor.java
@@ -50,6 +50,10 @@ import org.junit.jupiter.api.Test;
public class TestEvolveSchemaVisitor {
private static final TableIdentifier TABLE = TableIdentifier.of("table");
+
+ private static final boolean CASE_SENSITIVE = true;
+ private static final boolean CASE_INSENSITIVE = false;
+
private static final boolean DROP_COLUMNS = true;
private static final boolean PRESERVE_COLUMNS = false;
@@ -94,7 +98,8 @@ public class TestEvolveSchemaVisitor {
public void testAddTopLevelPrimitives() {
Schema targetSchema = new Schema(primitiveFields(0, primitiveTypes()));
UpdateSchema updateApi = loadUpdateApi(new Schema());
- EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), targetSchema,
PRESERVE_COLUMNS);
+ EvolveSchemaVisitor.visit(
+ TABLE, updateApi, new Schema(), targetSchema, CASE_SENSITIVE,
PRESERVE_COLUMNS);
assertThat(targetSchema.asStruct()).isEqualTo(updateApi.apply().asStruct());
}
@@ -104,7 +109,8 @@ public class TestEvolveSchemaVisitor {
assertThat(existingSchema.columns().stream().allMatch(Types.NestedField::isRequired)).isTrue();
UpdateSchema updateApi = loadUpdateApi(existingSchema);
- EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, new Schema(),
PRESERVE_COLUMNS);
+ EvolveSchemaVisitor.visit(
+ TABLE, updateApi, existingSchema, new Schema(), CASE_SENSITIVE,
PRESERVE_COLUMNS);
Schema newSchema = updateApi.apply();
assertThat(newSchema.asStruct().fields()).hasSize(14);
assertThat(newSchema.columns().stream().allMatch(Types.NestedField::isOptional)).isTrue();
@@ -129,7 +135,8 @@ public class TestEvolveSchemaVisitor {
optional(2, "b", StructType.of(optional(5, "nested2",
StringType.get()))));
UpdateSchema updateApi = loadUpdateApi(existingSchema);
- EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, targetSchema,
DROP_COLUMNS);
+ EvolveSchemaVisitor.visit(
+ TABLE, updateApi, existingSchema, targetSchema, CASE_SENSITIVE,
DROP_COLUMNS);
Schema newSchema = updateApi.apply();
assertThat(newSchema.sameSchema(targetSchema)).isTrue();
@@ -151,7 +158,8 @@ public class TestEvolveSchemaVisitor {
Schema targetSchema = new Schema(optional(1, "a", StringType.get()));
UpdateSchema updateApi = loadUpdateApi(existingSchema);
- EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, targetSchema,
PRESERVE_COLUMNS);
+ EvolveSchemaVisitor.visit(
+ TABLE, updateApi, existingSchema, targetSchema, CASE_SENSITIVE,
PRESERVE_COLUMNS);
Schema newSchema = updateApi.apply();
assertThat(newSchema.sameSchema(existingSchema)).isTrue();
@@ -164,7 +172,8 @@ public class TestEvolveSchemaVisitor {
UpdateSchema updateApi = loadUpdateApi(existingSchema);
Schema newSchema =
new Schema(Arrays.asList(Types.NestedField.optional(-1, "myField",
Types.LongType.get())));
- EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, newSchema,
PRESERVE_COLUMNS);
+ EvolveSchemaVisitor.visit(
+ TABLE, updateApi, existingSchema, newSchema, CASE_SENSITIVE,
PRESERVE_COLUMNS);
assertThat(updateApi.apply().sameSchema(existingSchema)).isTrue();
}
@@ -177,7 +186,8 @@ public class TestEvolveSchemaVisitor {
new Schema(
Arrays.asList(optional(2, "b", StringType.get()), optional(1, "a",
StringType.get())));
UpdateSchema updateApi = loadUpdateApi(existingSchema);
- EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, targetSchema,
PRESERVE_COLUMNS);
+ EvolveSchemaVisitor.visit(
+ TABLE, updateApi, existingSchema, targetSchema, CASE_SENSITIVE,
PRESERVE_COLUMNS);
assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct());
}
@@ -186,7 +196,8 @@ public class TestEvolveSchemaVisitor {
for (PrimitiveType primitiveType : primitiveTypes()) {
Schema targetSchema = new Schema(optional(1, "aList",
ListType.ofOptional(2, primitiveType)));
UpdateSchema updateApi = loadUpdateApi(new Schema());
- EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), targetSchema,
PRESERVE_COLUMNS);
+ EvolveSchemaVisitor.visit(
+ TABLE, updateApi, new Schema(), targetSchema, CASE_SENSITIVE,
PRESERVE_COLUMNS);
assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct());
}
}
@@ -198,7 +209,8 @@ public class TestEvolveSchemaVisitor {
new Schema(optional(1, "aList", ListType.ofRequired(2,
primitiveType)));
Schema targetSchema = new Schema();
UpdateSchema updateApi = loadUpdateApi(existingSchema);
- EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema,
targetSchema, PRESERVE_COLUMNS);
+ EvolveSchemaVisitor.visit(
+ TABLE, updateApi, existingSchema, targetSchema, CASE_SENSITIVE,
PRESERVE_COLUMNS);
Schema expectedSchema =
new Schema(optional(1, "aList", ListType.ofRequired(2,
primitiveType)));
assertThat(updateApi.apply().asStruct()).isEqualTo(expectedSchema.asStruct());
@@ -211,7 +223,8 @@ public class TestEvolveSchemaVisitor {
Schema targetSchema =
new Schema(optional(1, "aMap", MapType.ofOptional(2, 3,
primitiveType, primitiveType)));
UpdateSchema updateApi = loadUpdateApi(new Schema());
- EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), targetSchema,
PRESERVE_COLUMNS);
+ EvolveSchemaVisitor.visit(
+ TABLE, updateApi, new Schema(), targetSchema, CASE_SENSITIVE,
PRESERVE_COLUMNS);
assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct());
}
}
@@ -223,7 +236,8 @@ public class TestEvolveSchemaVisitor {
new Schema(
optional(1, "aStruct", StructType.of(optional(2, "primitive",
primitiveType))));
UpdateSchema updateApi = loadUpdateApi(new Schema());
- EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), currentSchema,
PRESERVE_COLUMNS);
+ EvolveSchemaVisitor.visit(
+ TABLE, updateApi, new Schema(), currentSchema, CASE_SENSITIVE,
PRESERVE_COLUMNS);
assertThat(updateApi.apply().asStruct()).isEqualTo(currentSchema.asStruct());
}
}
@@ -236,7 +250,8 @@ public class TestEvolveSchemaVisitor {
new Schema(
optional(1, "aStruct", StructType.of(optional(2, "primitive",
primitiveType))));
UpdateSchema updateApi = loadUpdateApi(currentSchema);
- EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema,
PRESERVE_COLUMNS);
+ EvolveSchemaVisitor.visit(
+ TABLE, updateApi, currentSchema, targetSchema, CASE_SENSITIVE,
PRESERVE_COLUMNS);
assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct());
}
}
@@ -251,7 +266,8 @@ public class TestEvolveSchemaVisitor {
new Schema(
optional(1, "aStruct", StructType.of(optional(2, "primitive",
primitiveType))));
UpdateSchema updateApi = loadUpdateApi(currentSchema);
- EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema,
PRESERVE_COLUMNS);
+ EvolveSchemaVisitor.visit(
+ TABLE, updateApi, currentSchema, targetSchema, CASE_SENSITIVE,
PRESERVE_COLUMNS);
assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct());
}
}
@@ -262,7 +278,8 @@ public class TestEvolveSchemaVisitor {
Schema targetSchema =
new Schema(optional(1, "aStruct", StructType.of(primitiveFields(1,
primitiveTypes()))));
UpdateSchema updateApi = loadUpdateApi(currentSchema);
- EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema,
PRESERVE_COLUMNS);
+ EvolveSchemaVisitor.visit(
+ TABLE, updateApi, currentSchema, targetSchema, CASE_SENSITIVE,
PRESERVE_COLUMNS);
assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct());
}
@@ -292,7 +309,8 @@ public class TestEvolveSchemaVisitor {
ListType.ofOptional(
10, DecimalType.of(11,
20))))))))))));
UpdateSchema updateApi = loadUpdateApi(new Schema());
- EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), targetSchema,
PRESERVE_COLUMNS);
+ EvolveSchemaVisitor.visit(
+ TABLE, updateApi, new Schema(), targetSchema, CASE_SENSITIVE,
PRESERVE_COLUMNS);
assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct());
}
@@ -331,7 +349,8 @@ public class TestEvolveSchemaVisitor {
"aString",
StringType.get()))))))))))))));
UpdateSchema updateApi = loadUpdateApi(currentSchema);
- EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema,
PRESERVE_COLUMNS);
+ EvolveSchemaVisitor.visit(
+ TABLE, updateApi, currentSchema, targetSchema, CASE_SENSITIVE,
PRESERVE_COLUMNS);
assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct());
}
@@ -366,7 +385,8 @@ public class TestEvolveSchemaVisitor {
12, 13, StringType.get(),
StringType.get()))))))));
UpdateSchema updateApi = loadUpdateApi(new Schema());
- EvolveSchemaVisitor.visit(TABLE, updateApi, new Schema(), targetSchema,
PRESERVE_COLUMNS);
+ EvolveSchemaVisitor.visit(
+ TABLE, updateApi, new Schema(), targetSchema, CASE_SENSITIVE,
PRESERVE_COLUMNS);
assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct());
}
@@ -382,6 +402,7 @@ public class TestEvolveSchemaVisitor {
loadUpdateApi(currentSchema),
currentSchema,
targetSchema,
+ true,
PRESERVE_COLUMNS))
.hasMessage("Cannot change column type: aList.element: string -> long")
.isInstanceOf(IllegalArgumentException.class);
@@ -403,6 +424,7 @@ public class TestEvolveSchemaVisitor {
loadUpdateApi(currentSchema),
currentSchema,
targetSchema,
+ true,
PRESERVE_COLUMNS))
.hasMessage("Cannot change column type: aMap.value: string -> long")
.isInstanceOf(IllegalArgumentException.class);
@@ -422,6 +444,7 @@ public class TestEvolveSchemaVisitor {
loadUpdateApi(currentSchema),
currentSchema,
targetSchema,
+ true,
PRESERVE_COLUMNS))
.hasMessage("Cannot change column type: aMap.key: string -> uuid")
.isInstanceOf(IllegalArgumentException.class);
@@ -434,7 +457,8 @@ public class TestEvolveSchemaVisitor {
Schema targetSchema = new Schema(required(1, "aCol", LongType.get()));
UpdateSchema updateApi = loadUpdateApi(currentSchema);
- EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema,
PRESERVE_COLUMNS);
+ EvolveSchemaVisitor.visit(
+ TABLE, updateApi, currentSchema, targetSchema, CASE_SENSITIVE,
PRESERVE_COLUMNS);
Schema applied = updateApi.apply();
assertThat(applied.asStruct().fields()).hasSize(1);
assertThat(applied.asStruct().fields().get(0).type()).isEqualTo(LongType.get());
@@ -447,7 +471,8 @@ public class TestEvolveSchemaVisitor {
Schema targetSchema = new Schema(required(1, "aCol", DoubleType.get()));
UpdateSchema updateApi = loadUpdateApi(currentSchema);
- EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema,
PRESERVE_COLUMNS);
+ EvolveSchemaVisitor.visit(
+ TABLE, updateApi, currentSchema, targetSchema, CASE_SENSITIVE,
PRESERVE_COLUMNS);
Schema applied = updateApi.apply();
assertThat(applied.asStruct().fields()).hasSize(1);
assertThat(applied.asStruct().fields().get(0).type()).isEqualTo(DoubleType.get());
@@ -464,6 +489,7 @@ public class TestEvolveSchemaVisitor {
loadUpdateApi(currentSchema),
currentSchema,
targetSchema,
+ true,
PRESERVE_COLUMNS))
.hasMessage("Cannot change column type: aCol: double -> float")
.isInstanceOf(IllegalArgumentException.class);
@@ -477,7 +503,8 @@ public class TestEvolveSchemaVisitor {
Schema targetSchema = new Schema(required(1, "aCol", DecimalType.of(22,
1)));
UpdateSchema updateApi = loadUpdateApi(currentSchema);
- EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema,
PRESERVE_COLUMNS);
+ EvolveSchemaVisitor.visit(
+ TABLE, updateApi, currentSchema, targetSchema, CASE_SENSITIVE,
PRESERVE_COLUMNS);
assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct());
}
@@ -520,7 +547,8 @@ public class TestEvolveSchemaVisitor {
optional(6, "time",
TimeType.get())))))))));
UpdateSchema updateApi = loadUpdateApi(existingSchema);
- EvolveSchemaVisitor.visit(TABLE, updateApi, existingSchema, targetSchema,
PRESERVE_COLUMNS);
+ EvolveSchemaVisitor.visit(
+ TABLE, updateApi, existingSchema, targetSchema, CASE_SENSITIVE,
PRESERVE_COLUMNS);
assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct());
}
@@ -536,6 +564,7 @@ public class TestEvolveSchemaVisitor {
loadUpdateApi(currentSchema),
currentSchema,
targetSchema,
+ true,
PRESERVE_COLUMNS))
.hasMessage("Cannot change column type: aColumn: list<string> ->
string")
.isInstanceOf(IllegalArgumentException.class);
@@ -573,7 +602,8 @@ public class TestEvolveSchemaVisitor {
optional(7, "d1", StructType.of(optional(8, "d2",
StringType.get()))))));
UpdateSchema updateApi = loadUpdateApi(currentSchema);
- EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema,
PRESERVE_COLUMNS);
+ EvolveSchemaVisitor.visit(
+ TABLE, updateApi, currentSchema, targetSchema, CASE_SENSITIVE,
PRESERVE_COLUMNS);
assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct());
}
@@ -625,7 +655,8 @@ public class TestEvolveSchemaVisitor {
StringType.get()))))))))))))));
UpdateSchema updateApi = loadUpdateApi(currentSchema);
- EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema,
PRESERVE_COLUMNS);
+ EvolveSchemaVisitor.visit(
+ TABLE, updateApi, currentSchema, targetSchema, CASE_SENSITIVE,
PRESERVE_COLUMNS);
assertThat(updateApi.apply().asStruct()).isEqualTo(targetSchema.asStruct());
}
@@ -645,7 +676,8 @@ public class TestEvolveSchemaVisitor {
optional(
3, "s3", StructType.of(optional(4, "s4",
StringType.get()))))))));
UpdateSchema updateApi = loadUpdateApi(currentSchema);
- EvolveSchemaVisitor.visit(TABLE, updateApi, currentSchema, targetSchema,
PRESERVE_COLUMNS);
+ EvolveSchemaVisitor.visit(
+ TABLE, updateApi, currentSchema, targetSchema, CASE_SENSITIVE,
PRESERVE_COLUMNS);
assertThat(getNestedSchemaWithOptionalModifier(true).asStruct())
.isEqualTo(updateApi.apply().asStruct());
}
@@ -682,6 +714,82 @@ public class TestEvolveSchemaVisitor {
9, "s4",
StringType.get()))))))))))))));
}
+ @Test
+ public void testCaseInsensitiveAddField() {
+ Schema existingSchema =
+ new Schema(
+ Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "name", Types.StringType.get()));
+ Schema targetSchema =
+ new Schema(
+ Types.NestedField.optional(1, "ID", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "NAME", Types.StringType.get()),
+ Types.NestedField.optional(3, "AGE", Types.IntegerType.get()));
+
+ UpdateSchema updateApi = loadUpdateApi(existingSchema);
+ EvolveSchemaVisitor.visit(
+ TABLE, updateApi, existingSchema, targetSchema, CASE_INSENSITIVE,
PRESERVE_COLUMNS);
+ Schema result = updateApi.apply();
+ assertThat(result.columns()).hasSize(3);
+ assertThat(result.findField("AGE")).isNotNull();
+ }
+
+ @Test
+ public void testCaseInsensitiveMakeFieldOptional() {
+ Schema existingSchema =
+ new Schema(
+ Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+ Types.NestedField.required(2, "name", Types.StringType.get()));
+ Schema targetSchema = new Schema(Types.NestedField.optional(1, "ID",
Types.IntegerType.get()));
+
+ UpdateSchema updateApi = loadUpdateApi(existingSchema);
+ EvolveSchemaVisitor.visit(
+ TABLE, updateApi, existingSchema, targetSchema, CASE_INSENSITIVE,
PRESERVE_COLUMNS);
+ Schema result = updateApi.apply();
+ assertThat(result.findField("name").isOptional()).isTrue();
+ }
+
+ @Test
+ public void testCaseInsensitiveNestedStructField() {
+ Schema existingSchema =
+ new Schema(
+ optional(1, "struct1", StructType.of(optional(2, "field1",
Types.StringType.get()))));
+ Schema targetSchema =
+ new Schema(
+ optional(
+ 1,
+ "STRUCT1",
+ StructType.of(
+ optional(2, "FIELD1", Types.StringType.get()),
+ optional(3, "FIELD2", Types.IntegerType.get()))));
+
+ UpdateSchema updateApi = loadUpdateApi(existingSchema);
+ EvolveSchemaVisitor.visit(
+ TABLE, updateApi, existingSchema, targetSchema, CASE_INSENSITIVE,
PRESERVE_COLUMNS);
+ Schema result = updateApi.apply();
+ Types.StructType struct =
result.findField("struct1").type().asStructType();
+ assertThat(struct.fields()).hasSize(2);
+ assertThat(struct.field("FIELD2")).isNotNull();
+ }
+
+ @Test
+ public void testCaseSensitiveDoesNotMatch() {
+ Schema existingSchema =
+ new Schema(Types.NestedField.optional(1, "id",
Types.IntegerType.get()));
+ Schema targetSchema =
+ new Schema(
+ Types.NestedField.optional(1, "ID", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "name", Types.StringType.get()));
+
+ UpdateSchema updateApi = loadUpdateApi(existingSchema);
+ EvolveSchemaVisitor.visit(
+ TABLE, updateApi, existingSchema, targetSchema, CASE_SENSITIVE,
PRESERVE_COLUMNS);
+ Schema result = updateApi.apply();
+ assertThat(result.columns()).hasSize(3);
+ assertThat(result.findField("ID")).isNotNull();
+ assertThat(result.findField("id")).isNotNull();
+ }
+
private static UpdateSchema loadUpdateApi(Schema schema) {
try {
Constructor<?> constructor =
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
index d696059902..8a17c707f8 100644
---
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableMetadataCache.java
@@ -36,6 +36,12 @@ import org.junit.jupiter.api.Test;
public class TestTableMetadataCache extends TestFlinkIcebergSinkBase {
+ private static final boolean CASE_SENSITIVE = true;
+ private static final boolean CASE_INSENSITIVE = false;
+
+ private static final boolean DROP_COLUMNS = true;
+ private static final boolean PRESERVE_COLUMNS = false;
+
static final Schema SCHEMA =
new Schema(
Types.NestedField.optional(1, "id", Types.IntegerType.get()),
@@ -47,29 +53,35 @@ public class TestTableMetadataCache extends
TestFlinkIcebergSinkBase {
Types.NestedField.optional(2, "data", Types.StringType.get()),
Types.NestedField.optional(3, "extra", Types.StringType.get()));
+ static final Schema SCHEMA_UPPERCASE =
+ new Schema(
+ Types.NestedField.optional(1, "ID", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "DATA", Types.StringType.get()));
+
+ static final Schema SCHEMA_MIXEDCASE =
+ new Schema(
+ Types.NestedField.optional(1, "Id", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "Data", Types.StringType.get()));
+
@Test
void testCaching() {
Catalog catalog = CATALOG_EXTENSION.catalog();
TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
catalog.createTable(tableIdentifier, SCHEMA);
- TableMetadataCache cache = new TableMetadataCache(catalog, 10,
Long.MAX_VALUE, 10);
+ TableMetadataCache cache =
+ new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10,
CASE_SENSITIVE, PRESERVE_COLUMNS);
- Schema schema1 = cache.schema(tableIdentifier, SCHEMA,
false).resolvedTableSchema();
+ Schema schema1 = cache.schema(tableIdentifier,
SCHEMA).resolvedTableSchema();
assertThat(schema1.sameSchema(SCHEMA)).isTrue();
assertThat(
- cache
- .schema(tableIdentifier, SerializationUtils.clone(SCHEMA),
false)
- .resolvedTableSchema())
+ cache.schema(tableIdentifier,
SerializationUtils.clone(SCHEMA)).resolvedTableSchema())
.isEqualTo(schema1);
- assertThat(cache.schema(tableIdentifier, SCHEMA2, false))
- .isEqualTo(TableMetadataCache.NOT_FOUND);
+ assertThat(cache.schema(tableIdentifier,
SCHEMA2)).isEqualTo(TableMetadataCache.NOT_FOUND);
- schema1 = cache.schema(tableIdentifier, SCHEMA,
false).resolvedTableSchema();
+ schema1 = cache.schema(tableIdentifier, SCHEMA).resolvedTableSchema();
assertThat(
- cache
- .schema(tableIdentifier, SerializationUtils.clone(SCHEMA),
false)
- .resolvedTableSchema())
+ cache.schema(tableIdentifier,
SerializationUtils.clone(SCHEMA)).resolvedTableSchema())
.isEqualTo(schema1);
}
@@ -78,10 +90,11 @@ public class TestTableMetadataCache extends
TestFlinkIcebergSinkBase {
Catalog catalog = CATALOG_EXTENSION.catalog();
TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
catalog.createTable(tableIdentifier, SCHEMA);
- TableMetadataCache cache = new TableMetadataCache(catalog, 10,
Long.MAX_VALUE, 10);
- TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
+ TableMetadataCache cache =
+ new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10,
CASE_SENSITIVE, PRESERVE_COLUMNS);
+ TableUpdater tableUpdater = new TableUpdater(cache, catalog,
CASE_SENSITIVE, PRESERVE_COLUMNS);
- Schema schema1 = cache.schema(tableIdentifier, SCHEMA,
false).resolvedTableSchema();
+ Schema schema1 = cache.schema(tableIdentifier,
SCHEMA).resolvedTableSchema();
assertThat(schema1.sameSchema(SCHEMA)).isTrue();
catalog.dropTable(tableIdentifier);
@@ -93,7 +106,7 @@ public class TestTableMetadataCache extends
TestFlinkIcebergSinkBase {
PartitionSpec.unpartitioned(),
TableCreator.DEFAULT);
- Schema schema2 = cache.schema(tableIdentifier, SCHEMA2,
false).resolvedTableSchema();
+ Schema schema2 = cache.schema(tableIdentifier,
SCHEMA2).resolvedTableSchema();
assertThat(schema2.sameSchema(SCHEMA2)).isTrue();
}
@@ -102,7 +115,8 @@ public class TestTableMetadataCache extends
TestFlinkIcebergSinkBase {
Catalog catalog = CATALOG_EXTENSION.catalog();
TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
catalog.createTable(tableIdentifier, SCHEMA);
- TableMetadataCache cache = new TableMetadataCache(catalog, 0,
Long.MAX_VALUE, 10);
+ TableMetadataCache cache =
+ new TableMetadataCache(catalog, 0, Long.MAX_VALUE, 10, CASE_SENSITIVE,
PRESERVE_COLUMNS);
assertThat(cache.getInternalCache()).isEmpty();
}
@@ -117,15 +131,21 @@ public class TestTableMetadataCache extends
TestFlinkIcebergSinkBase {
// Init cache
TableMetadataCache cache =
new TableMetadataCache(
- catalog, 10, 100L, 10, Clock.fixed(Instant.now(),
ZoneId.systemDefault()));
+ catalog,
+ 10,
+ 100L,
+ 10,
+ CASE_INSENSITIVE,
+ PRESERVE_COLUMNS,
+ Clock.fixed(Instant.now(), ZoneId.systemDefault()));
cache.update(tableIdentifier, table);
// Cache schema
- Schema schema = cache.schema(tableIdentifier, SCHEMA2,
false).resolvedTableSchema();
+ Schema schema = cache.schema(tableIdentifier,
SCHEMA2).resolvedTableSchema();
assertThat(schema.sameSchema(SCHEMA2)).isTrue();
// Cache schema with fewer fields
- TableMetadataCache.ResolvedSchemaInfo schemaInfo =
cache.schema(tableIdentifier, SCHEMA, false);
+ TableMetadataCache.ResolvedSchemaInfo schemaInfo =
cache.schema(tableIdentifier, SCHEMA);
assertThat(schemaInfo.resolvedTableSchema().sameSchema(SCHEMA2)).isTrue();
assertThat(schemaInfo.compareResult())
.isEqualTo(CompareSchemasVisitor.Result.DATA_CONVERSION_NEEDED);
@@ -140,9 +160,10 @@ public class TestTableMetadataCache extends
TestFlinkIcebergSinkBase {
void testNoSuchNamespaceExceptionHandling() {
Catalog catalog = CATALOG_EXTENSION.catalog();
TableIdentifier tableIdentifier =
TableIdentifier.of("nonexistent_namespace", "myTable");
- TableMetadataCache cache = new TableMetadataCache(catalog, 10,
Long.MAX_VALUE, 10);
+ TableMetadataCache cache =
+ new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10,
CASE_SENSITIVE, PRESERVE_COLUMNS);
- TableMetadataCache.ResolvedSchemaInfo result =
cache.schema(tableIdentifier, SCHEMA, false);
+ TableMetadataCache.ResolvedSchemaInfo result =
cache.schema(tableIdentifier, SCHEMA);
assertThat(result).isEqualTo(TableMetadataCache.NOT_FOUND);
assertThat(cache.getInternalCache().get(tableIdentifier)).isNotNull();
@@ -152,11 +173,48 @@ public class TestTableMetadataCache extends
TestFlinkIcebergSinkBase {
void testNoSuchTableExceptionHandling() {
Catalog catalog = CATALOG_EXTENSION.catalog();
TableIdentifier tableIdentifier =
TableIdentifier.parse("default.nonexistent_table");
- TableMetadataCache cache = new TableMetadataCache(catalog, 10,
Long.MAX_VALUE, 10);
+ TableMetadataCache cache =
+ new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10,
CASE_SENSITIVE, PRESERVE_COLUMNS);
- TableMetadataCache.ResolvedSchemaInfo result =
cache.schema(tableIdentifier, SCHEMA, false);
+ TableMetadataCache.ResolvedSchemaInfo result =
cache.schema(tableIdentifier, SCHEMA);
assertThat(result).isEqualTo(TableMetadataCache.NOT_FOUND);
assertThat(cache.getInternalCache().get(tableIdentifier)).isNotNull();
}
+
+ @Test
+ void testCaseInsensitiveCaching() {
+ Catalog catalog = CATALOG_EXTENSION.catalog();
+ TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
+ catalog.createTable(tableIdentifier, SCHEMA);
+ TableMetadataCache cache =
+ new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10,
CASE_INSENSITIVE, PRESERVE_COLUMNS);
+
+ Schema schema1 = cache.schema(tableIdentifier,
SCHEMA).resolvedTableSchema();
+ assertThat(schema1.sameSchema(SCHEMA)).isTrue();
+
+ Schema schemaUpperCase = cache.schema(tableIdentifier,
SCHEMA_UPPERCASE).resolvedTableSchema();
+ assertThat(schemaUpperCase).isEqualTo(schema1);
+
+ Schema schemaMixedCase = cache.schema(tableIdentifier,
SCHEMA_MIXEDCASE).resolvedTableSchema();
+ assertThat(schemaMixedCase).isEqualTo(schema1);
+ }
+
+ @Test
+ void testCaseSensitiveCachingDoesNotMatch() {
+ Catalog catalog = CATALOG_EXTENSION.catalog();
+ TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
+ catalog.createTable(tableIdentifier, SCHEMA);
+ TableMetadataCache cache =
+ new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10,
CASE_SENSITIVE, PRESERVE_COLUMNS);
+
+ Schema schema1 = cache.schema(tableIdentifier,
SCHEMA).resolvedTableSchema();
+ assertThat(schema1.sameSchema(SCHEMA)).isTrue();
+
+ assertThat(cache.schema(tableIdentifier, SCHEMA_UPPERCASE))
+ .isEqualTo(TableMetadataCache.NOT_FOUND);
+
+ assertThat(cache.schema(tableIdentifier, SCHEMA_MIXEDCASE))
+ .isEqualTo(TableMetadataCache.NOT_FOUND);
+ }
}
diff --git
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java
index c0b376d30e..bdc825b44f 100644
---
a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java
+++
b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableUpdater.java
@@ -36,9 +36,17 @@ import org.apache.iceberg.inmemory.InMemoryCatalog;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
public class TestTableUpdater extends TestFlinkIcebergSinkBase {
+ private static final boolean CASE_SENSITIVE = true;
+ private static final boolean CASE_INSENSITIVE = false;
+
+ private static final boolean DROP_COLUMNS = true;
+ private static final boolean PRESERVE_COLUMNS = false;
+
static final Schema SCHEMA =
new Schema(
Types.NestedField.optional(1, "id", Types.IntegerType.get()),
@@ -57,8 +65,9 @@ public class TestTableUpdater extends
TestFlinkIcebergSinkBase {
catalog.initialize("catalog", Map.of());
catalog.createNamespace(Namespace.of("myNamespace"));
TableIdentifier tableIdentifier =
TableIdentifier.parse("myNamespace.myTable");
- TableMetadataCache cache = new TableMetadataCache(catalog, 10,
Long.MAX_VALUE, 10);
- TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
+ TableMetadataCache cache =
+ new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10,
CASE_SENSITIVE, PRESERVE_COLUMNS);
+ TableUpdater tableUpdater = new TableUpdater(cache, catalog,
CASE_SENSITIVE, PRESERVE_COLUMNS);
String locationOverride = tempDir.toString() + "/custom-path";
Map<String, String> tableProperties = Map.of("key", "value");
@@ -75,8 +84,7 @@ public class TestTableUpdater extends
TestFlinkIcebergSinkBase {
assertThat(catalog.tableExists(tableIdentifier)).isTrue();
assertThat(catalog.loadTable(tableIdentifier).properties().get("key")).isEqualTo("value");
assertThat(catalog.loadTable(tableIdentifier).location()).isEqualTo(locationOverride);
- TableMetadataCache.ResolvedSchemaInfo cachedSchema =
- cache.schema(tableIdentifier, SCHEMA, false);
+ TableMetadataCache.ResolvedSchemaInfo cachedSchema =
cache.schema(tableIdentifier, SCHEMA);
assertThat(cachedSchema.resolvedTableSchema().sameSchema(SCHEMA)).isTrue();
}
@@ -84,8 +92,9 @@ public class TestTableUpdater extends
TestFlinkIcebergSinkBase {
void testTableAlreadyExists() {
Catalog catalog = CATALOG_EXTENSION.catalog();
TableIdentifier tableIdentifier = TableIdentifier.parse("myTable");
- TableMetadataCache cache = new TableMetadataCache(catalog, 10,
Long.MAX_VALUE, 10);
- TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
+ TableMetadataCache cache =
+ new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10,
CASE_SENSITIVE, PRESERVE_COLUMNS);
+ TableUpdater tableUpdater = new TableUpdater(cache, catalog,
CASE_SENSITIVE, PRESERVE_COLUMNS);
// Make the table non-existent in cache
cache.exists(tableIdentifier);
@@ -108,8 +117,9 @@ public class TestTableUpdater extends
TestFlinkIcebergSinkBase {
void testBranchCreationAndCaching() {
Catalog catalog = CATALOG_EXTENSION.catalog();
TableIdentifier tableIdentifier = TableIdentifier.parse("myTable");
- TableMetadataCache cache = new TableMetadataCache(catalog, 10,
Long.MAX_VALUE, 10);
- TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
+ TableMetadataCache cache =
+ new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10,
CASE_SENSITIVE, PRESERVE_COLUMNS);
+ TableUpdater tableUpdater = new TableUpdater(cache, catalog,
CASE_SENSITIVE, PRESERVE_COLUMNS);
catalog.createTable(tableIdentifier, SCHEMA);
tableUpdater.update(
@@ -126,8 +136,9 @@ public class TestTableUpdater extends
TestFlinkIcebergSinkBase {
void testSpecCreation() {
Catalog catalog = CATALOG_EXTENSION.catalog();
TableIdentifier tableIdentifier = TableIdentifier.parse("myTable");
- TableMetadataCache cache = new TableMetadataCache(catalog, 10,
Long.MAX_VALUE, 10);
- TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
+ TableMetadataCache cache =
+ new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10,
CASE_SENSITIVE, PRESERVE_COLUMNS);
+ TableUpdater tableUpdater = new TableUpdater(cache, catalog,
CASE_SENSITIVE, PRESERVE_COLUMNS);
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).bucket("data",
10).build();
tableUpdater.update(
@@ -143,9 +154,10 @@ public class TestTableUpdater extends
TestFlinkIcebergSinkBase {
Catalog catalog = CATALOG_EXTENSION.catalog();
TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
catalog.createTable(tableIdentifier, SCHEMA);
- TableMetadataCache cache = new TableMetadataCache(catalog, 10,
Long.MAX_VALUE, 10);
- cache.schema(tableIdentifier, SCHEMA, false);
- TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
+ TableMetadataCache cache =
+ new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10,
CASE_SENSITIVE, PRESERVE_COLUMNS);
+ cache.schema(tableIdentifier, SCHEMA);
+ TableUpdater tableUpdater = new TableUpdater(cache, catalog,
CASE_SENSITIVE, PRESERVE_COLUMNS);
Schema updated =
tableUpdater
@@ -158,8 +170,7 @@ public class TestTableUpdater extends
TestFlinkIcebergSinkBase {
.f0
.resolvedTableSchema();
assertThat(updated.sameSchema(SCHEMA2)).isTrue();
- assertThat(
- cache.schema(tableIdentifier, SCHEMA2,
false).resolvedTableSchema().sameSchema(SCHEMA2))
+ assertThat(cache.schema(tableIdentifier,
SCHEMA2).resolvedTableSchema().sameSchema(SCHEMA2))
.isTrue();
}
@@ -168,8 +179,9 @@ public class TestTableUpdater extends
TestFlinkIcebergSinkBase {
Catalog catalog = CATALOG_EXTENSION.catalog();
TableIdentifier tableIdentifier = TableIdentifier.parse("default.myTable");
catalog.createTable(tableIdentifier, SCHEMA);
- TableMetadataCache cache = new TableMetadataCache(catalog, 10,
Long.MAX_VALUE, 10);
- TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
+ TableMetadataCache cache =
+ new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10,
CASE_SENSITIVE, PRESERVE_COLUMNS);
+ TableUpdater tableUpdater = new TableUpdater(cache, catalog,
CASE_SENSITIVE, PRESERVE_COLUMNS);
// Initialize cache
tableUpdater.update(
@@ -184,7 +196,7 @@ public class TestTableUpdater extends
TestFlinkIcebergSinkBase {
catalog.createTable(tableIdentifier, SCHEMA2);
// Cache still stores the old information
- assertThat(cache.schema(tableIdentifier, SCHEMA2, false).compareResult())
+ assertThat(cache.schema(tableIdentifier, SCHEMA2).compareResult())
.isEqualTo(CompareSchemasVisitor.Result.SCHEMA_UPDATE_NEEDED);
assertThat(
@@ -204,14 +216,59 @@ public class TestTableUpdater extends
TestFlinkIcebergSinkBase {
.doesNotContainKey(SCHEMA2);
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testCaseSensitivity(boolean caseSensitive) {
+ Catalog catalog = CATALOG_EXTENSION.catalog();
+ TableIdentifier tableIdentifier = TableIdentifier.parse("myTable");
+ TableMetadataCache cache =
+ new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10, caseSensitive,
DROP_COLUMNS);
+
+ TableUpdater tableUpdater = new TableUpdater(cache, catalog,
caseSensitive, DROP_COLUMNS);
+
+ Schema schema =
+ new Schema(
+ Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "data", Types.StringType.get()),
+ Types.NestedField.optional(3, "extra", Types.StringType.get()));
+
+ catalog.createTable(tableIdentifier, schema);
+
+ Schema schemaWithUpperCase =
+ new Schema(
+ Types.NestedField.optional(1, "Id", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "Data", Types.StringType.get()),
+ Types.NestedField.optional(3, "Extra", Types.StringType.get()));
+
+ Tuple2<TableMetadataCache.ResolvedSchemaInfo, PartitionSpec> result =
+ tableUpdater.update(
+ tableIdentifier,
+ SnapshotRef.MAIN_BRANCH,
+ schemaWithUpperCase,
+ PartitionSpec.unpartitioned(),
+ TableCreator.DEFAULT);
+
+
assertThat(result.f0.compareResult()).isEqualTo(CompareSchemasVisitor.Result.SAME);
+
+ Schema tableSchema = catalog.loadTable(tableIdentifier).schema();
+ if (caseSensitive) {
+ assertThat(tableSchema.columns()).hasSize(3);
+ assertThat(tableSchema.findField("Id")).isNotNull();
+ assertThat(tableSchema.findField("Data")).isNotNull();
+ assertThat(tableSchema.findField("Extra")).isNotNull();
+ } else {
+ assertThat(tableSchema.sameSchema(schema)).isTrue();
+ }
+ }
+
@Test
void testDropUnusedColumns() {
Catalog catalog = CATALOG_EXTENSION.catalog();
TableIdentifier tableIdentifier = TableIdentifier.parse("myTable");
- TableMetadataCache cache = new TableMetadataCache(catalog, 10,
Long.MAX_VALUE, 10);
+ TableMetadataCache cache =
+ new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10,
CASE_SENSITIVE, DROP_COLUMNS);
- final boolean dropUnusedColumns = true;
- TableUpdater tableUpdater = new TableUpdater(cache, catalog,
dropUnusedColumns);
+ TableUpdater tableUpdater = new TableUpdater(cache, catalog,
CASE_SENSITIVE, DROP_COLUMNS);
catalog.createTable(tableIdentifier, SCHEMA2);
@@ -236,8 +293,9 @@ public class TestTableUpdater extends
TestFlinkIcebergSinkBase {
Catalog catalog = CATALOG_EXTENSION.catalog();
SupportsNamespaces namespaceCatalog = (SupportsNamespaces) catalog;
TableIdentifier tableIdentifier = TableIdentifier.of("new_namespace",
"myTable");
- TableMetadataCache cache = new TableMetadataCache(catalog, 10,
Long.MAX_VALUE, 10);
- TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
+ TableMetadataCache cache =
+ new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10,
CASE_SENSITIVE, PRESERVE_COLUMNS);
+ TableUpdater tableUpdater = new TableUpdater(cache, catalog,
CASE_SENSITIVE, PRESERVE_COLUMNS);
assertThat(namespaceCatalog.namespaceExists(Namespace.of("new_namespace"))).isFalse();
assertThat(catalog.tableExists(tableIdentifier)).isFalse();
@@ -265,8 +323,9 @@ public class TestTableUpdater extends
TestFlinkIcebergSinkBase {
namespaceCatalog.createNamespace(namespace);
TableIdentifier tableIdentifier = TableIdentifier.of("existing_namespace",
"myTable");
- TableMetadataCache cache = new TableMetadataCache(catalog, 10,
Long.MAX_VALUE, 10);
- TableUpdater tableUpdater = new TableUpdater(cache, catalog, false);
+ TableMetadataCache cache =
+ new TableMetadataCache(catalog, 10, Long.MAX_VALUE, 10,
CASE_SENSITIVE, PRESERVE_COLUMNS);
+ TableUpdater tableUpdater = new TableUpdater(cache, catalog,
CASE_SENSITIVE, PRESERVE_COLUMNS);
assertThat(namespaceCatalog.namespaceExists(namespace)).isTrue();
assertThat(catalog.tableExists(tableIdentifier)).isFalse();