Theo Diefenthal created FLINK-19383:
---------------------------------------

             Summary: Per Partition State
                 Key: FLINK-19383
                 URL: https://issues.apache.org/jira/browse/FLINK-19383
             Project: Flink
          Issue Type: Improvement
            Reporter: Theo Diefenthal


With Kafka possibly being the most widely used data source in Flink, I'd like to 
propose a new "per-partition state".

Right now, Flink only knows about OperatorState (evenly distributed or union) 
or keyedState.

Kafka has multiple partitions per topic, and Flink already exploits that 
nicely. The most widely used feature is that one can produce data with 
ascending timestamps per Kafka partition (e.g. server logs, with each server 
sending data to one partition). In Flink, this enables a huge optimization: 
in that case, one can use an AscendingTimestampWatermarkAssigner, 
and windows can be closed quickly. 
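
To illustrate why ascending timestamps per partition let windows close quickly, here is a minimal plain-Java sketch of per-partition watermarking. It does not use the Flink API; the class PerPartitionWatermarks and its methods are hypothetical names made up for this example. It assumes that the watermark of a partition is its highest timestamp seen so far minus one, and that the combined watermark is the minimum over all partitions.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper, not a Flink class: one ascending-timestamp watermark per partition. */
class PerPartitionWatermarks {
    private final Map<Integer, Long> maxTimestampPerPartition = new HashMap<>();
    private final int numPartitions;

    PerPartitionWatermarks(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    /** Records an event; timestamps are assumed to be ascending within a partition. */
    void onEvent(int partition, long timestampMillis) {
        maxTimestampPerPartition.merge(partition, timestampMillis, Math::max);
    }

    /** Combined watermark: the minimum of the per-partition watermarks. */
    long currentWatermark() {
        // Until every partition has produced data, no progress can be claimed.
        if (maxTimestampPerPartition.size() < numPartitions) {
            return Long.MIN_VALUE;
        }
        long min = Long.MAX_VALUE;
        for (long maxTs : maxTimestampPerPartition.values()) {
            // With ascending timestamps, nothing older than maxTs can still arrive.
            min = Math.min(min, maxTs - 1);
        }
        return min;
    }
}
```

Because no out-of-orderness bound has to be added, the watermark follows the slowest partition directly, and a window can fire as soon as every partition has passed its end.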

Making use of this performance optimization leads me to think that we could 
go a step further and introduce a per-Kafka-partition state. In my current 
scenario, I need to buffer the data per server (and thus per Kafka partition) 
for 1 minute in event time, waiting to see whether certain other events 
arrive during that time.
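
The buffering scenario above could look roughly like the following plain-Java sketch. It is not Flink code and not a proposed API: PartitionBufferSketch, Event, add and release are hypothetical names, and the check for the "certain other events" is left out. It assumes ascending timestamps per partition, so the oldest event of a partition is always at the head of its queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: holds events per partition for one minute of event time. */
class PartitionBufferSketch {
    static final long HOLD_MILLIS = 60_000L;

    static final class Event {
        final int partition;
        final long timestamp;
        final String payload;

        Event(int partition, long timestamp, String payload) {
            this.partition = partition;
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    // One queue per partition; this map is the "per-partition state".
    private final Map<Integer, Deque<Event>> buffers = new HashMap<>();

    void add(Event e) {
        buffers.computeIfAbsent(e.partition, p -> new ArrayDeque<>()).addLast(e);
    }

    /** Releases the events of one partition whose hold period ended at or before the watermark. */
    List<Event> release(int partition, long watermark) {
        List<Event> out = new ArrayList<>();
        Deque<Event> queue = buffers.get(partition);
        while (queue != null && !queue.isEmpty()
                && queue.peekFirst().timestamp + HOLD_MILLIS <= watermark) {
            out.add(queue.pollFirst());
        }
        return out;
    }
}
```

With a real per-partition state, such a buffer would stay on the subtask that reads the partition, so no keyBy and no network shuffle would be needed.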

A state per Kafka partition is currently hard to implement. The best one can do 
is to keyBy the datastream by Kafka partition. However, the KafkaAssigner has 
different assignment characteristics than the KeyGroupRangeAssignment, leading 
to an unnecessary shuffle step. Even worse, the KeyGroupRangeAssignment is kind 
of random, whereas the Kafka partition assignment in the source works round 
robin. With similarly loaded Kafka partitions, the load can be 
skewed across the taskmanagers after keying. For a simple pipeline with 
parallelism 3 and 3 partitions, this can lead to e.g. one taskmanager 
processing 2 partitions, one taskmanager processing 1 partition and one 
taskmanager being idle.
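
The two assignment schemes can be compared with a small stand-alone Java sketch. It is written from memory of Flink's KeyGroupRangeAssignment and MathUtils.murmurHash and of the Kafka connector's partition assigner, so treat the hash constants and formulas as assumptions rather than as the exact Flink code. The class AssignmentSketch, the start index parameter and the max parallelism of 128 are also assumptions made for illustration.

```java
/** Hypothetical sketch of the two assignment schemes; not the actual Flink classes. */
class AssignmentSketch {

    /** Assumed to mirror Flink's murmur-style hash of a key's hashCode. */
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        // Final bit mixing.
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        if (code >= 0) {
            return code;
        }
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    /** Subtask that receives a partition number used as key after keyBy. */
    static int keyedSubtask(int partition, int maxParallelism, int parallelism) {
        // Integer.hashCode() of the key is the partition number itself.
        int keyGroup = murmurHash(partition) % maxParallelism;
        return keyGroup * parallelism / maxParallelism;
    }

    /** Subtask that reads a partition in the source (round robin from a start index). */
    static int roundRobinSubtask(int partition, int startIndex, int parallelism) {
        return (startIndex + partition) % parallelism;
    }

    public static void main(String[] args) {
        int parallelism = 3;
        int maxParallelism = 128; // assumed default for small parallelism
        for (int p = 0; p < 3; p++) {
            System.out.println("partition " + p
                    + ": source subtask " + roundRobinSubtask(p, 0, parallelism)
                    + ", keyed subtask " + keyedSubtask(p, maxParallelism, parallelism));
        }
    }
}
```

The round-robin scheme always places 3 partitions on 3 distinct subtasks, while the hash-based scheme gives no such guarantee. Whenever the two indices differ for a partition, its records have to cross the network after keyBy.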



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
