Repository: flink
Updated Branches:
  refs/heads/release-1.3 fd910f2c9 -> 28a89d1ca


[FLINK-6479] [table] Fix IOOBE in DataStreamGroupWindowAggregate.

This closes #3841.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4556b49c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4556b49c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4556b49c

Branch: refs/heads/release-1.3
Commit: 4556b49c80108e83d67a56ac5039f631dd681a96
Parents: fd910f2
Author: sunjincheng121 <sunjincheng...@gmail.com>
Authored: Mon May 8 16:41:31 2017 +0800
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Tue May 9 18:52:20 2017 +0200

----------------------------------------------------------------------
 .../DataStreamGroupWindowAggregate.scala        |  2 +-
 .../table/GroupWindowAggregationsITCase.scala   | 32 +++++++++++++++
 .../scala/stream/table/GroupWindowTest.scala    | 41 +++++++++++++++++++-
 3 files changed, 73 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4556b49c/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index 1be1896..c38e5af 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -134,7 +134,7 @@ class DataStreamGroupWindowAggregate(
       namedAggregates,
       namedProperties)
 
-    val keyedAggOpName = s"groupBy: (${groupingToString(schema.logicalType, grouping)}), " +
+    val keyedAggOpName = s"groupBy: (${groupingToString(inputSchema.logicalType, grouping)}), " +
       s"window: ($window), " +
       s"select: ($aggString)"
     val nonKeyedAggOpName = s"window: ($window), select: ($aggString)"

http://git-wip-us.apache.org/repos/asf/flink/blob/4556b49c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
index 2c027a9..846fe3e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowAggregationsITCase.scala
@@ -175,6 +175,38 @@ class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
       "Hi,1,1,1,1,1,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testGroupWindowWithoutKeyInProjection(): Unit = {
+    val data = List(
+      (1L, 1, "Hi", 1, 1),
+      (2L, 2, "Hello", 2, 2),
+      (4L, 2, "Hello", 2, 2),
+      (8L, 3, "Hello world", 3, 3),
+      (16L, 3, "Hello world", 3, 3))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'long, 'int, 'string, 'int2, 'int3, 'proctime.proctime)
+
+    val weightAvgFun = new WeightedAvg
+
+    val windowedTable = table
+      .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
+      .groupBy('w, 'int2, 'int3, 'string)
+      .select(weightAvgFun('long, 'int))
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq("12", "8", "2", "3", "1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
 }
 
 object GroupWindowAggregationsITCase {

http://git-wip-us.apache.org/repos/asf/flink/blob/4556b49c/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
index 0573ff3..ef071b7 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.table.api.scala.stream.table
 
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithMerge
+import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMerge}
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.expressions.WindowReference
@@ -792,6 +792,45 @@ class GroupWindowTest extends TableTestBase {
   }
 
   @Test
+  def testSlidingWindowWithUDAF(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String, Int, Int)](
+      'long,
+      'int,
+      'string,
+      'int2,
+      'int3,
+      'proctime.proctime)
+
+    val weightAvgFun = new WeightedAvg
+
+    val windowedTable = table
+      .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
+      .groupBy('w, 'int2, 'int3, 'string)
+      .select(weightAvgFun('long, 'int))
+
+    val expected =
+      unaryNode(
+        "DataStreamCalc",
+        unaryNode(
+          "DataStreamGroupWindowAggregate",
+          streamTableNode(0),
+          term("groupBy", "string, int2, int3"),
+          term("window", SlidingGroupWindow(WindowReference("w"), 'proctime,  2.rows, 1.rows)),
+          term(
+            "select",
+            "string",
+            "int2",
+            "int3",
+            "WeightedAvg(long, int) AS TMP_0")
+        ),
+        term("select","TMP_0")
+      )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  @Test
   def testSlideWindowStartEnd(): Unit = {
     val util = streamTestUtil()
     val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)

Reply via email to