Aah, I get it now. That is because the input data stream is replicated on
two machines, so by data locality it is processed on those two machines.
The "map" stage on that data therefore uses 2 executors, whereas the
"reduce" stage (after groupByKey), which does the saveAsTextFiles, would use
300 tasks. The default parallelism takes effect only when the data is
explicitly shuffled around.

You can fix this by explicitly repartitioning the data.

inputDStream.repartition(partitions)

This is covered in the streaming tuning guide:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
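
For example, with a receiver-based Kafka input (a minimal sketch; the stream
creation is shown only for context, the repartition line is the point):

    import org.apache.spark.streaming.kafka.KafkaUtils

    // Receiver-based Kafka input: the receivers run on only a couple of
    // machines, which is why the "map" stage shows just 2 executors.
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    // Spread the received blocks across the cluster before any further work,
    // so the subsequent stages are no longer tied to the receiver machines.
    val repartitioned = lines.repartition(300)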

TD



On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay <bill.jaypeter...@gmail.com>
wrote:

> Hi folks,
>
> I just ran another job that only received data from Kafka, did some
> filtering, and then saved the results as text files in HDFS. There was no
> reducing work involved. Surprisingly, the number of executors for the
> saveAsTextFiles stage was also 2, although I specified 300 executors in the
> job submission. As a result, the simple save action took more than 2
> minutes. Do you have any idea how Spark determines the number of executors
> for different stages?
>
> Thanks!
>
> Bill
>
>
> On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay <bill.jaypeter...@gmail.com>
> wrote:
>
>> Hi Tathagata,
>>
>> Below is my main function. I have omitted some filtering and data
>> conversion functions; they are just one-to-one mappings, which should not
>> noticeably increase the running time. The only reduce-style operation I
>> have here is groupByKey. There are 4 topics in my Kafka brokers: two of
>> them receive about 240k lines per minute, and the other two receive less
>> than 30k lines per minute. The batch size is one minute, I specified 300
>> executors in my spark-submit script, and the default parallelism is 300.
>>
>>
>>     val partition = 300
>>     val zkQuorum = "zk1,zk2,zk3"
>>     val group = "my-group-" + currentTime.toString
>>     val topics = "topic1,topic2,topic3,topic4"
>>     val numThreads = 4
>>     val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>>     ssc = new StreamingContext(conf, Seconds(batch))
>>     ssc.checkpoint(hadoopOutput + "checkpoint")
>>     val lines = lines1
>>     lines.cache()
>>     val jsonData = lines.map(JSON.parseFull(_))
>>     val mapData = jsonData.filter(_.isDefined)
>>       .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
>>     val validMapData = mapData.filter(isValidData(_))
>>     val fields = validMapData.map(data => (data("id").toString,
>>       timestampToUTCUnix(data("time").toString),
>>       timestampToUTCUnix(data("local_time").toString),
>>       data("id2").toString, data("id3").toString,
>>       data("log_type").toString, data("sub_log_type").toString))
>>     val timeDiff = 3600L
>>     val filteredFields = fields.filter(field =>
>>       abs(field._2 - field._3) <= timeDiff)
>>     val watchTimeFields = filteredFields.map(fields =>
>>       (fields._1, fields._2, fields._4, fields._5, fields._7))
>>     val watchTimeTuples = watchTimeFields.map(fields =>
>>       getWatchtimeTuple(fields))
>>     val programDuids = watchTimeTuples.map(fields =>
>>       (fields._3, fields._1)).groupByKey(partition)
>>     val programDuidNum = programDuids.map { case (key, value) =>
>>       (key, value.toSet.size) }
>>     programDuidNum.saveAsTextFiles(hadoopOutput + "result")
>>
>> I have been working on this for several days but still cannot figure out
>> why there are always 2 executors for the groupBy stage. Thanks a lot!
>>
>> Bill
>>
>>
>> On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Can you show us the program that you are running? If you are setting the
>>> number of partitions in the XYZ-ByKey operation to 300, then there should
>>> be 300 tasks for that stage, distributed over the 50 executors allocated
>>> to your context. However, the data distribution may be skewed, in which
>>> case you can use a repartition operation to redistribute the data more
>>> evenly (both DStream and RDD have repartition).
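>>>
>>> For instance (a sketch; `pairDStream` stands for a hypothetical key-value
>>> DStream, not your actual code):
>>>
>>>     // Rebalance a skewed pair DStream before the shuffle, and give the
>>>     // ByKey operation an explicit partition count.
>>>     val balanced = pairDStream.repartition(300)   // DStream.repartition
>>>     val grouped  = balanced.groupByKey(300)       // 300 tasks for this stage
>>>
>>>     // The same can be done at the RDD level via transform:
>>>     val rebalanced = pairDStream.transform(rdd => rdd.repartition(300))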
>>>
>>> TD
>>>
>>>
>>> On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay <bill.jaypeter...@gmail.com>
>>> wrote:
>>>
>>>> Hi Tathagata,
>>>>
>>>> I also tried passing the number of partitions as a parameter to functions
>>>> such as groupByKey. It seems the number of executors is around 50 instead
>>>> of the 300 I specified in the submission script. Moreover, the running
>>>> time of different executors is skewed. Ideally, Spark would distribute
>>>> the data evenly across 300 executors so that the computation finishes
>>>> efficiently, but I am not sure how to achieve this.
>>>>
>>>> Thanks!
>>>>
>>>> Bill
>>>>
>>>>
>>>> On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das <
>>>> tathagata.das1...@gmail.com> wrote:
>>>>
>>>>> Can you try explicitly setting the number of partitions in all the
>>>>> shuffle-based DStream operations? It may be that the default parallelism
>>>>> (that is, spark.default.parallelism) is not being respected.
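>>>>>
>>>>> For example (a sketch, using a hypothetical `pairDStream` of key-value
>>>>> pairs):
>>>>>
>>>>>     // Pass the partition count directly to each shuffle-based operation,
>>>>>     // instead of relying on spark.default.parallelism being picked up.
>>>>>     val grouped = pairDStream.groupByKey(300)
>>>>>     val counts  = pairDStream.reduceByKey(_ + _, 300)  // assumes Int values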
>>>>>
>>>>> Regarding the unusual delay, I would look at the task details of that
>>>>> stage in the Spark web UI. It will show the breakdown of time for each
>>>>> task, including GC time, which might give some indication.
>>>>>
>>>>> TD
>>>>>
>>>>>
>>>>> On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay <bill.jaypeter...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Tathagata,
>>>>>>
>>>>>> I set the default parallelism to 300 in my configuration file. Sometimes
>>>>>> there are more executors in a job, but it is still slow. I further
>>>>>> observed that most executors take less than 20 seconds, while two of
>>>>>> them take much longer, around 2 minutes. The data size is very small
>>>>>> (less than 480k lines with only 4 fields), so I am not sure why the
>>>>>> group by operation takes more than 3 minutes. Thanks!
>>>>>>
>>>>>> Bill
>>>>>>
>>>>>>
>>>>>> On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das <
>>>>>> tathagata.das1...@gmail.com> wrote:
>>>>>>
>>>>>>> Are you specifying the number of reducers in all the
>>>>>>> DStream.****ByKey operations? If the number of reducers is not set,
>>>>>>> then the number of reducers used in the stages can keep changing
>>>>>>> across batches.
>>>>>>>
>>>>>>> TD
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay <bill.jaypeter...@gmail.com
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I have a Spark streaming job running on yarn. It consume data from
>>>>>>>> Kafka and group the data by a certain field. The data size is 480k 
>>>>>>>> lines
>>>>>>>> per minute where the batch size is 1 minute.
>>>>>>>>
>>>>>>>> For some batches, the program takes more than 3 minutes to finish the
>>>>>>>> groupBy operation, which seems slow to me. I allocated 300 workers and
>>>>>>>> specified 300 as the partition number for groupBy. When I checked the
>>>>>>>> slow stage *"combineByKey at ShuffledDStream.scala:42"*, there were
>>>>>>>> sometimes only 2 executors allocated for this stage. However, during
>>>>>>>> other batches, there can be several hundred executors for the same
>>>>>>>> stage, which means the number of executors for the same operation
>>>>>>>> changes.
>>>>>>>>
>>>>>>>> Does anyone know how Spark allocates the number of executors for
>>>>>>>> different stages and how to increase the efficiency of the tasks?
>>>>>>>> Thanks!
>>>>>>>>
>>>>>>>> Bill
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
