On Thu, Feb 13, 2014 at 10:27 PM, Sourav Chandra <
sourav.chan...@livestream.com> wrote:

> 1. We are not setting any storage explicitly, hence I assume its using
> defualt policy for streaming i.e. MEMORY_ONLY_SER and as its is a
> NetworkStream it should be replicated. Correct me if I am wrong
>
No it wont. Use MEMORY_ONLY_2.


> 2. reduceByKeyAndWindow is called with 8 (we have 8 core machine per
> worker)
>
You have to play around with this. But it should be comparable to the
number of cores in the cluster. But if this number if too big, then
performance may go down. So there is a sweet spot, you have to figure it
out by testing.


>
> 3. Batchduration for streaming context is set to 1 sec. I tried setting to
> 500 milli but did not help
>
> In the ui, only 2 types of stages are present - combineByKey and foreach.
> And combineByKey is taking much time compared to foreach
>
> By looking at stage ui as you suggested, i can see though foreach stage
> has 8 tasks combineByKey is having only 3/4 tasks. I assume the tasks are
> per core which implies combineByKey is not utilizinfg all cores.
> What could be the reason for this?
>
>  I have attached the stage ui with sorted duration column
>
> Well it is clear that the combineByKey is taking the most amount of time
and 7 seconds. So you need to increase the number of reducers in the
reduceByKeyAndWindow operation. That should distribute the computation more
to use all the cores, and therefore speed up the processing of each batch.
However you have to set the batch interval such that batch interval >
processing time of each batch. Otherwise, the system is not able to process
as fast as batches of data are accumulating, so it is constantly getting
backlogged. So try increasing the number of reducers as well as increasing
the batch interval.

Also you can monitor the batch processing times and end-to-end delay using
the StreamingListener interface (see StreamingContext.addStreamingListener
in Spark 0.9). if the batch interval is not large enough you will find that
the the latency found with a streaming listener will keep growing.

Hope this helps.

TD


>
>
>
>
> On Fri, Feb 14, 2014 at 10:56 AM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Can you tell me more about the structure of your program? As in
>> 1) What storage levels are you using in the input stream?
>> 2) How many reducers are using for the reduceByKeyAndWindow?
>> 3) Batch interval and processing times seen with one machine vs two
>> machines.
>>
>> A good place to start debugging is the Spark web ui for the Spark
>> streaming application. It should running on the master at port 4040. There
>> if you look at the stage you should see patterns of stages repeatedly. You
>> can figure out the number of tasks in each stage, which stage is taking the
>> most amount of time (and is therefore the bottleneck) etc. You can drill
>> down and see where the tasks are running, is it using the 32 slots in the
>> new machine or not.
>>
>> TD
>>
>>
>> On Thu, Feb 13, 2014 at 6:29 PM, Sourav Chandra <
>> sourav.chan...@livestream.com> wrote:
>>
>>> Thanks TD.
>>>
>>> One more question:
>>>
>>> We are building real time analytics using spark streaming - We read from
>>> kafka, process it (flatMap -> Map -> reduceByKeyAndWindow -> filter) and
>>> then save to Cassandra (using DStream.foreachRDD).
>>> Initially I used a machine with 32 cores, 32 GB and performed load
>>> testing. with 1 master and 1 worker. in the same box. Later I added one
>>> more box and launched worker on that box (32 core 16GB). I set
>>> spark.executor.memory=10G in driver program
>>>
>>> I expected the performance should increase linearly as mentioned in
>>> spark streaming video but it did not help.
>>>
>>> Can you please explain why it is so? Also how can we increase?
>>>
>>> Thanks,
>>> Sourav
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Feb 14, 2014 at 7:50 AM, Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
>>>> Answers inline. Hope these answer your questions.
>>>>
>>>> TD
>>>>
>>>>
>>>> On Thu, Feb 13, 2014 at 5:49 PM, Sourav Chandra <
>>>> sourav.chan...@livestream.com> wrote:
>>>>
>>>>> HI,
>>>>>
>>>>> I have couple of questions:
>>>>>
>>>>> 1. While going through the spark-streaming code, I found out there is
>>>>> one configuration in JobScheduler/Generator
>>>>> (spark.streaming.concurrentJobs) which is set to 1. There is no
>>>>> documentation for this parameter. After setting this to 1000 in driver
>>>>> program, our streaming application's performance is improved.
>>>>>
>>>>
>>>> That is a parameter that allows Spark Stremaing to launch multiple
>>>> Spark jobs simultaneously. While it can improve the performance in many
>>>> scenarios (as it has in your case), it can actually increase the processing
>>>> time of each batch and increase end-to-end latency in certain scenarios. So
>>>> it is something that needs to be used with caution. That said, we should
>>>> have definitely exposed it in the documentation.
>>>>
>>>>
>>>>> What is this variable used for? Is it safe to use/tweak this parameter?
>>>>>
>>>>> 2. Can someone explain the usage of MapOutputTracker, BlockManager
>>>>> component. I have gone through the youtube video of Matei about spark
>>>>> internals but this was not covered in detail.
>>>>>
>>>>
>>>> I am not sure if there is a detailed document anywhere that explains
>>>> but I can give you a high level overview of the both.
>>>>
>>>> BlockManager is like a distributed key-value store for large blobs
>>>> (called blocks) of data. It has a master-worker architecture (loosely it is
>>>> like the HDFS file system) where the BlockManager at the workers store the
>>>> data blocks and BlockManagerMaster stores the metadata for what blocks are
>>>> stored where. All the cached RDD's partitions and shuffle data are stored
>>>> and managed by the BlockManager. It also transfers the blocks between the
>>>> workers as needed (shuffles etc all happen through the block manager).
>>>> Specifically for spark streaming, the data received from outside is stored
>>>> in the BlockManager of the worker nodes, and the IDs of the blocks are
>>>> reported to the BlockManagerMaster.
>>>>
>>>> MapOutputTrackers is a simpler component that keeps track of the
>>>> location of the output of the map stage, so that workers running the reduce
>>>> stage knows which machines to pull the data from. That also has the
>>>> master-worker component - master has the full knowledge of the mapoutput
>>>> and the worker component on-demand pulls that knowledge from the master
>>>> component when the reduce tasks are executed on the worker.
>>>>
>>>>
>>>>
>>>>>
>>>>> 3. Can someone explain the usage of cache w.r.t spark streaming? For
>>>>> example if we do stream.cache(), will the cache remain constant with all
>>>>> the partitions of RDDs present across the nodes for that stream, OR will 
>>>>> it
>>>>> be regularly updated as in while new batch is coming?
>>>>>
>>>>> If you call DStream.persist (persist == cache = true), then all RDDs
>>>> generated by the DStream will be persisted in the cache (in the
>>>> BlockManager). As new RDDs are generated and persisted, old RDDs from the
>>>> same DStream will fall out of memory. either by LRU or explicitly if
>>>> spark.streaming.unpersist is set to true.
>>>>
>>>>
>>>>> Thanks,
>>>>> --
>>>>>
>>>>> Sourav Chandra
>>>>>
>>>>> Senior Software Engineer
>>>>>
>>>>> · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
>>>>>
>>>>> sourav.chan...@livestream.com
>>>>>
>>>>> o: +91 80 4121 8723
>>>>>
>>>>> m: +91 988 699 3746
>>>>>
>>>>> skype: sourav.chandra
>>>>>
>>>>> Livestream
>>>>>
>>>>> "Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main,
>>>>> 3rd Block, Koramangala Industrial Area,
>>>>>
>>>>> Bangalore 560034
>>>>>
>>>>> www.livestream.com
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>>
>>> Sourav Chandra
>>>
>>> Senior Software Engineer
>>>
>>> · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
>>>
>>> sourav.chan...@livestream.com
>>>
>>> o: +91 80 4121 8723
>>>
>>> m: +91 988 699 3746
>>>
>>> skype: sourav.chandra
>>>
>>> Livestream
>>>
>>> "Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
>>> Block, Koramangala Industrial Area,
>>>
>>> Bangalore 560034
>>>
>>> www.livestream.com
>>>
>>
>>
>
>
> --
>
> Sourav Chandra
>
> Senior Software Engineer
>
> · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
>
> sourav.chan...@livestream.com
>
> o: +91 80 4121 8723
>
> m: +91 988 699 3746
>
> skype: sourav.chandra
>
> Livestream
>
> "Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
> Block, Koramangala Industrial Area,
>
> Bangalore 560034
>
> www.livestream.com
>

Reply via email to