This is an automated email from the ASF dual-hosted git repository. alsay pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datasketches-bigquery.git
commit a717c829717ef85161d544bb588c9cf7aa696dc5 Author: AlexanderSaydakov <[email protected]> AuthorDate: Mon Jun 10 18:01:12 2024 -0700 theta sketch functions --- Makefile | 44 +++++++ README.md | 27 ++++ base64.hpp | 113 ++++++++++++++++ theta_sketch.cpp | 248 +++++++++++++++++++++++++++++++++++ theta_sketch_a_not_b.sql | 27 ++++ theta_sketch_agg_string.sql | 122 +++++++++++++++++ theta_sketch_agg_union.sql | 99 ++++++++++++++ theta_sketch_get_estimate.sql | 31 +++++ theta_sketch_scalar_intersection.sql | 29 ++++ theta_sketch_scalar_union.sql | 30 +++++ theta_sketch_to_string.sql | 31 +++++ 11 files changed, 801 insertions(+) diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..a5083a4 --- /dev/null +++ b/Makefile @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +EMCC=emcc +EMCFLAGS=-Idatasketches-cpp/common/include \ + -Idatasketches-cpp/theta/include \ + --no-entry \ + -sWASM_BIGINT=1 \ + -sEXPORTED_FUNCTIONS=[_malloc,_free] \ + -sENVIRONMENT=shell \ + -sTOTAL_MEMORY=1024MB \ + -O3 \ + --bind + +all: theta_sketch.mjs theta_sketch.js theta_sketch.wasm + +%.mjs: %.cpp + $(EMCC) $< $(EMCFLAGS) -sSINGLE_FILE=1 -o $@ + +# this rule creates a non-es6 loadable library +%.js: %.cpp + $(EMCC) $< $(EMCFLAGS) -sSINGLE_FILE=1 -o $@ + +%.wasm: %.cpp + $(EMCC) $< $(EMCFLAGS) -sSTANDALONE_WASM=1 -o $@ + +clean: + $(RM) *.mjs *.js *.wasm + +.PHONY: clean diff --git a/README.md b/README.md new file mode 100644 index 0000000..06875b1 --- /dev/null +++ b/README.md @@ -0,0 +1,27 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + +# Apache DataSketches functions for Google Cloud BigQuery + +[User-Defined Aggregate Functions (UDAFs)](https://cloud.google.com/bigquery/docs/user-defined-aggregates) and +[non-aggregate (scalar) functions (UDFs)](https://cloud.google.com/bigquery/docs/user-defined-functions) for the BigQuery SQL engine. + +Please visit the main [Apache DataSketches website](https://datasketches.apache.org) for more information about the DataSketches library. 
+ +If you are interested in making contributions to this project please see our [Community](https://datasketches.apache.org/docs/Community/) page for how to contact us. diff --git a/base64.hpp b/base64.hpp new file mode 100644 index 0000000..829761d --- /dev/null +++ b/base64.hpp @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/* base64.hpp: minimal Base64 helpers (alphabet A-Z a-z 0-9 + /, '=' padding). */

#ifndef BASE64_HPP_
#define BASE64_HPP_

// Maps a 6-bit value (0..63) to its Base64 character.
static const char bin_to_b64[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

// Maps an ASCII code (0..127) to its 6-bit value.
// Every character outside the alphabet maps to 0.
static const char b64_to_bin[128] = {
   0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
   0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,
   0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0, 62,  0,  0,  0, 63,
  52, 53, 54, 55, 56, 57, 58, 59, 60, 61,  0,  0,  0,  0,  0,  0,
   0,  0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14,
  15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25,  0,  0,  0,  0,  0,
   0, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40,
  41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51,  0,  0,  0,  0,  0
};

// Number of characters b64_encode() produces for srclen input bytes
// (output is always padded to a multiple of 4).
static inline unsigned b64_enc_len(unsigned srclen) {
  return ((srclen + 2) / 3) * 4;
}

// Size of the destination buffer required by b64_decode(src, srclen, dst).
//
// For well-formed input without embedded whitespace this is exactly the
// number of decoded bytes. For any other input it is an upper bound that
// b64_decode() never exceeds.
//
// The result is floor(3 * srclen / 4) minus the number of '=' among the last
// two characters, clamped at 0. The earlier unsigned subtraction wrapped
// around to ~4G for inputs such as "=" or "==".
static inline unsigned b64_dec_len(const char* src, unsigned srclen) {
  unsigned pad = 0;
  if (srclen > 0 && src[srclen - 1] == '=') pad++;
  if (srclen > 1 && src[srclen - 2] == '=') pad++;
  // floor(3n/4) == n - ceil(n/4); written this way so srclen * 3 cannot overflow
  const unsigned len = srclen - srclen / 4 - (srclen % 4 != 0 ? 1u : 0u);
  return len > pad ? len - pad : 0;
}

// Encodes srclen bytes from src into dst with full '=' padding.
// Produces exactly b64_enc_len(srclen) chars.
// No \0 termination.
static inline void b64_encode(const char* src, unsigned srclen, char* dst) {
  unsigned buf = 0; // up to three input bytes, first byte in bits 16..23
  int pos = 2;      // byte slot within buf that the next input byte fills

  while (srclen--) {
    buf |= (unsigned) (unsigned char) *(src++) << ((pos--) << 3);

    if (pos < 0) { // three bytes collected: emit four characters
      *dst++ = bin_to_b64[(buf >> 18) & 0x3f];
      *dst++ = bin_to_b64[(buf >> 12) & 0x3f];
      *dst++ = bin_to_b64[(buf >> 6) & 0x3f];
      *dst++ = bin_to_b64[buf & 0x3f];
      pos = 2;
      buf = 0;
    }
  }
  if (pos != 2) { // one (pos == 1) or two (pos == 0) trailing bytes
    *dst++ = bin_to_b64[(buf >> 18) & 0x3f];
    *dst++ = bin_to_b64[(buf >> 12) & 0x3f];
    *dst++ = (pos == 0) ? bin_to_b64[(buf >> 6) & 0x3f] : '=';
    *dst++ = '=';
  }
}

// Stores the low 8 bits of value at dst while *room > 0.
// Returns the advanced dst, or dst unchanged once the budget is exhausted.
static inline char* b64_put(char* dst, unsigned* room, unsigned value) {
  if (*room == 0) return dst;
  --*room;
  *dst = (char) (value & 0xff);
  return dst + 1;
}

// Decodes srclen chars from src into dst.
// Supports full padding, no padding or partial padding (one =).
// Skips space, tab, CR and LF; other invalid chars decode as zero bits.
//
// dst must have room for b64_dec_len(src, srclen) bytes. At most that many
// bytes are written, so malformed input (e.g. "A=", "AAAA=") can no longer
// overrun a buffer sized with b64_dec_len().
static inline void b64_decode(const char* src, unsigned srclen, char* dst) {
  unsigned room = b64_dec_len(src, srclen); // write budget, see above
  unsigned buf = 0;
  int pos = 0; // symbols collected in the current group of four
  int pad = 0; // '=' seen so far (cumulative over the whole input)

  while (srclen--) {
    const char c = *src++;
    if (c == ' ' || c == '\t' || c == '\n' || c == '\r') continue;
    unsigned bits = 0;
    if (c == '=') {
      pad++;
    } else if (c > 0 && c < 127) {
      bits = (unsigned char) b64_to_bin[(int) c];
    }
    buf = (buf << 6) | bits;
    if (++pos == 4) {
      dst = b64_put(dst, &room, buf >> 16);
      if (pad < 2) dst = b64_put(dst, &room, buf >> 8);
      if (pad == 0) dst = b64_put(dst, &room, buf);
      buf = 0;
      pos = 0;
    }
  }
  // no padding or partial padding. pos must be 2 or 3
  if (pos == 2) {
    dst = b64_put(dst, &room, buf >> 4);
  } else if (pos == 3) {
    dst = b64_put(dst, &room, buf >> 10);
    if (pad == 0) dst = b64_put(dst, &room, buf >> 2);
  }
}

#endif

// --- archived patch text that shared these source lines, carried over verbatim ---
// diff --git a/theta_sketch.cpp b/theta_sketch.cpp new file mode 100644 index 0000000..ea338da --- /dev/null +++ b/theta_sketch.cpp @@ -0,0 +1,248 @@
// +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <emscripten/bind.h> + +#include <theta_sketch.hpp> +#include <theta_union.hpp> +#include <theta_intersection.hpp> +#include <theta_a_not_b.hpp> + +#include "base64.hpp" + +using datasketches::update_theta_sketch; +using datasketches::compact_theta_sketch; +using datasketches::wrapped_compact_theta_sketch; +using datasketches::theta_union; +using datasketches::theta_intersection; +using datasketches::theta_a_not_b; + +const emscripten::val Uint8Array = emscripten::val::global("Uint8Array"); + +EMSCRIPTEN_BINDINGS(theta_sketch) { + emscripten::register_vector<uint8_t>("VectorBytes"); + + emscripten::function("getExceptionMessage", emscripten::optional_override([](intptr_t ptr) { + return std::string(reinterpret_cast<std::exception*>(ptr)->what()); + })); + using vector_bytes = compact_theta_sketch::vector_bytes; + + emscripten::class_<update_theta_sketch>("update_theta_sketch") + .constructor(emscripten::optional_override([](uint8_t lg_k, uint64_t seed) { + return new update_theta_sketch(update_theta_sketch::builder().set_lg_k(lg_k).set_seed(seed).build()); + })) + .function("updateString", emscripten::select_overload<void(const std::string&)>(&update_theta_sketch::update)) + .function("serialize", emscripten::optional_override([](update_theta_sketch& self) { + return self.compact().serialize_compressed(); + })) + .function("serializeB64", emscripten::optional_override([](const update_theta_sketch& self) { + auto bytes = self.compact().serialize(); + std::vector<char> b64(b64_enc_len(bytes.size())); + b64_encode((const char*) 
bytes.data(), bytes.size(), b64.data()); + return std::string(b64.data(), b64.size()); + })) + .function("serializeB64Compressed", emscripten::optional_override([](const update_theta_sketch& self) { + auto bytes = self.compact().serialize_compressed(); + std::vector<char> b64(b64_enc_len(bytes.size())); + b64_encode((const char*) bytes.data(), bytes.size(), b64.data()); + return std::string(b64.data(), b64.size()); + })) + .function("serializeAsUint8Array", emscripten::optional_override([](const update_theta_sketch& self) { + auto bytes = self.compact().serialize(); + return Uint8Array.new_(emscripten::typed_memory_view(bytes.size(), bytes.data())); + })) + .function("serializeAsUint8ArrayCompressed", emscripten::optional_override([](const update_theta_sketch& self) { + auto bytes = self.compact().serialize_compressed(); + return Uint8Array.new_(emscripten::typed_memory_view(bytes.size(), bytes.data())); + })) + ; + + emscripten::class_<compact_theta_sketch>("compact_theta_sketch") + .constructor(emscripten::optional_override([](intptr_t bytes, size_t size, uint64_t seed) { + return new compact_theta_sketch(compact_theta_sketch::deserialize(reinterpret_cast<void*>(bytes), size, seed)); + })) + .constructor(emscripten::optional_override([](compact_theta_sketch::vector_bytes& bytes, uint64_t seed) { + return new compact_theta_sketch(compact_theta_sketch::deserialize(bytes.data(), bytes.size(), seed)); + })) + .class_function("deserializeFromB64", emscripten::optional_override([](const std::string& b64, uint64_t seed) { + std::vector<char> bytes(b64_dec_len(b64.data(), b64.size())); + b64_decode(b64.data(), b64.size(), bytes.data()); + return new compact_theta_sketch(compact_theta_sketch::deserialize(bytes.data(), bytes.size(), seed)); + }), emscripten::allow_raw_pointers()) + .class_function("deserializeFromBinary", emscripten::optional_override([](const std::string& bytes, uint64_t seed) { + return new 
compact_theta_sketch(compact_theta_sketch::deserialize(bytes.data(), bytes.size(), seed)); + }), emscripten::allow_raw_pointers()) + .class_function("deserializeFromVectorBytes", emscripten::optional_override([](const vector_bytes& bytes, uint64_t seed) { + return new compact_theta_sketch(compact_theta_sketch::deserialize(bytes.data(), bytes.size(), seed)); + }), emscripten::allow_raw_pointers()) + .function("getEstimate", emscripten::optional_override([](const compact_theta_sketch& self) { + return self.get_estimate(); + })) + .function("toString", emscripten::optional_override([](const compact_theta_sketch& self) { + return std::string(self.to_string()); + })) + .function("serialize", emscripten::optional_override([](const compact_theta_sketch& self) { + return self.serialize_compressed(); + })) + ; + + emscripten::class_<wrapped_compact_theta_sketch>("wrapped_compact_theta_sketch") + .constructor(emscripten::optional_override([](intptr_t bytes, size_t size) { + return new wrapped_compact_theta_sketch(wrapped_compact_theta_sketch::wrap(reinterpret_cast<void*>(bytes), size)); + })) + .function("getEstimate", emscripten::optional_override([](wrapped_compact_theta_sketch& self) { + return self.get_estimate(); + })) + .function("toString", emscripten::optional_override([](wrapped_compact_theta_sketch& self) { + return std::string(self.to_string()); + })) + ; + + emscripten::class_<theta_union>("theta_union") + .constructor(emscripten::optional_override([]() { + return new theta_union(theta_union::builder().build()); + })) + .constructor(emscripten::optional_override([](uint8_t lg_k) { + return new theta_union(theta_union::builder().set_lg_k(lg_k).build()); + })) + .constructor(emscripten::optional_override([](uint8_t lg_k, uint64_t seed) { + return new theta_union(theta_union::builder().set_lg_k(lg_k).set_seed(seed).build()); + })) + .function("updateWithUpdateSketch", emscripten::optional_override([](theta_union& self, const update_theta_sketch& sketch) { + 
self.update(sketch); + }), emscripten::allow_raw_pointers()) + .function("updateWithCompactSketch", emscripten::optional_override([](theta_union& self, const compact_theta_sketch& sketch) { + self.update(sketch); + }), emscripten::allow_raw_pointers()) + .function("updateWithWrappedSketch", emscripten::optional_override([](theta_union& self, const wrapped_compact_theta_sketch& sketch) { + self.update(sketch); + }), emscripten::allow_raw_pointers()) + .function("updateWithBytes", emscripten::optional_override([](theta_union& self, const std::string& bytes, uint64_t seed) { + self.update(wrapped_compact_theta_sketch::wrap(bytes.data(), bytes.size(), seed)); + }), emscripten::allow_raw_pointers()) + .function("updateWithVectorBytes", emscripten::optional_override([](theta_union& self, const vector_bytes& bytes, uint64_t seed) { + self.update(wrapped_compact_theta_sketch::wrap(bytes.data(), bytes.size(), seed)); + }), emscripten::allow_raw_pointers()) + .function("updateWithB64", emscripten::optional_override([](theta_union& self, const std::string& b64, uint64_t seed) { + std::vector<char> bytes(b64_dec_len(b64.data(), b64.size())); + b64_decode(b64.data(), b64.size(), bytes.data()); + self.update(wrapped_compact_theta_sketch::wrap(bytes.data(), bytes.size(), seed)); + }), emscripten::allow_raw_pointers()) + .function("getResultSerialized", emscripten::optional_override([](theta_union& self) { + return self.get_result().serialize_compressed(); + })) + .function("getResultAsUint8Array", emscripten::optional_override([](theta_union& self) { + auto bytes = self.get_result().serialize(); + return Uint8Array.new_(emscripten::typed_memory_view(bytes.size(), bytes.data())); + })) + .function("getResultAsUint8ArrayCompressed", emscripten::optional_override([](theta_union& self) { + auto bytes = self.get_result().serialize_compressed(); + return Uint8Array.new_(emscripten::typed_memory_view(bytes.size(), bytes.data())); + })) + .function("getResultB64", 
emscripten::optional_override([](theta_union& self) { + auto bytes = self.get_result().serialize(); + std::vector<char> b64(b64_enc_len(bytes.size())); + b64_encode((const char*) bytes.data(), bytes.size(), b64.data()); + return std::string(b64.data(), b64.size()); + })) + .function("getResultB64Compressed", emscripten::optional_override([](theta_union& self) { + auto bytes = self.get_result().serialize_compressed(); + std::vector<char> b64(b64_enc_len(bytes.size())); + b64_encode((const char*) bytes.data(), bytes.size(), b64.data()); + return std::string(b64.data(), b64.size()); + })) + ; + + emscripten::class_<theta_intersection>("theta_intersection") + .constructor(emscripten::optional_override([]() { + return new theta_intersection(); + })) + .constructor(emscripten::optional_override([](uint64_t seed) { + return new theta_intersection(seed); + })) + .function("updateWithCompactSketch", emscripten::optional_override([](theta_intersection& self, const compact_theta_sketch& sketch) { + self.update(sketch); + })) + .function("updateWithWrappedSketch", emscripten::optional_override([](theta_intersection& self, const wrapped_compact_theta_sketch& sketch) { + self.update(sketch); + })) + .function("updateWithB64", emscripten::optional_override([](theta_intersection& self, const std::string& b64, uint64_t seed) { + std::vector<char> bytes(b64_dec_len(b64.data(), b64.size())); + b64_decode(b64.data(), b64.size(), bytes.data()); + self.update(wrapped_compact_theta_sketch::wrap(bytes.data(), bytes.size(), seed)); + }), emscripten::allow_raw_pointers()) + .function("getResultB64", emscripten::optional_override([](theta_intersection& self) { + auto bytes = self.get_result().serialize(); + std::vector<char> b64(b64_enc_len(bytes.size())); + b64_encode((const char*) bytes.data(), bytes.size(), b64.data()); + return std::string(b64.data(), b64.size()); + })) + .function("getResultB64Compressed", emscripten::optional_override([](theta_intersection& self) { + auto bytes = 
self.get_result().serialize_compressed(); + std::vector<char> b64(b64_enc_len(bytes.size())); + b64_encode((const char*) bytes.data(), bytes.size(), b64.data()); + return std::string(b64.data(), b64.size()); + })) + ; + + emscripten::class_<theta_a_not_b>("theta_a_not_b") + .constructor(emscripten::optional_override([]() { + return new theta_a_not_b(); + })) + .constructor(emscripten::optional_override([](uint64_t seed) { + return new theta_a_not_b(seed); + })) + .function("computeWithCompactSketch", emscripten::optional_override([](theta_a_not_b& self, + const compact_theta_sketch& sketch1, const compact_theta_sketch& sketch2) { + return self.compute(sketch1, sketch2); + })) + .function("computeWithWrappedSketch", emscripten::optional_override([](theta_a_not_b& self, + const wrapped_compact_theta_sketch& sketch1, const wrapped_compact_theta_sketch& sketch2) { + return self.compute(sketch1, sketch2); + })) + .function("computeWithB64ReturnB64", emscripten::optional_override([](theta_a_not_b& self, + const std::string& b64_1, const std::string& b64_2, uint64_t seed) { + std::vector<char> bytes1(b64_dec_len(b64_1.data(), b64_1.size())); + b64_decode(b64_1.data(), b64_1.size(), bytes1.data()); + std::vector<char> bytes2(b64_dec_len(b64_2.data(), b64_2.size())); + b64_decode(b64_2.data(), b64_2.size(), bytes2.data()); + auto bytes = self.compute( + wrapped_compact_theta_sketch::wrap(bytes1.data(), bytes1.size(), seed), + wrapped_compact_theta_sketch::wrap(bytes2.data(), bytes2.size(), seed) + ).serialize(); + std::vector<char> b64(b64_enc_len(bytes.size())); + b64_encode((const char*) bytes.data(), bytes.size(), b64.data()); + return std::string(b64.data(), b64.size()); + })) + .function("computeWithB64ReturnB64Compressed", emscripten::optional_override([](theta_a_not_b& self, + const std::string& b64_1, const std::string& b64_2, uint64_t seed) { + std::vector<char> bytes1(b64_dec_len(b64_1.data(), b64_1.size())); + b64_decode(b64_1.data(), b64_1.size(), 
bytes1.data()); + std::vector<char> bytes2(b64_dec_len(b64_2.data(), b64_2.size())); + b64_decode(b64_2.data(), b64_2.size(), bytes2.data()); + auto bytes = self.compute( + wrapped_compact_theta_sketch::wrap(bytes1.data(), bytes1.size(), seed), + wrapped_compact_theta_sketch::wrap(bytes2.data(), bytes2.size(), seed) + ).serialize_compressed(); + std::vector<char> b64(b64_enc_len(bytes.size())); + b64_encode((const char*) bytes.data(), bytes.size(), b64.data()); + return std::string(b64.data(), b64.size()); + })) + ; +} diff --git a/theta_sketch_a_not_b.sql b/theta_sketch_a_not_b.sql new file mode 100644 index 0000000..6912b2a --- /dev/null +++ b/theta_sketch_a_not_b.sql @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +CREATE OR REPLACE FUNCTION test.theta_sketch_a_not_b(sketch1 BYTES, sketch2 BYTES, seed INT64) RETURNS BYTES LANGUAGE js +OPTIONS (library=["gs://datasketches/theta_sketch.js"]) AS R""" +const default_seed = BigInt(9001); +var a_not_b = new Module.theta_a_not_b(seed ? BigInt(seed) : default_seed); +try { + return a_not_b.computeWithB64ReturnB64Compressed(sketch1, sketch2, seed ? 
BigInt(seed) : default_seed); +} finally { + a_not_b.delete(); +} +"""; diff --git a/theta_sketch_agg_string.sql b/theta_sketch_agg_string.sql new file mode 100644 index 0000000..73286ae --- /dev/null +++ b/theta_sketch_agg_string.sql @@ -0,0 +1,122 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +CREATE OR REPLACE AGGREGATE FUNCTION test.theta_sketch_agg_string(str STRING, seed INT64 NOT AGGREGATE) RETURNS BYTES LANGUAGE js +OPTIONS (library=["gs://datasketches/theta_sketch.mjs"]) AS R""" +import ModuleFactory from "gs://datasketches/theta_sketch.mjs"; +var Module = await ModuleFactory(); +const default_lg_k = 12; +const default_seed = BigInt(9001); + +function destroyState(state) { + if (state.sketch) { + state.sketch.delete(); + state.sketch = null; + } + if (state.union) { + state.union.delete(); + state.union = null; + } + state.serialized = null; +} + +// UDAF interface +export function initialState(seed) { + var state = { + lg_k: default_lg_k, + seed: seed == null ? 
default_seed : seed, + sketch: null, + union: null, + serialized: null + }; + if (state.seed == null) state.seed = default_seed; + state.sketch = new Module.update_theta_sketch(state.lg_k, state.seed); + return state; +} + +export function aggregate(state, str) { + if (state.sketch == null) { + state.sketch = new Module.update_theta_sketch(state.lg_k, state.seed); + } + state.sketch.updateString(str); +} + +export function serialize(state) { + try { + if (state.sketch != null && state.serialized != null) { + // merge aggregated and serialized state + var u = new Module.theta_union(state.lg_k, state.seed); + try { + u.updateWithUpdateSketch(state.sketch); + u.updateWithBytes(state.serialized, state.seed); + state.serialized = u.getResultAsUint8ArrayCompressed(); + } finally { + u.delete(); + } + } else if (state.sketch != null) { + state.serialized = state.sketch.serializeAsUint8ArrayCompressed(); + } else if (state.union != null) { + state.serialized = state.union.getResultAsUint8ArrayCompressed(); + } else if (state.serialized == null) { + throw new Error("Unexpected state in serialization " + JSON.stringify(state)); + } + return { + lg_k: state.lg_k, + seed: state.seed, + bytes: state.serialized + }; + } finally { + destroyState(state); + } +} + +export function deserialize(serialized) { + return { + sketch: null, + union: null, + serialized: serialized.bytes, + lg_k: serialized.lg_k, + seed: serialized.seed + }; +} + +export function merge(state, other_state) { + if (!state.union) { + state.union = new Module.theta_union(state.lg_k, state.seed); + } + if (state.sketch || other_state.sketch) { + throw new Error("update_theta_sketch not expected during merge()"); + } + if (other_state.union) { + throw new Error("other_state should not have union during merge()"); + } + if (state.serialized) { + state.union.updateWithBytes(state.serialized, state.seed); + state.serialized = null; + } + if (other_state.serialized) { + 
state.union.updateWithBytes(other_state.serialized, state.seed); + other_state.serialized = null; + } else { + throw new Error("other_state should have serialized sketch during merge"); + } +} + +export function finalize(state) { + return serialize(state).bytes +} +"""; diff --git a/theta_sketch_agg_union.sql b/theta_sketch_agg_union.sql new file mode 100644 index 0000000..b50e8d5 --- /dev/null +++ b/theta_sketch_agg_union.sql @@ -0,0 +1,99 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +CREATE OR REPLACE AGGREGATE FUNCTION test.theta_sketch_agg_union(sketch BYTES, lg_k INT64 NOT AGGREGATE) RETURNS BYTES LANGUAGE js +OPTIONS (library=["gs://datasketches/theta_sketch.mjs"]) AS R""" +import ModuleFactory from "gs://datasketches/theta_sketch.mjs"; +var Module = await ModuleFactory(); +const default_lg_k = 12; +const default_seed = BigInt(9001); + +// ensures we have a theta_union +// if there is a compact_theta_sketch, add it to the union and destroy it +function ensureUnion(state) { + if (state.union == null) { + state.union = new Module.theta_union(state.lg_k, state.seed); + } + if (state.serialized != null) { + state.union.updateWithBytes(state.serialized, state.seed); + state.serialized = null; + } +} + +export function initialState(lg_k) { + return { + lg_k: lg_k == null ? Number(default_lg_k) : Number(lg_k), + seed: default_seed, + union: null, + serialized: null + }; +} + +export function aggregate(state, sketch) { + if (sketch != null) { + ensureUnion(state); + try { + state.union.updateWithBytes(sketch, state.seed); + } catch (e) { + throw new Error(Module.getExceptionMessage(e)); + } + } +} + +export function serialize(state) { + ensureUnion(state); + try { + return { + lg_k: state.lg_k, + seed: state.seed, + bytes: state.union.getResultAsUint8ArrayCompressed() + }; + } finally { + state.union.delete(); + state.union = null; + } +} + +export function deserialize(serialized) { + return { + lg_k: serialized.lg_k, + seed: serialized.seed, + union: null, + serialized: serialized.bytes + }; +} + +export function merge(state, other_state) { + ensureUnion(state); + if (other_state.union) { + throw new Error("Did not expect union in other state"); + } + if (other_state.serialized) { + try { + state.union.updateWithBytes(other_state.serialized, state.seed); + other_state.serialized = null; + } catch (e) { + throw new Error(Module.getExceptionMessage(e)); + } + } else { + throw new Error("Expected serialized sketch in other_state"); + } +} +export 
function finalize(state) { + return serialize(state).bytes; +} +"""; diff --git a/theta_sketch_get_estimate.sql b/theta_sketch_get_estimate.sql new file mode 100644 index 0000000..c00608d --- /dev/null +++ b/theta_sketch_get_estimate.sql @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +CREATE OR REPLACE FUNCTION test.theta_sketch_get_estimate(base64 BYTES, seed INT64) RETURNS INT64 LANGUAGE js +OPTIONS (library=["gs://datasketches/theta_sketch.js"]) AS R""" +const default_seed = BigInt(9001); +try { + var sketch = Module.compact_theta_sketch.deserializeFromB64(base64, seed ? BigInt(seed) : default_seed); + try { + return sketch.getEstimate(); + } finally { + sketch.delete(); + } +} catch (e) { + throw new Error(Module.getExceptionMessage(e)); +} +"""; diff --git a/theta_sketch_scalar_intersection.sql b/theta_sketch_scalar_intersection.sql new file mode 100644 index 0000000..7820e3c --- /dev/null +++ b/theta_sketch_scalar_intersection.sql @@ -0,0 +1,29 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +CREATE OR REPLACE FUNCTION test.theta_sketch_scalar_intersection(sketchBytes1 BYTES, sketchBytes2 BYTES, seed INT64) RETURNS BYTES LANGUAGE js +OPTIONS (library=["gs://datasketches/theta_sketch.js"]) AS R""" +const default_seed = BigInt(9001); +var intersection = new Module.theta_intersection(seed ? BigInt(seed) : default_seed); +try { + intersection.updateWithB64(sketchBytes1, seed ? BigInt(seed) : default_seed); + intersection.updateWithB64(sketchBytes2, seed ? BigInt(seed) : default_seed); + return intersection.getResultB64Compressed(); +} finally { + intersection.delete(); +} +"""; diff --git a/theta_sketch_scalar_union.sql b/theta_sketch_scalar_union.sql new file mode 100644 index 0000000..eea4626 --- /dev/null +++ b/theta_sketch_scalar_union.sql @@ -0,0 +1,30 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +CREATE OR REPLACE FUNCTION test.theta_sketch_scalar_union(sketch1 BYTES, sketch2 BYTES, lg_k INT64, seed INT64) RETURNS BYTES LANGUAGE js +OPTIONS (library=["gs://datasketches/theta_sketch.js"]) AS R""" +const default_lg_k = 12; +const default_seed = BigInt(9001); +var union = new Module.theta_union(lg_k ? lg_k : default_lg_k, seed ? BigInt(seed) : default_seed); +try { + union.updateWithB64(sketch1, seed ? BigInt(seed) : default_seed) + union.updateWithB64(sketch2, seed ? BigInt(seed) : default_seed) + return union.getResultB64Compressed(); +} finally { + union.delete(); +} +"""; diff --git a/theta_sketch_to_string.sql b/theta_sketch_to_string.sql new file mode 100644 index 0000000..876b09e --- /dev/null +++ b/theta_sketch_to_string.sql @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +CREATE OR REPLACE FUNCTION test.theta_sketch_to_string(base64 BYTES, seed INT64) RETURNS STRING LANGUAGE js +OPTIONS (library=["gs://datasketches/theta_sketch.js"]) AS R""" +try { + const default_seed = BigInt(9001); + var sketch = Module.compact_theta_sketch.deserializeFromB64(base64, seed ? BigInt(seed) : default_seed); + try { + return sketch.toString(); + } finally { + sketch.delete(); + } +} catch (e) { + throw new Error(Module.getExceptionMessage(e)); +} +"""; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
