This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push: new 06789ec [FLINK-31599] Update kafka version to 3.4.0 06789ec is described below commit 06789ec7b86a3b2edae1dc7f9f29d672ebd3610b Author: Alex Sorokoumov <aleksandr.sorokou...@gmail.com> AuthorDate: Thu Mar 23 11:13:45 2023 -0700 [FLINK-31599] Update kafka version to 3.4.0 This closes #11. --- .../flink/connector/kafka/sink/FlinkKafkaInternalProducer.java | 5 ++--- .../connector/kafka/source/reader/KafkaPartitionSplitReader.java | 5 +++++ .../connectors/kafka/internals/FlinkKafkaInternalProducer.java | 5 ++--- flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE | 2 +- pom.xml | 2 +- 5 files changed, 11 insertions(+), 8 deletions(-) diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java index a023cdd..7a3ed56 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java @@ -288,11 +288,10 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> { Object transactionManager = getTransactionManager(); synchronized (transactionManager) { - Object topicPartitionBookkeeper = - getField(transactionManager, "topicPartitionBookkeeper"); + Object txnPartitionMap = getField(transactionManager, "txnPartitionMap"); transitionTransactionManagerStateTo(transactionManager, "INITIALIZING"); - invoke(topicPartitionBookkeeper, "reset"); + invoke(txnPartitionMap, "reset"); setField( transactionManager, diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java index c440fc2..f52940c 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java @@ -373,7 +373,12 @@ public class KafkaPartitionSplitReader SplitsChange<KafkaPartitionSplit> splitsChange) { if (LOG.isDebugEnabled()) { StringJoiner splitsInfo = new StringJoiner(","); + Set<TopicPartition> assginment = consumer.assignment(); for (KafkaPartitionSplit split : splitsChange.splits()) { + if (!assginment.contains(split.getTopicPartition())) { + continue; + } + long startingOffset = retryOnWakeup( () -> consumer.position(split.getTopicPartition()), diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java index a424a81..4b270b0 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java @@ -211,15 +211,14 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> { Object transactionManager = getField(kafkaProducer, "transactionManager"); synchronized (transactionManager) { - Object topicPartitionBookkeeper = - getField(transactionManager, "topicPartitionBookkeeper"); + Object txnPartitionMap = getField(transactionManager, "txnPartitionMap"); invoke( transactionManager, "transitionTo", getEnum( "org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING")); - invoke(topicPartitionBookkeeper, "reset"); + invoke(txnPartitionMap, "reset"); setField( transactionManager, diff --git a/flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE b/flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE index 533611f..641d404 100644 --- a/flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE +++ b/flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE @@ -6,4 +6,4 @@ The Apache Software Foundation (http://www.apache.org/). This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt) -- org.apache.kafka:kafka-clients:3.2.3 +- org.apache.kafka:kafka-clients:3.4.0 diff --git a/pom.xml b/pom.xml index c7f5fd9..8bb3394 100644 --- a/pom.xml +++ b/pom.xml @@ -51,7 +51,7 @@ under the License. <properties> <flink.version>1.17-SNAPSHOT</flink.version> <flink.shaded.version>16.1</flink.shaded.version> - <kafka.version>3.2.3</kafka.version> + <kafka.version>3.4.0</kafka.version> <zookeeper.version>3.5.9</zookeeper.version> <jackson-bom.version>2.13.4.20221013</jackson-bom.version>