This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datasketches-rust.git
The following commit(s) were added to refs/heads/main by this push:
new d6cc4bb feat: impl CpcWrapper (#101)
d6cc4bb is described below
commit d6cc4bb8d9c46371d36db882d8bbe2945f96a61f
Author: tison <[email protected]>
AuthorDate: Fri Feb 20 19:12:57 2026 +0800
feat: impl CpcWrapper (#101)
Signed-off-by: tison <[email protected]>
---
CHANGELOG.md | 1 +
datasketches/src/cpc/estimator.rs | 56 ++++++++---
datasketches/src/cpc/mod.rs | 3 +
datasketches/src/cpc/serialization.rs | 48 +++++++++
datasketches/src/cpc/sketch.rs | 76 ++++++---------
datasketches/src/cpc/wrapper.rs | 173 +++++++++++++++++++++++++++++++++
datasketches/tests/cpc_wrapper_test.rs | 92 ++++++++++++++++++
7 files changed, 387 insertions(+), 62 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index af1928f..972f86c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,7 @@ All significant changes to this project will be documented in
this file.
* `CountMinSketch` with unsigned values now supports `halve` and `decay`
operations.
* `CpcSketch` and `CpcUnion` are now available for cardinality estimation.
+* `CpcWrapper` is now available for reading estimates from a serialized
CpcSketch without full deserialization.
* `FrequentItemsSketch` now supports serde for any value implement
`FrequentItemValue` (builtin supports for `i64`, `u64`, and `String`).
* Expose `codec::SketchBytes`, `codec::SketchSlice`, and `FrequentItemValue`
as public API.
diff --git a/datasketches/src/cpc/estimator.rs
b/datasketches/src/cpc/estimator.rs
index 81a2252..ef00b0b 100644
--- a/datasketches/src/cpc/estimator.rs
+++ b/datasketches/src/cpc/estimator.rs
@@ -88,7 +88,43 @@ static HIP_HIGH_SIDE_DATA: [u16; 33] = [
5880, 5914, 5953, // 14 1000297
];
-pub(super) fn icon_confidence_lb(lg_k: u8, num_coupons: u32, kappa: NumStdDev)
-> f64 {
+pub(super) fn estimate(merge_flag: bool, hip_est_accum: f64, lg_k: u8,
num_coupons: u32) -> f64 { // cardinality estimate shared by CpcSketch::estimate and CpcWrapper::estimate
+ if !merge_flag {
+ hip_est_accum // merge_flag clear: the supplied HIP accumulator is returned unchanged
+ } else {
+ icon_estimate(lg_k, num_coupons) // merge_flag set: ICON estimate computed from lg_k and the coupon count
+ }
+}
+
+pub(super) fn lower_bound( // lower confidence bound for `kappa`; dispatches on merge_flag like `estimate`
+ merge_flag: bool,
+ hip_est_accum: f64,
+ lg_k: u8,
+ num_coupons: u32,
+ kappa: NumStdDev,
+) -> f64 {
+ if !merge_flag {
+ hip_confidence_lb(lg_k, num_coupons, hip_est_accum, kappa) // merge_flag clear: HIP-based bound
+ } else {
+ icon_confidence_lb(lg_k, num_coupons, kappa) // merge_flag set: ICON-based bound; hip_est_accum is not used
+ }
+}
+
+pub(super) fn upper_bound( // upper confidence bound for `kappa`; dispatches on merge_flag like `estimate`
+ merge_flag: bool,
+ hip_est_accum: f64,
+ lg_k: u8,
+ num_coupons: u32,
+ kappa: NumStdDev,
+) -> f64 {
+ if !merge_flag {
+ hip_confidence_ub(lg_k, num_coupons, hip_est_accum, kappa) // merge_flag clear: HIP-based bound
+ } else {
+ icon_confidence_ub(lg_k, num_coupons, kappa) // merge_flag set: ICON-based bound; hip_est_accum is not used
+ }
+}
+
+fn icon_confidence_lb(lg_k: u8, num_coupons: u32, kappa: NumStdDev) -> f64 {
if num_coupons == 0 {
return 0.0;
}
@@ -112,7 +148,7 @@ pub(super) fn icon_confidence_lb(lg_k: u8, num_coupons:
u32, kappa: NumStdDev) -
}
}
-pub(super) fn icon_confidence_ub(lg_k: u8, num_coupons: u32, kappa: NumStdDev)
-> f64 {
+fn icon_confidence_ub(lg_k: u8, num_coupons: u32, kappa: NumStdDev) -> f64 {
if num_coupons == 0 {
return 0.0;
}
@@ -132,12 +168,7 @@ pub(super) fn icon_confidence_ub(lg_k: u8, num_coupons:
u32, kappa: NumStdDev) -
result.ceil() // slight widening of interval to be conservative
}
-pub(super) fn hip_confidence_lb(
- lg_k: u8,
- num_coupons: u32,
- hip_est_accum: f64,
- kappa: NumStdDev,
-) -> f64 {
+fn hip_confidence_lb(lg_k: u8, num_coupons: u32, hip_est_accum: f64, kappa:
NumStdDev) -> f64 {
if num_coupons == 0 {
return 0.0;
}
@@ -160,12 +191,7 @@ pub(super) fn hip_confidence_lb(
}
}
-pub(super) fn hip_confidence_ub(
- lg_k: u8,
- num_coupons: u32,
- hip_est_accum: f64,
- kappa: NumStdDev,
-) -> f64 {
+fn hip_confidence_ub(lg_k: u8, num_coupons: u32, hip_est_accum: f64, kappa:
NumStdDev) -> f64 {
if num_coupons == 0 {
return 0.0;
}
@@ -362,7 +388,7 @@ fn icon_exponential_approximation(k: f64, c: f64) -> f64 {
0.7940236163830469 * k * 2f64.powf(c / k)
}
-pub(super) fn icon_estimate(lg_k: u8, num_coupons: u32) -> f64 {
+fn icon_estimate(lg_k: u8, num_coupons: u32) -> f64 {
let lg_k = lg_k as usize;
assert!(
(ICON_MIN_LOG_K..=ICON_MAX_LOG_K).contains(&lg_k),
diff --git a/datasketches/src/cpc/mod.rs b/datasketches/src/cpc/mod.rs
index bbc38f8..0313c98 100644
--- a/datasketches/src/cpc/mod.rs
+++ b/datasketches/src/cpc/mod.rs
@@ -40,11 +40,14 @@ mod compression_data;
mod estimator;
mod kxp_byte_lookup;
mod pair_table;
+mod serialization;
mod sketch;
mod union;
+mod wrapper;
pub use self::sketch::CpcSketch;
pub use self::union::CpcUnion;
+pub use self::wrapper::CpcWrapper;
/// Default log2 of K.
const DEFAULT_LG_K: u8 = 11;
diff --git a/datasketches/src/cpc/serialization.rs
b/datasketches/src/cpc/serialization.rs
new file mode 100644
index 0000000..3267ec3
--- /dev/null
+++ b/datasketches/src/cpc/serialization.rs
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub(super) const SERIAL_VERSION: u8 = 1; // serial version that readers require in the second preamble byte
+pub(super) const FLAG_COMPRESSED: u8 = 1; // bit position (not a mask) in the flags byte; tested as `flags & (1 << FLAG_COMPRESSED)`
+pub(super) const FLAG_HAS_HIP: u8 = 2; // bit position: image carries the HIP fields (kxp, hip_est_accum)
+pub(super) const FLAG_HAS_TABLE: u8 = 3; // bit position: image carries table data
+pub(super) const FLAG_HAS_WINDOW: u8 = 4; // bit position: image carries window data
+
+pub(super) fn make_preamble_ints( // preamble size, counted in ints, implied by the coupon count and layout flags
+ num_coupons: u32,
+ has_hip: bool,
+ has_table: bool,
+ has_window: bool,
+) -> u8 {
+ let mut preamble_ints = 2; // baseline that is counted even when num_coupons is 0
+ if num_coupons > 0 { // the has_* flags only contribute when there is at least one coupon
+ preamble_ints += 1; // number of coupons
+ if has_hip {
+ preamble_ints += 4; // HIP (the wrapper reads two f64 fields for it: kxp and hip_est_accum)
+ }
+ if has_table {
+ preamble_ints += 1; // table data length
+ // number of values (if there is no window it is the same as
number of coupons)
+ if has_window {
+ preamble_ints += 1; // separate "number of values" field, needed only when table and window coexist
+ }
+ }
+ if has_window {
+ preamble_ints += 1; // window length
+ }
+ }
+ preamble_ints
+}
diff --git a/datasketches/src/cpc/sketch.rs b/datasketches/src/cpc/sketch.rs
index 581fce5..15c818f 100644
--- a/datasketches/src/cpc/sketch.rs
+++ b/datasketches/src/cpc/sketch.rs
@@ -33,13 +33,17 @@ use crate::cpc::compression::CompressedState;
use crate::cpc::count_bits_set_in_matrix;
use crate::cpc::determine_correct_offset;
use crate::cpc::determine_flavor;
-use crate::cpc::estimator::hip_confidence_lb;
-use crate::cpc::estimator::hip_confidence_ub;
-use crate::cpc::estimator::icon_confidence_lb;
-use crate::cpc::estimator::icon_confidence_ub;
-use crate::cpc::estimator::icon_estimate;
+use crate::cpc::estimator::estimate;
+use crate::cpc::estimator::lower_bound;
+use crate::cpc::estimator::upper_bound;
use crate::cpc::kxp_byte_lookup::KXP_BYTE_TABLE;
use crate::cpc::pair_table::PairTable;
+use crate::cpc::serialization::FLAG_COMPRESSED;
+use crate::cpc::serialization::FLAG_HAS_HIP;
+use crate::cpc::serialization::FLAG_HAS_TABLE;
+use crate::cpc::serialization::FLAG_HAS_WINDOW;
+use crate::cpc::serialization::SERIAL_VERSION;
+use crate::cpc::serialization::make_preamble_ints;
use crate::error::Error;
use crate::error::ErrorKind;
use crate::hash::DEFAULT_UPDATE_SEED;
@@ -130,29 +134,34 @@ impl CpcSketch {
/// Returns the best estimate of the cardinality of the sketch.
pub fn estimate(&self) -> f64 {
- if !self.merge_flag {
- self.hip_est_accum
- } else {
- icon_estimate(self.lg_k, self.num_coupons)
- }
+ estimate(
+ self.merge_flag,
+ self.hip_est_accum,
+ self.lg_k,
+ self.num_coupons,
+ )
}
/// Returns the best estimate of the lower bound of the confidence
interval given `kappa`.
pub fn lower_bound(&self, kappa: NumStdDev) -> f64 {
- if !self.merge_flag {
- hip_confidence_lb(self.lg_k, self.num_coupons, self.hip_est_accum,
kappa)
- } else {
- icon_confidence_lb(self.lg_k, self.num_coupons, kappa)
- }
+ lower_bound(
+ self.merge_flag,
+ self.hip_est_accum,
+ self.lg_k,
+ self.num_coupons,
+ kappa,
+ )
}
/// Returns the best estimate of the upper bound of the confidence
interval given `kappa`.
pub fn upper_bound(&self, kappa: NumStdDev) -> f64 {
- if !self.merge_flag {
- hip_confidence_ub(self.lg_k, self.num_coupons, self.hip_est_accum,
kappa)
- } else {
- icon_confidence_ub(self.lg_k, self.num_coupons, kappa)
- }
+ upper_bound(
+ self.merge_flag,
+ self.hip_est_accum,
+ self.lg_k,
+ self.num_coupons,
+ kappa,
+ )
}
/// Returns true if the sketch is empty.
@@ -437,12 +446,6 @@ impl CpcSketch {
}
}
-const SERIAL_VERSION: u8 = 1;
-const FLAG_COMPRESSED: u8 = 1;
-const FLAG_HAS_HIP: u8 = 2;
-const FLAG_HAS_TABLE: u8 = 3;
-const FLAG_HAS_WINDOW: u8 = 4;
-
impl CpcSketch {
/// Serializes this CpcSketch to bytes.
pub fn serialize(&self) -> Vec<u8> {
@@ -637,27 +640,6 @@ impl CpcSketch {
}
}
-fn make_preamble_ints(num_coupons: u32, has_hip: bool, has_table: bool,
has_window: bool) -> u8 {
- let mut preamble_ints = 2;
- if num_coupons > 0 {
- preamble_ints += 1; // number of coupons
- if has_hip {
- preamble_ints += 4; // HIP
- }
- if has_table {
- preamble_ints += 1; // table data length
- // number of values (if there is no window it is the same as
number of coupons)
- if has_window {
- preamble_ints += 1;
- }
- }
- if has_window {
- preamble_ints += 1; // window length
- }
- }
- preamble_ints
-}
-
impl CpcSketch {
/// Returns the estimated maximum compressed serialized size of a sketch.
///
diff --git a/datasketches/src/cpc/wrapper.rs b/datasketches/src/cpc/wrapper.rs
new file mode 100644
index 0000000..2b1000e
--- /dev/null
+++ b/datasketches/src/cpc/wrapper.rs
@@ -0,0 +1,173 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::codec::SketchSlice;
+use crate::codec::family::Family;
+use crate::codec::utility::ensure_preamble_longs_in;
+use crate::codec::utility::ensure_serial_version_is;
+use crate::common::NumStdDev;
+use crate::cpc::MAX_LG_K;
+use crate::cpc::MIN_LG_K;
+use crate::cpc::estimator::estimate;
+use crate::cpc::estimator::lower_bound;
+use crate::cpc::estimator::upper_bound;
+use crate::cpc::serialization::FLAG_COMPRESSED;
+use crate::cpc::serialization::FLAG_HAS_HIP;
+use crate::cpc::serialization::FLAG_HAS_TABLE;
+use crate::cpc::serialization::FLAG_HAS_WINDOW;
+use crate::cpc::serialization::SERIAL_VERSION;
+use crate::cpc::serialization::make_preamble_ints;
+use crate::error::Error;
+use crate::error::ErrorKind;
+
+/// A read-only summary of a serialized image of a CpcSketch; it holds only the preamble fields needed for estimates, not the bytes.
+#[derive(Debug, Clone)]
+pub struct CpcWrapper {
+ lg_k: u8, // checked against MIN_LG_K..=MAX_LG_K in `new`
+ merge_flag: bool, // set to `!has_hip` in `new`, i.e. true when the image has no HIP fields
+ num_coupons: u32, // read from the preamble; stays 0 when neither the table nor the window flag is set
+ hip_est_accum: f64, // read from the preamble when the HIP flag is set (and a table or window is present); otherwise 0.0
+}
+
+impl CpcWrapper {
+ /// Creates a new `CpcWrapper` by parsing the preamble of the given serialized image; returns an error if a field is missing or invalid, and does not copy the
bytes.
+ pub fn new(bytes: &[u8]) -> Result<Self, Error> {
+ fn make_error(tag: &'static str) -> impl FnOnce(std::io::Error) ->
Error { // maps any read failure to an insufficient-data error tagged with the field name
+ move |_| Error::insufficient_data(tag)
+ }
+
+ let mut cursor = SketchSlice::new(bytes);
+ let preamble_ints =
cursor.read_u8().map_err(make_error("preamble_ints"))?;
+ let serial_version =
cursor.read_u8().map_err(make_error("serial_version"))?;
+ let family_id = cursor.read_u8().map_err(make_error("family_id"))?;
+ Family::CPC.validate_id(family_id)?;
+ ensure_serial_version_is(SERIAL_VERSION, serial_version)?;
+
+ let lg_k = cursor.read_u8().map_err(make_error("lg_k"))?;
+ let first_interesting_column = cursor
+ .read_u8()
+ .map_err(make_error("first_interesting_column"))?;
+ if !(MIN_LG_K..=MAX_LG_K).contains(&lg_k) {
+ return Err(Error::invalid_argument(format!(
+ "lg_k out of range; got {}",
+ lg_k
+ )));
+ }
+ if first_interesting_column > 63 { // range-checked only; the value is not stored in the wrapper
+ return Err(Error::invalid_argument(format!(
+ "first_interesting_column out of range; got {}",
+ first_interesting_column
+ )));
+ }
+
+ let flags = cursor.read_u8().map_err(make_error("flags"))?;
+ let is_compressed = flags & (1 << FLAG_COMPRESSED) != 0; // FLAG_* constants are bit positions, hence the shift
+ if !is_compressed {
+ return Err(Error::new(
+ ErrorKind::InvalidData,
+ "only compressed sketches are supported",
+ ));
+ }
+ let has_hip = flags & (1 << FLAG_HAS_HIP) != 0;
+ let has_table = flags & (1 << FLAG_HAS_TABLE) != 0;
+ let has_window = flags & (1 << FLAG_HAS_WINDOW) != 0;
+
+ cursor.read_u16_le().map_err(make_error("seed_hash"))?; // read to advance the cursor; the seed hash value is discarded, not verified
+
+ let mut num_coupons = 0;
+ let mut hip_est_accum = 0.0;
+
+ if has_table || has_window { // the coupon count and the fields below are read only when a table or window is flagged
+ num_coupons =
cursor.read_u32_le().map_err(make_error("num_coupons"))?;
+ if has_table && has_window { // with both present, table_num_entries and the HIP fields precede the data-length words
+ cursor
+ .read_u32_le()
+ .map_err(make_error("table_num_entries"))?;
+ if has_hip {
+ cursor.read_f64_le().map_err(make_error("kxp"))?; // kxp is skipped; only hip_est_accum is kept
+ hip_est_accum =
cursor.read_f64_le().map_err(make_error("hip_est_accum"))?;
+ }
+ }
+ if has_table {
+ cursor
+ .read_u32_le()
+ .map_err(make_error("table_data_words"))?;
+ }
+ if has_window {
+ cursor
+ .read_u32_le()
+ .map_err(make_error("window_data_words"))?;
+ }
+ if has_hip && !(has_table && has_window) { // otherwise the HIP fields follow the data-length words
+ cursor.read_f64_le().map_err(make_error("kxp"))?; // kxp is skipped; only hip_est_accum is kept
+ hip_est_accum =
cursor.read_f64_le().map_err(make_error("hip_est_accum"))?;
+ }
+ }
+
+ let expected_preamble_ints =
+ make_preamble_ints(num_coupons, has_hip, has_table, has_window);
+ ensure_preamble_longs_in(&[expected_preamble_ints], preamble_ints)?; // NOTE(review): presumably rejects a declared size that differs from the expected one; helper is named "longs" but is given ints here, confirm
+ Ok(CpcWrapper {
+ lg_k,
+ merge_flag: !has_hip, // an image without HIP fields is handled with merge_flag set (ICON estimator path)
+ num_coupons,
+ hip_est_accum,
+ })
+ }
+
+ /// Returns the parameter lg_k read from the serialized image.
+ pub fn lg_k(&self) -> u8 {
+ self.lg_k
+ }
+
+ /// Returns the best estimate of the cardinality of the serialized sketch.
+ pub fn estimate(&self) -> f64 {
+ estimate(
+ self.merge_flag,
+ self.hip_est_accum,
+ self.lg_k,
+ self.num_coupons,
+ )
+ }
+
+ /// Returns the best estimate of the lower bound of the confidence
interval given `kappa`.
+ pub fn lower_bound(&self, kappa: NumStdDev) -> f64 {
+ lower_bound(
+ self.merge_flag,
+ self.hip_est_accum,
+ self.lg_k,
+ self.num_coupons,
+ kappa,
+ )
+ }
+
+ /// Returns the best estimate of the upper bound of the confidence
interval given `kappa`.
+ pub fn upper_bound(&self, kappa: NumStdDev) -> f64 {
+ upper_bound(
+ self.merge_flag,
+ self.hip_est_accum,
+ self.lg_k,
+ self.num_coupons,
+ kappa,
+ )
+ }
+
+ /// Returns true if the serialized sketch holds no coupons.
+ pub fn is_empty(&self) -> bool {
+ self.num_coupons == 0
+ }
+}
diff --git a/datasketches/tests/cpc_wrapper_test.rs
b/datasketches/tests/cpc_wrapper_test.rs
new file mode 100644
index 0000000..6355ed0
--- /dev/null
+++ b/datasketches/tests/cpc_wrapper_test.rs
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datasketches::common::NumStdDev;
+use datasketches::cpc::CpcSketch;
+use datasketches::cpc::CpcUnion;
+use datasketches::cpc::CpcWrapper;
+use googletest::assert_that;
+use googletest::prelude::contains_substring;
+use googletest::prelude::eq;
+
+#[test]
+fn test_cpc_wrapper() { // wrapper results must equal those of the sketch that produced the bytes
+ let lg_k = 10;
+ let mut sk1 = CpcSketch::new(lg_k);
+ let mut sk2 = CpcSketch::new(lg_k);
+ let mut sk_dst = CpcSketch::new(lg_k);
+
+ let n = 100000;
+ for i in 0..n {
+ sk1.update(i); // sk1 gets 0..n
+ sk2.update(i + n); // sk2 gets the disjoint range n..2n
+ sk_dst.update(i); // sk_dst gets both ranges through direct updates (no union involved)
+ sk_dst.update(i + n);
+ }
+
+ let dst_est = sk_dst.estimate();
+ let dst_lb = sk_dst.lower_bound(NumStdDev::Two);
+ let dst_ub = sk_dst.upper_bound(NumStdDev::Two);
+
+ let concat_bytes = sk_dst.serialize();
+ let concat_wrapper = CpcWrapper::new(&concat_bytes).unwrap();
+ assert_that!(concat_wrapper.lg_k(), eq(lg_k));
+ assert_that!(concat_wrapper.estimate(), eq(dst_est)); // exact float equality: both sides call the same estimator code
+ assert_that!(concat_wrapper.lower_bound(NumStdDev::Two), eq(dst_lb));
+ assert_that!(concat_wrapper.upper_bound(NumStdDev::Two), eq(dst_ub));
+
+ let mut union = CpcUnion::new(lg_k); // second scenario: same data combined through a union; presumably covers the merge_flag path, confirm against CpcUnion::to_sketch
+ union.update(&sk1);
+ union.update(&sk2);
+ let merged = union.to_sketch();
+ let merged_est = merged.estimate();
+ let merged_lb = merged.lower_bound(NumStdDev::Two);
+ let merged_ub = merged.upper_bound(NumStdDev::Two);
+
+ let merged_bytes = merged.serialize();
+ let merged_wrapper = CpcWrapper::new(&merged_bytes).unwrap();
+ assert_that!(merged_wrapper.lg_k(), eq(lg_k));
+ assert_that!(merged_wrapper.estimate(), eq(merged_est));
+ assert_that!(merged_wrapper.lower_bound(NumStdDev::Two), eq(merged_lb));
+ assert_that!(merged_wrapper.upper_bound(NumStdDev::Two), eq(merged_ub));
+}
+
+#[test]
+fn test_is_empty() { // is_empty must be true for a fresh sketch's image and false after one update
+ let empty_sketch = CpcSketch::new(10);
+ let empty_bytes = empty_sketch.serialize();
+ let empty_wrapper = CpcWrapper::new(&empty_bytes).unwrap();
+ assert_that!(empty_wrapper.is_empty(), eq(true));
+
+ let mut non_empty_sketch = CpcSketch::new(10);
+ non_empty_sketch.update(1u64); // a single update is enough to make the image non-empty
+ let non_empty_bytes = non_empty_sketch.serialize();
+ let non_empty_wrapper = CpcWrapper::new(&non_empty_bytes).unwrap();
+ assert_that!(non_empty_wrapper.is_empty(), eq(false));
+}
+
+#[test]
+fn test_is_compressed() { // an image whose compressed flag is cleared must be rejected by the wrapper
+ let sketch = CpcSketch::new(10);
+ let mut bytes = sketch.serialize();
+ bytes[5] &= (-3i8) as u8; // mask 0xFD clears bit 1 (FLAG_COMPRESSED) of the flags byte, which is the sixth byte the wrapper reads
+ let err = CpcWrapper::new(&bytes).unwrap_err();
+ assert_that!(
+ err.message(),
+ contains_substring("only compressed sketches are supported")
+ );
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]