[GitHub] spark issue #21685: [SPARK-24707][DSTREAMS] Enable spark-kafka-streaming to ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21685

Can one of the admins verify this patch?
Github user sidhavratha commented on the issue: https://github.com/apache/spark/pull/21685

Our Kafka team has resolved the issue regarding the 40 sec poll delay; it was due to some faulty hardware. However, these changes still make sense for getting better throughput per batch. As you know, Kafka already pre-fetches records, but it pre-fetches only one additional poll. If a batch requires a number of records that takes many polls to fetch, processing will be blocked on the Kafka poll. This blocking Kafka poll can be avoided, based on configuration, and better throughput can be achieved if we maintain a configured buffer on the executors.

Attaching 2 files:
* kafka_streaming_without_buffer.pdf: the streaming job without the buffer is able to achieve ~50K throughput per batch
* kafka_streaming_with_buffer.pdf: the streaming job with the buffer is able to maintain ~70K throughput per batch

For my test job:
* Batch duration = 10 sec
* Kafka topic partitions = 15
* Cores = 5
* Average processing time per record = 0.7 ms

Hence, expected number of records per batch
= (Batch duration * Kafka topic partitions) / (Ceil(Kafka partitions / Cores) * Time per record)
= (10 * 1000 * 15) / (Ceil(15 / 5) * 0.7)
= 150000 / 2.1
= 71428.57
= ~71K

As per this calculation, ~71K is the maximum throughput that can be achieved. Without the buffer, throughput is less since some time is wasted on the blocked Kafka poll.

In this PR I am making the Kafka polling strategy configurable so that it is possible to plug in an external strategy as well. The class name used to create the Kafka consumer can be defined via the "spark.streaming.kafka.consumer.builder.name" parameter.

A few points which you asked about last time:
* Tests with receivers: This change is not applicable to receivers. The changes are only for the direct approach.
* Guarantees when the driver dies: I have tested the scenario where the driver dies. Since the newly started driver starts from the last committed offset, it works as expected and does not lose any data.
* Message size: Yes, the previous issue (40 sec poll) was more apparent when message sizes were big, > 10KB. However, since "max.partition.fetch.bytes" is fixed at 1MB, it should not be causing longer poll times, since a poll has to carry the same number of bytes irrespective of per-record size. (However, I am not sure of the Kafka server internals regarding this.)
* Testing on a 2.4.0 cluster: I am still working on creating a 2.4.0 cluster. Will update with test results on that setup as well.

[kafka_streaming_without_buffer.pdf](https://github.com/apache/spark/files/2246522/kafka_streaming_without_buffer.pdf)
[kafka_streaming_with_buffer.pdf](https://github.com/apache/spark/files/2246523/kafka_streaming_with_buffer.pdf)
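[Editor's note: a minimal sketch of how the proposed knob might be wired into a standard direct-stream job. The key `spark.streaming.kafka.consumer.builder.name` comes from the comment above; the builder class `my.pkg.BufferedConsumerBuilder`, the broker address, and the topic name are hypothetical placeholders, and the exact builder interface the PR expects is not shown here. The stream-creation calls are the standard spark-streaming-kafka-0-10 API.]

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.kafka.common.serialization.StringDeserializer

object BufferedDirectStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("kafka-buffered-poll")
      // Proposed by this PR: fully qualified class name of a pluggable
      // consumer builder ("my.pkg.BufferedConsumerBuilder" is hypothetical).
      // Leaving this unset keeps the usual blocking-poll behavior.
      .set("spark.streaming.kafka.consumer.builder.name", "my.pkg.BufferedConsumerBuilder")

    // 10 sec batch duration, as in the test job above.
    val ssc = new StreamingContext(conf, Seconds(10))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092", // placeholder
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "buffered-test",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      // PreferConsistent keeps each Kafka partition sticky to one executor,
      // which is what makes a per-executor buffer viable.
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("test-topic"), kafkaParams)
    )

    stream.foreachRDD(rdd => println(s"records in batch: ${rdd.count()}"))
    ssc.start()
    ssc.awaitTermination()
  }
}
```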
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/21685

In the meantime something came to my mind (the most obvious question): what is the size of the Kafka events being processed? Big events could end up in high polling time. Maybe some of the events are significantly bigger than others.
Github user sidhavratha commented on the issue: https://github.com/apache/spark/pull/21685

And yes, both applications were tested on the same dataset, with only the additional buffer logic applied and the consumer group-id changed. In the `before` case the scheduling delay is increasing because of Kafka poll time during a few batches, which is exactly the problem I have tried to solve in this PR. We are still trying to figure out the reason for the frequent high poll times; however, we should remove the dependency of a batch's processing time on the Kafka poll, which is possible because each Kafka partition is sticky to one executor. This will provide a benefit in terms of Kafka poll time even if it is only a few seconds. Also, the configuration I am adding is optional; without it the job will behave as usual (blocking on poll).
Github user sidhavratha commented on the issue: https://github.com/apache/spark/pull/21685

If the batch duration is 10 seconds, a new batch will start every 10 seconds irrespective of whether the last batch has completed or not. If a particular batch (10 second duration, which is supposed to complete in 10 seconds) takes more time to complete (for example, 50 seconds in the attached screenshot), that additional 40 sec gets added as scheduling delay of the next batch. If poll time is included in processing time, it can cause this sudden jump in the scheduling delays of batches. The scheduling delay gets cleared again if some batches take less than 10 sec; for example, the first batch in the screenshot had 4s scheduling delay, which got cleared for the next batch as that batch took only 5s to process. We are using backpressure to automatically control the record count based on batch speed.

https://user-images.githubusercontent.com/2279976/42166788-c1c3890a-7e29-11e8-8d74-c2c251c7a6a1.png
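[Editor's note: for reference, a minimal sketch of the backpressure setup being described. The configuration keys are documented Spark settings; the concrete rate values are illustrative only.]

```scala
import org.apache.spark.SparkConf

// Backpressure lets Spark's rate controller adapt the per-batch record
// count to the observed processing speed, as described above.
val conf = new SparkConf()
  .setAppName("kafka-backpressure")
  // Enable rate control across batches.
  .set("spark.streaming.backpressure.enabled", "true")
  // Hard cap per partition while the controller adapts (records/sec/partition).
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
  // Starting rate for the very first batches (records/sec); illustrative value.
  .set("spark.streaming.backpressure.initialRate", "500")
```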
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/21685

What I can't really understand is why the `Scheduler Delay` is so different.

`Scheduler delay includes time to ship the task from the scheduler to the executor, and time to send the task result from the executor to the scheduler. If scheduler delay is large, consider decreasing the size of tasks or decreasing the size of task results.`

According to this, in the `before` case (5 min 20 sec) either the source or the result set is quite big, which is not the case in `after` (avg. 160 ms). Is it the same application tested on the same dataset?
Github user sidhavratha commented on the issue: https://github.com/apache/spark/pull/21685

Thanks a lot for looking into this. Please find my comments in [] below each point.

- You're trying to commit something into 2.4 but in the test results I see the 2.1.0 version. Have you tested it with 2.4? This part of the code has significantly changed. Results with this version would be better.
  [We do not have a 2.4.0 cluster handy. Will try to spawn a 2.4.0 cluster and test the same.]
- In the `before` case the input rate was approximately the same just like in the `after` case constantly. After the initial good performance something wrong happened and decreased the rate significantly. What happened exactly there? Maybe memory filled up and it was not able to poll things without GC (just guessing)?
  [A Kafka poll usually brings more records than one batch can process. In my case it brings ~500 records. Those records stay in the buffer for 4-5 batches, after which the next poll happens, resulting in increased processing time. Also, not all Kafka polls take a long time. We have raised the issue with our Kafka team, but it is inconclusive so far.]
  [I looked at GC time on the executors (through the Spark UI), which was insignificant. I will enable GC logs and run the job again.]
- Have you considered/tested what happens when the driver/receiver dies? Guarantees are quite important.
  [I will test this scenario. Basically, am I supposed to test that if the driver dies, it starts from the same place when it comes back up?]
- Have you tested it with receivers? Some results would be excellent.
  [I will get results with receivers as well.]
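[Editor's note: a minimal sketch of how executor GC logging can be enabled for such a run, so poll-time spikes can be correlated with collection pauses. `spark.executor.extraJavaOptions` is a standard Spark setting; the JDK 8 style GC flags shown are illustrative and depend on the JVM version in use.]

```scala
import org.apache.spark.SparkConf

// Turn on verbose GC logging on the executors so long KafkaConsumer.poll
// calls can be correlated with GC pauses in the executor logs.
val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions",
    "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps")
```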
Github user gaborgsomogyi commented on the issue: https://github.com/apache/spark/pull/21685

In general `KafkaConsumer.poll` should take a couple of seconds, but 10+ is extremely high. The question `why does it take so long?` has to be answered first. In the processing time chart I see a trend which shows peaks periodically. I have a couple of questions without checking the code:

* You're trying to commit something into 2.4 but in the test results I see the 2.1.0 version. Have you tested it with 2.4? This part of the code has significantly changed. Results with this version would be better.
* In the `before` case the input rate was approximately the same just like in the `after` case constantly. After the initial good performance something wrong happened and decreased the rate significantly. What happened exactly there? Maybe memory filled up and it was not able to poll things without GC (just guessing)?
* Have you considered/tested what happens when the driver/receiver dies? Guarantees are quite important.
* Have you tested it with receivers? Some results would be excellent.

All in all, IMO we haven't found the root cause, and because of that we're not able to judge whether this is the right solution.
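[Editor's note: a standalone sketch for measuring raw poll latency outside Spark, which can help answer the `why does it take so long?` question. The broker address, group id, and topic are placeholders, and the `Duration`-based `poll` assumes kafka-clients 2.0+.]

```scala
import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer

object PollLatencyProbe {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092") // placeholder
    props.put("group.id", "poll-latency-probe")    // placeholder
    props.put("key.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "earliest")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(java.util.Collections.singletonList("test-topic")) // placeholder

    // Time each poll individually so periodic spikes stand out.
    for (i <- 1 to 100) {
      val start = System.nanoTime()
      val records = consumer.poll(Duration.ofSeconds(10))
      val elapsedMs = (System.nanoTime() - start) / 1000000
      println(s"poll #$i: ${records.count()} records in ${elapsedMs} ms")
    }
    consumer.close()
  }
}
```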
Github user sidhavratha commented on the issue: https://github.com/apache/spark/pull/21685

@gaborgsomogyi Can you please review this PR and approve it for testing?