[ 
https://issues.apache.org/jira/browse/FLINK-5990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15938639#comment-15938639
 ] 

ASF GitHub Bot commented on FLINK-5990:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3585#discussion_r107707074
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala ---
    @@ -293,6 +297,82 @@ class SqlITCase extends StreamingWithStateTestBase {
         assertEquals(expected.sorted, StreamITCase.testResults.sorted)
       }
     
    +  @Test
    +  def testBoundPartitionedEventTimeWindowWithRow(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +    env.setStateBackend(getStateBackend)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.clear
    +
    +    val t1 = env.fromCollection(data)
    +      .assignTimestampsAndWatermarks(new TimestampWithLatenessWatermark(0))
    +      .toTable(tEnv).as('a, 'b, 'c)
    +
    +    tEnv.registerTable("T1", t1)
    +
    +    val sqlQuery = "SELECT " +
    +      "c, a, " +
    +      "sum(a) OVER (PARTITION BY c ORDER BY RowTime() ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
    +      "from T1"
    +
    +    val result = tEnv.sql(sqlQuery).toDataStream[Row]
    +    result.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = mutable.MutableList(
    +      "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,9", "Hello,5,12",
    +      "Hello,6,15", "Hello World,7,7", "Hello World,8,15", "Hello World,20,35")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  @Test
    +  def testBoundPartitionedEventTimeWindowWithRowWithLateEvent(): Unit = {
    +
    +    val data = List(
    +      (1L, 1, "Hello"),
    +      (2L, 2, "Hello"),
    +      (4L, 4, "Hello"),
    +      (3L, 3, "Hello"),
    +      (7L, 7, "Hello"),
    +      (8L, 8, "Hello World"),
    +      (7L, 8, "Hello"),
    +      (5L, 5, "Hello"),
    +      (20L, 20, "Hello World"),
    +      (9L, 9, "Hello World"))
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +    env.setStateBackend(getStateBackend)
    +    StreamITCase.clear
    +
    +    // set the parallelism to 1 such that the test elements are arrived in order. For instance,
    +    // element (20L, 20, "Hello World") arrives before element (9L, 9, "Hello World").
    +    env.setParallelism(1)
    +
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val t1 = env.fromCollection(data)
    +        .assignTimestampsAndWatermarks(new TimestampWithLatenessWatermark(2)) // allowedLateness = 2
    --- End diff --
    
    I don't think we need to test this case. The over-window logic is exactly
    the same as in the previous test; only the input differs, and even that is
    very similar.


> Add [partitioned] event time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-5990
>                 URL: https://issues.apache.org/jira/browse/FLINK-5990
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>
> The goal of this issue is to add support for OVER ROWS aggregations on event 
> time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT 
>   a, 
>   SUM(b) OVER (PARTITION BY c ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumB,
>   MIN(b) OVER (PARTITION BY c ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is required.
> - The ORDER BY clause may only have rowTime() as parameter. rowTime() is a 
> parameterless scalar function that just indicates event time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5803)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some 
> of the restrictions are trivial to address, we can add the functionality in 
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with 
> RexOver expression).
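
To make the window semantics in the issue description concrete, here is a
minimal plain-Scala sketch of what
SUM(a) OVER (PARTITION BY c ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
computes. It is not Flink code and not the operator from the pull request: the
object name OverRowsSketch, the function boundedRowsSum, and the
(eventTime, a, c) tuple layout are assumptions made purely for illustration,
and watermarks and late events are ignored.

```scala
// Plain-Scala illustration of a partitioned, bounded ROWS window sum.
// All names here are hypothetical; this does not use any Flink API.
object OverRowsSketch {
  // (eventTime, a, c): assumed row layout for this sketch
  type Row = (Long, Int, String)

  // For each row, sum `a` over the current row and up to `preceding`
  // earlier rows of the same partition, ordered by event time.
  def boundedRowsSum(rows: Seq[Row], preceding: Int): Seq[(String, Int, Int)] =
    rows
      .groupBy(_._3)                      // PARTITION BY c
      .toSeq
      .flatMap { case (key, partition) =>
        val ordered = partition.sortBy(_._1) // ORDER BY event time
        ordered.indices.map { i =>
          val from = math.max(0, i - preceding)
          val sum = ordered.slice(from, i + 1).map(_._2).sum
          (key, ordered(i)._2, sum)        // (c, a, windowed sum)
        }
      }
}
```

Calling OverRowsSketch.boundedRowsSum(rows, 2) on rows whose a values are 1 to 6
for "Hello" and 7, 8, 20 for "Hello World" (with increasing event times per key)
gives the sums 1, 3, 6, 9, 12, 15 and 7, 15, 35. The order of partitions in the
result is unspecified, so compare sorted output, as the test in the diff does.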



