[GitHub] spark issue #21685: [SPARK-24707][DSTREAMS] Enable spark-kafka-streaming to ...

2018-09-13 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21685
  
Can one of the admins verify this patch?





[GitHub] spark issue #21685: [SPARK-24707][DSTREAMS] Enable spark-kafka-streaming to ...

2018-07-31 Thread sidhavratha
Github user sidhavratha commented on the issue:

https://github.com/apache/spark/pull/21685
  
Our Kafka team has resolved the issue regarding the 40 sec poll delay; it was due to some 
faulty hardware.
However, these changes still make sense for getting better throughput per batch. 
As you know, Kafka already pre-fetches records, but it pre-fetches only one 
additional poll's worth. If a batch requires a number of records that takes many polls 
to fetch, processing will be blocked on the Kafka poll.

This blocking Kafka poll can be avoided, based on configuration, and better 
throughput can be achieved if we maintain a configured buffer on the executors.
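
A rough, hypothetical sketch of the buffering idea (not this PR's actual implementation; the class and names are illustrative): a background thread keeps polling Kafka into a bounded queue on the executor, so batch processing drains already-fetched records instead of blocking on `poll()`.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}

// Hypothetical executor-side prefetch buffer, for illustration only.
class PrefetchingConsumer[K, V](consumer: KafkaConsumer[K, V], bufferSize: Int) {
  private val buffer = new LinkedBlockingQueue[ConsumerRecord[K, V]](bufferSize)

  // Single poller thread: the only thread touching the (non thread-safe) KafkaConsumer.
  private val poller = new Thread(new Runnable {
    override def run(): Unit =
      while (!Thread.currentThread().isInterrupted) {
        val records = consumer.poll(100) // keeps fetching in the background
        records.asScala.foreach(r => buffer.put(r)) // put() blocks when the buffer is full
      }
  })
  poller.setDaemon(true)
  poller.start()

  // The batch reads already-buffered records; it only waits if the buffer is empty.
  def next(): ConsumerRecord[K, V] = buffer.take()
}
```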

Attaching 2 files:
*   kafka_streaming_without_buffer.pdf: the streaming job without the buffer is able 
to achieve ~50K throughput per batch
*   kafka_streaming_with_buffer.pdf: the streaming job with the buffer is able to 
maintain ~70K throughput per batch

For my test job:
Batch duration = 10 sec
Kafka Topic Partitions = 15
Cores = 5
Average Processing time per record = 0.7 ms

Hence,
Expected number of records per batch
= (Batch duration * Kafka topic partitions) / (Ceil(Kafka partitions / Cores) * Time per record)
= (10 * 1000 ms * 15) / (Ceil(15 / 5) * 0.7 ms)
= 150,000 / 2.1
= 71,428.57
Expected number of records per batch = ~71K
As per this calculation, ~71K is the maximum throughput that can be achieved. Without 
the buffer, throughput is lower since some time is wasted on the blocked Kafka poll.
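
The same arithmetic as a quick sketch (the parameter values are just the test-job figures listed above):

```scala
// Back-of-the-envelope upper bound on records per batch for the test job above.
val batchDurationMs = 10 * 1000.0 // 10 sec batch
val partitions      = 15
val cores           = 5
val msPerRecord     = 0.7

val partitionsPerCore  = math.ceil(partitions.toDouble / cores) // Ceil(15 / 5) = 3
val maxRecordsPerBatch = (batchDurationMs * partitions) / (partitionsPerCore * msPerRecord)
println(f"$maxRecordsPerBatch%.2f records") // ~71428.57
```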

In this PR I am making the Kafka polling strategy configurable, so that it 
is possible to plug in an external strategy as well.
The class name used to create the Kafka consumer can be set via the 
"spark.streaming.kafka.consumer.builder.name" parameter.


A few points which you asked about last time:

*   Tests with receivers: This change is not applicable for receivers. 
The changes are only for the direct approach.
*   Guarantees when the driver dies: I have tested the scenario where the 
driver dies. Since the newly started driver starts from the last committed offset, it 
works as expected and does not lose any data.
*   Message size: Yes, the previous issue (40 sec poll) was more apparent 
when message sizes were big (> 10KB). However, since "max.partition.fetch.bytes" 
is fixed at 1MB, it should not cause more poll time, since a poll has to carry the 
same number of bytes irrespective of per-record size. (However, I am not sure 
of the Kafka server internals regarding this.) See the kafkaParams sketch after this list.
*   Testing on a 2.4.0 cluster: I am still working on creating a 2.4.0 
cluster. Will update test results on that setup as well.
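
For reference, a minimal kafkaParams sketch for the direct stream, showing the fetch-size setting discussed above (broker addresses, group id, and values are illustrative, not recommendations):

```scala
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers"         -> "broker1:9092,broker2:9092",
  "key.deserializer"          -> classOf[StringDeserializer],
  "value.deserializer"        -> classOf[StringDeserializer],
  "group.id"                  -> "spark-streaming-test",
  // Kafka's default is 1 MB; bigger records mean fewer records per fetch, not longer polls.
  "max.partition.fetch.bytes" -> (1048576: java.lang.Integer)
)
```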


[kafka_streaming_without_buffer.pdf](https://github.com/apache/spark/files/2246522/kafka_streaming_without_buffer.pdf)

[kafka_streaming_with_buffer.pdf](https://github.com/apache/spark/files/2246523/kafka_streaming_with_buffer.pdf)






[GitHub] spark issue #21685: [SPARK-24707][DSTREAMS] Enable spark-kafka-streaming to ...

2018-07-10 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/21685
  
In the meantime something came to my mind (the most obvious question).
What is the size of the Kafka events being processed? Big events could end 
up causing high polling time.
Maybe some of the events are significantly bigger than others.





[GitHub] spark issue #21685: [SPARK-24707][DSTREAMS] Enable spark-kafka-streaming to ...

2018-07-02 Thread sidhavratha
Github user sidhavratha commented on the issue:

https://github.com/apache/spark/pull/21685
  
And yes, both applications are tested on the same dataset, with only the additional 
buffer logic applied and the consumer group-id changed.

In the before case the scheduling delay is increasing because of the Kafka poll time 
during a few batches, which is exactly the problem I have tried to solve in this 
PR.

We are still trying to figure out the reason for the frequent high poll times; 
however, we should remove the dependency of a batch's processing time on the Kafka 
poll, which is possible because of executor stickiness for each Kafka partition. 
This will provide a benefit in terms of Kafka poll time even if it is only a few seconds.

Also, the configuration I am adding is optional; without it the job will 
behave as usual (blocking on poll).





[GitHub] spark issue #21685: [SPARK-24707][DSTREAMS] Enable spark-kafka-streaming to ...

2018-07-02 Thread sidhavratha
Github user sidhavratha commented on the issue:

https://github.com/apache/spark/pull/21685
  
If the batch duration is 10 seconds, a new batch will start every 10 seconds 
irrespective of whether the last batch has completed or not.

If a particular batch (10 second duration, which is supposed to complete 
in 10 seconds) takes more time to complete (for example 50 seconds in the attached 
screenshot), that additional 40 sec gets added as the scheduling delay of the next 
batch. If poll time is included in processing time, it can cause this sudden 
jump in the scheduling delays of batches.

These scheduling delays will get cleared if some batches take less than 10 
sec. For example, the first batch in the screenshot had a 4s scheduling delay, which got 
cleared for the next batch as that batch took only 5s to process.

We are using backpressure to automatically control the record count based on 
batch speed.
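
These are the standard Spark settings we rely on for that (the max-rate value below is only an example):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Let Spark adapt the per-batch record count to the observed batch speed.
  .set("spark.streaming.backpressure.enabled", "true")
  // Optional cap per Kafka partition (records/sec) while backpressure ramps up.
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
```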

Screenshot: https://user-images.githubusercontent.com/2279976/42166788-c1c3890a-7e29-11e8-8d74-c2c251c7a6a1.png






[GitHub] spark issue #21685: [SPARK-24707][DSTREAMS] Enable spark-kafka-streaming to ...

2018-07-02 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/21685
  
What I can't really understand is why the `Scheduler Delay` is so different.

`
Scheduler delay includes time to ship the task from the scheduler to the 
executor, and time to send the task result from the executor to the scheduler. 
If scheduler delay is large, consider decreasing the size of tasks or 
decreasing the size of task results.
`

According to this, in the `before` case (5min 20sec) either the source or 
the result set is quite big, which is not the case in `after` (avg. 160 ms). Is 
it the same application tested on the same dataset?






[GitHub] spark issue #21685: [SPARK-24707][DSTREAMS] Enable spark-kafka-streaming to ...

2018-07-02 Thread sidhavratha
Github user sidhavratha commented on the issue:

https://github.com/apache/spark/pull/21685
  
Thanks a lot for looking into this. Please find my comments in [] below each 
point.

- You're trying to commit something into 2.4 but in the test results I see 
the 2.1.0 version. Have you tested it with 2.4? This part of the code has 
changed significantly. Results with that version would be better. 
[We do not have a 2.4.0 cluster handy. Will try to spawn a 2.4.0 cluster and 
test the same.]

- In the before case the input rate was approximately the same as in 
the after case, constantly. After the initial good performance something went wrong 
and decreased the rate significantly. What exactly happened there? 
Maybe memory filled up and polling was not possible without GC (just guessing)? 
[A Kafka poll usually brings more records than one batch can process. In my 
case it brings ~500 records. Those records stay in the buffer for 4-5 batches, 
after which the next poll happens, resulting in increased processing time. Also, 
not every Kafka poll takes a long time. We have raised the issue with our Kafka team, 
but it is inconclusive so far.] 
[I looked at the GC time on the executors (through the Spark UI), which was 
insignificant. I will enable GC logs and run the job again.]

- Have you considered/tested what happens when the driver/receiver dies? Guarantees are 
quite important.
[I will test this scenario. Basically, am I supposed to test that if the driver dies 
it starts from the same place when it comes back up?]
- Have you tested it with receivers? Some results would be excellent.
[I will get results with receivers as well.]





[GitHub] spark issue #21685: [SPARK-24707][DSTREAMS] Enable spark-kafka-streaming to ...

2018-07-02 Thread gaborgsomogyi
Github user gaborgsomogyi commented on the issue:

https://github.com/apache/spark/pull/21685
  
In general `KafkaConsumer.poll` should take a couple of seconds, but 10+ is 
extremely high. The question `why does it take so long?` has to be answered first. In 
the processing time chart I see a trend which shows peaks periodically.

I have a couple of questions without checking the code:
* You're trying to commit something into 2.4 but in the test results I see 
the 2.1.0 version. Have you tested it with 2.4? This part of the code has 
changed significantly. Results with that version would be better.
* In the `before` case the input rate was approximately the same as in the `after` 
case, constantly. After the initial good performance something went wrong 
and decreased the rate significantly. What exactly happened there? Maybe memory 
filled up and polling was not possible without GC (just guessing)?
* Have you considered/tested what happens when the driver/receiver dies? Guarantees are 
quite important. 
* Have you tested it with receivers? Some results would be excellent.

All in all, IMO we haven't reached the root cause, and because of that we are not 
able to judge whether this is the right solution.






[GitHub] spark issue #21685: [SPARK-24707][DSTREAMS] Enable spark-kafka-streaming to ...

2018-07-01 Thread sidhavratha
Github user sidhavratha commented on the issue:

https://github.com/apache/spark/pull/21685
  
@gaborgsomogyi Can you please review this PR and approve it for testing?





[GitHub] spark issue #21685: [SPARK-24707][DSTREAMS] Enable spark-kafka-streaming to ...

2018-06-30 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/21685
  
Can one of the admins verify this patch?




