2015-06-18 15:17 GMT+02:00 Guillaume Pitel <guillaume.pi...@exensa.com>:

>  I was thinking exactly the same. I'm going to try it. It doesn't really
> matter if I lose an executor: its sketch will be lost, but the work will
> then be re-executed somewhere else.
>
>
I mean that between the action that updates the sketches and the action
that collects/merges them you can lose an executor, which is outside of
Spark's control. But it's probably an acceptable risk.


> And anyway, it's an approximate data structure, and what matters are
> ratios, not exact values.
>
> I mostly need to take care of the concurrency problem for my sketch.
>

I think you could do something like:
  - Have this singleton hold N sketch instances (one for each executor
core)
  - Inside an operation over partitions (like
foreachPartition/mapPartitions)
    - at the beginning, ask the singleton for one instance to use, in a
synchronized fashion: either take it out of the N available instances or
mark it as in use
    - when the iterator over the partition has no more elements, release
the sketch
  - Then you can do something like
sc.parallelize(Seq(...)).coalesce(numExecutors).map(pickTheSketches).reduce(blabla),
but you will have to make sure that this is executed on every executor
(I'm not sure whether a dataset smaller than the number of executors will
run on every executor)
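
The steps above could be sketched roughly as follows. This is only an
illustration in plain Scala with no Spark dependency: ToySketch, SketchPool,
withSketch and snapshot are made-up names, and the fixed pool size of 4 is an
assumption standing in for the number of cores per executor.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical stand-in for the real sketch: a fixed-size array of counters.
final class ToySketch(val size: Int) {
  val counts = new Array[Long](size)

  def add(item: String): Unit = {
    val idx = ((item.hashCode % size) + size) % size // non-negative bucket
    counts(idx) += 1
  }
}

// A Scala object is instantiated once per JVM, so once per executor.
object SketchPool {
  val poolSize = 4      // assumption: number of cores of the executor
  val sketchSize = 1024 // assumption: every sketch has the same size

  private val available = new LinkedBlockingQueue[ToySketch]()
  private val all = Vector.fill(poolSize)(new ToySketch(sketchSize))
  all.foreach(s => available.put(s))

  // Blocks until an instance is free, so two tasks never share a sketch.
  def acquire(): ToySketch = available.take()

  def release(s: ToySketch): Unit = available.put(s)

  // Borrows a sketch for the duration of `body` and always hands it back.
  def withSketch[A](body: ToySketch => A): A = {
    val s = acquire()
    try body(s) finally release(s)
  }

  // Copies of all instances, for the later collect/merge step.
  def snapshot(): Vector[Array[Long]] = all.map(_.counts.clone())
}
```

Inside foreachPartition/mapPartitions a task would wrap its loop as
SketchPool.withSketch { s => iterator.foreach(s.add) }. Note that snapshot()
does no locking, so it is only meaningful once no task is still updating a
sketch.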

Please let me know what you end up doing, sounds interesting :)

Eugen


>
> Guillaume
>
>   Yeah, that's the problem. There is probably some "perfect" number of
> partitions that provides the best balance between partition size and memory
> and merge overhead. Though it's not an ideal solution :(
>
>  There could be another way, but it's very hacky: for example, store one
> sketch in a singleton per JVM (so per executor). Do a first pass over your
> data and update those. Then trigger some other dummy operation that
> just retrieves the sketches.
> That's kind of a hack, but it should work.
>
>  Note that if you lose an executor in between, this doesn't work
> anymore. You could probably detect it and recompute the sketches, but it
> would become overcomplicated.
>
>
>
> 2015-06-18 14:27 GMT+02:00 Guillaume Pitel <guillaume.pi...@exensa.com>:
>
>>  Hi,
>>
>> Thank you for this confirmation.
>>
>> Coalescing is what we do now. It creates, however, very big partitions.
>>
>> Guillaume
>>
>>   Hey,
>>
>> I am not 100% sure, but from my understanding accumulators are per
>> partition (so per task, as it's the same) and are sent back to the driver
>> with the task result and merged. When a task needs to be run n times
>> (multiple RDDs depend on this one, some partition is lost later in the
>> chain, etc.), the accumulator will count the values from that task n times.
>>  So in short, I don't think you'd gain anything from using an accumulator
>> over what you are doing right now.
>>
>>  You could maybe coalesce your RDD to num-executors partitions without a
>> shuffle and then update the sketches. You should end up with 1 partition
>> per executor, thus 1 sketch per executor. You could then increase the
>> number of threads per task if you can use the sketches concurrently.
>>
>>  Eugen
>>
>> 2015-06-18 13:36 GMT+02:00 Guillaume Pitel <guillaume.pi...@exensa.com>:
>>
>>>  Hi,
>>>
>>> I'm trying to figure out the smartest way to implement a global
>>> count-min-sketch on accumulators. For now, we are doing that with RDDs. It
>>> works well, but with one sketch per partition, merging takes too long.
>>>
>>> As you probably know, a count-min sketch is a big mutable array of
>>> arrays of ints. To distribute it, all sketches must have the same size.
>>> Since it can be big, and since merging is not free, I would like to
>>> minimize the number of sketches and maximize reuse and concurrent use of
>>> the sketches. Ideally, I would like to just have one sketch per worker.
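
For reference, the structure described above can be sketched in a few lines
of Scala. This is a minimal illustration, not the implementation discussed in
this thread: the class name CountMinSketch, the depth/width parameters, and
the choice of MurmurHash3 seeded with the row index are all assumptions.

```scala
import scala.util.hashing.MurmurHash3

// Minimal count-min sketch: `depth` rows of `width` Int counters.
final class CountMinSketch(val depth: Int, val width: Int) extends Serializable {
  require(depth > 0 && width > 0, "depth and width must be positive")

  val table: Array[Array[Int]] = Array.ofDim[Int](depth, width)

  // One hash function per row, obtained by using the row index as the seed.
  private def bucket(item: String, row: Int): Int = {
    val h = MurmurHash3.stringHash(item, row)
    ((h % width) + width) % width // keep the index non-negative
  }

  def add(item: String, count: Int = 1): Unit = {
    var row = 0
    while (row < depth) {
      table(row)(bucket(item, row)) += count
      row += 1
    }
  }

  // Minimum over the rows: collisions can only inflate a cell, so this
  // may overestimate but (ignoring Int overflow) never underestimates.
  def estimate(item: String): Int =
    (0 until depth).map(row => table(row)(bucket(item, row))).min

  // Cell-wise sum into this sketch; only valid for identical shapes,
  // which is why all distributed sketches must have the same size.
  def mergeInPlace(other: CountMinSketch): this.type = {
    require(depth == other.depth && width == other.width, "sketch sizes differ")
    var r = 0
    while (r < depth) {
      var c = 0
      while (c < width) {
        table(r)(c) += other.table(r)(c)
        c += 1
      }
      r += 1
    }
    this
  }
}
```

Merging touches every one of the depth * width cells, so its cost is
proportional to the sketch size times the number of sketches, whatever the
amount of data each sketch has seen. That is the overhead that one sketch per
partition multiplies.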
>>>
>>> I think accumulables might be the right structures for that, but it
>>> seems that they are not shared between executors, or even between tasks.
>>>
>>>
>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulators.scala
>>> (289)
>>>   /**
>>>    * This thread-local map holds per-task copies of accumulators; it is used to collect the set
>>>    * of accumulator updates to send back to the driver when tasks complete. After tasks complete,
>>>    * this map is cleared by `Accumulators.clear()` (see Executor.scala).
>>>    */
>>>   private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
>>>     override protected def initialValue() = Map[Long, Accumulable[_, _]]()
>>>   }
>>> localAccums stores an accumulator for each task (it's thread-local,
>>> so I assume each task has its own thread on the executors).
>>>
>>> If I understand correctly, each time a task starts, an accumulator is
>>> initialized locally, updated, then sent back to the driver for merging?
>>>
>>> So I guess accumulators may not be the way to go after all.
>>>
>>> Any advice?
>>> Guillaume
>>> --
>>>    [image: eXenSa]
>>>  *Guillaume PITEL, Président*
>>> +33(0)626 222 431
>>>
>>> eXenSa S.A.S. <http://www.exensa.com/>
>>>  41, rue Périer - 92120 Montrouge - FRANCE
>>> Tel +33(0)184 163 677 / Fax +33(0)972 283 705
>>>
>>
>>
>>
>>
>
>
>
>
