Hi Alexander,

Yes, the Spark ImperativeAggregate API has the traditional initialize/update/merge/eval methods, but the framework doesn't seem to guarantee whether serialization will occur between consecutive update calls or only between update and merge calls. While debugging my initial implementation I saw a lot of serialization/deserialization, so I wanted to investigate having the framework pass around a byte buffer instead of serializing/deserializing on-heap HllSketch instances.
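The following is a minimal sketch of the byte-buffer approach described above. It assumes datasketches-java together with a recent datasketches-memory release, where WritableMemory.writableWrap(byte[]) and Memory.wrap(byte[]) exist. The object and helper names (HllBufferLifecycle, initBuffer, updateBuffer, evalBuffer) and the lgK/type settings are hypothetical. They only show how the aggregation state can stay a plain byte array that gets re-wrapped on each call, rather than heapified and re-serialized.

```scala
import org.apache.datasketches.hll.{HllSketch, TgtHllType}
import org.apache.datasketches.memory.{Memory, WritableMemory}

// Hypothetical helpers mirroring an initialize/update/eval lifecycle in which
// the aggregation state is a plain byte array rather than an on-heap sketch.
object HllBufferLifecycle {
  val LgK: Int = 12                          // assumed sketch configuration
  val TgtType: TgtHllType = TgtHllType.HLL_4 // assumed target type

  // initialize: allocate the worst-case updatable size once, up front, and
  // construct an empty sketch directly inside that memory (writes its preamble).
  def initBuffer(): Array[Byte] = {
    val bytes = new Array[Byte](HllSketch.getMaxUpdatableSerializationBytes(LgK, TgtType))
    new HllSketch(LgK, TgtType, WritableMemory.writableWrap(bytes))
    bytes
  }

  // update: re-wrap the same bytes (no heapify, no copy) and update in place.
  def updateBuffer(bytes: Array[Byte], item: String): Unit =
    HllSketch.writableWrap(WritableMemory.writableWrap(bytes)).update(item)

  // eval: a read-only wrap is enough to compute the estimate.
  def evalBuffer(bytes: Array[Byte]): Double =
    HllSketch.wrap(Memory.wrap(bytes)).getEstimate
}
```

The array is sized for the worst case from the start, which is the over-allocation tradeoff discussed in this thread. If the state has to be written out, calling toCompactByteArray on the wrapped sketch produces the smaller compact form.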
Regarding the Union question, here's the exception thrown in Union.writableWrap <https://github.com/apache/datasketches-java/blob/fbe2fd440353caebba21aebef41c63bdb30b5e0e/src/main/java/org/apache/datasketches/hll/Union.java#L134-L136> when the underlying sketch is not of type HLL_8. This is how I'm handling it today. Is this the expected process when we've configured HLL_4 sketches (assuming all sketches are configured the same way)?

    import org.apache.datasketches.hll.{HllSketch, TgtHllType, Union}
    import org.apache.datasketches.memory.WritableMemory

    override def sketchMerge(buffer1: WritableMemory, buffer2: WritableMemory): Unit = {
      val sketch1 = HllSketch.writableWrap(buffer1)
      val sketch2 = HllSketch.writableWrap(buffer2)
      if (sketch1.getTgtHllType == TgtHllType.HLL_8) {
        // Union.writableWrap only accepts an updatable HLL_8 image: merge in place
        Union.writableWrap(buffer1).update(sketch2)
      } else {
        // HLL_4 / HLL_6: merge on-heap, then overwrite buffer1 with the result
        val union = new Union(sketch1.getLgConfigK)
        union.update(sketch1)
        union.update(sketch2)
        val newBuffer = union.getResult(sketch1.getTgtHllType).toUpdatableByteArray
        buffer1.putByteArray(0, newBuffer, 0, newBuffer.length)
      }
    }

Thanks for the Druid references, I'll take a look!

Ryan Berti
Senior Data Engineer | Ads DE
M 7023217573
5808 W Sunset Blvd | Los Angeles, CA 90028

On Tue, Mar 14, 2023 at 8:28 PM Alexander Saydakov wrote:

> First I would like to talk about the overall approach to aggregation.
> Usually it is best to arrange a 2-phase process (known as map-reduce or
> scatter-gather). The first phase processes each data partition on some
> workers, and the second phase merges the results on other workers, with a
> network shuffle in between. With this in mind, serialization should happen
> only at the end of the first phase, to transfer the intermediate results to
> the second-phase workers. Another serialization might be desired at the end
> of the whole process if the results contain sketches to be saved for future
> use.
>
> We don't have much experience with Spark.
> There is an example with Theta sketches on the web site here:
> https://datasketches.apache.org/docs/Theta/ThetaSparkExample.html
> It might be outdated if the Spark API has changed since this example was
> worked out some years ago.
>
> Regarding memory wrapping: I believe the idea is to have sketches live in
> some region of memory that is owned and managed by somebody else. So if,
> let's say, during the first phase of aggregation the state of the
> aggregation is passed around from update to update as a chunk of memory,
> then a sketch or union can recreate itself from that chunk with minimal
> overhead by wrapping it. But if you could arrange to pass around a sketch
> or union object representing that aggregation state, that would be even
> better.
>
> I believe there are some static methods to get the required memory size
> upfront. Yes, they give an upper bound, so there might be some
> overallocation.
>
> I am not sure I understand your question about the union and having to do
> something on-heap.
>
> You may want to have a look at how off-heap HLL aggregation is done in
> Druid:
>
> https://github.com/apache/druid/blob/master/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java
>
> https://github.com/apache/druid/blob/master/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java
>
> On Tue, Mar 14, 2023 at 5:33 PM Ryan Berti wrote:
>
>> Hello!
>>
>> I'm working on integrating DataSketches' HllSketch into Apache Spark,
>> so that we can write out and re-aggregate intermediate sketches (not
>> currently supported by approx_count_distinct's HLL++ implementation).
>> I had a few questions about best practices.
>>
>> I'm working on an implementation that uses a fixed-length byte array
>> within Spark's aggregation buffer, wrapped in a WritableMemory
>> instance.
>> I'm then wrapping that in an HllSketch instance when I want to update
>> the sketch, or in a Union instance when I want to merge sketches.
>> Hoping someone can give me some guidance on the following:
>>
>> - I initially had the HllSketch instances operate 'on-heap', and then
>>   serialized them out / heapified them back into existence as often as
>>   Spark required. My bet is that passing around a raw byte array (and
>>   wrapping it with WritableMemory/HllSketch/Union instances as needed)
>>   will reduce serialization/deserialization/garbage collection overhead.
>>   Can someone confirm this is the intended usage/benefit of the
>>   writableWrap() functionality?
>> - Using the raw byte array requires that I initialize a max-sized
>>   buffer (given the HllSketch config) up-front, so it seems the tradeoff
>>   is that I'm allocating more memory up-front than I may need. Is my
>>   understanding of the tradeoff correct?
>> - The Union implementation will only wrap an HLL_8 typed buffer. Right
>>   now, when the HllSketches aren't configured as HLL_8, I have the Union
>>   merge sketches 'on-heap' and then overwrite the Spark byte buffer with
>>   the Union's updatable byte array. I think this is expected, but wanted
>>   to confirm.
>>
>> I have a few follow-up questions about Theta sketches, but figured I'd
>> start with the HllSketch before broadening the implementation.
>>
>> Thanks!
>>
>> Ryan Berti
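The two-phase (scatter-gather) aggregation Alexander describes, with serialization only at the phase boundary, can be sketched roughly as follows. The object and method names (TwoPhaseHll, buildPartial, mergePartials) and the lgK/type settings are hypothetical, and the Spark plumbing is deliberately left out. Phase 1 keeps an ordinary on-heap HllSketch per partition and serializes it once. Phase 2 wraps each serialized partial and feeds it to a Union, which accepts input sketches of any target type, HLL_4 included.

```scala
import org.apache.datasketches.hll.{HllSketch, TgtHllType, Union}
import org.apache.datasketches.memory.Memory

// Hypothetical two-phase aggregation: phase 1 builds one sketch per partition
// and serializes it once; phase 2 wraps and merges the serialized partials.
object TwoPhaseHll {
  val LgK: Int = 12                          // assumed configuration shared by all sketches
  val TgtType: TgtHllType = TgtHllType.HLL_4 // assumed target type

  // Phase 1 (per partition): update an on-heap sketch, serialize only at the end.
  def buildPartial(partition: Seq[String]): Array[Byte] = {
    val sketch = new HllSketch(LgK, TgtType)
    partition.foreach(item => sketch.update(item))
    sketch.toCompactByteArray // the bytes that would cross the network shuffle
  }

  // Phase 2 (after the shuffle): read-only wrap of each partial, merged by a Union.
  def mergePartials(partials: Seq[Array[Byte]]): HllSketch = {
    val union = new Union(LgK)
    partials.foreach(bytes => union.update(HllSketch.wrap(Memory.wrap(bytes))))
    union.getResult(TgtType) // result converted back to the configured type
  }
}
```

A buffer-backed variant of phase 1 would replace the on-heap sketch with HllSketch.writableWrap over the aggregation buffer, as Ryan describes. The merge side stays the same either way.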
