This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch release-1.5 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.5 by this push: new 7a14257 [FLINK-10670] [table] Fix Correlate codegen error 7a14257 is described below commit 7a142575a5f032ea596242d3d654c1383b853879 Author: Xpray <leonxp...@gmail.com> AuthorDate: Thu Oct 25 00:02:13 2018 +0800 [FLINK-10670] [table] Fix Correlate codegen error This closes #6923. --- .../apache/flink/table/codegen/CodeGenerator.scala | 14 +++++++------- .../table/codegen/CollectorCodeGenerator.scala | 11 ++++++++--- .../flink/table/plan/nodes/CommonCorrelate.scala | 15 +++------------ .../runtime/stream/table/CorrelateITCase.scala | 21 +++++++++++++++++++++ 4 files changed, 39 insertions(+), 22 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala index 57e3809..f5c4d11 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala @@ -109,39 +109,39 @@ abstract class CodeGenerator( // set of member statements that will be added only once // we use a LinkedHashSet to keep the insertion order - protected val reusableMemberStatements: mutable.LinkedHashSet[String] = + val reusableMemberStatements: mutable.LinkedHashSet[String] = mutable.LinkedHashSet[String]() // set of constructor statements that will be added only once // we use a LinkedHashSet to keep the insertion order - protected val reusableInitStatements: mutable.LinkedHashSet[String] = + val reusableInitStatements: mutable.LinkedHashSet[String] = mutable.LinkedHashSet[String]() // set of open statements for RichFunction that will be added only once // we use a LinkedHashSet to keep the insertion order - protected val reusableOpenStatements: mutable.LinkedHashSet[String] = + val reusableOpenStatements: mutable.LinkedHashSet[String] = mutable.LinkedHashSet[String]() // set of close statements for RichFunction that will be added only once // we use a LinkedHashSet to keep the insertion order - protected val reusableCloseStatements: mutable.LinkedHashSet[String] = + val reusableCloseStatements: mutable.LinkedHashSet[String] = mutable.LinkedHashSet[String]() // set of statements that will be added only once per record; // code should only update member variables because local variables are not accessible if // the code needs to be split; // we use a LinkedHashSet to keep the insertion order - protected val reusablePerRecordStatements: mutable.LinkedHashSet[String] = + val reusablePerRecordStatements: mutable.LinkedHashSet[String] = mutable.LinkedHashSet[String]() // map of initial input unboxing expressions that will be added only once // (inputTerm, index) -> expr - protected val reusableInputUnboxingExprs: mutable.Map[(String, Int), GeneratedExpression] = + val reusableInputUnboxingExprs: mutable.Map[(String, Int), GeneratedExpression] = mutable.Map[(String, Int), GeneratedExpression]() // set of constructor statements that will be added only once // we use a LinkedHashSet to keep the insertion order - protected val reusableConstructorStatements: mutable.LinkedHashSet[(String, String)] = + val reusableConstructorStatements: mutable.LinkedHashSet[(String, String)] = mutable.LinkedHashSet[(String, String)]() /** diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala index 85d858f..ddc13c5 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala @@ -59,13 +59,14 @@ class CollectorCodeGenerator( * valid Java class identifier. * @param bodyCode body code for the collector method * @param collectedType The type information of the element collected by the collector + * @param filterGenerator generator containing context information for the generated body code * @return instance of GeneratedCollector */ def generateTableFunctionCollector( name: String, bodyCode: String, collectedType: TypeInformation[Any], - codeGenerator: CodeGenerator) + filterGenerator: CodeGenerator) : GeneratedCollector = { val className = newName(name) @@ -86,6 +87,10 @@ class CollectorCodeGenerator( s"$input2TypeClass $input2Term" // local variable } + reusableMemberStatements ++= filterGenerator.reusableMemberStatements + reusableInitStatements ++= filterGenerator.reusableInitStatements + reusablePerRecordStatements ++= filterGenerator.reusablePerRecordStatements + val funcCode = j""" |public class $className extends ${classOf[TableFunctionCollector[_]].getCanonicalName} { | @@ -98,7 +103,7 @@ class CollectorCodeGenerator( | | @Override | public void open(${classOf[Configuration].getCanonicalName} parameters) throws Exception { - | ${codeGenerator.reuseOpenCode()} + | ${filterGenerator.reuseOpenCode()} | } | | @Override @@ -113,7 +118,7 @@ class CollectorCodeGenerator( | | @Override | public void close() throws Exception { - | ${codeGenerator.reuseCloseCode()} + | ${filterGenerator.reuseCloseCode()} | } |} |""".stripMargin diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala index 3475e19..2587687 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala @@ -18,7 +18,7 @@ package org.apache.flink.table.plan.nodes import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode, RexShuttle} +import org.apache.calcite.rex.{RexCall, RexNode} import org.apache.calcite.sql.SemiJoinType import org.apache.flink.api.common.functions.Function import org.apache.flink.api.common.typeinfo.TypeInformation @@ -149,18 +149,9 @@ trait CommonCorrelate { |getCollector().collect(${crossResultExpr.resultTerm}); |""".stripMargin } else { - - // adjust indices of InputRefs to adhere to schema expected by generator - val changeInputRefIndexShuttle = new RexShuttle { - override def visitInputRef(inputRef: RexInputRef): RexNode = { - new RexInputRef(inputSchema.arity + inputRef.getIndex, inputRef.getType) - } - } - // Run generateExpression to add init statements (ScalarFunctions) of condition to generator. - // The generated expression is discarded. - generator.generateExpression(condition.get.accept(changeInputRefIndexShuttle)) - filterGenerator.input1Term = filterGenerator.input2Term + // generating filter expressions might add init statements and member fields + // that need to be combined with the statements of the collector code generator val filterCondition = filterGenerator.generateExpression(condition.get) s""" |${filterGenerator.reuseInputUnboxingCode()} diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala index f4b1f80..60e03d4 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala @@ -284,6 +284,27 @@ class CorrelateITCase extends AbstractTestBase { ) } + @Test + def testTableFunctionCollectorInit(): Unit = { + val t = testData(env).toTable(tEnv).as('a, 'b, 'c) + val func0 = new TableFunc0 + + // this case will generate 'timestamp' member field and 'DateFormatter' + val result = t + .join(func0('c) as('d, 'e)) + .where(dateFormat(currentTimestamp(), "yyyyMMdd") === 'd) + .select('c, 'd, 'e) + .toAppendStream[Row] + + result.addSink(new StreamITCase.StringSink[Row]) + env.execute() + + assertEquals( + Seq(), + StreamITCase.testResults.sorted + ) + } + private def testData( env: StreamExecutionEnvironment) : DataStream[(Int, Long, String)] = {