Repository: parquet-cpp Updated Branches: refs/heads/master c55dc22b9 -> 491182c22
PARQUET-1044: Use compression libraries from Apache Arrow Depends on https://github.com/apache/arrow/pull/771; will update Arrow version once that's merged Author: Wes McKinney <wes.mckin...@twosigma.com> Closes #362 from wesm/move-compression-to-arrow and squashes the following commits: b055b69 [Wes McKinney] Check status, do not use /WX with Arrow EP build on MSVC 6e9b9cd [Wes McKinney] Fixes for macOS and Windows 6d39a39 [Wes McKinney] Fix function name a0bfa95 [Wes McKinney] Untabify file e095b10 [Wes McKinney] Use compression toolchain from Arrow Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/491182c2 Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/491182c2 Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/491182c2 Branch: refs/heads/master Commit: 491182c2293e28fa827ef63d3d4012765278aa64 Parents: c55dc22 Author: Wes McKinney <wes.mckin...@twosigma.com> Authored: Sun Jun 25 11:15:45 2017 -0400 Committer: Wes McKinney <wes.mckin...@twosigma.com> Committed: Sun Jun 25 11:15:45 2017 -0400 ---------------------------------------------------------------------- CMakeLists.txt | 22 +- README.md | 8 +- benchmarks/CMakeLists.txt | 5 +- benchmarks/decode_benchmark.cc | 15 +- ci/before_script_travis.sh | 2 - ci/msvc-build.bat | 2 +- ci/travis_script_cpp.sh | 3 + ci/travis_script_toolchain.sh | 3 +- cmake_modules/ThirdpartyToolchain.cmake | 210 +++-------------- src/parquet/CMakeLists.txt | 3 - src/parquet/compression-test.cc | 84 ------- src/parquet/compression.cc | 311 ------------------------- src/parquet/compression.h | 106 --------- src/parquet/exception.h | 25 ++ src/parquet/file/file-deserialize-test.cc | 14 +- src/parquet/file/metadata.h | 1 - src/parquet/file/reader-internal.cc | 11 +- src/parquet/file/reader-internal.h | 8 +- src/parquet/file/writer-internal.cc | 12 +- src/parquet/file/writer-internal.h | 8 +- src/parquet/parquet_version.h | 2 +- src/parquet/schema.cc | 8 +- src/parquet/types.h | 12 +- src/parquet/util/memory.h | 51 ++-- 24 files changed, 144 insertions(+), 782 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/491182c2/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index c8047bf..d0c1a53 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -107,9 +107,6 @@ if ("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}") option(PARQUET_BUILD_EXECUTABLES "Build the libparquet executable CLI tools" ON) - option(PARQUET_ZLIB_VENDORED - "Build our own zlib (some libz.a aren't configured for static linking)" - ON) option(PARQUET_RPATH_ORIGIN "Build Parquet libraries with RPATH set to \$ORIGIN" OFF) @@ -119,14 +116,8 @@ if ("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}") if (MSVC) set(ARROW_MSVC_STATIC_LIB_SUFFIX "_static" CACHE STRING "Arrow static lib suffix used on Windows with MSVC (default _static)") - set(BROTLI_MSVC_STATIC_LIB_SUFFIX "_static" CACHE STRING - "Brotli static lib suffix used on Windows with MSVC (default _static)") - set(SNAPPY_MSVC_STATIC_LIB_SUFFIX "" CACHE STRING - "Snappy static lib suffix used on Windows with MSVC (default is empty string)") set(THRIFT_MSVC_STATIC_LIB_SUFFIX "md" CACHE STRING "Thrift static lib suffix used on Windows with MSVC (default md)") - set(ZLIB_MSVC_STATIC_LIB_SUFFIX "libstatic" CACHE STRING - "Zlib static lib suffix used on Windows with MSVC (default libstatic)") endif() endif() @@ -618,8 +609,6 @@ set(LIBPARQUET_SRCS src/parquet/column/scan-all.cc src/parquet/column/statistics.cc - src/parquet/compression.cc - src/parquet/file/metadata.cc src/parquet/file/printer.cc src/parquet/file/reader.cc @@ -636,15 +625,6 @@ set(LIBPARQUET_SRCS src/parquet/util/memory.cc ) -set(BUNDLED_STATIC_LIBS - brotlistatic_dec - brotlistatic_enc - brotlistatic_common - snappystatic - thriftstatic - zlibstatic -) - # # Ensure that thrift compilation is done before using its generated headers # # in parquet code. add_custom_target(thrift-deps ALL @@ -659,7 +639,7 @@ if (NOT PARQUET_MINIMAL_DEPENDENCY) set(LIBPARQUET_INTERFACE_LINK_LIBS ${ARROW_LINK_LIBS} ${BOOST_LINK_LIBS} - ${BUNDLED_STATIC_LIBS} + thriftstatic ) # Although we don't link parquet_objlib against anything, we need it to depend # on these libs as we may generate their headers via ExternalProject_Add http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/491182c2/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index 759717a..770f0ca 100644 --- a/README.md +++ b/README.md @@ -66,10 +66,7 @@ Check [Windows developer guide][1] for instructions to build parquet-cpp on Wind ## Third Party Dependencies -- Apache Arrow (memory management, built-in IO, optional Array adapters) -- snappy -- zlib -- brotli +- Apache Arrow (memory management, compression, IO, optional columnar data adapters) - Thrift 0.7+ [install instructions](https://thrift.apache.org/docs/install/) - googletest 1.7.0 (cannot be installed with package managers) - Google Benchmark (only required if building benchmarks) @@ -92,9 +89,6 @@ thirdparty toolchain that parquet-cpp builds automatically. - ARROW_HOME customizes the Apache Arrow installed location. - THRIFT_HOME customizes the Apache Thrift (C++ libraries and compiler installed location. - - SNAPPY_HOME customizes the Snappy installed location. - - ZLIB_HOME customizes the zlib installed location. - - BROTLI_HOME customizes the Brotli installed location. - GTEST_HOME customizes the googletest installed location (if you are building the unit tests). - GBENCHMARK_HOME customizes the Google Benchmark installed location (if http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/491182c2/benchmarks/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/benchmarks/CMakeLists.txt b/benchmarks/CMakeLists.txt index 2ef8113..61c1491 100644 --- a/benchmarks/CMakeLists.txt +++ b/benchmarks/CMakeLists.txt @@ -16,7 +16,6 @@ # under the License. SET(LINK_LIBS - snappystatic thriftstatic) if (PARQUET_BUILD_BENCHMARKS) @@ -24,6 +23,6 @@ if (PARQUET_BUILD_BENCHMARKS) # This uses private APIs target_link_libraries(decode_benchmark ${LINK_LIBS} - parquet_static) - + arrow + parquet_static) endif() http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/491182c2/benchmarks/decode_benchmark.cc ---------------------------------------------------------------------- diff --git a/benchmarks/decode_benchmark.cc b/benchmarks/decode_benchmark.cc index dc687b6..65a6afd 100644 --- a/benchmarks/decode_benchmark.cc +++ b/benchmarks/decode_benchmark.cc @@ -19,8 +19,10 @@ #include <random> #include <stdio.h> -#include "parquet/compression.h" +#include "arrow/util/compression.h" + #include "parquet/encoding-internal.h" +#include "parquet/util/logging.h" #include "parquet/util/stopwatch.h" /** @@ -277,7 +279,7 @@ uint64_t TestBinaryPackedEncoding(const char* name, const std::vector<int64_t>& printf("%s rate (batch size = %2d): %0.3fM per second.\n", NAME, BATCH_SIZE, \ mult / elapsed); -void TestPlainIntCompressed(parquet::Codec* codec, const std::vector<int64_t>& data, +void TestPlainIntCompressed(::arrow::Codec* codec, const std::vector<int64_t>& data, int num_iters, int batch_size) { const uint8_t* raw_data = reinterpret_cast<const uint8_t*>(&data[0]); int uncompressed_len = data.size() * sizeof(int64_t); @@ -285,11 +287,12 @@ void TestPlainIntCompressed(parquet::Codec* codec, const std::vector<int64_t>& d int max_compressed_size = codec->MaxCompressedLen(uncompressed_len, raw_data); uint8_t* compressed_data = new uint8_t[max_compressed_size]; - int compressed_len = - codec->Compress(uncompressed_len, raw_data, max_compressed_size, compressed_data); + int64_t compressed_len; + DCHECK(codec->Compress(uncompressed_len, raw_data, max_compressed_size, + compressed_data, &compressed_len).ok()); printf("\n%s:\n Uncompressed len: %d\n Compressed len: %d\n", codec->name(), - uncompressed_len, compressed_len); + uncompressed_len, static_cast<int>(compressed_len)); double mult = num_iters * data.size() * 1000.; parquet::StopWatch sw; @@ -446,7 +449,7 @@ int main(int argc, char** argv) { TestBinaryPackedEncoding("Rand 0-10K", values, 100, 32); TestBinaryPackedEncoding("Rand 0-10K", values, 100, 64); - parquet::SnappyCodec snappy_codec; + ::arrow::SnappyCodec snappy_codec; TestPlainIntCompressed(&snappy_codec, values, 100, 1); TestPlainIntCompressed(&snappy_codec, values, 100, 16); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/491182c2/ci/before_script_travis.sh ---------------------------------------------------------------------- diff --git a/ci/before_script_travis.sh b/ci/before_script_travis.sh index 2ddf2ec..5b33a9b 100755 --- a/ci/before_script_travis.sh +++ b/ci/before_script_travis.sh @@ -30,11 +30,9 @@ if [ $TRAVIS_OS_NAME == "linux" ]; then cmake -DPARQUET_CXXFLAGS=-Werror \ -DPARQUET_TEST_MEMCHECK=ON \ -DPARQUET_BUILD_BENCHMARKS=ON \ - -DPARQUET_ARROW_LINKAGE=static \ -DPARQUET_GENERATE_COVERAGE=1 \ $TRAVIS_BUILD_DIR else cmake -DPARQUET_CXXFLAGS=-Werror \ - -DPARQUET_ARROW_LINKAGE=static \ $TRAVIS_BUILD_DIR fi http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/491182c2/ci/msvc-build.bat ---------------------------------------------------------------------- diff --git a/ci/msvc-build.bat b/ci/msvc-build.bat index 9c3f8c1..ad7a7b6 100644 --- a/ci/msvc-build.bat +++ b/ci/msvc-build.bat @@ -30,13 +30,13 @@ if NOT "%CONFIGURATION%" == "Debug" ( if "%CONFIGURATION%" == "Toolchain" ( conda install -y boost-cpp=1.63 brotli=0.6.0 zlib=1.2.11 snappy=1.1.4 thrift-cpp=0.10.0 -c conda-forge + set ARROW_BUILD_TOOLCHAIN=%MINICONDA%/Library set PARQUET_BUILD_TOOLCHAIN=%MINICONDA%/Library cmake -G "%GENERATOR%" ^ -DCMAKE_BUILD_TYPE=Release ^ -DPARQUET_BOOST_USE_SHARED=OFF ^ -DPARQUET_CXXFLAGS=%PARQUET_CXXFLAGS% ^ - -DPARQUET_ZLIB_VENDORED=OFF ^ .. || exit /B cmake --build . --config Release || exit /B http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/491182c2/ci/travis_script_cpp.sh ---------------------------------------------------------------------- diff --git a/ci/travis_script_cpp.sh b/ci/travis_script_cpp.sh index 3cd64eb..61d1a51 100755 --- a/ci/travis_script_cpp.sh +++ b/ci/travis_script_cpp.sh @@ -38,6 +38,9 @@ if [ $TRAVIS_OS_NAME == "linux" ]; then $TRAVIS_BUILD_DIR/ci/upload_coverage.sh else make -j4 || exit 1 + BUILD_TYPE=debug + EXECUTABLE_DIR=$CPP_BUILD_DIR/$BUILD_TYPE + export LD_LIBRARY_PATH=$EXECUTABLE_DIR:$LD_LIBRARY_PATH ctest -VV -L unittest || { cat $TRAVIS_BUILD_DIR/parquet-build/Testing/Temporary/LastTest.log; exit 1; } fi http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/491182c2/ci/travis_script_toolchain.sh ---------------------------------------------------------------------- diff --git a/ci/travis_script_toolchain.sh b/ci/travis_script_toolchain.sh index d8b5890..afa3eec 100755 --- a/ci/travis_script_toolchain.sh +++ b/ci/travis_script_toolchain.sh @@ -40,7 +40,7 @@ conda config --set remote_connect_timeout_secs 12 conda info -a conda create -y -q -p $CPP_TOOLCHAIN \ - boost-cpp zlib thrift-cpp snappy brotli cmake git \ + boost-cpp thrift-cpp cmake git \ -c conda-forge # ---------------------------------------------------------------------- @@ -53,7 +53,6 @@ export BOOST_ROOT=$CPP_TOOLCHAIN cmake -DPARQUET_CXXFLAGS=-Werror \ -DPARQUET_TEST_MEMCHECK=ON \ - -DPARQUET_ZLIB_VENDORED=off \ -DPARQUET_GENERATE_COVERAGE=1 \ $TRAVIS_BUILD_DIR http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/491182c2/cmake_modules/ThirdpartyToolchain.cmake ---------------------------------------------------------------------- diff --git a/cmake_modules/ThirdpartyToolchain.cmake b/cmake_modules/ThirdpartyToolchain.cmake index 7f65786..5c4e565 100644 --- a/cmake_modules/ThirdpartyToolchain.cmake +++ b/cmake_modules/ThirdpartyToolchain.cmake @@ -17,9 +17,7 @@ set(GTEST_VERSION "1.8.0") set(GBENCHMARK_VERSION "1.1.0") -set(SNAPPY_VERSION "1.1.3") set(THRIFT_VERSION "0.10.0") -set(BROTLI_VERSION "v0.6.0") string(TOUPPER ${CMAKE_BUILD_TYPE} UPPERCASE_BUILD_TYPE) @@ -37,9 +35,6 @@ endif() if (NOT "$ENV{PARQUET_BUILD_TOOLCHAIN}" STREQUAL "") set(THRIFT_HOME "$ENV{PARQUET_BUILD_TOOLCHAIN}") - set(SNAPPY_HOME "$ENV{PARQUET_BUILD_TOOLCHAIN}") - set(ZLIB_HOME "$ENV{PARQUET_BUILD_TOOLCHAIN}") - set(BROTLI_HOME "$ENV{PARQUET_BUILD_TOOLCHAIN}") set(ARROW_HOME "$ENV{PARQUET_BUILD_TOOLCHAIN}") if (NOT DEFINED ENV{BOOST_ROOT}) @@ -53,18 +48,6 @@ if (DEFINED ENV{THRIFT_HOME}) set(THRIFT_HOME "$ENV{THRIFT_HOME}") endif() -if (DEFINED ENV{SNAPPY_HOME}) - set(SNAPPY_HOME "$ENV{SNAPPY_HOME}") -endif() - -if (DEFINED ENV{ZLIB_HOME}) - set(ZLIB_HOME "$ENV{ZLIB_HOME}") -endif() - -if (DEFINED ENV{BROTLI_HOME}) - set(BROTLI_HOME "$ENV{BROTLI_HOME}") -endif() - if (DEFINED ENV{ARROW_HOME}) set(ARROW_HOME "$ENV{ARROW_HOME}") endif() @@ -126,45 +109,33 @@ set(LIBS ${LIBS} ${Boost_LIBRARIES}) # ---------------------------------------------------------------------- # ZLIB -if (NOT PARQUET_ZLIB_VENDORED) - find_package(ZLIB) -endif() - -if (NOT ZLIB_FOUND) - set(ZLIB_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/zlib_ep/src/zlib_ep-install") - set(ZLIB_HOME "${ZLIB_PREFIX}") - set(ZLIB_INCLUDE_DIR "${ZLIB_PREFIX}/include") - if (MSVC) - if (${UPPERCASE_BUILD_TYPE} STREQUAL "DEBUG") - set(ZLIB_STATIC_LIB_NAME zlibstaticd.lib) - else() - set(ZLIB_STATIC_LIB_NAME zlibstatic.lib) - endif() +set(ZLIB_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/zlib_ep/src/zlib_ep-install") +set(ZLIB_HOME "${ZLIB_PREFIX}") +set(ZLIB_INCLUDE_DIR "${ZLIB_PREFIX}/include") +if (MSVC) + if (${UPPERCASE_BUILD_TYPE} STREQUAL "DEBUG") + set(ZLIB_STATIC_LIB_NAME zlibstaticd.lib) else() - set(ZLIB_STATIC_LIB_NAME libz.a) + set(ZLIB_STATIC_LIB_NAME zlibstatic.lib) endif() - set(ZLIB_STATIC_LIB "${ZLIB_PREFIX}/lib/${ZLIB_STATIC_LIB_NAME}") - set(ZLIB_CMAKE_ARGS -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} - -DCMAKE_INSTALL_PREFIX=${ZLIB_PREFIX} - -DCMAKE_C_FLAGS=${EP_C_FLAGS} - -DBUILD_SHARED_LIBS=OFF) - ExternalProject_Add(zlib_ep - URL "http://zlib.net/fossils/zlib-1.2.8.tar.gz" - BUILD_BYPRODUCTS "${ZLIB_STATIC_LIB}" - ${ZLIB_BUILD_BYPRODUCTS} - CMAKE_ARGS ${ZLIB_CMAKE_ARGS}) - set(ZLIB_VENDORED 1) else() - set(ZLIB_VENDORED 0) + set(ZLIB_STATIC_LIB_NAME libz.a) endif() +set(ZLIB_STATIC_LIB "${ZLIB_PREFIX}/lib/${ZLIB_STATIC_LIB_NAME}") +set(ZLIB_CMAKE_ARGS -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} + -DCMAKE_INSTALL_PREFIX=${ZLIB_PREFIX} + -DCMAKE_C_FLAGS=${EP_C_FLAGS} + -DBUILD_SHARED_LIBS=OFF) +ExternalProject_Add(zlib_ep + URL "http://zlib.net/fossils/zlib-1.2.8.tar.gz" + BUILD_BYPRODUCTS "${ZLIB_STATIC_LIB}" + ${ZLIB_BUILD_BYPRODUCTS} + CMAKE_ARGS ${ZLIB_CMAKE_ARGS}) include_directories(SYSTEM ${ZLIB_INCLUDE_DIR}) add_library(zlibstatic STATIC IMPORTED) set_target_properties(zlibstatic PROPERTIES IMPORTED_LOCATION ${ZLIB_STATIC_LIB}) - -if (ZLIB_VENDORED) - add_dependencies(zlibstatic zlib_ep) -endif() +add_dependencies(zlibstatic zlib_ep) # ---------------------------------------------------------------------- # Thrift @@ -220,9 +191,7 @@ if (NOT THRIFT_FOUND) "-DWITH_SHARED_LIB=OFF" "-DWITH_PLUGIN=OFF" ${THRIFT_CMAKE_ARGS}) - if (ZLIB_VENDORED) - set(THRIFT_DEPENDENCIES ${THRIFT_DEPENDENCIES} zlib_ep) - endif() + set(THRIFT_DEPENDENCIES ${THRIFT_DEPENDENCIES} zlib_ep) endif() ExternalProject_Add(thrift_ep @@ -247,127 +216,6 @@ if (THRIFT_VENDORED) add_dependencies(thriftstatic thrift_ep) endif() -# ---------------------------------------------------------------------- -# Snappy - -## Snappy -find_package(Snappy) -if (NOT SNAPPY_FOUND) - set(SNAPPY_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/snappy_ep/src/snappy_ep-install") - set(SNAPPY_HOME "${SNAPPY_PREFIX}") - set(SNAPPY_INCLUDE_DIR "${SNAPPY_PREFIX}/include") - if (MSVC) - set(SNAPPY_STATIC_LIB_NAME snappystatic) - else() - set(SNAPPY_STATIC_LIB_NAME snappy) - endif() - set(SNAPPY_STATIC_LIB "${SNAPPY_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}${SNAPPY_STATIC_LIB_NAME}${CMAKE_STATIC_LIBRARY_SUFFIX}") - set(SNAPPY_SRC_URL "https://github.com/google/snappy/releases/download/${SNAPPY_VERSION}/snappy-${SNAPPY_VERSION}.tar.gz") - - if (${UPPERCASE_BUILD_TYPE} EQUAL "RELEASE") - if (APPLE) - set(SNAPPY_CXXFLAGS "CXXFLAGS='-DNDEBUG -O1'") - else() - set(SNAPPY_CXXFLAGS "CXXFLAGS='-DNDEBUG -O2'") - endif() - endif() - - if (MSVC) - set(SNAPPY_CMAKE_ARGS -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} - "-DCMAKE_CXX_FLAGS=${EP_CXX_FLAGS}" - "-DCMAKE_C_FLAGS=${EX_C_FLAGS}" - "-DCMAKE_INSTALL_PREFIX=${SNAPPY_PREFIX}") - set(SNAPPY_UPDATE_COMMAND ${CMAKE_COMMAND} -E copy - ${CMAKE_SOURCE_DIR}/cmake_modules/SnappyCMakeLists.txt - ./CMakeLists.txt && - ${CMAKE_COMMAND} -E copy - ${CMAKE_SOURCE_DIR}/cmake_modules/SnappyConfig.h - ./config.h) - ExternalProject_Add(snappy_ep - UPDATE_COMMAND ${SNAPPY_UPDATE_COMMAND} - BUILD_IN_SOURCE 1 - BUILD_COMMAND ${MAKE} - INSTALL_DIR ${SNAPPY_PREFIX} - URL ${SNAPPY_SRC_URL} - CMAKE_ARGS ${SNAPPY_CMAKE_ARGS} - BUILD_BYPRODUCTS "${SNAPPY_STATIC_LIB}") - else() - ExternalProject_Add(snappy_ep - CONFIGURE_COMMAND ./configure --with-pic "--prefix=${SNAPPY_PREFIX}" ${SNAPPY_CXXFLAGS} - BUILD_IN_SOURCE 1 - BUILD_COMMAND ${MAKE} - INSTALL_DIR ${SNAPPY_PREFIX} - URL ${SNAPPY_SRC_URL} - BUILD_BYPRODUCTS "${SNAPPY_STATIC_LIB}") - endif() - set(SNAPPY_VENDORED 1) -else() - set(SNAPPY_VENDORED 0) -endif() - -include_directories(SYSTEM ${SNAPPY_INCLUDE_DIR}) -add_library(snappystatic STATIC IMPORTED) -set_target_properties(snappystatic PROPERTIES IMPORTED_LOCATION ${SNAPPY_STATIC_LIB}) - -if (SNAPPY_VENDORED) - add_dependencies(snappystatic snappy_ep) -endif() - -# ---------------------------------------------------------------------- -# Brotli - -find_package(Brotli) -if (NOT BROTLI_FOUND) - set(BROTLI_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/brotli_ep/src/brotli_ep-install") - set(BROTLI_HOME "${BROTLI_PREFIX}") - set(BROTLI_INCLUDE_DIR "${BROTLI_PREFIX}/include") - if (MSVC) - set(BROTLI_LIB_DIR bin) - else() - set(BROTLI_LIB_DIR lib) - endif() - set(BROTLI_STATIC_LIBRARY_ENC "${BROTLI_PREFIX}/${BROTLI_LIB_DIR}/${CMAKE_LIBRARY_ARCHITECTURE}/${CMAKE_STATIC_LIBRARY_PREFIX}brotlienc${CMAKE_STATIC_LIBRARY_SUFFIX}") - set(BROTLI_STATIC_LIBRARY_DEC "${BROTLI_PREFIX}/${BROTLI_LIB_DIR}/${CMAKE_LIBRARY_ARCHITECTURE}/${CMAKE_STATIC_LIBRARY_PREFIX}brotlidec${CMAKE_STATIC_LIBRARY_SUFFIX}") - set(BROTLI_STATIC_LIBRARY_COMMON "${BROTLI_PREFIX}/${BROTLI_LIB_DIR}/${CMAKE_LIBRARY_ARCHITECTURE}/${CMAKE_STATIC_LIBRARY_PREFIX}brotlicommon${CMAKE_STATIC_LIBRARY_SUFFIX}") - set(BROTLI_CMAKE_ARGS -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} - "-DCMAKE_CXX_FLAGS=${EP_CXX_FLAGS}" - "-DCMAKE_C_FLAGS=${EX_C_FLAGS}" - -DCMAKE_INSTALL_PREFIX=${BROTLI_PREFIX} - -DCMAKE_INSTALL_LIBDIR=lib/${CMAKE_LIBRARY_ARCHITECTURE} - -DBUILD_SHARED_LIBS=OFF) - - ExternalProject_Add(brotli_ep - URL "https://github.com/google/brotli/archive/${BROTLI_VERSION}.tar.gz" - BUILD_BYPRODUCTS "${BROTLI_STATIC_LIBRARY_ENC}" "${BROTLI_STATIC_LIBRARY_DEC}" "${BROTLI_STATIC_LIBRARY_COMMON}" - CMAKE_ARGS ${BROTLI_CMAKE_ARGS} - STEP_TARGETS headers_copy) - if (MSVC) - ExternalProject_Get_Property(brotli_ep SOURCE_DIR) - - ExternalProject_Add_Step(brotli_ep headers_copy - COMMAND xcopy /E /I include ..\\..\\..\\brotli_ep\\src\\brotli_ep-install\\include /Y - DEPENDEES build - WORKING_DIRECTORY ${SOURCE_DIR}) - endif() - set(BROTLI_VENDORED 1) -else() - set(BROTLI_VENDORED 0) -endif() - -include_directories(SYSTEM ${BROTLI_INCLUDE_DIR}) -add_library(brotlistatic_enc STATIC IMPORTED) -set_target_properties(brotlistatic_enc PROPERTIES IMPORTED_LOCATION ${BROTLI_STATIC_LIBRARY_ENC}) -add_library(brotlistatic_dec STATIC IMPORTED) -set_target_properties(brotlistatic_dec PROPERTIES IMPORTED_LOCATION ${BROTLI_STATIC_LIBRARY_DEC}) -add_library(brotlistatic_common STATIC IMPORTED) -set_target_properties(brotlistatic_common PROPERTIES IMPORTED_LOCATION ${BROTLI_STATIC_LIBRARY_COMMON}) - -if (BROTLI_VENDORED) - add_dependencies(brotlistatic_enc brotli_ep) - add_dependencies(brotlistatic_dec brotli_ep) - add_dependencies(brotlistatic_common brotli_ep) -endif() - ## GTest if(PARQUET_BUILD_TESTS AND NOT IGNORE_OPTIONAL_PACKAGES) add_custom_target(unittest ctest -L unittest) @@ -469,13 +317,11 @@ endif() find_package(Arrow) if (NOT ARROW_FOUND) - set(ARROW_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/arrow_ep/src/arrow_ep-install") - set(ARROW_HOME "${ARROW_PREFIX}") + set(ARROW_PREFIX "${BUILD_OUTPUT_ROOT_DIRECTORY}") set(ARROW_INCLUDE_DIR "${ARROW_PREFIX}/include") - set(ARROW_LIB_DIR "${ARROW_PREFIX}/lib") - set(ARROW_BIN_DIR "${ARROW_PREFIX}/bin") + set(ARROW_LIB_DIR "${ARROW_PREFIX}") if (MSVC) - set(ARROW_SHARED_LIB "${ARROW_BIN_DIR}/arrow.dll") + set(ARROW_SHARED_LIB "${ARROW_PREFIX}/bin/arrow.dll") set(ARROW_SHARED_IMPLIB "${ARROW_LIB_DIR}/arrow.lib") set(ARROW_STATIC_LIB "${ARROW_LIB_DIR}/arrow_static.lib") else() @@ -485,8 +331,8 @@ if (NOT ARROW_FOUND) set(ARROW_CMAKE_ARGS -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} - -DCMAKE_CXX_FLAGS=${CMAKE_CXX_FLAGS} - -DCMAKE_C_FLAGS=${CMAKE_C_FLAGS} + -DCMAKE_CXX_FLAGS=${EP_CXX_FLAGS} + -DCMAKE_C_FLAGS=${EP_C_FLAGS} -DCMAKE_INSTALL_PREFIX=${ARROW_PREFIX} -DCMAKE_INSTALL_LIBDIR=${ARROW_LIB_DIR} -DARROW_JEMALLOC=OFF @@ -494,12 +340,8 @@ if (NOT ARROW_FOUND) -DARROW_BOOST_USE_SHARED=${PARQUET_BOOST_USE_SHARED} -DARROW_BUILD_TESTS=OFF) - if (MSVC) - set(ARROW_CMAKE_ARGS -DARROW_CXXFLAGS="/WX" ${ARROW_CMAKE_ARGS}) - endif() - if ("$ENV{PARQUET_ARROW_VERSION}" STREQUAL "") - set(ARROW_VERSION "e209e5865ea58e57925cae24d4bf3f63d58ee21d") + set(ARROW_VERSION "98f7cac6e162d9775d615d07b9867c1ec0030f82") else() set(ARROW_VERSION "$ENV{PARQUET_ARROW_VERSION}") endif() http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/491182c2/src/parquet/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/parquet/CMakeLists.txt b/src/parquet/CMakeLists.txt index ed3fe56..f0eedcf 100644 --- a/src/parquet/CMakeLists.txt +++ b/src/parquet/CMakeLists.txt @@ -17,7 +17,6 @@ # Headers: top level install(FILES - compression.h encoding.h exception.h schema.h @@ -41,11 +40,9 @@ install(FILES "${CMAKE_CURRENT_BINARY_DIR}/parquet.pc" DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig/") -ADD_PARQUET_TEST(compression-test) ADD_PARQUET_TEST(encoding-test) ADD_PARQUET_TEST(public-api-test) ADD_PARQUET_TEST(types-test) ADD_PARQUET_TEST(reader-test) ADD_PARQUET_TEST(schema-test) - ADD_PARQUET_BENCHMARK(encoding-benchmark) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/491182c2/src/parquet/compression-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/compression-test.cc b/src/parquet/compression-test.cc deleted file mode 100644 index feaf9e3..0000000 --- a/src/parquet/compression-test.cc +++ /dev/null @@ -1,84 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include <cstdint> -#include <gtest/gtest.h> -#include <string> -#include <vector> - -#include "parquet/compression.h" -#include "parquet/util/test-common.h" - -using std::string; -using std::vector; - -namespace parquet { - -template <typename T> -void CheckCodecRoundtrip(const vector<uint8_t>& data) { - // create multiple compressors to try to break them - T c1; - T c2; - - int max_compressed_len = static_cast<int>(c1.MaxCompressedLen(data.size(), &data[0])); - std::vector<uint8_t> compressed(max_compressed_len); - std::vector<uint8_t> decompressed(data.size()); - - // compress with c1 - int actual_size = static_cast<int>( - c1.Compress(data.size(), &data[0], max_compressed_len, &compressed[0])); - compressed.resize(actual_size); - - // decompress with c2 - c2.Decompress(compressed.size(), &compressed[0], decompressed.size(), &decompressed[0]); - - ASSERT_TRUE(test::vector_equal(data, decompressed)); - - // compress with c2 - int actual_size2 = static_cast<int>( - c2.Compress(data.size(), &data[0], max_compressed_len, &compressed[0])); - ASSERT_EQ(actual_size2, actual_size); - - // decompress with c1 - c1.Decompress(compressed.size(), &compressed[0], decompressed.size(), &decompressed[0]); - - ASSERT_TRUE(test::vector_equal(data, decompressed)); -} - -template <typename T> -void CheckCodec() { - int sizes[] = {10000, 100000}; - for (int data_size : sizes) { - vector<uint8_t> data; - test::random_bytes(data_size, 1234, &data); - CheckCodecRoundtrip<T>(data); - } -} - -TEST(TestCompressors, Snappy) { - CheckCodec<SnappyCodec>(); -} - -TEST(TestCompressors, Brotli) { - CheckCodec<BrotliCodec>(); -} - -TEST(TestCompressors, GZip) { - CheckCodec<GZipCodec>(); -} - -} // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/491182c2/src/parquet/compression.cc ---------------------------------------------------------------------- diff --git a/src/parquet/compression.cc b/src/parquet/compression.cc deleted file mode 100644 index dc6b93d..0000000 --- a/src/parquet/compression.cc +++ /dev/null @@ -1,311 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "parquet/compression.h" - -#include <cstdint> -#include <memory> -#include <string> - -#include <brotli/decode.h> -#include <brotli/encode.h> -#include <snappy.h> -#include <zlib.h> - -#include "parquet/exception.h" -#include "parquet/types.h" - -namespace parquet { - -Codec::~Codec() {} - -std::unique_ptr<Codec> Codec::Create(Compression::type codec_type) { - std::unique_ptr<Codec> result; - switch (codec_type) { - case Compression::UNCOMPRESSED: - break; - case Compression::SNAPPY: - result.reset(new SnappyCodec()); - break; - case Compression::GZIP: - result.reset(new GZipCodec()); - break; - case Compression::LZO: - ParquetException::NYI("LZO codec not implemented"); - break; - case Compression::BROTLI: - result.reset(new BrotliCodec()); - break; - default: - ParquetException::NYI("Unrecognized codec"); - break; - } - return result; -} - -// ---------------------------------------------------------------------- -// gzip implementation - -// These are magic numbers from zlib.h. Not clear why they are not defined -// there. - -// Maximum window size -static constexpr int WINDOW_BITS = 15; - -// Output Gzip. -static constexpr int GZIP_CODEC = 16; - -// Determine if this is libz or gzip from header. -static constexpr int DETECT_CODEC = 32; - -class GZipCodec::GZipCodecImpl { - public: - explicit GZipCodecImpl(GZipCodec::Format format) - : format_(format), - compressor_initialized_(false), - decompressor_initialized_(false) {} - - ~GZipCodecImpl() { - EndCompressor(); - EndDecompressor(); - } - - void InitCompressor() { - EndDecompressor(); - memset(&stream_, 0, sizeof(stream_)); - - int ret; - // Initialize to run specified format - int window_bits = WINDOW_BITS; - if (format_ == DEFLATE) { - window_bits = -window_bits; - } else if (format_ == GZIP) { - window_bits += GZIP_CODEC; - } - if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits, 9, - Z_DEFAULT_STRATEGY)) != Z_OK) { - throw ParquetException("zlib deflateInit failed: " + std::string(stream_.msg)); - } - - compressor_initialized_ = true; - } - - void EndCompressor() { - if (compressor_initialized_) { (void)deflateEnd(&stream_); } - compressor_initialized_ = false; - } - - void InitDecompressor() { - EndCompressor(); - memset(&stream_, 0, sizeof(stream_)); - int ret; - - // Initialize to run either deflate or zlib/gzip format - int window_bits = format_ == DEFLATE ? -WINDOW_BITS : WINDOW_BITS | DETECT_CODEC; - if ((ret = inflateInit2(&stream_, window_bits)) != Z_OK) { - throw ParquetException("zlib inflateInit failed: " + std::string(stream_.msg)); - } - decompressor_initialized_ = true; - } - - void EndDecompressor() { - if (decompressor_initialized_) { (void)inflateEnd(&stream_); } - decompressor_initialized_ = false; - } - - void Decompress(int64_t input_length, const uint8_t* input, int64_t output_length, - uint8_t* output) { - if (!decompressor_initialized_) { InitDecompressor(); } - if (output_length == 0) { - // The zlib library does not allow *output to be NULL, even when output_length - // is 0 (inflate() will return Z_STREAM_ERROR). We don't consider this an - // error, so bail early if no output is expected. Note that we don't signal - // an error if the input actually contains compressed data. - return; - } - - // Reset the stream for this block - if (inflateReset(&stream_) != Z_OK) { - throw ParquetException("zlib inflateReset failed: " + std::string(stream_.msg)); - } - - int ret = 0; - // gzip can run in streaming mode or non-streaming mode. We only - // support the non-streaming use case where we present it the entire - // compressed input and a buffer big enough to contain the entire - // compressed output. In the case where we don't know the output, - // we just make a bigger buffer and try the non-streaming mode - // from the beginning again. - while (ret != Z_STREAM_END) { - stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input)); - stream_.avail_in = static_cast<uInt>(input_length); - stream_.next_out = reinterpret_cast<Bytef*>(output); - stream_.avail_out = static_cast<uInt>(output_length); - - // We know the output size. In this case, we can use Z_FINISH - // which is more efficient. - ret = inflate(&stream_, Z_FINISH); - if (ret == Z_STREAM_END || ret != Z_OK) break; - - // Failure, buffer was too small - std::stringstream ss; - ss << "Too small a buffer passed to GZipCodec. InputLength=" << input_length - << " OutputLength=" << output_length; - throw ParquetException(ss.str()); - } - - // Failure for some other reason - if (ret != Z_STREAM_END) { - std::stringstream ss; - ss << "GZipCodec failed: "; - if (stream_.msg != NULL) ss << stream_.msg; - throw ParquetException(ss.str()); - } - } - - int64_t MaxCompressedLen(int64_t input_length, const uint8_t* input) { - // Most be in compression mode - if (!compressor_initialized_) { InitCompressor(); } - // TODO(wesm): deal with zlib < 1.2.3 (see Impala codebase) - return deflateBound(&stream_, static_cast<uLong>(input_length)); - } - - int64_t Compress(int64_t input_length, const uint8_t* input, int64_t output_length, - uint8_t* output) { - if (!compressor_initialized_) { InitCompressor(); } - stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input)); - stream_.avail_in = static_cast<uInt>(input_length); - stream_.next_out = reinterpret_cast<Bytef*>(output); - stream_.avail_out = static_cast<uInt>(output_length); - - int64_t ret = 0; - if ((ret = deflate(&stream_, Z_FINISH)) != Z_STREAM_END) { - if (ret == Z_OK) { - // will return Z_OK (and stream.msg NOT set) if stream.avail_out is too - // small - throw ParquetException("zlib deflate failed, output buffer too small"); - } - std::stringstream ss; - ss << "zlib deflate failed: " << stream_.msg; - throw ParquetException(ss.str()); - } - - if (deflateReset(&stream_) != Z_OK) { - throw ParquetException("zlib deflateReset failed: " + std::string(stream_.msg)); - } - - // Actual output length - return output_length - stream_.avail_out; - } - - private: - // zlib is stateful and the z_stream state variable must be initialized - // before - z_stream stream_; - - // Realistically, this will always be GZIP, but we leave the option open to - // configure - GZipCodec::Format format_; - - // These variables are mutually exclusive. When the codec is in "compressor" - // state, compressor_initialized_ is true while decompressor_initialized_ is - // false. When it's decompressing, the opposite is true. - // - // Indeed, this is slightly hacky, but the alternative is having separate - // Compressor and Decompressor classes. If this ever becomes an issue, we can - // perform the refactoring then - bool compressor_initialized_; - bool decompressor_initialized_; -}; - -GZipCodec::GZipCodec(Format format) { - impl_.reset(new GZipCodecImpl(format)); -} - -GZipCodec::~GZipCodec() {} - -void GZipCodec::Decompress( - int64_t input_length, const uint8_t* input, int64_t output_length, uint8_t* output) { - return impl_->Decompress(input_length, input, output_length, output); -} - -int64_t GZipCodec::MaxCompressedLen(int64_t input_length, const uint8_t* input) { - return impl_->MaxCompressedLen(input_length, input); -} - -int64_t GZipCodec::Compress( - int64_t input_length, const uint8_t* input, int64_t output_length, uint8_t* output) { - return impl_->Compress(input_length, input, output_length, output); -} - -const char* GZipCodec::name() const { - return "gzip"; -} - -// ---------------------------------------------------------------------- -// Snappy implementation - -void SnappyCodec::Decompress( - int64_t input_len, const uint8_t* input, int64_t output_len, uint8_t* output_buffer) { - if (!snappy::RawUncompress(reinterpret_cast<const char*>(input), - static_cast<size_t>(input_len), reinterpret_cast<char*>(output_buffer))) { - throw parquet::ParquetException("Corrupt snappy compressed data."); - } -} - -int64_t SnappyCodec::MaxCompressedLen(int64_t input_len, const uint8_t* input) { - return snappy::MaxCompressedLength(input_len); -} - -int64_t SnappyCodec::Compress(int64_t input_len, const uint8_t* input, - int64_t output_buffer_len, uint8_t* output_buffer) { - size_t output_len; - snappy::RawCompress(reinterpret_cast<const char*>(input), - static_cast<size_t>(input_len), reinterpret_cast<char*>(output_buffer), - &output_len); - return output_len; -} - -// ---------------------------------------------------------------------- -// Brotli implementation - -void BrotliCodec::Decompress( - int64_t input_len, const uint8_t* input, int64_t output_len, uint8_t* output_buffer) { - size_t output_size = output_len; - if (BrotliDecoderDecompress(input_len, input, &output_size, output_buffer) != - BROTLI_DECODER_RESULT_SUCCESS) { - throw parquet::ParquetException("Corrupt brotli compressed data."); - } -} - -int64_t BrotliCodec::MaxCompressedLen(int64_t input_len, const uint8_t* input) { - return BrotliEncoderMaxCompressedSize(input_len); -} - -int64_t BrotliCodec::Compress(int64_t input_len, const uint8_t* input, - int64_t output_buffer_len, uint8_t* output_buffer) { - size_t output_len = output_buffer_len; - // TODO: Make quality configurable. We use 8 as a default as it is the best - // trade-off for Parquet workload - if (BrotliEncoderCompress(8, BROTLI_DEFAULT_WINDOW, BROTLI_DEFAULT_MODE, input_len, - input, &output_len, output_buffer) == BROTLI_FALSE) { - throw parquet::ParquetException("Brotli compression failure."); - } - return output_len; -} - -} // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/491182c2/src/parquet/compression.h ---------------------------------------------------------------------- diff --git a/src/parquet/compression.h b/src/parquet/compression.h deleted file mode 100644 index c1a3bf4..0000000 --- a/src/parquet/compression.h +++ /dev/null @@ -1,106 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef PARQUET_COMPRESSION_CODEC_H -#define PARQUET_COMPRESSION_CODEC_H - -#include <cstdint> -#include <memory> - -#include "parquet/exception.h" -#include "parquet/types.h" -#include "parquet/util/visibility.h" - -namespace parquet { - -class PARQUET_EXPORT Codec { - public: - virtual ~Codec(); - - static std::unique_ptr<Codec> Create(Compression::type codec); - - virtual void Decompress(int64_t input_len, const uint8_t* input, int64_t output_len, - uint8_t* output_buffer) = 0; - - virtual int64_t Compress(int64_t input_len, const uint8_t* input, - int64_t output_buffer_len, uint8_t* output_buffer) = 0; - - virtual int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) = 0; - - virtual const char* name() const = 0; -}; - -// Snappy codec. -class PARQUET_EXPORT SnappyCodec : public Codec { - public: - void Decompress(int64_t input_len, const uint8_t* input, int64_t output_len, - uint8_t* output_buffer) override; - - int64_t Compress(int64_t input_len, const uint8_t* input, - int64_t output_buffer_len, uint8_t* output_buffer) override; - - int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) override; - - const char* name() const override { return "snappy"; } -}; - -// Brotli codec. -class PARQUET_EXPORT BrotliCodec : public Codec { - public: - void Decompress(int64_t input_len, const uint8_t* input, int64_t output_len, - uint8_t* output_buffer) override; - - int64_t Compress(int64_t input_len, const uint8_t* input, int64_t output_buffer_len, - uint8_t* output_buffer) override; - - int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) override; - - const char* name() const override { return "brotli"; } -}; - -// GZip codec. -class PARQUET_EXPORT GZipCodec : public Codec { - public: - /// Compression formats supported by the zlib library - enum Format { - ZLIB, - DEFLATE, - GZIP, - }; - - explicit GZipCodec(Format format = GZIP); - virtual ~GZipCodec(); - - void Decompress(int64_t input_len, const uint8_t* input, int64_t output_len, - uint8_t* output_buffer) override; - - int64_t Compress(int64_t input_len, const uint8_t* input, - int64_t output_buffer_len, uint8_t* output_buffer) override; - - int64_t MaxCompressedLen(int64_t input_len, const uint8_t* input) override; - - const char* name() const override; - - private: - // The gzip compressor is stateful - class GZipCodecImpl; - std::unique_ptr<GZipCodecImpl> impl_; -}; - -} // namespace parquet - -#endif http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/491182c2/src/parquet/exception.h ---------------------------------------------------------------------- diff --git a/src/parquet/exception.h b/src/parquet/exception.h index b161bf7..40c24e5 100644 --- a/src/parquet/exception.h +++ b/src/parquet/exception.h @@ -19,10 +19,35 @@ #define PARQUET_EXCEPTION_H #include <exception> +#include <sstream> #include <string> +#include "arrow/status.h" + #include "parquet/util/visibility.h" +#define PARQUET_CATCH_NOT_OK(s) \ + try { \ + (s); \ + } catch (const ::parquet::ParquetException& e) { \ + return ::arrow::Status::IOError(e.what()); \ + } + +#define PARQUET_IGNORE_NOT_OK(s) \ + try { \ + (s); \ + } catch (const ::parquet::ParquetException& e) { UNUSED(e); } + +#define PARQUET_THROW_NOT_OK(s) \ + do { \ + ::arrow::Status _s = (s); \ + if (!_s.ok()) { \ + std::stringstream ss; \ + ss << "Arrow error: " << _s.ToString(); \ + ::parquet::ParquetException::Throw(ss.str()); \ + } \ + } while (0); + namespace parquet { class PARQUET_EXPORT ParquetException : public std::exception { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/491182c2/src/parquet/file/file-deserialize-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/file-deserialize-test.cc b/src/parquet/file/file-deserialize-test.cc index a823591..7c4690e 100644 --- a/src/parquet/file/file-deserialize-test.cc +++ b/src/parquet/file/file-deserialize-test.cc @@ -27,7 +27,6 @@ #include <vector> #include "parquet/column/page.h" -#include "parquet/compression.h" #include "parquet/exception.h" #include "parquet/file/reader-internal.h" #include "parquet/parquet_types.h" @@ -38,6 +37,12 @@ namespace parquet { +#define ASSERT_OK(expr) \ + do { \ + ::arrow::Status s = (expr); \ + if (!s.ok()) { FAIL() << s.ToString(); } \ + } while (0) + using ::arrow::io::BufferReader; // Adds page statistics occupying a certain amount of bytes (for testing very @@ -187,7 +192,7 @@ TEST_F(TestPageSerde, Compression) { test::random_bytes(page_size, 0, &faux_data[i]); } for (auto codec_type : codec_types) { - std::unique_ptr<Codec> codec = Codec::Create(codec_type); + std::unique_ptr<::arrow::Codec> codec = GetCodecFromArrow(codec_type); std::vector<uint8_t> buffer; for (int i = 0; i < num_pages; ++i) { @@ -197,8 +202,9 @@ TEST_F(TestPageSerde, Compression) { int64_t max_compressed_size = codec->MaxCompressedLen(data_size, data); buffer.resize(max_compressed_size); - int64_t actual_size = - codec->Compress(data_size, data, max_compressed_size, &buffer[0]); + int64_t actual_size; + ASSERT_OK(codec->Compress( + data_size, data, max_compressed_size, &buffer[0], &actual_size)); WriteDataPageHeader(1024, data_size, static_cast<int32_t>(actual_size)); out_stream_->Write(buffer.data(), actual_size); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/491182c2/src/parquet/file/metadata.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h index d663617..50d2114 100644 --- a/src/parquet/file/metadata.h +++ b/src/parquet/file/metadata.h @@ -26,7 +26,6 @@ #include "parquet/column/properties.h" #include "parquet/column/statistics.h" -#include "parquet/compression.h" #include "parquet/schema.h" #include "parquet/types.h" #include "parquet/util/memory.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/491182c2/src/parquet/file/reader-internal.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc index c542000..1d9ab47 100644 --- a/src/parquet/file/reader-internal.cc +++ b/src/parquet/file/reader-internal.cc @@ -24,8 +24,9 @@ #include <string> #include <vector> +#include "arrow/util/compression.h" + #include "parquet/column/page.h" -#include "parquet/compression.h" #include "parquet/exception.h" #include "parquet/schema.h" #include "parquet/thrift.h" @@ -41,13 +42,13 @@ namespace parquet { // assembled in a serialized stream for storing in a Parquet files SerializedPageReader::SerializedPageReader(std::unique_ptr<InputStream> stream, - int64_t total_num_rows, Compression::type codec_type, MemoryPool* pool) + int64_t total_num_rows, Compression::type codec, MemoryPool* pool) : stream_(std::move(stream)), decompression_buffer_(AllocateBuffer(pool, 0)), seen_num_rows_(0), total_num_rows_(total_num_rows) { max_page_header_size_ = DEFAULT_MAX_PAGE_HEADER_SIZE; - decompressor_ = Codec::Create(codec_type); + decompressor_ = GetCodecFromArrow(codec); } std::shared_ptr<Page> SerializedPageReader::NextPage() { @@ -99,8 +100,8 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() { if (uncompressed_len > static_cast<int>(decompression_buffer_->size())) { PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len, false)); } - decompressor_->Decompress(compressed_len, buffer, uncompressed_len, - decompression_buffer_->mutable_data()); + PARQUET_THROW_NOT_OK(decompressor_->Decompress(compressed_len, buffer, + uncompressed_len, decompression_buffer_->mutable_data())); buffer = decompression_buffer_->data(); } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/491182c2/src/parquet/file/reader-internal.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/reader-internal.h b/src/parquet/file/reader-internal.h index 5d35540..1ac2384 100644 --- a/src/parquet/file/reader-internal.h +++ b/src/parquet/file/reader-internal.h @@ -24,7 +24,6 @@ #include "parquet/column/page.h" #include "parquet/column/properties.h" -#include "parquet/compression.h" #include "parquet/file/metadata.h" #include "parquet/file/reader.h" #include "parquet/parquet_types.h" @@ -32,6 +31,11 @@ #include "parquet/util/memory.h" #include "parquet/util/visibility.h" +namespace arrow { + +class Codec; +}; + namespace parquet { // 16 MB is the default maximum page header size @@ -63,7 +67,7 @@ class PARQUET_EXPORT SerializedPageReader : public PageReader { std::shared_ptr<Page> current_page_; // Compression codec to use. - std::unique_ptr<Codec> decompressor_; + std::unique_ptr<::arrow::Codec> decompressor_; std::shared_ptr<PoolBuffer> decompression_buffer_; // Maximum allowed page size http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/491182c2/src/parquet/file/writer-internal.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc index 019271f..bb24737 100644 --- a/src/parquet/file/writer-internal.cc +++ b/src/parquet/file/writer-internal.cc @@ -17,6 +17,11 @@ #include "parquet/file/writer-internal.h" +#include <cstdint> +#include <memory> + +#include "arrow/util/compression.h" + #include "parquet/column/writer.h" #include "parquet/schema-internal.h" #include "parquet/schema.h" @@ -46,7 +51,7 @@ SerializedPageWriter::SerializedPageWriter(OutputStream* sink, Compression::type data_page_offset_(0), total_uncompressed_size_(0), total_compressed_size_(0) { - compressor_ = Codec::Create(codec); + compressor_ = GetCodecFromArrow(codec); } static format::Statistics ToThrift(const EncodedStatistics& row_group_statistics) { @@ -81,8 +86,9 @@ void SerializedPageWriter::Compress( // underlying buffer only keeps growing. Resize to a smaller size does not reallocate. PARQUET_THROW_NOT_OK(dest_buffer->Resize(max_compressed_size, false)); - int64_t compressed_size = compressor_->Compress(src_buffer.size(), src_buffer.data(), - max_compressed_size, dest_buffer->mutable_data()); + int64_t compressed_size; + PARQUET_THROW_NOT_OK(compressor_->Compress(src_buffer.size(), src_buffer.data(), + max_compressed_size, dest_buffer->mutable_data(), &compressed_size)); PARQUET_THROW_NOT_OK(dest_buffer->Resize(compressed_size, false)); } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/491182c2/src/parquet/file/writer-internal.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h index 27dc89e..6ac7927 100644 --- a/src/parquet/file/writer-internal.h +++ b/src/parquet/file/writer-internal.h @@ -22,12 +22,16 @@ #include <vector> #include "parquet/column/page.h" -#include "parquet/compression.h" #include "parquet/file/metadata.h" #include "parquet/file/writer.h" #include "parquet/parquet_types.h" #include "parquet/util/memory.h" +namespace arrow { + +class Codec; +}; + namespace parquet { // This subclass delimits pages appearing in a serialized stream, each preceded @@ -65,7 +69,7 @@ class SerializedPageWriter : public PageWriter { int64_t total_compressed_size_; // Compression codec to use. - std::unique_ptr<Codec> compressor_; + std::unique_ptr<::arrow::Codec> compressor_; }; // RowGroupWriter::Contents implementation for the Parquet file specification http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/491182c2/src/parquet/parquet_version.h ---------------------------------------------------------------------- diff --git a/src/parquet/parquet_version.h b/src/parquet/parquet_version.h index 453af5e..5432333 100644 --- a/src/parquet/parquet_version.h +++ b/src/parquet/parquet_version.h @@ -21,4 +21,4 @@ // define the parquet created by version #define CREATED_BY_VERSION "parquet-cpp version 1.1.1-SNAPSHOT" -#endif // PARQUET_VERSION_H +#endif // PARQUET_VERSION_H http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/491182c2/src/parquet/schema.cc ---------------------------------------------------------------------- diff --git a/src/parquet/schema.cc b/src/parquet/schema.cc index 4efa0b2..5fc51fe 100644 --- a/src/parquet/schema.cc +++ b/src/parquet/schema.cc @@ -264,9 +264,7 @@ int GroupNode::FieldIndex(const std::string& name) const { int GroupNode::FieldIndex(const Node& node) const { int result = FieldIndex(node.name()); - if (result < 0) { - return -1; - } + if (result < 0) { return -1; } DCHECK(result < field_count()); if (!node.Equals(field(result).get())) { // Same name but not the same node @@ -679,9 +677,7 @@ int SchemaDescriptor::ColumnIndex(const std::string& node_path) const { int SchemaDescriptor::ColumnIndex(const Node& node) const { int result = ColumnIndex(node.path()->ToDotString()); - if (result < 0) { - return -1; - } + if (result < 0) { return -1; } DCHECK(result < num_columns()); if (!node.Equals(Column(result)->schema_node().get())) { // Same path but not the same node http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/491182c2/src/parquet/types.h ---------------------------------------------------------------------- diff --git a/src/parquet/types.h b/src/parquet/types.h index 8504f5d..7ec3825 100644 --- a/src/parquet/types.h +++ b/src/parquet/types.h @@ -270,17 +270,17 @@ inline std::string format_fwf(int width) { return ss.str(); } -std::string PARQUET_EXPORT CompressionToString(Compression::type t); +PARQUET_EXPORT std::string CompressionToString(Compression::type t); -std::string PARQUET_EXPORT EncodingToString(Encoding::type t); +PARQUET_EXPORT std::string EncodingToString(Encoding::type t); -std::string PARQUET_EXPORT LogicalTypeToString(LogicalType::type t); +PARQUET_EXPORT std::string LogicalTypeToString(LogicalType::type t); -std::string PARQUET_EXPORT TypeToString(Type::type t); +PARQUET_EXPORT std::string TypeToString(Type::type t); -std::string PARQUET_EXPORT FormatStatValue(Type::type parquet_type, const char* val); +PARQUET_EXPORT std::string FormatStatValue(Type::type parquet_type, const char* val); -int PARQUET_EXPORT GetTypeByteSize(Type::type t); +PARQUET_EXPORT int GetTypeByteSize(Type::type t); } // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/491182c2/src/parquet/util/memory.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/memory.h b/src/parquet/util/memory.h index ca244a7..4f780c4 100644 --- a/src/parquet/util/memory.h +++ b/src/parquet/util/memory.h @@ -31,35 +31,42 @@ #include "arrow/io/memory.h" #include "arrow/memory_pool.h" #include "arrow/status.h" +#include "arrow/util/compression.h" #include "parquet/exception.h" +#include "parquet/types.h" #include "parquet/util/macros.h" #include "parquet/util/visibility.h" -#define PARQUET_CATCH_NOT_OK(s) \ - try { \ - (s); \ - } catch (const ::parquet::ParquetException& e) { \ - return ::arrow::Status::IOError(e.what()); \ - } - -#define PARQUET_IGNORE_NOT_OK(s) \ - try { \ - (s); \ - } catch (const ::parquet::ParquetException& e) { UNUSED(e); } - -#define PARQUET_THROW_NOT_OK(s) \ - do { \ - ::arrow::Status _s = (s); \ - if (!_s.ok()) { \ - std::stringstream ss; \ - ss << "Arrow error: " << _s.ToString(); \ - ::parquet::ParquetException::Throw(ss.str()); \ - } \ - } while (0); - namespace parquet { +static inline std::unique_ptr<::arrow::Codec> GetCodecFromArrow(Compression::type codec) { + std::unique_ptr<::arrow::Codec> result; + switch (codec) { + case Compression::UNCOMPRESSED: + break; + case Compression::SNAPPY: + PARQUET_THROW_NOT_OK( + ::arrow::Codec::Create(::arrow::Compression::SNAPPY, &result)); + break; + case Compression::GZIP: + PARQUET_THROW_NOT_OK( + ::arrow::Codec::Create(::arrow::Compression::GZIP, &result)); + break; + case Compression::LZO: + PARQUET_THROW_NOT_OK( + ::arrow::Codec::Create(::arrow::Compression::LZO, &result)); + break; + case Compression::BROTLI: + PARQUET_THROW_NOT_OK( + ::arrow::Codec::Create(::arrow::Compression::BROTLI, &result)); + break; + default: + break; + } + return result; +} + static constexpr int64_t kInMemoryDefaultCapacity = 1024; using Buffer = ::arrow::Buffer;