[ 
https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17500479#comment-17500479
 ] 

shizhengchao edited comment on FLINK-26033 at 3/3/22, 3:36 AM:
---------------------------------------------------------------

[~MartijnVisser] [~renqs] My tests show that the RoundRobinPartitioner built 
into Kafka does not work either: because of abortForNewBatch, it cannot 
distribute writes equally across all partitions. For example, with 10 
partitions, `org.apache.kafka.clients.producer.RoundRobinPartitioner` only 
sends data to the even partitions whenever abortForNewBatch is true. So we 
should implement a round-robin partitioner in Flink, and it needs to discover 
the topic's partitions automatically.
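
The skipping described above can be reproduced in a toy model. This is only a sketch under an assumption about the producer: when an append is aborted because the chosen partition has no open batch (abortForNewBatch), the partitioner is asked a second time, so a counter-based partitioner advances twice for one record. The class and method names (`SkippedPartitionDemo`, `nextPartition`, `simulate`) are made up for illustration and are not Kafka APIs.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of the double partition() call; this is not Kafka code. */
final class SkippedPartitionDemo {

    /** Counter-based round-robin choice, in the style of RoundRobinPartitioner. */
    static int nextPartition(AtomicInteger counter, int numPartitions) {
        // Mask the sign bit so the result stays non-negative after overflow.
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }

    /**
     * Sends {@code records} records and returns the partition each one lands in.
     * Assumption: a partition without an open batch aborts the first append,
     * after which the partitioner is consulted again for the same record.
     */
    static List<Integer> simulate(int numPartitions, int records) {
        AtomicInteger counter = new AtomicInteger(0);
        Set<Integer> openBatches = new HashSet<>();
        List<Integer> chosen = new ArrayList<>();
        for (int i = 0; i < records; i++) {
            int partition = nextPartition(counter, numPartitions);
            if (!openBatches.contains(partition)) {
                // Modelled abortForNewBatch: the second call advances the
                // counter again, so the first choice is never written to.
                partition = nextPartition(counter, numPartitions);
                openBatches.add(partition);
            }
            chosen.add(partition);
        }
        return chosen;
    }

    public static void main(String[] args) {
        // Prints the distinct partitions that received at least one record.
        System.out.println(new TreeSet<>(simulate(10, 100)));
    }
}
```

In this model only half of the 10 partitions ever receive data. Which half is hit in a real producer depends on the counter state and on when batches are created and drained, so the model illustrates the mechanism rather than the exact partitions.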


was (Author: tinny):
[~MartijnVisser] [~renqs] My tests show that the RoundRobinPartitioner built 
into Kafka does not work either: because of abortForNewBatch, it cannot 
distribute writes equally across all partitions. For example, with 10 
partitions, `org.apache.kafka.clients.producer.RoundRobinPartitioner` only 
sends data to the even partitions whenever abortForNewBatch is true. So we 
should implement a round-robin partitioner in Flink, and it needs to discover 
the topic's partitions automatically.

> In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it 
> does not take effect
> --------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26033
>                 URL: https://issues.apache.org/jira/browse/FLINK-26033
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.13.3, 1.11.6, 1.12.7, 1.14.3
>            Reporter: shizhengchao
>            Assignee: shizhengchao
>            Priority: Major
>              Labels: pull-request-available
>
> In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it 
> does not take effect. Flink treats 'default' and 'round-robin' as the same 
> strategy.
> {code:java}
> public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
>         ReadableConfig tableOptions, ClassLoader classLoader) {
>     return tableOptions
>             .getOptional(SINK_PARTITIONER)
>             .flatMap(
>                     (String partitioner) -> {
>                         switch (partitioner) {
>                             case SINK_PARTITIONER_VALUE_FIXED:
>                                 return Optional.of(new FlinkFixedPartitioner<>());
>                             // 'default' and 'round-robin' take the same branch.
>                             case SINK_PARTITIONER_VALUE_DEFAULT:
>                             case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
>                                 return Optional.empty();
>                             // Default fallback to full class name of the partitioner.
>                             default:
>                                 return Optional.of(
>                                         initializePartitioner(partitioner, classLoader));
>                         }
>                     });
> } {code}
> Both options fall back to Kafka's default partitioner, but DefaultPartitioner 
> only covers two scenarios:
> 1. Without a key, a partition is picked at random and reused until a new 
> batch starts (sticky partitioning).
> 2. With a key, the partition is the hash of the key modulo the number of 
> partitions.
> {code:java}
> // org.apache.kafka.clients.producer.internals.DefaultPartitioner
> public int partition(String topic, Object key, byte[] keyBytes, Object value,
>         byte[] valueBytes, Cluster cluster) {
>     if (keyBytes == null) {
>         // No key: use the sticky partition (chosen at random, reused until a new batch)
>         return stickyPartitionCache.partition(topic, cluster);
>     }
>     List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
>     int numPartitions = partitions.size();
>     // hash the keyBytes to choose a partition
>     return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
> } {code}
> Therefore, KafkaConnector has no real round-robin strategy. However, we can 
> borrow from Kafka's RoundRobinPartitioner:
> {code:java}
> // org.apache.kafka.clients.producer.RoundRobinPartitioner
> public class RoundRobinPartitioner implements Partitioner {
>     private final ConcurrentMap<String, AtomicInteger> topicCounterMap =
>             new ConcurrentHashMap<>();
>     public void configure(Map<String, ?> configs) {}
>     /**
>      * Compute the partition for the given record.
>      *
>      * @param topic The topic name
>      * @param key The key to partition on (or null if no key)
>      * @param keyBytes serialized key to partition on (or null if no key)
>      * @param value The value to partition on or null
>      * @param valueBytes serialized value to partition on or null
>      * @param cluster The current cluster metadata
>      */
>     @Override
>     public int partition(String topic, Object key, byte[] keyBytes, Object value,
>             byte[] valueBytes, Cluster cluster) {
>         List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
>         int numPartitions = partitions.size();
>         int nextValue = nextValue(topic);
>         List<PartitionInfo> availablePartitions =
>                 cluster.availablePartitionsForTopic(topic);
>         if (!availablePartitions.isEmpty()) {
>             int part = Utils.toPositive(nextValue) % availablePartitions.size();
>             return availablePartitions.get(part).partition();
>         } else {
>             // no partitions are available, give a non-available partition
>             return Utils.toPositive(nextValue) % numPartitions;
>         }
>     }
>     private int nextValue(String topic) {
>         AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
>             return new AtomicInteger(0);
>         });
>         return counter.getAndIncrement();
>     }
>     public void close() {}
> } {code}
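
A Flink-side partitioner along these lines could look roughly like the sketch below. It is self-contained for illustration: the class name `RoundRobinPartitionerSketch` is hypothetical, and in Flink the class would presumably extend `FlinkKafkaPartitioner<T>` and override its `partition(record, key, value, targetTopic, partitions)` method, whose parameter list is mirrored here. Automatic partition discovery is not covered.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch of a round-robin partitioner for the Flink Kafka sink.
 * It does not extend any Flink class so that it compiles on its own.
 */
final class RoundRobinPartitionerSketch<T> {

    // One counter per target topic, so that topics rotate independently.
    private final ConcurrentMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    /**
     * Returns the next partition for {@code targetTopic}, cycling through
     * {@code partitions} in order. The record, key and value are ignored.
     */
    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("Partitions of the target topic must not be empty.");
        }
        int next = counters
                .computeIfAbsent(targetTopic, topic -> new AtomicInteger(0))
                .getAndIncrement();
        // Mask the sign bit so the index stays non-negative after overflow.
        return partitions[(next & 0x7fffffff) % partitions.length];
    }
}
```

Because the partition is chosen once per record on the Flink side, the counter advances exactly once per record in this sketch, independent of how the Kafka producer handles batches.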


