Repository: spark Updated Branches: refs/heads/master 8c14276c3 -> 4c27663cb
[SPARK-18057][FOLLOW-UP][SS] Update Kafka client version from 0.10.0.1 to 2.0.0 ## What changes were proposed in this pull request? Increase ZK timeout and harmonize configs across Kafka tests to resolve potentially flaky test failure ## How was this patch tested? Existing tests Author: Sean Owen <sro...@gmail.com> Closes #21995 from srowen/SPARK-18057.3. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c27663c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c27663c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c27663c Branch: refs/heads/master Commit: 4c27663cb20f3cde7317ffcb2c9d42257a40057f Parents: 8c14276 Author: Sean Owen <sro...@gmail.com> Authored: Fri Aug 3 16:22:54 2018 -0700 Committer: Shixiong Zhu <zsxw...@gmail.com> Committed: Fri Aug 3 16:22:54 2018 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala | 1 + .../org/apache/spark/streaming/kafka010/KafkaTestUtils.scala | 6 +++++- 2 files changed, 6 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4c27663c/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index 8229490..d89cccd 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -304,6 +304,7 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L props.put("port", brokerPort.toString) props.put("log.dir", 
Utils.createTempDir().getAbsolutePath) props.put("zookeeper.connect", zkAddress) + props.put("zookeeper.connection.timeout.ms", "60000") props.put("log.flush.interval.messages", "1") props.put("replica.socket.timeout.ms", "1500") props.put("delete.topic.enable", "true") http://git-wip-us.apache.org/repos/asf/spark/blob/4c27663c/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala index 2315baf..eef4c55 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala @@ -32,6 +32,7 @@ import kafka.api.Request import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.ZkUtils import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.serialization.StringSerializer import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer} @@ -109,7 +110,7 @@ private[kafka010] class KafkaTestUtils extends Logging { brokerConf = new KafkaConfig(brokerConfiguration, doLog = false) server = new KafkaServer(brokerConf) server.startup() - brokerPort = server.boundPort(brokerConf.interBrokerListenerName) + brokerPort = server.boundPort(new ListenerName("PLAINTEXT")) (server, brokerPort) }, new SparkConf(), "KafkaBroker") @@ -220,8 +221,11 @@ private[kafka010] class KafkaTestUtils extends Logging { props.put("port", brokerPort.toString) props.put("log.dir", brokerLogDir) props.put("zookeeper.connect", zkAddress) + props.put("zookeeper.connection.timeout.ms", "60000") props.put("log.flush.interval.messages", "1") 
props.put("replica.socket.timeout.ms", "1500") + props.put("delete.topic.enable", "true") + props.put("offsets.topic.num.partitions", "1") props.put("offsets.topic.replication.factor", "1") props.put("group.initial.rebalance.delay.ms", "10") props --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org