[
https://issues.apache.org/jira/browse/FLINK-5990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15938639#comment-15938639
]
ASF GitHub Bot commented on FLINK-5990:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3585#discussion_r107707074
--- Diff:
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
---
@@ -293,6 +297,82 @@ class SqlITCase extends StreamingWithStateTestBase {
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+ @Test
+ def testBoundPartitionedEventTimeWindowWithRow(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val t1 = env.fromCollection(data)
+ .assignTimestampsAndWatermarks(new TimestampWithLatenessWatermark(0))
+ .toTable(tEnv).as('a, 'b, 'c)
+
+ tEnv.registerTable("T1", t1)
+
+ val sqlQuery = "SELECT " +
+ "c, a, " +
+ "sum(a) OVER (PARTITION BY c ORDER BY RowTime() ROWS BETWEEN 2
preceding AND CURRENT ROW)" +
+ "from T1"
+
+ val result = tEnv.sql(sqlQuery).toDataStream[Row]
+ result.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,9", "Hello,5,12",
+ "Hello,6,15", "Hello World,7,7", "Hello World,8,15", "Hello
World,20,35")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testBoundPartitionedEventTimeWindowWithRowWithLateEvent(): Unit = {
+
+ val data = List(
+ (1L, 1, "Hello"),
+ (2L, 2, "Hello"),
+ (4L, 4, "Hello"),
+ (3L, 3, "Hello"),
+ (7L, 7, "Hello"),
+ (8L, 8, "Hello World"),
+ (7L, 8, "Hello"),
+ (5L, 5, "Hello"),
+ (20L, 20, "Hello World"),
+ (9L, 9, "Hello World"))
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.clear
+
+ // set the parallelism to 1 such that the test elements are arrived in
order. For instance,
+ // element (20L, 20, "Hello World") arrives before element (9L, 9,
"Hello World").
+ env.setParallelism(1)
+
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val t1 = env.fromCollection(data)
+ .assignTimestampsAndWatermarks(new
TimestampWithLatenessWatermark(2)) // allowedLateness = 2
--- End diff --
I don't think we need to test this case. The complete logic of the over
window is the same, just the input is different but very similar as before.
> Add [partitioned] event time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
> -----------------------------------------------------------------------------
>
> Key: FLINK-5990
> URL: https://issues.apache.org/jira/browse/FLINK-5990
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: sunjincheng
> Assignee: sunjincheng
>
> The goal of this issue is to add support for OVER ROWS aggregations on event
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT
> a,
> SUM(b) OVER (PARTITION BY c ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND
> CURRENT ROW) AS sumB,
> MIN(b) OVER (PARTITION BY c ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND
> CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is required
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a
> parameterless scalar function that just indicates event time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5803)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some
> of the restrictions are trivial to address, we can add the functionality in
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with
> RexOver expression).
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)