[GitHub] [spark] HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector
HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector URL: https://github.com/apache/spark/pull/25135#discussion_r311406498 ## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala ## @@ -419,6 +416,21 @@ private[kafka010] class KafkaOffsetReader( stopConsumer() _consumer = null // will automatically get reinitialized again } + + private def getPartitions(): ju.Set[TopicPartition] = { +consumer.poll(jt.Duration.ZERO) +var partitions = consumer.assignment() +val startTimeMs = System.currentTimeMillis() +while (partitions.isEmpty && System.currentTimeMillis() - startTimeMs < pollTimeoutMs) { + // Poll to get the latest assigned partitions + consumer.poll(jt.Duration.ofMillis(100)) Review comment: Good point. While Kafka doc says the behavior of such hack has been indeterministic and Kafka never support it officially, we expect such behavior in any way. I've initiated thread to ask about viable alternatives of `poll(0)` and possibility of adding public API to update metadata only. https://lists.apache.org/thread.html/017cf631ef981ab1b494b1249be5c11d7edfe5f4867770a18188ebdc@%3Cdev.kafka.apache.org%3E This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector
HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector URL: https://github.com/apache/spark/pull/25135#discussion_r311406498 ## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala ## @@ -419,6 +416,21 @@ private[kafka010] class KafkaOffsetReader( stopConsumer() _consumer = null // will automatically get reinitialized again } + + private def getPartitions(): ju.Set[TopicPartition] = { +consumer.poll(jt.Duration.ZERO) +var partitions = consumer.assignment() +val startTimeMs = System.currentTimeMillis() +while (partitions.isEmpty && System.currentTimeMillis() - startTimeMs < pollTimeoutMs) { + // Poll to get the latest assigned partitions + consumer.poll(jt.Duration.ofMillis(100)) Review comment: Good point. While Kafka doc says the behavior of such hack is indeterministic and Kafka never support it officially, we expect such behavior in any way. I've initiated thread to ask about viable alternatives of `poll(0)` and possibility of adding public API to update metadata only. https://lists.apache.org/thread.html/017cf631ef981ab1b494b1249be5c11d7edfe5f4867770a18188ebdc@%3Cdev.kafka.apache.org%3E This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector
HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector URL: https://github.com/apache/spark/pull/25135#discussion_r303804331 ## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala ## @@ -419,6 +416,19 @@ private[kafka010] class KafkaOffsetReader( stopConsumer() _consumer = null // will automatically get reinitialized again } + + private def getPartitions(): ju.Set[TopicPartition] = { +var partitions = Set.empty[TopicPartition].asJava +val startTimeMs = System.currentTimeMillis() +while (partitions.isEmpty && System.currentTimeMillis() - startTimeMs < pollTimeoutMs) { + // Poll to get the latest assigned partitions + consumer.poll(jt.Duration.ofMillis(100)) Review comment: Please correct me if I'm missing here. (Not an expert of Kafka, may miss some details.) It may return after 100ms if there's no record to consume even metadata is ready. Here we only need metadata but once we call poll, the request is bound to the records. To be clear, we would like to call `poll(0)` with explicitly putting sleep (to avoid coupling with records), but it would be also OK to let `consumer.poll` wait instead. Explicit sleep may sleep more accurately if there's a case record is ready to poll but metadata is not ready (I'm not sure this can be possible). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector
HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector URL: https://github.com/apache/spark/pull/25135#discussion_r303804331 ## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala ## @@ -419,6 +416,19 @@ private[kafka010] class KafkaOffsetReader( stopConsumer() _consumer = null // will automatically get reinitialized again } + + private def getPartitions(): ju.Set[TopicPartition] = { +var partitions = Set.empty[TopicPartition].asJava +val startTimeMs = System.currentTimeMillis() +while (partitions.isEmpty && System.currentTimeMillis() - startTimeMs < pollTimeoutMs) { + // Poll to get the latest assigned partitions + consumer.poll(jt.Duration.ofMillis(100)) Review comment: Please correct me if I'm missing here. (Not an expert of Kafka, may miss some details.) It may return after 100ms if there's no record to consume even metadata is ready. Here we only need metadata but once we call poll, the request is bound to the records. To be clear, we would like to call `poll(0)` with explicitly putting sleep (to avoid coupling with records), but it would be also OK to let `consumer.poll` wait instead. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector
HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector URL: https://github.com/apache/spark/pull/25135#discussion_r303804331 ## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala ## @@ -419,6 +416,19 @@ private[kafka010] class KafkaOffsetReader( stopConsumer() _consumer = null // will automatically get reinitialized again } + + private def getPartitions(): ju.Set[TopicPartition] = { +var partitions = Set.empty[TopicPartition].asJava +val startTimeMs = System.currentTimeMillis() +while (partitions.isEmpty && System.currentTimeMillis() - startTimeMs < pollTimeoutMs) { + // Poll to get the latest assigned partitions + consumer.poll(jt.Duration.ofMillis(100)) Review comment: Please correct me if I'm missing here. (Not an expert of Kafka, may miss some details.) It may return after 100ms if there's no record to consume even metadata is ready. Here we only need metadata but once we call poll, the request is bound to the records. To be clear, we would also like to call `poll(0)` with explicitly putting sleep (to avoid coupling with records), but it would be also OK to let `consumer.poll` wait instead. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector
HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector URL: https://github.com/apache/spark/pull/25135#discussion_r303804331 ## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala ## @@ -419,6 +416,19 @@ private[kafka010] class KafkaOffsetReader( stopConsumer() _consumer = null // will automatically get reinitialized again } + + private def getPartitions(): ju.Set[TopicPartition] = { +var partitions = Set.empty[TopicPartition].asJava +val startTimeMs = System.currentTimeMillis() +while (partitions.isEmpty && System.currentTimeMillis() - startTimeMs < pollTimeoutMs) { + // Poll to get the latest assigned partitions + consumer.poll(jt.Duration.ofMillis(100)) Review comment: Please correct me if I'm missing here. (Not an expert of Kafka, may miss some details.) It may return after 100ms if there's no record to consume even metadata is ready. Here we only need metadata but once we call poll, the request is bound to the records. To be clear, we would like to call `poll(0)` with explicitly putting sleep (to avoid coupling with records), but it would be also OK to let `consumer.poll` wait instead. Explicit sleep may sleep more properly if there's a case record is ready to poll but metadata is not ready (I'm not sure this can be possible). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector
HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector URL: https://github.com/apache/spark/pull/25135#discussion_r303792433 ## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala ## @@ -419,6 +416,19 @@ private[kafka010] class KafkaOffsetReader( stopConsumer() _consumer = null // will automatically get reinitialized again } + + private def getPartitions(): ju.Set[TopicPartition] = { +var partitions = Set.empty[TopicPartition].asJava +val startTimeMs = System.currentTimeMillis() +while (partitions.isEmpty && System.currentTimeMillis() - startTimeMs < pollTimeoutMs) { + // Poll to get the latest assigned partitions + consumer.poll(jt.Duration.ofMillis(100)) Review comment: No you just need to start the method with below: ``` consumer.poll(jt.Duration.Zero) var partitions = consumer.assignment() ``` instead of initializing partitions as empty set. If `consumer.poll(0)` works and partitions are filled with assignment, loop will not be executed. Does it work for you? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector
HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector URL: https://github.com/apache/spark/pull/25135#discussion_r303755232 ## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala ## @@ -419,6 +416,19 @@ private[kafka010] class KafkaOffsetReader( stopConsumer() _consumer = null // will automatically get reinitialized again } + + private def getPartitions(): ju.Set[TopicPartition] = { +var partitions = Set.empty[TopicPartition].asJava +val startTimeMs = System.currentTimeMillis() +while (partitions.isEmpty && System.currentTimeMillis() - startTimeMs < pollTimeoutMs) { + // Poll to get the latest assigned partitions + consumer.poll(jt.Duration.ofMillis(100)) Review comment: I'm sorry for back and forth on this, but it would be better if we can add timeout only when it's retrying. So try once with no delay, and retry with some timeouts (effectively sleep) only if it fails. This would avoid introducing additional timeout when the metadata is available, but still adding some delay between retrying. WDYT? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector
HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector URL: https://github.com/apache/spark/pull/25135#discussion_r303154822 ## File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala ## @@ -33,6 +33,12 @@ class KafkaSourceProviderSuite extends SparkFunSuite with PrivateMethodTester { private val pollTimeoutMsMethod = PrivateMethod[Long]('pollTimeoutMs) private val maxOffsetsPerTriggerMethod = PrivateMethod[Option[Long]]('maxOffsetsPerTrigger) + override protected def beforeEach(): Unit = { Review comment: If I'm understanding correctly, due to employ of `pollTimeoutMs` in KafkaOffsetReader, non-microbatch mode now requires SparkEnv. Right? Just to verify my understanding. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector
HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector URL: https://github.com/apache/spark/pull/25135#discussion_r303153832 ## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala ## @@ -419,6 +416,19 @@ private[kafka010] class KafkaOffsetReader( stopConsumer() _consumer = null // will automatically get reinitialized again } + + private def getPartitions(): ju.Set[TopicPartition] = { +var partitions = Set.empty[TopicPartition].asJava +val startTimeMs = System.currentTimeMillis() +while (partitions.isEmpty && System.currentTimeMillis() - startTimeMs < pollTimeoutMs) { Review comment: Better to put some sleep in a loop if poll(0) doesn't employ any sleep. If the duration is somewhat big value, exponential sleep would be ideal. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector
HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector URL: https://github.com/apache/spark/pull/25135#discussion_r303155155 ## File path: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala ## @@ -310,6 +309,20 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L offsets } + private def getPartitions(consumer: KafkaConsumer[String, String]): JSet[TopicPartition] = { Review comment: Duplicated code. Maybe we could put this in utility class or somewhere and pass timeout as parameter? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org