[FLINK-3226] implement getUniqueName method in TranslationContext

This closes #1600 and #1567


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7428263
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7428263
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7428263

Branch: refs/heads/tableOnCalcite
Commit: e742826326c2719f2f5e06a5e4020f743c3278f9
Parents: 18e7f2f
Author: vasia <va...@apache.org>
Authored: Thu Feb 11 14:24:24 2016 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Fri Feb 12 11:34:10 2016 +0100

----------------------------------------------------------------------
 .../flink/api/table/plan/RexNodeTranslator.scala  |  2 +-
 .../flink/api/table/plan/TranslationContext.scala |  4 ++++
 .../plan/nodes/dataset/DataSetGroupReduce.scala   |  2 +-
 .../api/java/table/test/AggregationsITCase.java   |  4 +---
 .../api/scala/table/test/AggregationsITCase.scala | 18 +++++++++++++++---
 .../api/scala/table/test/ExpressionsITCase.scala  |  1 -
 6 files changed, 22 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e7428263/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
index 07e3924..bad111f 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
@@ -35,7 +35,7 @@ object RexNodeTranslator {
 
     exp match {
       case agg: Aggregation =>
-        val name = "TMP_" + agg.hashCode().toHexString.toUpperCase
+        val name = TranslationContext.getUniqueName
         val aggCall = toAggCall(agg, name, relBuilder)
         val fieldExp = new UnresolvedFieldReference(name)
         (fieldExp, List(aggCall))

http://git-wip-us.apache.org/repos/asf/flink/blob/e7428263/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
index b2b0c2b..51af8d6 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
@@ -70,6 +70,10 @@ object TranslationContext {
 
   }
 
+  def getUniqueName: String = {
+    "TMP_" + nameCntr.getAndIncrement()
+  }
+
   def getRelBuilder: RelBuilder = {
     relBuilder
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/e7428263/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
index ad7e0e9..afe09bb 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala
@@ -67,7 +67,7 @@ class DataSetGroupReduce(
       config: TableConfig,
       expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
 
-    val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(config)
+    val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(config, 
expectedType)
 
     // get the output types
     val fieldsNames = rowType.getFieldNames

http://git-wip-us.apache.org/repos/asf/flink/blob/e7428263/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
index 8e81893..30598c4 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java
@@ -62,7 +62,6 @@ public class AggregationsITCase extends 
MultipleProgramsTestBase {
                super(mode);
        }
 
-       @Ignore //DataSetMap needs to be implemented
        @Test
        public void testAggregationTypes() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
@@ -116,8 +115,7 @@ public class AggregationsITCase extends 
MultipleProgramsTestBase {
                compareResultAsText(results, expected);
        }
 
-       @Ignore // it seems like the arithmetic expression is added to the 
field position
-       @Test(expected = NotImplementedError.class)
+       @Test
        public void testAggregationWithArithmetic() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                TableEnvironment tableEnv = new TableEnvironment();

http://git-wip-us.apache.org/repos/asf/flink/blob/e7428263/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
index 64f6757..685d4c8 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala
@@ -32,7 +32,6 @@ import scala.collection.JavaConverters._
 @RunWith(classOf[Parameterized])
 class AggregationsITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mode) {
 
-  @Ignore //DataSetMap needs to be implemented
   @Test
   def testAggregationTypes(): Unit = {
 
@@ -71,8 +70,21 @@ class AggregationsITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBa
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Ignore // it seems like the arithmetic expression is added to the field 
position
-  @Test(expected = classOf[NotImplementedError])
+  @Test
+  def testProjection(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val t = env.fromElements(
+      (1: Byte, 1: Short),
+      (2: Byte, 2: Short)).toTable
+      .select('_1.avg, '_1.sum, '_1.count, '_2.avg, '_2.sum)
+
+    val expected = "1,3,2,1,3"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
   def testAggregationWithArithmetic(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment

http://git-wip-us.apache.org/repos/asf/flink/blob/e7428263/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
index c56ab92..8ab44a3 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala
@@ -77,7 +77,6 @@ class ExpressionsITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Ignore
   @Test
   def testCaseInsensitiveForAs(): Unit = {
 

Reply via email to