[ 
https://issues.apache.org/jira/browse/FLINK-21317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17280814#comment-17280814
 ] 

Yuan Mei edited comment on FLINK-21317 at 2/8/21, 7:33 AM:
-----------------------------------------------------------

Hey [~kezhuw], thanks very much for reporting this.

The current version of FlinkKafkaShuffle has some limitations in the use 
cases it supports.

One is that the key groups on the read side are not exactly the same as the 
original ones (on the writer side) because the Flink consumer reassigns 
partitions (I think that's what you meant by breaking prerequisites).

But before proceeding, may I understand a bit more about your use cases? I'd 
be happy to work with you to enrich the feature a bit if it is something you 
are looking to make use of.



> Downstream keyed state not work after FlinkKafkaShuffle
> -------------------------------------------------------
>
>                 Key: FLINK-21317
>                 URL: https://issues.apache.org/jira/browse/FLINK-21317
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.13.0
>            Reporter: Kezhu Wang
>            Priority: Major
>
> {{FlinkKafkaShuffle}} uses 
> {{KeyGroupRangeAssignment.assignKeyToParallelOperator}} to assign records 
> to Kafka topic partitions. The assignment works as follows:
>  # {{KeyGroupRangeAssignment.assignToKeyGroup(Object key, int maxParallelism)}} assigns the key to a key group.
>  # {{KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId)}} assigns that key group to an operator/subtask index.
> When Kafka topic partitions are consumed, they are redistributed by 
> {{KafkaTopicPartitionAssigner.assign(KafkaTopicPartition partition, int numParallelSubtasks)}}. 
> I copied the code of this redistribution here.
> {code:java}
> public class KafkaTopicPartitionAssigner {
>     public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
>         int startIndex =
>                 ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
>         // here, the assumption is that the id of Kafka partitions are always ascending
>         // starting from 0, and therefore can be used directly as the offset clockwise from the
>         // start index
>         return (startIndex + partition.getPartition()) % numParallelSubtasks;
>     }
> }
> {code}
> This partition redistribution breaks the prerequisites of 
> {{DataStreamUtils.reinterpretAsKeyedStream}}: records arrive at subtasks 
> that do not own their key groups. The consequence is unusable keyed state. 
> The deepest stack trace I captured is:
> {noformat}
> Caused by: java.lang.NullPointerException
>       at org.apache.flink.runtime.state.heap.StateTable.transform(StateTable.java:205)
>       at org.apache.flink.runtime.state.heap.HeapReducingState.add(HeapReducingState.java:100)
> {noformat}
> cc [~ym]  [~sewen] [~AHeise]  [~pnowojski]
> Below are my proposed changes:
> * Make the assignment between partitions and subtasks customizable.
> * Provide a 0-based round-robin assignment. (This amounts to setting 
> {{startIndex}} to 0 in the existing assignment algorithm.)
> I also saw FLINK-8570; the above changes could be helpful if we eventually 
> decide to deliver FLINK-8570.
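
For illustration, the mismatch described in the issue can be reproduced with 
plain arithmetic. The sketch below is not Flink code: it copies only the 
formula from the quoted KafkaTopicPartitionAssigner.assign, takes the topic 
name and partition id as plain arguments, and assumes that the shuffle writer 
puts the records for subtask i into partition i. The class name, method names, 
and topic name are hypothetical.

```java
final class PartitionAssignmentSketch {

    // Same arithmetic as the quoted KafkaTopicPartitionAssigner.assign, with the
    // topic name and partition id passed directly (no KafkaTopicPartition object).
    public static int hashOffsetAssign(String topic, int partition, int numParallelSubtasks) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;
        return (startIndex + partition) % numParallelSubtasks;
    }

    // Hypothetical 0-based round-robin variant from the proposal:
    // the same formula with startIndex fixed to 0.
    public static int zeroBasedAssign(int partition, int numParallelSubtasks) {
        return partition % numParallelSubtasks;
    }

    public static void main(String[] args) {
        String topic = "shuffle-topic"; // hypothetical topic name
        int parallelism = 4;            // assumed: partitions == consumer parallelism

        // Assumption: the writer stored records for subtask p in partition p.
        // The read side preserves key groups only if partition p is consumed
        // by subtask p again.
        for (int p = 0; p < parallelism; p++) {
            int hashed = hashOffsetAssign(topic, p, parallelism);
            int zeroBased = zeroBasedAssign(p, parallelism);
            System.out.printf(
                    "partition %d -> subtask %d (hash offset), subtask %d (0-based)%n",
                    p, hashed, zeroBased);
        }
    }
}
```

When the number of partitions equals the consumer parallelism, the 0-based 
variant maps partition p back to subtask p, whereas the hash-based start index 
rotates the mapping unless the index happens to be 0.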



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
