numStreams is 5 in my case.

List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {
  kafkaStreams.add(KafkaUtils.createStream(sc, byte[].class, byte[].class,
      DefaultDecoder.class, DefaultDecoder.class, kafkaConf, topicMap,
      StorageLevel.MEMORY_ONLY_SER()));
}
JavaPairDStream<byte[], byte[]> ks = sc.union(kafkaStreams.remove(0), kafkaStreams);
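For reference, the single-receiver + repartition() alternative that Tathagata suggests further down in the thread would look roughly like the sketch below. This is only an illustration using the same sc, kafkaConf and topicMap as above; the partition count of 10 is an arbitrary placeholder, not something I have benchmarked.

// One receiver only; repartition() then redistributes the received blocks
// so downstream tasks run on all executors instead of staying NODE_LOCAL
// to the receiver's executor. 10 partitions is just an illustrative value.
JavaPairDStream<byte[], byte[]> kafkaStream =
    KafkaUtils.createStream(sc, byte[].class, byte[].class,
        DefaultDecoder.class, DefaultDecoder.class, kafkaConf, topicMap,
        StorageLevel.MEMORY_ONLY_SER());
JavaPairDStream<byte[], byte[]> repartitioned = kafkaStream.repartition(10);

The union of multiple streams above parallelises the ingestion itself across several receivers, whereas repartition() only spreads out the processing of data received by a single receiver.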
On Wed, Jan 21, 2015 at 3:19 PM, Gerard Maas <gerard.m...@gmail.com> wrote:

> Hi Mukesh,
>
> How are you creating your receivers? Could you post the (relevant) code?
>
> -kr, Gerard.
>
> On Wed, Jan 21, 2015 at 9:42 AM, Mukesh Jha <me.mukesh....@gmail.com> wrote:
>
>> Hello Guys,
>>
>> I've re-partitioned my kafkaStream so that it gets evenly distributed among the executors and the results are better.
>> Still, from the executors page it seems that all 8 cores of only 1 executor are getting used and the other executors are using just 1 core.
>>
>> Is this the correct interpretation based on the below data? If so, how can we fix this?
>>
>> [image: Inline image 1]
>>
>> On Wed, Dec 31, 2014 at 7:22 AM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>
>>> That is kind of expected due to data locality, though you should see some tasks running on the other executors as the data gets replicated to other nodes, which can therefore run tasks based on locality. You have two solutions:
>>>
>>> 1. kafkaStream.repartition() to explicitly repartition the received data across the cluster.
>>> 2. Create multiple kafka streams and union them together.
>>>
>>> See http://spark.apache.org/docs/latest/streaming-programming-guide.html#reducing-the-processing-time-of-each-batch
>>>
>>> On Tue, Dec 30, 2014 at 1:43 AM, Mukesh Jha <me.mukesh....@gmail.com> wrote:
>>>>
>>>> Thanks Sandy, it was the issue with the no. of cores.
>>>>
>>>> Another issue I was facing is that tasks are not getting distributed evenly among all executors and are running at the NODE_LOCAL locality level, i.e. all the tasks are running on the same executor where my kafkareceiver(s) are running, even though other executors are idle.
>>>>
>>>> I configured spark.locality.wait=50 instead of the default 3000 ms, which forced the task rebalancing among nodes; let me know if there is a better way to deal with this.
>>>>
>>>> On Tue, Dec 30, 2014 at 12:09 AM, Mukesh Jha <me.mukesh....@gmail.com> wrote:
>>>>>
>>>>> Makes sense. I've also tried it in standalone mode where all 3 workers & the driver were running on the same 8-core box and the results were similar.
>>>>>
>>>>> Anyways, I will share the results in YARN mode with 8-core yarn containers.
>>>>>
>>>>> On Mon, Dec 29, 2014 at 11:58 PM, Sandy Ryza <sandy.r...@cloudera.com> wrote:
>>>>>>
>>>>>> When running in standalone mode, each executor will be able to use all 8 cores on the box. When running on YARN, each executor will only have access to 2 cores. So the comparison doesn't seem fair, no?
>>>>>>
>>>>>> -Sandy
>>>>>>
>>>>>> On Mon, Dec 29, 2014 at 10:22 AM, Mukesh Jha <me.mukesh....@gmail.com> wrote:
>>>>>>>
>>>>>>> Nope, I am setting 5 executors with 2 cores each. Below is the command that I'm using to submit in YARN mode. This starts up 5 executor nodes and a driver as per the spark application master UI.
>>>>>>>
>>>>>>> spark-submit --master yarn-cluster --num-executors 5 --driver-memory 1024m --executor-memory 1024m --executor-cores 2 --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar vm.cloud.com:2181/kafka spark-yarn avro 1 5000
>>>>>>>
>>>>>>> On Mon, Dec 29, 2014 at 11:45 PM, Sandy Ryza <sandy.r...@cloudera.com> wrote:
>>>>>>>>
>>>>>>>> *oops, I mean are you setting --executor-cores to 8
>>>>>>>>
>>>>>>>> On Mon, Dec 29, 2014 at 10:15 AM, Sandy Ryza <sandy.r...@cloudera.com> wrote:
>>>>>>>>>
>>>>>>>>> Are you setting --num-executors to 8?
>>>>>>>>>
>>>>>>>>> On Mon, Dec 29, 2014 at 10:13 AM, Mukesh Jha <me.mukesh....@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Sorry Sandy, the command is just for reference, but I can confirm that there are 4 executors and a driver as shown on the spark UI page.
>>>>>>>>>>
>>>>>>>>>> Each of these machines is an 8-core box with ~15G of ram.
>>>>>>>>>>
>>>>>>>>>> On Mon, Dec 29, 2014 at 11:23 PM, Sandy Ryza <sandy.r...@cloudera.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi Mukesh,
>>>>>>>>>>>
>>>>>>>>>>> Based on your spark-submit command, it looks like you're only running with 2 executors on YARN. Also, how many cores does each machine have?
>>>>>>>>>>>
>>>>>>>>>>> -Sandy
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Dec 29, 2014 at 4:36 AM, Mukesh Jha <me.mukesh....@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hello Experts,
>>>>>>>>>>>> I'm benchmarking Spark on YARN (https://spark.apache.org/docs/latest/running-on-yarn.html) vs a standalone spark cluster (https://spark.apache.org/docs/latest/spark-standalone.html).
>>>>>>>>>>>> I have a standalone cluster with 3 executors, and a spark app running on yarn with 4 executors, as shown below.
>>>>>>>>>>>>
>>>>>>>>>>>> The spark job running inside yarn is 10x slower than the one running on the standalone cluster (even though yarn has more workers); also, in both cases all the executors are in the same datacenter, so there shouldn't be any latency. On YARN each 5sec batch reads data from kafka and is processed in 5sec, while on the standalone cluster each 5sec batch is processed in 0.4sec.
>>>>>>>>>>>> Also, in YARN mode the executors are not getting used evenly, as vm-13 & vm-14 are running most of the tasks, whereas in standalone mode all the executors are running tasks.
>>>>>>>>>>>>
>>>>>>>>>>>> Do I need to set up some configuration to distribute the tasks evenly? Also, do you have any pointers on why the yarn job is 10x slower than the standalone job?
>>>>>>>>>>>> Any suggestion is greatly appreciated, thanks in advance.
>>>>>>>>>>>>
>>>>>>>>>>>> YARN (5 workers + driver)
>>>>>>>>>>>> =========================
>>>>>>>>>>>> Executor ID | Address | RDD Blocks | Memory Used | Disk Used | Active Tasks | Failed Tasks | Complete Tasks | Total Tasks | Task Time | Input | Shuffle Read | Shuffle Write
>>>>>>>>>>>> 1 | vm-18.cloud.com:51796 | 0 | 0.0B/530.3MB | 0.0 B | 1 | 0 | 16 | 17 | 634 ms | 0.0 B | 2047.0 B | 1710.0 B
>>>>>>>>>>>> 2 | vm-13.cloud.com:57264 | 0 | 0.0B/530.3MB | 0.0 B | 0 | 0 | 1427 | 1427 | 5.5 m | 0.0 B | 0.0 B | 0.0 B
>>>>>>>>>>>> 3 | vm-14.cloud.com:54570 | 0 | 0.0B/530.3MB | 0.0 B | 0 | 0 | 1379 | 1379 | 5.2 m | 0.0 B | 1368.0 B | 2.8 KB
>>>>>>>>>>>> 4 | vm-11.cloud.com:56201 | 0 | 0.0B/530.3MB | 0.0 B | 0 | 0 | 10 | 10 | 625 ms | 0.0 B | 1368.0 B | 1026.0 B
>>>>>>>>>>>> 5 | vm-5.cloud.com:42958 | 0 | 0.0B/530.3MB | 0.0 B | 0 | 0 | 22 | 22 | 632 ms | 0.0 B | 1881.0 B | 2.8 KB
>>>>>>>>>>>> <driver> | vm.cloud.com:51847 | 0 | 0.0B/530.0MB | 0.0 B | 0 | 0 | 0 | 0 | 0 ms | 0.0 B | 0.0 B | 0.0 B
>>>>>>>>>>>>
>>>>>>>>>>>> /homext/spark/bin/spark-submit --master yarn-cluster --num-executors 2 --driver-memory 512m --executor-memory 512m --executor-cores 2 --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar vm.cloud.com:2181/kafka spark-yarn avro 1 5000
>>>>>>>>>>>>
>>>>>>>>>>>> STANDALONE (3 workers + driver)
>>>>>>>>>>>> ===============================
>>>>>>>>>>>> Executor ID | Address | RDD Blocks | Memory Used | Disk Used | Active Tasks | Failed Tasks | Complete Tasks | Total Tasks | Task Time | Input | Shuffle Read | Shuffle Write
>>>>>>>>>>>> 0 | vm-71.cloud.com:55912 | 0 | 0.0B/265.0MB | 0.0 B | 0 | 0 | 1069 | 1069 | 6.0 m | 0.0 B | 1534.0 B | 3.0 KB
>>>>>>>>>>>> 1 | vm-72.cloud.com:40897 | 0 | 0.0B/265.0MB | 0.0 B | 0 | 0 | 1057 | 1057 | 5.9 m | 0.0 B | 1368.0 B | 4.0 KB
>>>>>>>>>>>> 2 | vm-73.cloud.com:37621 | 0 | 0.0B/265.0MB | 0.0 B | 1 | 0 | 1059 | 1060 | 5.9 m | 0.0 B | 2.0 KB | 1368.0 B
>>>>>>>>>>>> <driver> | vm.cloud.com:58299 | 0 | 0.0B/265.0MB | 0.0 B | 0 | 0 | 0 | 0 | 0 ms | 0.0 B | 0.0 B | 0.0 B
>>>>>>>>>>>>
>>>>>>>>>>>> /homext/spark/bin/spark-submit --master spark://chsnmvproc71vm3.usdc2.oraclecloud.com:7077 --class com.oracle.ci.CmsgK2H /homext/lib/MJ-ci-k2h.jar vm.cloud.com:2181/kafka spark-standalone avro 1 5000
>>>>>>>>>>>>
>>>>>>>>>>>> PS: I did go through the spark website and http://www.virdata.com/tuning-spark/, but had no luck.
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Mukesh Jha

--
Thanks & Regards,
Mukesh Jha <me.mukesh....@gmail.com>