This is an automated email from the ASF dual-hosted git repository. wesm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new 3586292 ARROW-9424: [C++][Parquet] Disable writing files with LZ4 codec 3586292 is described below commit 3586292d62c8c348e9fb85676eb524cde53179cf Author: Wes McKinney <w...@apache.org> AuthorDate: Tue Jul 14 21:39:47 2020 -0500 ARROW-9424: [C++][Parquet] Disable writing files with LZ4 codec Due to ongoing LZ4 problems with Parquet files, this patch disables writing files with LZ4 codec by throwing a `ParquetException`. In progress: adding exceptions for pyarrow when using LZ4 to write files and updating relevant pytests Mailing list discussion: https://mail-archives.apache.org/mod_mbox/arrow-dev/202007.mbox/%3CCAJPUwMCM4ZaJB720%2BuoM1aSA2oD9jSEnzuwWjJiw6vwXxHk7nw%40mail.gmail.com%3E Jira ticket: https://issues.apache.org/jira/browse/ARROW-9424 Closes #7757 from patrickpai/ARROW-9424 Lead-authored-by: Wes McKinney <w...@apache.org> Co-authored-by: Patrick Pai <patrick.m....@gmail.com> Signed-off-by: Wes McKinney <w...@apache.org> --- cpp/src/parquet/column_reader.cc | 2 +- cpp/src/parquet/column_writer.cc | 2 +- cpp/src/parquet/column_writer_test.cc | 10 ++++++---- cpp/src/parquet/file_deserialize_test.cc | 5 ++--- cpp/src/parquet/file_serialize_test.cc | 2 +- cpp/src/parquet/thrift_internal.h | 1 + cpp/src/parquet/types.cc | 33 ++++++++++++++++++++++++++++---- cpp/src/parquet/types.h | 9 +++++++++ python/pyarrow/tests/test_parquet.py | 16 ++++++++++++++-- 9 files changed, 64 insertions(+), 16 deletions(-) diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 0bfc303..bc462ad 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -182,7 +182,7 @@ class SerializedPageReader : public PageReader { InitDecryption(); } max_page_header_size_ = kDefaultMaxPageHeaderSize; - decompressor_ = GetCodec(codec); + decompressor_ = internal::GetReadCodec(codec); } // Implement the PageReader interface diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc index 13f91e3..f9cf37c 100644 --- a/cpp/src/parquet/column_writer.cc +++ b/cpp/src/parquet/column_writer.cc @@ -172,7 +172,7 @@ class SerializedPageWriter : public PageWriter { if (data_encryptor_ != nullptr || meta_encryptor_ != nullptr) { InitEncryption(); } - compressor_ = GetCodec(codec, compression_level); + compressor_ = internal::GetWriteCodec(codec, compression_level); thrift_serializer_.reset(new ThriftSerializer); } diff --git a/cpp/src/parquet/column_writer_test.cc b/cpp/src/parquet/column_writer_test.cc index 23554aa..a92d4d2 100644 --- a/cpp/src/parquet/column_writer_test.cc +++ b/cpp/src/parquet/column_writer_test.cc @@ -488,13 +488,15 @@ TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndGzipCompression) { #ifdef ARROW_WITH_LZ4 TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithLz4Compression) { - this->TestRequiredWithSettings(Encoding::PLAIN, Compression::LZ4, false, false, - LARGE_SIZE); + ASSERT_THROW(this->TestRequiredWithSettings(Encoding::PLAIN, Compression::LZ4, false, + false, LARGE_SIZE), + ParquetException); } TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndLz4Compression) { - this->TestRequiredWithSettings(Encoding::PLAIN, Compression::LZ4, false, true, - LARGE_SIZE); + ASSERT_THROW(this->TestRequiredWithSettings(Encoding::PLAIN, Compression::LZ4, false, + true, LARGE_SIZE), + ParquetException); } #endif diff --git a/cpp/src/parquet/file_deserialize_test.cc b/cpp/src/parquet/file_deserialize_test.cc index 3fe2230..1dd3492 100644 --- a/cpp/src/parquet/file_deserialize_test.cc +++ b/cpp/src/parquet/file_deserialize_test.cc @@ -249,9 +249,8 @@ TEST_F(TestPageSerde, Compression) { codec_types.push_back(Compression::GZIP); #endif -#ifdef ARROW_WITH_LZ4 - codec_types.push_back(Compression::LZ4); -#endif + // TODO: Add LZ4 compression type after PARQUET-1878 is complete. + // Testing for deserializing LZ4 is hard without writing enabled, so it is not included. #ifdef ARROW_WITH_ZSTD codec_types.push_back(Compression::ZSTD); diff --git a/cpp/src/parquet/file_serialize_test.cc b/cpp/src/parquet/file_serialize_test.cc index c5c4df2..72d7d6f 100644 --- a/cpp/src/parquet/file_serialize_test.cc +++ b/cpp/src/parquet/file_serialize_test.cc @@ -309,7 +309,7 @@ TYPED_TEST(TestSerialize, SmallFileGzip) { #ifdef ARROW_WITH_LZ4 TYPED_TEST(TestSerialize, SmallFileLz4) { - ASSERT_NO_FATAL_FAILURE(this->FileSerializeTest(Compression::LZ4)); + ASSERT_THROW(this->FileSerializeTest(Compression::LZ4), ParquetException); } #endif diff --git a/cpp/src/parquet/thrift_internal.h b/cpp/src/parquet/thrift_internal.h index 103dc650..e1959e7 100644 --- a/cpp/src/parquet/thrift_internal.h +++ b/cpp/src/parquet/thrift_internal.h @@ -101,6 +101,7 @@ static inline Compression::type FromThriftUnsafe(format::CompressionCodec::type case format::CompressionCodec::BROTLI: return Compression::BROTLI; case format::CompressionCodec::LZ4: + // ARROW-9424: Existing files use LZ4_RAW but this may need to change return Compression::LZ4; case format::CompressionCodec::ZSTD: return Compression::ZSTD; diff --git a/cpp/src/parquet/types.cc b/cpp/src/parquet/types.cc index 7e22da4..1b4bb4a 100644 --- a/cpp/src/parquet/types.cc +++ b/cpp/src/parquet/types.cc @@ -49,12 +49,19 @@ bool IsCodecSupported(Compression::type codec) { } } -std::unique_ptr<Codec> GetCodec(Compression::type codec) { - return GetCodec(codec, Codec::UseDefaultCompressionLevel()); -} +namespace internal { -std::unique_ptr<Codec> GetCodec(Compression::type codec, int compression_level) { +std::unique_ptr<Codec> GetCodec(Compression::type codec, int compression_level, + bool for_writing) { std::unique_ptr<Codec> result; + if (for_writing && (codec == Compression::LZ4 || codec == Compression::LZ4_FRAME)) { + throw ParquetException( + "Per ARROW-9424, writing files with LZ4 compression has been " + "disabled until implementation issues have been resolved. " + "It is recommended to read any existing files and rewrite them " + "using a different compression."); + } + if (!IsCodecSupported(codec)) { std::stringstream ss; ss << "Codec type " << Codec::GetCodecAsString(codec) @@ -66,6 +73,24 @@ std::unique_ptr<Codec> GetCodec(Compression::type codec, int compression_level) return result; } +std::unique_ptr<Codec> GetReadCodec(Compression::type codec) { + return GetCodec(codec, Codec::UseDefaultCompressionLevel(), /*for_writing=*/false); +} + +std::unique_ptr<Codec> GetWriteCodec(Compression::type codec, int compression_level) { + return GetCodec(codec, compression_level, /*for_writing=*/true); +} + +} // namespace internal + +std::unique_ptr<Codec> GetCodec(Compression::type codec, int compression_level) { + return internal::GetCodec(codec, compression_level, /*for_writing=*/false); +} + +std::unique_ptr<Codec> GetCodec(Compression::type codec) { + return GetCodec(codec, Codec::UseDefaultCompressionLevel()); +} + std::string FormatStatValue(Type::type parquet_type, ::arrow::util::string_view val) { std::stringstream result; diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h index a1586b6..f7f7bb7 100644 --- a/cpp/src/parquet/types.h +++ b/cpp/src/parquet/types.h @@ -465,6 +465,15 @@ struct Encoding { PARQUET_EXPORT bool IsCodecSupported(Compression::type codec); +namespace internal { + +// ARROW-9424: Separate functions for reading and writing so we can disable LZ4 +// on writing +std::unique_ptr<Codec> GetReadCodec(Compression::type codec); +std::unique_ptr<Codec> GetWriteCodec(Compression::type codec, int compression_level); + +} // namespace internal + PARQUET_EXPORT std::unique_ptr<Codec> GetCodec(Compression::type codec); diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index 410eee1..aea3c07 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -698,6 +698,10 @@ def test_pandas_parquet_pyfile_roundtrip(tempdir, use_legacy_dataset): tm.assert_frame_equal(df, df_read) +# ARROW-9424: LZ4 support is currently disabled +SUPPORTED_COMPRESSIONS = ['NONE', 'SNAPPY', 'GZIP', 'ZSTD'] + + @pytest.mark.pandas @parametrize_legacy_dataset def test_pandas_parquet_configuration_options(tempdir, use_legacy_dataset): @@ -735,7 +739,7 @@ def test_pandas_parquet_configuration_options(tempdir, use_legacy_dataset): df_read = table_read.to_pandas() tm.assert_frame_equal(df, df_read) - for compression in ['NONE', 'SNAPPY', 'GZIP', 'LZ4', 'ZSTD']: + for compression in SUPPORTED_COMPRESSIONS: if (compression != 'NONE' and not pa.lib.Codec.is_available(compression)): continue @@ -747,6 +751,13 @@ def test_pandas_parquet_configuration_options(tempdir, use_legacy_dataset): tm.assert_frame_equal(df, df_read) +# ARROW-9424: LZ4 support is currently disabled +def test_lz4_compression_disabled(): + table = pa.table([pa.array([1, 2, 3, 4, 5])], names=['f0']) + with pytest.raises(IOError): + pq.write_table(table, pa.BufferOutputStream(), compression='lz4') + + def make_sample_file(table_or_df): if isinstance(table_or_df, pa.Table): a_table = table_or_df @@ -828,8 +839,9 @@ def test_compression_level(use_legacy_dataset): # level. # GZIP (zlib) allows for specifying a compression level but as of up # to version 1.2.11 the valid range is [-1, 9]. - invalid_combinations = [("snappy", 4), ("lz4", 5), ("gzip", -1337), + invalid_combinations = [("snappy", 4), ("gzip", -1337), ("None", 444), ("lzo", 14)] + # ARROW-9424: lz4 is disabled for now ("lz4", 5), buf = io.BytesIO() for (codec, level) in invalid_combinations: with pytest.raises((ValueError, OSError)):