This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.5 by this push:
     new 7a14257  [FLINK-10670] [table] Fix Correlate codegen error
7a14257 is described below

commit 7a142575a5f032ea596242d3d654c1383b853879
Author: Xpray <leonxp...@gmail.com>
AuthorDate: Thu Oct 25 00:02:13 2018 +0800

    [FLINK-10670] [table] Fix Correlate codegen error
    
    This closes #6923.
---
 .../apache/flink/table/codegen/CodeGenerator.scala  | 14 +++++++-------
 .../table/codegen/CollectorCodeGenerator.scala      | 11 ++++++++---
 .../flink/table/plan/nodes/CommonCorrelate.scala    | 15 +++------------
 .../runtime/stream/table/CorrelateITCase.scala      | 21 +++++++++++++++++++++
 4 files changed, 39 insertions(+), 22 deletions(-)

diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 57e3809..f5c4d11 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -109,39 +109,39 @@ abstract class CodeGenerator(
 
   // set of member statements that will be added only once
   // we use a LinkedHashSet to keep the insertion order
-  protected val reusableMemberStatements: mutable.LinkedHashSet[String] =
+  val reusableMemberStatements: mutable.LinkedHashSet[String] =
     mutable.LinkedHashSet[String]()
 
   // set of constructor statements that will be added only once
   // we use a LinkedHashSet to keep the insertion order
-  protected val reusableInitStatements: mutable.LinkedHashSet[String] =
+  val reusableInitStatements: mutable.LinkedHashSet[String] =
     mutable.LinkedHashSet[String]()
 
   // set of open statements for RichFunction that will be added only once
   // we use a LinkedHashSet to keep the insertion order
-  protected val reusableOpenStatements: mutable.LinkedHashSet[String] =
+  val reusableOpenStatements: mutable.LinkedHashSet[String] =
     mutable.LinkedHashSet[String]()
 
   // set of close statements for RichFunction that will be added only once
   // we use a LinkedHashSet to keep the insertion order
-  protected val reusableCloseStatements: mutable.LinkedHashSet[String] =
+  val reusableCloseStatements: mutable.LinkedHashSet[String] =
     mutable.LinkedHashSet[String]()
 
   // set of statements that will be added only once per record;
   // code should only update member variables because local variables are not 
accessible if
   // the code needs to be split;
   // we use a LinkedHashSet to keep the insertion order
-  protected val reusablePerRecordStatements: mutable.LinkedHashSet[String] =
+  val reusablePerRecordStatements: mutable.LinkedHashSet[String] =
     mutable.LinkedHashSet[String]()
 
   // map of initial input unboxing expressions that will be added only once
   // (inputTerm, index) -> expr
-  protected val reusableInputUnboxingExprs: mutable.Map[(String, Int), 
GeneratedExpression] =
+  val reusableInputUnboxingExprs: mutable.Map[(String, Int), 
GeneratedExpression] =
     mutable.Map[(String, Int), GeneratedExpression]()
 
   // set of constructor statements that will be added only once
   // we use a LinkedHashSet to keep the insertion order
-  protected val reusableConstructorStatements: mutable.LinkedHashSet[(String, 
String)] =
+  val reusableConstructorStatements: mutable.LinkedHashSet[(String, String)] =
     mutable.LinkedHashSet[(String, String)]()
 
   /**
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala
index 85d858f..ddc13c5 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala
@@ -59,13 +59,14 @@ class CollectorCodeGenerator(
     *             valid Java class identifier.
     * @param bodyCode body code for the collector method
     * @param collectedType The type information of the element collected by 
the collector
+    * @param filterGenerator generator containing context information for the 
generated body code
     * @return instance of GeneratedCollector
     */
   def generateTableFunctionCollector(
       name: String,
       bodyCode: String,
       collectedType: TypeInformation[Any],
-      codeGenerator: CodeGenerator)
+      filterGenerator: CodeGenerator)
     : GeneratedCollector = {
 
     val className = newName(name)
@@ -86,6 +87,10 @@ class CollectorCodeGenerator(
       s"$input2TypeClass $input2Term" // local variable
     }
 
+    reusableMemberStatements ++= filterGenerator.reusableMemberStatements
+    reusableInitStatements ++= filterGenerator.reusableInitStatements
+    reusablePerRecordStatements ++= filterGenerator.reusablePerRecordStatements
+
     val funcCode = j"""
       |public class $className extends 
${classOf[TableFunctionCollector[_]].getCanonicalName} {
       |
@@ -98,7 +103,7 @@ class CollectorCodeGenerator(
       |
       |  @Override
       |  public void open(${classOf[Configuration].getCanonicalName} 
parameters) throws Exception {
-      |    ${codeGenerator.reuseOpenCode()}
+      |    ${filterGenerator.reuseOpenCode()}
       |  }
       |
       |  @Override
@@ -113,7 +118,7 @@ class CollectorCodeGenerator(
       |
       |  @Override
       |  public void close() throws Exception {
-      |    ${codeGenerator.reuseCloseCode()}
+      |    ${filterGenerator.reuseCloseCode()}
       |  }
       |}
       |""".stripMargin
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
index 3475e19..2587687 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.table.plan.nodes
 
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode, RexShuttle}
+import org.apache.calcite.rex.{RexCall, RexNode}
 import org.apache.calcite.sql.SemiJoinType
 import org.apache.flink.api.common.functions.Function
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -149,18 +149,9 @@ trait CommonCorrelate {
          |getCollector().collect(${crossResultExpr.resultTerm});
          |""".stripMargin
     } else {
-
-      // adjust indices of InputRefs to adhere to schema expected by generator
-      val changeInputRefIndexShuttle = new RexShuttle {
-        override def visitInputRef(inputRef: RexInputRef): RexNode = {
-          new RexInputRef(inputSchema.arity + inputRef.getIndex, 
inputRef.getType)
-        }
-      }
-      // Run generateExpression to add init statements (ScalarFunctions) of 
condition to generator.
-      //   The generated expression is discarded.
-      
generator.generateExpression(condition.get.accept(changeInputRefIndexShuttle))
-
       filterGenerator.input1Term = filterGenerator.input2Term
+      // generating filter expressions might add init statements and member 
fields
+      // that need to be combined with the statements of the collector code 
generator
       val filterCondition = filterGenerator.generateExpression(condition.get)
       s"""
          |${filterGenerator.reuseInputUnboxingCode()}
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
index f4b1f80..60e03d4 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
@@ -284,6 +284,27 @@ class CorrelateITCase extends AbstractTestBase {
     )
   }
 
+  @Test
+  def testTableFunctionCollectorInit(): Unit = {
+    val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
+    val func0 = new TableFunc0
+
+    // this case will generate 'timestamp' member field and 'DateFormatter'
+    val result = t
+      .join(func0('c) as('d, 'e))
+      .where(dateFormat(currentTimestamp(), "yyyyMMdd") === 'd)
+      .select('c, 'd, 'e)
+      .toAppendStream[Row]
+
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    assertEquals(
+      Seq(),
+      StreamITCase.testResults.sorted
+    )
+  }
+
   private def testData(
       env: StreamExecutionEnvironment)
     : DataStream[(Int, Long, String)] = {

Reply via email to the mailing list.