[FLINK-3226] implement getUniqueName method in TranslationContext This closes #1600 and #1567
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7428263 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7428263 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7428263 Branch: refs/heads/tableOnCalcite Commit: e742826326c2719f2f5e06a5e4020f743c3278f9 Parents: 18e7f2f Author: vasia <va...@apache.org> Authored: Thu Feb 11 14:24:24 2016 +0100 Committer: Fabian Hueske <fhue...@apache.org> Committed: Fri Feb 12 11:34:10 2016 +0100 ---------------------------------------------------------------------- .../flink/api/table/plan/RexNodeTranslator.scala | 2 +- .../flink/api/table/plan/TranslationContext.scala | 4 ++++ .../plan/nodes/dataset/DataSetGroupReduce.scala | 2 +- .../api/java/table/test/AggregationsITCase.java | 4 +--- .../api/scala/table/test/AggregationsITCase.scala | 18 +++++++++++++++--- .../api/scala/table/test/ExpressionsITCase.scala | 1 - 6 files changed, 22 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e7428263/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala index 07e3924..bad111f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala @@ -35,7 +35,7 @@ object RexNodeTranslator { exp match { case agg: Aggregation => - val name = "TMP_" + agg.hashCode().toHexString.toUpperCase + val name = TranslationContext.getUniqueName val aggCall = toAggCall(agg, name, relBuilder) val fieldExp = new UnresolvedFieldReference(name) (fieldExp, List(aggCall)) http://git-wip-us.apache.org/repos/asf/flink/blob/e7428263/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala index b2b0c2b..51af8d6 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala @@ -70,6 +70,10 @@ object TranslationContext { } + def getUniqueName: String = { + "TMP_" + nameCntr.getAndIncrement() + } + def getRelBuilder: RelBuilder = { relBuilder } http://git-wip-us.apache.org/repos/asf/flink/blob/e7428263/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala index ad7e0e9..afe09bb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala @@ -67,7 +67,7 @@ class DataSetGroupReduce( config: TableConfig, expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { - val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(config) + val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(config, expectedType) // get the output types val fieldsNames = rowType.getFieldNames http://git-wip-us.apache.org/repos/asf/flink/blob/e7428263/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java index 8e81893..30598c4 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java @@ -62,7 +62,6 @@ public class AggregationsITCase extends MultipleProgramsTestBase { super(mode); } - @Ignore //DataSetMap needs to be implemented @Test public void testAggregationTypes() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); @@ -116,8 +115,7 @@ public class AggregationsITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - @Ignore // it seems like the arithmetic expression is added to the field position - @Test(expected = NotImplementedError.class) + @Test public void testAggregationWithArithmetic() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); http://git-wip-us.apache.org/repos/asf/flink/blob/e7428263/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala index 64f6757..685d4c8 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala @@ -32,7 +32,6 @@ import scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - @Ignore //DataSetMap needs to be implemented @Test def testAggregationTypes(): Unit = { @@ -71,8 +70,21 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Ignore // it seems like the arithmetic expression is added to the field position - @Test(expected = classOf[NotImplementedError]) + @Test + def testProjection(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val t = env.fromElements( + (1: Byte, 1: Short), + (2: Byte, 2: Short)).toTable + .select('_1.avg, '_1.sum, '_1.count, '_2.avg, '_2.sum) + + val expected = "1,3,2,1,3" + val results = t.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test def testAggregationWithArithmetic(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment http://git-wip-us.apache.org/repos/asf/flink/blob/e7428263/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala index c56ab92..8ab44a3 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala @@ -77,7 +77,6 @@ class ExpressionsITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Ignore @Test def testCaseInsensitiveForAs(): Unit = {