This is an automated email from the ASF dual-hosted git repository. alsay pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datasketches-go.git
commit a6a057a0ec15446b5a04c9d2553510f2ed88b64c Author: Pierre Lacave <[email protected]> AuthorDate: Mon Dec 18 22:13:36 2023 +0100 Start port of long frequency sketch --- common/utils.go | 10 +- frequencies/long_sketch.go | 134 +++++++++++++ frequencies/long_sketch_test.go | 35 ++++ frequencies/reverse_purge_long_hash_map.go | 249 ++++++++++++++++++++++++ frequencies/reverse_purge_long_hash_map_test.go | 39 ++++ frequencies/utils.go | 40 ++++ hll/hll_sketch_test.go | 2 +- hll/preamble_utils.go | 4 +- 8 files changed, 505 insertions(+), 8 deletions(-) diff --git a/common/utils.go b/common/utils.go index 8e39047..0919843 100644 --- a/common/utils.go +++ b/common/utils.go @@ -44,15 +44,15 @@ func CeilPowerOf2(n int) int { return int(math.Pow(2, math.Ceil(math.Log2(float64(n))))) } -func ExactLog2OfLong(powerOf2 uint64) (int, error) { - if !isLongPowerOf2(powerOf2) { +func ExactLog2(powerOf2 int) (int, error) { + if !isPowerOf2(powerOf2) { return 0, fmt.Errorf("argument 'powerOf2' must be a positive power of 2") } - return bits.TrailingZeros64(powerOf2), nil + return bits.TrailingZeros64(uint64(powerOf2)), nil } -// isLongPowerOf2 returns true if the given number is a power of 2. -func isLongPowerOf2(powerOf2 uint64) bool { +// isPowerOf2 returns true if the given number is a power of 2. +func isPowerOf2(powerOf2 int) bool { return powerOf2 > 0 && (powerOf2&(powerOf2-1)) == 0 } diff --git a/frequencies/long_sketch.go b/frequencies/long_sketch.go new file mode 100644 index 0000000..b7c1ef8 --- /dev/null +++ b/frequencies/long_sketch.go @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package frequencies + +import ( + "fmt" + "github.com/apache/datasketches-go/common" +) + +type LongSketch struct { + // Log2 Maximum length of the arrays internal to the hash map supported by the data + // structure. + lgMaxMapSize int + // The current number of counters supported by the hash map. + curMapCap int //the threshold to purge + // Tracks the total of decremented counts. + offset int64 + // The sum of all frequencies of the stream so far. + streamWeight int64 + // The maximum number of samples used to compute approximate median of counters when doing + // decrement + sampleSize int + // Hash map mapping stored items to approximate counts + hashMap *reversePurgeLongHashMap +} + +const ( + strPreambleTokens = 6 +) + +/** + * Construct this sketch with parameter lgMapMapSize and lgCurMapSize. This internal + * constructor is used when deserializing the sketch. + * + * @param lgMaxMapSize Log2 of the physical size of the internal hash map managed by this + * sketch. The maximum capacity of this internal hash map is 0.75 times 2^lgMaxMapSize. + * Both the ultimate accuracy and size of this sketch are a function of lgMaxMapSize. + * + * @param lgCurMapSize Log2 of the starting (current) physical size of the internal hash + * map managed by this sketch. + */ + +// NewLongSketch returns a new LongSketch with the given lgMaxMapSize and lgCurMapSize. +// lgMaxMapSize is the log2 of the physical size of the internal hash map managed by this +// sketch. The maximum capacity of this internal hash map is 0.75 times 2^lgMaxMapSize. +// Both the ultimate accuracy and size of this sketch are a function of lgMaxMapSize. +// lgCurMapSize is the log2 of the starting (current) physical size of the internal hash +// map managed by this sketch. +func NewLongSketch(lgMaxMapSize int, lgCurMapSize int) (*LongSketch, error) { + //set initial size of hash map + lgMaxMapSize = max(lgMaxMapSize, lgMinMapSize) + lgCurMapSize = max(lgCurMapSize, lgMinMapSize) + hashMap, err := NewReversePurgeLongHashMap(1 << lgCurMapSize) + if err != nil { + return nil, err + } + curMapCap := hashMap.getCapacity() + maxMapCap := int(float64(uint64((1 << lgMaxMapSize))) * loadFactor) + offset := int64(0) + sampleSize := min(sampleSize, maxMapCap) + return &LongSketch{ + lgMaxMapSize: int(lgMaxMapSize), + curMapCap: curMapCap, + offset: offset, + sampleSize: sampleSize, + hashMap: hashMap, + }, nil +} + +// NewLongSketchWithDefault constructs a new LongSketch with the given maxMapSize and the +// default initialMapSize (8). +// maxMapSize determines the physical size of the internal hash map managed by this +// sketch and must be a power of 2. The maximum capacity of this internal hash map is +// 0.75 times * maxMapSize. Both the ultimate accuracy and size of this sketch are a +// function of maxMapSize. +func NewLongSketchWithDefault(maxMapSize int) (*LongSketch, error) { + log2OfInt, err := common.ExactLog2(maxMapSize) + if err != nil { + return nil, fmt.Errorf("maxMapSize, %e", err) + } + return NewLongSketch(log2OfInt, lgMinMapSize) +} + +func (s *LongSketch) getNumActiveItems() int { + return s.hashMap.numActive +} + +// getMaximumMapCapacity returns the maximum number of counters the sketch is configured to +// support. +func (s *LongSketch) getMaximumMapCapacity() int { + return int(float64(uint64(1<<s.lgMaxMapSize)) * loadFactor) +} + +func (s *LongSketch) Update(item int64, count int64) error { + if count == 0 { + return nil + } + if count < 0 { + return fmt.Errorf("count may not be negative") + } + s.streamWeight += count + s.hashMap.adjustOrPutValue(item, count) + + if s.hashMap.numActive > s.curMapCap { + // Over the threshold, we need to do something + if s.hashMap.lgLength < s.lgMaxMapSize { + // Below tgt size, we can grow + s.hashMap.resize(2 * len(s.hashMap.keys)) + s.curMapCap = s.hashMap.getCapacity() + } else { + // At tgt size, must purge + s.offset += s.hashMap.purge(s.sampleSize) + if s.getNumActiveItems() > s.getMaximumMapCapacity() { + return fmt.Errorf("purge did not reduce active items") + } + } + } + return nil +} diff --git a/frequencies/long_sketch_test.go b/frequencies/long_sketch_test.go new file mode 100644 index 0000000..56e3568 --- /dev/null +++ b/frequencies/long_sketch_test.go @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package frequencies + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestFrequentItsemsStringSerialTest(t *testing.T) { + sketch, err := NewLongSketchWithDefault(8) + assert.NoError(t, err) + //sketch2, err := NewLongSketchWithDefault(128) + //assert.NoError(t, err) + sketch.Update(10, 100) + sketch.Update(10, 100) + sketch.Update(15, 3443) + sketch.Update(1000001, 1010230) + sketch.Update(1000002, 1010230) +} diff --git a/frequencies/reverse_purge_long_hash_map.go b/frequencies/reverse_purge_long_hash_map.go new file mode 100644 index 0000000..99b9a29 --- /dev/null +++ b/frequencies/reverse_purge_long_hash_map.go @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package frequencies + +import ( + "fmt" + "github.com/apache/datasketches-go/common" + "math/bits" + "strconv" + "strings" +) + +type reversePurgeLongHashMap struct { + lgLength int + loadThreshold int + keys []int64 + values []int64 + states []int16 + numActive int +} + +const ( + loadFactor = float64(0.75) + driftLimit = 1024 //used only in stress testing +) + +// NewReversePurgeLongHashMap constructs a new reversePurgeLongHashMap. +// It will create arrays of length mapSize, which must be a power of two. +// This restriction was made to ensure fast hashing. +// The member loadThreshold is then set to the largest value that +// will not overload the hash table. +func NewReversePurgeLongHashMap(mapSize int) (*reversePurgeLongHashMap, error) { + lgLength, err := common.ExactLog2(mapSize) + if err != nil { + return nil, fmt.Errorf("mapSize: %e", err) + } + loadThreshold := int(float64(mapSize) * loadFactor) + keys := make([]int64, mapSize) + values := make([]int64, mapSize) + states := make([]int16, mapSize) + return &reversePurgeLongHashMap{ + lgLength: lgLength, + loadThreshold: loadThreshold, + keys: keys, + values: values, + states: states, + }, nil +} + +// getCapacity returns the current capacity of the hash map (i.e., max number of keys that can be stored). +func (r *reversePurgeLongHashMap) getCapacity() int { + return r.loadThreshold +} + +// adjustOrPutValue adjusts the value associated with the given key. +// Increments the value mapped to the key if the key is present in the map. Otherwise, +// the key is inserted with the putAmount. +// +// key the key of the value to increment +// adjustAmount the amount by which to increment the value +func (r *reversePurgeLongHashMap) adjustOrPutValue(key int64, adjustAmount int64) { + var ( + arrayMask = len(r.keys) - 1 + probe = hash(key) & int64(arrayMask) + drift = 1 + ) + for r.states[probe] != 0 && r.keys[probe] != key { + probe = (probe + 1) & int64(arrayMask) + drift++ + if drift >= driftLimit { + panic("drift >= driftLimit") + } + } + //found either an empty slot or the key + if r.states[probe] == 0 { //found empty slot + // adding the key and value to the table + if r.numActive >= r.loadThreshold { + panic("numActive >= loadThreshold") + } + r.keys[probe] = key + r.values[probe] = adjustAmount + r.states[probe] = int16(drift) //how far off we are + r.numActive++ + } else { //found the key, adjust the value + if r.keys[probe] != key { + panic("keys[probe] != key") + } + r.values[probe] += adjustAmount + } +} + +func (r *reversePurgeLongHashMap) resize(newSize int) { + oldKeys := r.keys + oldValues := r.values + oldStates := r.states + r.keys = make([]int64, newSize) + r.values = make([]int64, newSize) + r.states = make([]int16, newSize) + r.loadThreshold = int(float64(newSize) * loadFactor) + r.lgLength = bits.TrailingZeros(uint(newSize)) + r.numActive = 0 + for i := 0; i < len(oldKeys); i++ { + if oldStates[i] > 0 { + r.adjustOrPutValue(oldKeys[i], oldValues[i]) + } + } + +} + +func (r *reversePurgeLongHashMap) purge(sampleSize int) int64 { + limit := min(sampleSize, r.numActive) + numSamples := 0 + i := 0 + samples := make([]int64, limit) + for numSamples < limit { + if r.states[i] > 0 { + samples[numSamples] = r.values[i] + numSamples++ + } + i++ + } + + // TODO implement quickSelect + //val := quickSelect(samples, 0, numSamples-1, limit/2) + val := int64(0) + r.adjustAllValuesBy(-1 * val) + r.keepOnlyPositiveCounts() + return val +} + +func (r *reversePurgeLongHashMap) serializeToString() string { + var sb strings.Builder + sb.WriteString(fmt.Sprintf("%d,%d,", r.numActive, len(r.keys))) + for i := 0; i < len(r.keys); i++ { + if r.states[i] != 0 { + sb.WriteString(fmt.Sprintf("%d,%d,", r.keys[i], r.values[i])) + } + } + return sb.String() +} + +// adjustAllValuesBy adjust amount value by which to shift all values. Only keys corresponding to positive +// values are retained. +func (r *reversePurgeLongHashMap) adjustAllValuesBy(adjustAmount int64) { + for i := len(r.keys); i > 0; { + i-- + r.values[i] += adjustAmount + } +} + +func (r *reversePurgeLongHashMap) keepOnlyPositiveCounts() { + // Starting from the back, find the first empty cell, which marks a boundary between clusters. + firstProbe := len(r.keys) - 1 + for r.states[firstProbe] > 0 { + firstProbe-- + } + //Work towards the front; delete any non-positive entries. + for probe := firstProbe; probe > 0; { + // When we find the next non-empty cell, we know we are at the high end of a cluster, + // which is tracked by firstProbe. + if r.states[probe] > 0 && r.values[probe] <= 0 { + r.hashDelete(probe) //does the work of deletion and moving higher items towards the front. + r.numActive-- + } + } + //now work on the first cluster that was skipped. + for probe := len(r.keys); probe > firstProbe; { + if r.states[probe] > 0 && r.values[probe] <= 0 { + r.hashDelete(probe) + r.numActive-- + } + } +} + +func (r *reversePurgeLongHashMap) hashDelete(deleteProbe int) { + // Looks ahead in the table to search for another item to move to this location. + // If none are found, the status is changed + r.states[deleteProbe] = 0 //mark as empty + drift := 1 + arrayMask := len(r.keys) - 1 + probe := (deleteProbe + drift) & arrayMask //map length must be a power of 2 + // advance until you find a free location replacing locations as needed + for r.states[probe] != 0 { + if r.states[probe] > int16(drift) { + // move current element + r.keys[deleteProbe] = r.keys[probe] + r.values[deleteProbe] = r.values[probe] + r.states[deleteProbe] = r.states[probe] - int16(drift) + // marking the current probe location as deleted + r.states[probe] = 0 + drift = 0 + deleteProbe = probe + } + probe = (probe + 1) & arrayMask + drift++ + //only used for theoretical analysis + if drift >= driftLimit { + panic("drift >= driftLimit") + } + } +} + +func deserializeReversePurgeLongHashMapFromString(string string) (*reversePurgeLongHashMap, error) { + tokens := strings.Split(string, ",") + if len(tokens) < 2 { + panic("len(tokens) < 2") + } + numActive, err := strconv.Atoi(tokens[0]) + if err != nil { + return nil, err + } + length, err := strconv.Atoi(tokens[1]) + if err != nil { + return nil, err + } + table, err := NewReversePurgeLongHashMap(length) + if err != nil { + return nil, err + } + j := 2 + for i := 0; i < numActive; i++ { + key, err := strconv.Atoi(tokens[j]) + if err != nil { + return nil, err + } + value, err := strconv.Atoi(tokens[j+1]) + if err != nil { + return nil, err + } + table.adjustOrPutValue(int64(key), int64(value)) + j += 2 + } + return table, nil +} diff --git a/frequencies/reverse_purge_long_hash_map_test.go b/frequencies/reverse_purge_long_hash_map_test.go new file mode 100644 index 0000000..c157541 --- /dev/null +++ b/frequencies/reverse_purge_long_hash_map_test.go @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package frequencies + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestHashMapSerial(t *testing.T) { + mp, err := NewReversePurgeLongHashMap(8) + assert.NoError(t, err) + mp.adjustOrPutValue(10, 15) + mp.adjustOrPutValue(10, 5) + mp.adjustOrPutValue(1, 1) + mp.adjustOrPutValue(2, 3) + strMp := mp.serializeToString() + + newMp, err := deserializeReversePurgeLongHashMapFromString(strMp) + assert.NoError(t, err) + newStrMp := newMp.serializeToString() + assert.Equal(t, strMp, newStrMp) + +} diff --git a/frequencies/utils.go b/frequencies/utils.go new file mode 100644 index 0000000..05b2d57 --- /dev/null +++ b/frequencies/utils.go @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package frequencies + +const ( + lgMinMapSize = 5 + // This constant is large enough so that computing the median of SAMPLE_SIZE + // randomly selected entries from a list of numbers and outputting + // the empirical median will give a constant-factor approximation to the + // true median with high probability. + sampleSize = 1024 +) + +// hash returns an index into the hash table. +// This hash function is taken from the internals of Austin Appleby's MurmurHash3 algorithm. +// It is also used by the Trove for Java libraries. +func hash(okey int64) int64 { + key := uint64(okey) + key ^= key >> 33 + key *= 0xff51afd7ed558ccd + key ^= key >> 33 + key *= 0xc4ceb9fe1a85ec53 + key ^= key >> 33 + return int64(key) +} diff --git a/hll/hll_sketch_test.go b/hll/hll_sketch_test.go index 6de958d..bb9fd02 100644 --- a/hll/hll_sketch_test.go +++ b/hll/hll_sketch_test.go @@ -246,7 +246,7 @@ func TestHLLDataSketchT(b *testing.T) { } est, err := hll.GetEstimate() assert.NoError(b, err) - assert.InDelta(b, 1000000, est, float64(1000000)*0.03) + assert.InDelta(b, 1000000, est, float64(1000000)*0.00028) } diff --git a/hll/preamble_utils.go b/hll/preamble_utils.go index 9100647..e903a40 100644 --- a/hll/preamble_utils.go +++ b/hll/preamble_utils.go @@ -174,11 +174,11 @@ func computeLgArr(byteArr []byte, couponCount int, lgConfigK int) (int, error) { ceilPwr2 <<= 1 } if curMode == curModeSet { - v, err := common.ExactLog2OfLong(uint64(ceilPwr2)) + v, err := common.ExactLog2(uint64(ceilPwr2)) return max(lgInitSetSize, v), err } //only used for HLL4 - v, err := common.ExactLog2OfLong(uint64(ceilPwr2)) + v, err := common.ExactLog2(uint64(ceilPwr2)) return max(lgAuxArrInts[lgConfigK], v), err } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
