Hello!

I think I found a nice, performant solution based on take's source code:

import org.apache.spark.rdd.RDD

// Returns true as soon as at least `num` elements satisfying `qualif` have
// been seen. Partitions are scanned in growing waves, mirroring RDD.take.
def exists[T](rdd: RDD[T])(qualif: T => Boolean, num: Int): Boolean = {
  if (num == 0) {
    true
  } else {
    var count: Int = 0
    val totalParts: Int = rdd.partitions.length
    var partsScanned: Int = 0
    while (count < num && partsScanned < totalParts) {
      // Decide how many partitions to try in this wave, as in take's source.
      var numPartsToTry: Int = 1
      if (partsScanned > 0) {
        if (count == 0) {
          numPartsToTry = partsScanned * 4
        } else {
          numPartsToTry = Math.max((1.5 * num * partsScanned / count).toInt - partsScanned, 1)
          numPartsToTry = Math.min(numPartsToTry, partsScanned * 4)
        }
      }

      val left: Int = num - count
      val p: Range = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
      // Count (at most `left`) qualifying elements per partition; nothing is collected.
      val res: Array[Int] = rdd.sparkContext.runJob(
        rdd, (it: Iterator[T]) => it.filter(qualif).take(left).size, p, allowLocal = true)

      count = count + res.sum
      partsScanned += numPartsToTry
    }

    count >= num
  }
}

// val all: RDD[Any]
println(exists(all)(_ => { println("."); true }, 10))

It's super fast for small values of num, and I think it parallelises nicely
for large values.
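
For example, a quick sanity check (just a sketch, assuming a spark-shell
session where sc is in scope):

val nums = sc.parallelize(1 to 1000000, 100)
exists(nums)(_ % 7 == 0, 10)   // true after scanning only the first partition
exists(nums)(_ > 2000000, 1)   // false, but only after every partition has been scanned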

Please tell me what you think.


Have a nice day,

Jonathan





On 5 August 2015 at 19:18, Jonathan Winandy <jonathan.wina...@gmail.com>
wrote:

> Hello!
>
> You could try something like that :
>
> import scala.util.Random
>
> import org.apache.spark.{Accumulator, SparkContext, SparkException}
> import org.apache.spark.rdd.RDD
>
> def exists[T](rdd: RDD[T])(f: T => Boolean, n: Long): Boolean = {
>
>   val context: SparkContext = rdd.sparkContext
>   val grp: String = Random.alphanumeric.take(10).mkString
>   context.setJobGroup(grp, "exist")
>   val count: Accumulator[Long] = context.accumulator(0L)
>
>   val iteratorToInt: (Iterator[T]) => Int = {
>     iterator =>
>       val i: Int = iterator.count(f)
>       count += i
>       i
>   }
>
>   // Watcher thread on the driver: busy-waits until enough matches have been
>   // reported through the accumulator, then cancels the whole job group.
>   val t = new Thread {
>     override def run {
>       while (count.value < n) {}
>       context.cancelJobGroup(grp)
>     }
>   }
>   t.start()
>   try {
>     context.runJob(rdd, iteratorToInt).sum > n
>   } catch  {
>     case e:SparkException => {
>       count.value > n
>     }
>   } finally {
>     t.stop()  // best-effort cleanup of the watcher thread (Thread.stop is deprecated)
>   }
>
> }
>
>
>
> It stops the computation as soon as enough elements satisfying the condition
> have been witnessed.
>
> It is performant if the RDD is well partitioned. If that is a problem, you
> could change iteratorToInt to:
>
> val iteratorToInt: (Iterator[T]) => Int = {
>   iterator =>
>     // Bump the accumulator once per matching element instead of once per partition.
>     iterator.count { x =>
>       if (f(x)) {
>         count += 1
>         true
>       } else false
>     }
> }
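>
> For example (just a sketch, the RDD name and the predicate are made up):
>
> // val events: RDD[String] = ...
> // exists(events)(_.contains("ERROR"), 100L)  // true once more than 100 matches are counted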
>
>
> I am interested in a safer way to perform partial computation in Spark.
>
> Cheers,
> Jonathan
>
> On 5 August 2015 at 18:54, Feynman Liang <fli...@databricks.com> wrote:
>
>> qualifying_function() will be executed on each partition in parallel;
>> stopping all parallel execution as soon as the first instance satisfying
>> qualifying_function() is found would effectively make the computation
>> sequential.
>>
>> On Wed, Aug 5, 2015 at 9:05 AM, Sandeep Giri <sand...@knowbigdata.com>
>> wrote:
>>
>>> Okay, I think I got it now. Yes, take() does not need to be called more
>>> than once. I had the impression that we wanted to bring elements to the
>>> driver node and then run our qualifying_function on the driver node.
>>>
>>> Now, I am back to the question I started with: could there be an
>>> approach where qualifying_function() does not get called after an
>>> element has been found?
>>>
>>>
>>> Regards,
>>> Sandeep Giri,
>>> +1 347 781 4573 (US)
>>> +91-953-899-8962 (IN)
>>>
>>> www.KnowBigData.com
>>> Phone: +1-253-397-1945 (Office)
>>>
>>>
>>>
>>> On Wed, Aug 5, 2015 at 9:21 PM, Sean Owen <so...@cloudera.com> wrote:
>>>
>>>> take only brings n elements to the driver, which is probably still a
>>>> win if n is small. I'm not sure what you mean by only taking a count
>>>> argument -- what else would be an arg to take?
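>>>>
>>>> Concretely, I mean something like (assuming the RDD and the
>>>> qualifying_function discussed earlier in the thread):
>>>>
>>>>   rdd.filter(qualifying_function).take(n)
>>>>
>>>> which scans partitions incrementally and brings at most n matching
>>>> elements back to the driver.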
>>>>
>>>> On Wed, Aug 5, 2015 at 4:49 PM, Sandeep Giri <sand...@knowbigdata.com>
>>>> wrote:
>>>>
>>>>> Yes, but in the take() approach we will be bringing the data to the
>>>>> driver, where it is no longer distributed.
>>>>>
>>>>> Also, take() only accepts a count as an argument, which means that every
>>>>> time we would be transferring redundant elements.
>>>>>
>>>>>
>>>
>>
>