[ https://issues.apache.org/jira/browse/SPARK-7053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Platon Potapov updated SPARK-7053:
----------------------------------
    Description: 
I have a simple Spark Streaming application that reads messages from a Kafka
topic, applies trivial transforms (window, map, mapValues, foreachRDD), and
discards the results. A test application, really. The streaming batch duration
is 2 seconds.

The Kafka topic is steadily populated with 5000 messages per second by a data
generator application. Each message is small (a few double values).

The problem:
* When KafkaUtils.createStream is used to create a receiver that fetches
messages from Kafka, "Processing Time" (as shown at
http://localhost:4040/streaming) starts at hundreds of milliseconds but
steadily increases over time, eventually causing scheduling delays and
out-of-memory errors. Note that JVM memory consumption and garbage collection
pauses also grow over time.
* When KafkaUtils.createDirectStream is used (the rest of the setup is exactly
the same), processing time starts at about 5 seconds (compared to hundreds of
milliseconds with the Kafka receiver) but remains at that level.
* When the input is replaced with an in-memory queue populated with the same
messages at the same rate (streamingContext.queueStream(queue)), processing
time is again hundreds of milliseconds and remains stable.

This leads me to conclude that the KafkaUtils.createStream receiver setup
leaks resources.

Attached are the graphs depicting each of the three scenarios.
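For reference, a minimal sketch of the kind of test application described above. The topic name, ZooKeeper/broker addresses, consumer group id, window length, and transform bodies are assumptions, not taken from the original report; only the overall shape (2-second batches, trivial window/map/mapValues/foreachRDD pipeline, results discarded) follows the description.

```scala
// Hypothetical reconstruction of the test application (Spark Streaming 1.3.x API).
// Topic name, addresses, group id, and window length are assumptions.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object StreamingLeakTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-leak-test")
    val ssc = new StreamingContext(conf, Seconds(2)) // 2-second batch duration

    // Scenario 1: receiver-based stream (the setup that appears to leak).
    val stream = KafkaUtils.createStream(
      ssc, "localhost:2181", "test-group", Map("test-topic" -> 1))

    // Scenario 2 would swap in KafkaUtils.createDirectStream with the same
    // topic; scenario 3 would use ssc.queueStream(queue) instead.

    // Trivial transforms; the results are discarded.
    stream
      .window(Seconds(10))
      .map { case (k, v) => (k, v.length) }
      .mapValues(_ + 1)
      .foreachRDD(rdd => rdd.count())

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Running this requires a live Kafka broker and ZooKeeper, so it is a sketch of the scenario rather than a standalone reproducer.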





> KafkaUtils.createStream leaks resources
> ---------------------------------------
>
>                 Key: SPARK-7053
>                 URL: https://issues.apache.org/jira/browse/SPARK-7053
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.3.1
>         Environment: Spark Streaming 1.3.1, standalone mode running on just 1 
> box: Ubuntu 14.04.2 LTS, 4 cores, 8GB RAM, java version "1.8.0_40"
>            Reporter: Platon Potapov
>         Attachments: from.kafka.direct.stream.gif, from.kafka.receiver.gif, 
> from.queue.gif
>
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
