pitrou commented on code in PR #14351: URL: https://github.com/apache/arrow/pull/14351#discussion_r1046027914
########## cpp/src/parquet/column_writer.cc: ########## @@ -378,7 +381,12 @@ class SerializedPageWriter : public PageWriter { format::PageHeader page_header; page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size)); page_header.__set_compressed_page_size(static_cast<int32_t>(output_data_len)); - // TODO(PARQUET-594) crc checksum + + if (use_page_checksum_verification_ && page.type() == PageType::DATA_PAGE) { Review Comment: Same here: should also handle v2 data pages. ########## cpp/src/parquet/properties.h: ########## @@ -95,12 +95,18 @@ class PARQUET_EXPORT ReaderProperties { return file_decryption_properties_; } + bool use_page_checksum_verification() const { return use_page_checksum_verification_; } + void set_use_page_checksum_verification(bool check_crc) { + use_page_checksum_verification_ = check_crc; + } + private: MemoryPool* pool_; int64_t buffer_size_ = kDefaultBufferSize; int32_t thrift_string_size_limit_ = kDefaultThriftStringSizeLimit; int32_t thrift_container_size_limit_ = kDefaultThriftContainerSizeLimit; bool buffered_stream_enabled_ = false; + bool use_page_checksum_verification_ = false; Review Comment: Let's make naming a bit more consistent ```suggestion bool data_page_checksum_verification_ = false; ``` ########## cpp/src/parquet/properties.h: ########## @@ -95,12 +95,18 @@ class PARQUET_EXPORT ReaderProperties { return file_decryption_properties_; } + bool use_page_checksum_verification() const { return use_page_checksum_verification_; } + void set_use_page_checksum_verification(bool check_crc) { + use_page_checksum_verification_ = check_crc; + } Review Comment: Perhaps rename these functions: * `bool data_page_checksum_verification()` * `void set_data_page_checksum_verification(bool is_enabled)` ########## cpp/src/parquet/reader_test.cc: ########## @@ -502,6 +514,140 @@ TEST_F(TestLocalFile, OpenWithMetadata) { ASSERT_EQ(metadata.get(), reader2->metadata().get()); } +TEST(TestDataPageV1Checksum, CorruptPage) { + // 
works when not checking crc. + { + auto reader = ParquetFileReader::OpenFile(data_page_v1_corrupt_checksum()); + auto metadata_ptr = reader->metadata(); + EXPECT_EQ(1U, metadata_ptr->num_row_groups()); + auto rg = reader->RowGroup(0); + auto column0 = std::dynamic_pointer_cast<TypedColumnReader<Int32Type>>(rg->Column(0)); + auto column1 = std::dynamic_pointer_cast<TypedColumnReader<Int32Type>>(rg->Column(1)); + EXPECT_NE(nullptr, column0); + EXPECT_NE(nullptr, column1); + const int kPageSize = 1024 * 10; + const int kMembers = kPageSize * 2 / sizeof(int32_t); + size_t column_a_size = 0; + size_t column_b_size = 0; + std::array<int32_t, 1024> values{}; + while (column0->HasNext()) { + int64_t values_read; + int64_t real_read = + column0->ReadBatch(1024, nullptr, nullptr, values.data(), &values_read); + EXPECT_EQ(real_read, values_read); + column_a_size += values_read; + } + + while (column1->HasNext()) { + int64_t values_read; + int64_t real_read = + column1->ReadBatch(1024, nullptr, nullptr, values.data(), &values_read); + EXPECT_EQ(real_read, values_read); + column_b_size += values_read; + } + + EXPECT_EQ(kMembers, column_a_size); + EXPECT_EQ(kMembers, column_b_size); + } + // check crc will read failed + { + ReaderProperties readerProperties; + readerProperties.set_use_page_checksum_verification(true); + auto reader = ParquetFileReader::OpenFile(data_page_v1_corrupt_checksum(), false, + readerProperties); + auto metadata_ptr = reader->metadata(); + EXPECT_EQ(1U, metadata_ptr->num_row_groups()); + auto rg = reader->RowGroup(0); + + auto column0 = std::dynamic_pointer_cast<TypedColumnReader<Int32Type>>(rg->Column(0)); + auto column1 = std::dynamic_pointer_cast<TypedColumnReader<Int32Type>>(rg->Column(1)); + EXPECT_NE(nullptr, column0); + EXPECT_NE(nullptr, column1); + + auto column_a_page_reader = rg->GetColumnPageReader(0); + auto column_b_page_reader = rg->GetColumnPageReader(1); + + EXPECT_THROW(column_a_page_reader->NextPage(), ParquetException); Review 
Comment: Is it possible to test the exception message as well? `ParquetException` is quite generic and could be thrown for other reasons. ########## cpp/src/parquet/properties.h: ########## @@ -273,6 +280,11 @@ class PARQUET_EXPORT WriterProperties { return this; } + Builder* page_write_checksum_enabled(bool enable_checksum) { Review Comment: As with `enable_dictionary`/`disable_dictionary`, we probably want two functions: * `enable_data_page_checksum()` * `disable_data_page_checksum()` ########## cpp/src/parquet/reader_test.cc: ########## @@ -502,6 +514,140 @@ TEST_F(TestLocalFile, OpenWithMetadata) { ASSERT_EQ(metadata.get(), reader2->metadata().get()); } +TEST(TestDataPageV1Checksum, CorruptPage) { + // works when not checking crc. + { + auto reader = ParquetFileReader::OpenFile(data_page_v1_corrupt_checksum()); + auto metadata_ptr = reader->metadata(); + EXPECT_EQ(1U, metadata_ptr->num_row_groups()); + auto rg = reader->RowGroup(0); + auto column0 = std::dynamic_pointer_cast<TypedColumnReader<Int32Type>>(rg->Column(0)); + auto column1 = std::dynamic_pointer_cast<TypedColumnReader<Int32Type>>(rg->Column(1)); + EXPECT_NE(nullptr, column0); + EXPECT_NE(nullptr, column1); + const int kPageSize = 1024 * 10; + const int kMembers = kPageSize * 2 / sizeof(int32_t); + size_t column_a_size = 0; + size_t column_b_size = 0; + std::array<int32_t, 1024> values{}; + while (column0->HasNext()) { + int64_t values_read; + int64_t real_read = + column0->ReadBatch(1024, nullptr, nullptr, values.data(), &values_read); + EXPECT_EQ(real_read, values_read); + column_a_size += values_read; + } + + while (column1->HasNext()) { + int64_t values_read; + int64_t real_read = + column1->ReadBatch(1024, nullptr, nullptr, values.data(), &values_read); + EXPECT_EQ(real_read, values_read); + column_b_size += values_read; + } + + EXPECT_EQ(kMembers, column_a_size); + EXPECT_EQ(kMembers, column_b_size); + } + // check crc will read failed + { + ReaderProperties readerProperties; + 
readerProperties.set_use_page_checksum_verification(true); + auto reader = ParquetFileReader::OpenFile(data_page_v1_corrupt_checksum(), false, + readerProperties); + auto metadata_ptr = reader->metadata(); + EXPECT_EQ(1U, metadata_ptr->num_row_groups()); + auto rg = reader->RowGroup(0); + + auto column0 = std::dynamic_pointer_cast<TypedColumnReader<Int32Type>>(rg->Column(0)); + auto column1 = std::dynamic_pointer_cast<TypedColumnReader<Int32Type>>(rg->Column(1)); + EXPECT_NE(nullptr, column0); + EXPECT_NE(nullptr, column1); + + auto column_a_page_reader = rg->GetColumnPageReader(0); + auto column_b_page_reader = rg->GetColumnPageReader(1); + + EXPECT_THROW(column_a_page_reader->NextPage(), ParquetException); + EXPECT_NE(nullptr, column_b_page_reader->NextPage()); + EXPECT_THROW(column_b_page_reader->NextPage(), ParquetException); + } +} + +void testCheckCrc(const std::string& local_file_name) { + // works when not checking crc. + { Review Comment: Can we avoid copy/pasting code around? You can define a test fixture and put helper code in fixture methods. ########## cpp/src/parquet/file_deserialize_test.cc: ########## @@ -315,6 +325,127 @@ TEST_F(TestPageSerde, LZONotSupported) { ASSERT_THROW(InitSerializedPageReader(data_size, Compression::LZO), ParquetException); } +TEST_F(TestPageSerde, CrcEnabled) { Review Comment: Given that `file_reader.cc` already contains CRC read tests, I'm not sure these tests are useful. What do you think? ########## cpp/src/parquet/column_writer.h: ########## @@ -88,7 +88,7 @@ class PARQUET_EXPORT PageWriter { int compression_level, ColumnChunkMetaDataBuilder* metadata, int16_t row_group_ordinal = -1, int16_t column_chunk_ordinal = -1, ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(), - bool buffered_row_group = false, + bool buffered_row_group = false, bool page_write_checksum_enabled = false, Review Comment: You should add this parameter after all the others, so that API compatibility is not broken. 
########## cpp/src/parquet/column_writer_test.cc: ########## @@ -645,6 +645,32 @@ TEST(TestWriter, NullValuesBuffer) { valid_bits_offset, values); } +TEST(TestWriter, EnableChecksum) { Review Comment: This is a bit too minimal IMHO. We should have roundtrip tests for both v1 and v2 data pages, to make sure that the writer generates checksums as expected by the reader. (ideally there's also a way to check that checksums are really written out...) ########## cpp/src/parquet/properties.h: ########## @@ -501,6 +515,8 @@ class PARQUET_EXPORT WriterProperties { inline std::string created_by() const { return parquet_created_by_; } + inline bool page_write_checksum_enabled() const { return page_write_checksum_enabled_; } Review Comment: ```suggestion inline bool data_page_checksum_enabled() const { return data_page_checksum_enabled_; } ``` ########## cpp/src/parquet/column_reader.cc: ########## @@ -400,6 +401,19 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() { ParquetException::EofException(ss.str()); } + const PageType::type page_type = LoadEnumSafe(&current_page_header_.type); + + if (properties_.use_page_checksum_verification() && + page_type == PageType::DATA_PAGE && current_page_header_.__isset.crc) { Review Comment: Should we also accept `DATA_PAGE_V2` here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org