[GitHub] spark pull request #21488: SPARK-18057 Update structured streaming kafka fro...
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21488#discussion_r203561847

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala ---
@@ -115,7 +116,7 @@ private[kafka010] class KafkaOffsetReader(
   def fetchTopicPartitions(): Set[TopicPartition] = runUninterruptibly {
     assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
     // Poll to get the latest assigned partitions
-    consumer.poll(0)
+    consumer.poll(JDuration.ofMillis(0))
--- End diff --

That's a good point. However, supporting all these versions is pretty cheap for Spark right now: Spark uses only APIs that exist in 0.10. In addition, if the Kafka client version we pick up here has some critical issue, the user can just switch to an older version.

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user ijuma commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21488#discussion_r203256766

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala ---
@@ -115,7 +116,7 @@ private[kafka010] class KafkaOffsetReader(
   def fetchTopicPartitions(): Set[TopicPartition] = runUninterruptibly {
     assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
     // Poll to get the latest assigned partitions
-    consumer.poll(0)
+    consumer.poll(JDuration.ofMillis(0))
--- End diff --

@zsxwing Why do you want to support Kafka client jars from 0.10 to 2.0? Since newer client jars support older brokers, we recommend that people use the latest Kafka client jar whenever possible.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21488#discussion_r203109788

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala ---
@@ -115,7 +116,7 @@ private[kafka010] class KafkaOffsetReader(
   def fetchTopicPartitions(): Set[TopicPartition] = runUninterruptibly {
     assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
     // Poll to get the latest assigned partitions
-    consumer.poll(0)
+    consumer.poll(JDuration.ofMillis(0))
--- End diff --

@tedyu just realized this is `ofMillis` rather than `toMillis`. We definitely cannot use it, as this `poll` overload doesn't exist in previous versions and we want to support Kafka versions from 0.10 to 2.0.
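For reference, `java.time.Duration.ofMillis` and `Duration#toMillis` go in opposite directions: `ofMillis` is a static factory that builds a `Duration` from a millisecond count (the argument type of the new `poll(Duration)` overload from KIP-266), while `toMillis` converts an existing `Duration` back to a `long` (the argument type of the old `poll(long)` overload). A JDK-only sketch, with `JDuration` aliased as in the patch; the consumer calls themselves are omitted since which overload exists depends on the kafka-clients version on the classpath:

```scala
import java.time.{Duration => JDuration}

object PollTimeoutSketch {
  def main(args: Array[String]): Unit = {
    // ofMillis: Long => Duration. This is what kafka-clients 2.0's
    // KafkaConsumer.poll(Duration) overload expects.
    val timeout: JDuration = JDuration.ofMillis(0)

    // toMillis: Duration => Long. This matches the older (and, as of 2.0,
    // deprecated) KafkaConsumer.poll(long) signature, which is the only
    // overload available in 0.10.x client jars.
    val legacyTimeoutMs: Long = timeout.toMillis

    println(s"isZero=${timeout.isZero} legacyTimeoutMs=$legacyTimeoutMs")
  }
}
```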
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21488#discussion_r203106522

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala ---
@@ -115,7 +116,7 @@ private[kafka010] class KafkaOffsetReader(
   def fetchTopicPartitions(): Set[TopicPartition] = runUninterruptibly {
     assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
     // Poll to get the latest assigned partitions
-    consumer.poll(0)
+    consumer.poll(JDuration.ofMillis(0))
--- End diff --

Depending on the Kafka release we agree upon, I can revert. Duration is the recommended API for the 2.0.0 release.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21488#discussion_r203104176

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala ---
@@ -115,7 +116,7 @@ private[kafka010] class KafkaOffsetReader(
   def fetchTopicPartitions(): Set[TopicPartition] = runUninterruptibly {
     assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
     // Poll to get the latest assigned partitions
-    consumer.poll(0)
+    consumer.poll(JDuration.ofMillis(0))
--- End diff --

Could you revert these changes? We don't use java.time.Duration in Spark.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21488#discussion_r203103522

--- Diff: external/kafka-0-10-sql/pom.xml ---
@@ -74,6 +74,11 @@
       ${kafka.version}
       test
+
+      org.eclipse.jetty
--- End diff --

Where does this come from? Or can it be just a test dependency?
Github user guozhangwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21488#discussion_r193574097

--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala ---
@@ -203,7 +215,13 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
   /** Add new partitions to a Kafka topic */
   def addPartitions(topic: String, partitions: Int): Unit = {
-    AdminUtils.addPartitions(zkUtils, topic, partitions)
+    val existingAssignment = zkClient.getReplicaAssignmentForTopics(
+      collection.immutable.Set(topic)).map {
+      case (topicPartition, replicas) => topicPartition.partition -> replicas
+    }
--- End diff --

+1
Github user eric-maynard commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21488#discussion_r193549547

--- Diff: external/kafka-0-10-sql/pom.xml ---
@@ -29,7 +29,7 @@
   spark-sql-kafka-0-10_2.11
   sql-kafka-0-10
-  0.10.0.1
+  2.0.0-SNAPSHOT
   jar
   Kafka 0.10 Source for Structured Streaming
--- End diff --

We should change this line ("Kafka 0.10 Source for Structured Streaming") to reflect the version change too.
Github user ijuma commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21488#discussion_r192961847

--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala ---
@@ -203,7 +215,13 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
   /** Add new partitions to a Kafka topic */
   def addPartitions(topic: String, partitions: Int): Unit = {
-    AdminUtils.addPartitions(zkUtils, topic, partitions)
+    val existingAssignment = zkClient.getReplicaAssignmentForTopics(
+      collection.immutable.Set(topic)).map {
+      case (topicPartition, replicas) => topicPartition.partition -> replicas
+    }
--- End diff --

We can get replica assignment information via AdminClient too. I think we should try to avoid the internal `ZkUtils` and `KafkaZkClient` as much as we can.
Github user ijuma commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21488#discussion_r192602632

--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala ---
@@ -96,10 +101,13 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
   // Set up the Embedded Zookeeper server and get the proper Zookeeper port
   private def setupEmbeddedZookeeper(): Unit = {
     // Zookeeper server startup
-    zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
+    val zkSvr = s"$zkHost:$zkPort"
+    zookeeper = new EmbeddedZookeeper(zkSvr)
     // Get the actual zookeeper binding port
     zkPort = zookeeper.actualPort
-    zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, false)
+    zkUtils = ZkUtils(zkSvr, zkSessionTimeout, zkConnectionTimeout, false)
+    zkClient = KafkaZkClient(zkSvr, false, 6000, 1, Int.MaxValue, Time.SYSTEM)
+    adminZkClient = new AdminZkClient(zkClient)
--- End diff --

AdminClient.create gives you a concrete instance. createPartitions is the method you're looking for.
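Following that pointer, here is a hypothetical sketch of what the test helper's partition bump could look like using the public Java `AdminClient` instead of the internal `ZkUtils`/`AdminZkClient`. It assumes kafka-clients 1.0+ (where `createPartitions` was added by KIP-195) on the classpath and a reachable broker; `bootstrapServers`, `topic`, and `partitions` are placeholder parameters, not names from the PR:

```scala
import java.util.Properties
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewPartitions}
import scala.collection.JavaConverters._

// Hypothetical replacement for the ZK-based addPartitions test helper.
def addPartitions(bootstrapServers: String, topic: String, partitions: Int): Unit = {
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
  // AdminClient is abstract, but the create factory returns a concrete instance.
  val admin = AdminClient.create(props)
  try {
    // increaseTo takes the *total* partition count, matching the semantics of
    // the old AdminUtils.addPartitions(zkUtils, topic, partitions) call.
    admin.createPartitions(Map(topic -> NewPartitions.increaseTo(partitions)).asJava)
      .all().get() // block until the controller has applied the change
  } finally {
    admin.close()
  }
}
```

Usage would be along the lines of `addPartitions(s"$brokerHost:$brokerPort", "topic-1", 4)`; this goes through the broker rather than ZooKeeper, so it stays on public API.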
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21488#discussion_r192601997

--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala ---
@@ -96,10 +101,13 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
   // Set up the Embedded Zookeeper server and get the proper Zookeeper port
   private def setupEmbeddedZookeeper(): Unit = {
     // Zookeeper server startup
-    zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
+    val zkSvr = s"$zkHost:$zkPort"
+    zookeeper = new EmbeddedZookeeper(zkSvr)
     // Get the actual zookeeper binding port
     zkPort = zookeeper.actualPort
-    zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, false)
+    zkUtils = ZkUtils(zkSvr, zkSessionTimeout, zkConnectionTimeout, false)
+    zkClient = KafkaZkClient(zkSvr, false, 6000, 1, Int.MaxValue, Time.SYSTEM)
+    adminZkClient = new AdminZkClient(zkClient)
--- End diff --

AdminClient is abstract, and KafkaAdminClient doesn't provide addPartitions. Mind giving a pointer?
Github user ijuma commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21488#discussion_r192601219

--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala ---
@@ -96,10 +101,13 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L
   // Set up the Embedded Zookeeper server and get the proper Zookeeper port
   private def setupEmbeddedZookeeper(): Unit = {
     // Zookeeper server startup
-    zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
+    val zkSvr = s"$zkHost:$zkPort"
+    zookeeper = new EmbeddedZookeeper(zkSvr)
     // Get the actual zookeeper binding port
     zkPort = zookeeper.actualPort
-    zkUtils = ZkUtils(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout, false)
+    zkUtils = ZkUtils(zkSvr, zkSessionTimeout, zkConnectionTimeout, false)
+    zkClient = KafkaZkClient(zkSvr, false, 6000, 1, Int.MaxValue, Time.SYSTEM)
+    adminZkClient = new AdminZkClient(zkClient)
--- End diff --

Can we use the Java AdminClient instead of these internal classes?
GitHub user tedyu opened a pull request:

    https://github.com/apache/spark/pull/21488

SPARK-18057 Update structured streaming kafka from 0.10.0.1 to 2.0.0

## What changes were proposed in this pull request?

This PR upgrades to the Kafka 2.0.0 release, where KIP-266 is integrated.

## How was this patch tested?

This PR uses the existing Kafka-related unit tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tedyu/spark master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21488.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21488

commit 0a22686d9a388a21d5dd38513854341d3f37f738
Author: tedyu
Date: 2018-06-03T19:54:22Z

    SPARK-18057 Update structured streaming kafka from 0.10.0.1 to 2.0.0