Repository: flink
Updated Branches:
  refs/heads/master 28ab73750 -> f37988c19


http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
index 084ee14..de6cbfa 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
@@ -20,9 +20,9 @@ package org.apache.flink.table.api.scala.stream.table
 
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithMerge
+import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.{TableException, ValidationException}
-import org.apache.flink.table.expressions.{RowtimeAttribute, WindowReference}
+import org.apache.flink.table.expressions.WindowReference
 import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.utils.TableTestBase
 import org.apache.flink.table.utils.TableTestUtil.{streamTableNode, term, unaryNode}
@@ -40,46 +40,10 @@ class GroupWindowTest extends TableTestBase {
       .select('string, 'string.start) // property in non windowed table
   }
 
-  @Test(expected = classOf[TableException])
-  def testInvalidRowtime1(): Unit = {
-    val util = streamTestUtil()
-    // rowtime attribute must not be a field name
-    util.addTable[(Long, Int, String)]('rowtime, 'long, 'int, 'string)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidRowtime2(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table
-      .select('string, 'int as 'rowtime) // rowtime attribute must not be an alias
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidRowtime3(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table.as('rowtime, 'myint, 'mystring) // rowtime attribute must not be an alias
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidRowtime4(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table
-    // only rowtime is a valid time attribute in a stream environment
-      .window(Tumble over 50.milli on 'string as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-  }
-
   @Test(expected = classOf[ValidationException])
   def testGroupByWithoutWindowAlias(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
 
     table
       .window(Tumble over 5.milli on 'long as 'w)
@@ -90,7 +54,7 @@ class GroupWindowTest extends TableTestBase {
   @Test(expected = classOf[ValidationException])
   def testInvalidRowTimeRef(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
 
     table
       .window(Tumble over 5.milli on 'long as 'w)
@@ -104,10 +68,10 @@ class GroupWindowTest extends TableTestBase {
   @Test(expected = classOf[ValidationException])
   def testInvalidTumblingSize(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
 
     table
-      .window(Tumble over "WRONG" as 'w) // string is not a valid interval
+      .window(Tumble over "WRONG" on 'long as 'w) // string is not a valid interval
       .groupBy('w, 'string)
       .select('string, 'int.count)
   }
@@ -127,10 +91,10 @@ class GroupWindowTest extends TableTestBase {
   @Test(expected = classOf[ValidationException])
   def testInvalidSlidingSize(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
 
     table
-      .window(Slide over "WRONG" every "WRONG" as 'w) // string is not a valid interval
+      .window(Slide over "WRONG" every "WRONG" on 'long as 'w) // string is not a valid interval
       .groupBy('w, 'string)
       .select('string, 'int.count)
   }
@@ -138,10 +102,11 @@ class GroupWindowTest extends TableTestBase {
   @Test(expected = classOf[ValidationException])
   def testInvalidSlidingSlide(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
 
     table
-      .window(Slide over 12.rows every 1.minute as 'w) // row and time intervals may not be mixed
+      // row and time intervals may not be mixed
+      .window(Slide over 12.rows every 1.minute on 'long as 'w)
       .groupBy('w, 'string)
       .select('string, 'int.count)
   }
@@ -161,10 +126,11 @@ class GroupWindowTest extends TableTestBase {
   @Test(expected = classOf[ValidationException])
   def testInvalidSessionGap(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
 
     table
-      .window(Session withGap 10.rows as 'w) // row interval is not valid for session windows
+      // row interval is not valid for session windows
+      .window(Session withGap 10.rows on 'long as 'w)
       .groupBy('w, 'string)
       .select('string, 'int.count)
   }
@@ -172,10 +138,10 @@ class GroupWindowTest extends TableTestBase {
   @Test(expected = classOf[ValidationException])
   def testInvalidWindowAlias1(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
 
     table
-      .window(Session withGap 100.milli as 1 + 1) // expression instead of a symbol
+      .window(Session withGap 100.milli on 'long as 1 + 1) // expression instead of a symbol
       .groupBy('string)
       .select('string, 'int.count)
   }
@@ -183,10 +149,11 @@ class GroupWindowTest extends TableTestBase {
   @Test(expected = classOf[ValidationException])
   def testInvalidWindowAlias2(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
 
     table
-      .window(Session withGap 100.milli as 'string) // field name "string" is already present
+      // field name "string" is already present
+      .window(Session withGap 100.milli on 'long as 'string)
       .groupBy('string)
       .select('string, 'int.count)
   }
@@ -195,7 +162,7 @@ class GroupWindowTest extends TableTestBase {
   def testSessionUdAggWithInvalidArgs(): Unit = {
     val util = streamTestUtil()
     val weightedAvg = new WeightedAvgWithMerge
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
 
     table
       .window(Session withGap 2.hours on 'rowtime as 'w)
@@ -203,16 +170,17 @@ class GroupWindowTest extends TableTestBase {
       .select('string, weightedAvg('string, 'int)) // invalid UDAGG args
   }
 
+  @Ignore // TODO
   @Test
   def testMultiWindow(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
 
     val windowedTable = table
-      .window(Tumble over 50.milli as 'w1)
+      .window(Tumble over 50.milli on 'proctime as 'w1)
       .groupBy('w1, 'string)
-      .select('string, 'int.count)
-      .window(Slide over 20.milli every 10.milli as 'w2)
+      .select('w.end as 'proctime, 'string, 'int.count)
+      .window(Slide over 20.milli every 10.milli on 'proctime as 'w2)
       .groupBy('w2)
       .select('string.count)
 
@@ -230,8 +198,9 @@ class GroupWindowTest extends TableTestBase {
           term("groupBy", "string"),
           term(
             "window",
-            ProcessingTimeTumblingGroupWindow(
+            TumblingGroupWindow(
               WindowReference("w1"),
+              'proctime,
               50.milli)),
           term("select", "string", "COUNT(int) AS TMP_0")
         ),
@@ -239,9 +208,11 @@ class GroupWindowTest extends TableTestBase {
       ),
       term(
         "window",
-        ProcessingTimeSlidingGroupWindow(
+        SlidingGroupWindow(
           WindowReference("w2"),
-          20.milli, 10.milli)),
+          'proctime,
+          20.milli,
+          10.milli)),
       term("select", "COUNT(string) AS TMP_1")
     )
     util.verifyTable(windowedTable, expected)
@@ -250,10 +221,10 @@ class GroupWindowTest extends TableTestBase {
   @Test
   def testProcessingTimeTumblingGroupWindowOverTime(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
 
     val windowedTable = table
-      .window(Tumble over 50.milli as 'w)
+      .window(Tumble over 50.milli on 'proctime as 'w)
       .groupBy('w, 'string)
       .select('string, 'int.count)
 
@@ -262,13 +233,14 @@ class GroupWindowTest extends TableTestBase {
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
-        term("select", "string", "int")
+        term("select", "string", "int", "proctime")
       ),
       term("groupBy", "string"),
       term(
         "window",
-        ProcessingTimeTumblingGroupWindow(
+        TumblingGroupWindow(
           WindowReference("w"),
+          'proctime,
           50.milli)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
@@ -279,38 +251,10 @@ class GroupWindowTest extends TableTestBase {
   @Test
   def testProcessingTimeTumblingGroupWindowOverCount(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .window(Tumble over 2.rows as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataStreamAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int")
-      ),
-      term("groupBy", "string"),
-      term(
-        "window",
-        ProcessingTimeTumblingGroupWindow(
-          WindowReference("w"), 2.rows)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  def testEventTimeTumblingGroupWindowOverTime(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
 
     val windowedTable = table
-      .window(Tumble over 5.milli on 'rowtime as 'w)
+      .window(Tumble over 2.rows on 'proctime as 'w)
       .groupBy('w, 'string)
       .select('string, 'int.count)
 
@@ -319,15 +263,15 @@ class GroupWindowTest extends TableTestBase {
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
-        term("select", "string", "int")
+        term("select", "string", "int", "proctime")
       ),
       term("groupBy", "string"),
       term(
         "window",
-        EventTimeTumblingGroupWindow(
+        TumblingGroupWindow(
           WindowReference("w"),
-          RowtimeAttribute(),
-          5.milli)),
+          'proctime,
+          2.rows)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -335,13 +279,12 @@ class GroupWindowTest extends TableTestBase {
   }
 
   @Test
-  @Ignore // see comments in DataStreamAggregate
-  def testEventTimeTumblingGroupWindowOverCount(): Unit = {
+  def testEventTimeTumblingGroupWindowOverTime(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
 
     val windowedTable = table
-      .window(Tumble over 2.rows on 'rowtime as 'w)
+      .window(Tumble over 5.milli on 'long as 'w)
       .groupBy('w, 'string)
       .select('string, 'int.count)
 
@@ -351,9 +294,10 @@ class GroupWindowTest extends TableTestBase {
       term("groupBy", "string"),
       term(
         "window",
-        EventTimeTumblingGroupWindow(
+        TumblingGroupWindow(
           WindowReference("w"),
-          RowtimeAttribute(), 2.rows)),
+          'long,
+          5.milli)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -363,7 +307,7 @@ class GroupWindowTest extends TableTestBase {
   @Test
   def testEventTimeTumblingGroupWindowWithUdAgg(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
 
     val weightedAvg = new WeightedAvgWithMerge
 
@@ -378,9 +322,9 @@ class GroupWindowTest extends TableTestBase {
       term("groupBy", "string"),
       term(
         "window",
-        EventTimeTumblingGroupWindow(
+        TumblingGroupWindow(
           WindowReference("w"),
-          RowtimeAttribute(),
+          'rowtime,
           5.milli)),
       term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
     )
@@ -391,10 +335,10 @@ class GroupWindowTest extends TableTestBase {
   @Test
   def testProcessingTimeSlidingGroupWindowOverTime(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
 
     val windowedTable = table
-      .window(Slide over 50.milli every 50.milli as 'w)
+      .window(Slide over 50.milli every 50.milli on 'proctime as 'w)
       .groupBy('w, 'string)
       .select('string, 'int.count)
 
@@ -403,14 +347,16 @@ class GroupWindowTest extends TableTestBase {
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
-        term("select", "string", "int")
+        term("select", "string", "int", "proctime")
       ),
       term("groupBy", "string"),
       term(
         "window",
-        ProcessingTimeSlidingGroupWindow(
+        SlidingGroupWindow(
           WindowReference("w"),
-          50.milli, 50.milli)),
+          'proctime,
+          50.milli,
+          50.milli)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -420,10 +366,10 @@ class GroupWindowTest extends TableTestBase {
   @Test
   def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
 
     val windowedTable = table
-      .window(Slide over 2.rows every 1.rows as 'w)
+      .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
       .groupBy('w, 'string)
       .select('string, 'int.count)
 
@@ -432,14 +378,16 @@ class GroupWindowTest extends TableTestBase {
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
-        term("select", "string", "int")
+        term("select", "string", "int", "proctime")
       ),
       term("groupBy", "string"),
       term(
         "window",
-        ProcessingTimeSlidingGroupWindow(
+        SlidingGroupWindow(
           WindowReference("w"),
-          2.rows, 1.rows)),
+          'proctime,
+          2.rows,
+          1.rows)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -449,40 +397,10 @@ class GroupWindowTest extends TableTestBase {
   @Test
   def testEventTimeSlidingGroupWindowOverTime(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .window(Slide over 8.milli every 10.milli on 'rowtime as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-
-    val expected = unaryNode(
-      "DataStreamAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int")
-      ),
-      term("groupBy", "string"),
-      term(
-        "window",
-        EventTimeSlidingGroupWindow(
-          WindowReference("w"),
-          RowtimeAttribute(), 8.milli, 10.milli)),
-      term("select", "string", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  @Ignore // see comments in DataStreamAggregate
-  def testEventTimeSlidingGroupWindowOverCount(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
 
     val windowedTable = table
-      .window(Slide over 2.rows every 1.rows on 'rowtime as 'w)
+      .window(Slide over 8.milli every 10.milli on 'long as 'w)
       .groupBy('w, 'string)
       .select('string, 'int.count)
 
@@ -492,9 +410,11 @@ class GroupWindowTest extends TableTestBase {
       term("groupBy", "string"),
       term(
         "window",
-        EventTimeSlidingGroupWindow(
+        SlidingGroupWindow(
           WindowReference("w"),
-          RowtimeAttribute(), 2.rows, 1.rows)),
+          'long,
+          8.milli,
+          10.milli)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -504,7 +424,7 @@ class GroupWindowTest extends TableTestBase {
   @Test
   def testEventTimeSlidingGroupWindowWithUdAgg(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
 
     val weightedAvg = new WeightedAvgWithMerge
 
@@ -519,9 +439,7 @@ class GroupWindowTest extends TableTestBase {
       term("groupBy", "string"),
       term(
         "window",
-        EventTimeSlidingGroupWindow(
-          WindowReference("w"),
-          RowtimeAttribute(), 8.milli, 10.milli)),
+        SlidingGroupWindow(WindowReference("w"), 'rowtime, 8.milli, 10.milli)),
       term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
     )
 
@@ -531,26 +449,23 @@ class GroupWindowTest extends TableTestBase {
   @Test
   def testEventTimeSessionGroupWindowOverTime(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
 
     val windowedTable = table
-      .window(Session withGap 7.milli on 'rowtime as 'w)
+      .window(Session withGap 7.milli on 'long as 'w)
       .groupBy('w, 'string)
       .select('string, 'int.count)
 
     val expected = unaryNode(
       "DataStreamAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int")
-      ),
+      streamTableNode(0),
       term("groupBy", "string"),
       term(
         "window",
-        EventTimeSessionGroupWindow(
+        SessionGroupWindow(
           WindowReference("w"),
-          RowtimeAttribute(), 7.milli)),
+          'long,
+          7.milli)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
 
@@ -560,7 +475,7 @@ class GroupWindowTest extends TableTestBase {
   @Test
   def testEventTimeSessionGroupWindowWithUdAgg(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
 
     val weightedAvg = new WeightedAvgWithMerge
 
@@ -575,9 +490,7 @@ class GroupWindowTest extends TableTestBase {
       term("groupBy", "string"),
       term(
         "window",
-        EventTimeSessionGroupWindow(
-          WindowReference("w"),
-          RowtimeAttribute(), 7.milli)),
+        SessionGroupWindow(WindowReference("w"), 'rowtime, 7.milli)),
       term("select", "string", "WeightedAvgWithMerge(long, int) AS TMP_0")
     )
 
@@ -587,10 +500,10 @@ class GroupWindowTest extends TableTestBase {
   @Test
   def testAllProcessingTimeTumblingGroupWindowOverTime(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
 
     val windowedTable = table
-      .window(Tumble over 50.milli as 'w)
+      .window(Tumble over 50.milli on 'proctime as 'w)
       .groupBy('w, 'string)
       .select('string, 'int.count)
 
@@ -599,13 +512,14 @@ class GroupWindowTest extends TableTestBase {
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
-        term("select", "string", "int")
+        term("select", "string", "int", "proctime")
       ),
       term("groupBy", "string"),
       term(
         "window",
-        ProcessingTimeTumblingGroupWindow(
+        TumblingGroupWindow(
           WindowReference("w"),
+          'proctime,
           50.milli)),
       term("select", "string", "COUNT(int) AS TMP_0")
     )
@@ -616,10 +530,10 @@ class GroupWindowTest extends TableTestBase {
   @Test
   def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
 
     val windowedTable = table
-      .window(Tumble over 2.rows as 'w)
+      .window(Tumble over 2.rows on 'proctime as 'w)
       .groupBy('w)
       .select('int.count)
 
@@ -628,12 +542,13 @@ class GroupWindowTest extends TableTestBase {
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
-        term("select", "int")
+        term("select", "int", "proctime")
       ),
       term(
         "window",
-        ProcessingTimeTumblingGroupWindow(
+        TumblingGroupWindow(
           WindowReference("w"),
+          'proctime,
           2.rows)),
       term("select", "COUNT(int) AS TMP_0")
     )
@@ -644,39 +559,10 @@ class GroupWindowTest extends TableTestBase {
   @Test
   def testAllEventTimeTumblingGroupWindowOverTime(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .window(Tumble over 5.milli on 'rowtime as 'w)
-      .groupBy('w)
-      .select('int.count)
-
-    val expected = unaryNode(
-      "DataStreamAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "int")
-      ),
-      term(
-        "window",
-        EventTimeTumblingGroupWindow(
-          WindowReference("w"),
-          RowtimeAttribute(), 5.milli)),
-      term("select", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  @Ignore // see comments in DataStreamAggregate
-  def testAllEventTimeTumblingGroupWindowOverCount(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
 
     val windowedTable = table
-      .window(Tumble over 2.rows on 'rowtime as 'w)
+      .window(Tumble over 5.milli on 'long as 'w)
       .groupBy('w)
       .select('int.count)
 
@@ -685,27 +571,27 @@ class GroupWindowTest extends TableTestBase {
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
-        term("select", "int")
+        term("select", "int", "long")
       ),
       term(
         "window",
-        EventTimeTumblingGroupWindow(
+        TumblingGroupWindow(
           WindowReference("w"),
-          RowtimeAttribute(), 2.rows)),
+          'long,
+          5.milli)),
       term("select", "COUNT(int) AS TMP_0")
     )
 
     util.verifyTable(windowedTable, expected)
   }
 
-
   @Test
   def testAllProcessingTimeSlidingGroupWindowOverTime(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
 
     val windowedTable = table
-      .window(Slide over 50.milli every 50.milli as 'w)
+      .window(Slide over 50.milli every 50.milli on 'proctime as 'w)
       .groupBy('w)
       .select('int.count)
 
@@ -714,13 +600,15 @@ class GroupWindowTest extends TableTestBase {
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
-        term("select", "int")
+        term("select", "int", "proctime")
       ),
       term(
         "window",
-        ProcessingTimeSlidingGroupWindow(
+        SlidingGroupWindow(
           WindowReference("w"),
-          50.milli, 50.milli)),
+          'proctime,
+          50.milli,
+          50.milli)),
       term("select", "COUNT(int) AS TMP_0")
     )
 
@@ -730,10 +618,10 @@ class GroupWindowTest extends TableTestBase {
   @Test
   def testAllProcessingTimeSlidingGroupWindowOverCount(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
 
     val windowedTable = table
-      .window(Slide over 2.rows every 1.rows as 'w)
+      .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
       .groupBy('w)
       .select('int.count)
 
@@ -742,13 +630,15 @@ class GroupWindowTest extends TableTestBase {
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
-        term("select", "int")
+        term("select", "int", "proctime")
       ),
       term(
         "window",
-        ProcessingTimeSlidingGroupWindow(
+        SlidingGroupWindow(
           WindowReference("w"),
-          2.rows, 1.rows)),
+          'proctime,
+          2.rows,
+          1.rows)),
       term("select", "COUNT(int) AS TMP_0")
     )
 
@@ -758,39 +648,10 @@ class GroupWindowTest extends TableTestBase {
   @Test
   def testAllEventTimeSlidingGroupWindowOverTime(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val windowedTable = table
-      .window(Slide over 8.milli every 10.milli on 'rowtime as 'w)
-      .groupBy('w)
-      .select('int.count)
-
-    val expected = unaryNode(
-      "DataStreamAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "int")
-      ),
-      term(
-        "window",
-        EventTimeSlidingGroupWindow(
-          WindowReference("w"),
-          RowtimeAttribute(), 8.milli, 10.milli)),
-      term("select", "COUNT(int) AS TMP_0")
-    )
-
-    util.verifyTable(windowedTable, expected)
-  }
-
-  @Test
-  @Ignore // see comments in DataStreamAggregate
-  def testAllEventTimeSlidingGroupWindowOverCount(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
 
     val windowedTable = table
-      .window(Slide over 2.rows every 1.rows on 'rowtime as 'w)
+      .window(Slide over 8.milli every 10.milli on 'long as 'w)
       .groupBy('w)
       .select('int.count)
 
@@ -799,13 +660,15 @@ class GroupWindowTest extends TableTestBase {
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
-        term("select", "int")
+        term("select", "int", "long")
       ),
       term(
         "window",
-        EventTimeSlidingGroupWindow(
+        SlidingGroupWindow(
           WindowReference("w"),
-          RowtimeAttribute(), 2.rows, 1.rows)),
+          'long,
+          8.milli,
+          10.milli)),
       term("select", "COUNT(int) AS TMP_0")
     )
 
@@ -815,10 +678,10 @@ class GroupWindowTest extends TableTestBase {
   @Test
   def testAllEventTimeSessionGroupWindowOverTime(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
 
     val windowedTable = table
-      .window(Session withGap 7.milli on 'rowtime as 'w)
+      .window(Session withGap 7.milli on 'long as 'w)
       .groupBy('w)
       .select('int.count)
 
@@ -827,13 +690,14 @@ class GroupWindowTest extends TableTestBase {
       unaryNode(
         "DataStreamCalc",
         streamTableNode(0),
-        term("select", "int")
+        term("select", "int", "long")
       ),
       term(
         "window",
-        EventTimeSessionGroupWindow(
+        SessionGroupWindow(
           WindowReference("w"),
-          RowtimeAttribute(), 7.milli)),
+          'long,
+          7.milli)),
       term("select", "COUNT(int) AS TMP_0")
     )
 
@@ -843,25 +707,21 @@ class GroupWindowTest extends TableTestBase {
   @Test
   def testTumbleWindowStartEnd(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
 
     val windowedTable = table
-      .window(Tumble over 5.milli on 'rowtime as 'w)
+      .window(Tumble over 5.milli on 'long as 'w)
       .groupBy('w, 'string)
       .select('string, 'int.count, 'w.start, 'w.end)
 
     val expected = unaryNode(
       "DataStreamAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int")
-      ),
+      streamTableNode(0),
       term("groupBy", "string"),
       term("window",
-        EventTimeTumblingGroupWindow(
+        TumblingGroupWindow(
           WindowReference("w"),
-          RowtimeAttribute(),
+          'long,
           5.milli)),
       term("select",
         "string",
@@ -876,25 +736,21 @@ class GroupWindowTest extends TableTestBase {
   @Test
   def testSlideWindowStartEnd(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
 
     val windowedTable = table
-      .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
+      .window(Slide over 10.milli every 5.milli on 'long as 'w)
       .groupBy('w, 'string)
       .select('string, 'int.count, 'w.start, 'w.end)
 
     val expected = unaryNode(
       "DataStreamAggregate",
-      unaryNode(
-        "DataStreamCalc",
-        streamTableNode(0),
-        term("select", "string", "int")
-      ),
+      streamTableNode(0),
       term("groupBy", "string"),
       term("window",
-        EventTimeSlidingGroupWindow(
+        SlidingGroupWindow(
           WindowReference("w"),
-          RowtimeAttribute(),
+          'long,
           10.milli,
           5.milli)),
       term("select",
@@ -910,10 +766,10 @@ class GroupWindowTest extends TableTestBase {
   @Test
   def testSessionWindowStartWithTwoEnd(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
 
     val windowedTable = table
-      .window(Session withGap 3.milli on 'rowtime as 'w)
+      .window(Session withGap 3.milli on 'long as 'w)
       .groupBy('w, 'string)
       .select('w.end as 'we1, 'string, 'int.count as 'cnt, 'w.start as 'ws, 'w.end as 'we2)
 
@@ -921,16 +777,12 @@ class GroupWindowTest extends TableTestBase {
       "DataStreamCalc",
       unaryNode(
         "DataStreamAggregate",
-        unaryNode(
-          "DataStreamCalc",
-          streamTableNode(0),
-          term("select", "string", "int")
-        ),
+        streamTableNode(0),
         term("groupBy", "string"),
         term("window",
-          EventTimeSessionGroupWindow(
+          SessionGroupWindow(
             WindowReference("w"),
-            RowtimeAttribute(),
+            'long,
             3.milli)),
         term("select",
           "string",
@@ -947,10 +799,10 @@ class GroupWindowTest extends TableTestBase {
   @Test
   def testTumbleWindowWithDuplicateAggsAndProps(): Unit = {
     val util = streamTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+    val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
 
     val windowedTable = table
-      .window(Tumble over 5.millis on 'rowtime as 'w)
+      .window(Tumble over 5.millis on 'long as 'w)
       .groupBy('w, 'string)
       .select('string, 'int.sum + 1 as 's1, 'int.sum + 3 as 's2, 'w.start as 'x, 'w.start as 'x2,
         'w.end as 'x3, 'w.end)
@@ -959,16 +811,12 @@ class GroupWindowTest extends TableTestBase {
       "DataStreamCalc",
       unaryNode(
         "DataStreamAggregate",
-        unaryNode(
-          "DataStreamCalc",
-          streamTableNode(0),
-          term("select", "string", "int")
-        ),
+        streamTableNode(0),
         term("groupBy", "string"),
         term("window",
-          EventTimeTumblingGroupWindow(
+          TumblingGroupWindow(
             WindowReference("w"),
-            RowtimeAttribute(),
+            'long,
             5.millis)),
         term("select",
           "string",

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
index 4c0fea7..b097767 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala
@@ -58,7 +58,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
     StreamITCase.testResults = mutable.MutableList()
     StreamITCase.clear
     val stream = env.fromCollection(data)
-    val table = stream.toTable(tEnv, 'a, 'b, 'c)
+    val table = stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
     val countFun = new CountAggFunction
     val weightAvgFun = new WeightedAvg
 
@@ -107,7 +107,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
     )
     val table = env
       .addSource(new RowTimeSourceFunction[(Int, Long, String)](data))
-      .toTable(tEnv).as('a, 'b, 'c)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
     val countFun = new CountAggFunction
     val weightAvgFun = new WeightedAvg
 
@@ -173,7 +173,7 @@ class OverWindowITCase extends StreamingWithStateTestBase {
     StreamITCase.testResults = mutable.MutableList()
 
     val stream = env.fromCollection(data)
-    val table = stream.toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
+    val table = stream.toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
 
     val windowedTable = table
       .window(Over partitionBy 'a orderBy 'proctime preceding 4.rows following CURRENT_ROW as 'w)
@@ -234,7 +234,8 @@ class OverWindowITCase extends StreamingWithStateTestBase {
     StreamITCase.clear
 
     val table = env.addSource[(Long, Int, String)](
-      new RowTimeSourceFunction[(Long, Int, String)](data)).toTable(tEnv).as('a, 'b, 'c)
+      new RowTimeSourceFunction[(Long, Int, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
 
     val windowedTable = table
       .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
@@ -295,7 +296,8 @@ class OverWindowITCase extends StreamingWithStateTestBase {
     StreamITCase.clear
 
     val table = env.addSource[(Long, Int, String)](
-      new RowTimeSourceFunction[(Long, Int, String)](data)).toTable(tEnv).as('a, 'b, 'c)
+      new RowTimeSourceFunction[(Long, Int, String)](data))
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
 
     val windowedTable = table
       .window(

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
index 7dea521..ea3ab22 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.table.api.scala.stream.table
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{WeightedAvgWithRetract}
+import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithRetract
 import org.apache.flink.table.api.{Table, ValidationException}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.utils.TableTestUtil._
@@ -27,7 +27,8 @@ import org.junit.Test
 
 class OverWindowTest extends TableTestBase {
   private val streamUtil: StreamTableTestUtil = streamTestUtil()
-  val table: Table = streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c)
+  val table: Table = streamUtil.addTable[(Int, String, Long)]("MyTable",
+    'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
 
   @Test(expected = classOf[ValidationException])
   def testInvalidWindowAlias(): Unit = {
@@ -121,12 +122,12 @@ class OverWindowTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "b", "c", "PROCTIME() AS $3")
+            term("select", "a", "b", "c", "proctime")
           ),
           term("partitionBy", "b"),
-          term("orderBy", "PROCTIME"),
+          term("orderBy", "proctime"),
           term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "b", "c", "PROCTIME", "WeightedAvgWithRetract(c, a) AS w0$o0")
+          term("select", "a", "b", "c", "proctime", "WeightedAvgWithRetract(c, a) AS w0$o0")
         ),
         term("select", "c", "w0$o0 AS _c1")
       )
@@ -150,16 +151,16 @@ class OverWindowTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "c", "PROCTIME() AS $2")
+            term("select", "a", "c", "proctime")
           ),
           term("partitionBy", "a"),
-          term("orderBy", "PROCTIME"),
+          term("orderBy", "proctime"),
           term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"),
           term(
             "select",
             "a",
             "c",
-            "PROCTIME",
+            "proctime",
             "WeightedAvgWithRetract(c, a) AS w0$o0"
           )
         ),
@@ -183,11 +184,11 @@ class OverWindowTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "c", "PROCTIME() AS $2")
+            term("select", "a", "c", "proctime")
           ),
-          term("orderBy", "PROCTIME"),
+          term("orderBy", "proctime"),
           term("range", "BETWEEN 10000 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "PROCTIME", "COUNT(c) AS w0$o0")
+          term("select", "a", "c", "proctime", "COUNT(c) AS w0$o0")
         ),
         term("select", "a", "w0$o0 AS _c1")
       )
@@ -209,11 +210,11 @@ class OverWindowTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "c", "PROCTIME() AS $2")
+            term("select", "a", "c", "proctime")
           ),
-          term("orderBy", "PROCTIME"),
+          term("orderBy", "proctime"),
           term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0")
+          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0")
         ),
         term("select", "c", "w0$o0 AS _c1")
       )
@@ -238,16 +239,16 @@ class OverWindowTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "c", "PROCTIME() AS $2")
+            term("select", "a", "c", "proctime")
           ),
           term("partitionBy", "c"),
-          term("orderBy", "PROCTIME"),
+          term("orderBy", "proctime"),
           term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
           term(
             "select",
             "a",
             "c",
-            "PROCTIME",
+            "proctime",
             "COUNT(a) AS w0$o0",
             "WeightedAvgWithRetract(c, a) AS w0$o1"
           )
@@ -280,12 +281,12 @@ class OverWindowTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "c", "PROCTIME() AS $2")
+            term("select", "a", "c", "proctime")
           ),
           term("partitionBy", "c"),
-          term("orderBy", "PROCTIME"),
+          term("orderBy", "proctime"),
           term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "PROCTIME",
+          term("select", "a", "c", "proctime",
                "COUNT(a) AS w0$o0",
                "WeightedAvgWithRetract(c, a) AS w0$o1")
         ),
@@ -310,15 +311,15 @@ class OverWindowTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "c", "PROCTIME() AS $2")
+            term("select", "a", "c", "proctime")
           ),
-          term("orderBy", "PROCTIME"),
+          term("orderBy", "proctime"),
           term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
           term(
             "select",
             "a",
             "c",
-            "PROCTIME",
+            "proctime",
             "COUNT(a) AS w0$o0",
             "SUM(a) AS w0$o1"
           )
@@ -349,11 +350,11 @@ class OverWindowTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "c", "PROCTIME() AS $2")
+            term("select", "a", "c", "proctime")
           ),
-          term("orderBy", "PROCTIME"),
+          term("orderBy", "proctime"),
           term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "PROCTIME", "COUNT(a) AS w0$o0")
+          term("select", "a", "c", "proctime", "COUNT(a) AS w0$o0")
         ),
         term("select", "c", "w0$o0 AS _c1")
       )
@@ -378,12 +379,12 @@ class OverWindowTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "b", "c", "ROWTIME() AS $3")
+            term("select", "a", "b", "c", "rowtime")
           ),
           term("partitionBy", "b"),
-          term("orderBy", "ROWTIME"),
+          term("orderBy", "rowtime"),
           term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "b", "c", "ROWTIME",
+          term("select", "a", "b", "c", "rowtime",
                "COUNT(b) AS w0$o0",
                "WeightedAvgWithRetract(c, a) AS w0$o1")
         ),
@@ -410,16 +411,16 @@ class OverWindowTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "c", "ROWTIME() AS $2")
+            term("select", "a", "c", "rowtime")
           ),
           term("partitionBy", "a"),
-          term("orderBy", "ROWTIME"),
+          term("orderBy", "rowtime"),
           term("range", "BETWEEN 7200000 PRECEDING AND CURRENT ROW"),
           term(
             "select",
             "a",
             "c",
-            "ROWTIME",
+            "rowtime",
             "AVG(c) AS w0$o0",
             "WeightedAvgWithRetract(c, a) AS w0$o1"
           )
@@ -444,11 +445,11 @@ class OverWindowTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "c", "ROWTIME() AS $2")
+            term("select", "a", "c", "rowtime")
           ),
-          term("orderBy", "ROWTIME"),
+          term("orderBy", "rowtime"),
           term("range", "BETWEEN 10000 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "ROWTIME", "COUNT(c) AS w0$o0")
+          term("select", "a", "c", "rowtime", "COUNT(c) AS w0$o0")
         ),
         term("select", "a", "w0$o0 AS _c1")
       )
@@ -470,11 +471,11 @@ class OverWindowTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "c", "ROWTIME() AS $2")
+            term("select", "a", "c", "rowtime")
           ),
-          term("orderBy", "ROWTIME"),
+          term("orderBy", "rowtime"),
           term("rows", "BETWEEN 2 PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0")
+          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
         ),
         term("select", "c", "w0$o0 AS _c1")
       )
@@ -499,16 +500,16 @@ class OverWindowTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "c", "ROWTIME() AS $2")
+            term("select", "a", "c", "rowtime")
           ),
           term("partitionBy", "c"),
-          term("orderBy", "ROWTIME"),
+          term("orderBy", "rowtime"),
           term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
           term(
             "select",
             "a",
             "c",
-            "ROWTIME",
+            "rowtime",
             "COUNT(a) AS w0$o0",
             "WeightedAvgWithRetract(c, a) AS w0$o1"
           )
@@ -542,12 +543,12 @@ class OverWindowTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "c", "ROWTIME() AS $2")
+            term("select", "a", "c", "rowtime")
           ),
           term("partitionBy", "c"),
-          term("orderBy", "ROWTIME"),
+          term("orderBy", "rowtime"),
           term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "ROWTIME",
+          term("select", "a", "c", "rowtime",
                "COUNT(a) AS w0$o0",
                "WeightedAvgWithRetract(c, a) AS w0$o1")
         ),
@@ -572,15 +573,15 @@ class OverWindowTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "c", "ROWTIME() AS $2")
+            term("select", "a", "c", "rowtime")
           ),
-          term("orderBy", "ROWTIME"),
+          term("orderBy", "rowtime"),
           term("range", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
           term(
             "select",
             "a",
             "c",
-            "ROWTIME",
+            "rowtime",
             "COUNT(a) AS w0$o0",
             "SUM(a) AS w0$o1"
           )
@@ -611,11 +612,11 @@ class OverWindowTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "c", "ROWTIME() AS $2")
+            term("select", "a", "c", "rowtime")
           ),
-          term("orderBy", "ROWTIME"),
+          term("orderBy", "rowtime"),
           term("rows", "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"),
-          term("select", "a", "c", "ROWTIME", "COUNT(a) AS w0$o0")
+          term("select", "a", "c", "rowtime", "COUNT(a) AS w0$o0")
         ),
         term("select", "c", "w0$o0 AS _c1")
       )

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
index d314c9a..d261e36 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
@@ -31,7 +31,7 @@ class GroupWindowStringExpressionTest extends TableTestBase {
   @Test
   def testJavaScalaTableAPIEquality(): Unit = {
     val util = streamTestUtil()
-    val t = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+    val t = util.addTable[(Int, Long, String)]('int, 'long, 'string, 'rowtime.rowtime)
 
     val myCountFun = new CountAggFunction
     util.tEnv.registerFunction("myCountFun", myCountFun)
@@ -40,7 +40,7 @@ class GroupWindowStringExpressionTest extends TableTestBase {
 
     // Expression / Scala API
     val resScala = t
-      .window(Slide over 4.rows every 2.rows as 'w)
+      .window(Slide over 4.hours every 2.hours on 'rowtime as 'w)
       .groupBy('w, 'string)
       .select(
         'string,
@@ -51,7 +51,7 @@ class GroupWindowStringExpressionTest extends TableTestBase {
 
     // String / Java API
     val resJava = t
-      .window(JSlide.over("4.rows").every("2.rows").as("w"))
+      .window(JSlide.over("4.hours").every("2.hours").on("rowtime").as("w"))
       .groupBy("w, string")
       .select(
         "string, " +

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/OverWindowStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/OverWindowStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/OverWindowStringExpressionTest.scala
index 0a5e001..04016f1 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/OverWindowStringExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/OverWindowStringExpressionTest.scala
@@ -29,7 +29,7 @@ class OverWindowStringExpressionTest extends TableTestBase {
   @Test
   def testPartitionedUnboundedOverRow(): Unit = {
     val util = streamTestUtil()
-    val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e)
+    val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime)
 
     val resScala = t
       .window(SOver partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)
@@ -44,7 +44,7 @@ class OverWindowStringExpressionTest extends TableTestBase {
   @Test
   def testUnboundedOverRow(): Unit = {
     val util = streamTestUtil()
-    val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e)
+    val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime)
 
     val resScala = t
       .window(SOver orderBy 'rowtime preceding UNBOUNDED_ROW following CURRENT_ROW as 'w)
@@ -59,7 +59,7 @@ class OverWindowStringExpressionTest extends TableTestBase {
   @Test
   def testPartitionedBoundedOverRow(): Unit = {
     val util = streamTestUtil()
-    val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e)
+    val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime)
 
     val resScala = t
       .window(SOver partitionBy('a, 'd) orderBy 'rowtime preceding 10.rows as 'w)
@@ -74,7 +74,7 @@ class OverWindowStringExpressionTest extends TableTestBase {
   @Test
   def testBoundedOverRow(): Unit = {
     val util = streamTestUtil()
-    val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e)
+    val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime)
 
     val resScala = t
       .window(SOver orderBy 'rowtime preceding 10.rows following CURRENT_ROW as 'w)
@@ -89,7 +89,7 @@ class OverWindowStringExpressionTest extends TableTestBase {
   @Test
   def testPartitionedUnboundedOverRange(): Unit = {
     val util = streamTestUtil()
-    val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e)
+    val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime)
 
     val resScala = t
       .window(SOver partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
@@ -104,7 +104,7 @@ class OverWindowStringExpressionTest extends TableTestBase {
   @Test
   def testUnboundedOverRange(): Unit = {
     val util = streamTestUtil()
-    val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e)
+    val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime)
 
     val resScala = t
       .window(SOver orderBy 'rowtime preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w)
@@ -120,7 +120,7 @@ class OverWindowStringExpressionTest extends TableTestBase {
   @Test
   def testPartitionedBoundedOverRange(): Unit = {
     val util = streamTestUtil()
-    val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e)
+    val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime)
 
     val resScala = t
       .window(SOver partitionBy('a, 'c) orderBy 'rowtime preceding 10.minutes as 'w)
@@ -135,7 +135,7 @@ class OverWindowStringExpressionTest extends TableTestBase {
   @Test
   def testBoundedOverRange(): Unit = {
     val util = streamTestUtil()
-    val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e)
+    val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime)
 
     val resScala = t
       .window(SOver orderBy 'rowtime preceding 4.hours following CURRENT_RANGE as 'w)

http://git-wip-us.apache.org/repos/asf/flink/blob/495f104b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
index dcd3c6c..05e1892 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/DataStreamAggregateITCase.scala
@@ -62,10 +62,10 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
     val stream = env
       .fromCollection(data)
       .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
-    val table = stream.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+    val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
 
     val windowedTable = table
-      .window(Slide over 5.milli every 2.milli on 'rowtime as 'w)
+      .window(Slide over 5.milli every 2.milli on 'long as 'w)
       .groupBy('w)
       .select('int.count, 'w.start, 'w.end)
 
@@ -97,10 +97,10 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
     val stream = env
       .fromCollection(data)
       .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
-    val table = stream.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+    val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
 
     val windowedTable = table
-      .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
+      .window(Slide over 10.milli every 5.milli on 'long as 'w)
       .groupBy('w, 'string)
       .select('string, 'int.count, 'w.start, 'w.end)
 
@@ -134,10 +134,10 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
     val stream = env
       .fromCollection(data)
       .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
-    val table = stream.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+    val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
 
     val windowedTable = table
-      .window(Slide over 5.milli every 4.milli on 'rowtime as 'w)
+      .window(Slide over 5.milli every 4.milli on 'long as 'w)
       .groupBy('w, 'string)
       .select('string, 'int.count, 'w.start, 'w.end)
 
@@ -168,10 +168,10 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
     val stream = env
       .fromCollection(data)
       .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
-    val table = stream.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+    val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
 
     val windowedTable = table
-      .window(Slide over 5.milli every 10.milli on 'rowtime as 'w)
+      .window(Slide over 5.milli every 10.milli on 'long as 'w)
       .groupBy('w, 'string)
       .select('string, 'int.count, 'w.start, 'w.end)
 
@@ -197,10 +197,10 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
     val stream = env
       .fromCollection(data)
       .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
-    val table = stream.toTable(tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string)
+    val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
 
     val windowedTable = table
-      .window(Slide over 3.milli every 10.milli on 'rowtime as 'w)
+      .window(Slide over 3.milli every 10.milli on 'long as 'w)
       .groupBy('w, 'string)
       .select('string, 'int.count, 'w.start, 'w.end)
 
@@ -225,7 +225,7 @@ class DataStreamAggregateITCase extends StreamingMultipleProgramsTestBase {
       .fromCollection(data)
       .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
       .map(t => (t._2, t._6))
-    val table = stream.toTable(tEnv, 'int, 'string)
+    val table = stream.toTable(tEnv, 'int, 'string, 'rowtime.rowtime)
 
     val windowedTable = table
       .window(Slide over 3.milli every 10.milli on 'rowtime as 'w)

Reply via email to