Hi Tathagata,

It seems repartition does not necessarily force Spark to distribute the
data into different executors. I have launched a new job which uses
repartition right after I received data from Kafka. For the first two
batches, the reduce stage used more than 80 executors. Starting from the
third batch, there were always only 2 executors in the reduce task
(combineByKey). Even with the first batch which used more than 80
executors, it took 2.4 mins to finish the reduce stage for a very small
amount of data.
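
For what it is worth, here is a rough back-of-the-envelope sketch of why 300
partitions need not land on 300 executors. It is plain Scala, not Spark code,
and the object name, the method names, and the cores-per-executor value are all
made up for illustration (the core count is not stated in this thread). It only
computes the bounds on how many executors a stage of N tasks can occupy if each
task needs one core.

```scala
// Hypothetical helper, not part of Spark: bounds on executor usage for one stage.
object ExecutorSpread {
  /** Fewest executors that can run every task at once, one task per core. */
  def minExecutorsForOneWave(numPartitions: Int, coresPerExecutor: Int): Int = {
    require(numPartitions > 0 && coresPerExecutor > 0, "inputs must be positive")
    (numPartitions + coresPerExecutor - 1) / coresPerExecutor // ceiling division
  }

  /** Most executors a stage can touch: one task each, capped by the executor count. */
  def maxExecutorsUsed(numPartitions: Int, numExecutors: Int): Int =
    math.min(numPartitions, numExecutors)

  def main(args: Array[String]): Unit = {
    val partitions = 300
    val executors = 300
    val coresPerExecutor = 4 // assumption for illustration only
    println(s"fewest executors needed: ${minExecutorsForOneWave(partitions, coresPerExecutor)}")
    println(s"most executors touched:  ${maxExecutorsUsed(partitions, executors)}")
  }
}
```

Under that assumed core count, the 300 tasks could fit on as few as 75
executors, and where they actually land between the two bounds depends on
scheduling and data locality. A stage with only 2 partitions can never use more
than 2 executors, however many were requested.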


On Mon, Jul 14, 2014 at 12:30 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

> After using repartition(300), how many executors did it run on? By the
> way, repartition(300) means it will divide the shuffled data into 300
> partitions. Since there are many cores on each of the 300
> machines/executors, these partitions (each requiring a core) may not be
> spread across all 300 executors. Hence, if you really want to spread it
> across all 300 executors, you may have to bump up the partitions even more.
> However, increasing the number of partitions too much may not be
> beneficial, and you will have to play around with the number to find the
> sweet spot that reduces the time to process the stage / time to process the
> whole batch.
> TD
> On Fri, Jul 11, 2014 at 8:32 PM, Bill Jay <bill.jaypeter...@gmail.com>
> wrote:
>> Hi Tathagata,
>> Do you mean that the data is not shuffled until the reduce stage? That
>> means groupBy still only uses 2 machines?
>> I think I used repartition(300) after I read the data from Kafka into
>> DStream. It seems that it did not guarantee that the map or reduce stages
>> would run on 300 machines. I am currently trying to create 100 DStreams
>> from KafkaUtils.createStream and union them. Now the reduce stages had
>> around 80 machines for all the batches. However, this method introduces
>> many DStreams. It would be good if we could control the number of executors
>> in the groupBy operation, because the calculation needs to finish within 1
>> minute for input data of varying size, given our production needs.
>> Thanks!
>> Bill
>> On Fri, Jul 11, 2014 at 7:29 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>> Aah, I get it now. That is because the input data stream is replicated
>>> on two machines, so by locality the data is processed on those two
>>> machines. So the "map" stage on the data uses 2 executors, but the "reduce"
>>> stage (after groupByKey, the saveAsTextFiles) would use 300 tasks. The
>>> default parallelism takes effect only when the data is explicitly
>>> shuffled around.
>>> You can fix this by explicitly repartitioning the data:
>>> inputDStream.repartition(partitions)
>>> This is covered in the streaming tuning guide
>>> <http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving>.
>>> TD
>>> On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay <bill.jaypeter...@gmail.com>
>>> wrote:
>>>> Hi folks,
>>>> I just ran another job that only received data from Kafka, did some
>>>> filtering, and then saved the results as text files in HDFS. There was no
>>>> reduce work involved. Surprisingly, the number of executors for the
>>>> saveAsTextFiles stage was also 2, although I specified 300 executors in the
>>>> job submission. As a result, the simple save action took more than 2
>>>> minutes. Do you have any idea how Spark determines the number of executors
>>>> for different stages?
>>>> Thanks!
>>>> Bill
>>>> On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay <bill.jaypeter...@gmail.com>
>>>> wrote:
>>>>> Hi Tathagata,
>>>>> Below is my main function. I omit some filtering and data conversion
>>>>> functions. These functions are just one-to-one mappings, which should not
>>>>> noticeably increase the running time. The only reduce function I have here
>>>>> is groupByKey. There are 4 topics in my Kafka brokers; two of the topics
>>>>> have 240k lines each minute, and the other two have fewer than 30k
>>>>> lines per minute. The batch size is one minute and I specified 300
>>>>> executors in my spark-submit script. The default parallelism is 300.
>>>>>     val partition = 300
>>>>>     val zkQuorum = "zk1,zk2,zk3"
>>>>>     val group = "my-group-" + currentTime.toString
>>>>>     val topics = "topic1,topic2,topic3,topic4"
>>>>>     val numThreads = 4
>>>>>     val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
>>>>>     ssc = new StreamingContext(conf, Seconds(batch))
>>>>>     ssc.checkpoint(hadoopOutput + "checkpoint")
>>>>>     val lines = lines1
>>>>>     lines.cache()
>>>>>     val jsonData = lines.map(JSON.parseFull(_))
>>>>>     val mapData = jsonData.filter(_.isDefined)
>>>>>       .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
>>>>>     val validMapData = mapData.filter(isValidData(_))
>>>>>     val fields = validMapData.map(data => (
>>>>>       data("id").toString,
>>>>>       timestampToUTCUnix(data("time").toString),
>>>>>       timestampToUTCUnix(data("local_time").toString),
>>>>>       data("id2").toString,
>>>>>       data("id3").toString,
>>>>>       data("log_type").toString,
>>>>>       data("sub_log_type").toString))
>>>>>     val timeDiff = 3600L
>>>>>     val filteredFields = fields.filter(field =>
>>>>>       abs(field._2 - field._3) <= timeDiff)
>>>>>     val watchTimeFields = filteredFields.map(fields =>
>>>>>       (fields._1, fields._2, fields._4, fields._5, fields._7))
>>>>>     val watchTimeTuples = watchTimeFields.map(fields =>
>>>>>       getWatchtimeTuple(fields))
>>>>>     val programDuids = watchTimeTuples.map(fields =>
>>>>>       (fields._3, fields._1)).groupByKey(partition)
>>>>>     val programDuidNum = programDuids.map { case (key, value) =>
>>>>>       (key, value.toSet.size) }
>>>>>     programDuidNum.saveAsTextFiles(hadoopOutput + "result")
>>>>> I have been working on this for several days and still have not found why
>>>>> there are always 2 executors for the groupBy stage. Thanks a lot!
>>>>> Bill
>>>>> On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das <
>>>>> tathagata.das1...@gmail.com> wrote:
>>>>>> Can you show us the program that you are running? If you are setting the
>>>>>> number of partitions in the XYZ-ByKey operation to 300, then there should
>>>>>> be 300 tasks for that stage, distributed across the 50 executors that are
>>>>>> allocated to your context. However, the data distribution may be skewed,
>>>>>> in which case you can use a repartition operation to redistribute the data
>>>>>> more evenly (both DStream and RDD have repartition).
>>>>>> TD
>>>>>> On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay <
>>>>>> bill.jaypeter...@gmail.com> wrote:
>>>>>>> Hi Tathagata,
>>>>>>> I also tried passing the number of partitions as a parameter to
>>>>>>> functions such as groupByKey. It seems the number of executors is around
>>>>>>> 50 instead of 300, which is the number of executors I specified in the
>>>>>>> submission script. Moreover, the running time of different executors is
>>>>>>> skewed. Ideally, Spark would distribute the data across 300 executors
>>>>>>> evenly so that the computation finishes efficiently. I am not sure how
>>>>>>> to achieve this.
>>>>>>> Thanks!
>>>>>>> Bill
>>>>>>> On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das <
>>>>>>> tathagata.das1...@gmail.com> wrote:
>>>>>>>> Can you try setting the number of partitions explicitly in all the
>>>>>>>> shuffle-based DStream operations? It may be that the default
>>>>>>>> parallelism (that is, spark.default.parallelism) is not being respected.
>>>>>>>> Regarding the unusual delay, I would look at the task details of
>>>>>>>> that stage in the Spark web UI. It will show a breakdown of the time for
>>>>>>>> each task, including GC times, etc. That might give some indication.
>>>>>>>> TD
>>>>>>>> On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay <
>>>>>>>> bill.jaypeter...@gmail.com> wrote:
>>>>>>>>> Hi Tathagata,
>>>>>>>>> I set the default parallelism to 300 in my configuration file.
>>>>>>>>> Sometimes there are more executors in a job. However, it is still slow.
>>>>>>>>> I further observed that most executors take less than 20 seconds, but
>>>>>>>>> two of them take much longer, for example 2 minutes. The data size is
>>>>>>>>> very small (less than 480k lines with only 4 fields). I am not sure why
>>>>>>>>> the group by operation takes more than 3 minutes. Thanks!
>>>>>>>>> Bill
>>>>>>>>> On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das <
>>>>>>>>> tathagata.das1...@gmail.com> wrote:
>>>>>>>>>> Are you specifying the number of reducers in all the
>>>>>>>>>> DStream.*ByKey operations? If the number of reducers is not set, then
>>>>>>>>>> the number of reducers used in the stages can keep changing across
>>>>>>>>>> batches.
>>>>>>>>>> TD
>>>>>>>>>> On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay <
>>>>>>>>>> bill.jaypeter...@gmail.com> wrote:
>>>>>>>>>>> Hi all,
>>>>>>>>>>> I have a Spark Streaming job running on YARN. It consumes data
>>>>>>>>>>> from Kafka and groups the data by a certain field. The data size is
>>>>>>>>>>> 480k lines per minute, where the batch size is 1 minute.
>>>>>>>>>>> For some batches, the program takes more than 3 minutes
>>>>>>>>>>> to finish the groupBy operation, which seems slow to me. I allocated
>>>>>>>>>>> 300 workers and specified 300 as the partition number for groupBy.
>>>>>>>>>>> When I checked the slow stage, "combineByKey at
>>>>>>>>>>> ShuffledDStream.scala:42", there are sometimes only 2 executors
>>>>>>>>>>> allocated for this stage. However, during other batches, there can be
>>>>>>>>>>> several hundred executors for the same stage, which means the number
>>>>>>>>>>> of executors for the same operation changes.
>>>>>>>>>>> Does anyone know how Spark allocates the number of executors for
>>>>>>>>>>> different stages and how to increase the efficiency of the tasks?
>>>>>>>>>>> Thanks!
>>>>>>>>>>> Bill
