[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect

2023-10-16 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17775657#comment-17775657
 ] 

Martijn Visser commented on FLINK-26033:


[~tzulitai] WDYT?

> In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it 
> does not take effect
> --
>
> Key: FLINK-26033
> URL: https://issues.apache.org/jira/browse/FLINK-26033
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.13.3, 1.11.6, 1.12.7, 1.14.3
>Reporter: shizhengchao
>Assignee: shizhengchao
>Priority: Major
>  Labels: pull-request-available, stale-assigned
>
> In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it 
> does not take effect. Flink treats 'default' and 'round-robin' as the same 
> strategy.
> {code:java}
> public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
>         ReadableConfig tableOptions, ClassLoader classLoader) {
>     return tableOptions
>             .getOptional(SINK_PARTITIONER)
>             .flatMap(
>                     (String partitioner) -> {
>                         switch (partitioner) {
>                             case SINK_PARTITIONER_VALUE_FIXED:
>                                 return Optional.of(new FlinkFixedPartitioner<>());
>                             case SINK_PARTITIONER_VALUE_DEFAULT:
>                             case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
>                                 // Both values fall through to Kafka's own partitioner.
>                                 return Optional.empty();
>                             // Default fallback to full class name of the partitioner.
>                             default:
>                                 return Optional.of(
>                                         initializePartitioner(partitioner, classLoader));
>                         }
>                     });
> } {code}
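> Both use Kafka's default partitioner, but DefaultPartitioner chooses a
> partition in one of two ways, neither of which is round-robin:
> 1. When there is no key, it uses sticky partitioning (a randomly chosen
> partition that is reused until the current batch is sent).
> 2. When there is a key, it hashes the key and takes the result modulo the
> number of partitions.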
> {code:java}
> // org.apache.kafka.clients.producer.internals.DefaultPartitioner
> public int partition(String topic, Object key, byte[] keyBytes, Object value,
>         byte[] valueBytes, Cluster cluster) {
>     if (keyBytes == null) {
>         // No key: reuse the topic's sticky partition (KIP-480), not a per-record rotation
>         return stickyPartitionCache.partition(topic, cluster);
>     }
>     List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
>     int numPartitions = partitions.size();
>     // hash the keyBytes to choose a partition
>     return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
> } {code}
> Therefore, the Kafka connector effectively has no round-robin strategy. We
> could borrow from Kafka's RoundRobinPartitioner:
> {code:java}
> import java.util.List;
> import java.util.Map;
> import java.util.concurrent.ConcurrentHashMap;
> import java.util.concurrent.ConcurrentMap;
> import java.util.concurrent.atomic.AtomicInteger;
> import org.apache.kafka.clients.producer.Partitioner;
> import org.apache.kafka.common.Cluster;
> import org.apache.kafka.common.PartitionInfo;
> import org.apache.kafka.common.utils.Utils;
>
> public class RoundRobinPartitioner implements Partitioner {
>     // One counter per topic, so each topic cycles through its partitions independently.
>     private final ConcurrentMap<String, AtomicInteger> topicCounterMap =
>             new ConcurrentHashMap<>();
>
>     public void configure(Map<String, ?> configs) {}
>
>     /**
>      * Compute the partition for the given record.
>      *
>      * @param topic The topic name
>      * @param key The key to partition on (or null if no key)
>      * @param keyBytes serialized key to partition on (or null if no key)
>      * @param value The value to partition on or null
>      * @param valueBytes serialized value to partition on or null
>      * @param cluster The current cluster metadata
>      */
>     @Override
>     public int partition(String topic, Object key, byte[] keyBytes, Object value,
>             byte[] valueBytes, Cluster cluster) {
>         List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
>         int numPartitions = partitions.size();
>         int nextValue = nextValue(topic);
>         List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
>         if (!availablePartitions.isEmpty()) {
>             int part = Utils.toPositive(nextValue) % availablePartitions.size();
>             return availablePartitions.get(part).partition();
>         } else {
>             // no partitions are available, give a non-available partition
>             return Utils.toPositive(nextValue) % numPartitions;
>         }
>     }
>
>     private int nextValue(String topic) {
>         AtomicInteger counter =
>                 topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0));
>         return counter.getAndIncrement();
>     }
>
>     public void close() {}
> } {code}
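A minimal sketch of the direction the description points to, under stated assumptions: give 'round-robin' its own partitioner instead of letting it share the 'default' branch that returns {{Optional.empty()}}. {{SimplePartitioner}} and {{PartitionerOptions.resolve}} are hypothetical stand-ins, not Flink's {{FlinkKafkaPartitioner}} API. Unknown values throw here instead of being loaded as a class name, to keep the example self-contained, and the 'fixed' branch only roughly mirrors the subtask-index-modulo idea behind {{FlinkFixedPartitioner}}.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a sink partitioner: picks one of the given partitions.
interface SimplePartitioner {
    int partition(byte[] key, int[] partitions);
}

final class PartitionerOptions {
    // Option values as used by the 'sink.partitioner' table option.
    static final String DEFAULT = "default";
    static final String FIXED = "fixed";
    static final String ROUND_ROBIN = "round-robin";

    /**
     * Resolves the option to a partitioner. An empty Optional means
     * "let the Kafka producer's own partitioner decide".
     */
    static Optional<SimplePartitioner> resolve(String option, int subtaskIndex) {
        switch (option) {
            case FIXED: {
                // Each parallel subtask always writes to the same partition (simplified).
                return Optional.of((key, partitions) -> partitions[subtaskIndex % partitions.length]);
            }
            case ROUND_ROBIN: {
                // Round-robin gets its own partitioner instead of sharing the 'default' branch.
                AtomicInteger counter = new AtomicInteger();
                return Optional.of((key, partitions) ->
                        partitions[Math.floorMod(counter.getAndIncrement(), partitions.length)]);
            }
            case DEFAULT: {
                return Optional.empty();
            }
            default:
                // Simplification: the real connector would try to load a class by name here.
                throw new IllegalArgumentException("Unsupported partitioner: " + option);
        }
    }
}
```

For example, resolving 'round-robin' and calling it six times over partitions {0, 1, 2} yields 0, 1, 2, 0, 1, 2, while 'default' still returns an empty Optional so the Kafka producer's own partitioner applies.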



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect

2022-07-07 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17563814#comment-17563814
 ] 

Martijn Visser commented on FLINK-26033:


[~renqs] Do we still want to review this?



[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect

2022-03-29 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17514050#comment-17514050
 ] 

shizhengchao commented on FLINK-26033:
--

master: 2049f849a130e738759001c5a6b9d85834da08d0
1.13: 8b18e74cec71c9c480bb747fb2f77a55f675ca2a
1.14: d4f374237b21aab93f5d185cb2d1e15a29c9c537



[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect

2022-03-02 Thread Qingsheng Ren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17500489#comment-17500489
 ] 

Qingsheng Ren commented on FLINK-26033:
---

Thanks [~tinny] for the explanation. I think this is related to KAFKA-9965,
which is still unresolved. Implementing our own partitioner looks good to me;
it would also preserve backward compatibility, since Kafka's built-in
round-robin partitioner is only available from 2.4.0 onward.


[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect

2022-03-02 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17500479#comment-17500479
 ] 

shizhengchao commented on FLINK-26033:
--

[~MartijnVisser] [~renqs] Through testing, I found that the RoundRobinPartitioner
built into Kafka does not work either: it cannot distribute writes evenly across
all partitions because of abortForNewBatch. For example, with 10 partitions,
{{org.apache.kafka.clients.producer.RoundRobinPartitioner}} only sends data to
the even-numbered partitions. When abortForNewBatch is true, the producer calls
partition() a second time for the same record, so the counter advances twice and
every other partition is skipped. So we should implement a round-robin
partitioner in Flink, and its design needs to discover partitions automatically.
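The skew can be reproduced with a simplified model. This is an assumption-laden sketch, not Kafka's producer code: it keeps a single round-robin counter and, following how the producer handles abortForNewBatch, asks the partitioner a second time whenever its first choice has no open batch. {{AbortOnNewBatchSimulation}} is a hypothetical name, and batches are assumed never to close.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Toy model: a round-robin counter consulted twice whenever a new batch would be needed. */
final class AbortOnNewBatchSimulation {

    static Map<Integer, Integer> simulate(int numPartitions, int numRecords) {
        Set<Integer> openBatches = new HashSet<>(); // assumption: batches never close
        Map<Integer, Integer> recordsPerPartition = new TreeMap<>();
        int counter = 0; // plays the role of the partitioner's per-topic AtomicInteger
        for (int r = 0; r < numRecords; r++) {
            int partition = Math.floorMod(counter++, numPartitions);
            if (!openBatches.contains(partition)) {
                // First append is aborted because it would start a new batch,
                // so the partitioner is asked again and the counter moves on.
                partition = Math.floorMod(counter++, numPartitions);
            }
            openBatches.add(partition); // second append creates the batch if needed
            recordsPerPartition.merge(partition, 1, Integer::sum);
        }
        return recordsPerPartition;
    }

    public static void main(String[] args) {
        System.out.println(simulate(10, 100));
    }
}
```

With 10 partitions and 100 records, every record in this model lands on an odd-numbered partition (20 records each on 1, 3, 5, 7 and 9), because each record consumes two counter values. The comment above observed even-numbered partitions; in the model the parity only depends on where the counter starts, but either way half of the partitions receive no data.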


[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect

2022-02-17 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17494438#comment-17494438
 ] 

Martijn Visser commented on FLINK-26033:


[~tinny] I've assigned it to you. I'm sure that [~renqs] can help with a review.



[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect

2022-02-17 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17494327#comment-17494327
 ] 

shizhengchao commented on FLINK-26033:
--

[~renqs] I plan to add a round-robin partitioner class that implements the
FlinkKafkaPartitioner interface.
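One possible shape for such a class, sketched without the Flink dependency so it compiles on its own. It mirrors the {{open(int parallelInstanceId, int parallelInstances)}} and {{partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions)}} methods of {{FlinkKafkaPartitioner}}; the class name and the per-subtask starting offset are assumptions, not the merged fix.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical round-robin partitioner shaped like FlinkKafkaPartitioner, without extending it. */
final class RoundRobinSinkPartitionerSketch<T> {
    // One counter per target topic, as in Kafka's RoundRobinPartitioner.
    private final Map<String, AtomicInteger> counters = new ConcurrentHashMap<>();
    private int parallelInstanceId;

    void open(int parallelInstanceId, int parallelInstances) {
        // Assumption: start each subtask at its own offset so parallel
        // subtasks do not all begin with the same partition.
        this.parallelInstanceId = parallelInstanceId;
    }

    int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("No partitions for topic " + targetTopic);
        }
        AtomicInteger counter =
                counters.computeIfAbsent(targetTopic, t -> new AtomicInteger(parallelInstanceId));
        // floorMod keeps the index non-negative even after the counter overflows.
        return partitions[Math.floorMod(counter.getAndIncrement(), partitions.length)];
    }
}
```

Because it cycles over whatever {{partitions}} array the caller passes, the sketch needs no cluster metadata of its own; whether newly added Kafka partitions are picked up then depends on how the sink refreshes that array.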



[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect

2022-02-17 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17494323#comment-17494323
 ] 

shizhengchao commented on FLINK-26033:
--

[~renqs] I'm interested in fixing this bug; please assign it to me.

> In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it 
> does not take effect
> --
>
> Key: FLINK-26033
> URL: https://issues.apache.org/jira/browse/FLINK-26033
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.13.3, 1.11.6, 1.12.7, 1.14.3
>Reporter: shizhengchao
>Priority: Major
>
> In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it 
> does not take effect. Flink treats 'default' and 'round-robin' as the same 
> strategy.
> {code:java}
> //代码占位符
> public static Optional> 
> getFlinkKafkaPartitioner(
> ReadableConfig tableOptions, ClassLoader classLoader) {
> return tableOptions
> .getOptional(SINK_PARTITIONER)
> .flatMap(
> (String partitioner) -> {
> switch (partitioner) {
> case SINK_PARTITIONER_VALUE_FIXED:
> return Optional.of(new 
> FlinkFixedPartitioner<>());
> case SINK_PARTITIONER_VALUE_DEFAULT:
> case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
> return Optional.empty();
> // Default fallback to full class name of the 
> partitioner.
> default:
> return Optional.of(
> initializePartitioner(partitioner, 
> classLoader));
> }
> });
> } {code}
> They both use kafka's default partitioner, but the actual There are two 
> scenarios for the partition on DefaultPartitioner:
> 1. Random when there is no key
> 2. When there is a key, take the modulo according to the key
> {code:java}
> // org.apache.kafka.clients.producer.internals.DefaultPartitioner
> public int partition(String topic, Object key, byte[] keyBytes, Object value, 
> byte[] valueBytes, Cluster cluster) {
> if (keyBytes == null) {
> // Random when there is no key        
> return stickyPartitionCache.partition(topic, cluster);
> } 
> List partitions = cluster.partitionsForTopic(topic);
> int numPartitions = partitions.size();
> // hash the keyBytes to choose a partition
> return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
> } {code}
> Therefore, KafkaConnector does not have a round-robin strategy.But we can 
> borrow from kafka's RoundRobinPartitioner
> {code:java}
> public class RoundRobinPartitioner implements Partitioner {
>     private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
>
>     public void configure(Map<String, ?> configs) {}
>
>     /**
>      * Compute the partition for the given record.
>      *
>      * @param topic The topic name
>      * @param key The key to partition on (or null if no key)
>      * @param keyBytes serialized key to partition on (or null if no key)
>      * @param value The value to partition on or null
>      * @param valueBytes serialized value to partition on or null
>      * @param cluster The current cluster metadata
>      */
>     @Override
>     public int partition(String topic, Object key, byte[] keyBytes, Object value,
>                          byte[] valueBytes, Cluster cluster) {
>         List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
>         int numPartitions = partitions.size();
>         int nextValue = nextValue(topic);
>         List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
>         if (!availablePartitions.isEmpty()) {
>             int part = Utils.toPositive(nextValue) % availablePartitions.size();
>             return availablePartitions.get(part).partition();
>         } else {
>             // no partitions are available, give a non-available partition
>             return Utils.toPositive(nextValue) % numPartitions;
>         }
>     }
>
>     private int nextValue(String topic) {
>         AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0));
>         return counter.getAndIncrement();
>     }
>
>     public void close() {}
> } {code}
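For illustration, the rotation logic of the RoundRobinPartitioner quoted above can be reduced to a dependency-free sketch. It assumes partitions are plain {{int}} arrays instead of Kafka's {{Cluster}}/{{PartitionInfo}} metadata; {{RoundRobinSketch}} is a hypothetical name, not a Kafka or Flink class.

{code:java}
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, dependency-free sketch of RoundRobinPartitioner's per-topic counter.
// Partition lists are int arrays here, not Kafka's Cluster/PartitionInfo.
class RoundRobinSketch {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    // Same bit trick as Kafka's Utils.toPositive: clear the sign bit so an
    // overflowed counter never produces a negative index.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // One independent counter per topic, created lazily on first use.
    private int nextValue(String topic) {
        return topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0)).getAndIncrement();
    }

    // Picks the next available partition in turn; if none are available,
    // falls back to rotating over all partitions.
    int partition(String topic, int[] allPartitions, int[] availablePartitions) {
        int next = nextValue(topic);
        if (availablePartitions.length > 0) {
            return availablePartitions[toPositive(next) % availablePartitions.length];
        }
        return allPartitions[toPositive(next) % allPartitions.length];
    }

    public static void main(String[] args) {
        RoundRobinSketch partitioner = new RoundRobinSketch();
        int[] partitions = {0, 1, 2};
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < 6; i++) {
            out.append(partitioner.partition("orders", partitions, partitions)).append(' ');
        }
        System.out.println(out.toString().trim()); // prints "0 1 2 0 1 2"
    }
}
{code}

Keeping the counter per topic means records for different topics rotate independently, which matches the {{topicCounterMap}} in the quoted class.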



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect

2022-02-15 Thread Qingsheng Ren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17492980#comment-17492980
 ] 

Qingsheng Ren commented on FLINK-26033:
---

Sorry for my late response! 

I checked the Kafka code and can confirm that the behavior of 
{{DefaultPartitioner}} for records without a key changed from round robin to the 
sticky partitioner in Kafka 2.4.0 (see 
[KIP-480|https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner]).
So this has indeed been a bug since Flink 1.11, when we bumped the Kafka client 
version to 2.4.1.
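A minimal sketch of how a fix could keep the two option values apart, assuming the option resolution is pulled out into its own function. {{PartitionerResolution}}, {{PartitionerChoice}} and {{resolve}} are hypothetical names, not Flink API, and which concrete partitioner {{ROUND_ROBIN}} should map to is deliberately left open.

{code:java}
// Hypothetical sketch: give 'round-robin' its own strategy instead of sharing the
// Optional.empty() branch with 'default'. Names are illustrative, not Flink API.
class PartitionerResolution {
    enum PartitionerChoice { DEFAULT, FIXED, ROUND_ROBIN, CUSTOM }

    static PartitionerChoice resolve(String value) {
        // Assumption of this sketch: an unset option behaves like 'default'.
        if (value == null) {
            return PartitionerChoice.DEFAULT;
        }
        switch (value) {
            case "default":
                // Leave partitioning to the Kafka producer's own partitioner.
                return PartitionerChoice.DEFAULT;
            case "fixed":
                return PartitionerChoice.FIXED;
            case "round-robin":
                // The quoted Flink code returns Optional.empty() here, exactly as for 'default'.
                return PartitionerChoice.ROUND_ROBIN;
            default:
                // Anything else is treated as a fully qualified partitioner class name.
                return PartitionerChoice.CUSTOM;
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve("round-robin")); // prints ROUND_ROBIN
        System.out.println(resolve("default"));     // prints DEFAULT
    }
}
{code}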

Are you interested in fixing this issue? [~tinny] 


[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect

2022-02-09 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17489406#comment-17489406
 ] 

Martijn Visser commented on FLINK-26033:


I'm interested in [~renqs]'s opinion. If this is indeed an issue with newer 
versions of the Kafka client, I would consider this ticket a blocker for 1.15.0, 
since we're upgrading the Kafka client there to 2.8.1 (see 
https://issues.apache.org/jira/browse/FLINK-25504).



[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect

2022-02-09 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17489345#comment-17489345
 ] 

shizhengchao commented on FLINK-26033:
--

[~MartijnVisser] With older Kafka clients this is not a problem, but with newer 
versions of kafka-client, such as 2.4.1, it is.



[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect

2022-02-09 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17489342#comment-17489342
 ] 

shizhengchao commented on FLINK-26033:
--

[~MartijnVisser] I made a mistake. I re-checked: the documentation only applies 
to older Kafka versions, such as kafka-client 2.0.1, while the problem I'm 
describing occurs with kafka-client 2.4.1.
{code:java}
// kafka-2.0.1: DefaultPartitioner.partition
public int partition(String topic, Object key, byte[] keyBytes, Object value,
                     byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

// kafka-2.4.1: DefaultPartitioner.partition
public int partition(String topic, Object key, byte[] keyBytes, Object value,
                     byte[] valueBytes, Cluster cluster) {
    if (keyBytes == null) {
        return stickyPartitionCache.partition(topic, cluster);
    }
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    // hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

// kafka-2.4.1: StickyPartitionCache.nextPartition
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    Integer oldPart = indexCache.get(topic);
    Integer newPart = oldPart;
    // Check that the current sticky partition for the topic is either not set or that the partition that
    // triggered the new batch matches the sticky partition that needs to be changed.
    if (oldPart == null || oldPart == prevPartition) {
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() < 1) {
            Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
            newPart = random % partitions.size();
        } else if (availablePartitions.size() == 1) {
            newPart = availablePartitions.get(0).partition();
        } else {
            while (newPart == null || newPart.equals(oldPart)) {
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = availablePartitions.get(random % availablePartitions.size()).partition();
            }
        }
        // Only change the sticky partition if it is null or prevPartition matches the current sticky partition.
        if (oldPart == null) {
            indexCache.putIfAbsent(topic, newPart);
        } else {
            indexCache.replace(topic, prevPartition, newPart);
        }
        return indexCache.get(topic);
    }
    return indexCache.get(topic);
}
{code}
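To make the difference observable, here is a simplified, dependency-free model of the keyless path in 2.4.1 next to plain round robin. It assumes every partition is available and that a batch completes after a fixed number of records; in a real producer, batch boundaries depend on {{batch.size}}, {{linger.ms}} and send timing. {{StickyVsRoundRobin}} is a hypothetical class, not Kafka code.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

// Hypothetical, simplified model of StickyPartitionCache (all partitions available,
// fixed batch size) compared with plain round robin for records without a key.
class StickyVsRoundRobin {
    private final Map<String, Integer> indexCache = new HashMap<>();
    private final Random random;
    private final int numPartitions;

    StickyVsRoundRobin(int numPartitions, long seed) {
        this.numPartitions = numPartitions;
        this.random = new Random(seed);
    }

    // Like StickyPartitionCache.partition: reuse the cached partition, or pick one on first use.
    int stickyPartition(String topic) {
        Integer part = indexCache.get(topic);
        return part != null ? part : nextPartition(topic, -1);
    }

    // Like StickyPartitionCache.nextPartition: switch only if the completed batch belonged
    // to the current sticky partition, and pick a different partition when more than one exists.
    int nextPartition(String topic, int prevPartition) {
        Integer oldPart = indexCache.get(topic);
        if (oldPart == null || oldPart == prevPartition) {
            int newPart;
            do {
                newPart = random.nextInt(numPartitions);
            } while (numPartitions > 1 && oldPart != null && newPart == oldPart);
            indexCache.put(topic, newPart);
        }
        return indexCache.get(topic);
    }

    public static void main(String[] args) {
        int partitions = 3;
        int recordsPerBatch = 4;
        StickyVsRoundRobin sticky = new StickyVsRoundRobin(partitions, 42L);
        StringBuilder stickyOut = new StringBuilder();
        StringBuilder roundRobinOut = new StringBuilder();
        for (int i = 0; i < 12; i++) {
            int p = sticky.stickyPartition("orders");
            stickyOut.append(p).append(' ');
            if ((i + 1) % recordsPerBatch == 0) {
                // Batch for p is complete; the real producer reports this via Partitioner.onNewBatch.
                sticky.nextPartition("orders", p);
            }
            // Round robin advances on every record.
            roundRobinOut.append(i % partitions).append(' ');
        }
        System.out.println("sticky:      " + stickyOut.toString().trim());
        System.out.println("round-robin: " + roundRobinOut.toString().trim());
    }
}
{code}

In this model each run of {{recordsPerBatch}} records lands on a single partition, while round robin moves to the next partition on every record, which is the behavior users expect from 'round-robin'.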


[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect

2022-02-09 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17489329#comment-17489329
 ] 

Martijn Visser commented on FLINK-26033:


[~tinny] The documentation at 
[https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/#sink-partitioning|https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#sink-partitioning]
says that round robin only works when record keys are not specified. So I'm 
wondering whether round robin is in fact supported, and it just doesn't work in 
your case because you're specifying record keys?
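For keyed records, both the 2.0.1 and the 2.4.1 code paths compute the partition purely from the key bytes, so neither sticky nor round-robin distribution applies to them. A minimal sketch, where {{Arrays.hashCode}} stands in for Kafka's {{Utils.murmur2}} (so the concrete partition numbers differ from what Kafka would pick); {{KeyedPartitioning}} is a hypothetical name.

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch of the keyed path: hash the key, clear the sign bit, take the modulo.
// Arrays.hashCode is a stand-in for Utils.murmur2; only the structure matches Kafka.
class KeyedPartitioning {
    // Same bit trick as Kafka's Utils.toPositive.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    static int partitionForKey(byte[] keyBytes, int numPartitions) {
        return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        for (int i = 0; i < 3; i++) {
            // Same partition every time: the key alone decides, whatever the keyless strategy is.
            System.out.println(partitionForKey(key, 6));
        }
    }
}
{code}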


[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect

2022-02-08 Thread Martijn Visser (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17489327#comment-17489327
 ] 

Martijn Visser commented on FLINK-26033:


[~renqs] What are your thoughts on this?



[jira] [Commented] (FLINK-26033) In KafkaConnector, when 'sink.partitioner' is configured as 'round-robin', it does not take effect

2022-02-08 Thread shizhengchao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-26033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17489298#comment-17489298
 ] 

shizhengchao commented on FLINK-26033:
--

[~jark] [~libenchao] Could you take a look at this issue?
