[ https://issues.apache.org/jira/browse/FLINK-5653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15942809#comment-15942809 ]
ASF GitHub Bot commented on FLINK-5653:
---------------------------------------
Github user huawei-flink commented on the issue:
https://github.com/apache/flink/pull/3574
Hi @fhueske, @sunjincheng121,
let me try to explain my perspective on this specific case (row based, proc
time). This is for the purpose of discussion, to show that we have been
thinking about this topic for a while now.
In the case of the row range, the "serialization savings" coming from MapState
exist only up to the point at which the "buffer" is filled. After that, we
need to start retracting to keep the value correct, and to do that we need to
deserialize all the objects. As @rtudoran mentioned, we implemented a version
using a Queue object.
This has many advantages:
- removing the object from the buffer at the right moment frees memory on
the go (without any iteration over the key set)
- the data access pattern is O(1), without any "key resolution costs"
and no list iteration
- it keeps the natural processing order by design, without the need to
index objects with timestamps
- the experiments we ran show no difference for windows up to 100k
elements, and beyond that the queue seems to be more efficient (as the
key resolution does not come for free).
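To make the queue argument concrete, here is a minimal plain-Java sketch of a SUM over ROWS BETWEEN n PRECEDING AND CURRENT ROW. It is an illustration only, not the code from the pull request: the class name `QueueRowsSum`, its methods, and the use of `ArrayDeque` are assumptions, and no Flink state API is involved.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch: SUM over ROWS BETWEEN n PRECEDING AND CURRENT ROW. */
final class QueueRowsSum {
    // Window capacity: n preceding rows plus the current row.
    private final int maxRows;
    // Rows in arrival order; head is the oldest row, tail the newest.
    private final Deque<Long> buffer = new ArrayDeque<>();
    private long sum = 0L;

    QueueRowsSum(int preceding) {
        if (preceding < 0) {
            throw new IllegalArgumentException("preceding must be >= 0");
        }
        this.maxRows = preceding + 1;
    }

    /** Adds a row in arrival order and returns the aggregate for its window. */
    long add(long value) {
        if (buffer.size() == maxRows) {
            // Retract the oldest row: O(1), no key lookup, no iteration.
            sum -= buffer.pollFirst();
        }
        buffer.addLast(value);
        sum += value;
        return sum;
    }

    /** Number of rows currently buffered (never exceeds the window capacity). */
    int size() {
        return buffer.size();
    }

    public static void main(String[] args) {
        QueueRowsSum window = new QueueRowsSum(2);
        for (long v : new long[] {1, 2, 3, 4, 5}) {
            System.out.println(window.add(v)); // prints 1, 3, 6, 9, 12
        }
    }
}
```

Once the buffer is full, every new row triggers exactly one retraction from the head, so memory stays bounded at the window size and arrival order is preserved without storing timestamps.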
The map state may have a slight advantage in the early stages, when the
window is not yet filled, but after that it just introduces useless operations.
Furthermore, having to index objects with a creation timestamp (more memory
wasted) and to deal with sequential access (List) to get the most recent
object, when you can actually just use the natural arrival order, seems a
needless complication. Applying Occam's razor, there should be no doubt about
which solution we should select first. The serialization optimization while
the window is filling sounds like a premature optimization that is not worth
it in the long run. Further SQL operators (e.g. LIMIT, OFFSET, etc.) can
just benefit from the fact that the state is already sorted, whereas the map
would need to be sorted every time.
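For contrast, the timestamp-indexed variant can be sketched in plain Java as follows. This is only a stand-in for the idea of a map keyed by timestamp with a list of rows per key; it does not use Flink's MapState, it is not the implementation under review, and the names `MapRowsSum`, `add`, and `keyCount` are hypothetical. It assumes timestamps arrive in non-decreasing order, as processing time would.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: the same bounded SUM, with rows indexed by timestamp. */
final class MapRowsSum {
    private final int maxRows;
    // Rows grouped by arrival timestamp; several rows may share one timestamp.
    private final Map<Long, List<Long>> rowsByTimestamp = new HashMap<>();
    private int rowCount = 0;
    private long sum = 0L;

    MapRowsSum(int preceding) {
        if (preceding < 0) {
            throw new IllegalArgumentException("preceding must be >= 0");
        }
        this.maxRows = preceding + 1;
    }

    /** Adds a row with its timestamp and returns the aggregate for its window. */
    long add(long timestamp, long value) {
        if (rowCount == maxRows) {
            // Finding the oldest row requires a scan over the key set.
            long oldest = Collections.min(rowsByTimestamp.keySet());
            List<Long> rows = rowsByTimestamp.get(oldest);
            sum -= rows.remove(0); // retract the first row stored under that key
            if (rows.isEmpty()) {
                rowsByTimestamp.remove(oldest);
            }
            rowCount--;
        }
        rowsByTimestamp.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(value);
        rowCount++;
        sum += value;
        return sum;
    }

    /** Number of distinct timestamps currently held. */
    int keyCount() {
        return rowsByTimestamp.size();
    }

    public static void main(String[] args) {
        MapRowsSum window = new MapRowsSum(2);
        System.out.println(window.add(10L, 1L));
        System.out.println(window.add(10L, 2L));
        System.out.println(window.add(11L, 3L));
        System.out.println(window.add(12L, 4L));
        System.out.println(window.add(12L, 5L));
    }
}
```

The results match the queue version, but each eviction pays for a key-set scan plus a list access, and every row carries an extra timestamp key.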
Of course, I am talking specifically about procTime semantics.
eventTime is another story anyway. The map state has minor advantages in the
beginning (as the serialization costs are small anyway), the queue state has
advantages in steadily running executions because of its access pattern and
natural buffer cleansing.
These are my two cents on the discussion.
> Add processing time OVER ROWS BETWEEN x PRECEDING aggregation to SQL
> --------------------------------------------------------------------
>
> Key: FLINK-5653
> URL: https://issues.apache.org/jira/browse/FLINK-5653
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Fabian Hueske
> Assignee: Stefano Bortoli
>
> The goal of this issue is to add support for OVER ROWS aggregations on
> processing time streams to the SQL interface.
> Queries similar to the following should be supported:
> {code}
> SELECT
> a,
> SUM(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING
> AND CURRENT ROW) AS sumB,
> MIN(b) OVER (PARTITION BY c ORDER BY procTime() ROWS BETWEEN 2 PRECEDING
> AND CURRENT ROW) AS minB
> FROM myStream
> {code}
> The following restrictions should initially apply:
> - All OVER clauses in the same SELECT clause must be exactly the same.
> - The PARTITION BY clause is optional (no partitioning results in single
> threaded execution).
> - The ORDER BY clause may only have procTime() as parameter. procTime() is a
> parameterless scalar function that just indicates processing time mode.
> - UNBOUNDED PRECEDING is not supported (see FLINK-5656)
> - FOLLOWING is not supported.
> The restrictions will be resolved in follow up issues. If we find that some
> of the restrictions are trivial to address, we can add the functionality in
> this issue as well.
> This issue includes:
> - Design of the DataStream operator to compute OVER ROW aggregates
> - Translation from Calcite's RelNode representation (LogicalProject with
> RexOver expression).