Hi,

So I've built this "Node-centered accumulator" and written a short piece about it: http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/

Hope it can help someone.

Guillaume


2015-06-18 15:17 GMT+02:00 Guillaume Pitel <guillaume.pi...@exensa.com>:

    I was thinking exactly the same. I'm going to try it. It doesn't
    really matter if I lose an executor: its sketch will be lost, but
    the work will be re-executed somewhere else.


I mean that between the action that updates the sketches and the action that collects/merges them, you can lose an executor, which is outside of Spark's control. But it's probably an acceptable risk.

    And anyway, it's an approximate data structure, and what matters
    are ratios, not exact values.

    I mostly need to take care of the concurrency problem for my sketch.


I think you could do something like:
- Have this singleton hold N sketch instances (one for each executor core).
- Inside an operation over partitions (like foreachPartition/mapPartitions), ask the singleton at the beginning to provide you with one instance to use, in a synchronized fashion: pick it out of the N available instances or mark it as in use.
- When the iterator over the partition has no more elements, release the sketch.
- Then you can do something like sc.parallelize(Seq(...)).coalesce(numExecutors).map(pickTheSketches).reduce(blabla), but you will have to make sure this runs on every executor (I'm not sure an action on a dataset with fewer partitions than executors will touch every executor). A rough code sketch follows below.
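Untested, and the names (`Sketch`, `SketchPool`, `data`, `numExecutors`, the sizes) are all invented for illustration, but the pattern would look roughly like this:

import java.util.concurrent.ArrayBlockingQueue
import scala.collection.JavaConverters._

// Toy count-min-sketch stand-in, just enough to show the pattern.
class Sketch(val width: Int, val depth: Int) extends Serializable {
  val table = Array.ofDim[Int](depth, width)
  def update(x: Long): Unit =
    for (row <- 0 until depth)
      table(row)((((x ^ row) * 2654435761L).toInt & Int.MaxValue) % width) += 1
  def merge(other: Sketch): Sketch = {
    for (r <- 0 until depth; c <- 0 until width) table(r)(c) += other.table(r)(c)
    this
  }
}

// One pool per executor JVM: N instances, one per executor core.
object SketchPool {
  private val n = 4 // assumed cores per executor; read from the config in practice
  private val pool = new ArrayBlockingQueue[Sketch](n)
  (1 to n).foreach(_ => pool.put(new Sketch(1 << 16, 8)))
  def borrow(): Sketch = pool.take()          // blocks while all N are in use
  def release(s: Sketch): Unit = pool.put(s)
  def all(): Seq[Sketch] = pool.asScala.toSeq // snapshot for the collection pass
}

// Pass 1: each task checks a sketch out, updates it, puts it back.
data.foreachPartition { iter =>              // data: RDD[Long], assumed
  val s = SketchPool.borrow()
  try iter.foreach(s.update)
  finally SketchPool.release(s)
}

// Pass 2: try to run one task per executor and ship the sketches back.
// As said above, nothing guarantees the tasks land on distinct executors.
val merged = sc.parallelize(1 to numExecutors, numExecutors)
  .mapPartitions(_ => SketchPool.all().iterator)
  .reduce(_ merge _)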

Please let me know what you end up doing, sounds interesting :)

Eugen


    Guillaume
    Yeah, that's the problem. There is probably some "perfect" number
    of partitions that provides the best balance between partition
    size (memory) and merge overhead. Still, it's not an ideal solution :(

    There could be another way, but it's very hacky... for example, you
    could store one sketch in a singleton per JVM (so per executor). Do
    a first pass over your data to update those, then trigger some
    other dummy operation that just retrieves the sketches. It's kind
    of a hack, but it should work (see the sketch below).

    Note that if you lose an executor in between, this doesn't work
    anymore. You could probably detect that and recompute the sketches,
    but it would become overcomplicated.



    2015-06-18 14:27 GMT+02:00 Guillaume Pitel <guillaume.pi...@exensa.com>:

        Hi,

        Thank you for this confirmation.

        Coalescing is what we do now. However, it creates very big
        partitions.

        Guillaume
        Hey,

        I am not 100% sure, but from my understanding accumulators
        are per partition (so per task, as that's the same thing) and
        are sent back to the driver with the task result and merged.
        When a task needs to be run n times (multiple RDDs depend on
        this one, some partition loss later in the chain, etc.), the
        accumulator will count the values from that task n times.
        So in short, I don't think you'd gain anything by using an
        accumulator over what you are doing right now.
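        To illustrate the semantics with the Spark 1.x accumulator API
        (a minimal example, not from the thread):

        // Each task gets a thread-local copy of the accumulator; the
        // copies ride back with the task results and are merged on
        // the driver.
        val acc = sc.accumulator(0L, "processed")
        data.foreach(_ => acc += 1L)
        // If tasks are recomputed (partition loss, several RDDs
        // depending on this one, ...), their updates are applied
        // again, so the value can over-count.
        println(acc.value)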

        You could maybe coalesce your RDD to num-executors without a
        shuffle and then update the sketches. You should end up with
        one partition per executor, thus one sketch per executor. You
        could then increase the number of threads per task if you can
        use the sketches concurrently.
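        Roughly (untested; `Sketch` as in the toy class above):

        // One partition per executor (no shuffle), one sketch each.
        val perExecutor = data
          .coalesce(numExecutors, shuffle = false)
          .mapPartitions { iter =>
            val s = new Sketch(1 << 16, 8)
            iter.foreach(s.update)
            Iterator(s)
          }
        // Only numExecutors sketches are left to merge.
        val merged = perExecutor.reduce(_ merge _)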

        Eugen

        2015-06-18 13:36 GMT+02:00 Guillaume Pitel <guillaume.pi...@exensa.com>:

            Hi,

            I'm trying to figure out the smartest way to implement a
            global count-min sketch on accumulators. For now, we are
            doing it with RDDs. It works well, but with one sketch
            per partition, merging takes too long.

            As you probably know, a count-min sketch is a big mutable
            array of arrays of ints. To distribute it, all sketches
            must have the same size. Since it can be big, and since
            merging is not free, I would like to minimize the number
            of sketches and maximize reuse and concurrent use of the
            sketches. Ideally, I would like to have just one sketch
            per worker.
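            For reference, a minimal version of the structure
            (illustrative only; the hash choice is arbitrary):

            import scala.util.hashing.MurmurHash3

            // A depth x width array of ints; two sketches can be
            // merged cell-wise only if they share (width, depth).
            class CountMinSketch(width: Int, depth: Int) extends Serializable {
              private val table = Array.ofDim[Int](depth, width)

              private def bucket(x: Long, row: Int): Int =
                (MurmurHash3.productHash((x, row)) & Int.MaxValue) % width

              def add(x: Long): Unit =
                for (row <- 0 until depth)
                  table(row)(bucket(x, row)) += 1

              def estimate(x: Long): Int =
                (0 until depth).map(r => table(r)(bucket(x, r))).min

              // Merging costs O(depth * width) whatever the data,
              // which is why the number of sketches should stay small.
              def merge(other: CountMinSketch): CountMinSketch = {
                for (r <- 0 until depth; c <- 0 until width)
                  table(r)(c) += other.table(r)(c)
                this
              }
            }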

            I think accumulables might be the right structures for
            that, but it seems that they are not shared between
            executors, or even between tasks.

            
            https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulators.scala
            (line 289)

            /**
             * This thread-local map holds per-task copies of accumulators; it is used
             * to collect the set of accumulator updates to send back to the driver
             * when tasks complete. After tasks complete, this map is cleared by
             * `Accumulators.clear()` (see Executor.scala).
             */
            private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
              override protected def initialValue() = Map[Long, Accumulable[_, _]]()
            }


            localAccums stores an accumulator for each task (it's
            thread-local, so I assume each task has a dedicated thread
            on the executors).

            If I understand correctly, each time a task starts, an
            accumulator is initialized locally, updated, then sent
            back to the driver for merging?

            So I guess accumulators may not be the way to go after all.

            Any advice?
            Guillaume

--
eXenSa

*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. <http://www.exensa.com/>
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705
