Hi Tathagata,

I have tried the repartition method. The reduce stage first had 2 executors and then grew to around 85 executors. I specified repartition(300), and each executor was given 2 cores when I submitted the job. This shows that repartition does help spread the work across more executors. However, the running time was still around 50 seconds even though I only did a simple groupBy operation. I suspect the repartition shuffle itself consumes part of the running time. Given that the input source is Kafka, is there a way to make the program even faster?

Thanks!
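For reference, a stripped-down sketch of the pattern under discussion (the Kafka settings, key extraction, and app name are placeholders, and the real job does more filtering and conversion than this):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._      // pair-DStream operations (Spark 1.x)
    import org.apache.spark.streaming.kafka.KafkaUtils

    object RepartitionSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("repartition-sketch")
        val ssc  = new StreamingContext(conf, Seconds(60))     // 1-minute batches, as in the job

        // Placeholder Kafka settings -- not the real cluster values.
        val zkQuorum = "zk1,zk2,zk3"
        val group    = "example-group"
        val topicMap = Map("topic1" -> 4)

        val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

        // Spread the received records over the cluster before the shuffle stage,
        // and also pass an explicit partition count to the ByKey operation.
        val counts = lines
          .repartition(300)
          .map(line => (line.split(",")(0), 1L))               // placeholder key extraction
          .groupByKey(300)
          .map { case (key, values) => (key, values.size) }

        counts.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

The explicit count on groupByKey keeps that shuffle at 300 partitions even if spark.default.parallelism is not picked up.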
On Mon, Jul 14, 2014 at 3:22 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> Can you give me a screenshot of the stages page in the web UI, the Spark logs, and the code that is causing this behavior? This seems quite weird to me.
>
> TD
>
> On Mon, Jul 14, 2014 at 2:11 PM, Bill Jay <bill.jaypeter...@gmail.com> wrote:
>
>> Hi Tathagata,
>>
>> It seems repartition does not necessarily force Spark to distribute the data across different executors. I launched a new job which calls repartition right after it receives data from Kafka. For the first two batches, the reduce stage used more than 80 executors. Starting from the third batch, there were always only 2 executors in the reduce task (combineByKey). Even for the first batch, which used more than 80 executors, it took 2.4 minutes to finish the reduce stage for a very small amount of data.
>>
>> Bill
>>
>> On Mon, Jul 14, 2014 at 12:30 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>
>>> After using repartition(300), how many executors did it run on? By the way, repartition(300) means it will divide the shuffled data into 300 partitions. Since each of the 300 machines/executors has many cores, these partitions (each requiring a core) may not be spread across all 300 executors. Hence, if you really want to spread the work over all 300 executors, you may have to bump up the number of partitions even more. However, increasing the partitions too much may not be beneficial either, and you will have to play around with the number to find the sweet spot that minimizes the time to process the stage / the whole batch.
>>>
>>> TD
>>>
>>> On Fri, Jul 11, 2014 at 8:32 PM, Bill Jay <bill.jaypeter...@gmail.com> wrote:
>>>
>>>> Hi Tathagata,
>>>>
>>>> Do you mean that the data is not shuffled until the reduce stage? Does that mean groupBy still only uses 2 machines?
>>>>
>>>> I used repartition(300) right after I read the data from Kafka into a DStream. It did not seem to guarantee that the map or reduce stages would run on 300 machines. I am currently trying to create 100 DStreams with KafkaUtils.createStream and union them. Now the reduce stages have around 80 machines for all the batches. However, this method introduces many DStreams. It would be good if we could control the number of executors used by the groupBy operation, because the calculation needs to finish within 1 minute for different sizes of input data, based on our production needs.
>>>>
>>>> Thanks!
>>>>
>>>> Bill
>>>>
>>>> On Fri, Jul 11, 2014 at 7:29 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>>>
>>>>> Aah, I get it now. That is because the input data stream is replicated on two machines, so by locality the data is processed on those two machines. The "map" stage on the data therefore uses 2 executors, but the "reduce" stage (after groupByKey) and the saveAsTextFiles would use 300 tasks. The default parallelism only takes effect when the data is explicitly shuffled around.
>>>>>
>>>>> You can fix this by explicitly repartitioning the data:
>>>>>
>>>>> inputDStream.repartition(partitions)
>>>>>
>>>>> This is covered in the streaming tuning guide <http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving>.
>>>>>
>>>>> TD
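The approach of creating multiple input DStreams and unioning them, mentioned above and described in the linked tuning guide, looks roughly like this (a sketch; the receiver count and the Kafka parameters passed in are placeholders, not the production values):

    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Create several Kafka receivers and union them, so that receiving is not
    // limited to one or two executors.
    def unionedKafkaInput(ssc: StreamingContext,
                          zkQuorum: String,
                          group: String,
                          topicMap: Map[String, Int],
                          numReceivers: Int): DStream[String] = {
      val streams = (1 to numReceivers).map { _ =>
        KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
      }
      // Union into a single DStream, then repartition before any shuffle stage.
      ssc.union(streams).repartition(300)
    }

Each createStream call adds one receiver, so this trades extra receiver slots for more parallel ingestion; the repartition at the end still controls how the shuffle stages are spread.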
>>>>>
>>>>> On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay <bill.jaypeter...@gmail.com> wrote:
>>>>>
>>>>>> Hi folks,
>>>>>>
>>>>>> I just ran another job that only received data from Kafka, did some filtering, and then saved the results as text files in HDFS. There was no reduce work involved. Surprisingly, the number of executors for the saveAsTextFiles stage was also 2, although I specified 300 executors in the job submission. As a result, the simple save-file action took more than 2 minutes. Do you have any idea how Spark determines the number of executors for different stages?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> Bill
>>>>>>
>>>>>> On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay <bill.jaypeter...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Tathagata,
>>>>>>>
>>>>>>> Below is my main function. I omit some filtering and data conversion functions; these are just one-to-one mappings, which should not noticeably increase the running time. The only reduce-style operation I have here is groupByKey. There are 4 topics in my Kafka brokers; two of the topics have 240k lines per minute each, and the other two have less than 30k lines per minute. The batch size is one minute, I specified 300 executors in my spark-submit script, and the default parallelism is 300.
>>>>>>>
>>>>>>> val partition = 300
>>>>>>> val zkQuorum = "zk1,zk2,zk3"
>>>>>>> val group = "my-group-" + currentTime.toString
>>>>>>> val topics = "topic1,topic2,topic3,topic4"
>>>>>>> val numThreads = 4
>>>>>>> val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>>>>>>> ssc = new StreamingContext(conf, Seconds(batch))
>>>>>>> ssc.checkpoint(hadoopOutput + "checkpoint")
>>>>>>> val lines = lines1
>>>>>>> lines.cache()
>>>>>>> val jsonData = lines.map(JSON.parseFull(_))
>>>>>>> val mapData = jsonData.filter(_.isDefined)
>>>>>>>   .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
>>>>>>> val validMapData = mapData.filter(isValidData(_))
>>>>>>> val fields = validMapData.map(data => (data("id").toString,
>>>>>>>   timestampToUTCUnix(data("time").toString),
>>>>>>>   timestampToUTCUnix(data("local_time").toString), data("id2").toString,
>>>>>>>   data("id3").toString, data("log_type").toString, data("sub_log_type").toString))
>>>>>>> val timeDiff = 3600L
>>>>>>> val filteredFields = fields.filter(field => abs(field._2 - field._3) <= timeDiff)
>>>>>>> val watchTimeFields = filteredFields.map(fields => (fields._1, fields._2, fields._4, fields._5, fields._7))
>>>>>>> val watchTimeTuples = watchTimeFields.map(fields => getWatchtimeTuple(fields))
>>>>>>> val programDuids = watchTimeTuples.map(fields => (fields._3, fields._1)).groupByKey(partition)
>>>>>>> val programDuidNum = programDuids.map { case (key, value) => (key, value.toSet.size) }
>>>>>>> programDuidNum.saveAsTextFiles(hadoopOutput + "result")
>>>>>>>
>>>>>>> I have been working on this for several days and have found no reason why there are always 2 executors for the groupBy stage. Thanks a lot!
>>>>>>>
>>>>>>> Bill
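The default parallelism mentioned in that message is usually supplied in one of two ways; a sketch, with the app name made up (it can equally be passed as --conf spark.default.parallelism=300 to spark-submit):

    import org.apache.spark.SparkConf

    // Set the default parallelism programmatically; operations that receive an
    // explicit partition count ignore this setting.
    val conf = new SparkConf()
      .setAppName("kafka-groupby-job")
      .set("spark.default.parallelism", "300")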
>>>>>>>
>>>>>>> On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Can you show us the program that you are running? If you are setting the number of partitions in the XYZ-ByKey operation to 300, then there should be 300 tasks for that stage, distributed over the 50 executors that are allocated to your context. However, the data distribution may be skewed, in which case you can use a repartition operation to redistribute the data more evenly (both DStream and RDD have repartition).
>>>>>>>>
>>>>>>>> TD
>>>>>>>>
>>>>>>>> On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay <bill.jaypeter...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Tathagata,
>>>>>>>>>
>>>>>>>>> I also tried passing the number of partitions as a parameter to functions such as groupByKey. The number of executors is still around 50 instead of 300, which is the number of executors I specified in the submission script. Moreover, the running time of different executors is skewed. The ideal case is that Spark distributes the data across the 300 executors evenly so that the computation can finish efficiently. I am not sure how to achieve this.
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>> Bill
>>>>>>>>>
>>>>>>>>> On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Can you try setting the number of partitions explicitly in all the shuffle-based DStream operations? It may be that the default parallelism (that is, spark.default.parallelism) is not being respected.
>>>>>>>>>>
>>>>>>>>>> Regarding the unusual delay, I would look at the task details of that stage in the Spark web UI. It shows a breakdown of the time for each task, including GC time, etc. That might give some indication.
>>>>>>>>>>
>>>>>>>>>> TD
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay <bill.jaypeter...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Tathagata,
>>>>>>>>>>>
>>>>>>>>>>> I set the default parallelism to 300 in my configuration file. Sometimes there are more executors in a job, but it is still slow. I further observed that most executors take less than 20 seconds, but two of them take much longer, such as 2 minutes. The data size is very small (less than 480k lines with only 4 fields). I am not sure why the groupBy operation takes more than 3 minutes. Thanks!
>>>>>>>>>>>
>>>>>>>>>>> Bill
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Are you specifying the number of reducers in all the DStream.***ByKey operations? If the number of reducers is not set, then the number of reducers used in the stages can keep changing across batches.
>>>>>>>>>>>>
>>>>>>>>>>>> TD
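Setting the number of reducers explicitly on every ByKey operation, as suggested above, looks roughly like this; a sketch, where pairs stands in for any keyed DStream in the job (e.g. (programId, duid)) and the distinct-count step mirrors the groupByKey/toSet.size pattern from the program quoted earlier:

    import org.apache.spark.streaming.StreamingContext._   // pair-DStream operations (Spark 1.x)
    import org.apache.spark.streaming.dstream.DStream

    // Pass the partition count directly to the shuffle-based operation instead
    // of relying on spark.default.parallelism being picked up.
    def distinctCounts(pairs: DStream[(String, String)], partitions: Int): DStream[(String, Int)] = {
      pairs
        .groupByKey(partitions)                             // 300 in the job discussed above
        .map { case (key, values) => (key, values.toSet.size) }
    }

Passing the count directly removes the dependence on the default parallelism for that stage.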
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay <bill.jaypeter...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have a Spark Streaming job running on YARN. It consumes data from Kafka and groups the data by a certain field. The data size is 480k lines per minute, and the batch size is 1 minute.
>>>>>>>>>>>>>
>>>>>>>>>>>>> For some batches, the program takes more than 3 minutes to finish the groupBy operation, which seems slow to me. I allocated 300 workers and specified 300 as the partition number for the groupBy. When I checked the slow stage "combineByKey at ShuffledDStream.scala:42", there were sometimes only 2 executors allocated for this stage. However, during other batches, there can be several hundred executors for the same stage, which means the number of executors for the same operation changes.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Does anyone know how Spark allocates the number of executors for different stages and how to make the task more efficient? Thanks!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Bill