This is an automated email from the ASF dual-hosted git repository. placave pushed a commit to branch hll-serde-go in repository https://gitbox.apache.org/repos/asf/datasketches-characterization.git
commit 3b9de543bfd55c0c36f2919c6b70ca2d313916c7 Author: Pierre Lacave <[email protected]> AuthorDate: Wed May 8 09:17:27 2024 +0200 [Go] Add HLL SerDe profile --- go/distinct_count_serde_profile.go | 209 +++++++++++++++++++++++++++++++++++++ go/distinct_count_utils.go | 4 + go/hll_sketch_accuracy_runner.go | 19 ---- go/main.go | 17 +++ go/main_test.go | 4 + 5 files changed, 234 insertions(+), 19 deletions(-) diff --git a/go/distinct_count_serde_profile.go b/go/distinct_count_serde_profile.go new file mode 100644 index 0000000..9dabf8e --- /dev/null +++ b/go/distinct_count_serde_profile.go @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package main + +import ( + "fmt" + "github.com/apache/datasketches-go/hll" + "math" + "runtime/debug" + "strings" + "time" +) + +type DistinctCountSerDeProfile struct { + config distinctCountJobConfigType + sketch hll.HllSketch + startTime int64 +} + +type distinctCountSerdeStats struct { + serializeTime_nS int64 + deserializeTime_nS int64 + estimationTime_nS int64 + size_bytes uint64 +} + +func NewDistinctCountSerDeProfile(config distinctCountJobConfigType, tgtType hll.TgtHllType) *DistinctCountSerDeProfile { + sketch, _ := hll.NewHllSketch(config.lgK, tgtType) + return &DistinctCountSerDeProfile{ + config: config, + sketch: sketch, + startTime: time.Now().UnixMilli(), + } +} + +func (d *DistinctCountSerDeProfile) run() { + var ( + sb = &strings.Builder{} + vIn = int64(0) + rawStats = &distinctCountSerdeStats{} + meanStats = &distinctCountSerdeStats{} + maxU = 1 << d.config.lgMaxU + minU = 1 << d.config.lgMinU + lastU = 0 + ) + debug.SetGCPercent(-1) + debug.SetMemoryLimit(math.MaxInt64) + + d.setHeader(sb) + fmt.Println(sb.String()) + sb.Reset() + + for lastU < maxU { + nextU := minU + if lastU != 0 { + nextU = int(pwr2SeriesNext(d.config.uppo, uint64(lastU))) + } + lastU = nextU + + sumStats := &distinctCountSerdeStats{} + trials := d.getNumTrials(nextU) + for t := 0; t < trials; t++ { + vIn = d.runTrial(rawStats, vIn, nextU) + sumStats.add(rawStats) + } + meanStats.makeMeanOf(sumStats, trials) + d.process(meanStats, trials, nextU, sb) + fmt.Println(sb.String()) + sb.Reset() + } + +} + +func (d *DistinctCountSerDeProfile) setHeader(sb *strings.Builder) string { + sb.WriteString("TrueU") + sb.WriteString("\t") + sb.WriteString("Trials") + sb.WriteString("\t") + sb.WriteString("Ser_nS") + sb.WriteString("\t") + sb.WriteString("DeSer_nS") + sb.WriteString("\t") + sb.WriteString("Est_nS") + sb.WriteString("\t") + sb.WriteString("Size_B") + return sb.String() +} + +func (d *DistinctCountSerDeProfile) runTrial(stats *distinctCountSerdeStats, key int64, lgDeltaU 
int) int64 { + var ( + startEstimationTime_Ns int64 + stopEstimationTime_Ns int64 + startSerTime_Ns int64 + stopSerTime_Ns int64 + startDeserTime_Ns int64 + stopDeserTime_Ns int64 + sketchBytes []byte + est1 float64 + est2 float64 + ) + d.sketch.Reset() + + for u := lgDeltaU; u > 0; u-- { + key++ + d.sketch.UpdateInt64(key) + } + + startEstimationTime_Ns = time.Now().UnixNano() + est1, err := d.sketch.GetEstimate() + if err != nil { + panic(err) + } + stopEstimationTime_Ns = time.Now().UnixNano() + + if d.config.compact { + startSerTime_Ns = time.Now().UnixNano() + sketchBytes, err = d.sketch.ToCompactSlice() + stopSerTime_Ns = time.Now().UnixNano() + } else { + startSerTime_Ns = time.Now().UnixNano() + sketchBytes, err = d.sketch.ToUpdatableSlice() + stopSerTime_Ns = time.Now().UnixNano() + } + + startDeserTime_Ns = time.Now().UnixNano() + sketchRebuild, err := hll.NewHllSketchFromSlice(sketchBytes, true) + stopDeserTime_Ns = time.Now().UnixNano() + if err != nil { + panic(err) + } + + est2, err = sketchRebuild.GetEstimate() + if err != nil { + panic(err) + } + + if est1 != est2 { + panic("Estimation mismatch") + } + + stats.serializeTime_nS = stopSerTime_Ns - startSerTime_Ns + stats.deserializeTime_nS = stopDeserTime_Ns - startDeserTime_Ns + stats.estimationTime_nS = stopEstimationTime_Ns - startEstimationTime_Ns + stats.size_bytes = uint64(len(sketchBytes)) + return key +} + +func (d *DistinctCountSerDeProfile) process(stats *distinctCountSerdeStats, trials int, uPerTrial int, sb *strings.Builder) string { + sb.WriteString(fmt.Sprintf("%d", uPerTrial)) + sb.WriteString("\t") + sb.WriteString(fmt.Sprintf("%d", trials)) + sb.WriteString("\t") + sb.WriteString(fmt.Sprintf("%d", stats.serializeTime_nS)) + sb.WriteString("\t") + sb.WriteString(fmt.Sprintf("%d", stats.deserializeTime_nS)) + sb.WriteString("\t") + sb.WriteString(fmt.Sprintf("%d", stats.estimationTime_nS)) + sb.WriteString("\t") + sb.WriteString(fmt.Sprintf("%d", stats.size_bytes)) + return sb.String() 
+} + +// getNumTrials computes the number of trials for a given current number of uniques for a +// trial set. This is used in speed trials and decreases the number of trials +// as the number of uniques increases. +func (d *DistinctCountSerDeProfile) getNumTrials(curU int) int { + minBpU := 1 << d.config.lgMinBpU + maxBpU := 1 << d.config.lgMaxBpU + maxT := 1 << d.config.lgMaxT + minT := 1 << d.config.lgMinT + if d.config.lgMinT == d.config.lgMaxT || curU <= minBpU { + return maxT + } + if curU >= maxBpU { + return minT + } + lgCurU := math.Log2(float64(curU)) + slope := float64(d.config.lgMaxT-d.config.lgMinT) / float64(d.config.lgMinBpU-d.config.lgMaxBpU) + lgTrials := slope*(lgCurU-float64(d.config.lgMinBpU)) + float64(d.config.lgMaxT) + return int(math.Pow(2.0, lgTrials)) +} + +func (s *distinctCountSerdeStats) add(o *distinctCountSerdeStats) { + s.serializeTime_nS += o.serializeTime_nS + s.deserializeTime_nS += o.deserializeTime_nS + s.estimationTime_nS += o.estimationTime_nS + s.size_bytes += o.size_bytes +} + +func (s *distinctCountSerdeStats) makeMeanOf(o *distinctCountSerdeStats, count int) { + s.serializeTime_nS = int64(float64(o.serializeTime_nS) / float64(count)) + s.deserializeTime_nS = int64(float64(o.deserializeTime_nS) / float64(count)) + s.estimationTime_nS = int64(float64(o.estimationTime_nS) / float64(count)) + s.size_bytes = o.size_bytes / uint64(count) +} diff --git a/go/distinct_count_utils.go b/go/distinct_count_utils.go index 24e0cf7..b9cf994 100644 --- a/go/distinct_count_utils.go +++ b/go/distinct_count_utils.go @@ -37,6 +37,10 @@ type distinctCountJobConfigType struct { minLgK int // X-axis LgK Profile maxLgK int // X-axis LgK Profile + // Trial Speed + lgMinBpU int // start the downward slope of trials at this LgU + lgMaxBpU int // stop the downward slope of trials at this LgU + lgDeltaU int lgQK int // size of quantiles sketch diff --git a/go/hll_sketch_accuracy_runner.go b/go/hll_sketch_accuracy_runner.go deleted file mode 100644 index 
d732b5d..0000000 --- a/go/hll_sketch_accuracy_runner.go +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package main - diff --git a/go/main.go b/go/main.go index 24045bf..c60399b 100644 --- a/go/main.go +++ b/go/main.go @@ -62,6 +62,23 @@ var ( }, hll.TgtHllTypeHll8, ), + "distinct_count_serde_profile": NewDistinctCountSerDeProfile( + distinctCountJobConfigType{ + lgMinU: 0, + lgMaxU: 20, + uppo: 2, + + lgMaxT: 16, + lgMinT: 7, + + lgMinBpU: 4, + lgMaxBpU: 20, + + lgK: 12, + compact: false, + }, + hll.TgtHllTypeHll8, + ), } ) diff --git a/go/main_test.go b/go/main_test.go index b07b55d..7d8034f 100644 --- a/go/main_test.go +++ b/go/main_test.go @@ -32,3 +32,7 @@ func TestHllSketchMergeAccuracyRunner(t *testing.T) { func TestHllSketchMergeSpeedRunner(t *testing.T) { jobs["distinct_count_merge_speed_profile"].run() } + +func TestHllSketchSerdeRunner(t *testing.T) { + jobs["distinct_count_serde_profile"].run() +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
