Any advice on how to tune the repartitionAndSortWithinPartitions stage?
Any particular metrics or parameters to look into? Basically, Spark and MR
shuffle the same amount of data, because we more or less copied the MR
implementation into Spark.

Let us know if more info is needed.

On Fri, Oct 23, 2015 at 10:24 AM, 周千昊 <qhz...@apache.org> wrote:

> +kylin dev list
>
> 周千昊 <qhz...@apache.org> wrote on Fri, Oct 23, 2015 at 10:20 AM:
>
> > Hi, Reynold
> >       We use glom() because it makes it easy to adapt the calculation
> > logic already implemented in MR. And to be clear, we are still in POC.
> >       Since the results show there is almost no difference between this
> > glom stage and the MR mapper, using glom here might not be the issue.
> >       I tried monitoring the network traffic while the repartition
> > happens, and it showed that the traffic peaks at about 200-300MB/s but
> > stays at about 3-4MB/s for a long time. Do you have any idea about
> > it?
> >
> > Reynold Xin <r...@databricks.com> wrote on Fri, Oct 23, 2015 at 2:43 AM:
> >
> >> Why do you do a glom? It seems unnecessarily expensive to materialize
> >> each partition in memory.
> >>
> >>
> >> On Thu, Oct 22, 2015 at 2:02 AM, 周千昊 <qhz...@apache.org> wrote:
> >>
> >>> Hi, spark community
> > >>>       I have an application that I am trying to migrate from MR to Spark.
> > >>>       It does some calculations on data from Hive and writes the output
> > >>> to HFiles, which are then bulk loaded into an HBase table. Details as follows:
> >>>
> > >>>      Rdd<Element> input = getSourceInputFromHive()
> > >>>      Rdd<Tuple2<byte[], byte[]>> mapSideResult =
> > >>>          input.glom().mapPartitions(/* some calculation, equivalent to MR mapper */)
> > >>>      // PS: the result in each partition has already been sorted in
> > >>>      // lexicographical order during the calculation
> > >>>      mapSideResult
> > >>>          .repartitionAndSortWithinPartitions(
> > >>>              /* partition with byte[][] which is the HTable split keys,
> > >>>                 equivalent to MR shuffle */)
> > >>>          .map(/* transform Tuple2<byte[], byte[]> to
> > >>>                  Tuple2<ImmutableBytesWritable, KeyValue>,
> > >>>                  equivalent to MR reducer without output */)
> > >>>          .saveAsNewAPIHadoopFile(/* write to hfile */)
> >>>
> > >>>       This all works fine on a small dataset, and Spark outruns MR by
> > >>> about 10%. However, when I apply it to a dataset of 150 million
> > >>> records, MR is about twice as fast as Spark (*MR 25min, Spark 50min*).
> > >>>        The application UI shows that the
> > >>> repartitionAndSortWithinPartitions stage is very slow: in the shuffle
> > >>> phase, a 6GB shuffle takes about 18min, which is quite unreasonable.
> > >>>        *Can anyone help with this issue and give me some advice on
> > >>> this? It's not iterative processing, but I believe Spark could be
> > >>> at least as fast.*
> >>>
> > >>>       Here is the cluster info:
> >>>           vm: 8 nodes * (128G mem + 64 core)
> >>>           hadoop cluster: hdp 2.2.6
> >>>           spark running mode: yarn-client
> >>>           spark version: 1.5.1
> >>>
> >>>
> >>
>
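
For reference, the split-key partitioning step described in the quoted pseudocode above could be sketched in plain Java roughly as follows. This is only an illustration and not the code from the thread: the class name SplitKeyPartitioner and its methods are hypothetical, and it assumes that N split keys define N + 1 regions, with each split key being the first row key of the next region. In a real job the same logic would presumably live in a subclass of org.apache.spark.Partitioner.

```java
import java.util.Arrays;
import java.util.Comparator;

/** Hypothetical helper: maps a row key to the index of the region that would hold it. */
final class SplitKeyPartitioner {

    /** Unsigned lexicographic order on byte[], i.e. bytes compared as 0..255. */
    static final Comparator<byte[]> LEXICOGRAPHIC = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) {
                return diff;
            }
        }
        // On a common prefix, the shorter key sorts first.
        return a.length - b.length;
    };

    // Sorted ascending. Assumption: N split keys give N + 1 partitions.
    private final byte[][] splitKeys;

    SplitKeyPartitioner(byte[][] splitKeys) {
        this.splitKeys = splitKeys.clone();
        Arrays.sort(this.splitKeys, LEXICOGRAPHIC);
    }

    int numPartitions() {
        return splitKeys.length + 1;
    }

    int getPartition(byte[] rowKey) {
        int pos = Arrays.binarySearch(splitKeys, rowKey, LEXICOGRAPHIC);
        // Exact match: the key is the first key of the next region (pos + 1).
        // Otherwise binarySearch returns -(insertionPoint) - 1, and the
        // insertion point is the index of the region containing the key.
        return pos >= 0 ? pos + 1 : -(pos + 1);
    }
}
```

With split keys "b" and "d", for example, this sketch yields three partitions: row key "a" goes to partition 0, "b" and "c" go to partition 1, and "d" and anything larger go to partition 2.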
