numStreams is 5 in my case.

    List<JavaPairDStream<byte[], byte[]>> kafkaStreams =
        new ArrayList<>(numStreams);
    for (int i = 0; i < numStreams; i++) {
      kafkaStreams.add(KafkaUtils.createStream(sc, byte[].class, byte[].class,
          DefaultDecoder.class, DefaultDecoder.class, kafkaConf, topicMap,
          StorageLevel.MEMORY_ONLY_SER()));
    }
    JavaPairDStream<byte[], byte[]> ks =
        sc.union(kafkaStreams.remove(0), kafkaStreams);
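
For comparison, the explicit repartition() approach suggested further down the thread can also be applied to the unioned stream. A minimal sketch, assuming `ks` is the unioned stream from above; the partition count of 10 is a made-up example value, not something from this thread:

```java
// Sketch only: explicitly spread the received blocks across the cluster
// before processing. The count of 10 is an assumption; a common rule of
// thumb is 2-3x the total number of executor cores.
JavaPairDStream<byte[], byte[]> rebalanced = ks.repartition(10);
```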

On Wed, Jan 21, 2015 at 3:19 PM, Gerard Maas <gerard.m...@gmail.com> wrote:

> Hi Mukesh,
>
> How are you creating your receivers? Could you post the (relevant) code?
>
> -kr, Gerard.
>
> On Wed, Jan 21, 2015 at 9:42 AM, Mukesh Jha <me.mukesh....@gmail.com>
> wrote:
>
>> Hello Guys,
>>
>> I've repartitioned my kafkaStream so that it gets evenly distributed
>> among the executors, and the results are better.
>> Still, from the executors page it seems that all 8 cores of only 1
>> executor are getting used, while the other executors are using just 1
>> core each.
>>
>> Is this the correct interpretation based on the below data? If so how can
>> we fix this?
>>
>> [image: Inline image 1]
>>
>> On Wed, Dec 31, 2014 at 7:22 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>
>>> That is kind of expected due to data locality. Though you should see
>>> some tasks running on the executors as the data gets replicated to
>>> other nodes and can therefore run tasks based on locality. You have
>>> two solutions:
>>>
>>> 1. kafkaStream.repartition() to explicitly repartition the received
>>> data across the cluster.
>>> 2. Create multiple kafka streams and union them together.
>>>
>>> See
>>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-processing-time-of-each-batch
>>>
>>> On Tue, Dec 30, 2014 at 1:43 AM, Mukesh Jha <me.mukesh....@gmail.com>
>>> wrote:
>>> > Thanks Sandy, it was indeed an issue with the number of cores.
>>> >
>>> > Another issue I was facing is that tasks are not getting distributed
>>> > evenly among all executors and are running at the NODE_LOCAL locality
>>> > level, i.e. all the tasks are running on the same executor where my
>>> > kafka receiver(s) are running, even though other executors are idle.
>>> >
>>> > I configured spark.locality.wait=50 instead of the default 3000 ms,
>>> > which forced the task rebalancing among nodes. Let me know if there
>>> > is a better way to deal with this.
>>> >
>>> >
>>> > On Tue, Dec 30, 2014 at 12:09 AM, Mukesh Jha <me.mukesh....@gmail.com>
>>> > wrote:
>>> >>
>>> >> Makes sense. I've also tried it in standalone mode where all 3
>>> >> workers & driver were running on the same 8-core box, and the
>>> >> results were similar.
>>> >>
>>> >> Anyway, I will share the results in YARN mode with 8-core YARN
>>> >> containers.
>>> >>
>>> >> On Mon, Dec 29, 2014 at 11:58 PM, Sandy Ryza <sandy.r...@cloudera.com> wrote:
>>> >>>
>>> >>> When running in standalone mode, each executor will be able to use
>>> >>> all 8 cores on the box.  When running on YARN, each executor will
>>> >>> only have access to 2 cores.  So the comparison doesn't seem fair,
>>> >>> no?
>>> >>>
>>> >>> -Sandy
>>> >>>
>>> >>> On Mon, Dec 29, 2014 at 10:22 AM, Mukesh Jha <me.mukesh....@gmail.com> wrote:
>>> >>>>
>>> >>>> Nope, I am setting 5 executors with 2 cores each. Below is the
>>> >>>> command that I'm using to submit in YARN mode. This starts up 5
>>> >>>> executor nodes and a driver as per the Spark application master UI.
>>> >>>>
>>> >>>> spark-submit --master yarn-cluster --num-executors 5 --driver-memory
>>> >>>> 1024m --executor-memory 1024m --executor-cores 2 --class
>>> >>>> com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar vm.cloud.com:2181/kafka
>>> >>>> spark-yarn avro 1 5000
>>> >>>>
>>> >>>> On Mon, Dec 29, 2014 at 11:45 PM, Sandy Ryza <sandy.r...@cloudera.com> wrote:
>>> >>>>>
>>> >>>>> *oops, I mean are you setting --executor-cores to 8
>>> >>>>>
>>> >>>>> On Mon, Dec 29, 2014 at 10:15 AM, Sandy Ryza <sandy.r...@cloudera.com> wrote:
>>> >>>>>>
>>> >>>>>> Are you setting --num-executors to 8?
>>> >>>>>>
>>> >>>>>> On Mon, Dec 29, 2014 at 10:13 AM, Mukesh Jha <me.mukesh....@gmail.com> wrote:
>>> >>>>>>>
>>> >>>>>>> Sorry Sandy, the command is just for reference, but I can
>>> >>>>>>> confirm that there are 4 executors and a driver as shown in the
>>> >>>>>>> Spark UI page.
>>> >>>>>>>
>>> >>>>>>> Each of these machines is an 8-core box with ~15G of RAM.
>>> >>>>>>>
>>> >>>>>>> On Mon, Dec 29, 2014 at 11:23 PM, Sandy Ryza
>>> >>>>>>> <sandy.r...@cloudera.com> wrote:
>>> >>>>>>>>
>>> >>>>>>>> Hi Mukesh,
>>> >>>>>>>>
>>> >>>>>>>> Based on your spark-submit command, it looks like you're only
>>> >>>>>>>> running with 2 executors on YARN.  Also, how many cores does
>>> >>>>>>>> each machine have?
>>> >>>>>>>>
>>> >>>>>>>> -Sandy
>>> >>>>>>>>
>>> >>>>>>>> On Mon, Dec 29, 2014 at 4:36 AM, Mukesh Jha
>>> >>>>>>>> <me.mukesh....@gmail.com> wrote:
>>> >>>>>>>>>
>>> >>>>>>>>> Hello Experts,
>>> >>>>>>>>> I'm bench-marking Spark on YARN
>>> >>>>>>>>> (https://spark.apache.org/docs/latest/running-on-yarn.html) vs
>>> >>>>>>>>> a standalone Spark cluster
>>> >>>>>>>>> (https://spark.apache.org/docs/latest/spark-standalone.html).
>>> >>>>>>>>> I have a standalone cluster with 3 executors, and a Spark app
>>> >>>>>>>>> running on YARN with 4 executors as shown below.
>>> >>>>>>>>>
>>> >>>>>>>>> The Spark job running inside YARN is 10x slower than the one
>>> >>>>>>>>> running on the standalone cluster (even though YARN has more
>>> >>>>>>>>> workers). Also, in both cases all the executors are in the
>>> >>>>>>>>> same datacenter, so there shouldn't be any latency. On YARN
>>> >>>>>>>>> each 5sec batch is reading data from kafka and processing it
>>> >>>>>>>>> in 5sec, while on the standalone cluster each 5sec batch is
>>> >>>>>>>>> getting processed in 0.4sec.
>>> >>>>>>>>> Also, in YARN mode the executors are not getting used evenly,
>>> >>>>>>>>> as vm-13 & vm-14 are running most of the tasks, whereas in
>>> >>>>>>>>> standalone mode all the executors are running tasks.
>>> >>>>>>>>>
>>> >>>>>>>>> Do I need to set some configuration to evenly distribute the
>>> >>>>>>>>> tasks? Also, do you have any pointers on why the YARN job is
>>> >>>>>>>>> 10x slower than the standalone job?
>>> >>>>>>>>> Any suggestion is greatly appreciated. Thanks in advance.
>>> >>>>>>>>>
>>> >>>>>>>>> YARN (5 workers + driver)
>>> >>>>>>>>> ========================
>>> >>>>>>>>> Executor ID | Address | RDD Blocks | Memory Used | DU | AT | FT | CT | TT | TT | Input | Shuffle Read | Shuffle Write
>>> >>>>>>>>> 1 | vm-18.cloud.com:51796 | 0 | 0.0B/530.3MB | 0.0 B | 1 | 0 | 16 | 17 | 634 ms | 0.0 B | 2047.0 B | 1710.0 B
>>> >>>>>>>>> 2 | vm-13.cloud.com:57264 | 0 | 0.0B/530.3MB | 0.0 B | 0 | 0 | 1427 | 1427 | 5.5 m | 0.0 B | 0.0 B | 0.0 B
>>> >>>>>>>>> 3 | vm-14.cloud.com:54570 | 0 | 0.0B/530.3MB | 0.0 B | 0 | 0 | 1379 | 1379 | 5.2 m | 0.0 B | 1368.0 B | 2.8 KB
>>> >>>>>>>>> 4 | vm-11.cloud.com:56201 | 0 | 0.0B/530.3MB | 0.0 B | 0 | 0 | 10 | 10 | 625 ms | 0.0 B | 1368.0 B | 1026.0 B
>>> >>>>>>>>> 5 | vm-5.cloud.com:42958 | 0 | 0.0B/530.3MB | 0.0 B | 0 | 0 | 22 | 22 | 632 ms | 0.0 B | 1881.0 B | 2.8 KB
>>> >>>>>>>>> <driver> | vm.cloud.com:51847 | 0 | 0.0B/530.0MB | 0.0 B | 0 | 0 | 0 | 0 | 0 ms | 0.0 B | 0.0 B | 0.0 B
>>> >>>>>>>>>
>>> >>>>>>>>> /homext/spark/bin/spark-submit
>>> >>>>>>>>> --master yarn-cluster --num-executors 2 --driver-memory 512m
>>> >>>>>>>>> --executor-memory 512m --executor-cores 2
>>> >>>>>>>>> --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar
>>> >>>>>>>>> vm.cloud.com:2181/kafka spark-yarn avro 1 5000
>>> >>>>>>>>>
>>> >>>>>>>>> STANDALONE (3 workers + driver)
>>> >>>>>>>>> ==============================
>>> >>>>>>>>> Executor ID | Address | RDD Blocks | Memory Used | DU | AT | FT | CT | TT | TT | Input | Shuffle Read | Shuffle Write
>>> >>>>>>>>> 0 | vm-71.cloud.com:55912 | 0 | 0.0B/265.0MB | 0.0 B | 0 | 0 | 1069 | 1069 | 6.0 m | 0.0 B | 1534.0 B | 3.0 KB
>>> >>>>>>>>> 1 | vm-72.cloud.com:40897 | 0 | 0.0B/265.0MB | 0.0 B | 0 | 0 | 1057 | 1057 | 5.9 m | 0.0 B | 1368.0 B | 4.0 KB
>>> >>>>>>>>> 2 | vm-73.cloud.com:37621 | 0 | 0.0B/265.0MB | 0.0 B | 1 | 0 | 1059 | 1060 | 5.9 m | 0.0 B | 2.0 KB | 1368.0 B
>>> >>>>>>>>> <driver> | vm.cloud.com:58299 | 0 | 0.0B/265.0MB | 0.0 B | 0 | 0 | 0 | 0 | 0 ms | 0.0 B | 0.0 B | 0.0 B
>>> >>>>>>>>>
>>> >>>>>>>>> /homext/spark/bin/spark-submit
>>> >>>>>>>>> --master spark://chsnmvproc71vm3.usdc2.oraclecloud.com:7077
>>> >>>>>>>>> --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar
>>> >>>>>>>>> vm.cloud.com:2181/kafka spark-standalone avro 1 5000
>>> >>>>>>>>>
>>> >>>>>>>>> PS: I did go through the spark website and
>>> >>>>>>>>> http://www.virdata.com/tuning-spark/, but had no luck.
>>> >>>>>>>>>
>>> >>>>>>>>> --
>>> >>>>>>>>> Cheers,
>>> >>>>>>>>> Mukesh Jha
>>> >>>>>>>>
>>> >>>>>>>>
>>> >>>>>>>
>>> >>>>>>>
>>> >>>>>>>
>>> >>>>>>
>>> >>>>>>
>>> >>>>>
>>> >>>>
>>> >>>>
>>> >>>>
>>> >>>
>>> >>>
>>> >>
>>> >>
>>> >>
>>> >
>>> >
>>> >
>>> >
>>>
>>
>>
>>
>>
>
>


-- 


Thanks & Regards,

*Mukesh Jha <me.mukesh....@gmail.com>*
