This is an automated email from the ASF dual-hosted git repository. alsay pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datasketches-go.git
commit 3b17e123f190cdfeb5b11615819be301693a4026 Author: Pierre Lacave <[email protected]> AuthorDate: Wed Dec 20 20:06:37 2023 +0100 Add serialization/deserialization to/from slice --- common/family.go | 24 ++++- frequencies/long_sketch.go | 137 ++++++++++++++++++++++++++--- frequencies/long_sketch_test.go | 86 +++++++++++++++++- frequencies/preable_utils.go | 111 ++++++++++++++++++++++- frequencies/reverse_purge_long_hash_map.go | 47 ++++++++-- frequencies/utils.go | 2 +- hll/hll_utils.go | 2 +- hll/preamble_utils.go | 4 +- thetacommon/quick_select.go | 14 +-- 9 files changed, 389 insertions(+), 38 deletions(-) diff --git a/common/family.go b/common/family.go index 957934f..8710add 100644 --- a/common/family.go +++ b/common/family.go @@ -17,7 +17,23 @@ package common -const ( - FamilyHllId = 7 - FamilyFrequencyId = 10 -) +type family struct { + Id int + MaxPreLongs int +} + +type families struct { + HLL family + Frequency family +} + +var FamilyEnum = &families{ + HLL: family{ + Id: 7, + MaxPreLongs: 1, + }, + Frequency: family{ + Id: 10, + MaxPreLongs: 4, + }, +} diff --git a/frequencies/long_sketch.go b/frequencies/long_sketch.go index 9c3d730..71618b4 100644 --- a/frequencies/long_sketch.go +++ b/frequencies/long_sketch.go @@ -18,6 +18,7 @@ package frequencies import ( + "encoding/binary" "fmt" "github.com/apache/datasketches-go/common" "math/bits" @@ -66,8 +67,8 @@ const ( // map managed by this sketch. func NewLongSketch(lgMaxMapSize int, lgCurMapSize int) (*LongSketch, error) { //set initial size of hash map - lgMaxMapSize = max(lgMaxMapSize, lgMinMapSize) - lgCurMapSize = max(lgCurMapSize, lgMinMapSize) + lgMaxMapSize = max(lgMaxMapSize, _LG_MIN_MAP_SIZE) + lgCurMapSize = max(lgCurMapSize, _LG_MIN_MAP_SIZE) hashMap, err := NewReversePurgeLongHashMap(1 << lgCurMapSize) if err != nil { return nil, err @@ -85,18 +86,91 @@ func NewLongSketch(lgMaxMapSize int, lgCurMapSize int) (*LongSketch, error) { }, nil } -// NewLongSketchWithDefault constructs a new LongSketch with the given maxMapSize and the +// NewLongSketchForMaxMapSize constructs a new LongSketch with the given maxMapSize and the // default initialMapSize (8). // maxMapSize determines the physical size of the internal hash map managed by this // sketch and must be a power of 2. The maximum capacity of this internal hash map is // 0.75 times * maxMapSize. Both the ultimate accuracy and size of this sketch are a // function of maxMapSize. -func NewLongSketchWithDefault(maxMapSize int) (*LongSketch, error) { +func NewLongSketchForMaxMapSize(maxMapSize int) (*LongSketch, error) { log2OfInt, err := common.ExactLog2(maxMapSize) if err != nil { return nil, fmt.Errorf("maxMapSize, %e", err) } - return NewLongSketch(log2OfInt, lgMinMapSize) + return NewLongSketch(log2OfInt, _LG_MIN_MAP_SIZE) +} + +func NewLongSketchFromSlice(slc []byte) (*LongSketch, error) { + pre0, err := checkPreambleSize(slc) + if err != nil { + return nil, err + } + maxPreLongs := common.FamilyEnum.Frequency.MaxPreLongs + preLongs := extractPreLongs(pre0) + serVer := extractSerVer(pre0) + familyID := extractFamilyID(pre0) + lgMaxMapSize := extractLgMaxMapSize(pre0) + lgCurMapSize := extractLgCurMapSize(pre0) + empty := (extractFlags(pre0) & _EMPTY_FLAG_MASK) != 0 + + // Checks + preLongsEq1 := preLongs == 1 + preLongsEqMax := preLongs == maxPreLongs + if !preLongsEq1 && !preLongsEqMax { + return nil, fmt.Errorf("Possible Corruption: PreLongs must be 1 or %d: %d", maxPreLongs, preLongs) + } + if serVer != _SER_VER { + return nil, fmt.Errorf("Possible Corruption: Ser Ver must be %d: %d", _SER_VER, serVer) + } + actFamID := common.FamilyEnum.Frequency.Id + if familyID != actFamID { + return nil, fmt.Errorf("Possible Corruption: FamilyID must be %d: %d", actFamID, familyID) + } + if empty && !preLongsEq1 { + return nil, fmt.Errorf("Possible Corruption: Empty Flag set incorrectly: %t", preLongsEq1) + } + if empty { + return NewLongSketch(lgMaxMapSize, _LG_MIN_MAP_SIZE) + } + // get full preamble + preArr := make([]int64, preLongs) + for i := 0; i < preLongs; i++ { + preArr[i] = int64(binary.LittleEndian.Uint64(slc[i<<3:])) + } + fls, err := NewLongSketch(lgMaxMapSize, lgCurMapSize) + if err != nil { + return nil, err + } + fls.streamWeight = 0 //update after + fls.offset = preArr[3] + + preBytes := preLongs << 3 + activeItems := extractActiveItems(preArr[1]) + + // Get countArray + countArray := make([]int64, activeItems) + reqBytes := preBytes + 2*activeItems*8 //count Arr + Items Arr + if len(slc) < reqBytes { + return nil, fmt.Errorf("Possible Corruption: Insufficient bytes in array: %d, %d", len(slc), reqBytes) + } + for i := 0; i < activeItems; i++ { + countArray[i] = int64(binary.LittleEndian.Uint64(slc[preBytes+(i<<3):])) + } + + // Get itemArray + itemsOffset := preBytes + (8 * activeItems) + itemArray := make([]int64, activeItems) + for i := 0; i < activeItems; i++ { + itemArray[i] = int64(binary.LittleEndian.Uint64(slc[itemsOffset+(i<<3):])) + } + + // Update the sketch + for i := 0; i < activeItems; i++ { + fls.Update(itemArray[i], countArray[i]) + } + + fls.streamWeight = preArr[2] //override streamWeight due to updating + return fls, nil } func NewLongSketchFromString(str string) (*LongSketch, error) { @@ -148,10 +222,10 @@ func NewLongSketchFromString(str string) (*LongSketch, error) { lgCur := bits.TrailingZeros64(lgCurOrigin) //checks - if serVe != serVer { + if serVe != _SER_VER { return nil, fmt.Errorf("Possible Corruption: Bad SerVer: %d", serVe) } - if famID != common.FamilyFrequencyId { + if famID != int64(common.FamilyEnum.Frequency.Id) { return nil, fmt.Errorf("Possible Corruption: Bad Family: %d", famID) } empty := flags > 0 @@ -248,12 +322,12 @@ func (s *LongSketch) merge(other *LongSketch) (*LongSketch, error) { func (s *LongSketch) serializeToString() (string, error) { var sb strings.Builder //start the string with parameters of the sketch - serVer := serVer //0 - famID := common.FamilyFrequencyId + serVer := _SER_VER //0 + famID := common.FamilyEnum.Frequency.Id lgMaxMapSz := s.lgMaxMapSize flags := 0 if s.hashMap.numActive == 0 { - flags = emptyFlagMask + flags = _EMPTY_FLAG_MASK } _, err := fmt.Fprintf(&sb, "%d,%d,%d,%d,%d,%d,", serVer, famID, lgMaxMapSz, flags, s.streamWeight, s.offset) if err != nil { @@ -262,3 +336,46 @@ func (s *LongSketch) serializeToString() (string, error) { sb.WriteString(s.hashMap.serializeToString()) //numActive, curMaplen, key[i], value[i], ... return sb.String(), nil } + +func (s *LongSketch) toSlice() ([]byte, error) { + emtpy := s.isEmpty() + activeItems := s.getNumActiveItems() + preLongs := 1 + outBytes := 8 + if !emtpy { + preLongs = common.FamilyEnum.Frequency.MaxPreLongs //4 + outBytes = (preLongs + (2 * activeItems)) << 3 //2 because both keys and values are longs + } + outArr := make([]byte, outBytes) + + //build first preLong empty or not + pre0 := int64(0) + pre0 = insertPreLongs(int64(preLongs), pre0) //Byte 0 + pre0 = insertSerVer(_SER_VER, pre0) //Byte 1 + pre0 = insertFamilyID(int64(common.FamilyEnum.Frequency.Id), pre0) //Byte 2 + pre0 = insertLgMaxMapSize(int64(s.lgMaxMapSize), pre0) //Byte 3 + pre0 = insertLgCurMapSize(int64(s.hashMap.lgLength), pre0) //Byte 4 + if emtpy { + pre0 = insertFlags(_EMPTY_FLAG_MASK, pre0) //Byte 5 + binary.LittleEndian.PutUint64(outArr, uint64(pre0)) + return outArr, nil + } + + pre0 = insertFlags(0, pre0) //Byte 5 + preArr := make([]int64, preLongs) + preArr[0] = pre0 + preArr[1] = insertActiveItems(int64(activeItems), pre0) + preArr[2] = s.streamWeight + preArr[3] = s.offset + for i := 0; i < preLongs; i++ { + binary.LittleEndian.PutUint64(outArr[i<<3:], uint64(preArr[i])) + } + //now the active items + activeValues := s.hashMap.getActiveValues() + activeKeys := s.hashMap.getActiveKeys() + for i := 0; i < activeItems; i++ { + binary.LittleEndian.PutUint64(outArr[(preLongs+i)<<3:], uint64(activeValues[i])) + binary.LittleEndian.PutUint64(outArr[(preLongs+activeItems+i)<<3:], uint64(activeKeys[i])) + } + return outArr, nil +} diff --git a/frequencies/long_sketch_test.go b/frequencies/long_sketch_test.go index 9d02fc3..feee404 100644 --- a/frequencies/long_sketch_test.go +++ b/frequencies/long_sketch_test.go @@ -22,10 +22,10 @@ import ( "testing" ) -func TestFrequentItsemsStringSerialTest(t *testing.T) { - sketch, err := NewLongSketchWithDefault(8) +func TestFrequentItemsStringSerial(t *testing.T) { + sketch, err := NewLongSketchForMaxMapSize(8) assert.NoError(t, err) - sketch2, err := NewLongSketchWithDefault(128) + sketch2, err := NewLongSketchForMaxMapSize(128) assert.NoError(t, err) sketch.Update(10, 100) sketch.Update(10, 100) @@ -87,3 +87,83 @@ func TestFrequentItsemsStringSerialTest(t *testing.T) { assert.Equal(t, mergedSketch.getCurrentMapCapacity(), newMs.getCurrentMapCapacity()) assert.Equal(t, mergedSketch.getStreamLength(), newMs.getStreamLength()) } + +func TestFrequentItemsByteSerial(t *testing.T) { + sketch, err := NewLongSketchForMaxMapSize(16) + assert.NoError(t, err) + byteArray0, err := sketch.toSlice() + newSk0, err := NewLongSketchFromSlice(byteArray0) + assert.NoError(t, err) + str0, err := sketch.serializeToString() + assert.NoError(t, err) + newStr0, err := newSk0.serializeToString() + assert.NoError(t, err) + assert.Equal(t, str0, newStr0) + + sketch2, err := NewLongSketchForMaxMapSize(128) + assert.NoError(t, err) + sketch.Update(10, 100) + sketch.Update(10, 100) + sketch.Update(15, 3443) + sketch.Update(1000001, 1010230) + sketch.Update(1000002, 1010230) + + byteArray1, err := sketch.toSlice() + assert.NoError(t, err) + newSk1, err := NewLongSketchFromSlice(byteArray1) + assert.NoError(t, err) + str1, err := sketch.serializeToString() + newStr1, err := newSk1.serializeToString() + assert.NoError(t, err) + assert.Equal(t, str1, newStr1) + assert.Equal(t, sketch.getMaximumMapCapacity(), newSk1.getMaximumMapCapacity()) + assert.Equal(t, sketch.getCurrentMapCapacity(), newSk1.getCurrentMapCapacity()) + + sketch2.Update(190, 12902390) + sketch2.Update(191, 12902390) + sketch2.Update(192, 12902390) + sketch2.Update(193, 12902390) + sketch2.Update(194, 12902390) + sketch2.Update(195, 12902390) + sketch2.Update(196, 12902390) + sketch2.Update(197, 12902390) + sketch2.Update(198, 12902390) + sketch2.Update(199, 12902390) + sketch2.Update(200, 12902390) + sketch2.Update(201, 12902390) + sketch2.Update(202, 12902390) + sketch2.Update(203, 12902390) + sketch2.Update(204, 12902390) + sketch2.Update(205, 12902390) + sketch2.Update(206, 12902390) + sketch2.Update(207, 12902390) + sketch2.Update(208, 12902390) + + byteArray2, err := sketch2.toSlice() + assert.NoError(t, err) + newSk2, err := NewLongSketchFromSlice(byteArray2) + assert.NoError(t, err) + str2, err := sketch2.serializeToString() + assert.NoError(t, err) + newStr2, err := newSk2.serializeToString() + assert.NoError(t, err) + assert.Equal(t, str2, newStr2) + assert.Equal(t, sketch2.getMaximumMapCapacity(), newSk2.getMaximumMapCapacity()) + assert.Equal(t, sketch2.getCurrentMapCapacity(), newSk2.getCurrentMapCapacity()) + assert.Equal(t, sketch2.getStreamLength(), newSk2.getStreamLength()) + + mergedSketch, err := sketch.merge(sketch2) + assert.NoError(t, err) + byteArray3, err := mergedSketch.toSlice() + assert.NoError(t, err) + newSk3, err := NewLongSketchFromSlice(byteArray3) + assert.NoError(t, err) + str3, err := mergedSketch.serializeToString() + assert.NoError(t, err) + newStr3, err := newSk3.serializeToString() + assert.NoError(t, err) + assert.Equal(t, str3, newStr3) + assert.Equal(t, mergedSketch.getMaximumMapCapacity(), newSk3.getMaximumMapCapacity()) + assert.Equal(t, mergedSketch.getCurrentMapCapacity(), newSk3.getCurrentMapCapacity()) + assert.Equal(t, mergedSketch.getStreamLength(), newSk3.getStreamLength()) +} diff --git a/frequencies/preable_utils.go b/frequencies/preable_utils.go index f04aefc..b507687 100644 --- a/frequencies/preable_utils.go +++ b/frequencies/preable_utils.go @@ -1,11 +1,116 @@ package frequencies +import ( + "encoding/binary" + "fmt" +) + const ( + // ###### DO NOT MESS WITH THIS FROM HERE ... + // Preamble byte Addresses + _SER_VER_BYTE = 1 + _FAMILY_BYTE = 2 + _LG_MAX_MAP_SIZE_BYTE = 3 + _LG_CUR_MAP_SIZE_BYTE = 4 + _FLAGS_BYTE = 5 - // emptyFlagMask flag bit masks + // _EMPTY_FLAG_MASK flag bit masks // due to a mistake different bits were used in C++ and Java to indicate empty sketch // therefore both are set and checked for compatibility with historical binary format - emptyFlagMask = 5 + _EMPTY_FLAG_MASK = 5 - serVer = 1 + _SER_VER = 1 ) + +func checkPreambleSize(preamble []byte) (int64, error) { + if len(preamble) < 8 { + return 0, fmt.Errorf("preamble is too small") + } + pre0 := int64(binary.LittleEndian.Uint64(preamble)) + preLongs := int(pre0 & 0x3F) + required := max(preLongs<<3, 8) + if len(preamble) < required { + return 0, fmt.Errorf("preamble is too small") + } + return pre0, nil +} + +func insertPreLongs(preLongs, pre0 int64) int64 { + mask := int64(0x3F) + return (preLongs & mask) | (^mask & pre0) +} + +func insertSerVer(serVer, pre0 int64) int64 { + shift := _SER_VER_BYTE << 3 + mask := int64(0xFF) + return ((serVer & mask) << shift) | (^(mask << shift) & pre0) +} + +func insertFamilyID(familyID, pre0 int64) int64 { + shift := _FAMILY_BYTE << 3 + mask := int64(0xFF) + return ((familyID & mask) << shift) | (^(mask << shift) & pre0) +} + +func insertLgMaxMapSize(lgMaxMapSize, pre0 int64) int64 { + shift := _LG_MAX_MAP_SIZE_BYTE << 3 + mask := int64(0xFF) + return ((lgMaxMapSize & mask) << shift) | (^(mask << shift) & pre0) +} + +func insertLgCurMapSize(lgCurMapSize, pre0 int64) int64 { + shift := _LG_CUR_MAP_SIZE_BYTE << 3 + mask := int64(0xFF) + return ((lgCurMapSize & mask) << shift) | (^(mask << shift) & pre0) +} + +func insertFlags(flags, pre0 int64) int64 { + shift := _FLAGS_BYTE << 3 + mask := int64(0xFF) + return ((flags & mask) << shift) | (^(mask << shift) & pre0) +} + +func insertActiveItems(activeItems, pre1 int64) int64 { + mask := int64(0xFFFFFFFF) + return (activeItems & mask) | (^mask & pre1) +} + +func extractPreLongs(pre0 int64) int { + mask := int64(0x3F) + return int(pre0 & mask) +} + +func extractSerVer(pre0 int64) int { + shift := _SER_VER_BYTE << 3 + mask := int64(0xFF) + return int((pre0 >> shift) & mask) +} + +func extractFamilyID(pre0 int64) int { + shift := _FAMILY_BYTE << 3 + mask := int64(0xFF) + return int((pre0 >> shift) & mask) +} + +func extractLgMaxMapSize(pre0 int64) int { + shift := _LG_MAX_MAP_SIZE_BYTE << 3 + mask := int64(0xFF) + return int((pre0 >> shift) & mask) +} + +func extractLgCurMapSize(pre0 int64) int { + shift := _LG_CUR_MAP_SIZE_BYTE << 3 + mask := int64(0xFF) + return int((pre0 >> shift) & mask) +} + +func extractFlags(pre0 int64) int { + shift := _FLAGS_BYTE << 3 + mask := int64(0xFF) + return int((pre0 >> shift) & mask) +} + +func extractActiveItems(pre1 int64) int { + mask := int64(0xFFFFFFFF) + return int(pre1 & mask) +} diff --git a/frequencies/reverse_purge_long_hash_map.go b/frequencies/reverse_purge_long_hash_map.go index 66c18c5..e1cf1fc 100644 --- a/frequencies/reverse_purge_long_hash_map.go +++ b/frequencies/reverse_purge_long_hash_map.go @@ -183,6 +183,7 @@ func (r *reversePurgeLongHashMap) keepOnlyPositiveCounts() { } //Work towards the front; delete any non-positive entries. for probe := firstProbe; probe > 0; { + probe-- // When we find the next non-empty cell, we know we are at the high end of a cluster, // which is tracked by firstProbe. if r.states[probe] > 0 && r.values[probe] <= 0 { @@ -191,7 +192,8 @@ func (r *reversePurgeLongHashMap) keepOnlyPositiveCounts() { } } //now work on the first cluster that was skipped. - for probe := len(r.keys); probe > firstProbe; { + for probe := len(r.keys); probe-1 > firstProbe; { + probe-- if r.states[probe] > 0 && r.values[probe] <= 0 { r.hashDelete(probe) r.numActive-- @@ -290,13 +292,44 @@ func deserializeFromStringArray(tokens []string) (*reversePurgeLongHashMap, erro return hashMap, nil } -func (s *reversePurgeLongHashMap) iterator() *iteratorHashMap { - return &iteratorHashMap{ - keys_: s.keys, - values_: s.values, - states_: s.states, - numActive_: s.numActive, +func (r *reversePurgeLongHashMap) getActiveValues() []int64 { + if r.numActive == 0 { + return nil + } + returnValues := make([]int64, r.numActive) + j := 0 + for i := 0; i < len(r.values); i++ { + if r.states[i] > 0 { //isActive + returnValues[j] = r.values[i] + j++ + } + } + if j != r.numActive { + panic("j != r.numActive") + } + return returnValues +} + +func (r *reversePurgeLongHashMap) getActiveKeys() []int64 { + if r.numActive == 0 { + return nil } + returnValues := make([]int64, r.numActive) + j := 0 + for i := 0; i < len(r.keys); i++ { + if r.states[i] > 0 { //isActive + returnValues[j] = r.keys[i] + j++ + } + } + if j != r.numActive { + panic("j != r.numActive") + } + return returnValues +} + +func (s *reversePurgeLongHashMap) iterator() *iteratorHashMap { + return newIterator(s.keys, s.values, s.states, s.numActive) } func newIterator(keys []int64, values []int64, states []int16, numActive int) *iteratorHashMap { diff --git a/frequencies/utils.go b/frequencies/utils.go index 9774709..2d42118 100644 --- a/frequencies/utils.go +++ b/frequencies/utils.go @@ -18,7 +18,7 @@ package frequencies const ( - lgMinMapSize = 3 + _LG_MIN_MAP_SIZE = 3 // This constant is large enough so that computing the median of SAMPLE_SIZE // randomly selected entries from a list of numbers and outputting // the empirical median will give a constant-factor approximation to the diff --git a/hll/hll_utils.go b/hll/hll_utils.go index 6cf934a..419c23d 100644 --- a/hll/hll_utils.go +++ b/hll/hll_utils.go @@ -128,7 +128,7 @@ func checkPreamble(preamble []byte) (curMode, error) { famId := extractFamilyID(preamble) curMode := extractCurMode(preamble) - if famId != common.FamilyHllId { + if famId != common.FamilyEnum.HLL.Id { return 0, fmt.Errorf("possible Corruption: Invalid Family: %d", famId) } if serVer != 1 { diff --git a/hll/preamble_utils.go b/hll/preamble_utils.go index e903a40..cfac779 100644 --- a/hll/preamble_utils.go +++ b/hll/preamble_utils.go @@ -174,11 +174,11 @@ func computeLgArr(byteArr []byte, couponCount int, lgConfigK int) (int, error) { ceilPwr2 <<= 1 } if curMode == curModeSet { - v, err := common.ExactLog2(uint64(ceilPwr2)) + v, err := common.ExactLog2(ceilPwr2) return max(lgInitSetSize, v), err } //only used for HLL4 - v, err := common.ExactLog2(uint64(ceilPwr2)) + v, err := common.ExactLog2(ceilPwr2) return max(lgAuxArrInts[lgConfigK], v), err } diff --git a/thetacommon/quick_select.go b/thetacommon/quick_select.go index 47da86a..14295a1 100644 --- a/thetacommon/quick_select.go +++ b/thetacommon/quick_select.go @@ -33,23 +33,24 @@ func QuickSelect(arr []int64, lo int, hi int, pivot int) int64 { } func partition(arr []int64, lo int, hi int) int { - i := lo // left scan index - j := hi + 1 // right scan index - v := arr[lo] //partitioning item value + i := lo + j := hi + 1 + v := arr[lo] for { - // Scan right, scan left, check for scan complete, and exchange - for arr[i] < v { + for arr[i+1] < v { i++ if i == hi { break } } - for v < arr[j] { + i++ + for v < arr[j-1] { j-- if j == lo { break } } + j-- if i >= j { break } @@ -57,7 +58,6 @@ func partition(arr []int64, lo int, hi int) int { arr[i] = arr[j] arr[j] = x } - // put v=arr[j] into position with a[lo .. j-1] <= a[j] <= a[j+1 .. hi] x := arr[lo] arr[lo] = arr[j] arr[j] = x --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
