[ 
https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17514050#comment-17514050
 ] 

shizhengchao commented on FLINK-26033:
--------------------------------------

master: 2049f849a130e738759001c5a6b9d85834da08d0
1.13: 8b18e74cec71c9c480bb747fb2f77a55f675ca2a
1.14: d4f374237b21aab93f5d185cb2d1e15a29c9c537

> In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it 
> does not take effect
> --------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26033
>                 URL: https://issues.apache.org/jira/browse/FLINK-26033
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.13.3, 1.11.6, 1.12.7, 1.14.3
>            Reporter: shizhengchao
>            Assignee: shizhengchao
>            Priority: Major
>              Labels: pull-request-available
>
> In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it 
> does not take effect. Flink treats 'default' and 'round-robin' as the same 
> strategy.
> {code:java}
> public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
>         ReadableConfig tableOptions, ClassLoader classLoader) {
>     return tableOptions
>             .getOptional(SINK_PARTITIONER)
>             .flatMap(
>                     (String partitioner) -> {
>                         switch (partitioner) {
>                             case SINK_PARTITIONER_VALUE_FIXED:
>                                 return Optional.of(new FlinkFixedPartitioner<>());
>                             // 'default' and 'round-robin' share one branch: no Flink partitioner is set.
>                             case SINK_PARTITIONER_VALUE_DEFAULT:
>                             case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
>                                 return Optional.empty();
>                                 // Default fallback to full class name of the partitioner.
>                             default:
>                                 return Optional.of(
>                                         initializePartitioner(partitioner, classLoader));
>                         }
>                     });
> }
> {code}
> Both options fall back to Kafka's DefaultPartitioner, which chooses the 
> partition in one of two ways:
> 1. When the record has no key, it picks a partition at random (through the 
> sticky partition cache).
> 2. When the record has a key, it hashes the key and takes the result modulo 
> the number of partitions.
> {code:java}
> // org.apache.kafka.clients.producer.internals.DefaultPartitioner
> public int partition(String topic, Object key, byte[] keyBytes,
>                      Object value, byte[] valueBytes, Cluster cluster) {
>     if (keyBytes == null) {
>         // Random when there is no key
>         return stickyPartitionCache.partition(topic, cluster);
>     }
>     List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
>     int numPartitions = partitions.size();
>     // hash the keyBytes to choose a partition
>     return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
> }
> {code}
> Therefore, KafkaConnector has no real round-robin strategy. We could borrow 
> the approach from Kafka's RoundRobinPartitioner:
> {code:java}
> public class RoundRobinPartitioner implements Partitioner {
>     private final ConcurrentMap<String, AtomicInteger> topicCounterMap =
>             new ConcurrentHashMap<>();
>
>     public void configure(Map<String, ?> configs) {}
>
>     /**
>      * Compute the partition for the given record.
>      *
>      * @param topic The topic name
>      * @param key The key to partition on (or null if no key)
>      * @param keyBytes serialized key to partition on (or null if no key)
>      * @param value The value to partition on or null
>      * @param valueBytes serialized value to partition on or null
>      * @param cluster The current cluster metadata
>      */
>     @Override
>     public int partition(String topic, Object key, byte[] keyBytes,
>                          Object value, byte[] valueBytes, Cluster cluster) {
>         List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
>         int numPartitions = partitions.size();
>         int nextValue = nextValue(topic);
>         List<PartitionInfo> availablePartitions =
>                 cluster.availablePartitionsForTopic(topic);
>         if (!availablePartitions.isEmpty()) {
>             int part = Utils.toPositive(nextValue) % availablePartitions.size();
>             return availablePartitions.get(part).partition();
>         } else {
>             // no partitions are available, give a non-available partition
>             return Utils.toPositive(nextValue) % numPartitions;
>         }
>     }
>
>     private int nextValue(String topic) {
>         AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> {
>             return new AtomicInteger(0);
>         });
>         return counter.getAndIncrement();
>     }
>
>     public void close() {}
> }
> {code}
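
The per-topic counter idea in the RoundRobinPartitioner quoted above can be tried without any Kafka or Flink dependency. The following is a minimal sketch under stated assumptions: `RoundRobinSketch` and its `partition(targetTopic, partitions)` method are hypothetical names chosen for illustration, the partition ids are passed in as a plain `int[]`, and it is not the change that was merged for this issue.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical stand-in (not a Flink or Kafka class): cycles through the
 * partition ids handed to it, keeping one counter per topic.
 */
final class RoundRobinSketch {
    private final ConcurrentMap<String, AtomicInteger> topicCounters =
            new ConcurrentHashMap<>();

    /** Returns the next partition id for the topic; key and value play no role. */
    int partition(String targetTopic, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("Partitions of the target topic are empty.");
        }
        AtomicInteger counter =
                topicCounters.computeIfAbsent(targetTopic, t -> new AtomicInteger(0));
        // Clear the sign bit so the index stays non-negative after the counter overflows.
        int next = counter.getAndIncrement() & 0x7fffffff;
        return partitions[next % partitions.length];
    }

    public static void main(String[] args) {
        RoundRobinSketch sketch = new RoundRobinSketch();
        int[] partitions = {0, 1, 2};
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < 6; i++) {
            out.append(sketch.partition("orders", partitions));
        }
        System.out.println(out); // prints "012012"
    }
}
```

Unlike the keyed path of DefaultPartitioner, consecutive records for one topic land on consecutive partitions regardless of their key, and each topic advances its own counter.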



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
