proost commented on code in PR #104:
URL: https://github.com/apache/datasketches-go/pull/104#discussion_r2703183206


##########
sampling/reservoir_items_sketch.go:
##########
@@ -115,6 +115,62 @@ func (s *ReservoirItemsSketch[T]) Reset() {
        s.data = s.data[:0]
 }
 
+// GetImplicitSampleWeight returns N/K when in sampling mode, or 1.0 in exact mode.
+func (s *ReservoirItemsSketch[T]) GetImplicitSampleWeight() float64 {

Review Comment:
   Can you remove the `Get` prefix?



##########
sampling/reservoir_items_union.go:
##########
@@ -36,73 +46,137 @@ func NewReservoirItemsUnion[T any](maxK int) (*ReservoirItemsUnion[T], error) {
                return nil, errors.New("maxK must be at least 1")
        }
 
-       gadget, err := NewReservoirItemsSketch[T](maxK)
-       if err != nil {
-               return nil, err
-       }
-
        return &ReservoirItemsUnion[T]{
                maxK:   maxK,
-               gadget: gadget,
+               gadget: nil, // Start with nil gadget, will be initialized on first update
        }, nil
 }
 
 // Update adds a single item to the union.
 func (u *ReservoirItemsUnion[T]) Update(item T) {
+       if u.gadget == nil {
+               u.gadget, _ = NewReservoirItemsSketch[T](u.maxK)
+       }
        u.gadget.Update(item)
 }
 
 // UpdateSketch merges another sketch into the union.
+// This implements Java's update(ReservoirItemsSketch) with twoWayMergeInternal logic.
 func (u *ReservoirItemsUnion[T]) UpdateSketch(sketch *ReservoirItemsSketch[T]) {
        if sketch == nil || sketch.IsEmpty() {
                return
        }
 
-       samples := sketch.Samples()
-       srcN := sketch.N()
+       // Downsample if input K > maxK
+       ris := sketch
+       if sketch.K() > u.maxK {
+               ris = sketch.DownsampledCopy(u.maxK)
+       }
 
-       if u.gadget.IsEmpty() {
-               // If gadget is empty, copy the source directly
-               for _, v := range samples {
-                       u.gadget.data = append(u.gadget.data, v)
-               }
-               u.gadget.n = srcN
+       // Initialize gadget if empty
+       if u.gadget == nil || u.gadget.IsEmpty() {
+               u.createNewGadget(ris)
                return
        }
 
-       // Merge using weighted sampling
-       gadgetN := u.gadget.N()
-       totalN := gadgetN + srcN
-       gadgetK := u.gadget.NumSamples()
-       targetK := u.maxK
-
-       for _, item := range samples {
-               if u.gadget.NumSamples() < targetK {
-                       u.gadget.data = append(u.gadget.data, item)
-               } else {
-                       j := rand.Int63n(totalN)
-                       if j < int64(targetK) {
-                               u.gadget.data[j%int64(gadgetK)] = item
-                       }
+       u.twoWayMergeInternal(ris)
+}
+
+// UpdateFromRaw creates a sketch from raw components and merges it.
+// Useful in distributed environments. Items slice is used directly, not copied.
+func (u *ReservoirItemsUnion[T]) UpdateFromRaw(n int64, k int, items []T) {
+       if len(items) == 0 {
+               return
+       }
+
+       sketch := &ReservoirItemsSketch[T]{
+               k:    k,
+               n:    n,
+               data: items,
+       }
+
+       u.UpdateSketch(sketch)
+}
+
+// createNewGadget initializes the gadget based on the source sketch.
+// If source is in exact mode with K < maxK: upgrade to maxK.
+// Otherwise: preserve source's K.
+func (u *ReservoirItemsUnion[T]) createNewGadget(source *ReservoirItemsSketch[T]) {
+       if source.K() < u.maxK && source.N() <= int64(source.K()) {
+
+               u.gadget, _ = NewReservoirItemsSketch[T](u.maxK)
+               for i := 0; i < source.NumSamples(); i++ {

Review Comment:
   Can you use `twoWayMergeInternalStandard` instead?



##########
sampling/reservoir_items_sketch.go:
##########
@@ -115,6 +115,62 @@ func (s *ReservoirItemsSketch[T]) Reset() {
        s.data = s.data[:0]
 }
 
+// GetImplicitSampleWeight returns N/K when in sampling mode, or 1.0 in exact mode.
+func (s *ReservoirItemsSketch[T]) GetImplicitSampleWeight() float64 {
+       if s.n < int64(s.k) {
+               return 1.0
+       }
+       return float64(s.n) / float64(s.k)
+}
+
+// Copy returns a deep copy of the sketch.
+func (s *ReservoirItemsSketch[T]) Copy() *ReservoirItemsSketch[T] {
+       dataCopy := make([]T, len(s.data))
+       copy(dataCopy, s.data)
+       return &ReservoirItemsSketch[T]{
+               k:    s.k,
+               n:    s.n,
+               data: dataCopy,
+       }
+}
+
+// DownsampledCopy returns a copy with a reduced reservoir size.
+// If newK >= current K, returns a regular copy.
+func (s *ReservoirItemsSketch[T]) DownsampledCopy(newK int) *ReservoirItemsSketch[T] {
+       if newK >= s.k {
+               return s.Copy()
+       }
+
+       result, _ := NewReservoirItemsSketch[T](newK)
+
+       samples := s.Samples()
+       for _, item := range samples {
+               result.Update(item)
+       }
+
+       // Adjust N to preserve correct implicit weights
+       if result.n < s.n {
+               result.forceIncrementItemsSeen(s.n - result.n)
+       }
+
+       return result
+}
+
+// getValueAtPosition returns the item at the given position.
+func (s *ReservoirItemsSketch[T]) getValueAtPosition(pos int) T {
+       return s.data[pos]
+}
+
+// insertValueAtPosition replaces the item at the given position.
+func (s *ReservoirItemsSketch[T]) insertValueAtPosition(item T, pos int) {
+       s.data[pos] = item
+}
+
+// forceIncrementItemsSeen adds delta to the items seen count.
+func (s *ReservoirItemsSketch[T]) forceIncrementItemsSeen(delta int64) {

Review Comment:
   We need to check that the capacity is sufficient. If it is not, we should return an error.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to