[GitHub] [spark] gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector

2019-08-14 Thread GitBox
gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new 
KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#issuecomment-521152419
 
 
   @zsxwing yep, not much to do for now. Thanks for your help in figuring this out.
   @HeartSaVioR thanks for initiating + pushing the discussion forward.
   I'm going to open a new PR when the API is available...


[GitHub] [spark] gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector

2019-08-13 Thread GitBox
gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new 
KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#issuecomment-520830078
 
 
   To give a small summary of the discussion on the Kafka dev channel, the proposal that seems to have agreement is the following:
   * Call `AdminClient.listTopics` to get the possible topics for a subscribe pattern (in the case of assign and subscribe such a step is not needed). Here there are 2 possibilities (a sketch of the first follows this list):
     1. Do the filtering in Spark code
     2. Kafka adds regex support to the mentioned API
   
   It's worth mentioning that such filtering happens on the client side in the `Consumer` at the moment, so this would not cause any unwanted extra memory consumption.
   * [KIP-396](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97551484) would add an `AdminClient.listOffsets` API from which offsets can be obtained without polling data.
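
   A minimal sketch of option 1 (client-side filtering), assuming an already-configured Kafka `AdminClient`; `topicsMatchingPattern` is a hypothetical helper name, not code from this PR:
   ```
   import java.util.regex.Pattern
   import scala.collection.JavaConverters._
   import org.apache.kafka.clients.admin.AdminClient

   // List every topic via AdminClient and keep only the ones matching the
   // subscribe pattern, mirroring the filtering Consumer does on the client side.
   def topicsMatchingPattern(admin: AdminClient, subscribePattern: String): Set[String] = {
     val pattern = Pattern.compile(subscribePattern)
     val allTopics = admin.listTopics().names().get().asScala.toSet
     allTopics.filter(topic => pattern.matcher(topic).matches())
   }
   ```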
   


[GitHub] [spark] gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector

2019-08-06 Thread GitBox
gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new 
KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#issuecomment-518654674
 
 
   To resolve this situation I've considered mainly 2 things:
   * Increase `kafkaConsumer.pollTimeoutMs`
   * Introduce an additional timeout for partition assignment
   
   The second approach feels more like overkill from my point of view; such a configuration can be added later if we see the need.
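
   To make option 1 concrete, a minimal sketch of raising the poll timeout on the Kafka source; the bootstrap servers, topic name, and the 10000 ms value are placeholders, not recommendations:
   ```
   // assumes an existing SparkSession named `spark`
   val df = spark.readStream
     .format("kafka")
     .option("kafka.bootstrap.servers", "host1:9092")
     .option("subscribe", "topic1")
     .option("kafkaConsumer.pollTimeoutMs", "10000")
     .load()
   ```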
   


[GitHub] [spark] gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector

2019-08-06 Thread GitBox
gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new 
KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#issuecomment-518652788
 
 
   First of all thanks @HeartSaVioR for the deep look, it helped!
   
   @zsxwing I've had another round with the Kafka guys on this and here are the conclusions:
   * The approach of calling the new `poll(Duration)` API in a loop is the suggested solution
   * The second approach failed because of the following: originally the partition assignment was synchronous in the old API (and could hang infinitely), but with the new implementation a proper timeout is applied. In the mentioned tests `"kafkaConsumer.pollTimeoutMs" -> "1000"` is set. Since the partition assignment takes around 1000 ms on my machine (and seemingly on Jenkins as well), it was mainly race luck whether a test passed or not. The first approach was more on the success side, but the second was more on the failing side.
   
   Here is the client log:
   ```
   19/08/05 06:55:27.650 stream execution thread for DontFailOnDataLoss [id = 
9396b4b5-bc30-4e1e-8c6a-1d3a9d890ccf, runId = 
5b1ee39c-c174-4c42-b0d5-1c4b59061ab3] TRACE NetworkClient: [Consumer 
clientId=consumer-522, 
groupId=spark-kafka-source-42d70d54-e5ce-453a-a84c-40cd8126a3ff--1337036482-driver-2]
 Sending JOIN_GROUP {group_id=spark-kafka-source-42d70d54-e5ce-45$
   19/08/05 06:55:28.758 stream execution thread for DontFailOnDataLoss [id = 
9396b4b5-bc30-4e1e-8c6a-1d3a9d890ccf, runId = 
5b1ee39c-c174-4c42-b0d5-1c4b59061ab3] TRACE KafkaConsumer: [Consumer 
clientId=consumer-522, 
groupId=spark-kafka-source-42d70d54-e5ce-453a-a84c-40cd8126a3ff--1337036482-driver-2]
 Closing the Kafka consumer
   19/08/05 06:55:28.758 kafka-coordinator-heartbeat-thread | 
spark-kafka-source-42d70d54-e5ce-453a-a84c-40cd8126a3ff--1337036482-driver-2 
DEBUG AbstractCoordinator: [Consumer clientId=consumer-522, 
groupId=spark-kafka-source-42d70d54-e5ce-453a-a84c-40cd8126a3ff--1337036482-driver-2]
 Heartbeat thread has closed
   19/08/05 06:55:28.758 stream execution thread for DontFailOnDataLoss [id = 
9396b4b5-bc30-4e1e-8c6a-1d3a9d890ccf, runId = 
5b1ee39c-c174-4c42-b0d5-1c4b59061ab3] TRACE NetworkClient: [Consumer 
clientId=consumer-522, 
groupId=spark-kafka-source-42d70d54-e5ce-453a-a84c-40cd8126a3ff--1337036482-driver-2]
 Completed receive from node 0 for METADATA with correlation id 6$
   19/08/05 06:55:28.759 stream execution thread for DontFailOnDataLoss [id = 
9396b4b5-bc30-4e1e-8c6a-1d3a9d890ccf, runId = 
5b1ee39c-c174-4c42-b0d5-1c4b59061ab3] TRACE Metadata: [Consumer 
clientId=consumer-522, 
groupId=spark-kafka-source-42d70d54-e5ce-453a-a84c-40cd8126a3ff--1337036482-driver-2]
 Determining if we should replace existing epoch 0 with new epoch 0
   19/08/05 06:55:28.759 stream execution thread for DontFailOnDataLoss [id = 
9396b4b5-bc30-4e1e-8c6a-1d3a9d890ccf, runId = 
5b1ee39c-c174-4c42-b0d5-1c4b59061ab3] DEBUG Metadata: [Consumer 
clientId=consumer-522, 
groupId=spark-kafka-source-42d70d54-e5ce-453a-a84c-40cd8126a3ff--1337036482-driver-2]
 Updating last seen epoch from 0 to 0 for partition failOnDataLoss-0-0
   19/08/05 06:55:28.759 stream execution thread for DontFailOnDataLoss [id = 
9396b4b5-bc30-4e1e-8c6a-1d3a9d890ccf, runId = 
5b1ee39c-c174-4c42-b0d5-1c4b59061ab3] DEBUG Metadata: [Consumer 
clientId=consumer-522, 
groupId=spark-kafka-source-42d70d54-e5ce-453a-a84c-40cd8126a3ff--1337036482-driver-2]
 Updated cluster metadata updateVersion 3 to MetadataCache{cluster=Clu$
   19/08/05 06:55:28.760 stream execution thread for DontFailOnDataLoss [id = 
9396b4b5-bc30-4e1e-8c6a-1d3a9d890ccf, runId = 
5b1ee39c-c174-4c42-b0d5-1c4b59061ab3] TRACE NetworkClient: [Consumer 
clientId=consumer-522, 
groupId=spark-kafka-source-42d70d54-e5ce-453a-a84c-40cd8126a3ff--1337036482-driver-2]
 Completed receive from node 2147483647 for JOIN_GROUP with corre$
   19/08/05 06:55:28.760 stream execution thread for DontFailOnDataLoss [id = 
9396b4b5-bc30-4e1e-8c6a-1d3a9d890ccf, runId = 
5b1ee39c-c174-4c42-b0d5-1c4b59061ab3] TRACE Metrics: Removed metric named 
MetricName [name=connection-count, group=consumer-metrics, description=The 
current number of active connections., tags={client-id=consumer-522}]
   19/08/05 06:55:28.760 
data-plane-kafka-network-thread-0-ListenerName(PLAINTEXT)-PLAINTEXT-1 DEBUG 
Selector: [SocketServer brokerId=0] Connection with /127.0.0.1 disconnected
   java.io.EOFException
   at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:96)
   at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
   at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
   at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
   at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
   at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
   at kafka.netw
   ```

[GitHub] [spark] gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector

2019-07-18 Thread GitBox
gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new 
KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#issuecomment-512821931
 
 
   > It feels me like consumer.poll(jt.Duration.ZERO) just doesn't work.
   
   If that were true, several users would have protested with blocker bugs on the Kafka side, but I haven't seen such a situation (polling without assignment is not possible). I've seen several heavy users who do the polling (and no other magic) with `consumer.poll(Duration.ZERO)`. The use case I've seen is something like this:
   ```
   while (!interrupted) {
 val data = consumer.poll(Duration.ZERO)
 process(data)
   }
   ```
   
   Anyway, I'm going on a 2-week vacation, but after that I'm going to sit down with the Kafka guys and take a deeper look at this (again, only a couple of Spark tests are failing consistently, and it may be that we're not using the API properly).
   


[GitHub] [spark] gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector

2019-07-18 Thread GitBox
gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new 
KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#issuecomment-512703520
 
 
   @zsxwing the main approach was created in agreement with the Kafka guys. The only thing which bothers me is why the following approaches produce different results (maybe we hit an edge case with 0?!).
   Works:
   ```
   ...
   consumer.poll(jt.Duration.ZERO)
   var partitions = consumer.assignment()
   while (partitions.isEmpty && System.currentTimeMillis() - startTimeMs < pollTimeoutMs) {
     consumer.poll(jt.Duration.ofMillis(100))
     partitions = consumer.assignment()
   }
   ...
   ```
   Fails consistently with no flakiness (worth mentioning that it's only a couple of tests, so maybe the tests are implemented incorrectly):
   ```
   ...
   var partitions = Set.empty[TopicPartition].asJava
   while (partitions.isEmpty && System.currentTimeMillis() - startTimeMs < pollTimeoutMs) {
     consumer.poll(jt.Duration.ZERO)
     partitions = consumer.assignment()
     if (partitions.isEmpty) {
       Thread.sleep(100)
     }
   }
   ...
   ```
   


[GitHub] [spark] gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector

2019-07-17 Thread GitBox
gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new 
KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#issuecomment-512162339
 
 
   A little explanation of the last change:
   * While I was debugging the issue I realized the `requirement failed` message is not really informative, so I added a meaningful message (a rough sketch of that kind of change follows this list)
   * When `consumer.poll(Duration.ZERO)` with `Thread.sleep(100)` was used, under some circumstances even 2 minutes was not enough to get the assignment. To test this I put the standalone application into a 1k-execution loop. The end result is that with a non-zero timeout (including `Duration.ofMillis(1)`) everything works fine.
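
   A rough sketch of the kind of change the first bullet describes; the variable names and the exact wording are hypothetical, not the code from this PR:
   ```
   // instead of a bare require(...) that only reports "requirement failed",
   // explain what was expected and which knob to turn
   require(
     !partitions.isEmpty,
     s"Partition assignment did not happen within $pollTimeoutMs ms, " +
       "consider increasing kafkaConsumer.pollTimeoutMs")
   ```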
   
   So the final judgement is that the current implementation works, but as a side track I'm going to analyze the `consumer.poll(Duration.ZERO)` behavior with the Kafka guys.
   


[GitHub] [spark] gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector

2019-07-16 Thread GitBox
gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new 
KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#issuecomment-511796946
 
 
   Checking the issue...


[GitHub] [spark] gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector

2019-07-16 Thread GitBox
gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new 
KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#issuecomment-511771601
 
 
   retest this please


[GitHub] [spark] gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector

2019-07-16 Thread GitBox
gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new 
KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#issuecomment-511757383
 
 
   Created a quick fix https://github.com/apache/spark/pull/25171


[GitHub] [spark] gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector

2019-07-16 Thread GitBox
gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new 
KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#issuecomment-511750029
 
 
   retest this please


[GitHub] [spark] gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector

2019-07-15 Thread GitBox
gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new 
KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#issuecomment-511359259
 
 
   > Btw, just wondering, if you managed to test with situation delaying 
updating metadata (finally timed-out), then what would be occurred? Query 
fails? Or reattempt of relevant tasks happen?
   
   I presume you're interested in what the new behavior is. If an exception arrives, `withRetriesWithoutInterrupt` catches all `NonFatal` exceptions and takes care of the reattempt.
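
   For illustration, a minimal sketch of the general retry-on-NonFatal pattern being described; this is only a sketch of the idea, not Spark's actual `withRetriesWithoutInterrupt` implementation:
   ```
   import scala.util.control.NonFatal

   // Sketch only: re-run the body when a non-fatal exception occurs, up to maxAttempts times.
   def withRetries[T](maxAttempts: Int, attempt: Int = 1)(body: => T): T = {
     try {
       body
     } catch {
       case NonFatal(e) if attempt < maxAttempts =>
         // non-fatal failure (e.g. a timed-out metadata update): reattempt
         withRetries(maxAttempts, attempt + 1)(body)
     }
   }
   ```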


[GitHub] [spark] gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector

2019-07-12 Thread GitBox
gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new 
KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#issuecomment-510871772
 
 
   cc @HeartSaVioR 

