This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 57a6f02357fd2eb7e3c59e9903f9ec33a655ead6 Author: Timo Walther <twal...@apache.org> AuthorDate: Mon Dec 6 15:21:46 2021 +0100 [FLINK-25014][table-api-java] Perform toDataStream projection case-insensitive This closes #18029. --- .../flink/table/catalog/SchemaTranslator.java | 38 ++++++++++++++++++---- .../flink/table/catalog/SchemaTranslatorTest.java | 4 +-- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/SchemaTranslator.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/SchemaTranslator.java index 4f9fad4..467840c 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/SchemaTranslator.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/SchemaTranslator.java @@ -43,6 +43,7 @@ import javax.annotation.Nullable; import java.util.Collections; import java.util.List; +import java.util.Locale; import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -113,19 +114,42 @@ public final class SchemaTranslator { ResolvedSchema inputSchema, AbstractDataType<?> targetDataType) { final List<String> inputFieldNames = inputSchema.getColumnNames(); + final List<String> inputFieldNamesNormalized = + inputFieldNames.stream() + .map(n -> n.toLowerCase(Locale.ROOT)) + .collect(Collectors.toList()); final DataType resolvedDataType = dataTypeFactory.createDataType(targetDataType); final List<String> targetFieldNames = flattenToNames(resolvedDataType); + final List<String> targetFieldNamesNormalized = + targetFieldNames.stream() + .map(n -> n.toLowerCase(Locale.ROOT)) + .collect(Collectors.toList()); final List<DataType> targetFieldDataTypes = flattenToDataTypes(resolvedDataType); // help in reorder fields for POJOs if all field names are present but out of order, // otherwise let the sink validation fail later - final List<String> projections; - if (targetFieldNames.size() == inputFieldNames.size() - && !targetFieldNames.equals(inputFieldNames) - && targetFieldNames.containsAll(inputFieldNames)) { - projections = targetFieldNames; - } else { - projections = null; + List<String> projections = null; + if (targetFieldNames.size() == inputFieldNames.size()) { + // reordering by name (case-sensitive) + if (targetFieldNames.containsAll(inputFieldNames)) { + projections = targetFieldNames; + } + // reordering by name (case-insensitive) but fields must be unique + else if (targetFieldNamesNormalized.containsAll(inputFieldNamesNormalized) + && targetFieldNamesNormalized.stream().distinct().count() + == targetFieldNames.size() + && inputFieldNamesNormalized.stream().distinct().count() + == inputFieldNames.size()) { + projections = + targetFieldNamesNormalized.stream() + .map( + targetName -> { + final int inputFieldPos = + inputFieldNamesNormalized.indexOf(targetName); + return inputFieldNames.get(inputFieldPos); + }) + .collect(Collectors.toList()); + } } final Schema schema = diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaTranslatorTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaTranslatorTest.java index 46bab61..c2c2e9f 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaTranslatorTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaTranslatorTest.java @@ -90,7 +90,7 @@ public class SchemaTranslatorTest { ResolvedSchema.of( Column.physical("c", DataTypes.INT()), Column.physical("a", DataTypes.BOOLEAN()), - Column.physical("b", DataTypes.DOUBLE())); + Column.physical("B", DataTypes.DOUBLE())); // case-insensitive mapping final DataType physicalDataType = DataTypes.ROW( @@ -102,7 +102,7 @@ public class SchemaTranslatorTest { SchemaTranslator.createProducingResult( dataTypeFactory(), inputSchema, physicalDataType); - assertEquals(Optional.of(Arrays.asList("a", "b", "c")), result.getProjections()); + assertEquals(Optional.of(Arrays.asList("a", "B", "c")), result.getProjections()); assertEquals( Schema.newBuilder()