Hi Tathagata,

It seems that repartition does not necessarily force Spark to distribute the data across different executors. I launched a new job that calls repartition right after receiving the data from Kafka. For the first two batches, the reduce stage used more than 80 executors. Starting from the third batch, there were always only 2 executors in the reduce stage (combineByKey). Even in the first batch, which used more than 80 executors, the reduce stage took 2.4 minutes for a very small amount of data.

Bill
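A minimal sketch of the pattern described here (repartitioning immediately after the Kafka receive), assuming KafkaUtils.createStream with placeholder quorum, group, and topic names:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf().setAppName("kafka-repartition")  // placeholder app name
    val ssc = new StreamingContext(conf, Seconds(60))           // 1-minute batches

    // Receive from Kafka, then repartition before any shuffle-based operation
    // so the data is spread beyond the executors hosting the receiver.
    val kafkaStream = KafkaUtils.createStream(ssc, "zk1,zk2,zk3", "my-group",
      Map("topic1" -> 1))
    val repartitioned = kafkaStream.repartition(300)

Note that the repartition itself adds one extra shuffle of the raw records per batch.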
On Mon, Jul 14, 2014 at 12:30 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

After using repartition(300), how many executors did it run on? By the way, repartition(300) means it will divide the shuffled data into 300 partitions. Since there are many cores on each of the 300 machines/executors, these partitions (each requiring a core) may not be spread across all 300 executors. Hence, if you really want to spread the work across all 300 executors, you may have to bump up the number of partitions even more. However, increasing the partitions too much may not be beneficial either, and you will have to play around with the number to find the sweet spot that minimizes the time to process the stage / the whole batch.

TD

On Fri, Jul 11, 2014 at 8:32 PM, Bill Jay <bill.jaypeter...@gmail.com> wrote:

Hi Tathagata,

Do you mean that the data is not shuffled until the reduce stage? Does that mean groupBy still only uses 2 machines?

I think I used repartition(300) after I read the data from Kafka into a DStream, but it did not seem to guarantee that the map or reduce stages would run on 300 machines. I am currently trying to create 100 DStreams with KafkaUtils.createStream and union them. Now the reduce stages have around 80 machines for all the batches. However, this method introduces many DStreams. It would be good if we could control the number of executors in the groupBy operation, because the calculation needs to finish within 1 minute for different input sizes, based on our production needs.

Thanks!

Bill

On Fri, Jul 11, 2014 at 7:29 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

Aah, I get it now. That is because the input data stream is replicated on two machines, so by locality the data is processed on those two machines. So the "map" stage on the data uses 2 executors, but the "reduce" stage (after groupByKey) and the saveAsTextFiles would use 300 tasks. The default parallelism takes effect only when the data is explicitly shuffled around.

You can fix this by explicitly repartitioning the data:

    inputDStream.repartition(partitions)

This is covered in the streaming tuning guide:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving

TD
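A minimal sketch of the approach from that guide section (several Kafka receivers unioned into one DStream); the quorum, group, and topic names are placeholders, and the stream and partition counts are illustrative:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf().setAppName("multi-receiver")  // placeholder app name
    val ssc = new StreamingContext(conf, Seconds(60))

    // Each createStream call allocates its own receiver, so the input itself
    // is spread over numStreams executors instead of one or two.
    val numStreams = 10
    val kafkaStreams = (1 to numStreams).map { _ =>
      KafkaUtils.createStream(ssc, "zk1,zk2,zk3", "my-group", Map("topic1" -> 1))
    }
    val unified = ssc.union(kafkaStreams)

    // Optionally repartition once more before the shuffle-based stages.
    val spread = unified.repartition(300)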
On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay <bill.jaypeter...@gmail.com> wrote:

Hi folks,

I just ran another job that only received data from Kafka, did some filtering, and then saved the results as text files in HDFS. There was no reduce work involved. Surprisingly, the number of executors for the saveAsTextFiles stage was also 2, although I specified 300 executors in the job submission. As a result, the simple save-file action took more than 2 minutes. Do you have any idea how Spark determines the number of executors for different stages?

Thanks!

Bill

On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay <bill.jaypeter...@gmail.com> wrote:

Hi Tathagata,

Below is my main function. I omit some filtering and data conversion functions. These functions are just one-to-one mappings, which should not increase the running time. The only reduce function I have here is groupByKey. There are 4 topics in my Kafka brokers; two of the topics have 240k lines each minute, and the other two have fewer than 30k lines per minute. The batch size is one minute, I specified 300 executors in my spark-submit script, and the default parallelism is 300.

    // imports and helper functions (isValidData, timestampToUTCUnix,
    // getWatchtimeTuple) are omitted here
    val partition = 300
    val zkQuorum = "zk1,zk2,zk3"
    val group = "my-group-" + currentTime.toString
    val topics = "topic1,topic2,topic3,topic4"
    val numThreads = 4
    val topicMap = topics.split(",").map((_, numThreads)).toMap
    ssc = new StreamingContext(conf, Seconds(batch))
    ssc.checkpoint(hadoopOutput + "checkpoint")
    // lines1 is the DStream received from Kafka; its creation is omitted here
    val lines = lines1
    lines.cache()
    val jsonData = lines.map(JSON.parseFull(_))
    val mapData = jsonData.filter(_.isDefined)
      .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
    val validMapData = mapData.filter(isValidData(_))
    val fields = validMapData.map(data => (data("id").toString,
      timestampToUTCUnix(data("time").toString),
      timestampToUTCUnix(data("local_time").toString), data("id2").toString,
      data("id3").toString, data("log_type").toString,
      data("sub_log_type").toString))
    val timeDiff = 3600L
    val filteredFields = fields.filter(field => abs(field._2 - field._3) <= timeDiff)
    val watchTimeFields = filteredFields.map(fields =>
      (fields._1, fields._2, fields._4, fields._5, fields._7))
    val watchTimeTuples = watchTimeFields.map(fields => getWatchtimeTuple(fields))
    val programDuids = watchTimeTuples.map(fields => (fields._3, fields._1))
      .groupByKey(partition)
    val programDuidNum = programDuids.map { case (key, value) => (key, value.toSet.size) }
    programDuidNum.saveAsTextFiles(hadoopOutput + "result")

I have been working on this for several days, with no findings on why there are always 2 executors for the groupBy stage. Thanks a lot!

Bill

On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

Can you show us the program that you are running? If you are setting the number of partitions in the XYZ-ByKey operation to 300, then there should be 300 tasks for that stage, distributed over the 50 executors allocated to your context. However, the data distribution may be skewed, in which case you can use a repartition operation to redistribute the data more evenly (both DStream and RDD have repartition).

TD
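A sketch of what setting the partition count explicitly in a ByKey operation looks like, reusing the names from the program above:

    // Passing numPartitions directly to the shuffle operation fixes the number
    // of tasks for that stage at 300, regardless of spark.default.parallelism.
    val pairs = watchTimeTuples.map(fields => (fields._3, fields._1))
    val grouped = pairs.groupByKey(300)

    // If the data is skewed across keys, a repartition before the ByKey
    // operation spreads the records (though not the per-key grouping) evenly.
    val evened = pairs.repartition(300).groupByKey(300)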
On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay <bill.jaypeter...@gmail.com> wrote:

Hi Tathagata,

I also tried passing the number of partitions as a parameter to functions such as groupByKey. The number of executors is around 50 instead of 300, which is the number I specified in the submission script. Moreover, the running time of different executors is skewed. The ideal case would be for Spark to distribute the data evenly across 300 executors so that the computation can finish efficiently. I am not sure how to achieve this.

Thanks!

Bill

On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

Can you try setting the number of partitions explicitly in all the shuffle-based DStream operations? It may be that the default parallelism (that is, spark.default.parallelism) is not being respected.

Regarding the unusual delay, I would look at the task details of that stage in the Spark web UI. It shows a breakdown of the time for each task, including GC time, etc. That might give some indication.

TD

On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay <bill.jaypeter...@gmail.com> wrote:

Hi Tathagata,

I set the default parallelism to 300 in my configuration file. Sometimes there are more executors in a job; however, it is still slow. I further observed that most executors take less than 20 seconds, but two of them take much longer, such as 2 minutes. The data size is very small (less than 480k lines with only 4 fields). I am not sure why the groupBy operation takes more than 3 minutes. Thanks!

Bill

On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

Are you specifying the number of reducers in all the DStream.*ByKey operations? If the number of reducers is not set, then the number used in those stages can keep changing across batches.

TD

On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay <bill.jaypeter...@gmail.com> wrote:

Hi all,

I have a Spark Streaming job running on YARN. It consumes data from Kafka and groups the data by a certain field. The data size is 480k lines per minute, and the batch size is 1 minute.

For some batches, the program sometimes takes more than 3 minutes to finish the groupBy operation, which seems slow to me. I allocated 300 workers and specified 300 as the partition number for the groupBy. When I checked the slow stage "combineByKey at ShuffledDStream.scala:42", there were sometimes only 2 executors allocated for this stage. During other batches, however, the number of executors could be several hundred for the same stage, which means the number of executors for the same operation changes.

Does anyone know how Spark allocates the number of executors for different stages, and how to increase the efficiency of the task? Thanks!

Bill
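For reference, a sketch of setting the default parallelism discussed in this thread; the value 300 matches the executor count above, and the app name is a placeholder:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("streaming-groupby")
      .set("spark.default.parallelism", "300")

    // The same property can also be set in the configuration file used at
    // submission time (e.g. conf/spark-defaults.conf):
    //   spark.default.parallelism  300

As noted in the thread, this default only applies when the data is explicitly shuffled, so it is safer to also pass the partition count directly to each *ByKey operation, e.g. groupByKey(300).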