wgtmac commented on code in PR #45360: URL: https://github.com/apache/arrow/pull/45360#discussion_r1978477938
########## cpp/src/parquet/column_chunker.h: ########## @@ -0,0 +1,168 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cmath> +#include <string> +#include <vector> +#include "arrow/array.h" +#include "parquet/level_conversion.h" + +namespace parquet { + +namespace internal { + +// Represents a chunk of data with level offsets and value offsets due to the +// record shredding for nested data. +struct Chunk { + int64_t level_offset; + int64_t value_offset; + int64_t levels_to_write; + + Chunk(int64_t level_offset, int64_t value_offset, int64_t levels_to_write) + : level_offset(level_offset), + value_offset(value_offset), + levels_to_write(levels_to_write) {} +}; + +/// CDC (Content-Defined Chunking) is a technique that divides data into variable-sized +/// chunks based on the content of the data itself, rather than using fixed-size +/// boundaries. 
+/// +/// For example, given this sequence of values in a column: +/// +/// File1: [1,2,3, 4,5,6, 7,8,9] +/// chunk1 chunk2 chunk3 +/// +/// Assume there is an inserted value between 3 and 4: +/// +/// File2: [1,2,3,0, 4,5,6, 7,8,9] +/// new-chunk chunk2 chunk3 +/// +/// The chunking process will adjust to maintain stable boundaries across data +/// modifications. Each chunk defines a new parquet data page which are contiguously +/// written out to the file. Since each page compressed independently, the files' contents +/// would look like the following with unique page identifiers: +/// +/// File1: [Page1][Page2][Page3]... +/// File2: [Page4][Page2][Page3]... +/// +/// Then the parquet file is being uploaded to a content addressable storage systems (CAS) +/// which split the bytes stream into content defined blobs. The CAS system will calculate +/// a unique identifier for each blob, then store the blob in a key-value store. If the +/// same blob is encountered again, the system can refer to the hash instead of physically +/// storing the blob again. In the example above, the CAS system would phiysically store +/// Page1, Page2, Page3, and Page4 only once and the required metadata to reassemble the +/// files. +/// While the deduplication is performed by the CAS system, the parquet chunker makes it +/// possible to efficiently deduplicate the data by consistently dividing the data into +/// chunks. +/// +/// Implementation details: +/// +/// Only the parquet writer must be aware of the content defined chunking, the reader +/// doesn't need to know about it. Each parquet column writer holds a +/// ContentDefinedChunker instance depending on the writer's properties. The chunker's +/// state is maintained across the entire column without being reset between pages and row +/// groups. 
+/// +/// The chunker receives the record shredded column data (def_levels, rep_levels, values) +/// and goes over the (def_level, rep_level, value) triplets one by one while adjusting +/// the column-global rolling hash based on the triplet. Whenever the rolling hash matches +/// a predefined mask, the chunker creates a new chunk. The chunker returns a vector of +/// Chunk objects that represent the boundaries of the chunks/// +/// Note that the boundaries are deterministically calculated exclusively based on the +/// data itself, so the same data will always produce the same chunks - given the same +/// chunker configuration. +/// +/// References: +/// - FastCDC paper: "FastCDC: a Fast and Efficient Content-Defined Chunking Approach for +/// Data Deduplication" +/// https://www.usenix.org/system/files/conference/atc16/atc16-paper-xia.pdf +class ContentDefinedChunker { Review Comment: IIUC, this file can be renamed to `column_chunk_internal.h` to avoid being installed accidentally. ########## cpp/src/parquet/column_writer.h: ########## @@ -23,6 +23,7 @@ #include "arrow/type_fwd.h" #include "arrow/util/compression.h" +#include "parquet/column_chunker.h" Review Comment: This seems unnecessary ########## cpp/src/parquet/column_chunker.h: ########## @@ -0,0 +1,168 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cmath> +#include <string> +#include <vector> +#include "arrow/array.h" +#include "parquet/level_conversion.h" + +namespace parquet { + +namespace internal { + +// Represents a chunk of data with level offsets and value offsets due to the +// record shredding for nested data. +struct Chunk { + int64_t level_offset; + int64_t value_offset; + int64_t levels_to_write; + + Chunk(int64_t level_offset, int64_t value_offset, int64_t levels_to_write) + : level_offset(level_offset), + value_offset(value_offset), + levels_to_write(levels_to_write) {} +}; + +/// CDC (Content-Defined Chunking) is a technique that divides data into variable-sized +/// chunks based on the content of the data itself, rather than using fixed-size +/// boundaries. +/// +/// For example, given this sequence of values in a column: +/// +/// File1: [1,2,3, 4,5,6, 7,8,9] +/// chunk1 chunk2 chunk3 +/// +/// Assume there is an inserted value between 3 and 4: +/// +/// File2: [1,2,3,0, 4,5,6, 7,8,9] +/// new-chunk chunk2 chunk3 +/// +/// The chunking process will adjust to maintain stable boundaries across data +/// modifications. Each chunk defines a new parquet data page which are contiguously +/// written out to the file. Since each page compressed independently, the files' contents +/// would look like the following with unique page identifiers: +/// +/// File1: [Page1][Page2][Page3]... +/// File2: [Page4][Page2][Page3]... +/// +/// Then the parquet file is being uploaded to a content addressable storage systems (CAS) +/// which split the bytes stream into content defined blobs. The CAS system will calculate +/// a unique identifier for each blob, then store the blob in a key-value store. If the +/// same blob is encountered again, the system can refer to the hash instead of physically +/// storing the blob again. 
In the example above, the CAS system would phiysically store +/// Page1, Page2, Page3, and Page4 only once and the required metadata to reassemble the +/// files. Review Comment: ```suggestion /// Then the parquet file is being uploaded to a content addressable storage system (CAS) /// which splits the bytes stream into content defined blobs. The CAS system will calculate /// a unique identifier for each blob, then store the blob in a key-value store. If the /// same blob is encountered again, the system can refer to the hash instead of physically /// storing the blob again. In the example above, the CAS system would physically store /// Page1, Page2, Page3, and Page4 only once and the required metadata to reassemble the /// files. ``` ########## cpp/src/parquet/column_writer.cc: ########## @@ -1332,13 +1337,38 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< bits_buffer_->ZeroPadding(); } - if (leaf_array.type()->id() == ::arrow::Type::DICTIONARY) { - return WriteArrowDictionary(def_levels, rep_levels, num_levels, leaf_array, ctx, - maybe_parent_nulls); + if (properties_->cdc_enabled()) { + ARROW_ASSIGN_OR_RAISE(auto boundaries, + content_defined_chunker_.GetBoundaries( + def_levels, rep_levels, num_levels, leaf_array)); + for (auto chunk : boundaries) { + auto chunk_array = leaf_array.Slice(chunk.value_offset); + auto chunk_def_levels = AddIfNotNull(def_levels, chunk.level_offset); + auto chunk_rep_levels = AddIfNotNull(rep_levels, chunk.level_offset); + if (leaf_array.type()->id() == ::arrow::Type::DICTIONARY) { Review Comment: Do we consider identical value tuples to be equal even when they are encoded differently? For example, one is dictionary-encoded and the other is not, but their values are the same. Same question for other encodings (e.g. plain vs byte_stream_split for double type).
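To make the stable-boundary idea from the quoted doc comment concrete, here is a minimal toy sketch. It is not the PR's FastCDC-based implementation: `MixValue`, `ToyChunkEnds`, the `window` parameter and the `mask` value are hypothetical names chosen for illustration, the hash is recomputed per position rather than rolled, and there is no min/max chunk size. It also hashes logical values only, which is an assumption of the sketch and not an answer to the encoding question above.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper (not from the PR): scrambles one value into 64 bits
// using the splitmix64 finalizer so that nearby inputs give unrelated hashes.
inline uint64_t MixValue(uint64_t value) {
  value += 0x9E3779B97F4A7C15ULL;
  value = (value ^ (value >> 30)) * 0xBF58476D1CE4E5B9ULL;
  value = (value ^ (value >> 27)) * 0x94D049BB133111EBULL;
  return value ^ (value >> 31);
}

// Toy content-defined chunker. Returns the exclusive end offset of every
// chunk. A chunk ends after index i when the hash of the last `window`
// values, masked with `mask`, is zero. Because the decision depends only on
// the content of that window, an edit can only disturb boundaries within
// `window` values after the edit.
inline std::vector<size_t> ToyChunkEnds(const std::vector<int64_t>& values,
                                        size_t window, uint64_t mask) {
  assert(window > 0);
  std::vector<size_t> ends;
  for (size_t i = 0; i < values.size(); ++i) {
    // First index of the window ending at i (shorter near the start).
    const size_t begin = (i + 1 >= window) ? (i + 1 - window) : 0;
    uint64_t hash = 0;
    for (size_t j = begin; j <= i; ++j) {
      hash = (hash << 1) + MixValue(static_cast<uint64_t>(values[j]));
    }
    if ((hash & mask) == 0) {
      ends.push_back(i + 1);
    }
  }
  // The trailing values always form a final chunk.
  if (!values.empty() && (ends.empty() || ends.back() != values.size())) {
    ends.push_back(values.size());
  }
  return ends;
}
```

Calling `ToyChunkEnds` on a column and on a copy with one value inserted at index `p` gives the same chunk ends from offset `p + window` onwards, shifted by one. This mirrors the File1/File2 example where only the first chunk changes and chunk2/chunk3 are reused.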
########## cpp/src/parquet/properties.h: ########## @@ -275,10 +282,33 @@ class PARQUET_EXPORT WriterProperties { page_checksum_enabled_(properties.page_checksum_enabled()), size_statistics_level_(properties.size_statistics_level()), sorting_columns_(properties.sorting_columns()), - default_column_properties_(properties.default_column_properties()) {} + default_column_properties_(properties.default_column_properties()), + cdc_enabled_(properties.cdc_enabled()), + cdc_size_range_(properties.cdc_size_range()), + cdc_norm_factor_(properties.cdc_norm_factor()) {} virtual ~Builder() {} + Builder* enable_cdc() { Review Comment: What about using its full name instead of `cdc`? The abbreviation may not be obvious to people who are not familiar with it, and it could be confused with another term, `change data capture`. ########## cpp/src/parquet/column_chunker.h: ########## @@ -0,0 +1,168 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cmath> +#include <string> +#include <vector> +#include "arrow/array.h" +#include "parquet/level_conversion.h" + +namespace parquet { + +namespace internal { + +// Represents a chunk of data with level offsets and value offsets due to the +// record shredding for nested data. 
+struct Chunk { + int64_t level_offset; + int64_t value_offset; + int64_t levels_to_write; + + Chunk(int64_t level_offset, int64_t value_offset, int64_t levels_to_write) + : level_offset(level_offset), + value_offset(value_offset), + levels_to_write(levels_to_write) {} +}; + +/// CDC (Content-Defined Chunking) is a technique that divides data into variable-sized +/// chunks based on the content of the data itself, rather than using fixed-size +/// boundaries. +/// +/// For example, given this sequence of values in a column: +/// +/// File1: [1,2,3, 4,5,6, 7,8,9] +/// chunk1 chunk2 chunk3 +/// +/// Assume there is an inserted value between 3 and 4: +/// +/// File2: [1,2,3,0, 4,5,6, 7,8,9] +/// new-chunk chunk2 chunk3 +/// +/// The chunking process will adjust to maintain stable boundaries across data +/// modifications. Each chunk defines a new parquet data page which are contiguously +/// written out to the file. Since each page compressed independently, the files' contents +/// would look like the following with unique page identifiers: +/// +/// File1: [Page1][Page2][Page3]... +/// File2: [Page4][Page2][Page3]... +/// +/// Then the parquet file is being uploaded to a content addressable storage systems (CAS) +/// which split the bytes stream into content defined blobs. The CAS system will calculate +/// a unique identifier for each blob, then store the blob in a key-value store. If the +/// same blob is encountered again, the system can refer to the hash instead of physically +/// storing the blob again. In the example above, the CAS system would phiysically store +/// Page1, Page2, Page3, and Page4 only once and the required metadata to reassemble the +/// files. +/// While the deduplication is performed by the CAS system, the parquet chunker makes it +/// possible to efficiently deduplicate the data by consistently dividing the data into +/// chunks. 
+/// +/// Implementation details: +/// +/// Only the parquet writer must be aware of the content defined chunking, the reader +/// doesn't need to know about it. Each parquet column writer holds a +/// ContentDefinedChunker instance depending on the writer's properties. The chunker's +/// state is maintained across the entire column without being reset between pages and row +/// groups. +/// +/// The chunker receives the record shredded column data (def_levels, rep_levels, values) +/// and goes over the (def_level, rep_level, value) triplets one by one while adjusting +/// the column-global rolling hash based on the triplet. Whenever the rolling hash matches +/// a predefined mask, the chunker creates a new chunk. The chunker returns a vector of +/// Chunk objects that represent the boundaries of the chunks/// +/// Note that the boundaries are deterministically calculated exclusively based on the +/// data itself, so the same data will always produce the same chunks - given the same +/// chunker configuration. +/// +/// References: +/// - FastCDC paper: "FastCDC: a Fast and Efficient Content-Defined Chunking Approach for +/// Data Deduplication" +/// https://www.usenix.org/system/files/conference/atc16/atc16-paper-xia.pdf +class ContentDefinedChunker { + public: + /// Create a new ContentDefinedChunker instance + /// + /// @param level_info Information about definition and repetition levels + /// @param size_range Min/max chunk size as pair<min_size, max_size>, the chunker will + /// attempt to uniformly distribute the chunks between these extremes. + /// @param norm_factor Normalization factor to center the chunk size around the average + /// size more aggressively. By increasing the normalization factor, + /// probability of finding a chunk boundary increases. 
+ ContentDefinedChunker(const LevelInfo& level_info, + std::pair<uint64_t, uint64_t> size_range, + uint8_t norm_factor = 0); + + /// Get the chunk boundaries for the given column data + /// + /// @param def_levels Definition levels + /// @param rep_levels Repetition levels + /// @param num_levels Number of levels + /// @param values Column values as an Arrow array + /// @return Vector of Chunk objects representing the chunk boundaries + const ::arrow::Result<std::vector<Chunk>> GetBoundaries(const int16_t* def_levels, Review Comment: When will the `::arrow::Result` hold an error? Usually in the parquet module we use throw instead of status. ########## cpp/src/parquet/column_chunker.h: ########## @@ -0,0 +1,168 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cmath> +#include <string> +#include <vector> +#include "arrow/array.h" +#include "parquet/level_conversion.h" + +namespace parquet { + +namespace internal { + +// Represents a chunk of data with level offsets and value offsets due to the +// record shredding for nested data. 
+struct Chunk { + int64_t level_offset; + int64_t value_offset; + int64_t levels_to_write; + + Chunk(int64_t level_offset, int64_t value_offset, int64_t levels_to_write) + : level_offset(level_offset), + value_offset(value_offset), + levels_to_write(levels_to_write) {} +}; + +/// CDC (Content-Defined Chunking) is a technique that divides data into variable-sized +/// chunks based on the content of the data itself, rather than using fixed-size +/// boundaries. +/// +/// For example, given this sequence of values in a column: +/// +/// File1: [1,2,3, 4,5,6, 7,8,9] +/// chunk1 chunk2 chunk3 +/// +/// Assume there is an inserted value between 3 and 4: +/// +/// File2: [1,2,3,0, 4,5,6, 7,8,9] +/// new-chunk chunk2 chunk3 +/// +/// The chunking process will adjust to maintain stable boundaries across data +/// modifications. Each chunk defines a new parquet data page which are contiguously +/// written out to the file. Since each page compressed independently, the files' contents +/// would look like the following with unique page identifiers: +/// +/// File1: [Page1][Page2][Page3]... +/// File2: [Page4][Page2][Page3]... +/// +/// Then the parquet file is being uploaded to a content addressable storage systems (CAS) +/// which split the bytes stream into content defined blobs. The CAS system will calculate +/// a unique identifier for each blob, then store the blob in a key-value store. If the +/// same blob is encountered again, the system can refer to the hash instead of physically +/// storing the blob again. In the example above, the CAS system would phiysically store +/// Page1, Page2, Page3, and Page4 only once and the required metadata to reassemble the +/// files. +/// While the deduplication is performed by the CAS system, the parquet chunker makes it +/// possible to efficiently deduplicate the data by consistently dividing the data into +/// chunks. 
+/// +/// Implementation details: +/// +/// Only the parquet writer must be aware of the content defined chunking, the reader +/// doesn't need to know about it. Each parquet column writer holds a +/// ContentDefinedChunker instance depending on the writer's properties. The chunker's +/// state is maintained across the entire column without being reset between pages and row +/// groups. +/// +/// The chunker receives the record shredded column data (def_levels, rep_levels, values) +/// and goes over the (def_level, rep_level, value) triplets one by one while adjusting +/// the column-global rolling hash based on the triplet. Whenever the rolling hash matches +/// a predefined mask, the chunker creates a new chunk. The chunker returns a vector of +/// Chunk objects that represent the boundaries of the chunks/// Review Comment: ```suggestion /// Chunk objects that represent the boundaries of the chunks. ``` ########## cpp/src/parquet/column_writer.cc: ########## @@ -1332,13 +1337,38 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, public TypedColumnWriter< bits_buffer_->ZeroPadding(); } - if (leaf_array.type()->id() == ::arrow::Type::DICTIONARY) { - return WriteArrowDictionary(def_levels, rep_levels, num_levels, leaf_array, ctx, - maybe_parent_nulls); + if (properties_->cdc_enabled()) { + ARROW_ASSIGN_OR_RAISE(auto boundaries, + content_defined_chunker_.GetBoundaries( Review Comment: There are some cases where the parquet writer will further split the input Arrow array into smaller pieces which may affect the precision of the CDC logic here: - Split the input for max_row_group_size: https://github.com/apache/arrow/blob/main/cpp/src/parquet/arrow/writer.cc#L458-L470 - If data page v2 or page index is enabled, page boundary must be a record boundary (i.e. 
rep_level = 0), this prohibits page cut at certain values: https://github.com/apache/arrow/blob/main/cpp/src/parquet/column_writer.cc#L1142-L1188 ########## cpp/src/parquet/column_chunker.h: ########## @@ -0,0 +1,168 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cmath> +#include <string> +#include <vector> +#include "arrow/array.h" +#include "parquet/level_conversion.h" + +namespace parquet { + +namespace internal { + +// Represents a chunk of data with level offsets and value offsets due to the +// record shredding for nested data. +struct Chunk { + int64_t level_offset; + int64_t value_offset; + int64_t levels_to_write; + + Chunk(int64_t level_offset, int64_t value_offset, int64_t levels_to_write) + : level_offset(level_offset), + value_offset(value_offset), + levels_to_write(levels_to_write) {} +}; + +/// CDC (Content-Defined Chunking) is a technique that divides data into variable-sized +/// chunks based on the content of the data itself, rather than using fixed-size +/// boundaries. 
+/// +/// For example, given this sequence of values in a column: +/// +/// File1: [1,2,3, 4,5,6, 7,8,9] +/// chunk1 chunk2 chunk3 +/// +/// Assume there is an inserted value between 3 and 4: +/// +/// File2: [1,2,3,0, 4,5,6, 7,8,9] +/// new-chunk chunk2 chunk3 +/// +/// The chunking process will adjust to maintain stable boundaries across data +/// modifications. Each chunk defines a new parquet data page which are contiguously +/// written out to the file. Since each page compressed independently, the files' contents +/// would look like the following with unique page identifiers: +/// +/// File1: [Page1][Page2][Page3]... +/// File2: [Page4][Page2][Page3]... +/// +/// Then the parquet file is being uploaded to a content addressable storage systems (CAS) +/// which split the bytes stream into content defined blobs. The CAS system will calculate +/// a unique identifier for each blob, then store the blob in a key-value store. If the +/// same blob is encountered again, the system can refer to the hash instead of physically +/// storing the blob again. In the example above, the CAS system would phiysically store +/// Page1, Page2, Page3, and Page4 only once and the required metadata to reassemble the +/// files. +/// While the deduplication is performed by the CAS system, the parquet chunker makes it +/// possible to efficiently deduplicate the data by consistently dividing the data into +/// chunks. +/// +/// Implementation details: +/// +/// Only the parquet writer must be aware of the content defined chunking, the reader +/// doesn't need to know about it. Each parquet column writer holds a +/// ContentDefinedChunker instance depending on the writer's properties. The chunker's +/// state is maintained across the entire column without being reset between pages and row +/// groups. 
+/// +/// The chunker receives the record shredded column data (def_levels, rep_levels, values) +/// and goes over the (def_level, rep_level, value) triplets one by one while adjusting +/// the column-global rolling hash based on the triplet. Whenever the rolling hash matches +/// a predefined mask, the chunker creates a new chunk. The chunker returns a vector of +/// Chunk objects that represent the boundaries of the chunks/// +/// Note that the boundaries are deterministically calculated exclusively based on the +/// data itself, so the same data will always produce the same chunks - given the same +/// chunker configuration. +/// +/// References: +/// - FastCDC paper: "FastCDC: a Fast and Efficient Content-Defined Chunking Approach for +/// Data Deduplication" +/// https://www.usenix.org/system/files/conference/atc16/atc16-paper-xia.pdf +class ContentDefinedChunker { + public: + /// Create a new ContentDefinedChunker instance + /// + /// @param level_info Information about definition and repetition levels + /// @param size_range Min/max chunk size as pair<min_size, max_size>, the chunker will + /// attempt to uniformly distribute the chunks between these extremes. + /// @param norm_factor Normalization factor to center the chunk size around the average + /// size more aggressively. By increasing the normalization factor, + /// probability of finding a chunk boundary increases. 
+ ContentDefinedChunker(const LevelInfo& level_info, + std::pair<uint64_t, uint64_t> size_range, + uint8_t norm_factor = 0); + + /// Get the chunk boundaries for the given column data + /// + /// @param def_levels Definition levels + /// @param rep_levels Repetition levels + /// @param num_levels Number of levels + /// @param values Column values as an Arrow array + /// @return Vector of Chunk objects representing the chunk boundaries + const ::arrow::Result<std::vector<Chunk>> GetBoundaries(const int16_t* def_levels, + const int16_t* rep_levels, + int64_t num_levels, + const ::arrow::Array& values); Review Comment: Since this takes an `::arrow::Array`, are we assuming that CDC is not supported in the methods provided by TypedColumnWriter (https://github.com/apache/arrow/blob/main/cpp/src/parquet/column_writer.h#L196)? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
