This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git


The following commit(s) were added to refs/heads/main by this push:
     new 06789ec  [FLINK-31599] Update kafka version to 3.4.0
06789ec is described below

commit 06789ec7b86a3b2edae1dc7f9f29d672ebd3610b
Author: Alex Sorokoumov <aleksandr.sorokou...@gmail.com>
AuthorDate: Thu Mar 23 11:13:45 2023 -0700

    [FLINK-31599] Update kafka version to 3.4.0
    
    This closes #11.
---
 .../flink/connector/kafka/sink/FlinkKafkaInternalProducer.java       | 5 ++---
 .../connector/kafka/source/reader/KafkaPartitionSplitReader.java     | 5 +++++
 .../connectors/kafka/internals/FlinkKafkaInternalProducer.java       | 5 ++---
 flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE         | 2 +-
 pom.xml                                                              | 2 +-
 5 files changed, 11 insertions(+), 8 deletions(-)

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
index a023cdd..7a3ed56 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducer.java
@@ -288,11 +288,10 @@ class FlinkKafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
 
         Object transactionManager = getTransactionManager();
         synchronized (transactionManager) {
-            Object topicPartitionBookkeeper =
-                    getField(transactionManager, "topicPartitionBookkeeper");
+            Object txnPartitionMap = getField(transactionManager, 
"txnPartitionMap");
 
             transitionTransactionManagerStateTo(transactionManager, 
"INITIALIZING");
-            invoke(topicPartitionBookkeeper, "reset");
+            invoke(txnPartitionMap, "reset");
 
             setField(
                     transactionManager,
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
index c440fc2..f52940c 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
@@ -373,7 +373,12 @@ public class KafkaPartitionSplitReader
             SplitsChange<KafkaPartitionSplit> splitsChange) {
         if (LOG.isDebugEnabled()) {
             StringJoiner splitsInfo = new StringJoiner(",");
+            Set<TopicPartition> assginment = consumer.assignment();
             for (KafkaPartitionSplit split : splitsChange.splits()) {
+                if (!assginment.contains(split.getTopicPartition())) {
+                    continue;
+                }
+
                 long startingOffset =
                         retryOnWakeup(
                                 () -> 
consumer.position(split.getTopicPartition()),
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java
index a424a81..4b270b0 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/FlinkKafkaInternalProducer.java
@@ -211,15 +211,14 @@ public class FlinkKafkaInternalProducer<K, V> implements Producer<K, V> {
 
             Object transactionManager = getField(kafkaProducer, 
"transactionManager");
             synchronized (transactionManager) {
-                Object topicPartitionBookkeeper =
-                        getField(transactionManager, 
"topicPartitionBookkeeper");
+                Object txnPartitionMap = getField(transactionManager, 
"txnPartitionMap");
 
                 invoke(
                         transactionManager,
                         "transitionTo",
                         getEnum(
                                 
"org.apache.kafka.clients.producer.internals.TransactionManager$State.INITIALIZING"));
-                invoke(topicPartitionBookkeeper, "reset");
+                invoke(txnPartitionMap, "reset");
 
                 setField(
                         transactionManager,
diff --git a/flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE b/flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE
index 533611f..641d404 100644
--- a/flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE
+++ b/flink-sql-connector-kafka/src/main/resources/META-INF/NOTICE
@@ -6,4 +6,4 @@ The Apache Software Foundation (http://www.apache.org/).
 
 This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
 
-- org.apache.kafka:kafka-clients:3.2.3
+- org.apache.kafka:kafka-clients:3.4.0
diff --git a/pom.xml b/pom.xml
index c7f5fd9..8bb3394 100644
--- a/pom.xml
+++ b/pom.xml
@@ -51,7 +51,7 @@ under the License.
     <properties>
         <flink.version>1.17-SNAPSHOT</flink.version>
         <flink.shaded.version>16.1</flink.shaded.version>
-        <kafka.version>3.2.3</kafka.version>
+        <kafka.version>3.4.0</kafka.version>
         <zookeeper.version>3.5.9</zookeeper.version>
 
         <jackson-bom.version>2.13.4.20221013</jackson-bom.version>

Reply via email to