You need to call checkpoint() on rdd0 regularly to cut its dependency chain
(lineage). Otherwise you will run into the problem you described, and
eventually even a stack overflow. This is a classic problem for highly
iterative jobs, and searching for it will turn up the usual fix.
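A minimal sketch of the periodic-checkpoint fix, applied to the loop from the question. Assumptions: it replaces the poster's own BillZippedPartitionsRDD2 with Spark's public RDD.zipPartitions API. The helper name `run`, the checkpoint directory, the `local[2]` master and the interval of 10 iterations are hypothetical choices made for illustration. sc.setCheckpointDir must be called before checkpoint(), and on a cluster the directory should be on HDFS.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ZipWithCheckpoint {
  // Hypothetical helper: repeatedly zips a fresh RDD with the previous result,
  // checkpointing every `checkpointInterval` iterations so the lineage (and
  // therefore the serialized task) stops growing with each iteration.
  def run(sc: SparkContext, seq: Seq[String], iterations: Int, checkpointInterval: Int): RDD[String] = {
    var rdd0: RDD[String] = sc.parallelize(seq, 72).cache()
    rdd0.count()
    for (j <- 1 to iterations) {
      val rdd1 = sc.parallelize(seq, 72)
      // Iterator.zip stops at the shorter side; unlike the code in the question,
      // it does not throw when the partition sizes differ.
      val rdd2 = rdd0.zipPartitions(rdd1, preservesPartitioning = false) { (thisIter, otherIter) =>
        thisIter.zip(otherIter).map { case (a, b) => a + "--" + b }
      }
      rdd2.cache() // cache first so the checkpoint job does not recompute rdd2
      if (j % checkpointInterval == 0) {
        rdd2.checkpoint() // must be marked before the first action on rdd2
      }
      rdd2.count()     // materializes rdd2 and, if marked, writes the checkpoint
      rdd0.unpersist() // the previous result is no longer needed in memory
      rdd0 = rdd2
    }
    rdd0
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ZipWithCheckpoint").setMaster("local[2]")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir("/tmp/spark-checkpoints") // hypothetical path; use HDFS on a cluster
    val seq = Seq.fill(720)("a" * 100)           // placeholder data similar to the question's
    val result = run(sc, seq, iterations = 200, checkpointInterval = 10)
    println(result.count())
    sc.stop()
  }
}
```

Caching rdd2 before its first action matters here: the checkpoint is written by a separate job after the action, and without the cache that job would recompute rdd2 from its lineage. Printing rdd0.toDebugString every few iterations should show the lineage collapsing to a checkpoint-backed RDD after each checkpointed iteration, instead of growing by one zip per iteration.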

On Tue, Oct 13, 2015 at 7:09 PM, 张仪yf1 <zhangyi...@hikvision.com> wrote:

> Hi there,
>
> I ran into an issue when using zipPartitions. First I created an RDD from
> a Seq, then created another one and zipped their partitions together into
> rdd3, then cached rdd3. Then I created a new RDD and zipped its partitions
> with rdd3. I repeated this operation many times and found that
>
> the serialized task became bigger and bigger, and so did the serialization
> time. Has anyone else encountered the same problem? Please help.
>
> Code:
>
> import org.apache.spark.{SparkContext, SparkException}
> import org.apache.spark.rdd.RDD
>
> def main(args: Array[String]): Unit = {
>   var i: Int = 0
>   var seq = Seq[String]()
>   while (i < 720) {
>     seq = seq.+:("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
>     i = i + 1
>   }
>   val sc = new SparkContext("spark://hdh010016146205:7077", "ScalaTest")
>   var rdd0: RDD[String] = null
>   var j = 0
>   while (j < 200) {
>     j = j + 1
>     val rdd1 = sc.parallelize(seq, 72)
>     if (rdd0 == null) {
>       rdd0 = rdd1
>       rdd0.cache()
>       rdd0.count()
>     } else {
>       // BillZippedPartitionsRDD2 is the poster's own RDD class; its
>       // definition was not included in the message.
>       val rdd2 = new BillZippedPartitionsRDD2[String, String, String](sc, { (thisIter, otherIter) =>
>       // val rdd2 = rdd0.zipPartitions(rdd1, true)({ (thisIter, otherIter) =>
>         new Iterator[String] {
>           def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
>             case (true, true) => true
>             case (false, false) => false
>             case _ => throw new SparkException("Can only zip RDDs with " +
>               "same number of elements in each partition")
>           }
>           def next(): String = thisIter.next() + "--" + otherIter.next()
>         }
>       }, rdd0, rdd1, false)
>       rdd2.cache()
>       rdd2.count()
>       rdd0 = rdd2
>     }
>   }
> }