This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 2fecc4a [SPARK-27138][TESTS][KAFKA] Remove AdminUtils calls (fixes deprecation) 2fecc4a is described below commit 2fecc4a3fe910163b51b6017406b3f1f05658787 Author: DylanGuedes <djmggue...@gmail.com> AuthorDate: Thu Mar 14 09:20:30 2019 -0500 [SPARK-27138][TESTS][KAFKA] Remove AdminUtils calls (fixes deprecation) ## What changes were proposed in this pull request? This change replaces the calls to the deprecated AdminUtils, currently used to create and delete topics in Kafka tests. With this change, the tests rely on adminClient, the recommended way from now on. ## How was this patch tested? I ran all unit tests and they pass. Since this code is already well tested, I thought that changes in the API wouldn't require new tests, as long as the current tests are working fine. Closes #24071 from DylanGuedes/spark-27138. Authored-by: DylanGuedes <djmggue...@gmail.com> Signed-off-by: Sean Owen <sean.o...@databricks.com> --- .../scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index dacfffa..9fa88b4 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -20,20 +20,19 @@ package org.apache.spark.sql.kafka010 import java.io.{File, IOException} import java.lang.{Integer => JInt} import java.net.InetSocketAddress -import java.util.{Map => JMap, Properties, UUID} +import java.util.{Collections, Map => JMap, Properties, UUID} import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ import scala.language.postfixOps import scala.util.Random -import kafka.admin.AdminUtils import kafka.api.Request import 
kafka.server.{KafkaConfig, KafkaServer} import kafka.server.checkpoints.OffsetCheckpointFile import kafka.utils.ZkUtils import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, ListConsumerGroupsResult, NewPartitions} +import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, ListConsumerGroupsResult, NewPartitions, NewTopic} import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.clients.producer._ import org.apache.kafka.common.TopicPartition @@ -195,7 +194,8 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L var created = false while (!created) { try { - AdminUtils.createTopic(zkUtils, topic, partitions, 1) + val newTopic = new NewTopic(topic, partitions, 1) + adminClient.createTopics(Collections.singleton(newTopic)) created = true } catch { // Workaround fact that TopicExistsException is in kafka.common in 0.10.0 and @@ -222,7 +222,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L /** Delete a Kafka topic and wait until it is propagated to the whole cluster */ def deleteTopic(topic: String): Unit = { val partitions = zkUtils.getPartitionsForTopics(Seq(topic))(topic).size - AdminUtils.deleteTopic(zkUtils, topic) + adminClient.deleteTopics(Collections.singleton(topic)) verifyTopicDeletionWithRetries(zkUtils, topic, partitions, List(this.server)) } @@ -422,7 +422,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L // As pushing messages into Kafka updates Zookeeper asynchronously, there is a small // chance that a topic will be recreated after deletion due to the asynchronous update. // Hence, delete the topic and retry. 
- AdminUtils.deleteTopic(zkUtils, topic) + adminClient.deleteTopics(Collections.singleton(topic)) throw e } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org