Ben Teeuwen created SPARK-13939:
-----------------------------------

             Summary: Kafka createDirectStream not parallelizing properly
                 Key: SPARK-13939
                 URL: https://issues.apache.org/jira/browse/SPARK-13939
             Project: Spark
          Issue Type: Bug
            Reporter: Ben Teeuwen


I’m trying to get a streaming app running using PySpark (1.6.0), Kafka and the 
receiverless direct approach ‘createDirectStream’, but it seemingly has 
problems with the degree of parallelism in Spark. I’ve written the app both in 
Scala and PySpark; both exhibit the same behavior.

Context:
- stream with 10-30k events per 10-second batch.
- Kafka topic has 10 partitions.
- createDirectStream with kafkaParams containing only metadata.broker.list, 
listing 4 brokers.
- 10 executors with 2 cores each and 3 GB RAM, plus 3 GB driver memory.
- backpressure on.
- not using speculative execution.
- simple logic: parse JSON, create key-value tuples, flatMap, reduceByKey, 
pprint to screen (sketched below).
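
For reference, a minimal sketch of the app (broker addresses, the repartition 
count and the JSON key field are placeholders, not the actual values):

import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="kafka-direct-test")
ssc = StreamingContext(sc, 10)  # 10-second batch interval

# kafkaParams contain only metadata.broker.list, pointing at the 4 brokers.
kafkaParams = {"metadata.broker.list":
               "broker1:9092,broker2:9092,broker3:9092,broker4:9092"}
stream = KafkaUtils.createDirectStream(ssc, ["test"], kafkaParams)

# Repartition first to spread the events evenly over all executors,
# then parse the JSON payload of each (key, value) message.
events = stream.repartition(20).map(lambda kv: json.loads(kv[1]))

# Key-value tuples, flatMap, reduceByKey, pprint to screen.
counts = events.flatMap(lambda e: [(e["some_key"], 1)]) \
               .reduceByKey(lambda a, b: a + b)
counts.pprint()

ssc.start()
ssc.awaitTermination()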

At the start of the streaming app:
The first thing I do is repartition, to spread the events evenly over all the 
executors. Looking at the streaming tab > batch details > Input Metadata, I see 
it ingests from only 1 Kafka partition:
Kafka direct stream [0] topic: test    partition: 1    offsets: 16630012 to 16639226

One executor is doing the repartitioning, and it takes more than the batch 
interval, so backpressure kicks in. The number of events ingested is trimmed 
down to 100, which gets processed in 2 seconds. Then, slowly, more Kafka 
partitions are being used. E.g. 10 minutes later:
Kafka direct stream [0] topic: test    partition: 9    offsets: 16262300 to 16262400
                                topic: test    partition: 1    offsets: 16683171 to 16683271
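
Backpressure is switched on through configuration, roughly like this (whether 
it is set via SparkConf or spark-submit --conf makes no difference to the 
behaviour described):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("kafka-direct-test")
        # Enables the backpressure rate controller for the streaming job.
        .set("spark.streaming.backpressure.enabled", "true"))

# spark.streaming.kafka.maxRatePerPartition (not set here) would additionally
# cap the read rate, in records per second, for each Kafka partition.
sc = SparkContext(conf=conf)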

When running for a day, the number of Kafka partitions it ingests from 
stabilizes at around 3-6. But it never ingests the full stream, even though it 
has more partitions to ingest from in parallel and more executors to utilize.

So it loses a lot of events, and it processes older events in later batches. 
E.g. printing min/max timestamps per batch shows events going back almost to 
the start of the streaming app.
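
The timestamp check is nothing fancier than the following sketch (the 
‘timestamp’ field name is a placeholder, and ‘events’ is the parsed DStream 
from the sketch above):

def print_ts_range(rdd):
    # Report the oldest and newest event timestamps seen in this batch.
    if not rdd.isEmpty():
        ts = rdd.map(lambda e: e["timestamp"])
        print("batch timestamps: min=%s max=%s" % (ts.min(), ts.max()))

events.foreachRDD(print_ts_range)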

My take from the ‘Simplified Parallelism’ bullet in the docs 
(http://spark.apache.org/docs/latest/streaming-kafka-integration.html) is not 
to worry about parallelism, as long as I provide sufficient resources. And 10 
executors with 2 cores each, receiving from a Kafka stream with 10 partitions 
containing 10-30k events per 10 seconds, seems plentiful.
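
Since the direct approach creates one RDD partition per Kafka partition, the 
per-batch parallelism can also be checked inside the app, before the 
repartition, along the lines of the offset-handling example in the integration 
guide (again only a sketch, reusing the ‘stream’ DStream from above):

def log_offsets(rdd):
    # offsetRanges() lists one range per Kafka partition contributing to this
    # batch; with 10 Kafka partitions it should list all 10, every batch.
    for o in rdd.offsetRanges():
        print("topic=%s partition=%d offsets=%d..%d"
              % (o.topic, o.partition, o.fromOffset, o.untilOffset))
    print("RDD partitions in this batch: %d" % rdd.getNumPartitions())
    return rdd

stream.transform(log_offsets).count().pprint()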

(This was discussed during the Amsterdam Spark Meetup on March 14, 2016 with 
[~holdenk_amp], and she advised writing it up in a ticket here.)


