pitrou commented on code in PR #45360: URL: https://github.com/apache/arrow/pull/45360#discussion_r1989650770
########## cpp/src/parquet/chunker_internal.cc: ########## @@ -0,0 +1,319 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/chunker_internal.h" + +#include <cmath> +#include <string> +#include <vector> +#include "arrow/array.h" +#include "arrow/util/logging.h" +#include "parquet/chunker_internal_generated.h" +#include "parquet/exception.h" +#include "parquet/level_conversion.h" + +namespace parquet::internal { + +/// Calculate the mask to use for the rolling hash, the mask is used to determine if a +/// new chunk should be created based on the rolling hash value. The mask is calculated +/// based on the min_size, max_size and norm_factor parameters. +/// +/// Assuming that the gear hash hash random values with a uniform distribution, then each +/// bit in the actual value of rolling_hash_ has even probability of being set so a mask +/// with the top N bits set has a probability of 1/2^N of matching the rolling hash. This +/// is the judgment criteria for the original gear hash based content-defined chunking. +/// The main drawback of this approach is the non-uniform distribution of the chunk sizes. +/// +/// Later on the FastCDC has improved the process by introducing: +/// - sub-minimum chunk cut-point skipping (not hashing the first `min_size` bytes) +/// - chunk size normalization (using two masks) +/// +/// This implementation uses cut-point skipping because it improves the overall +/// performance and a more accurate alternative to have less skewed chunk size +/// distribution. Instead of using two different masks (one with a lower and one with a +/// probability of matching and switching them based on the actual chunk size), we rather +/// use 8 different gear hash tables and require having 8 consecutive matches while +/// switching between the used hashtables. This approach is based on central limit theorem +/// and approximates normal distribution of the chunk sizes. Review Comment: Is this the approach from the FastCDC paper or is it specific to this PR? ########## cpp/src/parquet/chunker_internal.cc: ########## @@ -0,0 +1,319 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/chunker_internal.h" + +#include <cmath> +#include <string> +#include <vector> +#include "arrow/array.h" Review Comment: Nit: please leave empty line between include groups ```suggestion #include "arrow/array.h" ``` ########## cpp/src/parquet/chunker_internal.cc: ########## @@ -0,0 +1,319 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/chunker_internal.h" + +#include <cmath> +#include <string> +#include <vector> +#include "arrow/array.h" +#include "arrow/util/logging.h" +#include "parquet/chunker_internal_generated.h" +#include "parquet/exception.h" +#include "parquet/level_conversion.h" + +namespace parquet::internal { + +/// Calculate the mask to use for the rolling hash, the mask is used to determine if a +/// new chunk should be created based on the rolling hash value. The mask is calculated +/// based on the min_size, max_size and norm_factor parameters. +/// +/// Assuming that the gear hash hash random values with a uniform distribution, then each +/// bit in the actual value of rolling_hash_ has even probability of being set so a mask +/// with the top N bits set has a probability of 1/2^N of matching the rolling hash. This +/// is the judgment criteria for the original gear hash based content-defined chunking. +/// The main drawback of this approach is the non-uniform distribution of the chunk sizes. +/// +/// Later on the FastCDC has improved the process by introducing: +/// - sub-minimum chunk cut-point skipping (not hashing the first `min_size` bytes) +/// - chunk size normalization (using two masks) +/// +/// This implementation uses cut-point skipping because it improves the overall +/// performance and a more accurate alternative to have less skewed chunk size +/// distribution. Instead of using two different masks (one with a lower and one with a +/// probability of matching and switching them based on the actual chunk size), we rather Review Comment: There might be a missing word here? "one with a lower and one with a probability of matching" ########## cpp/src/parquet/chunker_internal.cc: ########## @@ -0,0 +1,319 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/chunker_internal.h" + +#include <cmath> +#include <string> +#include <vector> +#include "arrow/array.h" +#include "arrow/util/logging.h" +#include "parquet/chunker_internal_generated.h" +#include "parquet/exception.h" +#include "parquet/level_conversion.h" + +namespace parquet::internal { + +/// Calculate the mask to use for the rolling hash, the mask is used to determine if a +/// new chunk should be created based on the rolling hash value. The mask is calculated +/// based on the min_size, max_size and norm_factor parameters. +/// +/// Assuming that the gear hash hash random values with a uniform distribution, then each +/// bit in the actual value of rolling_hash_ has even probability of being set so a mask +/// with the top N bits set has a probability of 1/2^N of matching the rolling hash. This +/// is the judgment criteria for the original gear hash based content-defined chunking. +/// The main drawback of this approach is the non-uniform distribution of the chunk sizes. +/// +/// Later on the FastCDC has improved the process by introducing: +/// - sub-minimum chunk cut-point skipping (not hashing the first `min_size` bytes) +/// - chunk size normalization (using two masks) +/// +/// This implementation uses cut-point skipping because it improves the overall +/// performance and a more accurate alternative to have less skewed chunk size +/// distribution. Instead of using two different masks (one with a lower and one with a +/// probability of matching and switching them based on the actual chunk size), we rather +/// use 8 different gear hash tables and require having 8 consecutive matches while +/// switching between the used hashtables. This approach is based on central limit theorem +/// and approximates normal distribution of the chunk sizes. +// +// @param min_size The minimum chunk size (default 256KiB) +// @param max_size The maximum chunk size (default 1MiB) +// @param norm_factor Normalization factor (default 0) +// @return The mask used to compare against the rolling hash +static uint64_t GetMask(int64_t min_size, int64_t max_size, uint8_t norm_factor) { + // calculate the average size of the chunks + int64_t avg_size = (min_size + max_size) / 2; + // since we are skipping the first `min_size` bytes for each chunk, we need to + // target a smaller chunk size to reach the average size after skipping the first + // `min_size` bytes + int64_t target_size = avg_size - min_size; + // assuming that the gear hash has a uniform distribution, we can calculate the mask + // by taking the log2 of the target size + size_t mask_bits = static_cast<size_t>(std::floor(std::log2(target_size))); + // -3 because we are using 8 hash tables to have more gaussian-like distribution, + // a user defined `norm_factor` can be used to adjust the mask size, hence the matching + // probability, by increasing the norm_factor we increase the probability of matching + // the mask, forcing the distribution closer to the average size Review Comment: Hmm, if we make the mask smaller (less 1s), then the distribution will be closer to the min_size, no? ########## cpp/src/parquet/chunker_internal.h: ########## @@ -0,0 +1,166 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cmath> +#include <string> +#include <vector> +#include "arrow/array.h" +#include "parquet/level_conversion.h" + +namespace parquet::internal { + +// Represents a chunk of data with level offsets and value offsets due to the +// record shredding for nested data. +struct Chunk { + int64_t level_offset; + int64_t value_offset; + int64_t levels_to_write; + + Chunk(int64_t level_offset, int64_t value_offset, int64_t levels_to_write) + : level_offset(level_offset), + value_offset(value_offset), + levels_to_write(levels_to_write) {} +}; + +/// CDC (Content-Defined Chunking) is a technique that divides data into variable-sized +/// chunks based on the content of the data itself, rather than using fixed-size +/// boundaries. +/// +/// For example, given this sequence of values in a column: +/// +/// File1: [1,2,3, 4,5,6, 7,8,9] +/// chunk1 chunk2 chunk3 +/// +/// Assume there is an inserted value between 3 and 4: +/// +/// File2: [1,2,3,0, 4,5,6, 7,8,9] +/// new-chunk chunk2 chunk3 +/// +/// The chunking process will adjust to maintain stable boundaries across data +/// modifications. Each chunk defines a new parquet data page which are contiguously +/// written out to the file. Since each page compressed independently, the files' contents +/// would look like the following with unique page identifiers: +/// +/// File1: [Page1][Page2][Page3]... +/// File2: [Page4][Page2][Page3]... +/// +/// Then the parquet file is being uploaded to a content addressable storage system (CAS) +/// which splits the bytes stream into content defined blobs. The CAS system will +/// calculate a unique identifier for each blob, then store the blob in a key-value store. +/// If the same blob is encountered again, the system can refer to the hash instead of +/// physically storing the blob again. In the example above, the CAS system would store +/// Page1, Page2, Page3, and Page4 only once and the required metadata to reassemble the +/// files. +/// While the deduplication is performed by the CAS system, the parquet chunker makes it +/// possible to efficiently deduplicate the data by consistently dividing the data into +/// chunks. +/// +/// Implementation details: +/// +/// Only the parquet writer must be aware of the content defined chunking, the reader +/// doesn't need to know about it. Each parquet column writer holds a +/// ContentDefinedChunker instance depending on the writer's properties. The chunker's +/// state is maintained across the entire column without being reset between pages and row +/// groups. +/// +/// The chunker receives the record shredded column data (def_levels, rep_levels, values) +/// and goes over the (def_level, rep_level, value) triplets one by one while adjusting +/// the column-global rolling hash based on the triplet. Whenever the rolling hash matches +/// a predefined mask, the chunker creates a new chunk. The chunker returns a vector of +/// Chunk objects that represent the boundaries of the chunks. +/// Note that the boundaries are deterministically calculated exclusively based on the +/// data itself, so the same data will always produce the same chunks - given the same +/// chunker configuration. +/// +/// References: +/// - FastCDC: a Fast and Efficient Content-Defined Chunking Approach for Data +/// Deduplication +/// https://www.usenix.org/system/files/conference/atc16/atc16-paper-xia.pdf +/// - Git is for Data (chunk size normalization used here is described in section 6.2.1): +/// https://www.cidrdb.org/cidr2023/papers/p43-low.pdf +class ContentDefinedChunker { + public: + /// Create a new ContentDefinedChunker instance + /// + /// @param level_info Information about definition and repetition levels + /// @param size_range Min/max chunk size as pair<min_size, max_size>, the chunker will + /// attempt to uniformly distribute the chunks between these extremes. + /// @param norm_factor Normalization factor to center the chunk size around the average + /// size more aggressively. By increasing the normalization factor, + /// probability of finding a chunk boundary increases. + ContentDefinedChunker(const LevelInfo& level_info, + std::pair<uint64_t, uint64_t> size_range, + uint8_t norm_factor = 0); + + /// Get the chunk boundaries for the given column data + /// + /// @param def_levels Definition levels + /// @param rep_levels Repetition levels + /// @param num_levels Number of levels + /// @param values Column values as an Arrow array + /// @return Vector of Chunk objects representing the chunk boundaries + const std::vector<Chunk> GetBoundaries(const int16_t* def_levels, + const int16_t* rep_levels, int64_t num_levels, + const ::arrow::Array& values); + + private: + // Update the rolling hash with a compile-time known sized value, set has_matched_ to + // true if the hash matches the mask. + template <typename T> + void Roll(const T value); + Review Comment: Yes, it makes the header file cleaner and it lets us include less also. ########## cpp/src/parquet/chunker_internal.cc: ########## @@ -0,0 +1,319 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/chunker_internal.h" + +#include <cmath> +#include <string> +#include <vector> +#include "arrow/array.h" +#include "arrow/util/logging.h" +#include "parquet/chunker_internal_generated.h" +#include "parquet/exception.h" +#include "parquet/level_conversion.h" + +namespace parquet::internal { + +/// Calculate the mask to use for the rolling hash, the mask is used to determine if a +/// new chunk should be created based on the rolling hash value. The mask is calculated +/// based on the min_size, max_size and norm_factor parameters. +/// +/// Assuming that the gear hash hash random values with a uniform distribution, then each +/// bit in the actual value of rolling_hash_ has even probability of being set so a mask +/// with the top N bits set has a probability of 1/2^N of matching the rolling hash. This +/// is the judgment criteria for the original gear hash based content-defined chunking. +/// The main drawback of this approach is the non-uniform distribution of the chunk sizes. +/// +/// Later on the FastCDC has improved the process by introducing: +/// - sub-minimum chunk cut-point skipping (not hashing the first `min_size` bytes) +/// - chunk size normalization (using two masks) +/// +/// This implementation uses cut-point skipping because it improves the overall +/// performance and a more accurate alternative to have less skewed chunk size +/// distribution. Instead of using two different masks (one with a lower and one with a +/// probability of matching and switching them based on the actual chunk size), we rather +/// use 8 different gear hash tables and require having 8 consecutive matches while +/// switching between the used hashtables. This approach is based on central limit theorem +/// and approximates normal distribution of the chunk sizes. +// +// @param min_size The minimum chunk size (default 256KiB) +// @param max_size The maximum chunk size (default 1MiB) +// @param norm_factor Normalization factor (default 0) +// @return The mask used to compare against the rolling hash +static uint64_t GetMask(int64_t min_size, int64_t max_size, uint8_t norm_factor) { + // calculate the average size of the chunks + int64_t avg_size = (min_size + max_size) / 2; + // since we are skipping the first `min_size` bytes for each chunk, we need to + // target a smaller chunk size to reach the average size after skipping the first + // `min_size` bytes + int64_t target_size = avg_size - min_size; + // assuming that the gear hash has a uniform distribution, we can calculate the mask + // by taking the log2 of the target size + size_t mask_bits = static_cast<size_t>(std::floor(std::log2(target_size))); + // -3 because we are using 8 hash tables to have more gaussian-like distribution, + // a user defined `norm_factor` can be used to adjust the mask size, hence the matching + // probability, by increasing the norm_factor we increase the probability of matching + // the mask, forcing the distribution closer to the average size + size_t effective_bits = mask_bits - 3 - norm_factor; + return std::numeric_limits<uint64_t>::max() << (64 - effective_bits); +} + +ContentDefinedChunker::ContentDefinedChunker(const LevelInfo& level_info, + int64_t min_size, int64_t max_size, + int8_t norm_factor) + : level_info_(level_info), + min_size_(min_size), + max_size_(max_size), + hash_mask_(GetMask(min_size, max_size, norm_factor)) { + if (min_size_ < 0) { + throw ParquetException("min_size must be non-negative"); + } + if (max_size_ < 0) { + throw ParquetException("max_size must be non-negative"); + } + if (min_size_ > max_size_) { + throw ParquetException("min_size must be less than or equal to max_size"); + } +} + +void ContentDefinedChunker::Roll(const bool value) { + if (chunk_size_++ < min_size_) { + // short-circuit if we haven't reached the minimum chunk size, this speeds up the + // chunking process since the gearhash doesn't need to be updated + return; + } + rolling_hash_ = (rolling_hash_ << 1) + kGearhashTable[nth_run_][value]; + has_matched_ = has_matched_ || ((rolling_hash_ & hash_mask_) == 0); +} + +template <int ByteWidth> +void ContentDefinedChunker::Roll(const uint8_t* value) { + chunk_size_ += ByteWidth; + if (chunk_size_ < min_size_) { + // short-circuit if we haven't reached the minimum chunk size, this speeds up the + // chunking process since the gearhash doesn't need to be updated + return; + } + for (size_t i = 0; i < ByteWidth; ++i) { + rolling_hash_ = (rolling_hash_ << 1) + kGearhashTable[nth_run_][value[i]]; + has_matched_ = has_matched_ || ((rolling_hash_ & hash_mask_) == 0); + } +} + +template <typename T> +void ContentDefinedChunker::Roll(const T* value) { + return Roll<sizeof(T)>(reinterpret_cast<const uint8_t*>(value)); +} + +void ContentDefinedChunker::Roll(const uint8_t* value, int64_t length) { + chunk_size_ += length; + if (chunk_size_ < min_size_) { + // short-circuit if we haven't reached the minimum chunk size, this speeds up the + // chunking process since the gearhash doesn't need to be updated + return; + } + for (auto i = 0; i < length; ++i) { + rolling_hash_ = (rolling_hash_ << 1) + kGearhashTable[nth_run_][value[i]]; + has_matched_ = has_matched_ || ((rolling_hash_ & hash_mask_) == 0); + } +} + +bool ContentDefinedChunker::NeedNewChunk() { + // decide whether to create a new chunk based on the rolling hash; has_matched_ is + // set to true if we encountered a match since the last NeedNewChunk() call + if (ARROW_PREDICT_FALSE(has_matched_)) { + has_matched_ = false; + // in order to have a normal distribution of chunk sizes, we only create a new chunk + // if the adjused mask matches the rolling hash 8 times in a row, each run uses a + // different gearhash table (gearhash's chunk size has exponential distribution, and + // we use central limit theorem to approximate normal distribution) + if (ARROW_PREDICT_FALSE(++nth_run_ >= 7)) { + nth_run_ = 0; + chunk_size_ = 0; + return true; + } + } + if (ARROW_PREDICT_FALSE(chunk_size_ >= max_size_)) { + // we have a hard limit on the maximum chunk size, note that we don't reset the + // rolling hash state here, so the next NeedNewChunk() call will continue from the + // current state + chunk_size_ = 0; + return true; + } + return false; +} + +template <typename RollFunc> +const std::vector<Chunk> ContentDefinedChunker::Calculate(const int16_t* def_levels, Review Comment: `const` on the return value is pointless. ########## cpp/src/parquet/chunker_internal.cc: ########## @@ -0,0 +1,319 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/chunker_internal.h" + +#include <cmath> +#include <string> +#include <vector> +#include "arrow/array.h" +#include "arrow/util/logging.h" +#include "parquet/chunker_internal_generated.h" +#include "parquet/exception.h" +#include "parquet/level_conversion.h" + +namespace parquet::internal { + +/// Calculate the mask to use for the rolling hash, the mask is used to determine if a +/// new chunk should be created based on the rolling hash value. The mask is calculated +/// based on the min_size, max_size and norm_factor parameters. +/// +/// Assuming that the gear hash hash random values with a uniform distribution, then each +/// bit in the actual value of rolling_hash_ has even probability of being set so a mask +/// with the top N bits set has a probability of 1/2^N of matching the rolling hash. This +/// is the judgment criteria for the original gear hash based content-defined chunking. +/// The main drawback of this approach is the non-uniform distribution of the chunk sizes. +/// +/// Later on the FastCDC has improved the process by introducing: +/// - sub-minimum chunk cut-point skipping (not hashing the first `min_size` bytes) +/// - chunk size normalization (using two masks) +/// +/// This implementation uses cut-point skipping because it improves the overall +/// performance and a more accurate alternative to have less skewed chunk size +/// distribution. Instead of using two different masks (one with a lower and one with a +/// probability of matching and switching them based on the actual chunk size), we rather +/// use 8 different gear hash tables and require having 8 consecutive matches while +/// switching between the used hashtables. This approach is based on central limit theorem +/// and approximates normal distribution of the chunk sizes. +// +// @param min_size The minimum chunk size (default 256KiB) +// @param max_size The maximum chunk size (default 1MiB) +// @param norm_factor Normalization factor (default 0) +// @return The mask used to compare against the rolling hash +static uint64_t GetMask(int64_t min_size, int64_t max_size, uint8_t norm_factor) { + // calculate the average size of the chunks + int64_t avg_size = (min_size + max_size) / 2; + // since we are skipping the first `min_size` bytes for each chunk, we need to + // target a smaller chunk size to reach the average size after skipping the first + // `min_size` bytes + int64_t target_size = avg_size - min_size; + // assuming that the gear hash has a uniform distribution, we can calculate the mask + // by taking the log2 of the target size + size_t mask_bits = static_cast<size_t>(std::floor(std::log2(target_size))); + // -3 because we are using 8 hash tables to have more gaussian-like distribution, + // a user defined `norm_factor` can be used to adjust the mask size, hence the matching + // probability, by increasing the norm_factor we increase the probability of matching + // the mask, forcing the distribution closer to the average size + size_t effective_bits = mask_bits - 3 - norm_factor; + return std::numeric_limits<uint64_t>::max() << (64 - effective_bits); +} + +ContentDefinedChunker::ContentDefinedChunker(const LevelInfo& level_info, + int64_t min_size, int64_t max_size, + int8_t norm_factor) + : level_info_(level_info), + min_size_(min_size), + max_size_(max_size), + hash_mask_(GetMask(min_size, max_size, norm_factor)) { + if (min_size_ < 0) { + throw ParquetException("min_size must be non-negative"); + } + if (max_size_ < 0) { + throw ParquetException("max_size must be non-negative"); + } Review Comment: We should also mandate that they are non-zero, IMHO. ########## cpp/src/parquet/chunker_internal.cc: ########## @@ -0,0 +1,319 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/chunker_internal.h" + +#include <cmath> +#include <string> +#include <vector> +#include "arrow/array.h" +#include "arrow/util/logging.h" +#include "parquet/chunker_internal_generated.h" +#include "parquet/exception.h" +#include "parquet/level_conversion.h" + +namespace parquet::internal { + +/// Calculate the mask to use for the rolling hash, the mask is used to determine if a +/// new chunk should be created based on the rolling hash value. The mask is calculated +/// based on the min_size, max_size and norm_factor parameters. +/// +/// Assuming that the gear hash hash random values with a uniform distribution, then each +/// bit in the actual value of rolling_hash_ has even probability of being set so a mask +/// with the top N bits set has a probability of 1/2^N of matching the rolling hash. This +/// is the judgment criteria for the original gear hash based content-defined chunking. +/// The main drawback of this approach is the non-uniform distribution of the chunk sizes. +/// +/// Later on the FastCDC has improved the process by introducing: +/// - sub-minimum chunk cut-point skipping (not hashing the first `min_size` bytes) +/// - chunk size normalization (using two masks) +/// +/// This implementation uses cut-point skipping because it improves the overall +/// performance and a more accurate alternative to have less skewed chunk size +/// distribution. Instead of using two different masks (one with a lower and one with a +/// probability of matching and switching them based on the actual chunk size), we rather +/// use 8 different gear hash tables and require having 8 consecutive matches while +/// switching between the used hashtables. This approach is based on central limit theorem +/// and approximates normal distribution of the chunk sizes. +// +// @param min_size The minimum chunk size (default 256KiB) +// @param max_size The maximum chunk size (default 1MiB) +// @param norm_factor Normalization factor (default 0) +// @return The mask used to compare against the rolling hash +static uint64_t GetMask(int64_t min_size, int64_t max_size, uint8_t norm_factor) { + // calculate the average size of the chunks + int64_t avg_size = (min_size + max_size) / 2; + // since we are skipping the first `min_size` bytes for each chunk, we need to + // target a smaller chunk size to reach the average size after skipping the first + // `min_size` bytes + int64_t target_size = avg_size - min_size; + // assuming that the gear hash has a uniform distribution, we can calculate the mask + // by taking the log2 of the target size + size_t mask_bits = static_cast<size_t>(std::floor(std::log2(target_size))); Review Comment: Can use `arrow::bit_util::Log2` instead which will avoid a costly logarithm. ########## cpp/src/parquet/chunker_internal.cc: ########## @@ -0,0 +1,319 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/chunker_internal.h" + +#include <cmath> +#include <string> +#include <vector> +#include "arrow/array.h" +#include "arrow/util/logging.h" +#include "parquet/chunker_internal_generated.h" +#include "parquet/exception.h" +#include "parquet/level_conversion.h" + +namespace parquet::internal { + +/// Calculate the mask to use for the rolling hash, the mask is used to determine if a +/// new chunk should be created based on the rolling hash value. The mask is calculated +/// based on the min_size, max_size and norm_factor parameters. +/// +/// Assuming that the gear hash hash random values with a uniform distribution, then each +/// bit in the actual value of rolling_hash_ has even probability of being set so a mask +/// with the top N bits set has a probability of 1/2^N of matching the rolling hash. This +/// is the judgment criteria for the original gear hash based content-defined chunking. +/// The main drawback of this approach is the non-uniform distribution of the chunk sizes. +/// +/// Later on the FastCDC has improved the process by introducing: +/// - sub-minimum chunk cut-point skipping (not hashing the first `min_size` bytes) +/// - chunk size normalization (using two masks) +/// +/// This implementation uses cut-point skipping because it improves the overall +/// performance and a more accurate alternative to have less skewed chunk size +/// distribution. Instead of using two different masks (one with a lower and one with a +/// probability of matching and switching them based on the actual chunk size), we rather +/// use 8 different gear hash tables and require having 8 consecutive matches while +/// switching between the used hashtables. This approach is based on central limit theorem +/// and approximates normal distribution of the chunk sizes. +// +// @param min_size The minimum chunk size (default 256KiB) +// @param max_size The maximum chunk size (default 1MiB) +// @param norm_factor Normalization factor (default 0) +// @return The mask used to compare against the rolling hash +static uint64_t GetMask(int64_t min_size, int64_t max_size, uint8_t norm_factor) { + // calculate the average size of the chunks + int64_t avg_size = (min_size + max_size) / 2; + // since we are skipping the first `min_size` bytes for each chunk, we need to + // target a smaller chunk size to reach the average size after skipping the first + // `min_size` bytes + int64_t target_size = avg_size - min_size; + // assuming that the gear hash has a uniform distribution, we can calculate the mask + // by taking the log2 of the target size + size_t mask_bits = static_cast<size_t>(std::floor(std::log2(target_size))); + // -3 because we are using 8 hash tables to have more gaussian-like distribution, + // a user defined `norm_factor` can be used to adjust the mask size, hence the matching + // probability, by increasing the norm_factor we increase the probability of matching + // the mask, forcing the distribution closer to the average size + size_t effective_bits = mask_bits - 3 - norm_factor; + return std::numeric_limits<uint64_t>::max() << (64 - effective_bits); +} + +ContentDefinedChunker::ContentDefinedChunker(const LevelInfo& level_info, + int64_t min_size, int64_t max_size, + int8_t norm_factor) + : level_info_(level_info), + min_size_(min_size), + max_size_(max_size), + hash_mask_(GetMask(min_size, max_size, norm_factor)) { + if (min_size_ < 0) { + throw ParquetException("min_size must be non-negative"); + } + if (max_size_ < 0) { + throw ParquetException("max_size must be non-negative"); + } + if (min_size_ > max_size_) { + throw ParquetException("min_size must be less than or equal to max_size"); + } +} + +void ContentDefinedChunker::Roll(const bool value) { + if (chunk_size_++ < min_size_) { + // short-circuit if we haven't reached the minimum chunk size, this speeds up the + // chunking process since the gearhash doesn't need to be updated + return; + } + rolling_hash_ = (rolling_hash_ << 1) + kGearhashTable[nth_run_][value]; + has_matched_ = has_matched_ || ((rolling_hash_ & hash_mask_) == 0); +} + +template <int ByteWidth> +void ContentDefinedChunker::Roll(const uint8_t* value) { + chunk_size_ += ByteWidth; + if (chunk_size_ < min_size_) { + // short-circuit if we haven't reached the minimum chunk size, this speeds up the + // chunking process since the gearhash doesn't need to be updated + return; + } + for (size_t i = 0; i < ByteWidth; ++i) { + rolling_hash_ = (rolling_hash_ << 1) + kGearhashTable[nth_run_][value[i]]; + has_matched_ = has_matched_ || ((rolling_hash_ & hash_mask_) == 0); + } +} + +template <typename T> +void ContentDefinedChunker::Roll(const T* value) { + return Roll<sizeof(T)>(reinterpret_cast<const uint8_t*>(value)); +} + +void ContentDefinedChunker::Roll(const uint8_t* value, int64_t length) { + chunk_size_ += length; + if (chunk_size_ < min_size_) { + // short-circuit if we haven't reached the minimum chunk size, this speeds up the + // chunking process since the gearhash doesn't need to be updated + return; + } + for (auto i = 0; i < length; ++i) { + rolling_hash_ = (rolling_hash_ << 1) + kGearhashTable[nth_run_][value[i]]; + has_matched_ = has_matched_ || ((rolling_hash_ & hash_mask_) == 0); + } +} + +bool ContentDefinedChunker::NeedNewChunk() { + // decide whether to create a new chunk based on the rolling hash; has_matched_ is + // set to true if we encountered a match since the last NeedNewChunk() call + if (ARROW_PREDICT_FALSE(has_matched_)) { + has_matched_ = false; + // in order to have a normal distribution of chunk sizes, we only create a new chunk + // if the adjused mask matches the rolling hash 8 times in a row, each run uses a + // different gearhash table (gearhash's chunk size has exponential distribution, and + // we use central limit theorem to approximate normal distribution) + if (ARROW_PREDICT_FALSE(++nth_run_ >= 7)) { + nth_run_ = 0; + chunk_size_ = 0; + return true; + } + } + if (ARROW_PREDICT_FALSE(chunk_size_ >= max_size_)) { + // we have a hard limit on the maximum chunk size, note that we don't reset the + // rolling hash state here, so the next NeedNewChunk() call will continue from the + // current state + chunk_size_ = 0; + return true; + } + return false; +} + +template <typename RollFunc> +const std::vector<Chunk> ContentDefinedChunker::Calculate(const int16_t* def_levels, + const int16_t* rep_levels, + int64_t num_levels, + const RollFunc& RollValue) { + std::vector<Chunk> chunks; + int64_t offset; + int64_t prev_offset = 0; + int64_t prev_value_offset = 0; + bool has_def_levels = level_info_.def_level > 0; + bool has_rep_levels = level_info_.rep_level > 0; + + if (!has_rep_levels && !has_def_levels) { + // fastest path for non-nested non-null data + for (offset = 0; offset < num_levels; ++offset) { + RollValue(offset); + if (NeedNewChunk()) { + chunks.emplace_back(prev_offset, prev_offset, offset - prev_offset); + prev_offset = offset; + } + } + // set the previous value offset to add the last chunk + prev_value_offset = prev_offset; + } else if (!has_rep_levels) { + // non-nested data with nulls + int16_t def_level; + for (int64_t offset = 0; offset < num_levels; ++offset) { + def_level = def_levels[offset]; + + Roll(&def_level); + if (def_level == level_info_.def_level) { + RollValue(offset); + } + if (NeedNewChunk()) { + chunks.emplace_back(prev_offset, prev_offset, offset - prev_offset); + prev_offset = offset; + } + } + // set the previous value offset to add the last chunk + prev_value_offset = prev_offset; + } else { + // nested data with nulls + int16_t def_level; + int16_t rep_level; + int64_t value_offset = 0; + + for (offset = 0; offset < num_levels; ++offset) { + def_level = def_levels[offset]; + rep_level = rep_levels[offset]; + + Roll(&def_level); + Roll(&rep_level); + if (def_level == level_info_.def_level) { + RollValue(value_offset); + } + + if ((rep_level == 0) && NeedNewChunk()) { + // if we are at a record boundary and need a new chunk, we create a new chunk + auto levels_to_write = offset - prev_offset; + if (levels_to_write > 0) { + chunks.emplace_back(prev_offset, prev_value_offset, levels_to_write); + prev_offset = offset; + prev_value_offset = value_offset; + } + } + if (def_level >= level_info_.repeated_ancestor_def_level) { + // we only increment the value offset if we have a leaf value + ++value_offset; + } + } + } + + // add the last chunk if we have any levels left + if (prev_offset < num_levels) { + chunks.emplace_back(prev_offset, prev_value_offset, num_levels - prev_offset); + } + return chunks; +} + +#define FIXED_WIDTH_CASE(ByteWidth) \ + { \ + const auto raw_values = values.data()->GetValues<uint8_t>(1); \ + return Calculate(def_levels, rep_levels, num_levels, [&](int64_t i) { \ + return Roll<ByteWidth>(raw_values + i * ByteWidth); \ + }); \ + } + +#define BINARY_LIKE_CASE(ArrayType) \ + { \ + const auto& array = static_cast<const ArrayType&>(values); \ + const uint8_t* value; \ + ArrayType::offset_type length; \ + return Calculate(def_levels, rep_levels, num_levels, [&](int64_t i) { \ + value = array.GetValue(i, &length); \ + Roll(value, length); \ + }); \ + } + +const std::vector<Chunk> ContentDefinedChunker::GetBoundaries( + const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels, + const ::arrow::Array& values) { + auto type_id = values.type()->id(); + switch (type_id) { + case ::arrow::Type::NA: { + return Calculate(def_levels, rep_levels, num_levels, [](int64_t) {}); + } + case ::arrow::Type::BOOL: { + const auto& bool_array = static_cast<const ::arrow::BooleanArray&>(values); + return Calculate(def_levels, rep_levels, num_levels, + [&](int64_t i) { return Roll(bool_array.Value(i)); }); + } + case ::arrow::Type::INT8: + case ::arrow::Type::UINT8: + FIXED_WIDTH_CASE(1) + case ::arrow::Type::INT16: + case ::arrow::Type::UINT16: + case ::arrow::Type::HALF_FLOAT: + FIXED_WIDTH_CASE(2) + case ::arrow::Type::INT32: + case ::arrow::Type::UINT32: + case ::arrow::Type::FLOAT: + case ::arrow::Type::DATE32: + case ::arrow::Type::TIME32: + FIXED_WIDTH_CASE(4) + case ::arrow::Type::INT64: + case ::arrow::Type::UINT64: + case ::arrow::Type::DOUBLE: + case ::arrow::Type::DATE64: + case ::arrow::Type::TIME64: + case ::arrow::Type::TIMESTAMP: + case ::arrow::Type::DURATION: + FIXED_WIDTH_CASE(8) + case ::arrow::Type::DECIMAL128: Review Comment: Should also decimal32 and decimal64. ########## cpp/src/parquet/chunker_internal.cc: ########## @@ -0,0 +1,319 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/chunker_internal.h" + +#include <cmath> +#include <string> +#include <vector> +#include "arrow/array.h" +#include "arrow/util/logging.h" +#include "parquet/chunker_internal_generated.h" +#include "parquet/exception.h" +#include "parquet/level_conversion.h" + +namespace parquet::internal { + +/// Calculate the mask to use for the rolling hash, the mask is used to determine if a +/// new chunk should be created based on the rolling hash value. The mask is calculated +/// based on the min_size, max_size and norm_factor parameters. +/// +/// Assuming that the gear hash hash random values with a uniform distribution, then each +/// bit in the actual value of rolling_hash_ has even probability of being set so a mask +/// with the top N bits set has a probability of 1/2^N of matching the rolling hash. This +/// is the judgment criteria for the original gear hash based content-defined chunking. +/// The main drawback of this approach is the non-uniform distribution of the chunk sizes. +/// +/// Later on the FastCDC has improved the process by introducing: +/// - sub-minimum chunk cut-point skipping (not hashing the first `min_size` bytes) +/// - chunk size normalization (using two masks) +/// +/// This implementation uses cut-point skipping because it improves the overall +/// performance and a more accurate alternative to have less skewed chunk size +/// distribution. Instead of using two different masks (one with a lower and one with a +/// probability of matching and switching them based on the actual chunk size), we rather +/// use 8 different gear hash tables and require having 8 consecutive matches while +/// switching between the used hashtables. This approach is based on central limit theorem +/// and approximates normal distribution of the chunk sizes. +// +// @param min_size The minimum chunk size (default 256KiB) +// @param max_size The maximum chunk size (default 1MiB) +// @param norm_factor Normalization factor (default 0) +// @return The mask used to compare against the rolling hash +static uint64_t GetMask(int64_t min_size, int64_t max_size, uint8_t norm_factor) { + // calculate the average size of the chunks + int64_t avg_size = (min_size + max_size) / 2; + // since we are skipping the first `min_size` bytes for each chunk, we need to + // target a smaller chunk size to reach the average size after skipping the first + // `min_size` bytes + int64_t target_size = avg_size - min_size; + // assuming that the gear hash has a uniform distribution, we can calculate the mask + // by taking the log2 of the target size + size_t mask_bits = static_cast<size_t>(std::floor(std::log2(target_size))); + // -3 because we are using 8 hash tables to have more gaussian-like distribution, + // a user defined `norm_factor` can be used to adjust the mask size, hence the matching + // probability, by increasing the norm_factor we increase the probability of matching + // the mask, forcing the distribution closer to the average size + size_t effective_bits = mask_bits - 3 - norm_factor; + return std::numeric_limits<uint64_t>::max() << (64 - effective_bits); +} + +ContentDefinedChunker::ContentDefinedChunker(const LevelInfo& level_info, + int64_t min_size, int64_t max_size, + int8_t norm_factor) + : level_info_(level_info), + min_size_(min_size), + max_size_(max_size), + hash_mask_(GetMask(min_size, max_size, norm_factor)) { + if (min_size_ < 0) { + throw ParquetException("min_size must be non-negative"); + } + if (max_size_ < 0) { + throw ParquetException("max_size must be non-negative"); + } + if (min_size_ > max_size_) { + throw ParquetException("min_size must be less than or equal to max_size"); + } +} + +void ContentDefinedChunker::Roll(const bool value) { + if (chunk_size_++ < min_size_) { + // short-circuit if we haven't reached the minimum chunk size, this speeds up the + // chunking process since the gearhash doesn't need to be updated + return; + } + rolling_hash_ = (rolling_hash_ << 1) + kGearhashTable[nth_run_][value]; + has_matched_ = has_matched_ || ((rolling_hash_ & hash_mask_) == 0); +} + +template <int ByteWidth> +void ContentDefinedChunker::Roll(const uint8_t* value) { + chunk_size_ += ByteWidth; + if (chunk_size_ < min_size_) { + // short-circuit if we haven't reached the minimum chunk size, this speeds up the + // chunking process since the gearhash doesn't need to be updated + return; + } + for (size_t i = 0; i < ByteWidth; ++i) { + rolling_hash_ = (rolling_hash_ << 1) + kGearhashTable[nth_run_][value[i]]; + has_matched_ = has_matched_ || ((rolling_hash_ & hash_mask_) == 0); + } +} + +template <typename T> +void ContentDefinedChunker::Roll(const T* value) { + return Roll<sizeof(T)>(reinterpret_cast<const uint8_t*>(value)); +} + +void ContentDefinedChunker::Roll(const uint8_t* value, int64_t length) { + chunk_size_ += length; + if (chunk_size_ < min_size_) { + // short-circuit if we haven't reached the minimum chunk size, this speeds up the + // chunking process since the gearhash doesn't need to be updated + return; + } + for (auto i = 0; i < length; ++i) { + rolling_hash_ = (rolling_hash_ << 1) + kGearhashTable[nth_run_][value[i]]; + has_matched_ = has_matched_ || ((rolling_hash_ & hash_mask_) == 0); + } +} + +bool ContentDefinedChunker::NeedNewChunk() { + // decide whether to create a new chunk based on the rolling hash; has_matched_ is + // set to true if we encountered a match since the last NeedNewChunk() call + if (ARROW_PREDICT_FALSE(has_matched_)) { + has_matched_ = false; + // in order to have a normal distribution of chunk sizes, we only create a new chunk + // if the adjused mask matches the rolling hash 8 times in a row, each run uses a + // different gearhash table (gearhash's chunk size has exponential distribution, and + // we use central limit theorem to approximate normal distribution) + if (ARROW_PREDICT_FALSE(++nth_run_ >= 7)) { + nth_run_ = 0; + chunk_size_ = 0; + return true; + } + } + if (ARROW_PREDICT_FALSE(chunk_size_ >= max_size_)) { + // we have a hard limit on the maximum chunk size, note that we don't reset the + // rolling hash state here, so the next NeedNewChunk() call will continue from the + // current state + chunk_size_ = 0; + return true; + } + return false; +} + +template <typename RollFunc> +const std::vector<Chunk> ContentDefinedChunker::Calculate(const int16_t* def_levels, + const int16_t* rep_levels, + int64_t num_levels, + const RollFunc& RollValue) { + std::vector<Chunk> chunks; + int64_t offset; + int64_t prev_offset = 0; + int64_t prev_value_offset = 0; + bool has_def_levels = level_info_.def_level > 0; + bool has_rep_levels = level_info_.rep_level > 0; + + if (!has_rep_levels && !has_def_levels) { + // fastest path for non-nested non-null data + for (offset = 0; offset < num_levels; ++offset) { + RollValue(offset); + if (NeedNewChunk()) { + chunks.emplace_back(prev_offset, prev_offset, offset - prev_offset); + prev_offset = offset; + } + } + // set the previous value offset to add the last chunk + prev_value_offset = prev_offset; + } else if (!has_rep_levels) { + // non-nested data with nulls + int16_t def_level; + for (int64_t offset = 0; offset < num_levels; ++offset) { + def_level = def_levels[offset]; + + Roll(&def_level); + if (def_level == level_info_.def_level) { + RollValue(offset); + } + if (NeedNewChunk()) { + chunks.emplace_back(prev_offset, prev_offset, offset - prev_offset); + prev_offset = offset; + } + } + // set the previous value offset to add the last chunk + prev_value_offset = prev_offset; + } else { + // nested data with nulls + int16_t def_level; + int16_t rep_level; + int64_t value_offset = 0; + + for (offset = 0; offset < num_levels; ++offset) { + def_level = def_levels[offset]; + rep_level = rep_levels[offset]; + + Roll(&def_level); + Roll(&rep_level); + if (def_level == level_info_.def_level) { + RollValue(value_offset); + } + + if ((rep_level == 0) && NeedNewChunk()) { + // if we are at a record boundary and need a new chunk, we create a new chunk + auto levels_to_write = offset - prev_offset; + if (levels_to_write > 0) { + chunks.emplace_back(prev_offset, prev_value_offset, levels_to_write); + prev_offset = offset; + prev_value_offset = value_offset; + } + } + if (def_level >= level_info_.repeated_ancestor_def_level) { + // we only increment the value offset if we have a leaf value + ++value_offset; + } + } + } + + // add the last chunk if we have any levels left + if (prev_offset < num_levels) { + chunks.emplace_back(prev_offset, prev_value_offset, num_levels - prev_offset); + } + return chunks; +} + +#define FIXED_WIDTH_CASE(ByteWidth) \ + { \ + const auto raw_values = values.data()->GetValues<uint8_t>(1); \ + return Calculate(def_levels, rep_levels, num_levels, [&](int64_t i) { \ + return Roll<ByteWidth>(raw_values + i * ByteWidth); \ + }); \ + } + +#define BINARY_LIKE_CASE(ArrayType) \ + { \ + const auto& array = static_cast<const ArrayType&>(values); \ + const uint8_t* value; \ + ArrayType::offset_type length; \ + return Calculate(def_levels, rep_levels, num_levels, [&](int64_t i) { \ + value = array.GetValue(i, &length); \ + Roll(value, length); \ + }); \ + } + +const std::vector<Chunk> ContentDefinedChunker::GetBoundaries( + const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels, + const ::arrow::Array& values) { + auto type_id = values.type()->id(); + switch (type_id) { Review Comment: I think it would be easier, nicer and more future-proof to use `VisitType` with a generic lambda and some Calculate helpers. Something like this: ```c++ std::vector<Chunk> CalculateBoolean( const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels, const Array& values) { const auto& bool_array = checked_cast<const ::arrow::BooleanArray&>(values); return Calculate(def_levels, rep_levels, num_levels, [&](int64_t i) { return Roll(bool_array.Value(i)); }); } template <int kByteWidth> std::vector<Chunk> CalculateFixedWidth( const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels, const Array& values) { const uint8_t* raw_values = values.data()->GetValues(1, 0) + values.offset() * kByteWidth; return Calculate(def_levels, rep_levels, num_levels, [&](int64_t i) { return Roll<kByteWidth>(&raw_values[i * kByteWidth]); }); } template <typename ArrayType> std::vector<Chunk> CalculateBinaryLike( const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels, const Array& values) { const auto& binary_array = checked_cast<const ArrayType&>(values); const uint8_t* raw_values = values.data()->GetValues(1, 0) + values.offset() * kByteWidth; return Calculate(def_levels, rep_levels, num_levels, [&](int64_t i) { return Roll(binary_array.GetView(i)); }); } std::vector<Chunk> ContentDefinedChunker::GetBoundaries( const int16_t* def_levels, const int16_t* rep_levels, int64_t num_levels, const ::arrow::Array& values) { auto handle_type = [&](auto&& type) { using Type = std::decay_t<decltype(type)>; if constexpr (::arrow::is_boolean(Type::type_id)) { return CalculateBoolean(def_levels, rep_levels, num_levels, values); } else if constexpr (::arrow::is_primitive(Type::type_id)) { using c_type = typename ::arrow::type_traits<Type>::CType; return CalculateFixedWidth<sizeof(c_type)>(def_levels, rep_levels, num_levels, values); } else if constexpr (::arrow::is_binary_like(Type::type_id)) { return CalculateBinaryLike<::arrow::BinaryArray>(def_levels, rep_levels, num_levels, values); } else if constexpr (::arrow::is_large_binary_like(Type::type_id)) { return CalculateBinaryLike<::arrow::LargeBinaryArray>(def_levels, rep_levels, num_levels, values); } /* else etc. */ }; return VisitType(*values.type(), handle_type); } ``` ########## cpp/src/parquet/chunker_internal_codegen.py: ########## @@ -0,0 +1,116 @@ +#!/usr/bin/env python + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Produce the given number gearhash tables for rolling hash calculations. + +Each table consists of 256 64-bit integer values and by default 8 tables are +produced. The tables are written to a header file that can be included in the +C++ code. + +The generated numbers are deterministic "random" numbers created by MD5 hashing +a fixed seed and the table index. This ensures that the tables are the same +across different runs and platforms. The function of generating the numbers is +less important as long as they have sufficiently uniform distribution. + +Reference implementations: +- https://github.com/Borelset/destor/blob/master/src/chunking/fascdc_chunking.c +- https://github.com/nlfiedler/fastcdc-rs/blob/master/examples/table64.rs + +Usage: + python chunker_internal_codegen.py [ntables] + + ntables: Number of gearhash tables to generate (default 8), the + the C++ implementation expects 8 tables so this should not be + changed unless the C++ code is also updated. + + The generated header file is written to ./chunker_internal_generated.h +""" + +import hashlib +import pathlib +import sys +from io import StringIO + + +template = """\ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once +#include <cstdint> + +namespace parquet::internal {{ + +constexpr uint64_t kGearhashTable[8][256] = {{ +{content}}}; + +}} // namespace parquet::internal +""" + + +def generate_hash(n: int, seed: int): + """Produce predictable hash values for a given seed and n using MD5.""" + value = bytes([seed] * 64 + [n] * 64) Review Comment: Do you have an idea why `* 64` is being used? Is there an explanation somewhere? ########## cpp/src/parquet/chunker_internal.h: ########## @@ -0,0 +1,166 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cmath> +#include <string> +#include <vector> +#include "arrow/array.h" +#include "parquet/level_conversion.h" + +namespace parquet::internal { + +// Represents a chunk of data with level offsets and value offsets due to the +// record shredding for nested data. +struct Chunk { + int64_t level_offset; + int64_t value_offset; + int64_t levels_to_write; + + Chunk(int64_t level_offset, int64_t value_offset, int64_t levels_to_write) + : level_offset(level_offset), + value_offset(value_offset), + levels_to_write(levels_to_write) {} Review Comment: You can probably just `push_back({record_level_offset, record_value_offset, levels_to_write})`? ########## cpp/src/parquet/chunker_internal.cc: ########## @@ -0,0 +1,319 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/chunker_internal.h" + +#include <cmath> +#include <string> +#include <vector> +#include "arrow/array.h" +#include "arrow/util/logging.h" +#include "parquet/chunker_internal_generated.h" +#include "parquet/exception.h" +#include "parquet/level_conversion.h" + +namespace parquet::internal { + +/// Calculate the mask to use for the rolling hash, the mask is used to determine if a +/// new chunk should be created based on the rolling hash value. The mask is calculated +/// based on the min_size, max_size and norm_factor parameters. +/// +/// Assuming that the gear hash hash random values with a uniform distribution, then each +/// bit in the actual value of rolling_hash_ has even probability of being set so a mask +/// with the top N bits set has a probability of 1/2^N of matching the rolling hash. This +/// is the judgment criteria for the original gear hash based content-defined chunking. +/// The main drawback of this approach is the non-uniform distribution of the chunk sizes. +/// +/// Later on the FastCDC has improved the process by introducing: +/// - sub-minimum chunk cut-point skipping (not hashing the first `min_size` bytes) +/// - chunk size normalization (using two masks) +/// +/// This implementation uses cut-point skipping because it improves the overall +/// performance and a more accurate alternative to have less skewed chunk size +/// distribution. Instead of using two different masks (one with a lower and one with a +/// probability of matching and switching them based on the actual chunk size), we rather +/// use 8 different gear hash tables and require having 8 consecutive matches while +/// switching between the used hashtables. This approach is based on central limit theorem +/// and approximates normal distribution of the chunk sizes. +// +// @param min_size The minimum chunk size (default 256KiB) +// @param max_size The maximum chunk size (default 1MiB) +// @param norm_factor Normalization factor (default 0) +// @return The mask used to compare against the rolling hash +static uint64_t GetMask(int64_t min_size, int64_t max_size, uint8_t norm_factor) { + // calculate the average size of the chunks + int64_t avg_size = (min_size + max_size) / 2; + // since we are skipping the first `min_size` bytes for each chunk, we need to + // target a smaller chunk size to reach the average size after skipping the first + // `min_size` bytes + int64_t target_size = avg_size - min_size; + // assuming that the gear hash has a uniform distribution, we can calculate the mask + // by taking the log2 of the target size + size_t mask_bits = static_cast<size_t>(std::floor(std::log2(target_size))); + // -3 because we are using 8 hash tables to have more gaussian-like distribution, + // a user defined `norm_factor` can be used to adjust the mask size, hence the matching + // probability, by increasing the norm_factor we increase the probability of matching + // the mask, forcing the distribution closer to the average size + size_t effective_bits = mask_bits - 3 - norm_factor; + return std::numeric_limits<uint64_t>::max() << (64 - effective_bits); +} + +ContentDefinedChunker::ContentDefinedChunker(const LevelInfo& level_info, + int64_t min_size, int64_t max_size, + int8_t norm_factor) + : level_info_(level_info), + min_size_(min_size), + max_size_(max_size), + hash_mask_(GetMask(min_size, max_size, norm_factor)) { + if (min_size_ < 0) { + throw ParquetException("min_size must be non-negative"); + } + if (max_size_ < 0) { + throw ParquetException("max_size must be non-negative"); + } + if (min_size_ > max_size_) { + throw ParquetException("min_size must be less than or equal to max_size"); + } +} + +void ContentDefinedChunker::Roll(const bool value) { + if (chunk_size_++ < min_size_) { + // short-circuit if we haven't reached the minimum chunk size, this speeds up the + // chunking process since the gearhash doesn't need to be updated + return; + } + rolling_hash_ = (rolling_hash_ << 1) + kGearhashTable[nth_run_][value]; + has_matched_ = has_matched_ || ((rolling_hash_ & hash_mask_) == 0); +} + +template <int ByteWidth> +void ContentDefinedChunker::Roll(const uint8_t* value) { + chunk_size_ += ByteWidth; + if (chunk_size_ < min_size_) { + // short-circuit if we haven't reached the minimum chunk size, this speeds up the + // chunking process since the gearhash doesn't need to be updated + return; + } + for (size_t i = 0; i < ByteWidth; ++i) { + rolling_hash_ = (rolling_hash_ << 1) + kGearhashTable[nth_run_][value[i]]; + has_matched_ = has_matched_ || ((rolling_hash_ & hash_mask_) == 0); + } +} + +template <typename T> +void ContentDefinedChunker::Roll(const T* value) { + return Roll<sizeof(T)>(reinterpret_cast<const uint8_t*>(value)); +} + +void ContentDefinedChunker::Roll(const uint8_t* value, int64_t length) { + chunk_size_ += length; + if (chunk_size_ < min_size_) { + // short-circuit if we haven't reached the minimum chunk size, this speeds up the + // chunking process since the gearhash doesn't need to be updated + return; + } + for (auto i = 0; i < length; ++i) { + rolling_hash_ = (rolling_hash_ << 1) + kGearhashTable[nth_run_][value[i]]; + has_matched_ = has_matched_ || ((rolling_hash_ & hash_mask_) == 0); + } +} + +bool ContentDefinedChunker::NeedNewChunk() { + // decide whether to create a new chunk based on the rolling hash; has_matched_ is + // set to true if we encountered a match since the last NeedNewChunk() call + if (ARROW_PREDICT_FALSE(has_matched_)) { + has_matched_ = false; + // in order to have a normal distribution of chunk sizes, we only create a new chunk + // if the adjused mask matches the rolling hash 8 times in a row, each run uses a + // different gearhash table (gearhash's chunk size has exponential distribution, and + // we use central limit theorem to approximate normal distribution) + if (ARROW_PREDICT_FALSE(++nth_run_ >= 7)) { + nth_run_ = 0; + chunk_size_ = 0; + return true; + } + } + if (ARROW_PREDICT_FALSE(chunk_size_ >= max_size_)) { + // we have a hard limit on the maximum chunk size, note that we don't reset the + // rolling hash state here, so the next NeedNewChunk() call will continue from the + // current state + chunk_size_ = 0; + return true; + } + return false; +} + +template <typename RollFunc> +const std::vector<Chunk> ContentDefinedChunker::Calculate(const int16_t* def_levels, + const int16_t* rep_levels, + int64_t num_levels, + const RollFunc& RollValue) { + std::vector<Chunk> chunks; + int64_t offset; + int64_t prev_offset = 0; + int64_t prev_value_offset = 0; + bool has_def_levels = level_info_.def_level > 0; + bool has_rep_levels = level_info_.rep_level > 0; + + if (!has_rep_levels && !has_def_levels) { + // fastest path for non-nested non-null data + for (offset = 0; offset < num_levels; ++offset) { + RollValue(offset); + if (NeedNewChunk()) { + chunks.emplace_back(prev_offset, prev_offset, offset - prev_offset); + prev_offset = offset; + } + } + // set the previous value offset to add the last chunk + prev_value_offset = prev_offset; + } else if (!has_rep_levels) { + // non-nested data with nulls + int16_t def_level; + for (int64_t offset = 0; offset < num_levels; ++offset) { + def_level = def_levels[offset]; + + Roll(&def_level); + if (def_level == level_info_.def_level) { + RollValue(offset); + } + if (NeedNewChunk()) { + chunks.emplace_back(prev_offset, prev_offset, offset - prev_offset); + prev_offset = offset; + } + } + // set the previous value offset to add the last chunk + prev_value_offset = prev_offset; + } else { + // nested data with nulls + int16_t def_level; + int16_t rep_level; + int64_t value_offset = 0; + + for (offset = 0; offset < num_levels; ++offset) { + def_level = def_levels[offset]; + rep_level = rep_levels[offset]; + + Roll(&def_level); + Roll(&rep_level); + if (def_level == level_info_.def_level) { + RollValue(value_offset); + } + + if ((rep_level == 0) && NeedNewChunk()) { + // if we are at a record boundary and need a new chunk, we create a new chunk + auto levels_to_write = offset - prev_offset; + if (levels_to_write > 0) { + chunks.emplace_back(prev_offset, prev_value_offset, levels_to_write); + prev_offset = offset; + prev_value_offset = value_offset; + } + } + if (def_level >= level_info_.repeated_ancestor_def_level) { + // we only increment the value offset if we have a leaf value + ++value_offset; + } + } + } + + // add the last chunk if we have any levels left + if (prev_offset < num_levels) { + chunks.emplace_back(prev_offset, prev_value_offset, num_levels - prev_offset); + } + return chunks; +} + +#define FIXED_WIDTH_CASE(ByteWidth) \ + { \ + const auto raw_values = values.data()->GetValues<uint8_t>(1); \ Review Comment: You need to take the array offset into account. ########## cpp/src/parquet/chunker_internal.cc: ########## @@ -0,0 +1,250 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/chunker_internal.h" + +#include <cmath> +#include <string> +#include <vector> +#include "arrow/array.h" +#include "arrow/util/logging.h" +#include "parquet/chunker_internal_hashtable.h" +#include "parquet/exception.h" +#include "parquet/level_conversion.h" + +namespace parquet::internal { + +// create a fake null array class with a GetView method returning 0 always +class FakeNullArray { + public: + uint8_t GetView(int64_t i) const { return 0; } + + std::shared_ptr<::arrow::DataType> type() const { return ::arrow::null(); } + + int64_t null_count() const { return 0; } +}; + +static uint64_t GetMask(uint64_t min_size, uint64_t max_size, uint8_t norm_factor) { + // we aim for gaussian-like distribution of chunk sizes between min_size and max_size + uint64_t avg_size = (min_size + max_size) / 2; + // we skip calculating gearhash for the first `min_size` bytes, so we are looking for + // a smaller chunk as the average size + uint64_t target_size = avg_size - min_size; + size_t mask_bits = static_cast<size_t>(std::floor(std::log2(target_size))); + // -3 because we are using 8 hash tables to have more gaussian-like distribution + // `norm_factor` narrows the chunk size distribution aroun avg_size + size_t effective_bits = mask_bits - 3 - norm_factor; + return std::numeric_limits<uint64_t>::max() << (64 - effective_bits); +} + +ContentDefinedChunker::ContentDefinedChunker(const LevelInfo& level_info, + std::pair<uint64_t, uint64_t> size_range, + uint8_t norm_factor) + : level_info_(level_info), + min_size_(size_range.first), + max_size_(size_range.second), + hash_mask_(GetMask(size_range.first, size_range.second, norm_factor)) {} + +template <typename T> +void ContentDefinedChunker::Roll(const T value) { + constexpr size_t BYTE_WIDTH = sizeof(T); + chunk_size_ += BYTE_WIDTH; + if (chunk_size_ < min_size_) { + // short-circuit if we haven't reached the minimum chunk size, this speeds up the + // chunking process since the gearhash doesn't need to be updated + return; + } + auto bytes = reinterpret_cast<const uint8_t*>(&value); + for (size_t i = 0; i < BYTE_WIDTH; ++i) { + rolling_hash_ = (rolling_hash_ << 1) + GEARHASH_TABLE[nth_run_][bytes[i]]; + has_matched_ = has_matched_ || ((rolling_hash_ & hash_mask_) == 0); Review Comment: I'm not sure I understand your answer to be honest. You're going to chunk at _value_ boundaries, not at byte boundaries, so why update `has_matched_` at every byte? ########## cpp/src/parquet/chunker_internal.cc: ########## @@ -0,0 +1,319 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/chunker_internal.h" + +#include <cmath> +#include <string> +#include <vector> +#include "arrow/array.h" +#include "arrow/util/logging.h" +#include "parquet/chunker_internal_generated.h" +#include "parquet/exception.h" +#include "parquet/level_conversion.h" + +namespace parquet::internal { + +/// Calculate the mask to use for the rolling hash, the mask is used to determine if a +/// new chunk should be created based on the rolling hash value. The mask is calculated +/// based on the min_size, max_size and norm_factor parameters. +/// +/// Assuming that the gear hash hash random values with a uniform distribution, then each +/// bit in the actual value of rolling_hash_ has even probability of being set so a mask +/// with the top N bits set has a probability of 1/2^N of matching the rolling hash. This +/// is the judgment criteria for the original gear hash based content-defined chunking. +/// The main drawback of this approach is the non-uniform distribution of the chunk sizes. +/// +/// Later on the FastCDC has improved the process by introducing: +/// - sub-minimum chunk cut-point skipping (not hashing the first `min_size` bytes) +/// - chunk size normalization (using two masks) +/// +/// This implementation uses cut-point skipping because it improves the overall +/// performance and a more accurate alternative to have less skewed chunk size +/// distribution. Instead of using two different masks (one with a lower and one with a +/// probability of matching and switching them based on the actual chunk size), we rather +/// use 8 different gear hash tables and require having 8 consecutive matches while +/// switching between the used hashtables. This approach is based on central limit theorem +/// and approximates normal distribution of the chunk sizes. +// +// @param min_size The minimum chunk size (default 256KiB) +// @param max_size The maximum chunk size (default 1MiB) +// @param norm_factor Normalization factor (default 0) +// @return The mask used to compare against the rolling hash +static uint64_t GetMask(int64_t min_size, int64_t max_size, uint8_t norm_factor) { + // calculate the average size of the chunks + int64_t avg_size = (min_size + max_size) / 2; + // since we are skipping the first `min_size` bytes for each chunk, we need to + // target a smaller chunk size to reach the average size after skipping the first + // `min_size` bytes + int64_t target_size = avg_size - min_size; + // assuming that the gear hash has a uniform distribution, we can calculate the mask + // by taking the log2 of the target size + size_t mask_bits = static_cast<size_t>(std::floor(std::log2(target_size))); + // -3 because we are using 8 hash tables to have more gaussian-like distribution, + // a user defined `norm_factor` can be used to adjust the mask size, hence the matching + // probability, by increasing the norm_factor we increase the probability of matching + // the mask, forcing the distribution closer to the average size + size_t effective_bits = mask_bits - 3 - norm_factor; + return std::numeric_limits<uint64_t>::max() << (64 - effective_bits); +} + +ContentDefinedChunker::ContentDefinedChunker(const LevelInfo& level_info, + int64_t min_size, int64_t max_size, + int8_t norm_factor) + : level_info_(level_info), + min_size_(min_size), + max_size_(max_size), + hash_mask_(GetMask(min_size, max_size, norm_factor)) { + if (min_size_ < 0) { + throw ParquetException("min_size must be non-negative"); + } + if (max_size_ < 0) { + throw ParquetException("max_size must be non-negative"); + } + if (min_size_ > max_size_) { + throw ParquetException("min_size must be less than or equal to max_size"); + } +} + +void ContentDefinedChunker::Roll(const bool value) { + if (chunk_size_++ < min_size_) { + // short-circuit if we haven't reached the minimum chunk size, this speeds up the + // chunking process since the gearhash doesn't need to be updated + return; + } + rolling_hash_ = (rolling_hash_ << 1) + kGearhashTable[nth_run_][value]; + has_matched_ = has_matched_ || ((rolling_hash_ & hash_mask_) == 0); +} + +template <int ByteWidth> +void ContentDefinedChunker::Roll(const uint8_t* value) { + chunk_size_ += ByteWidth; + if (chunk_size_ < min_size_) { + // short-circuit if we haven't reached the minimum chunk size, this speeds up the + // chunking process since the gearhash doesn't need to be updated + return; + } + for (size_t i = 0; i < ByteWidth; ++i) { + rolling_hash_ = (rolling_hash_ << 1) + kGearhashTable[nth_run_][value[i]]; + has_matched_ = has_matched_ || ((rolling_hash_ & hash_mask_) == 0); + } +} + +template <typename T> +void ContentDefinedChunker::Roll(const T* value) { + return Roll<sizeof(T)>(reinterpret_cast<const uint8_t*>(value)); +} + +void ContentDefinedChunker::Roll(const uint8_t* value, int64_t length) { + chunk_size_ += length; + if (chunk_size_ < min_size_) { + // short-circuit if we haven't reached the minimum chunk size, this speeds up the + // chunking process since the gearhash doesn't need to be updated + return; + } + for (auto i = 0; i < length; ++i) { + rolling_hash_ = (rolling_hash_ << 1) + kGearhashTable[nth_run_][value[i]]; + has_matched_ = has_matched_ || ((rolling_hash_ & hash_mask_) == 0); + } +} + +bool ContentDefinedChunker::NeedNewChunk() { + // decide whether to create a new chunk based on the rolling hash; has_matched_ is + // set to true if we encountered a match since the last NeedNewChunk() call + if (ARROW_PREDICT_FALSE(has_matched_)) { + has_matched_ = false; + // in order to have a normal distribution of chunk sizes, we only create a new chunk + // if the adjused mask matches the rolling hash 8 times in a row, each run uses a + // different gearhash table (gearhash's chunk size has exponential distribution, and Review Comment: Is there a reference to the "exponential distribution" claim? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
