[GitHub] [spark] gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector
gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#issuecomment-521152419

@zsxwing yep, not much to do for now. Thanks for your help in tracking this down. @HeartSaVioR thanks for initiating and pushing the discussion forward. I'm going to open a new PR when the API is available...

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#issuecomment-520830078

To summarize the discussion in the Kafka dev channel, the proposal that seems to have agreement is the following:
* Call `AdminClient.listTopics` to get the possible topics for a subscribe pattern (in the case of assign and subscribe such a step is not needed). Here there are 2 possibilities:
  1. Do the filtering in Spark code
  2. Kafka adds regex support to the mentioned API

  It's worth mentioning that such filtering happens on the client side in `Consumer` at the moment, so this would not cause any unwanted extra memory consumption.
* [KIP-396](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97551484) would add an `AdminClient.listOffsets` API from which offsets can be obtained without polling data.
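Option 1 (filtering in Spark code) could look roughly like the sketch below. It assumes the full topic list has already been fetched (e.g. via `AdminClient.listTopics`); the `TopicFilter` class and its method name are illustrative, and only the regex matching mirrors what `Consumer` currently does on the client side.

```java
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Illustrative sketch of option 1: apply the subscribe pattern on the Spark
// side to the topic names returned by AdminClient.listTopics(). The class and
// method names here are hypothetical; only the regex matching itself mirrors
// what the Consumer currently does on the client side.
public class TopicFilter {
    public static Set<String> matching(Set<String> allTopics, String subscribePattern) {
        Pattern p = Pattern.compile(subscribePattern);
        return allTopics.stream()
                .filter(t -> p.matcher(t).matches())
                .collect(Collectors.toSet());
    }
}
```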
gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#issuecomment-518654674

To resolve this situation I've considered mainly 2 things:
* Increase `kafkaConsumer.pollTimeoutMs`
* Introduce an additional timeout for partition assignment

The second approach is overkill from my point of view. Such a configuration can be added later if we see the need.
gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#issuecomment-518652788

First of all thanks @HeartSaVioR for the deep look, it helped! @zsxwing I've had another round with the Kafka guys on this and here are the conclusions:
* The approach of calling `poll(Duration)` in a loop is the suggested solution.
* The second approach failed because of the following: originally the partition assignment was synchronous in the old API (and could hang infinitely), but in the new implementation a proper timeout is applied. In the mentioned tests `"kafkaConsumer.pollTimeoutMs" -> "1000"` is set. Since the partition assignment takes around 1000 ms on my machine (and seemingly on Jenkins as well), it was mainly race luck whether the test passed or not. The first approach was more on the success side but the second was more on the failing side.

Here is the client log:
```
19/08/05 06:55:27.650 stream execution thread for DontFailOnDataLoss [id = 9396b4b5-bc30-4e1e-8c6a-1d3a9d890ccf, runId = 5b1ee39c-c174-4c42-b0d5-1c4b59061ab3] TRACE NetworkClient: [Consumer clientId=consumer-522, groupId=spark-kafka-source-42d70d54-e5ce-453a-a84c-40cd8126a3ff--1337036482-driver-2] Sending JOIN_GROUP {group_id=spark-kafka-source-42d70d54-e5ce-45$
19/08/05 06:55:28.758 stream execution thread for DontFailOnDataLoss [id = 9396b4b5-bc30-4e1e-8c6a-1d3a9d890ccf, runId = 5b1ee39c-c174-4c42-b0d5-1c4b59061ab3] TRACE KafkaConsumer: [Consumer clientId=consumer-522, groupId=spark-kafka-source-42d70d54-e5ce-453a-a84c-40cd8126a3ff--1337036482-driver-2] Closing the Kafka consumer
19/08/05 06:55:28.758 kafka-coordinator-heartbeat-thread | spark-kafka-source-42d70d54-e5ce-453a-a84c-40cd8126a3ff--1337036482-driver-2 DEBUG AbstractCoordinator: [Consumer clientId=consumer-522, groupId=spark-kafka-source-42d70d54-e5ce-453a-a84c-40cd8126a3ff--1337036482-driver-2] Heartbeat thread has closed
19/08/05 06:55:28.758 stream execution thread for DontFailOnDataLoss [id = 9396b4b5-bc30-4e1e-8c6a-1d3a9d890ccf, runId = 5b1ee39c-c174-4c42-b0d5-1c4b59061ab3] TRACE NetworkClient: [Consumer clientId=consumer-522, groupId=spark-kafka-source-42d70d54-e5ce-453a-a84c-40cd8126a3ff--1337036482-driver-2] Completed receive from node 0 for METADATA with correlation id 6$
19/08/05 06:55:28.759 stream execution thread for DontFailOnDataLoss [id = 9396b4b5-bc30-4e1e-8c6a-1d3a9d890ccf, runId = 5b1ee39c-c174-4c42-b0d5-1c4b59061ab3] TRACE Metadata: [Consumer clientId=consumer-522, groupId=spark-kafka-source-42d70d54-e5ce-453a-a84c-40cd8126a3ff--1337036482-driver-2] Determining if we should replace existing epoch 0 with new epoch 0
19/08/05 06:55:28.759 stream execution thread for DontFailOnDataLoss [id = 9396b4b5-bc30-4e1e-8c6a-1d3a9d890ccf, runId = 5b1ee39c-c174-4c42-b0d5-1c4b59061ab3] DEBUG Metadata: [Consumer clientId=consumer-522, groupId=spark-kafka-source-42d70d54-e5ce-453a-a84c-40cd8126a3ff--1337036482-driver-2] Updating last seen epoch from 0 to 0 for partition failOnDataLoss-0-0
19/08/05 06:55:28.759 stream execution thread for DontFailOnDataLoss [id = 9396b4b5-bc30-4e1e-8c6a-1d3a9d890ccf, runId = 5b1ee39c-c174-4c42-b0d5-1c4b59061ab3] DEBUG Metadata: [Consumer clientId=consumer-522, groupId=spark-kafka-source-42d70d54-e5ce-453a-a84c-40cd8126a3ff--1337036482-driver-2] Updated cluster metadata updateVersion 3 to MetadataCache{cluster=Clu$
19/08/05 06:55:28.760 stream execution thread for DontFailOnDataLoss [id = 9396b4b5-bc30-4e1e-8c6a-1d3a9d890ccf, runId = 5b1ee39c-c174-4c42-b0d5-1c4b59061ab3] TRACE NetworkClient: [Consumer clientId=consumer-522, groupId=spark-kafka-source-42d70d54-e5ce-453a-a84c-40cd8126a3ff--1337036482-driver-2] Completed receive from node 2147483647 for JOIN_GROUP with corre$
19/08/05 06:55:28.760 stream execution thread for DontFailOnDataLoss [id = 9396b4b5-bc30-4e1e-8c6a-1d3a9d890ccf, runId = 5b1ee39c-c174-4c42-b0d5-1c4b59061ab3] TRACE Metrics: Removed metric named MetricName [name=connection-count, group=consumer-metrics, description=The current number of active connections., tags={client-id=consumer-522}]
19/08/05 06:55:28.760 data-plane-kafka-network-thread-0-ListenerName(PLAINTEXT)-PLAINTEXT-1 DEBUG Selector: [SocketServer brokerId=0] Connection with /127.0.0.1 disconnected
java.io.EOFException
	at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:96)
	at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
	at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
	at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
	at kafka.netw
```
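The suggested poll-in-a-loop approach can be sketched in isolation as below. The loop shape follows the working snippet discussed later in this thread; `poll` and `assignment` are passed in as functions purely so the control flow can be shown (and exercised) without a live broker, so the class and parameter names are illustrative rather than the connector's actual code.

```java
import java.time.Duration;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Sketch of the suggested approach: after an initial poll(Duration.ZERO),
// keep calling poll with a short non-zero timeout until partitions are
// assigned or the overall pollTimeoutMs deadline passes. poll/assignment are
// injected so the loop can run without a broker; names are illustrative.
public class AssignmentWait {
    public static Set<Integer> waitForAssignment(Consumer<Duration> poll,
                                                 Supplier<Set<Integer>> assignment,
                                                 long pollTimeoutMs) {
        long startTimeMs = System.currentTimeMillis();
        poll.accept(Duration.ZERO);
        Set<Integer> partitions = assignment.get();
        while (partitions.isEmpty()
                && System.currentTimeMillis() - startTimeMs < pollTimeoutMs) {
            poll.accept(Duration.ofMillis(100)); // non-zero timeout drives the group join
            partitions = assignment.get();
        }
        return partitions;
    }
}
```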
gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#issuecomment-512821931

> It feels me like consumer.poll(jt.Duration.ZERO) just doesn't work.

If that were true, several users would have protested with blocker bugs on the Kafka side, but I haven't seen such a situation (polling without assignment is not possible). I've seen several heavy users who do the polling (and no other magic) with `consumer.poll(Duration.ZERO)`. The use-case I've seen is something like this:

```
while (!interrupted) {
  val data = consumer.poll(Duration.ZERO)
  process(data)
}
```

Anyway, I'm going on a 2-week vacation, but after that I'm going to sit down with the Kafka guys and take a deeper look at this (again, only a couple of Spark tests are failing consistently, and it may be that we're not using the API properly).
gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#issuecomment-512703520

@zsxwing the main approach was created in agreement with the Kafka guys. The only thing which bothers me is why the following approaches produce different results (maybe an edge case is hit with 0?!).

Works:
```
...
consumer.poll(jt.Duration.ZERO)
var partitions = consumer.assignment()
while (partitions.isEmpty && System.currentTimeMillis() - startTimeMs < pollTimeoutMs) {
  consumer.poll(jt.Duration.ofMillis(100))
  partitions = consumer.assignment()
}
...
```

Fails consistently with no flakiness (worth mentioning it's only a couple of tests, so maybe the tests are wrongly implemented):
```
...
var partitions = Set.empty[TopicPartition].asJava
while (partitions.isEmpty && System.currentTimeMillis() - startTimeMs < pollTimeoutMs) {
  consumer.poll(jt.Duration.ZERO)
  partitions = consumer.assignment()
  if (partitions.isEmpty) {
    Thread.sleep(100)
  }
}
...
```
gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#issuecomment-512162339

A little explanation of the last change:
* While I was debugging the issue I realized the `requirement failed` message is not really informative, so I added a meaningful message
* When `consumer.poll(Duration.ZERO)` with `Thread.sleep(100)` was used, then under some circumstances even 2 minutes was not enough to get the assignment

To test this I put the standalone application into a 1k-iteration execution loop. The end result is that with a non-zero timeout (including `Duration.ofMillis(1)`) everything works fine. So the final judgement is that the actual implementation works, but as a side track I'm going to analyze the `consumer.poll(Duration.ZERO)` behavior with the Kafka guys.
gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#issuecomment-511796946

Checking the issue...
gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#issuecomment-511771601

retest this please
gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#issuecomment-511757383

Created a quick fix: https://github.com/apache/spark/pull/25171
gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#issuecomment-511750029

retest this please
gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#issuecomment-511359259

> Btw, just wondering, if you managed to test with situation delaying updating metadata (finally timed-out), then what would be occurred? Query fails? Or reattempt of relevant tasks happen?

I presume you're interested in what the new behavior is. If an exception arrives, `withRetriesWithoutInterrupt` catches all `NonFatal` exceptions and takes care of the reattempt.
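The retry behavior described here can be sketched generically as below. This is a simplified, hypothetical stand-in for `withRetriesWithoutInterrupt` (the real helper in the connector also deals with interrupts and runs the body on an uninterruptible thread); the class name and the explicit attempt bound are illustrative only.

```java
import java.util.function.Supplier;

// Simplified sketch of the described retry behavior: non-fatal exceptions
// from the Kafka call are caught and the operation is reattempted. This is
// an illustrative stand-in for withRetriesWithoutInterrupt; the real helper
// also handles interrupts, and the attempt bound here exists only so the
// sketch terminates.
public class Retry {
    public static <T> T withRetries(Supplier<T> body, int maxAttempts) {
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return body.get();
            } catch (RuntimeException e) { // stands in for Scala's NonFatal
                last = e;
            }
        }
        throw last; // all attempts failed: rethrow the last exception
    }
}
```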
gaborgsomogyi commented on issue #25135: [SPARK-28367][SS] Use new KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#issuecomment-510871772

cc @HeartSaVioR