This is an automated email from the ASF dual-hosted git repository. alsay pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datasketches-go.git
commit b5bc5b7a15b4faea49abc6b7edb268b4a8102167 Author: Pierre Lacave <[email protected]> AuthorDate: Wed Dec 20 22:11:01 2023 +0100 add estimates for frequency long --- frequencies/error_types.go | 8 +++ frequencies/long_sketch.go | 43 ++++++++++--- frequencies/long_sketch_test.go | 72 ++++++++++++++++++++++ frequencies/reverse_purge_long_hash_map.go | 20 ++++++ frequencies/row.go | 97 ++++++++++++++++++++++++++++++ 5 files changed, 232 insertions(+), 8 deletions(-) diff --git a/frequencies/error_types.go b/frequencies/error_types.go new file mode 100644 index 0000000..c4e38e6 --- /dev/null +++ b/frequencies/error_types.go @@ -0,0 +1,8 @@ +package frequencies + +type ErrorType = int + +const ( + NO_FALSE_POSITIVES = ErrorType(1) + NO_FALSE_NEGATIVES = ErrorType(2) +) diff --git a/frequencies/long_sketch.go b/frequencies/long_sketch.go index 3d10120..0fcc9db 100644 --- a/frequencies/long_sketch.go +++ b/frequencies/long_sketch.go @@ -250,6 +250,27 @@ func NewLongSketchFromString(str string) (*LongSketch, error) { return sk, nil } +func (s *LongSketch) getEstimate(item int64) (int64, error) { + itemCount, err := s.hashMap.get(item) + if err != nil { + return 0, err + } + return itemCount + s.offset, nil +} + +func (s *LongSketch) getLowerBound(item int64) (int64, error) { + // LB = itemCount + return s.hashMap.get(item) +} + +func (s *LongSketch) getUpperBound(item int64) (int64, error) { + itemCount, err := s.hashMap.get(item) + if err != nil { + return 0, err + } + return itemCount + s.offset, nil +} + func (s *LongSketch) getNumActiveItems() int { return s.hashMap.numActive } @@ -260,10 +281,21 @@ func (s *LongSketch) getMaximumMapCapacity() int { return int(float64(uint64(1<<s.lgMaxMapSize)) * loadFactor) } +func (s *LongSketch) getStorageBytes() int { + if s.isEmpty() { + return 8 + } + return (4 * 8) + (16 * s.getNumActiveItems()) +} + func (s *LongSketch) getCurrentMapCapacity() int { return s.curMapCap } +func (s *LongSketch) getMaximumError() int64 { + return s.offset +} + func (s *LongSketch) getStreamLength() int64 { return s.streamWeight } @@ -388,11 +420,6 @@ func (s *LongSketch) Reset() { s.hashMap = hasMap } -/* - public void reset() { - hashMap = new ReversePurgeLongHashMap(1 << LG_MIN_MAP_SIZE); - curMapCap = hashMap.getCapacity(); - offset = 0; - streamWeight = 0; - } -*/ +func (s *LongSketch) getFrequentItems(errorType ErrorType) ([]*Row, error) { + return sortItems(s, s.getMaximumError(), errorType) +} diff --git a/frequencies/long_sketch_test.go b/frequencies/long_sketch_test.go index eddfc10..9f0f613 100644 --- a/frequencies/long_sketch_test.go +++ b/frequencies/long_sketch_test.go @@ -190,3 +190,75 @@ func TestFrequentItemsByteResetAndEmptySerial(t *testing.T) { assert.Equal(t, sketch.getMaximumMapCapacity(), newSk0.getMaximumMapCapacity()) assert.Equal(t, sketch.getCurrentMapCapacity(), newSk0.getCurrentMapCapacity()) } + +func TestFreqLongMeSerDe(t *testing.T) { + minSize := 1 << _LG_MIN_MAP_SIZE + sk1, err := NewLongSketchWithMaxMapSize(minSize) + assert.NoError(t, err) + sk1.Update(10, 100) + sk1.Update(10, 100) + sk1.Update(15, 3443) + sk1.Update(1000001, 1010230) + sk1.Update(1000002, 1010230) + + byteArray0, err := sk1.toSlice() + assert.NoError(t, err) + sk2, err := NewLongSketchFromSlice(byteArray0) + assert.NoError(t, err) + + checkEquality(t, sk1, sk2) +} + +/* + @Test + public void checkFreqLongsMemSerDe() { + int minSize = 1 << LG_MIN_MAP_SIZE; + LongsSketch sk1 = new LongsSketch(minSize); + sk1.update(10, 100); + sk1.update(10, 100); + sk1.update(15, 3443); println(sk1.toString()); + sk1.update(1000001, 1010230); println(sk1.toString()); + sk1.update(1000002, 1010230); println(sk1.toString()); + + byte[] bytearray0 = sk1.toByteArray(); + Memory mem0 = Memory.wrap(bytearray0); + LongsSketch sk2 = LongsSketch.getInstance(mem0); + + checkEquality(sk1, sk2); + } +*/ + +func checkEquality(t *testing.T, sk1, sk2 *LongSketch) { + assert.Equal(t, sk1.getNumActiveItems(), sk2.getNumActiveItems()) + assert.Equal(t, sk1.getCurrentMapCapacity(), sk2.getCurrentMapCapacity()) + assert.Equal(t, sk1.getMaximumError(), sk2.getMaximumError()) + assert.Equal(t, sk1.getMaximumMapCapacity(), sk2.getMaximumMapCapacity()) + assert.Equal(t, sk1.getStorageBytes(), sk2.getStorageBytes()) + assert.Equal(t, sk1.getStreamLength(), sk2.getStreamLength()) + assert.Equal(t, sk1.isEmpty(), sk2.isEmpty()) + + NFN := NO_FALSE_NEGATIVES + NFP := NO_FALSE_POSITIVES + + rowArr1, err := sk1.getFrequentItems(NFN) + assert.NoError(t, err) + rowArr2, err := sk2.getFrequentItems(NFN) + assert.NoError(t, err) + assert.Equal(t, len(rowArr1), len(rowArr2)) + for i := 0; i < len(rowArr1); i++ { + s1 := rowArr1[i].String() + s2 := rowArr2[i].String() + assert.Equal(t, s1, s2) + } + + rowArr1, err = sk1.getFrequentItems(NFP) + assert.NoError(t, err) + rowArr2, err = sk2.getFrequentItems(NFP) + assert.NoError(t, err) + assert.Equal(t, len(rowArr1), len(rowArr2)) + for i := 0; i < len(rowArr1); i++ { + s1 := rowArr1[i].String() + s2 := rowArr2[i].String() + assert.Equal(t, s1, s2) + } +} diff --git a/frequencies/reverse_purge_long_hash_map.go b/frequencies/reverse_purge_long_hash_map.go index e1cf1fc..36d26f8 100644 --- a/frequencies/reverse_purge_long_hash_map.go +++ b/frequencies/reverse_purge_long_hash_map.go @@ -74,6 +74,17 @@ func NewReversePurgeLongHashMap(mapSize int) (*reversePurgeLongHashMap, error) { }, nil } +func (r *reversePurgeLongHashMap) get(key int64) (int64, error) { + probe := r.hashProbe(key) + if r.states[probe] > 0 { + if r.keys[probe] == key { + return r.values[probe], nil + } + return 0, fmt.Errorf("key not found") + } + return 0, nil +} + // getCapacity returns the current capacity of the hash map (i.e., max number of keys that can be stored). func (r *reversePurgeLongHashMap) getCapacity() int { return r.loadThreshold @@ -332,6 +343,15 @@ func (s *reversePurgeLongHashMap) iterator() *iteratorHashMap { return newIterator(s.keys, s.values, s.states, s.numActive) } +func (s *reversePurgeLongHashMap) hashProbe(key int64) int { + arrayMask := len(s.keys) - 1 + probe := int(hash(key)) & arrayMask + for s.states[probe] > 0 && s.keys[probe] != key { + probe = (probe + 1) & arrayMask + } + return probe +} + func newIterator(keys []int64, values []int64, states []int16, numActive int) *iteratorHashMap { stride := int(uint64(float64(len(keys))*common.InverseGolden) | 1) return &iteratorHashMap{ diff --git a/frequencies/row.go b/frequencies/row.go new file mode 100644 index 0000000..48d981f --- /dev/null +++ b/frequencies/row.go @@ -0,0 +1,97 @@ +package frequencies + +import ( + "fmt" + "sort" +) + +/* + public static class Row implements Comparable<Row> { + final long item; + final long est; + final long ub; + final long lb; + private static final String fmt = (" %20d%20d%20d %d"); + private static final String hfmt = (" %20s%20s%20s %s"); + + Row(final long item, final long estimate, final long ub, final long lb) { + this.item = item; + est = estimate; + this.ub = ub; + this.lb = lb; + } +*/ + +const ( + hfmt string = " %20s%20s%20s %s" +) + +type Row struct { + item int64 + est int64 + ub int64 + lb int64 +} + +func NewRow(item int64, estimate int64, ub int64, lb int64) Row { + return Row{ + item: item, + est: estimate, + ub: ub, + lb: lb, + } +} + +func (r *Row) String() string { + return fmt.Sprintf(" %20d%20d%20d %d", r.item, r.est, r.ub, r.lb) +} + +func sortItems(sk *LongSketch, threshold int64, errorType ErrorType) ([]*Row, error) { + rowList := make([]*Row, 0) + iter := sk.hashMap.iterator() + if errorType == NO_FALSE_NEGATIVES { + for iter.next() { + est, err := sk.getEstimate(iter.getKey()) + if err != nil { + return nil, err + } + ub, err := sk.getUpperBound(iter.getKey()) + if err != nil { + return nil, err + } + lb, err := sk.getLowerBound(iter.getKey()) + if err != nil { + return nil, err + } + if ub >= threshold { + row := NewRow(iter.getKey(), est, ub, lb) + rowList = append(rowList, &row) + } + } + } else { //NO_FALSE_POSITIVES + for iter.next() { + est, err := sk.getEstimate(iter.getKey()) + if err != nil { + return nil, err + } + ub, err := sk.getUpperBound(iter.getKey()) + if err != nil { + return nil, err + } + lb, err := sk.getLowerBound(iter.getKey()) + if err != nil { + return nil, err + } + if lb >= threshold { + row := NewRow(iter.getKey(), est, ub, lb) + rowList = append(rowList, &row) + } + } + } + + sort.Slice(rowList, func(i, j int) bool { + return rowList[i].est < rowList[j].est + }) + + return rowList, nil +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
