[ 
https://issues.apache.org/jira/browse/FLINK-6075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16015385#comment-16015385
 ] 

ASF GitHub Bot commented on FLINK-6075:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3889#discussion_r117182703
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeSortProcessFunction.scala
 ---
    @@ -0,0 +1,169 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
    +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
    +import org.apache.flink.api.java.typeutils.{RowTypeInfo, ListTypeInfo}
    +import org.apache.flink.runtime.state.{ FunctionInitializationContext, 
FunctionSnapshotContext }
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{ Collector, Preconditions }
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import scala.util.control.Breaks._
    +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.configuration.Configuration
    +import java.util.Comparator
    +import java.util.ArrayList
    +import java.util.Collections
    +import org.apache.flink.api.common.typeutils.TypeComparator
    +import java.util.{List => JList, ArrayList => JArrayList}
    +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
    +
    +/**
    + * Process Function used for the aggregate in bounded rowtime sort without 
offset/fetch
    + * [[org.apache.flink.streaming.api.datastream.DataStream]]
    + *
    + * @param fieldCount Is used to indicate fields in the current element to 
forward
    + * @param inputType It is used to mark the type of the incoming data
    + * @param rowComparator the [[java.util.Comparator]] is used for this sort 
aggregation
    + */
    +class RowTimeSortProcessFunction(
    +  private val fieldCount: Int,
    +  private val inputRowType: CRowTypeInfo,
    +  private val rowComparator: CollectionRowComparator)
    +    extends ProcessFunction[CRow, CRow] {
    +
    +  Preconditions.checkNotNull(rowComparator)
    +
    +  private val sortArray: ArrayList[Row] = new ArrayList[Row]
    +  
    +  // the state which keeps all the events that are not expired.
    +  // Each timestamp will contain an associated list with the events 
    +  // received at that timestamp
    +  private var dataState: MapState[Long, JList[Row]] = _
    +
    +    // the state which keeps the last triggering timestamp to filter late 
events
    +  private var lastTriggeringTsState: ValueState[Long] = _
    +  
    +  private var outputC: CRow = _
    +  
    +  
    +  override def open(config: Configuration) {
    +     
    +    val keyTypeInformation: TypeInformation[Long] =
    +      BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]]
    +    val valueTypeInformation: TypeInformation[JList[Row]] = new 
ListTypeInfo[Row](
    +        inputRowType.asInstanceOf[CRowTypeInfo].rowType)
    +
    +    val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] =
    +      new MapStateDescriptor[Long, JList[Row]](
    +        "dataState",
    +        keyTypeInformation,
    +        valueTypeInformation)
    +
    +    dataState = getRuntimeContext.getMapState(mapStateDescriptor)
    +    
    +    val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] =
    +      new ValueStateDescriptor[Long]("lastTriggeringTsState", 
classOf[Long])
    +    lastTriggeringTsState = 
getRuntimeContext.getState(lastTriggeringTsDescriptor)
    +  }
    +
    +  
    +  override def processElement(
    +    inputC: CRow,
    +    ctx: ProcessFunction[CRow, CRow]#Context,
    +    out: Collector[CRow]): Unit = {
    +
    +     val input = inputC.row
    +    
    +     if( outputC == null) {
    +      outputC = new CRow(input, true)
    +    }
    +    
    +    // triggering timestamp for trigger calculation
    +    val triggeringTs = ctx.timestamp
    +
    +    val lastTriggeringTs = lastTriggeringTsState.value
    +
    +    // check if the data is expired, if not, save the data and register 
event time timer
    +    if (triggeringTs > lastTriggeringTs) {
    +      val data = dataState.get(triggeringTs)
    +      if (null != data) {
    +        data.add(input)
    +        dataState.put(triggeringTs, data)
    +      } else {
    +        val data = new JArrayList[Row]
    +        data.add(input)
    +        dataState.put(triggeringTs, data)
    +        // register event time timer
    +        ctx.timerService.registerEventTimeTimer(triggeringTs)
    +      }
    +    }
    +  }
    +  
    +  
    +  override def onTimer(
    +    timestamp: Long,
    +    ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
    +    out: Collector[CRow]): Unit = {
    +    
    +    // gets all window data from state for the calculation
    +    val inputs: JList[Row] = dataState.get(timestamp)
    +
    +    if (null != inputs) {
    +      
    +      var dataListIndex = 0
    +
    +      // no retraction needed for time order sort
    +      
    +      //no selection of offset/fetch
    +      
    +      dataListIndex = 0
    +      sortArray.clear()
    --- End diff --
    
    `inputs` is not a `ListState` but an actual `ArrayList` that was returned 
from the `dataState: MapState[Long, JList[Row]]`. So we are copying the elements 
from one `ArrayList` into another.
    
    In `ProctimeSortProcessFunction`, the `ListState[Row]` is much better than 
`ValueState[JList[Row]]` because adding to the `ListState` is basically free, 
while `ValueState` would need to deserialize the `List` every time we read or 
write.


> Support Limit/Top(Sort) for Stream SQL
> --------------------------------------
>
>                 Key: FLINK-6075
>                 URL: https://issues.apache.org/jira/browse/FLINK-6075
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: radu
>              Labels: features
>         Attachments: sort.png
>
>
> These will be split into 3 separate JIRA issues. However, the design is the 
> same; only the processing function differs in terms of the output. Hence, the 
> design is the same for all of them.
> Time target: Proc Time
> **SQL targeted query examples:**
> *Sort example*
> Q1) `SELECT a FROM stream1 GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL 
> '3' HOUR) ORDER BY b` 
> Comment: window is defined using GROUP BY
> Comment: ASC or DESC keywords can be placed to mark the ordering type
> *Limit example*
> Q2) `SELECT a FROM stream1 WHERE rowtime BETWEEN current_timestamp - INTERVAL 
> '1' HOUR AND current_timestamp ORDER BY b LIMIT 10`
> Comment: window is defined using time ranges in the WHERE clause
> Comment: window is row triggered
> *Top example*
> Q3) `SELECT sum(a) OVER (ORDER BY proctime RANGE INTERVAL '1' HOUR PRECEDING 
> LIMIT 10) FROM stream1`  
> Comment: limit over the contents of the sliding window
> General Comments:
> -All these SQL clauses are supported only over windows (bounded collections 
> of data). 
> -Each of the 3 operators will be supported with each of the types of 
> expressing the windows. 
> **Description**
> The 3 operations (limit, top and sort) are similar in behavior as they all 
> require a sorted collection of the data on which the logic will be applied 
> (i.e., select a subset of the items or the entire sorted set). These 
> functions would make sense in the streaming context only in the context of a 
> window. Without defining a window the functions could never emit as the sort 
> operation would never trigger. If an SQL query is provided without 
> limits, an error will be thrown (`SELECT a FROM stream1 TOP 10` -> ERROR). 
> Although not targeted by this JIRA, in the case of working based on event 
> time order, the retraction mechanisms of windows and the lateness mechanisms 
> can be used to deal with out of order events and retraction/updates of 
> results.
> **Functionality example**
> We exemplify with the query below for all the 3 types of operators (sorting, 
> limit and top). Rowtime indicates when the HOP window will trigger – which 
> can be observed in the fact that outputs are generated only at those moments. 
> The HOP windows will trigger at every hour (fixed hour) and each event will 
> contribute/ be duplicated for 2 consecutive hour intervals. Proctime 
> indicates the processing time when a new event arrives in the system. Events 
> are of the type (a,b) with the ordering being applied on the b field.
> `SELECT a FROM stream1 HOP(proctime, INTERVAL '1' HOUR, INTERVAL '2' HOUR) 
> ORDER BY b (LIMIT 2 / TOP 2 / [ASC/DESC])`
> ||Rowtime||   Proctime||      Stream1||       Limit 2||       Top 2|| Sort 
> [ASC]||
> |         |10:00:00  |(aaa, 11)       |               |             |         
>    |
> |         |10:05:00    |(aab, 7)  |           |             |            |
> |10-11          |11:00:00  |          |       aab,aaa |aab,aaa  |     aab,aaa 
>    |
> |         |11:03:00  |(aac,21)  |           |         |            |          
>         
> |11-12    |12:00:00  |          |     aab,aaa |aab,aaa  |     aab,aaa,aac|
> |         |12:10:00  |(abb,12)  |           |         |            |          
>         
> |         |12:15:00  |(abb,12)  |           |         |            |          
>         
> |12-13          |13:00:00  |          |       abb,abb | abb,abb |     
> abb,abb,aac|
> |...|
> **Implementation option**
> Considering that the SQL operators will be associated with window boundaries, 
> the functionality will be implemented within the logic of the window as 
> follows.
> * Window assigner – selected based on the type of window used in SQL 
> (TUMBLING, SLIDING…)
> * Evictor/ Trigger – time or count evictor based on the definition of the 
> window boundaries
> * Apply – window function that sorts data and selects the output to trigger 
> (based on LIMIT/TOP parameters). All data will be sorted at once and result 
> outputted when the window is triggered
> An alternative implementation can be to use a fold window function to sort 
> the elements as they arrive, one at a time followed by a flatMap to filter 
> the number of outputs. 
> !sort.png!
> **General logic of Sort/Limit/Top**
> ```
> inputDataStream.window(new [Slide/Tumble][Time/Count]Window())
> //.trigger(new [Time/Count]Trigger()) – use default
> //.evictor(new [Time/Count]Evictor()) – use default
>               .apply(SortAndFilter());
> ```



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to