godfreyhe commented on a change in pull request #13631:
URL: https://github.com/apache/flink/pull/13631#discussion_r506915187



##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
##########
@@ -256,7 +256,15 @@ class RefFieldAccessorVisitor(usedFields: Array[Int]) extends RexVisitorImpl[Uni
     if (right.length < left.length) {
       false
     } else {
-      right.take(left.length).equals(left)
+      right.take(left.length).zip(left).foldLeft(true) {
+        (ans, fields) => {
+          if (ans) {
+            fields._1.equals(fields._2)

Review comment:
       It's better not to use `_1` and `_2`, which make the code hard to read. We can use a `case` match with meaningful names instead, for example:
   ```
   right.take(left.length).zip(left).foldLeft(true) {
     case (ans, (rName, lName)) => {
       if (ans) {
         lName.equals(rName)
       } else {
         false
       }
     }
   }
   ```
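
   A minimal standalone sketch of the same semantics in Java (the helper name `isPrefix` is hypothetical, and names are assumed to be held in `List`s rather than Scala `Array`s, so `equals` compares element by element rather than by reference — which is presumably why the element-wise fold was introduced in the Scala code):

   ```java
   import java.util.Arrays;
   import java.util.List;

   public class PrefixCheck {
       // Hypothetical helper: returns true when `left` is an element-wise
       // prefix of `right`, i.e. right.take(left.length) == left.
       static boolean isPrefix(List<String> left, List<String> right) {
           if (right.size() < left.size()) {
               return false;
           }
           // subList is a view; List.equals compares element by element.
           return right.subList(0, left.size()).equals(left);
       }

       public static void main(String[] args) {
           System.out.println(isPrefix(Arrays.asList("a", "b"), Arrays.asList("a", "b", "c"))); // true
           System.out.println(isPrefix(Arrays.asList("a", "x"), Arrays.asList("a", "b", "c"))); // false
       }
   }
   ```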

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
##########
@@ -75,10 +79,46 @@ public static TableSchema projectSchema(TableSchema tableSchema, int[][] project
                checkArgument(containsPhysicalColumnsOnly(tableSchema), "Projection is only supported for physical columns.");
                TableSchema.Builder schemaBuilder = TableSchema.builder();
                List<TableColumn> tableColumns = tableSchema.getTableColumns();
+               Map<String, String> nameDomain = new HashMap<>();
+               String exceptionTemplate = "Get name conflicts for origin fields %s and %s with new name `%s`. " +
+                               "When pushing projection into scan, we will concatenate top level names with delimiter '_'. " +
+                               "Please rename the origin field names when creating table.";
+               String originFullyQualifiedName;
+               String newName;
                for (int[] fieldPath : projectedFields) {
-                       checkArgument(fieldPath.length == 1, "Nested projection push down is not supported yet.");
-                       TableColumn column = tableColumns.get(fieldPath[0]);
-                       schemaBuilder.field(column.getName(), column.getType());
+                       if (fieldPath.length == 1) {
+                               TableColumn column = tableColumns.get(fieldPath[0]);
+                               newName = column.getName();
+                               originFullyQualifiedName = String.format("`%s`", column.getName());
+                               if (nameDomain.containsKey(column.getName())) {
+                                       throw new TableException(

Review comment:
       How about resolving the conflicts by adding a postfix?
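
   A minimal sketch of that idea (the method name `resolveWithPostfix` and the numeric postfix scheme are assumptions, not the PR's code): append `_0`, `_1`, ... until the candidate name no longer collides with an already registered one.

   ```java
   import java.util.HashSet;
   import java.util.Set;

   public class NameResolver {
       // Hypothetical sketch: if `name` already exists in `usedNames`,
       // append "_0", "_1", ... until the result is unique, then record it.
       static String resolveWithPostfix(String name, Set<String> usedNames) {
           String candidate = name;
           int postfix = 0;
           while (usedNames.contains(candidate)) {
               candidate = name + "_" + postfix++;
           }
           usedNames.add(candidate);
           return candidate;
       }

       public static void main(String[] args) {
           Set<String> used = new HashSet<>();
           System.out.println(resolveWithPostfix("nested_value", used)); // nested_value
           System.out.println(resolveWithPostfix("nested_value", used)); // nested_value_0
       }
   }
   ```

   This trades an exception for a deterministic rename; the resulting schema would no longer match the user-declared names exactly, which is the design trade-off the comment raises.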

##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
##########
@@ -70,41 +69,40 @@ public void setup() {
                                                " 'bounded' = 'true'\n" +
                                                ")";
                util().tableEnv().executeSql(ddl2);
-       }
-
-       @Override
-       public void testNestedProject() {
-               expectedException().expect(TableException.class);
-               expectedException().expectMessage("Nested projection push down is unsupported now.");
-               testNestedProject(true);
-       }
 
-       @Test
-       public void testNestedProjectDisabled() {
-               testNestedProject(false);
-       }
-
-       private void testNestedProject(boolean nestedProjectionSupported) {
-               String ddl =
+               String ddl3 =
                                "CREATE TABLE NestedTable (\n" +
                                                "  id int,\n" +
-                                               "  deepNested row<nested1 row<name string, `value` int>, nested2 row<num int, flag boolean>>,\n" +
-                                               "  nested row<name string, `value` int>,\n" +
+                                               "  deepNested row<nested1 row<name string, `value` int>, `nested2.` row<num int, flag boolean>>,\n" +
+                                               "  nested row<name string, `value.` int>,\n" +
                                                "  name string\n" +
                                                ") WITH (\n" +
                                                " 'connector' = 'values',\n" +
-                                               " 'nested-projection-supported' = '" + nestedProjectionSupported + "',\n" +
+                                               " 'nested-projection-supported' = 'true'," +
                                                "  'bounded' = 'true'\n" +
                                                ")";
-               util().tableEnv().executeSql(ddl);
+               util().tableEnv().executeSql(ddl3);
+       }
 
+       @Override
+       @Test
+       public void testNestedProject() {
                String sqlQuery = "SELECT id,\n" +
                                "    deepNested.nested1.name AS nestedName,\n" +
-                               "    nested.`value` AS nestedValue,\n" +
-                               "    deepNested.nested2.flag AS nestedFlag,\n" +
-                               "    deepNested.nested2.num AS nestedNum\n" +
+                               "    nested.`value.` AS nestedValue,\n" +
+                               "    deepNested.`nested2.`.flag AS nestedFlag,\n" +

Review comment:
       How about keeping the original test as-is, and adding a new test case to verify the case where a field name contains a dot?
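
   For context on why dotted names deserve a dedicated test: when nested paths are flattened by joining segments with a delimiter, a field name that itself contains the delimiter can make two distinct paths collapse to the same flattened name. A minimal illustration (the `flatten` helper is hypothetical, using '.' as the delimiter purely for demonstration):

   ```java
   import java.util.Arrays;
   import java.util.List;

   public class FlattenDemo {
       // Hypothetical flattening: join path segments with '.', which is
       // ambiguous when a segment itself contains a dot.
       static String flatten(List<String> path) {
           return String.join(".", path);
       }

       public static void main(String[] args) {
           // Two different paths collapse to the same flattened name:
           System.out.println(flatten(Arrays.asList("deepNested", "nested2.", "flag"))); // deepNested.nested2..flag
           System.out.println(flatten(Arrays.asList("deepNested", "nested2", ".flag"))); // deepNested.nested2..flag
       }
   }
   ```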




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
