You have to call checkpoint() on rdd0 regularly to cut the dependency chain; otherwise you will hit exactly the problem you describe, and eventually a stack overflow. This is a classic problem for highly iterative jobs, and you can find plenty of discussion of the fix if you search for it.
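A minimal sketch of what that can look like, assuming a reliable checkpoint directory is available and using the plain zipPartitions API rather than your custom BillZippedPartitionsRDD2 (the directory, interval, and app name below are placeholders, not values from your job):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CheckpointedZip {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CheckpointedZip"))
    // Assumed values: point this at HDFS (or another reliable FS) in a real job.
    sc.setCheckpointDir("/tmp/spark-checkpoints")
    val checkpointInterval = 10

    val seq = Seq.fill(720)("a" * 98)
    var rdd0: RDD[String] = null
    var j = 0
    while (j < 200) {
      j += 1
      val rdd1 = sc.parallelize(seq, 72)
      if (rdd0 == null) {
        rdd0 = rdd1
        rdd0.cache()
        rdd0.count()
      } else {
        val rdd2 = rdd0.zipPartitions(rdd1, preservesPartitioning = true) {
          (thisIter, otherIter) =>
            thisIter.zip(otherIter).map { case (a, b) => a + "--" + b }
        }
        rdd2.cache()
        if (j % checkpointInterval == 0) {
          // Truncates the lineage so the serialized task size stays bounded.
          rdd2.checkpoint()
        }
        // The action materializes both the cache and the pending checkpoint.
        rdd2.count()
        rdd0 = rdd2
      }
    }
    sc.stop()
  }
}

Without the periodic checkpoint, every iteration's RDD keeps a reference to all of its ancestors, so the closure shipped with each task grows linearly with the iteration count; that is what shows up as ever-increasing serialization time.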
On Tue, Oct 13, 2015 at 7:09 PM, 张仪yf1 <zhangyi...@hikvision.com> wrote:
> Hi, there
>
> I hit an issue when using zipPartitions. First I created an RDD from a seq,
> then created another one and zipPartitioned them into rdd3, then cached rdd3,
> then created a new RDD and zipPartitioned it with rdd3. I repeated this
> operation many times, and I found that the serialized task became bigger and
> bigger, and the serialization time grew as well. Has anyone else encountered
> the same problem? Please help.
>
> Code:
>
> def main(args: Array[String]) {
>   var i: Int = 0
>   var seq = Seq[String]()
>   while (i < 720) {
>     seq = seq.+:("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
>     i = i + 1
>   }
>   val sc = new SparkContext("spark://hdh010016146205:7077", "ScalaTest")
>   var rdd0: RDD[String] = null
>   var j = 0
>   while (j < 200) {
>     j = j + 1
>     val rdd1 = sc.parallelize(seq, 72)
>     if (rdd0 == null) {
>       rdd0 = rdd1
>       rdd0.cache()
>       rdd0.count()
>     } else {
>       val rdd2 = new BillZippedPartitionsRDD2[String, String, String](sc, { (thisIter, otherIter) =>
>         // val rdd2 = rdd0.zipPartitions(rdd1, true)({ (thisIter, otherIter) =>
>         new Iterator[String] {
>           def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
>             case (true, true) => true
>             case (false, false) => false
>             case _ => throw new SparkException("Can only zip RDDs with " +
>               "same number of elements in each partition")
>           }
>           def next(): String = thisIter.next() + "--" + otherIter.next()
>         }
>       }, rdd0, rdd1, false)
>       rdd2.cache()
>       rdd2.count()
>       rdd0 = rdd2
>     }
>   }
> }