This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit ffa30cf50e5c4b6cb72809660a28eb4d1aec0df9 Author: Sergey Nuyanzin <snuyan...@gmail.com> AuthorDate: Tue Oct 25 15:25:56 2022 +0200 [hotfix][kafka] Replace deprecated API usages --- .../kafka/source/enumerator/subscriber/KafkaSubscriberUtils.java | 2 +- .../connector/kafka/sink/testutils/KafkaSinkExternalContext.java | 4 ++-- .../connector/kafka/testutils/KafkaSourceExternalContext.java | 5 ++++- .../flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java | 7 +++++-- .../flink/streaming/connectors/kafka/table/KafkaTableTestBase.java | 2 +- 5 files changed, 13 insertions(+), 7 deletions(-) diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberUtils.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberUtils.java index 931de8c..404ffae 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberUtils.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberUtils.java @@ -41,7 +41,7 @@ class KafkaSubscriberUtils { static Map<String, TopicDescription> getTopicMetadata( AdminClient adminClient, Set<String> topicNames) { try { - return adminClient.describeTopics(topicNames).all().get(); + return adminClient.describeTopics(topicNames).allTopicNames().get(); } catch (Exception e) { throw new RuntimeException( String.format("Failed to get metadata for topics %s.", topicNames), e); diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java index ee9ac21..ae26048 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java @@ -193,7 +193,7 @@ public class KafkaSinkExternalContext implements DataStreamSinkV2ExternalContext protected Map<String, TopicDescription> getTopicMetadata(List<String> topics) { try { - return kafkaAdminClient.describeTopics(topics).all().get(); + return kafkaAdminClient.describeTopics(topics).allTopicNames().get(); } catch (Exception e) { throw new RuntimeException( String.format("Failed to get metadata for topics %s.", topics), e); @@ -202,7 +202,7 @@ public class KafkaSinkExternalContext implements DataStreamSinkV2ExternalContext private boolean topicExists(String topic) { try { - kafkaAdminClient.describeTopics(Arrays.asList(topic)).all().get(); + kafkaAdminClient.describeTopics(Arrays.asList(topic)).allTopicNames().get(); return true; } catch (Exception e) { return false; diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceExternalContext.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceExternalContext.java index 67b214b..658a1c9 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceExternalContext.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceExternalContext.java @@ -190,7 +190,10 @@ public class KafkaSourceExternalContext implements DataStreamSourceExternalConte final Set<String> topics = adminClient.listTopics().names().get(); if (topics.contains(topicName)) { final Map<String, TopicDescription> topicDescriptions = - adminClient.describeTopics(Collections.singletonList(topicName)).all().get(); + adminClient + .describeTopics(Collections.singletonList(topicName)) + .allTopicNames() + .get(); final int numPartitions = topicDescriptions.get(topicName).partitions().size(); LOG.info("Creating partition {} for topic '{}'", numPartitions + 1, topicName); adminClient diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 0ef16f7..c5bc3b0 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -180,7 +180,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { topicDescriptions = adminClient .describeTopics(Collections.singleton(topic)) - .all() + .allTopicNames() .get(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS); } catch (Exception e) { LOG.warn("Exception caught when describing Kafka topics", e); @@ -331,7 +331,10 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { public int getLeaderToShutDown(String topic) throws Exception { try (final AdminClient client = AdminClient.create(getStandardProperties())) { TopicDescription result = - client.describeTopics(Collections.singleton(topic)).all().get().get(topic); + client.describeTopics(Collections.singleton(topic)) + .allTopicNames() + .get() + .get(topic); return result.partitions().get(0).leader().id(); } } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java index 10b11ab..f80a54f 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java @@ -199,7 +199,7 @@ public abstract class KafkaTableTestBase extends AbstractTestBase { .map(TopicListing::name) .collect(Collectors.toList()); - return adminClient.describeTopics(topics).all().get(); + return adminClient.describeTopics(topics).allTopicNames().get(); } catch (Exception e) { throw new RuntimeException("Failed to list Kafka topics", e); }