Hi Tathagata,

I have tried the repartition method. The reduce stage first had 2 executors
and then around 85 executors. I specified repartition(300), and each
executor was given 2 cores when I submitted the job. This shows that
repartition does increase the number of executors used. However, the
running time was still around 50 seconds even though I only did a simple
groupBy operation. I suspect the repartition itself consumes part of the
running time. Given that the input source is Kafka, is there a way to make
the program even faster? Thanks!
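
For reference, here is roughly how the multi-receiver setup I describe
further down the thread looks in code. This is only a simplified sketch:
the number of receivers, the partition count, and names such as zkQuorum,
group, and topicMap stand in for the values from my actual job.

    // Create several Kafka receivers so receiving is not limited to a
    // couple of machines, union them, and repartition before the shuffle.
    val numReceivers = 10   // placeholder; I tried up to 100
    val kafkaStreams = (1 to numReceivers).map { _ =>
      KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
    }
    val lines = ssc.union(kafkaStreams).repartition(300)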


On Mon, Jul 14, 2014 at 3:22 PM, Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> Can you give me a screenshot of the stages page in the web UI, the Spark
> logs, and the code that is causing this behavior? This seems quite weird to
> me.
>
> TD
>
>
> On Mon, Jul 14, 2014 at 2:11 PM, Bill Jay <bill.jaypeter...@gmail.com>
> wrote:
>
>> Hi Tathagata,
>>
>> It seems repartition does not necessarily force Spark to distribute the
>> data across different executors. I launched a new job that calls
>> repartition right after receiving the data from Kafka. For the first two
>> batches, the reduce stage used more than 80 executors. Starting from the
>> third batch, there were always only 2 executors in the reduce stage
>> (combineByKey). Even for the first batch, which used more than 80
>> executors, it took 2.4 minutes to finish the reduce stage on a very small
>> amount of data.
>>
>> Bill
>>
>>
>> On Mon, Jul 14, 2014 at 12:30 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> After using repartition(300), how many executors did it run on? By the
>>> way, repartition(300) means it will divide the shuffled data into 300
>>> partitions. Since there are many cores on each of the 300
>>> machines/executors, these partitions (each requiring a core) may not be
>>> spread across all 300 executors. Hence, if you really want to spread it
>>> across all 300 executors, you may have to bump up the number of
>>> partitions even more. However, increasing the partition count too far may
>>> not be beneficial either, and you will have to play around with the
>>> number to find the sweet spot that minimizes the time to process the
>>> stage / the whole batch.
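>>>
>>> As a rough illustration only (all numbers here are placeholders): if you
>>> had, say, 300 executors with 2 cores each, starting at roughly 2-3
>>> partitions per core and tuning from there could look like this:
>>>
>>>     // sketch: oversubscribe partitions relative to total cores so every
>>>     // executor gets tasks, then adjust based on the stage timings
>>>     val totalCores = 300 * 2               // executors * cores per executor
>>>     val numPartitions = totalCores * 2     // a starting point, not a rule
>>>     val repartitioned = inputDStream.repartition(numPartitions)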
>>>
>>> TD
>>>
>>>
>>> On Fri, Jul 11, 2014 at 8:32 PM, Bill Jay <bill.jaypeter...@gmail.com>
>>> wrote:
>>>
>>>> Hi Tathagata,
>>>>
>>>> Do you mean that the data is not shuffled until the reduce stage? Does
>>>> that mean groupBy still only uses 2 machines?
>>>>
>>>> I used repartition(300) right after I read the data from Kafka into a
>>>> DStream, but it did not guarantee that the map or reduce stages would run
>>>> on 300 machines. I am currently trying to create 100 DStreams with
>>>> KafkaUtils.createStream and union them. Now the reduce stages use around
>>>> 80 machines for all the batches. However, this method introduces many
>>>> DStreams. It would be good if we could control the number of executors
>>>> used in the groupBy operation, because the computation needs to finish
>>>> within 1 minute for varying input sizes, based on our production needs.
>>>>
>>>> Thanks!
>>>>
>>>>
>>>> Bill
>>>>
>>>>
>>>> On Fri, Jul 11, 2014 at 7:29 PM, Tathagata Das <
>>>> tathagata.das1...@gmail.com> wrote:
>>>>
>>>>> Aah, I get it now. That is because the input data stream is
>>>>> replicated on two machines, so by locality the data is processed on those
>>>>> two machines. So the "map" stage on the data uses 2 executors, but the
>>>>> "reduce" stage (after groupByKey) and the saveAsTextFiles would use 300
>>>>> tasks. And the default parallelism takes effect only when the data is
>>>>> explicitly shuffled around.
>>>>>
>>>>> You can fix this by explicitly repartitioning the data:
>>>>>
>>>>> inputDStream.repartition(partitions)
>>>>>
>>>>> This is covered in the streaming tuning guide
>>>>> <http://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving>
>>>>> .
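>>>>>
>>>>> For example, in a pipeline like yours it would sit right after the input
>>>>> stream (just a sketch; the partition count is whatever works for your
>>>>> cluster):
>>>>>
>>>>>     val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
>>>>>     // force a shuffle so the received data is spread across the cluster
>>>>>     val repartitioned = kafkaStream.repartition(300)
>>>>>     // run the map / filter / groupByKey pipeline on repartitioned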
>>>>>
>>>>> TD
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Jul 11, 2014 at 4:11 PM, Bill Jay <bill.jaypeter...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi folks,
>>>>>>
>>>>>> I just ran another job that only received data from Kafka, did some
>>>>>> filtering, and then saved the results as text files in HDFS. There was
>>>>>> no reduce work involved. Surprisingly, the number of executors for the
>>>>>> saveAsTextFiles stage was also 2, although I specified 300 executors in
>>>>>> the job submission. As a result, the simple save action took more than
>>>>>> 2 minutes. Do you have any idea how Spark determines the number of
>>>>>> executors for different stages?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> Bill
>>>>>>
>>>>>>
>>>>>> On Fri, Jul 11, 2014 at 2:01 PM, Bill Jay <bill.jaypeter...@gmail.com
>>>>>> > wrote:
>>>>>>
>>>>>>> Hi Tathagata,
>>>>>>>
>>>>>>> Below is my main function. I have omitted some filtering and data
>>>>>>> conversion functions; they are just one-to-one mappings, which should
>>>>>>> not noticeably increase the running time. The only reduce-side
>>>>>>> operation I have here is groupByKey. There are 4 topics in my Kafka
>>>>>>> brokers; two of the topics have 240k lines per minute each, and the
>>>>>>> other two have less than 30k lines per minute. The batch size is one
>>>>>>> minute, I specified 300 executors in my spark-submit script, and the
>>>>>>> default parallelism is 300.
>>>>>>>
>>>>>>>
>>>>>>>     val partition = 300
>>>>>>>     val zkQuorum = "zk1,zk2,zk3"
>>>>>>>     val group = "my-group-" + currentTime.toString
>>>>>>>     val topics = "topic1,topic2,topic3,topic4"
>>>>>>>     val numThreads = 4
>>>>>>>     val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>>>>>>>     ssc = new StreamingContext(conf, Seconds(batch))
>>>>>>>     ssc.checkpoint(hadoopOutput + "checkpoint")
>>>>>>>
>>>>>>>     // lines1 is the input DStream from Kafka (its creation is omitted here)
>>>>>>>     val lines = lines1
>>>>>>>     lines.cache()
>>>>>>>
>>>>>>>     // parse each line as JSON and keep only valid records
>>>>>>>     val jsonData = lines.map(JSON.parseFull(_))
>>>>>>>     val mapData = jsonData.filter(_.isDefined)
>>>>>>>       .map(_.get.asInstanceOf[scala.collection.immutable.Map[String, Any]])
>>>>>>>     val validMapData = mapData.filter(isValidData(_))
>>>>>>>     val fields = validMapData.map(data => (data("id").toString,
>>>>>>>       timestampToUTCUnix(data("time").toString),
>>>>>>>       timestampToUTCUnix(data("local_time").toString),
>>>>>>>       data("id2").toString,
>>>>>>>       data("id3").toString,
>>>>>>>       data("log_type").toString,
>>>>>>>       data("sub_log_type").toString))
>>>>>>>
>>>>>>>     // drop records whose two timestamps differ by more than an hour
>>>>>>>     val timeDiff = 3600L
>>>>>>>     val filteredFields = fields.filter(field => abs(field._2 - field._3) <= timeDiff)
>>>>>>>
>>>>>>>     val watchTimeFields = filteredFields.map(fields => (fields._1,
>>>>>>>       fields._2, fields._4, fields._5, fields._7))
>>>>>>>     val watchTimeTuples = watchTimeFields.map(fields =>
>>>>>>>       getWatchtimeTuple(fields))
>>>>>>>
>>>>>>>     // the only shuffle: group by id with an explicit partition count
>>>>>>>     val programDuids = watchTimeTuples.map(fields => (fields._3,
>>>>>>>       fields._1)).groupByKey(partition)
>>>>>>>     val programDuidNum = programDuids.map { case (key, value) =>
>>>>>>>       (key, value.toSet.size) }
>>>>>>>     programDuidNum.saveAsTextFiles(hadoopOutput + "result")
>>>>>>>
>>>>>>> I have been working on this for several days but have not figured out
>>>>>>> why there are always 2 executors for the groupBy stage. Thanks a lot!
>>>>>>>
>>>>>>> Bill
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Jul 11, 2014 at 1:50 PM, Tathagata Das <
>>>>>>> tathagata.das1...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Can you show us the program that you are running? If you are setting
>>>>>>>> the number of partitions in the XYZ-ByKey operation to 300, then there
>>>>>>>> should be 300 tasks for that stage, distributed across the 50 executors
>>>>>>>> allocated to your context. However, the data distribution may be
>>>>>>>> skewed, in which case you can use a repartition operation to
>>>>>>>> redistribute the data more evenly (both DStream and RDD have
>>>>>>>> repartition).
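>>>>>>>>
>>>>>>>> Something like this, as a sketch (pairs stands for your key-value
>>>>>>>> DStream and 300 for your partition count):
>>>>>>>>
>>>>>>>>     // explicit partition count on the shuffle, plus an upstream
>>>>>>>>     // repartition in case the received data is skewed
>>>>>>>>     val evened = pairs.repartition(300)
>>>>>>>>     val grouped = evened.groupByKey(300)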
>>>>>>>>
>>>>>>>> TD
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Jul 11, 2014 at 12:22 AM, Bill Jay <
>>>>>>>> bill.jaypeter...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Tathagata,
>>>>>>>>>
>>>>>>>>> I also tried passing the number of partitions as a parameter to
>>>>>>>>> functions such as groupByKey. It seems the number of executors is
>>>>>>>>> around 50 instead of the 300 I specified in the submission script.
>>>>>>>>> Moreover, the running times of the executors are skewed. Ideally,
>>>>>>>>> Spark would distribute the data evenly across 300 executors so that
>>>>>>>>> the computation finishes efficiently, but I am not sure how to
>>>>>>>>> achieve this.
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>> Bill
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jul 10, 2014 at 5:59 PM, Tathagata Das <
>>>>>>>>> tathagata.das1...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Can you try setting the number of partitions explicitly in all the
>>>>>>>>>> shuffle-based DStream operations? It may be that the default
>>>>>>>>>> parallelism (that is, spark.default.parallelism) is not being
>>>>>>>>>> respected.
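>>>>>>>>>>
>>>>>>>>>> For example (only a sketch; pairs stands for your key-value DStream
>>>>>>>>>> and 300 for whatever count matches your cluster):
>>>>>>>>>>
>>>>>>>>>>     // set the fallback parallelism on the SparkConf...
>>>>>>>>>>     val conf = new SparkConf().set("spark.default.parallelism", "300")
>>>>>>>>>>     // ...and also pass the partition count explicitly to each shuffle
>>>>>>>>>>     val grouped = pairs.groupByKey(300)
>>>>>>>>>>     val reduced = pairs.reduceByKey((a, b) => a + b, 300)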
>>>>>>>>>>
>>>>>>>>>> Regarding the unusual delay, I would look at the task details of
>>>>>>>>>> that stage in the Spark web UI. It shows the breakdown of time for
>>>>>>>>>> each task, including GC time, etc. That might give some indication.
>>>>>>>>>>
>>>>>>>>>> TD
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay <
>>>>>>>>>> bill.jaypeter...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Tathagata,
>>>>>>>>>>>
>>>>>>>>>>> I set the default parallelism to 300 in my configuration file.
>>>>>>>>>>> Sometimes there are more executors in a job, but it is still slow. I
>>>>>>>>>>> further observed that most executors take less than 20 seconds, but
>>>>>>>>>>> two of them take much longer, such as 2 minutes. The data size is
>>>>>>>>>>> very small (less than 480k lines with only 4 fields). I am not sure
>>>>>>>>>>> why the groupBy operation takes more than 3 minutes. Thanks!
>>>>>>>>>>>
>>>>>>>>>>> Bill
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das <
>>>>>>>>>>> tathagata.das1...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Are you specifying the number of reducers in all the
>>>>>>>>>>>> DStream.xxxByKey operations? If the number of reducers is not set,
>>>>>>>>>>>> the number used in those stages can keep changing across batches.
>>>>>>>>>>>>
>>>>>>>>>>>> TD
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay <
>>>>>>>>>>>> bill.jaypeter...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have a Spark Streaming job running on YARN. It consumes data
>>>>>>>>>>>>> from Kafka and groups the data by a certain field. The data size
>>>>>>>>>>>>> is 480k lines per minute, and the batch size is 1 minute.
>>>>>>>>>>>>>
>>>>>>>>>>>>> For some batches, the program takes more than 3 minutes to finish
>>>>>>>>>>>>> the groupBy operation, which seems slow to me. I allocated 300
>>>>>>>>>>>>> workers and specified 300 as the partition number for groupBy.
>>>>>>>>>>>>> When I checked the slow stage "combineByKey at
>>>>>>>>>>>>> ShuffledDStream.scala:42", there were sometimes only 2 executors
>>>>>>>>>>>>> allocated for this stage. However, during other batches, there
>>>>>>>>>>>>> could be several hundred executors for the same stage, which means
>>>>>>>>>>>>> the number of executors for the same operation changes.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Does anyone know how Spark allocates the number of executors for
>>>>>>>>>>>>> different stages, and how to increase the efficiency of this task?
>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
