Repository: flink Updated Branches: refs/heads/master 536e4b352 -> 614acc3e7
[FLINK-5357] [table] Fix dropped projections This closes #3063. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/614acc3e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/614acc3e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/614acc3e Branch: refs/heads/master Commit: 614acc3e7b86ff40364fcfd62b3064c93755379a Parents: 536e4b3 Author: twalthr <twal...@apache.org> Authored: Thu Jan 5 14:14:52 2017 +0100 Committer: twalthr <twal...@apache.org> Committed: Tue Jan 10 11:22:04 2017 +0100 ---------------------------------------------------------------------- .../flink/table/plan/logical/operators.scala | 17 ++++--------- .../scala/batch/table/FieldProjectionTest.scala | 25 ++++++++++++++++++++ 2 files changed, 29 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/614acc3e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala index eae42cd..743bdfe 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala @@ -92,20 +92,11 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend } override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = { - val allAlias = projectList.forall(_.isInstanceOf[Alias]) child.construct(relBuilder) - if (allAlias) { - // Calcite's RelBuilder does not translate identity projects even if they rename fields. - // Add a projection ourselves (will be automatically removed by translation rules). - val project = LogicalProject.create(relBuilder.peek(), - // avoid AS call - projectList.map(_.asInstanceOf[Alias].child.toRexNode(relBuilder)).asJava, - projectList.map(_.name).asJava) - relBuilder.build() // pop previous relNode - relBuilder.push(project) - } else { - relBuilder.project(projectList.map(_.toRexNode(relBuilder)): _*) - } + relBuilder.project( + projectList.map(_.toRexNode(relBuilder)).asJava, + projectList.map(_.name).asJava, + true) } } http://git-wip-us.apache.org/repos/asf/flink/blob/614acc3e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala index a80e0cb..cc691d2 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala @@ -280,6 +280,30 @@ class FieldProjectionTest extends TableTestBase { streamUtil.verifyTable(resultTable, expected) } + @Test + def testSelectFromAggregatedPojoTable(): Unit = { + val sourceTable = util.addTable[WC]("MyTable", 'word, 'frequency) + val resultTable = sourceTable + .groupBy('word) + .select('word, 'frequency.sum as 'frequency) + .filter('frequency === 2) + + val expected = + unaryNode( + "DataSetCalc", + unaryNode( + "DataSetAggregate", + batchTableNode(0), + term("groupBy", "word"), + term("select", "word", "SUM(frequency) AS TMP_0") + ), + term("select", "word, frequency"), + term("where", "=(frequency, 2)") + ) + + util.verifyTable(resultTable, expected) + } + @Test(expected = classOf[ValidationException]) def testSelectFromBatchWindow1(): Unit = { val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd) @@ -315,4 +339,5 @@ object FieldProjectionTest { def eval(s: String): Int = s.hashCode() } + case class WC(word: String, frequency: Long) }