This is an automated email from the ASF dual-hosted git repository.

alsay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datasketches-go.git

commit a6a057a0ec15446b5a04c9d2553510f2ed88b64c
Author: Pierre Lacave <[email protected]>
AuthorDate: Mon Dec 18 22:13:36 2023 +0100

    Start port of long frequency sketch
---
 common/utils.go                                 |  10 +-
 frequencies/long_sketch.go                      | 134 +++++++++++++
 frequencies/long_sketch_test.go                 |  35 ++++
 frequencies/reverse_purge_long_hash_map.go      | 249 ++++++++++++++++++++++++
 frequencies/reverse_purge_long_hash_map_test.go |  39 ++++
 frequencies/utils.go                            |  40 ++++
 hll/hll_sketch_test.go                          |   2 +-
 hll/preamble_utils.go                           |   4 +-
 8 files changed, 505 insertions(+), 8 deletions(-)

diff --git a/common/utils.go b/common/utils.go
index 8e39047..0919843 100644
--- a/common/utils.go
+++ b/common/utils.go
@@ -44,15 +44,15 @@ func CeilPowerOf2(n int) int {
        return int(math.Pow(2, math.Ceil(math.Log2(float64(n)))))
 }
 
-func ExactLog2OfLong(powerOf2 uint64) (int, error) {
-       if !isLongPowerOf2(powerOf2) {
+func ExactLog2(powerOf2 int) (int, error) {
+       if !isPowerOf2(powerOf2) {
                return 0, fmt.Errorf("argument 'powerOf2' must be a positive 
power of 2")
        }
-       return bits.TrailingZeros64(powerOf2), nil
+       return bits.TrailingZeros64(uint64(powerOf2)), nil
 }
 
-// isLongPowerOf2 returns true if the given number is a power of 2.
-func isLongPowerOf2(powerOf2 uint64) bool {
+// isPowerOf2 returns true if the given number is a power of 2.
+func isPowerOf2(powerOf2 int) bool {
        return powerOf2 > 0 && (powerOf2&(powerOf2-1)) == 0
 }
 
diff --git a/frequencies/long_sketch.go b/frequencies/long_sketch.go
new file mode 100644
index 0000000..b7c1ef8
--- /dev/null
+++ b/frequencies/long_sketch.go
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package frequencies
+
+import (
+       "fmt"
+       "github.com/apache/datasketches-go/common"
+)
+
+// LongSketch keeps approximate counts for int64 items of a stream in a
+// size-bounded hash map of counters; items and weights are fed in via Update.
+type LongSketch struct {
+       // Log2 of the maximum length of the arrays internal to the hash map
+       // supported by the data structure.
+       lgMaxMapSize int
+       // The current number of counters supported by the hash map. Update grows
+       // or purges the map once the number of active counters exceeds it.
+       curMapCap int //the threshold to purge
+       // Tracks the total of decremented counts (the sum of the values returned
+       // by purge).
+       offset int64
+       // The sum of all frequencies of the stream so far.
+       streamWeight int64
+       // The maximum number of samples used to compute the approximate median of
+       // counters when doing a decrement.
+       sampleSize int
+       // Hash map mapping stored items to approximate counts.
+       hashMap *reversePurgeLongHashMap
+}
+
+const (
+       // strPreambleTokens is not referenced by the code in this file yet.
+       // NOTE(review): presumably the number of leading tokens in the sketch's
+       // string serialization - confirm once that code is ported.
+       strPreambleTokens = 6
+)
+
+/**
+ * Construct this sketch with parameter lgMaxMapSize and lgCurMapSize. This internal
+ * constructor is used when deserializing the sketch.
+ *
+ * @param lgMaxMapSize Log2 of the physical size of the internal hash map managed by this
+ * sketch. The maximum capacity of this internal hash map is 0.75 times 2^lgMaxMapSize.
+ * Both the ultimate accuracy and size of this sketch are a function of lgMaxMapSize.
+ *
+ * @param lgCurMapSize Log2 of the starting (current) physical size of the internal hash
+ * map managed by this sketch.
+ */
+
+// NewLongSketch builds a LongSketch from two log2 sizes.
+//
+// lgMaxMapSize is the log2 of the largest physical size the internal hash map
+// may reach; the map holds at most loadFactor times 2^lgMaxMapSize counters, so
+// both the accuracy and the size of the sketch are a function of it.
+// lgCurMapSize is the log2 of the starting (current) physical size of that map.
+// Both arguments are raised to lgMinMapSize when they are smaller.
+//
+// An error is returned when the internal hash map cannot be created.
+func NewLongSketch(lgMaxMapSize int, lgCurMapSize int) (*LongSketch, error) {
+       // Never go below the minimum map size.
+       lgMax := max(lgMaxMapSize, lgMinMapSize)
+       lgCur := max(lgCurMapSize, lgMinMapSize)
+
+       table, err := NewReversePurgeLongHashMap(1 << lgCur)
+       if err != nil {
+               return nil, err
+       }
+
+       // Number of counters the map can hold once it has reached its maximum size.
+       maxCap := int(float64(uint64(1)<<lgMax) * loadFactor)
+
+       // offset and streamWeight start at their zero values.
+       sketch := &LongSketch{
+               lgMaxMapSize: lgMax,
+               curMapCap:    table.getCapacity(),
+               sampleSize:   min(sampleSize, maxCap),
+               hashMap:      table,
+       }
+       return sketch, nil
+}
+
+// NewLongSketchWithDefault constructs a new LongSketch with the given maxMapSize
+// and the default initial map size (2^lgMinMapSize).
+//
+// maxMapSize determines the physical size of the internal hash map managed by
+// this sketch and must be a positive power of 2. The maximum capacity of this
+// internal hash map is 0.75 times maxMapSize. Both the ultimate accuracy and
+// size of this sketch are a function of maxMapSize. NewLongSketch raises sizes
+// below 2^lgMinMapSize to that minimum.
+//
+// An error wrapping the underlying cause is returned when maxMapSize is not a
+// positive power of 2.
+func NewLongSketchWithDefault(maxMapSize int) (*LongSketch, error) {
+       lgMaxMapSize, err := common.ExactLog2(maxMapSize)
+       if err != nil {
+               // %w instead of the float verb %e: the cause is printed correctly and
+               // stays reachable through errors.Is / errors.As.
+               return nil, fmt.Errorf("maxMapSize: %w", err)
+       }
+       return NewLongSketch(lgMaxMapSize, lgMinMapSize)
+}
+
+// getNumActiveItems returns the number of active entries currently held by the
+// internal hash map.
+func (s *LongSketch) getNumActiveItems() int {
+       return s.hashMap.numActive
+}
+
+// getMaximumMapCapacity returns the maximum number of counters the sketch is
+// configured to support, i.e. loadFactor times 2^lgMaxMapSize truncated to an
+// int.
+func (s *LongSketch) getMaximumMapCapacity() int {
+       return int(float64(uint64(1<<s.lgMaxMapSize)) * loadFactor)
+}
+
+func (s *LongSketch) Update(item int64, count int64) error {
+       if count == 0 {
+               return nil
+       }
+       if count < 0 {
+               return fmt.Errorf("count may not be negative")
+       }
+       s.streamWeight += count
+       s.hashMap.adjustOrPutValue(item, count)
+
+       if s.hashMap.numActive > s.curMapCap {
+               // Over the threshold, we need to do something
+               if s.hashMap.lgLength < s.lgMaxMapSize {
+                       // Below tgt size, we can grow
+                       s.hashMap.resize(2 * len(s.hashMap.keys))
+                       s.curMapCap = s.hashMap.getCapacity()
+               } else {
+                       // At tgt size, must purge
+                       s.offset += s.hashMap.purge(s.sampleSize)
+                       if s.getNumActiveItems() > s.getMaximumMapCapacity() {
+                               return fmt.Errorf("purge did not reduce active 
items")
+                       }
+               }
+       }
+       return nil
+}
diff --git a/frequencies/long_sketch_test.go b/frequencies/long_sketch_test.go
new file mode 100644
index 0000000..56e3568
--- /dev/null
+++ b/frequencies/long_sketch_test.go
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package frequencies
+
+import (
+       "github.com/stretchr/testify/assert"
+       "testing"
+)
+
+// TestFrequentItsemsStringSerialTest feeds a handful of weighted updates into a
+// sketch built with NewLongSketchWithDefault and checks that none of them
+// reports an error.
+func TestFrequentItsemsStringSerialTest(t *testing.T) {
+       sketch, err := NewLongSketchWithDefault(8)
+       if !assert.NoError(t, err) {
+               // sketch is nil on error; stop before dereferencing it.
+               return
+       }
+       //sketch2, err := NewLongSketchWithDefault(128)
+       //assert.NoError(t, err)
+       updates := []struct{ item, count int64 }{
+               {10, 100},
+               {10, 100},
+               {15, 3443},
+               {1000001, 1010230},
+               {1000002, 1010230},
+       }
+       for _, u := range updates {
+               // Update returns an error; do not drop it silently.
+               assert.NoError(t, sketch.Update(u.item, u.count))
+       }
+}
diff --git a/frequencies/reverse_purge_long_hash_map.go 
b/frequencies/reverse_purge_long_hash_map.go
new file mode 100644
index 0000000..99b9a29
--- /dev/null
+++ b/frequencies/reverse_purge_long_hash_map.go
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package frequencies
+
+import (
+       "fmt"
+       "github.com/apache/datasketches-go/common"
+       "math/bits"
+       "strconv"
+       "strings"
+)
+
+// reversePurgeLongHashMap is an open-addressing (linear probing) hash map from
+// int64 keys to int64 values stored in three parallel arrays.
+//
+//   - lgLength is the log2 of the length of those arrays.
+//   - loadThreshold is int(length * loadFactor) and is what getCapacity returns.
+//   - keys and values hold the entries; states holds, per slot, 0 for an empty
+//     slot or the probe count recorded when the entry was inserted (1 means
+//     the entry sits in the slot its hash points at).
+//   - numActive is the number of occupied slots.
+type reversePurgeLongHashMap struct {
+       lgLength      int
+       loadThreshold int
+       keys          []int64
+       values        []int64
+       states        []int16
+       numActive     int
+}
+
+const (
+       // loadFactor is the fraction of the table length that may be occupied;
+       // loadThreshold is computed as int(length * loadFactor).
+       loadFactor = float64(0.75)
+       // driftLimit bounds the length of a probe sequence: adjustOrPutValue and
+       // hashDelete panic once their drift counter reaches it.
+       driftLimit = 1024
+)
+
+// NewReversePurgeLongHashMap constructs a new reversePurgeLongHashMap.
+// It creates arrays of length mapSize, which must be a positive power of two;
+// this restriction was made to ensure fast hashing.
+// The member loadThreshold is then set to the largest value that
+// will not overload the hash table (mapSize * loadFactor, truncated).
+//
+// An error wrapping the underlying cause is returned when mapSize is not a
+// positive power of two.
+func NewReversePurgeLongHashMap(mapSize int) (*reversePurgeLongHashMap, error) {
+       lgLength, err := common.ExactLog2(mapSize)
+       if err != nil {
+               // %w instead of the float verb %e: the cause is printed correctly and
+               // stays reachable through errors.Is / errors.As.
+               return nil, fmt.Errorf("mapSize: %w", err)
+       }
+       return &reversePurgeLongHashMap{
+               lgLength:      lgLength,
+               loadThreshold: int(float64(mapSize) * loadFactor),
+               keys:          make([]int64, mapSize),
+               values:        make([]int64, mapSize),
+               states:        make([]int16, mapSize),
+       }, nil
+}
+
+// getCapacity returns the current capacity of the hash map (i.e., the max
+// number of keys that can be stored), which is the stored loadThreshold.
+func (r *reversePurgeLongHashMap) getCapacity() int {
+       return r.loadThreshold
+}
+
+// adjustOrPutValue adjusts the value associated with the given key.
+// It increments the value mapped to the key if the key is present in the map.
+// Otherwise, the key is inserted with adjustAmount as its value.
+//
+// key is the key of the value to increment.
+// adjustAmount is the amount by which to increment the value.
+//
+// The map may temporarily hold loadThreshold+1 entries: the caller detects
+// numActive > capacity after the insert and then resizes or purges. The method
+// therefore panics only when an insert is attempted while numActive already
+// exceeds loadThreshold, or when the probe sequence reaches driftLimit.
+func (r *reversePurgeLongHashMap) adjustOrPutValue(key int64, adjustAmount int64) {
+       var (
+               arrayMask = int64(len(r.keys) - 1)
+               probe     = hash(key) & arrayMask
+               drift     = 1
+       )
+       // Linear probing: stop at the first empty slot or at the slot holding key.
+       for r.states[probe] != 0 && r.keys[probe] != key {
+               probe = (probe + 1) & arrayMask
+               drift++
+               if drift >= driftLimit {
+                       panic("drift >= driftLimit")
+               }
+       }
+
+       if r.states[probe] != 0 {
+               // Occupied slot: the loop condition guarantees it holds key.
+               r.values[probe] += adjustAmount
+               return
+       }
+
+       // Empty slot: add the key and value to the table. Using ">" rather than
+       // ">=" lets numActive reach loadThreshold+1, which is what the sketch's
+       // "numActive > capacity" check needs in order to trigger a resize or purge.
+       if r.numActive > r.loadThreshold {
+               panic("numActive > loadThreshold")
+       }
+       r.keys[probe] = key
+       r.values[probe] = adjustAmount
+       r.states[probe] = int16(drift) //how far off we are
+       r.numActive++
+}
+
+// resize replaces the internal arrays with fresh ones of length newSize,
+// recomputes loadThreshold and lgLength for that length, and re-inserts every
+// entry that was active in the old arrays.
+func (r *reversePurgeLongHashMap) resize(newSize int) {
+       prevKeys, prevValues, prevStates := r.keys, r.values, r.states
+
+       r.keys = make([]int64, newSize)
+       r.values = make([]int64, newSize)
+       r.states = make([]int16, newSize)
+       r.lgLength = bits.TrailingZeros(uint(newSize))
+       r.loadThreshold = int(float64(newSize) * loadFactor)
+       r.numActive = 0 // rebuilt by the re-insertions below
+
+       for i := range prevKeys {
+               if prevStates[i] <= 0 {
+                       continue // slot was empty
+               }
+               r.adjustOrPutValue(prevKeys[i], prevValues[i])
+       }
+}
+
+// purge removes roughly the lower half of the counters.
+//
+// It samples the values of up to sampleSize active entries (scanning the table
+// from the front), takes the median of that sample, subtracts it from every
+// value and then drops all entries whose value is no longer positive.
+//
+// It returns the amount that was subtracted, or 0 when there was nothing to
+// sample.
+func (r *reversePurgeLongHashMap) purge(sampleSize int) int64 {
+       limit := min(sampleSize, r.numActive)
+       if limit <= 0 {
+               return 0
+       }
+       samples := make([]int64, 0, limit)
+       // The bound on i protects against running off the table should numActive
+       // ever disagree with the number of occupied slots.
+       for i := 0; i < len(r.states) && len(samples) < limit; i++ {
+               if r.states[i] > 0 {
+                       samples = append(samples, r.values[i])
+               }
+       }
+       if len(samples) == 0 {
+               return 0
+       }
+
+       val := quickSelect(samples, 0, len(samples)-1, len(samples)/2)
+       r.adjustAllValuesBy(-1 * val)
+       r.keepOnlyPositiveCounts()
+       return val
+}
+
+// quickSelect returns the value that would be at index pivot if arr[lo..hi]
+// were sorted in ascending order. It partially reorders arr in place and
+// requires lo <= pivot <= hi.
+func quickSelect(arr []int64, lo, hi, pivot int) int64 {
+       for lo < hi {
+               mid := arr[lo+(hi-lo)/2]
+               i, j := lo, hi
+               // Hoare partition around mid: afterwards arr[lo..j] <= mid <= arr[i..hi].
+               for i <= j {
+                       for arr[i] < mid {
+                               i++
+                       }
+                       for arr[j] > mid {
+                               j--
+                       }
+                       if i <= j {
+                               arr[i], arr[j] = arr[j], arr[i]
+                               i++
+                               j--
+                       }
+               }
+               switch {
+               case pivot <= j:
+                       hi = j
+               case pivot >= i:
+                       lo = i
+               default:
+                       // pivot lies strictly between j and i, where every element equals mid.
+                       return arr[pivot]
+               }
+       }
+       return arr[pivot]
+}
+
+// serializeToString renders the map as comma-terminated decimal tokens:
+// numActive, the table length, and then a key,value pair for every occupied
+// slot in table order.
+func (r *reversePurgeLongHashMap) serializeToString() string {
+       var out strings.Builder
+       fmt.Fprintf(&out, "%d,%d,", r.numActive, len(r.keys))
+       for i := range r.keys {
+               if r.states[i] == 0 {
+                       continue // empty slot
+               }
+               fmt.Fprintf(&out, "%d,%d,", r.keys[i], r.values[i])
+       }
+       return out.String()
+}
+
+// adjustAllValuesBy adds adjustAmount to the value stored in every slot of the
+// table, occupied or not. It does not remove any entry.
+func (r *reversePurgeLongHashMap) adjustAllValuesBy(adjustAmount int64) {
+       for i := range r.keys {
+               r.values[i] += adjustAmount
+       }
+}
+
+// keepOnlyPositiveCounts deletes every active entry whose value is zero or
+// negative and decrements numActive for each deletion.
+//
+// The table is walked from back to front so that hashDelete, which moves later
+// entries of a cluster towards the front, never moves an entry into a slot that
+// has not been examined yet. The walk starts just below an empty cell (a
+// cluster boundary); the cells from that boundary to the end of the table are
+// handled last.
+func (r *reversePurgeLongHashMap) keepOnlyPositiveCounts() {
+       // Starting from the back, find the first empty cell, which marks a
+       // boundary between clusters.
+       firstProbe := len(r.keys) - 1
+       for r.states[firstProbe] > 0 {
+               firstProbe--
+       }
+       // Work towards the front; delete any non-positive entries.
+       for probe := firstProbe - 1; probe >= 0; probe-- {
+               if r.states[probe] > 0 && r.values[probe] <= 0 {
+                       r.hashDelete(probe) //does the work of deletion and moving higher items towards the front.
+                       r.numActive--
+               }
+       }
+       // Now work on the first cluster that was skipped: from the last slot of
+       // the table down to firstProbe.
+       for probe := len(r.keys) - 1; probe >= firstProbe; probe-- {
+               if r.states[probe] > 0 && r.values[probe] <= 0 {
+                       r.hashDelete(probe)
+                       r.numActive--
+               }
+       }
+}
+
+// hashDelete empties the slot at deleteProbe and then closes the gap this
+// leaves in the probe sequence: it walks forward (wrapping around) until the
+// next empty slot and moves back any entry whose recorded state is larger than
+// its distance from the current gap. It does not touch numActive; callers
+// adjust that themselves. It panics if the walk reaches driftLimit steps.
+func (r *reversePurgeLongHashMap) hashDelete(deleteProbe int) {
+       // Looks ahead in the table to search for another item to move to this
+       // location. If none are found, the status is changed
+       r.states[deleteProbe] = 0 //mark as empty
+       drift := 1
+       arrayMask := len(r.keys) - 1
+       probe := (deleteProbe + drift) & arrayMask //map length must be a power of 2
+       // advance until you find a free location replacing locations as needed
+       for r.states[probe] != 0 {
+               // drift is the distance from the gap (deleteProbe) to probe. An entry
+               // whose state exceeds it can move into the gap and keep a state >= 1.
+               if r.states[probe] > int16(drift) {
+                       // move current element
+                       r.keys[deleteProbe] = r.keys[probe]
+                       r.values[deleteProbe] = r.values[probe]
+                       r.states[deleteProbe] = r.states[probe] - int16(drift)
+                       // marking the current probe location as deleted
+                       r.states[probe] = 0
+                       // the gap is now at probe; drift becomes 1 again after the
+                       // increment below
+                       drift = 0
+                       deleteProbe = probe
+               }
+               probe = (probe + 1) & arrayMask
+               drift++
+               // guards against an unbounded walk
+               if drift >= driftLimit {
+                       panic("drift >= driftLimit")
+               }
+       }
+}
+
+// deserializeReversePurgeLongHashMapFromString rebuilds a map from the format
+// written by serializeToString: comma-separated decimal tokens holding
+// numActive, the table length, and then numActive key,value pairs.
+//
+// An error is returned (instead of panicking or indexing out of range) when the
+// header is missing, a token is not a valid integer, the table length is not a
+// positive power of two, or numActive does not fit the remaining tokens or the
+// capacity of the table.
+func deserializeReversePurgeLongHashMapFromString(s string) (*reversePurgeLongHashMap, error) {
+       tokens := strings.Split(s, ",")
+       if len(tokens) < 2 {
+               return nil, fmt.Errorf("expected at least 2 tokens, got %d", len(tokens))
+       }
+       numActive, err := strconv.Atoi(tokens[0])
+       if err != nil {
+               return nil, fmt.Errorf("numActive: %w", err)
+       }
+       length, err := strconv.Atoi(tokens[1])
+       if err != nil {
+               return nil, fmt.Errorf("length: %w", err)
+       }
+       // Every active entry needs a key token and a value token after the header.
+       if numActive < 0 || numActive > (len(tokens)-2)/2 {
+               return nil, fmt.Errorf("numActive %d does not match %d data tokens", numActive, len(tokens)-2)
+       }
+       table, err := NewReversePurgeLongHashMap(length)
+       if err != nil {
+               return nil, err
+       }
+       // Inserting more entries than the table can hold would make
+       // adjustOrPutValue panic.
+       if numActive > table.getCapacity() {
+               return nil, fmt.Errorf("numActive %d exceeds capacity %d", numActive, table.getCapacity())
+       }
+       for i := 0; i < numActive; i++ {
+               j := 2 + 2*i
+               // ParseInt with bitSize 64 so the full int64 range round-trips on
+               // every platform (Atoi is limited to the size of int).
+               key, err := strconv.ParseInt(tokens[j], 10, 64)
+               if err != nil {
+                       return nil, fmt.Errorf("key %d: %w", i, err)
+               }
+               value, err := strconv.ParseInt(tokens[j+1], 10, 64)
+               if err != nil {
+                       return nil, fmt.Errorf("value %d: %w", i, err)
+               }
+               table.adjustOrPutValue(key, value)
+       }
+       return table, nil
+}
diff --git a/frequencies/reverse_purge_long_hash_map_test.go 
b/frequencies/reverse_purge_long_hash_map_test.go
new file mode 100644
index 0000000..c157541
--- /dev/null
+++ b/frequencies/reverse_purge_long_hash_map_test.go
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package frequencies
+
+import (
+       "github.com/stretchr/testify/assert"
+       "testing"
+)
+
+// TestHashMapSerial checks that serializing a map, deserializing the result and
+// serializing again yields the same string.
+func TestHashMapSerial(t *testing.T) {
+       original, err := NewReversePurgeLongHashMap(8)
+       assert.NoError(t, err)
+       // {key, amount} pairs; key 10 appears twice so its amounts are summed.
+       for _, kv := range [][2]int64{{10, 15}, {10, 5}, {1, 1}, {2, 3}} {
+               original.adjustOrPutValue(kv[0], kv[1])
+       }
+       serialized := original.serializeToString()
+
+       restored, err := deserializeReversePurgeLongHashMapFromString(serialized)
+       assert.NoError(t, err)
+       assert.Equal(t, serialized, restored.serializeToString())
+}
diff --git a/frequencies/utils.go b/frequencies/utils.go
new file mode 100644
index 0000000..05b2d57
--- /dev/null
+++ b/frequencies/utils.go
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package frequencies
+
+const (
+       // lgMinMapSize is the smallest log2 map size the sketch constructor
+       // accepts; smaller values passed to NewLongSketch are raised to it.
+       lgMinMapSize = 5
+       // This constant is large enough so that computing the median of sampleSize
+       // randomly selected entries from a list of numbers and outputting
+       // the empirical median will give a constant-factor approximation to the
+       // true median with high probability.
+       sampleSize = 1024
+)
+
+// hash mixes the bits of okey and returns the full 64-bit result reinterpreted
+// as an int64 (so it may be negative); callers mask it down to an index into
+// the hash table.
+// This hash function is taken from the internals of Austin Appleby's
+// MurmurHash3 algorithm.
+// It is also used by the Trove for Java libraries.
+func hash(okey int64) int64 {
+       // Work on the unsigned bit pattern so that >> is a logical shift.
+       key := uint64(okey)
+       key ^= key >> 33
+       key *= 0xff51afd7ed558ccd
+       key ^= key >> 33
+       key *= 0xc4ceb9fe1a85ec53
+       key ^= key >> 33
+       return int64(key)
+}
diff --git a/hll/hll_sketch_test.go b/hll/hll_sketch_test.go
index 6de958d..bb9fd02 100644
--- a/hll/hll_sketch_test.go
+++ b/hll/hll_sketch_test.go
@@ -246,7 +246,7 @@ func TestHLLDataSketchT(b *testing.T) {
        }
        est, err := hll.GetEstimate()
        assert.NoError(b, err)
-       assert.InDelta(b, 1000000, est, float64(1000000)*0.03)
+       assert.InDelta(b, 1000000, est, float64(1000000)*0.00028)
 
 }
 
diff --git a/hll/preamble_utils.go b/hll/preamble_utils.go
index 9100647..e903a40 100644
--- a/hll/preamble_utils.go
+++ b/hll/preamble_utils.go
@@ -174,11 +174,11 @@ func computeLgArr(byteArr []byte, couponCount int, 
lgConfigK int) (int, error) {
                ceilPwr2 <<= 1
        }
        if curMode == curModeSet {
-               v, err := common.ExactLog2OfLong(uint64(ceilPwr2))
+               v, err := common.ExactLog2(uint64(ceilPwr2))
                return max(lgInitSetSize, v), err
        }
        //only used for HLL4
-       v, err := common.ExactLog2OfLong(uint64(ceilPwr2))
+       v, err := common.ExactLog2(uint64(ceilPwr2))
        return max(lgAuxArrInts[lgConfigK], v), err
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to