Fengzdadi commented on code in PR #104:
URL: https://github.com/apache/datasketches-go/pull/104#discussion_r2703450095


##########
sampling/reservoir_items_union.go:
##########
@@ -36,73 +46,137 @@ func NewReservoirItemsUnion[T any](maxK int) (*ReservoirItemsUnion[T], error) {
                return nil, errors.New("maxK must be at least 1")
        }
 
-       gadget, err := NewReservoirItemsSketch[T](maxK)
-       if err != nil {
-               return nil, err
-       }
-
        return &ReservoirItemsUnion[T]{
                maxK:   maxK,
-               gadget: gadget,
+               gadget: nil, // Start with nil gadget, will be initialized on first update
        }, nil
 }
 
 // Update adds a single item to the union.
 func (u *ReservoirItemsUnion[T]) Update(item T) {
+       if u.gadget == nil {
+               u.gadget, _ = NewReservoirItemsSketch[T](u.maxK)
+       }
        u.gadget.Update(item)
 }
 
 // UpdateSketch merges another sketch into the union.
+// This implements Java's update(ReservoirItemsSketch) with twoWayMergeInternal logic.
 func (u *ReservoirItemsUnion[T]) UpdateSketch(sketch *ReservoirItemsSketch[T]) {
        if sketch == nil || sketch.IsEmpty() {
                return
        }
 
-       samples := sketch.Samples()
-       srcN := sketch.N()
+       // Downsample if input K > maxK
+       ris := sketch
+       if sketch.K() > u.maxK {
+               ris = sketch.DownsampledCopy(u.maxK)
+       }
 
-       if u.gadget.IsEmpty() {
-               // If gadget is empty, copy the source directly
-               for _, v := range samples {
-                       u.gadget.data = append(u.gadget.data, v)
-               }
-               u.gadget.n = srcN
+       // Initialize gadget if empty
+       if u.gadget == nil || u.gadget.IsEmpty() {
+               u.createNewGadget(ris)
                return
        }
 
-       // Merge using weighted sampling
-       gadgetN := u.gadget.N()
-       totalN := gadgetN + srcN
-       gadgetK := u.gadget.NumSamples()
-       targetK := u.maxK
-
-       for _, item := range samples {
-               if u.gadget.NumSamples() < targetK {
-                       u.gadget.data = append(u.gadget.data, item)
-               } else {
-                       j := rand.Int63n(totalN)
-                       if j < int64(targetK) {
-                               u.gadget.data[j%int64(gadgetK)] = item
-                       }
+       u.twoWayMergeInternal(ris)
+}
+
+// UpdateFromRaw creates a sketch from raw components and merges it.
+// Useful in distributed environments. Items slice is used directly, not copied.
+func (u *ReservoirItemsUnion[T]) UpdateFromRaw(n int64, k int, items []T) {
+       if len(items) == 0 {
+               return
+       }
+
+       sketch := &ReservoirItemsSketch[T]{
+               k:    k,
+               n:    n,
+               data: items,
+       }
+
+       u.UpdateSketch(sketch)
+}
+
+// createNewGadget initializes the gadget based on the source sketch.
+// If source is in exact mode with K < maxK: upgrade to maxK.
+// Otherwise: preserve source's K.
+func (u *ReservoirItemsUnion[T]) createNewGadget(source *ReservoirItemsSketch[T]) {
+       if source.K() < u.maxK && source.N() <= int64(source.K()) {
+
+               u.gadget, _ = NewReservoirItemsSketch[T](u.maxK)
+               for i := 0; i < source.NumSamples(); i++ {

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to