Hello! I think I found a fast and clean solution based on take's source code:
def exists[T](rdd: RDD[T])(qualif: T => Boolean, num: Int): Boolean = {
  if (num == 0) {
    true
  } else {
    var count: Int = 0
    val totalParts: Int = rdd.partitions.length
    var partsScanned: Int = 0
    while (count < num && partsScanned < totalParts) {
      var numPartsToTry: Int = 1
      if (partsScanned > 0) {
        if (count == 0) {
          numPartsToTry = partsScanned * 4
        } else {
          numPartsToTry = Math.max((1.5 * num * partsScanned / count).toInt - partsScanned, 1)
          numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
        }
      }
      val left: Int = num - count
      val p: Range = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
      val res: Array[Int] = rdd.sparkContext.runJob(
        rdd,
        (it: Iterator[T]) => it.filter(qualif).take(left).size,
        p,
        allowLocal = true)
      count = count + res.sum
      partsScanned += numPartsToTry
    }
    count >= num
  }
}

// val all: RDD[Any]
println(exists(all)(_ => { println(".") ; true }, 10))

It's super fast for small values of num, and I think it parallelises nicely for large values. Please tell me what you think.

Have a nice day,
Jonathan

On 5 August 2015 at 19:18, Jonathan Winandy <jonathan.wina...@gmail.com> wrote:

> Hello !
>
> You could try something like that :
>
> def exists[T](rdd: RDD[T])(f: T => Boolean, n: Long): Boolean = {
>
>   val context: SparkContext = rdd.sparkContext
>   val grp: String = Random.alphanumeric.take(10).mkString
>   context.setJobGroup(grp, "exist")
>   val count: Accumulator[Long] = context.accumulator(0L)
>
>   val iteratorToInt: (Iterator[T]) => Int = {
>     iterator =>
>       val i: Int = iterator.count(f)
>       count += i
>       i
>   }
>
>   val t = new Thread {
>     override def run {
>       while (count.value < n) {}
>       context.cancelJobGroup(grp)
>     }
>   }
>   t.start()
>   try {
>     context.runJob(rdd, iteratorToInt).sum >= n
>   } catch {
>     case e: SparkException =>
>       count.value >= n
>   } finally {
>     t.stop()
>   }
> }
>
> It stops the computation once enough elements satisfying the condition
> have been witnessed.
>
> It performs well if the RDD is well partitioned.
> If this is a problem, you could change iteratorToInt to:
>
> val iteratorToInt: (Iterator[T]) => Int = {
>   iterator =>
>     val i: Int = iterator.count(x => {
>       if (f(x)) {
>         count += 1
>         true
>       } else false
>     })
>     i
> }
>
> I am interested in a safer way to perform partial computation in Spark.
>
> Cheers,
> Jonathan
>
> On 5 August 2015 at 18:54, Feynman Liang <fli...@databricks.com> wrote:
>
>> qualifying_function() will be executed on each partition in parallel;
>> stopping all parallel execution after the first instance satisfying
>> qualifying_function() would mean that you would effectively have to make
>> the computation sequential.
>>
>> On Wed, Aug 5, 2015 at 9:05 AM, Sandeep Giri <sand...@knowbigdata.com>
>> wrote:
>>
>>> Okay, I think I got it now. Yes, take() does not need to be called more
>>> than once. I was under the impression that we wanted to bring elements to
>>> the driver node and then run our qualifying_function there.
>>>
>>> Now I am back to the question I started with: could there be an
>>> approach where qualifying_function() is no longer called after a matching
>>> element has been found?
>>>
>>> Regards,
>>> Sandeep Giri,
>>> +1 347 781 4573 (US)
>>> +91-953-899-8962 (IN)
>>>
>>> www.KnowBigData.com
>>> Phone: +1-253-397-1945 (Office)
>>>
>>> On Wed, Aug 5, 2015 at 9:21 PM, Sean Owen <so...@cloudera.com> wrote:
>>>
>>>> take only brings n elements to the driver, which is probably still a
>>>> win if n is small. I'm not sure what you mean by only taking a count
>>>> argument -- what else would be an arg to take?
>>>>
>>>> On Wed, Aug 5, 2015 at 4:49 PM, Sandeep Giri <sand...@knowbigdata.com>
>>>> wrote:
>>>>
>>>>> Yes, but in the take() approach we will be bringing the data to the
>>>>> driver, and it is no longer distributed.
>>>>>
>>>>> Also, take() takes only a count as argument, which means that every
>>>>> time we would be transferring redundant elements.
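
PS for readers tracing the adaptive loop in the first message: the
partition-growth schedule can be studied in isolation. Below is a sketch of
the same arithmetic outside Spark (the object and method names are mine, for
illustration only -- they do not appear in the Spark source):

```scala
object PartitionSchedule {
  // How many partitions to scan in the next round, given how many have been
  // scanned and how many qualifying elements were found so far.
  // Mirrors the numPartsToTry logic in the exists() loop above.
  def numPartsToTry(partsScanned: Int, count: Int, num: Int): Int =
    if (partsScanned == 0) 1              // first round: probe one partition
    else if (count == 0) partsScanned * 4 // nothing found yet: grow 4x
    else {
      // Extrapolate from the observed hit rate, with a 1.5x safety margin,
      // but never grow by more than 4x per round.
      val estimated = Math.max((1.5 * num * partsScanned / count).toInt - partsScanned, 1)
      Math.min(estimated, partsScanned * 4)
    }

  def main(args: Array[String]): Unit = {
    println(numPartsToTry(0, 0, 10)) // 1  -- initial probe
    println(numPartsToTry(1, 0, 10)) // 4  -- no hits yet, 4x growth
    println(numPartsToTry(5, 0, 10)) // 20 -- still no hits, keep growing 4x
    println(numPartsToTry(4, 5, 10)) // 8  -- half the hits found: 1.5*10*4/5 - 4
  }
}
```

The 4x cap keeps the schedule from launching a huge job off one lucky early
partition, while the extrapolation keeps the number of rounds small.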
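
On Sandeep's question of whether qualifying_function() keeps being called
after enough matches are found: within a single partition it does not,
because filter and take on a Scala Iterator are lazy -- which is what the
`it.filter(qualif).take(left)` closure above relies on. A small plain-Scala
illustration (no Spark; the object name is mine):

```scala
object LazyTakeDemo {
  // Returns the first n even numbers in 1..1000, plus how many times the
  // predicate was actually invoked.
  def matchesAndCalls(n: Int): (List[Int], Int) = {
    var calls = 0
    val matches = (1 to 1000).iterator
      .filter { x => calls += 1; x % 2 == 0 } // instrumented predicate
      .take(n)
      .toList                                 // forces the lazy pipeline
    (matches, calls)
  }

  def main(args: Array[String]): Unit = {
    val (m, c) = matchesAndCalls(3)
    println(m) // List(2, 4, 6)
    println(c) // 6 -- the predicate is never called on elements 7..1000
  }
}
```

Across partitions the situation is different: tasks that are already running
keep evaluating the predicate until their own take is satisfied or the job
is cancelled, which is the point Feynman made about parallelism above.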