2015-06-18 15:17 GMT+02:00 Guillaume Pitel <guillaume.pi...@exensa.com>:
> I was thinking exactly the same. I'm going to try it. It doesn't really
> matter if I lose an executor, since its sketch will be lost, but then
> re-executed somewhere else.
>

I mean that between the action that updates the sketches and the action that
collects/merges them you can lose an executor, so that part is outside of
Spark's control. But it's probably an acceptable risk.

> And anyway, it's an approximate data structure, and what matters are
> ratios, not exact values.
>
> I mostly need to take care of the concurrency problem for my sketch.
>

I think you could do something like this (rough sketch below):

- Have a singleton that holds N sketch instances (one for each executor core).
- Inside an operation over partitions (like foreachPartition/mapPartitions):
  - at the beginning you ask the singleton to provide you with one instance,
    in a synchronized fashion, picking it out of the N available instances
    (or marking it as in use);
  - when the iterator over the partition has no more elements, you release
    this sketch.
- Then you can do something like
  sc.parallelize(Seq(...)).coalesce(numExecutors).map(pickTheSketches).reduce(blabla),
  but you will have to make sure this runs on every executor (I'm not sure
  whether a dataset smaller than the number of executors will schedule a task
  on every executor).
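A rough, completely untested sketch of the above; `CountMinSketch` (with
`update` and `merge`), `data`, `sc` and `numExecutors` are placeholders for
whatever you actually use on your side, and the sketch class would have to be
Serializable so it can be sent back to the driver in the second pass:

object SketchPool {
  // N instances, e.g. one per core (you may want spark.executor.cores here
  // instead of the machine's core count)
  private val poolSize = Runtime.getRuntime.availableProcessors()
  private val available =
    scala.collection.mutable.Queue.fill(poolSize)(new CountMinSketch())
  private val all = available.toList // keep a handle on every instance

  // hand one instance to the calling task, in a synchronized fashion
  def acquire(): CountMinSketch = synchronized {
    while (available.isEmpty) wait()
    available.dequeue()
  }

  // give the instance back once the partition iterator is exhausted
  def release(sketch: CountMinSketch): Unit = synchronized {
    available.enqueue(sketch)
    notifyAll()
  }

  // merge the N local instances into one sketch for this executor JVM
  def merged(): CountMinSketch = synchronized { all.reduce(_ merge _) }
}

// first pass: update the per-executor sketches
data.foreachPartition { iter =>
  val sketch = SketchPool.acquire()
  try iter.foreach(sketch.update) finally SketchPool.release(sketch)
}

// second pass: dummy job that just pulls one merged sketch per executor
val globalSketch = sc
  .parallelize(1 to numExecutors, numExecutors)
  .mapPartitions(_ => Iterator(SketchPool.merged()))
  .reduce(_ merge _)

The object is only instantiated when a task first touches it, which is what
gives you one pool per executor JVM; and as said above, if an executor dies
between the two passes its updates are simply lost.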
Please let me know what you end up doing, sounds interesting :)

Eugen

> Guillaume
>
> Yeah, that's the problem. There is probably some "perfect" number of
> partitions that provides the best balance between partition size/memory and
> merge overhead. Though it's not an ideal solution :(
>
> There could be another way, but it's very hacky: for example, you store one
> sketch in a singleton per JVM (so per executor). Do a first pass over your
> data and update those. Then you trigger some other dummy operation that
> will just retrieve the sketches.
> That's kind of a hack, but it should work.
>
> Note that if you lose an executor in between, that doesn't work anymore.
> You could probably detect it and recompute the sketches, but it would
> become overly complicated.
>
> 2015-06-18 14:27 GMT+02:00 Guillaume Pitel <guillaume.pi...@exensa.com>:
>
>> Hi,
>>
>> Thank you for this confirmation.
>>
>> Coalescing is what we do now. It creates, however, very big partitions.
>>
>> Guillaume
>>
>> Hey,
>>
>> I am not 100% sure, but from my understanding accumulators are per
>> partition (so per task, as it's the same thing) and are sent back to the
>> driver with the task result and merged. When a task needs to be run n
>> times (multiple RDDs depend on this one, some partition is lost later in
>> the chain, etc.), the accumulator will count the values from that task n
>> times. So in short, I don't think you'd gain anything from using an
>> accumulator over what you are doing right now.
>>
>> You could maybe coalesce your RDD to the number of executors without a
>> shuffle and then update the sketches. You should end up with one partition
>> per executor, thus one sketch per executor. You could then increase the
>> number of threads per task if you can use the sketches concurrently.
>>
>> Eugen
>>
>> 2015-06-18 13:36 GMT+02:00 Guillaume Pitel <guillaume.pi...@exensa.com>:
>>
>>> Hi,
>>>
>>> I'm trying to figure out the smartest way to implement a global
>>> count-min sketch on accumulators. For now, we are doing that with RDDs.
>>> It works well, but with one sketch per partition, merging takes too long.
>>>
>>> As you probably know, a count-min sketch is a big mutable array of
>>> arrays of ints. To distribute it, all sketches must have the same size.
>>> Since it can be big, and since merging is not free, I would like to
>>> minimize the number of sketches and maximize reuse and concurrent use of
>>> the sketches. Ideally, I would like to have just one sketch per worker.
>>>
>>> I think accumulables might be the right structures for that, but it
>>> seems that they are not shared between executors, or even between tasks.
>>>
>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulators.scala
>>> (line 289)
>>>
>>> /**
>>>  * This thread-local map holds per-task copies of accumulators; it is
>>>  * used to collect the set of accumulator updates to send back to the
>>>  * driver when tasks complete. After tasks complete, this map is cleared
>>>  * by `Accumulators.clear()` (see Executor.scala).
>>>  */
>>> private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
>>>   override protected def initialValue() = Map[Long, Accumulable[_, _]]()
>>> }
>>>
>>> localAccums stores an accumulator for each task (it's thread-local, so I
>>> assume each task has a unique thread on executors).
>>>
>>> If I understand correctly, each time a task starts, an accumulator is
>>> initialized locally, updated, then sent back to the driver for merging?
>>>
>>> So I guess accumulators may not be the way to go after all.
>>>
>>> Any advice?
>>> Guillaume
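For reference, the coalesce variant discussed in the quoted thread above
would look roughly like this (same disclaimers and placeholders as in the
sketch above). It does everything in one pass with no shared state, at the
cost of the very big partitions you mentioned:

// one partition per executor (roughly), hence one sketch per partition,
// built and merged in a single pass
val globalSketch = data
  .coalesce(numExecutors, shuffle = false)
  .mapPartitions { iter =>
    val sketch = new CountMinSketch() // placeholder sketch type
    iter.foreach(sketch.update)
    Iterator(sketch)
  }
  .reduce(_ merge _)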