This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.8 by this push: new f6b855d [FLINK-14398][table-planner] Further split input unboxing code into separate methods (#10000) f6b855d is described below commit f6b855d9894bc783c760570053719b7a0c830ebb Author: Hao Dang <hdang.colum...@gmail.com> AuthorDate: Tue Oct 29 20:14:24 2019 -0700 [FLINK-14398][table-planner] Further split input unboxing code into separate methods (#10000) --- .../apache/flink/table/codegen/CodeGenerator.scala | 56 +++++++++++++++++----- .../table/codegen/CollectorCodeGenerator.scala | 22 ++------- .../table/codegen/FunctionCodeGenerator.scala | 26 +++++++--- .../flink/table/codegen/MatchCodeGenerator.scala | 33 ++++++++----- .../flink/table/runtime/stream/sql/SqlITCase.scala | 40 ++++++++++++++++ 5 files changed, 129 insertions(+), 48 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala index 2a7bb67..ab51a12 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala @@ -1052,23 +1052,51 @@ abstract class CodeGenerator( // ---------------------------------------------------------------------------------------------- // generator helping methods // ---------------------------------------------------------------------------------------------- + protected def makeReusableInSplits(expr: GeneratedExpression): GeneratedExpression = { + // prepare declaration in class + val resultTypeTerm = primitiveTypeTermForTypeInfo(expr.resultType) + if (nullCheck && !expr.nullTerm.equals(NEVER_NULL) && !expr.nullTerm.equals(ALWAYS_NULL)) { + reusableMemberStatements.add(s"private boolean ${expr.nullTerm};") + } + reusableMemberStatements.add(s"private $resultTypeTerm 
${expr.resultTerm};") - protected def makeReusableInSplits(exprs: Iterable[GeneratedExpression]): Unit = { - // add results of expressions to member area such that all split functions can access it - exprs.foreach { expr => - - // declaration - val resultTypeTerm = primitiveTypeTermForTypeInfo(expr.resultType) - if (nullCheck && !expr.nullTerm.equals(NEVER_NULL) && !expr.nullTerm.equals(ALWAYS_NULL)) { - reusableMemberStatements.add(s"private boolean ${expr.nullTerm};") - } - reusableMemberStatements.add(s"private $resultTypeTerm ${expr.resultTerm};") - - // assignment + // when expr has no code, no need to split it into a method, but still need to assign + if (expr.code.isEmpty) { if (nullCheck && !expr.nullTerm.equals(NEVER_NULL) && !expr.nullTerm.equals(ALWAYS_NULL)) { reusablePerRecordStatements.add(s"this.${expr.nullTerm} = ${expr.nullTerm};") } reusablePerRecordStatements.add(s"this.${expr.resultTerm} = ${expr.resultTerm};") + expr + } else { + // create a method for the unboxing block + val methodName = newName(s"inputUnboxingSplit") + val method = + if (nullCheck && !expr.nullTerm.equals(NEVER_NULL) && !expr.nullTerm.equals(ALWAYS_NULL)) { + s""" + |private final void $methodName() throws Exception { + | ${expr.code} + | this.${expr.nullTerm} = ${expr.nullTerm}; + | this.${expr.resultTerm} = ${expr.resultTerm}; + |} + """.stripMargin + } else { + s""" + |private final void $methodName() throws Exception { + | ${expr.code} + | this.${expr.resultTerm} = ${expr.resultTerm}; + |} + """.stripMargin + } + + // add this method to reusable section for later generation + reusableMemberStatements.add(method) + + // create method call + GeneratedExpression( + expr.resultTerm, + expr.nullTerm, + s"$methodName();", + expr.resultType) } } @@ -1081,7 +1109,9 @@ abstract class CodeGenerator( hasCodeSplits = true // add input unboxing to member area such that all split functions can access it - makeReusableInSplits(reusableInputUnboxingExprs.values) + 
reusableInputUnboxingExprs.keys.foreach( + key => + reusableInputUnboxingExprs(key) = makeReusableInSplits(reusableInputUnboxingExprs(key))) // add split methods to the member area and return the code necessary to call those methods val methodCalls = splits.map { split => diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala index ddc13c5..c437156 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/CollectorCodeGenerator.scala @@ -73,20 +73,6 @@ class CollectorCodeGenerator( val input1TypeClass = boxedTypeTermForTypeInfo(input1) val input2TypeClass = boxedTypeTermForTypeInfo(collectedType) - // declaration in case of code splits - val recordMember = if (hasCodeSplits) { - s"private $input2TypeClass $input2Term;" - } else { - "" - } - - // assignment in case of code splits - val recordAssignment = if (hasCodeSplits) { - s"$input2Term" // use member - } else { - s"$input2TypeClass $input2Term" // local variable - } - reusableMemberStatements ++= filterGenerator.reusableMemberStatements reusableInitStatements ++= filterGenerator.reusableInitStatements reusablePerRecordStatements ++= filterGenerator.reusablePerRecordStatements @@ -94,7 +80,9 @@ class CollectorCodeGenerator( val funcCode = j""" |public class $className extends ${classOf[TableFunctionCollector[_]].getCanonicalName} { | - | $recordMember + | private $input1TypeClass $input1Term; + | private $input2TypeClass $input2Term; + | | ${reuseMemberCode()} | | public $className() throws Exception { @@ -109,8 +97,8 @@ class CollectorCodeGenerator( | @Override | public void collect(Object record) throws Exception { | super.collect(record); - | $input1TypeClass $input1Term = ($input1TypeClass) 
getInput(); - | $recordAssignment = ($input2TypeClass) record; + | $input1Term = ($input1TypeClass) getInput(); + | $input2Term = ($input2TypeClass) record; | ${reuseInputUnboxingCode()} | ${reusePerRecordCode()} | $bodyCode diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala index 8ac18cd..527e6b0 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/FunctionCodeGenerator.scala @@ -101,18 +101,22 @@ class FunctionCodeGenerator( if (clazz == classOf[FlatMapFunction[_, _]]) { val baseClass = classOf[RichFlatMapFunction[_, _]] val inputTypeTerm = boxedTypeTermForTypeInfo(input1) + // declaration: make variable accessible for separated method + reusableMemberStatements.add(s"private $inputTypeTerm $input1Term;") (baseClass, s"void flatMap(Object _in1, $collectorTypeTerm $collectorTerm)", - List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")) + List(s"$input1Term = ($inputTypeTerm) _in1;")) } // MapFunction else if (clazz == classOf[MapFunction[_, _]]) { val baseClass = classOf[RichMapFunction[_, _]] val inputTypeTerm = boxedTypeTermForTypeInfo(input1) + // declaration: make variable accessible for separated method + reusableMemberStatements.add(s"private $inputTypeTerm $input1Term;") (baseClass, "Object map(Object _in1)", - List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")) + List(s"$input1Term = ($inputTypeTerm) _in1;")) } // FlatJoinFunction @@ -121,10 +125,13 @@ class FunctionCodeGenerator( val inputTypeTerm1 = boxedTypeTermForTypeInfo(input1) val inputTypeTerm2 = boxedTypeTermForTypeInfo(input2.getOrElse( throw new CodeGenException("Input 2 for FlatJoinFunction should not be null"))) + // declaration: make variables accessible 
for separated methods + reusableMemberStatements.add(s"private $inputTypeTerm1 $input1Term;") + reusableMemberStatements.add(s"private $inputTypeTerm2 $input2Term;") (baseClass, s"void join(Object _in1, Object _in2, $collectorTypeTerm $collectorTerm)", - List(s"$inputTypeTerm1 $input1Term = ($inputTypeTerm1) _in1;", - s"$inputTypeTerm2 $input2Term = ($inputTypeTerm2) _in2;")) + List(s"$input1Term = ($inputTypeTerm1) _in1;", + s"$input2Term = ($inputTypeTerm2) _in2;")) } // JoinFunction @@ -133,10 +140,13 @@ class FunctionCodeGenerator( val inputTypeTerm1 = boxedTypeTermForTypeInfo(input1) val inputTypeTerm2 = boxedTypeTermForTypeInfo(input2.getOrElse( throw new CodeGenException("Input 2 for JoinFunction should not be null"))) + // declaration: make variables accessible for separated methods + reusableMemberStatements.add(s"private $inputTypeTerm1 $input1Term;") + reusableMemberStatements.add(s"private $inputTypeTerm2 $input2Term;") (baseClass, s"Object join(Object _in1, Object _in2)", - List(s"$inputTypeTerm1 $input1Term = ($inputTypeTerm1) _in1;", - s"$inputTypeTerm2 $input2Term = ($inputTypeTerm2) _in2;")) + List(s"$input1Term = ($inputTypeTerm1) _in1;", + s"$input2Term = ($inputTypeTerm2) _in2;")) } // ProcessFunction @@ -155,10 +165,12 @@ class FunctionCodeGenerator( Nil } + // declaration: make variable accessible for separated method + reusableMemberStatements.add(s"private $inputTypeTerm $input1Term;") (baseClass, s"void processElement(Object _in1, $contextTypeTerm $contextTerm, " + s"$collectorTypeTerm $collectorTerm)", - List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;") ++ globalContext) + List(s"$input1Term = ($inputTypeTerm) _in1;") ++ globalContext) } else { // TODO more functions diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala index 6097178..6064c6c 100644 --- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/codegen/MatchCodeGenerator.scala @@ -315,20 +315,22 @@ class MatchCodeGenerator( val baseClass = classOf[RichIterativeCondition[_]] val inputTypeTerm = boxedTypeTermForTypeInfo(input) val contextType = classOf[IterativeCondition.Context[_]].getCanonicalName - + // declaration: make variable accessible for separated methods + reusableMemberStatements.add(s"private $inputTypeTerm $input1Term;") (baseClass, s"boolean filter(Object _in1, $contextType $contextTerm)", - List(s"$inputTypeTerm $input1Term = ($inputTypeTerm) _in1;")) + List(s"$input1Term = ($inputTypeTerm) _in1;")) } else if (clazz == classOf[PatternProcessFunction[_, _]]) { val baseClass = classOf[PatternProcessFunction[_, _]] val inputTypeTerm = s"java.util.Map<String, java.util.List<${boxedTypeTermForTypeInfo(input)}>>" val contextTypeTerm = classOf[PatternProcessFunction.Context].getCanonicalName - + // declaration: make variable accessible for separated method + reusableMemberStatements.add(s"private $inputTypeTerm $input1Term;") (baseClass, - s"void processMatch($inputTypeTerm $input1Term, $contextTypeTerm $contextTerm, " + + s"void processMatch($inputTypeTerm _in1, $contextTypeTerm $contextTerm, " + s"$collectorTypeTerm $collectorTerm)", - List()) + List(s"this.$input1Term = ($inputTypeTerm) _in1;")) } else { throw new CodeGenException("Unsupported Function.") } @@ -432,7 +434,7 @@ class MatchCodeGenerator( returnType.fieldNames) aggregatesPerVariable.values.foreach(_.generateAggFunction()) if (hasCodeSplits) { - makeReusableInSplits(reusableAggregationExpr.values) + makeReusableInSplits() } exp @@ -442,12 +444,18 @@ class MatchCodeGenerator( val exp = call.accept(this) aggregatesPerVariable.values.foreach(_.generateAggFunction()) if (hasCodeSplits) { - makeReusableInSplits(reusableAggregationExpr.values) + 
makeReusableInSplits() } exp } + private def makeReusableInSplits(): Unit = { + reusableAggregationExpr.keys.foreach( + key => + reusableAggregationExpr(key) = makeReusableInSplits(reusableAggregationExpr(key))) + } + override def visitCall(call: RexCall): GeneratedExpression = { call.getOperator match { case PREV | NEXT => @@ -537,11 +545,13 @@ class MatchCodeGenerator( } else { "" } + + reusableMemberStatements.add(s"java.util.List $listName = new java.util.ArrayList();") val listCode = if (patternName == ALL_PATTERN_VARIABLE) { addReusablePatternNames() val patternTerm = newName("pattern") j""" - |java.util.List $listName = new java.util.ArrayList(); + |$listName = new java.util.ArrayList(); |for (String $patternTerm : $patternNamesTerm) { | for ($eventTypeTerm $eventNameTerm : | $contextTerm.getEventsForPattern($patternTerm)) { @@ -552,7 +562,7 @@ class MatchCodeGenerator( } else { val escapedPatternName = EncodingUtils.escapeJava(patternName) j""" - |java.util.List $listName = new java.util.ArrayList(); + |$listName = new java.util.ArrayList(); |for ($eventTypeTerm $eventNameTerm : | $contextTerm.getEventsForPattern("$escapedPatternName")) { | $listName.add($eventNameTerm); @@ -572,13 +582,14 @@ class MatchCodeGenerator( private def generateMeasurePatternVariableExp(patternName: String): GeneratedPatternList = { val listName = newName("patternEvents") + reusableMemberStatements.add(s"java.util.List $listName = new java.util.ArrayList();") val code = if (patternName == ALL_PATTERN_VARIABLE) { addReusablePatternNames() val patternTerm = newName("pattern") j""" - |java.util.List $listName = new java.util.ArrayList(); + |$listName = new java.util.ArrayList(); |for (String $patternTerm : $patternNamesTerm) { | java.util.List rows = (java.util.List) $input1Term.get($patternTerm); | if (rows != null) { @@ -589,7 +600,7 @@ class MatchCodeGenerator( } else { val escapedPatternName = EncodingUtils.escapeJava(patternName) j""" - |java.util.List $listName = 
(java.util.List) $input1Term.get("$escapedPatternName"); + |$listName = (java.util.List) $input1Term.get("$escapedPatternName"); |if ($listName == null) { | $listName = java.util.Collections.emptyList(); |} diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala index 2bdaa86..44e9ead 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala @@ -39,6 +39,7 @@ import org.junit.Assert._ import org.junit._ import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer class SqlITCase extends StreamingWithStateTestBase { @@ -870,6 +871,45 @@ class SqlITCase extends StreamingWithStateTestBase { assertEquals(List(expected.toString()), StreamITCase.testResults.sorted) } + + @Test + def testProjectionWithManyColumns(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = StreamTableEnvironment.create(env) + StreamITCase.clear + + // force code split + tEnv.getConfig.setMaxGeneratedCodeLength(1) + + val length = 1000 + val rowData = List.range(0, length) + val row: Row = new Row(length) + val fieldTypes = new ArrayBuffer[TypeInformation[_]]() + val fieldNames = new ArrayBuffer[String]() + rowData.foreach { i => + row.setField(i, i) + fieldTypes += Types.INT() + fieldNames += s"f$i" + } + + val data = new mutable.MutableList[Row] + data.+=(row) + val t = env.fromCollection(data)(new RowTypeInfo(fieldTypes.toArray: _*)).toTable(tEnv) + tEnv.registerTable("MyTable", t) + + val expected = List(rowData.reverse.mkString(",")) + val sql = + s""" + |SELECT ${fieldNames.reverse.mkString(", ")} FROM MyTable + """.stripMargin + + val result = tEnv.sqlQuery(sql).toAppendStream[Row] + result.addSink(new 
StreamITCase.StringSink[Row]) + env.execute() + + assertEquals(expected, StreamITCase.testResults) + } } object SqlITCase {