[GitHub] [spark] HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector

2019-08-07 Thread GitBox
HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use 
new KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#discussion_r311406498
 
 

 ##
 File path: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ##
 @@ -419,6 +416,21 @@ private[kafka010] class KafkaOffsetReader(
 stopConsumer()
 _consumer = null  // will automatically get reinitialized again
   }
+
+  private def getPartitions(): ju.Set[TopicPartition] = {
+    consumer.poll(jt.Duration.ZERO)
+    var partitions = consumer.assignment()
+    val startTimeMs = System.currentTimeMillis()
+    while (partitions.isEmpty && System.currentTimeMillis() - startTimeMs < pollTimeoutMs) {
+      // Poll to get the latest assigned partitions
+      consumer.poll(jt.Duration.ofMillis(100))
 
 Review comment:
   Good point. While the Kafka documentation says the behavior of such a hack is nondeterministic and Kafka has never supported it officially, we rely on that behavior anyway.
   
   I've started a thread to ask about viable alternatives to `poll(0)` and the possibility of adding a public API that updates metadata only.
   
https://lists.apache.org/thread.html/017cf631ef981ab1b494b1249be5c11d7edfe5f4867770a18188ebdc@%3Cdev.kafka.apache.org%3E


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org




[GitHub] [spark] HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector

2019-07-16 Thread GitBox
HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use 
new KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#discussion_r303804331
 
 

 ##
 File path: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ##
 @@ -419,6 +416,19 @@ private[kafka010] class KafkaOffsetReader(
 stopConsumer()
 _consumer = null  // will automatically get reinitialized again
   }
+
+  private def getPartitions(): ju.Set[TopicPartition] = {
+    var partitions = Set.empty[TopicPartition].asJava
+    val startTimeMs = System.currentTimeMillis()
+    while (partitions.isEmpty && System.currentTimeMillis() - startTimeMs < pollTimeoutMs) {
+      // Poll to get the latest assigned partitions
+      consumer.poll(jt.Duration.ofMillis(100))
 
 Review comment:
   Please correct me if I'm missing something here. (I'm not an expert on Kafka and may be missing some details.)
   
   It may return only after 100 ms if there is no record to consume, even when the metadata is already available. Here we only need the metadata, but once we call poll, the request is tied to the records.
   
   To be clear, we'd prefer to call `poll(0)` with an explicit sleep (to decouple the wait from record arrival), but it would also be OK to let `consumer.poll` do the waiting instead. An explicit sleep may time the wait more accurately in the case where a record is ready to poll but the metadata is not yet available (I'm not sure whether that case is possible).








[GitHub] [spark] HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector

2019-07-16 Thread GitBox
HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use 
new KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#discussion_r303792433
 
 

 ##
 File path: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ##
 @@ -419,6 +416,19 @@ private[kafka010] class KafkaOffsetReader(
 stopConsumer()
 _consumer = null  // will automatically get reinitialized again
   }
+
+  private def getPartitions(): ju.Set[TopicPartition] = {
+    var partitions = Set.empty[TopicPartition].asJava
+    val startTimeMs = System.currentTimeMillis()
+    while (partitions.isEmpty && System.currentTimeMillis() - startTimeMs < pollTimeoutMs) {
+      // Poll to get the latest assigned partitions
+      consumer.poll(jt.Duration.ofMillis(100))
 
 Review comment:
   No, you just need to start the method with the following:
   
   ```
   consumer.poll(jt.Duration.ZERO)
   var partitions = consumer.assignment()
   ```
   
   instead of initializing `partitions` as an empty set.
   
   If `consumer.poll(0)` works and the partitions are filled from the assignment, the loop will not be executed. Does that work for you?





[GitHub] [spark] HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector

2019-07-16 Thread GitBox
HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use 
new KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#discussion_r303755232
 
 

 ##
 File path: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ##
 @@ -419,6 +416,19 @@ private[kafka010] class KafkaOffsetReader(
 stopConsumer()
 _consumer = null  // will automatically get reinitialized again
   }
+
+  private def getPartitions(): ju.Set[TopicPartition] = {
+    var partitions = Set.empty[TopicPartition].asJava
+    val startTimeMs = System.currentTimeMillis()
+    while (partitions.isEmpty && System.currentTimeMillis() - startTimeMs < pollTimeoutMs) {
+      // Poll to get the latest assigned partitions
+      consumer.poll(jt.Duration.ofMillis(100))
 
 Review comment:
   Sorry for the back and forth on this, but it would be better if we add the timeout only when retrying. So: try once with no delay, and retry with some timeout (effectively a sleep) only if that fails.
   
   This avoids introducing an additional timeout when the metadata is already available, while still adding some delay between retries. WDYT?
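   The "timeout only when retrying" idea can be sketched generically (hypothetical helper, not the PR's code; `fetch` stands in for a poll-plus-assignment check):

```scala
// Hypothetical sketch of "timeout only when retrying": the first attempt
// carries no delay at all; subsequent attempts each wait retryIntervalMs.
def fetchWithRetry[T](timeoutMs: Long, retryIntervalMs: Long)
                     (fetch: () => Option[T]): Option[T] = {
  val deadline = System.currentTimeMillis() + timeoutMs
  var result = fetch()                  // first try: no delay
  while (result.isEmpty && System.currentTimeMillis() < deadline) {
    Thread.sleep(retryIntervalMs)       // delay applies only on retries
    result = fetch()
  }
  result
}

// Stub usage: succeeds on the third attempt.
var attempts = 0
val out = fetchWithRetry[Int](timeoutMs = 1000, retryIntervalMs = 1) { () =>
  attempts += 1
  if (attempts >= 3) Some(42) else None
}
println(out)  // Some(42)
```

When the metadata is already available, the first `fetch` succeeds and the call returns immediately, with no added latency.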





[GitHub] [spark] HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector

2019-07-12 Thread GitBox
HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use 
new KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#discussion_r303154822
 
 

 ##
 File path: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala
 ##
 @@ -33,6 +33,12 @@ class KafkaSourceProviderSuite extends SparkFunSuite with 
PrivateMethodTester {
   private val pollTimeoutMsMethod = PrivateMethod[Long]('pollTimeoutMs)
   private val maxOffsetsPerTriggerMethod = 
PrivateMethod[Option[Long]]('maxOffsetsPerTrigger)
 
+  override protected def beforeEach(): Unit = {
 
 Review comment:
   If I'm understanding correctly, because `pollTimeoutMs` is now used in KafkaOffsetReader, non-microbatch mode also requires SparkEnv. Right? Just verifying my understanding.





[GitHub] [spark] HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector

2019-07-12 Thread GitBox
HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use 
new KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#discussion_r303153832
 
 

 ##
 File path: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ##
 @@ -419,6 +416,19 @@ private[kafka010] class KafkaOffsetReader(
 stopConsumer()
 _consumer = null  // will automatically get reinitialized again
   }
+
+  private def getPartitions(): ju.Set[TopicPartition] = {
+    var partitions = Set.empty[TopicPartition].asJava
+    val startTimeMs = System.currentTimeMillis()
+    while (partitions.isEmpty && System.currentTimeMillis() - startTimeMs < pollTimeoutMs) {
 
 Review comment:
   Better to put some sleep in the loop if `poll(0)` doesn't sleep at all. If the duration is a fairly large value, exponential backoff between retries would be ideal.
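   A minimal sketch of the exponential-backoff suggestion (hypothetical helper; `attempt` stands in for a poll-plus-assignment check):

```scala
// Hypothetical sketch of exponential backoff between retries: the sleep
// doubles each round, capped so a large overall timeout doesn't turn into
// very long individual sleeps. All names here are illustrative.
def retryWithBackoff[T](timeoutMs: Long,
                        initialDelayMs: Long = 10,
                        maxDelayMs: Long = 1000)
                       (attempt: () => Option[T]): Option[T] = {
  val deadline = System.currentTimeMillis() + timeoutMs
  var delayMs = initialDelayMs
  var result = attempt()
  while (result.isEmpty && System.currentTimeMillis() < deadline) {
    Thread.sleep(delayMs)
    delayMs = math.min(delayMs * 2, maxDelayMs)  // 10, 20, 40, ... capped
    result = attempt()
  }
  result
}

// Stub usage: succeeds on the fourth attempt.
var tries = 0
val res = retryWithBackoff[String](timeoutMs = 2000, initialDelayMs = 1) { () =>
  tries += 1
  if (tries >= 4) Some("assigned") else None
}
println(res)  // Some(assigned)
```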





[GitHub] [spark] HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector

2019-07-12 Thread GitBox
HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use 
new KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#discussion_r303155155
 
 

 ##
 File path: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
 ##
 @@ -310,6 +309,20 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] 
= Map.empty) extends L
 offsets
   }
 
+  private def getPartitions(consumer: KafkaConsumer[String, String]): JSet[TopicPartition] = {
 
 Review comment:
   Duplicated code. Maybe we could put this in a utility class (or somewhere similar) and pass the timeout as a parameter?
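   One way to share the loop, sketched with the consumer calls abstracted as functions so the example stays dependency-free (the object and parameter names are hypothetical, not from the PR):

```scala
// Hypothetical shared helper: poll/assignment are passed in as functions
// so the retry loop can live in one utility object and take the timeout
// as a parameter. With a real consumer the caller would pass
//   poll = ms => consumer.poll(java.time.Duration.ofMillis(ms))
//   assignment = () => consumer.assignment()
object PartitionWaitUtil {
  def waitForAssignment[P](timeoutMs: Long, pollIntervalMs: Long = 100)
                          (poll: Long => Unit,
                           assignment: () => Set[P]): Set[P] = {
    poll(0)                              // equivalent of poll(Duration.ZERO)
    var partitions = assignment()
    val startTimeMs = System.currentTimeMillis()
    while (partitions.isEmpty &&
           System.currentTimeMillis() - startTimeMs < timeoutMs) {
      poll(pollIntervalMs)               // wait up to the interval per round
      partitions = assignment()
    }
    partitions
  }
}

// Stub usage: the assignment appears after the second poll.
var polls = 0
val parts = PartitionWaitUtil.waitForAssignment[Int](timeoutMs = 1000, pollIntervalMs = 1)(
  poll = _ => polls += 1,
  assignment = () => if (polls >= 2) Set(0) else Set.empty)
println(parts)  // Set(0)
```

Both KafkaOffsetReader and KafkaTestUtils could then call the helper with their own timeout values.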

