This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit ffa30cf50e5c4b6cb72809660a28eb4d1aec0df9
Author: Sergey Nuyanzin <snuyan...@gmail.com>
AuthorDate: Tue Oct 25 15:25:56 2022 +0200

    [hotfix][kafka] Replace deprecated API usages
---
 .../kafka/source/enumerator/subscriber/KafkaSubscriberUtils.java   | 2 +-
 .../connector/kafka/sink/testutils/KafkaSinkExternalContext.java   | 4 ++--
 .../connector/kafka/testutils/KafkaSourceExternalContext.java      | 5 ++++-
 .../flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java | 7 +++++--
 .../flink/streaming/connectors/kafka/table/KafkaTableTestBase.java | 2 +-
 5 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberUtils.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberUtils.java
index 931de8c..404ffae 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberUtils.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberUtils.java
@@ -41,7 +41,7 @@ class KafkaSubscriberUtils {
     static Map<String, TopicDescription> getTopicMetadata(
             AdminClient adminClient, Set<String> topicNames) {
         try {
-            return adminClient.describeTopics(topicNames).all().get();
+            return adminClient.describeTopics(topicNames).allTopicNames().get();
         } catch (Exception e) {
             throw new RuntimeException(
                     String.format("Failed to get metadata for topics %s.", topicNames), e);
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java
index ee9ac21..ae26048 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java
@@ -193,7 +193,7 @@ public class KafkaSinkExternalContext implements DataStreamSinkV2ExternalContext
 
     protected Map<String, TopicDescription> getTopicMetadata(List<String> topics) {
         try {
-            return kafkaAdminClient.describeTopics(topics).all().get();
+            return kafkaAdminClient.describeTopics(topics).allTopicNames().get();
         } catch (Exception e) {
             throw new RuntimeException(
                     String.format("Failed to get metadata for topics %s.", topics), e);
@@ -202,7 +202,7 @@ public class KafkaSinkExternalContext implements DataStreamSinkV2ExternalContext
 
     private boolean topicExists(String topic) {
         try {
-            kafkaAdminClient.describeTopics(Arrays.asList(topic)).all().get();
+            kafkaAdminClient.describeTopics(Arrays.asList(topic)).allTopicNames().get();
             return true;
         } catch (Exception e) {
             return false;
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceExternalContext.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceExternalContext.java
index 67b214b..658a1c9 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceExternalContext.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceExternalContext.java
@@ -190,7 +190,10 @@ public class KafkaSourceExternalContext implements DataStreamSourceExternalConte
         final Set<String> topics = adminClient.listTopics().names().get();
         if (topics.contains(topicName)) {
             final Map<String, TopicDescription> topicDescriptions =
-                    adminClient.describeTopics(Collections.singletonList(topicName)).all().get();
+                    adminClient
+                            .describeTopics(Collections.singletonList(topicName))
+                            .allTopicNames()
+                            .get();
             final int numPartitions = topicDescriptions.get(topicName).partitions().size();
             LOG.info("Creating partition {} for topic '{}'", numPartitions + 1, topicName);
             adminClient
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 0ef16f7..c5bc3b0 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -180,7 +180,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
                             topicDescriptions =
                                     adminClient
                                             .describeTopics(Collections.singleton(topic))
-                                            .all()
+                                            .allTopicNames()
                                             .get(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                         } catch (Exception e) {
                             LOG.warn("Exception caught when describing Kafka topics", e);
@@ -331,7 +331,10 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
     public int getLeaderToShutDown(String topic) throws Exception {
         try (final AdminClient client = AdminClient.create(getStandardProperties())) {
             TopicDescription result =
-                    client.describeTopics(Collections.singleton(topic)).all().get().get(topic);
+                    client.describeTopics(Collections.singleton(topic))
+                            .allTopicNames()
+                            .get()
+                            .get(topic);
             return result.partitions().get(0).leader().id();
         }
     }
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
index 10b11ab..f80a54f 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
@@ -199,7 +199,7 @@ public abstract class KafkaTableTestBase extends AbstractTestBase {
                             .map(TopicListing::name)
                             .collect(Collectors.toList());
 
-            return adminClient.describeTopics(topics).all().get();
+            return adminClient.describeTopics(topics).allTopicNames().get();
         } catch (Exception e) {
             throw new RuntimeException("Failed to list Kafka topics", e);
         }

Reply via email to