godfreyhe commented on a change in pull request #13631:
URL: https://github.com/apache/flink/pull/13631#discussion_r505333391



##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
##########
@@ -268,7 +268,8 @@ class RefFieldAccessorVisitor(usedFields: Array[Int]) 
extends RexVisitorImpl[Uni
             // access is top-level access => return top-level access
             case _ :: _ if nestedAccess.equals("*") => List("*")
             // previous access is not prefix of this access => add access
-            case head :: _ if !nestedAccess.startsWith(head) =>
+            // it may cause bug without "." as tail if we have references a.b 
and a.bb
+            case head :: _ if !nestedAccess.startsWith(head + ".") =>

Review comment:
       What if a field name itself contains `.`?

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeRewriter.scala
##########
@@ -60,3 +79,88 @@ class InputRewriter(fieldMap: Map[Int, Int]) extends 
RexShuttle {
     fieldMap.getOrElse(ref.getIndex,
       throw new IllegalArgumentException("input field contains invalid index"))
 }
+
+/**
+ * A RexShuttle to rewrite field accesses of RexNode with nested projection.
+ * For `RexInputRef`, it works like `InputReWriter` and use the old input
+ * ref index to find the new input fields ref.
+ * For `RexFieldAccess`, it will traverse to the top level of the access and
+ * find the mapping in field fieldMap first. There are 3 situations we need to 
consider:
+ *  1. mapping has the top level access, we should make field access to the 
reference;
+ *  2. mapping has the field, we should make an access;
+ *  3. mapping has no information of the current name, we should keep the full 
name
+ *  of the fields and index of mapping for later lookup.
+ * When the process is back from the recursion, we still have 2 situations 
need to
+ * consider:
+ *  1. we have found the reference of the upper level, we just make an access 
above the
+ *  reference we find before;
+ *  2. we haven't found the reference of the upper level, we concatenate the 
prefix with
+ *  the current field name and look up the new prefix in the mapping. If it's 
in the mapping,
+ *  we create a reference. Otherwise, we should go to the next level with the 
new prefix.
+ */
+class NestedInputRewriter(
+  fieldMap: JMap[Integer, JMap[String, Integer]],
+  rowTypes: JList[RelDataType],
+  builder: RexBuilder) extends RexShuttle {
+
+  override def visitFieldAccess(input: RexFieldAccess): RexNode = {
+    def traverse(fieldAccess: RexFieldAccess): (Int, String, Option[RexNode]) 
= {
+      fieldAccess.getReferenceExpr match {
+        case ref: RexInputRef =>
+          val mapping =
+            fieldMap.getOrElse(ref.getIndex,
+              throw new IllegalArgumentException("input field contains unknown 
index"))
+          if (mapping.contains("*")) {
+            (ref.getIndex,
+              "",
+              Option.apply(builder.makeFieldAccess(
+                new RexInputRef(mapping("*"), rowTypes(mapping("*"))),
+                fieldAccess.getField.getName,
+                false))
+            )
+          } else if(mapping.contains(fieldAccess.getField.getName)) {
+            (ref.getIndex,
+              "",
+              Option.apply(new 
RexInputRef(mapping(fieldAccess.getField.getName),

Review comment:
       ditto

##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushProjectIntoTableSourceScanRuleTest.java
##########
@@ -107,4 +96,13 @@ private void testNestedProject(boolean 
nestedProjectionSupported) {
                util().verifyPlan(sqlQuery);
        }
 
+       @Test
+       public void testComplicatedNestedProject() {
+               String sqlQuery = "SELECT id," +
+                               "    deepNested.nested1.name AS nestedName,\n" +
+                               "    deepNested.nested2 AS nested2,\n" +
+                               "    deepNested.nested2.num AS nestedNum\n" +
+                               "FROM NestedTable";
+               util().verifyPlan(sqlQuery);

Review comment:
       nit: add a test covering complex expressions, such as
`deepNested.nested1.name + nested.value`

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeRewriter.scala
##########
@@ -60,3 +79,88 @@ class InputRewriter(fieldMap: Map[Int, Int]) extends 
RexShuttle {
     fieldMap.getOrElse(ref.getIndex,
       throw new IllegalArgumentException("input field contains invalid index"))
 }
+
+/**
+ * A RexShuttle to rewrite field accesses of RexNode with nested projection.
+ * For `RexInputRef`, it works like `InputReWriter` and use the old input
+ * ref index to find the new input fields ref.
+ * For `RexFieldAccess`, it will traverse to the top level of the access and
+ * find the mapping in field fieldMap first. There are 3 situations we need to 
consider:
+ *  1. mapping has the top level access, we should make field access to the 
reference;
+ *  2. mapping has the field, we should make an access;
+ *  3. mapping has no information of the current name, we should keep the full 
name
+ *  of the fields and index of mapping for later lookup.
+ * When the process is back from the recursion, we still have 2 situations 
need to
+ * consider:
+ *  1. we have found the reference of the upper level, we just make an access 
above the
+ *  reference we find before;
+ *  2. we haven't found the reference of the upper level, we concatenate the 
prefix with
+ *  the current field name and look up the new prefix in the mapping. If it's 
in the mapping,
+ *  we create a reference. Otherwise, we should go to the next level with the 
new prefix.
+ */
+class NestedInputRewriter(
+  fieldMap: JMap[Integer, JMap[String, Integer]],
+  rowTypes: JList[RelDataType],
+  builder: RexBuilder) extends RexShuttle {
+
+  override def visitFieldAccess(input: RexFieldAccess): RexNode = {
+    def traverse(fieldAccess: RexFieldAccess): (Int, String, Option[RexNode]) 
= {
+      fieldAccess.getReferenceExpr match {
+        case ref: RexInputRef =>
+          val mapping =
+            fieldMap.getOrElse(ref.getIndex,
+              throw new IllegalArgumentException("input field contains unknown 
index"))
+          if (mapping.contains("*")) {
+            (ref.getIndex,
+              "",
+              Option.apply(builder.makeFieldAccess(

Review comment:
       nit: Option.apply() -> Some()

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeRewriter.scala
##########
@@ -60,3 +79,88 @@ class InputRewriter(fieldMap: Map[Int, Int]) extends 
RexShuttle {
     fieldMap.getOrElse(ref.getIndex,
       throw new IllegalArgumentException("input field contains invalid index"))
 }
+
+/**
+ * A RexShuttle to rewrite field accesses of RexNode with nested projection.
+ * For `RexInputRef`, it works like `InputReWriter` and use the old input
+ * ref index to find the new input fields ref.
+ * For `RexFieldAccess`, it will traverse to the top level of the access and
+ * find the mapping in field fieldMap first. There are 3 situations we need to 
consider:
+ *  1. mapping has the top level access, we should make field access to the 
reference;
+ *  2. mapping has the field, we should make an access;
+ *  3. mapping has no information of the current name, we should keep the full 
name
+ *  of the fields and index of mapping for later lookup.
+ * When the process is back from the recursion, we still have 2 situations 
need to
+ * consider:
+ *  1. we have found the reference of the upper level, we just make an access 
above the
+ *  reference we find before;
+ *  2. we haven't found the reference of the upper level, we concatenate the 
prefix with
+ *  the current field name and look up the new prefix in the mapping. If it's 
in the mapping,
+ *  we create a reference. Otherwise, we should go to the next level with the 
new prefix.
+ */
+class NestedInputRewriter(
+  fieldMap: JMap[Integer, JMap[String, Integer]],
+  rowTypes: JList[RelDataType],
+  builder: RexBuilder) extends RexShuttle {
+
+  override def visitFieldAccess(input: RexFieldAccess): RexNode = {
+    def traverse(fieldAccess: RexFieldAccess): (Int, String, Option[RexNode]) 
= {
+      fieldAccess.getReferenceExpr match {
+        case ref: RexInputRef =>
+          val mapping =
+            fieldMap.getOrElse(ref.getIndex,
+              throw new IllegalArgumentException("input field contains unknown 
index"))
+          if (mapping.contains("*")) {
+            (ref.getIndex,
+              "",
+              Option.apply(builder.makeFieldAccess(
+                new RexInputRef(mapping("*"), rowTypes(mapping("*"))),
+                fieldAccess.getField.getName,
+                false))
+            )
+          } else if(mapping.contains(fieldAccess.getField.getName)) {
+            (ref.getIndex,
+              "",
+              Option.apply(new 
RexInputRef(mapping(fieldAccess.getField.getName),
+                rowTypes(mapping(fieldAccess.getField.getName)))))
+          } else {
+            (ref.getIndex, fieldAccess.getField.getName, Option.empty)
+          }
+        case acc: RexFieldAccess =>
+          val (i, prefix, node) = traverse(acc)
+          if (node.isDefined) {
+            (i,
+              "",
+              Option.apply(builder.makeFieldAccess(node.get, 
fieldAccess.getField.getName, false)))
+          } else {
+            val newPrefix = s"$prefix.${fieldAccess.getField.getName}"
+            // we have checked before
+            val mapping = fieldMap(i)
+            if (mapping.contains(newPrefix)) {
+              (i,
+                "",
+                Option.apply(new RexInputRef(mapping(newPrefix), 
rowTypes(mapping(newPrefix)))))
+            } else {
+              (i, newPrefix, Option.empty)
+            }
+          }
+      }
+    }

Review comment:
       It would be better to move this method out of its parent method, which
would improve code readability.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to