This is an automated email from the ASF dual-hosted git repository. alsay pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datasketches-go.git
commit 21bb5b544413d7fde4cc4c09f2f816d3274db148 Author: Pierre Lacave <[email protected]> AuthorDate: Thu Dec 21 09:08:57 2023 +0100 rename to LongsSketch and add more tests --- frequencies/dist_test.go | 13 ++++ frequencies/{long_sketch.go => longs_sketch.go} | 68 +++++++++------- .../{long_sketch_test.go => longs_sketch_test.go} | 90 +++++++++++++++++++++- frequencies/reverse_purge_long_hash_map.go | 12 +-- frequencies/row.go | 14 +++- 5 files changed, 161 insertions(+), 36 deletions(-) diff --git a/frequencies/dist_test.go b/frequencies/dist_test.go new file mode 100644 index 0000000..2b67ae0 --- /dev/null +++ b/frequencies/dist_test.go @@ -0,0 +1,13 @@ +package frequencies + +import ( + "math" + "math/rand" +) + +func randomGeometricDist(prob float64) int64 { + if prob <= 0.0 || prob >= 1.0 { + panic("prob must be in (0, 1)") + } + return int64(1 + math.Log(rand.Float64())/math.Log(1.0-prob)) +} diff --git a/frequencies/long_sketch.go b/frequencies/longs_sketch.go similarity index 85% rename from frequencies/long_sketch.go rename to frequencies/longs_sketch.go index 544a252..ecb51a2 100644 --- a/frequencies/long_sketch.go +++ b/frequencies/longs_sketch.go @@ -26,7 +26,7 @@ import ( "strings" ) -type LongSketch struct { +type LongsSketch struct { // Log2 Maximum length of the arrays internal to the hash map supported by the data // structure. lgMaxMapSize int @@ -59,13 +59,13 @@ const ( * map managed by this sketch. */ -// NewLongSketch returns a new LongSketch with the given lgMaxMapSize and lgCurMapSize. +// NewLongSketch returns a new LongsSketch with the given lgMaxMapSize and lgCurMapSize. // lgMaxMapSize is the log2 of the physical size of the internal hash map managed by this // sketch. The maximum capacity of this internal hash map is 0.75 times 2^lgMaxMapSize. // Both the ultimate accuracy and size of this sketch are a function of lgMaxMapSize. // lgCurMapSize is the log2 of the starting (current) physical size of the internal hash // map managed by this sketch. -func NewLongSketch(lgMaxMapSize int, lgCurMapSize int) (*LongSketch, error) { +func NewLongSketch(lgMaxMapSize int, lgCurMapSize int) (*LongsSketch, error) { //set initial size of hash map lgMaxMapSize = max(lgMaxMapSize, _LG_MIN_MAP_SIZE) lgCurMapSize = max(lgCurMapSize, _LG_MIN_MAP_SIZE) @@ -74,10 +74,10 @@ func NewLongSketch(lgMaxMapSize int, lgCurMapSize int) (*LongSketch, error) { return nil, err } curMapCap := hashMap.getCapacity() - maxMapCap := int(float64(uint64(1<<lgMaxMapSize)) * loadFactor) + maxMapCap := int(float64(uint64(1<<lgMaxMapSize)) * reversePurgeLongHashMapLoadFactor) offset := int64(0) sampleSize := min(_SAMPLE_SIZE, maxMapCap) - return &LongSketch{ + return &LongsSketch{ lgMaxMapSize: int(lgMaxMapSize), curMapCap: curMapCap, offset: offset, @@ -86,13 +86,13 @@ func NewLongSketch(lgMaxMapSize int, lgCurMapSize int) (*LongSketch, error) { }, nil } -// NewLongSketchWithMaxMapSize constructs a new LongSketch with the given maxMapSize and the +// NewLongSketchWithMaxMapSize constructs a new LongsSketch with the given maxMapSize and the // default initialMapSize (8). // maxMapSize determines the physical size of the internal hash map managed by this // sketch and must be a power of 2. The maximum capacity of this internal hash map is // 0.75 times * maxMapSize. Both the ultimate accuracy and size of this sketch are a // function of maxMapSize. -func NewLongSketchWithMaxMapSize(maxMapSize int) (*LongSketch, error) { +func NewLongSketchWithMaxMapSize(maxMapSize int) (*LongsSketch, error) { log2OfInt, err := common.ExactLog2(maxMapSize) if err != nil { return nil, fmt.Errorf("maxMapSize, %e", err) @@ -100,7 +100,7 @@ func NewLongSketchWithMaxMapSize(maxMapSize int) (*LongSketch, error) { return NewLongSketch(log2OfInt, _LG_MIN_MAP_SIZE) } -func NewLongSketchFromSlice(slc []byte) (*LongSketch, error) { +func NewLongSketchFromSlice(slc []byte) (*LongsSketch, error) { pre0, err := checkPreambleSize(slc) if err != nil { return nil, err @@ -173,7 +173,7 @@ func NewLongSketchFromSlice(slc []byte) (*LongSketch, error) { return fls, nil } -func NewLongSketchFromString(str string) (*LongSketch, error) { +func NewLongSketchFromString(str string) (*LongsSketch, error) { if len(str) < 1 { return nil, fmt.Errorf("String is empty") } @@ -250,7 +250,7 @@ func NewLongSketchFromString(str string) (*LongSketch, error) { return sk, nil } -func (s *LongSketch) getEstimate(item int64) (int64, error) { +func (s *LongsSketch) getEstimate(item int64) (int64, error) { itemCount, err := s.hashMap.get(item) if err != nil { return 0, err @@ -258,12 +258,12 @@ func (s *LongSketch) getEstimate(item int64) (int64, error) { return itemCount + s.offset, nil } -func (s *LongSketch) getLowerBound(item int64) (int64, error) { +func (s *LongsSketch) getLowerBound(item int64) (int64, error) { // LB = itemCount return s.hashMap.get(item) } -func (s *LongSketch) getUpperBound(item int64) (int64, error) { +func (s *LongsSketch) getUpperBound(item int64) (int64, error) { itemCount, err := s.hashMap.get(item) if err != nil { return 0, err @@ -271,44 +271,44 @@ func (s *LongSketch) getUpperBound(item int64) (int64, error) { return itemCount + s.offset, nil } -func (s *LongSketch) getNumActiveItems() int { +func (s *LongsSketch) getNumActiveItems() int { return s.hashMap.numActive } // getMaximumMapCapacity returns the maximum number of counters the sketch is configured to // support. -func (s *LongSketch) getMaximumMapCapacity() int { - return int(float64(uint64(1<<s.lgMaxMapSize)) * loadFactor) +func (s *LongsSketch) getMaximumMapCapacity() int { + return int(float64(uint64(1<<s.lgMaxMapSize)) * reversePurgeLongHashMapLoadFactor) } -func (s *LongSketch) getStorageBytes() int { +func (s *LongsSketch) getStorageBytes() int { if s.isEmpty() { return 8 } return (4 * 8) + (16 * s.getNumActiveItems()) } -func (s *LongSketch) getCurrentMapCapacity() int { +func (s *LongsSketch) getCurrentMapCapacity() int { return s.curMapCap } -func (s *LongSketch) getMaximumError() int64 { +func (s *LongsSketch) getMaximumError() int64 { return s.offset } -func (s *LongSketch) getStreamLength() int64 { +func (s *LongsSketch) getStreamLength() int64 { return s.streamWeight } -func (s *LongSketch) isEmpty() bool { +func (s *LongsSketch) isEmpty() bool { return s.getNumActiveItems() == 0 } -func (s *LongSketch) Update(item int64) error { +func (s *LongsSketch) Update(item int64) error { return s.UpdateMany(item, 1) } -func (s *LongSketch) UpdateMany(item int64, count int64) error { +func (s *LongsSketch) UpdateMany(item int64, count int64) error { if count == 0 { return nil } @@ -338,7 +338,7 @@ func (s *LongSketch) UpdateMany(item int64, count int64) error { return nil } -func (s *LongSketch) merge(other *LongSketch) (*LongSketch, error) { +func (s *LongsSketch) merge(other *LongsSketch) (*LongsSketch, error) { if other == nil || other.isEmpty() { return s, nil } @@ -355,7 +355,7 @@ func (s *LongSketch) merge(other *LongSketch) (*LongSketch, error) { return s, nil } -func (s *LongSketch) serializeToString() (string, error) { +func (s *LongsSketch) serializeToString() (string, error) { var sb strings.Builder //start the string with parameters of the sketch serVer := _SER_VER //0 @@ -373,7 +373,7 @@ func (s *LongSketch) serializeToString() (string, error) { return sb.String(), nil } -func (s *LongSketch) toSlice() ([]byte, error) { +func (s *LongsSketch) toSlice() ([]byte, error) { emtpy := s.isEmpty() activeItems := s.getNumActiveItems() preLongs := 1 @@ -416,7 +416,7 @@ func (s *LongSketch) toSlice() ([]byte, error) { return outArr, nil } -func (s *LongSketch) Reset() { +func (s *LongsSketch) Reset() { hasMap, _ := NewReversePurgeLongHashMap(1 << _LG_MIN_MAP_SIZE) s.curMapCap = hasMap.getCapacity() s.offset = 0 @@ -424,6 +424,20 @@ func (s *LongSketch) Reset() { s.hashMap = hasMap } -func (s *LongSketch) getFrequentItems(errorType ErrorType) ([]*Row, error) { +/* + public Row[] getFrequentItems(final long threshold, final ErrorType errorType) { + return sortItems(threshold > getMaximumError() ? threshold : getMaximumError(), errorType); + } +*/ + +func (s *LongsSketch) getFrequentItemsWithThreshold(threshold int64, errorType ErrorType) ([]*Row, error) { + finalThreshold := s.getMaximumError() + if threshold > finalThreshold { + finalThreshold = threshold + } + return sortItems(s, finalThreshold, errorType) +} + +func (s *LongsSketch) getFrequentItems(errorType ErrorType) ([]*Row, error) { return sortItems(s, s.getMaximumError(), errorType) } diff --git a/frequencies/long_sketch_test.go b/frequencies/longs_sketch_test.go similarity index 81% rename from frequencies/long_sketch_test.go rename to frequencies/longs_sketch_test.go index 5ee6557..f0ec490 100644 --- a/frequencies/long_sketch_test.go +++ b/frequencies/longs_sketch_test.go @@ -19,6 +19,7 @@ package frequencies import ( "encoding/binary" + "github.com/apache/datasketches-go/common" "github.com/stretchr/testify/assert" "testing" ) @@ -228,7 +229,7 @@ func TestFreqLongStringSerDe(t *testing.T) { checkEquality(t, sk1, sk2) } -func checkEquality(t *testing.T, sk1, sk2 *LongSketch) { +func checkEquality(t *testing.T, sk1, sk2 *LongsSketch) { assert.Equal(t, sk1.getNumActiveItems(), sk2.getNumActiveItems()) assert.Equal(t, sk1.getCurrentMapCapacity(), sk2.getCurrentMapCapacity()) assert.Equal(t, sk1.getMaximumError(), sk2.getMaximumError()) @@ -293,7 +294,7 @@ func tryBadMem(t *testing.T, mem []byte, byteOffset, byteValue int) { } func TestFreqLongStringSerDeError(t *testing.T) { - //sk1, err := NewLongSketchWithMaxMapSize(8) + // sk1, err := NewLongSketchWithMaxMapSize(8) // str1 := sk1.serializeToString() // correct = "1,10,2,4,0,0,0,4,"; @@ -306,3 +307,88 @@ func tryBadString(t *testing.T, badString string) { _, err := NewLongSketchFromString(badString) assert.Error(t, err) } + +func TestFreqLongs(t *testing.T) { + numSketches := 1 + n := 2222 + errorTolerance := 1.0 / 100 + + sketches := make([]*LongsSketch, numSketches) + for h := 0; h < numSketches; h++ { + sketches[h], _ = newFrequencySketch(errorTolerance) + } + + prob := .001 + for i := 0; i < n; i++ { + item := randomGeometricDist(prob) + 1 + for h := 0; h < numSketches; h++ { + sketches[h].Update(item) + } + } + + for h := 0; h < numSketches; h++ { + threshold := sketches[h].getMaximumError() + rows, err := sketches[h].getFrequentItems(NO_FALSE_NEGATIVES) + assert.NoError(t, err) + for i := 0; i < len(rows); i++ { + assert.True(t, rows[i].getUpperBound() > threshold) + } + + rows, err = sketches[h].getFrequentItems(NO_FALSE_POSITIVES) + assert.NoError(t, err) + assert.Equal(t, len(rows), 0) + for i := 0; i < len(rows); i++ { + assert.True(t, rows[i].getLowerBound() > threshold) + } + + rows, err = sketches[h].getFrequentItems(NO_FALSE_NEGATIVES) + } +} + +func newFrequencySketch(eps float64) (*LongsSketch, error) { + maxMapSize := common.CeilPowerOf2(int(1.0 / (eps * reversePurgeLongHashMapLoadFactor))) + return NewLongSketchWithMaxMapSize(maxMapSize) +} + +func TestUpdateOneTime(t *testing.T) { + size := 100 + errorTolerance := 1.0 / float64(size) + //delta := .01 + numSketches := 1 + for h := 0; h < numSketches; h++ { + sketch, _ := newFrequencySketch(errorTolerance) + ub, err := sketch.getUpperBound(13) + assert.NoError(t, err) + assert.Equal(t, ub, int64(0)) + lb, err := sketch.getLowerBound(13) + assert.NoError(t, err) + assert.Equal(t, lb, int64(0)) + assert.Equal(t, sketch.getMaximumError(), int64(0)) + est, err := sketch.getEstimate(13) + assert.NoError(t, err) + assert.Equal(t, est, int64(0)) + sketch.Update(13) + // assert.Equal(t, sketch.getEstimate(13), 1) + } +} + +func TestGetInstanceSlice(t *testing.T) { + sl := make([]byte, 4) + _, err := NewLongSketchFromSlice(sl) + assert.Error(t, err) +} + +func TestGetInstanceString(t *testing.T) { + _, err := NewLongSketchFromString("") + assert.Error(t, err) +} + +func TestUpdateNegative(t *testing.T) { + minSize := 1 << _LG_MIN_MAP_SIZE + fls, err := NewLongSketchWithMaxMapSize(minSize) + assert.NoError(t, err) + err = fls.UpdateMany(1, 0) + assert.NoError(t, err) + err = fls.UpdateMany(1, -1) + assert.Error(t, err) +} diff --git a/frequencies/reverse_purge_long_hash_map.go b/frequencies/reverse_purge_long_hash_map.go index 36d26f8..03c2b16 100644 --- a/frequencies/reverse_purge_long_hash_map.go +++ b/frequencies/reverse_purge_long_hash_map.go @@ -47,8 +47,8 @@ type iteratorHashMap struct { } const ( - loadFactor = float64(0.75) - driftLimit = 1024 //used only in stress testing + reversePurgeLongHashMapLoadFactor = float64(0.75) + reversePurgeLongHashMapDriftLimit = 1024 //used only in stress testing ) // NewReversePurgeLongHashMap constructs a new reversePurgeLongHashMap. @@ -61,7 +61,7 @@ func NewReversePurgeLongHashMap(mapSize int) (*reversePurgeLongHashMap, error) { if err != nil { return nil, fmt.Errorf("mapSize: %e", err) } - loadThreshold := int(float64(mapSize) * loadFactor) + loadThreshold := int(float64(mapSize) * reversePurgeLongHashMapLoadFactor) keys := make([]int64, mapSize) values := make([]int64, mapSize) states := make([]int16, mapSize) @@ -105,7 +105,7 @@ func (r *reversePurgeLongHashMap) adjustOrPutValue(key int64, adjustAmount int64 for r.states[probe] != 0 && r.keys[probe] != key { probe = (probe + 1) & int64(arrayMask) drift++ - if drift >= driftLimit { + if drift >= reversePurgeLongHashMapDriftLimit { panic("drift >= driftLimit") } } @@ -135,7 +135,7 @@ func (r *reversePurgeLongHashMap) resize(newSize int) error { r.keys = make([]int64, newSize) r.values = make([]int64, newSize) r.states = make([]int16, newSize) - r.loadThreshold = int(float64(newSize) * loadFactor) + r.loadThreshold = int(float64(newSize) * reversePurgeLongHashMapLoadFactor) r.lgLength = bits.TrailingZeros(uint(newSize)) r.numActive = 0 err := error(nil) @@ -234,7 +234,7 @@ func (r *reversePurgeLongHashMap) hashDelete(deleteProbe int) { probe = (probe + 1) & arrayMask drift++ //only used for theoretical analysis - if drift >= driftLimit { + if drift >= reversePurgeLongHashMapDriftLimit { panic("drift >= driftLimit") } } diff --git a/frequencies/row.go b/frequencies/row.go index f428975..21a2c81 100644 --- a/frequencies/row.go +++ b/frequencies/row.go @@ -46,7 +46,19 @@ func (r *Row) String() string { return fmt.Sprintf(" %20d%20d%20d %d", r.item, r.est, r.ub, r.lb) } -func sortItems(sk *LongSketch, threshold int64, errorType ErrorType) ([]*Row, error) { +func (r *Row) getEstimate() int64 { + return r.est +} + +func (r *Row) getUpperBound() int64 { + return r.ub +} + +func (r *Row) getLowerBound() int64 { + return r.lb +} + +func sortItems(sk *LongsSketch, threshold int64, errorType ErrorType) ([]*Row, error) { rowList := make([]*Row, 0) iter := sk.hashMap.iterator() if errorType == NO_FALSE_NEGATIVES { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
