This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 57a6f02357fd2eb7e3c59e9903f9ec33a655ead6
Author: Timo Walther <twal...@apache.org>
AuthorDate: Mon Dec 6 15:21:46 2021 +0100

    [FLINK-25014][table-api-java] Perform toDataStream projection case-insensitive
    
    This closes #18029.
---
 .../flink/table/catalog/SchemaTranslator.java      | 38 ++++++++++++++++++----
 .../flink/table/catalog/SchemaTranslatorTest.java  |  4 +--
 2 files changed, 33 insertions(+), 9 deletions(-)

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/SchemaTranslator.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/SchemaTranslator.java
index 4f9fad4..467840c 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/SchemaTranslator.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/SchemaTranslator.java
@@ -43,6 +43,7 @@ import javax.annotation.Nullable;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -113,19 +114,42 @@ public final class SchemaTranslator {
             ResolvedSchema inputSchema,
             AbstractDataType<?> targetDataType) {
         final List<String> inputFieldNames = inputSchema.getColumnNames();
+        final List<String> inputFieldNamesNormalized =
+                inputFieldNames.stream()
+                        .map(n -> n.toLowerCase(Locale.ROOT))
+                        .collect(Collectors.toList());
         final DataType resolvedDataType = 
dataTypeFactory.createDataType(targetDataType);
         final List<String> targetFieldNames = flattenToNames(resolvedDataType);
+        final List<String> targetFieldNamesNormalized =
+                targetFieldNames.stream()
+                        .map(n -> n.toLowerCase(Locale.ROOT))
+                        .collect(Collectors.toList());
         final List<DataType> targetFieldDataTypes = 
flattenToDataTypes(resolvedDataType);
 
         // help in reorder fields for POJOs if all field names are present but out of order,
         // otherwise let the sink validation fail later
-        final List<String> projections;
-        if (targetFieldNames.size() == inputFieldNames.size()
-                && !targetFieldNames.equals(inputFieldNames)
-                && targetFieldNames.containsAll(inputFieldNames)) {
-            projections = targetFieldNames;
-        } else {
-            projections = null;
+        List<String> projections = null;
+        if (targetFieldNames.size() == inputFieldNames.size()) {
+            // reordering by name (case-sensitive)
+            if (targetFieldNames.containsAll(inputFieldNames)) {
+                projections = targetFieldNames;
+            }
+            // reordering by name (case-insensitive) but fields must be unique
+            else if 
(targetFieldNamesNormalized.containsAll(inputFieldNamesNormalized)
+                    && targetFieldNamesNormalized.stream().distinct().count()
+                            == targetFieldNames.size()
+                    && inputFieldNamesNormalized.stream().distinct().count()
+                            == inputFieldNames.size()) {
+                projections =
+                        targetFieldNamesNormalized.stream()
+                                .map(
+                                        targetName -> {
+                                            final int inputFieldPos =
+                                                    
inputFieldNamesNormalized.indexOf(targetName);
+                                            return 
inputFieldNames.get(inputFieldPos);
+                                        })
+                                .collect(Collectors.toList());
+            }
         }
 
         final Schema schema =
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaTranslatorTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaTranslatorTest.java
index 46bab61..c2c2e9f 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaTranslatorTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaTranslatorTest.java
@@ -90,7 +90,7 @@ public class SchemaTranslatorTest {
                 ResolvedSchema.of(
                         Column.physical("c", DataTypes.INT()),
                         Column.physical("a", DataTypes.BOOLEAN()),
-                        Column.physical("b", DataTypes.DOUBLE()));
+                        Column.physical("B", DataTypes.DOUBLE())); // 
case-insensitive mapping
 
         final DataType physicalDataType =
                 DataTypes.ROW(
@@ -102,7 +102,7 @@ public class SchemaTranslatorTest {
                 SchemaTranslator.createProducingResult(
                         dataTypeFactory(), inputSchema, physicalDataType);
 
-        assertEquals(Optional.of(Arrays.asList("a", "b", "c")), 
result.getProjections());
+        assertEquals(Optional.of(Arrays.asList("a", "B", "c")), 
result.getProjections());
 
         assertEquals(
                 Schema.newBuilder()

Reply via email to