Repository: arrow
Updated Branches:
  refs/heads/master 95f489c4c -> de54eff19
ARROW-659: [C++] Add multithreaded memcpy implementation

Parallelize memcopy operations for large objects with a multi-threaded implementation.

Author: Philipp Moritz <[email protected]>

Closes #580 from atumanov/parallel-memcpy and squashes the following commits:

6ea9873 [Philipp Moritz] fix windows build (?)
66dfa74 [Philipp Moritz] linting
9dd6f3f [Philipp Moritz] cleanup
e81bad9 [Philipp Moritz] add license header
0beb870 [Philipp Moritz] add pthread library
1d73612 [Philipp Moritz] add test of parallel memcopy
1a27431 [Philipp Moritz] restructure code
70d767c [Philipp Moritz] add benchmarks
b320b47 [Philipp Moritz] make memcopy generic
f99606a [Philipp Moritz] add parallel memcpy, contributed by Alexey Tumanov <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/de54eff1
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/de54eff1
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/de54eff1

Branch: refs/heads/master
Commit: de54eff19af024c1ca0e82f4b45c6021443a635b
Parents: 95f489c
Author: Philipp Moritz <[email protected]>
Authored: Mon Apr 24 08:30:08 2017 -0400
Committer: Wes McKinney <[email protected]>
Committed: Mon Apr 24 08:30:08 2017 -0400

----------------------------------------------------------------------
 cpp/CMakeLists.txt | 3 +-
 cpp/src/arrow/io/CMakeLists.txt | 2 +
 cpp/src/arrow/io/io-memory-benchmark.cc | 75 ++++++++++++++++++++++++++++
 cpp/src/arrow/io/io-memory-test.cc | 22 ++++++++
 cpp/src/arrow/io/memory.cc | 31 ++++++++++--
 cpp/src/arrow/io/memory.h | 8 +++
 cpp/src/arrow/ipc/CMakeLists.txt | 1 +
 cpp/src/arrow/util/memory.h | 69 +++++++++++++++++++++++++
 8 files changed, 207 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/arrow/blob/de54eff1/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git 
a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 978f70a..2d8c00f 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -902,7 +902,8 @@ set(ARROW_STATIC_PRIVATE_LINK_LIBS if (NOT MSVC) set(ARROW_LINK_LIBS ${ARROW_LINK_LIBS} - ${CMAKE_DL_LIBS}) + ${CMAKE_DL_LIBS} + pthread) endif() if(RAPIDJSON_VENDORED) http://git-wip-us.apache.org/repos/asf/arrow/blob/de54eff1/cpp/src/arrow/io/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt index c0199d7..cd48974 100644 --- a/cpp/src/arrow/io/CMakeLists.txt +++ b/cpp/src/arrow/io/CMakeLists.txt @@ -22,6 +22,8 @@ ADD_ARROW_TEST(io-file-test) ADD_ARROW_TEST(io-hdfs-test) ADD_ARROW_TEST(io-memory-test) +ADD_ARROW_BENCHMARK(io-memory-benchmark) + # Headers: top level install(FILES file.h http://git-wip-us.apache.org/repos/asf/arrow/blob/de54eff1/cpp/src/arrow/io/io-memory-benchmark.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/io-memory-benchmark.cc b/cpp/src/arrow/io/io-memory-benchmark.cc new file mode 100644 index 0000000..59b511a --- /dev/null +++ b/cpp/src/arrow/io/io-memory-benchmark.cc @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/api.h" +#include "arrow/io/memory.h" +#include "arrow/test-util.h" + +#include "benchmark/benchmark.h" + +#include <iostream> + +namespace arrow { + +static void BM_SerialMemcopy(benchmark::State& state) { // NOLINT non-const reference + constexpr int64_t kTotalSize = 100 * 1024 * 1024; // 100MB + + auto buffer1 = std::make_shared<PoolBuffer>(default_memory_pool()); + buffer1->Resize(kTotalSize); + + auto buffer2 = std::make_shared<PoolBuffer>(default_memory_pool()); + buffer2->Resize(kTotalSize); + test::random_bytes(kTotalSize, 0, buffer2->mutable_data()); + + while (state.KeepRunning()) { + io::FixedSizeBufferWriter writer(buffer1); + writer.Write(buffer2->data(), buffer2->size()); + } + state.SetBytesProcessed(int64_t(state.iterations()) * kTotalSize); +} + +static void BM_ParallelMemcopy(benchmark::State& state) { // NOLINT non-const reference + constexpr int64_t kTotalSize = 100 * 1024 * 1024; // 100MB + + auto buffer1 = std::make_shared<PoolBuffer>(default_memory_pool()); + buffer1->Resize(kTotalSize); + + auto buffer2 = std::make_shared<PoolBuffer>(default_memory_pool()); + buffer2->Resize(kTotalSize); + test::random_bytes(kTotalSize, 0, buffer2->mutable_data()); + + while (state.KeepRunning()) { + io::FixedSizeBufferWriter writer(buffer1); + writer.set_memcopy_threads(4); + writer.Write(buffer2->data(), buffer2->size()); + } + state.SetBytesProcessed(int64_t(state.iterations()) * kTotalSize); +} + +BENCHMARK(BM_SerialMemcopy) + ->RangeMultiplier(4) + ->Range(1, 1 << 13) + ->MinTime(1.0) + ->UseRealTime(); + +BENCHMARK(BM_ParallelMemcopy) + ->RangeMultiplier(4) + ->Range(1, 1 << 13) + ->MinTime(1.0) + ->UseRealTime(); + +} // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/de54eff1/cpp/src/arrow/io/io-memory-test.cc ---------------------------------------------------------------------- diff --git 
a/cpp/src/arrow/io/io-memory-test.cc b/cpp/src/arrow/io/io-memory-test.cc index 4704fe8..33249cb 100644 --- a/cpp/src/arrow/io/io-memory-test.cc +++ b/cpp/src/arrow/io/io-memory-test.cc @@ -17,6 +17,7 @@ #include <cstdint> #include <cstdio> +#include <cstdlib> #include <cstring> #include <memory> #include <string> @@ -114,5 +115,26 @@ TEST(TestBufferReader, RetainParentReference) { ASSERT_EQ(0, std::memcmp(slice2->data(), data.c_str() + 4, 6)); } +TEST(TestMemcopy, ParallelMemcopy) { + for (int i = 0; i < 5; ++i) { + // randomize size so the memcopy alignment is tested + int64_t total_size = 3 * 1024 * 1024 + std::rand() % 100; + + auto buffer1 = std::make_shared<PoolBuffer>(default_memory_pool()); + buffer1->Resize(total_size); + + auto buffer2 = std::make_shared<PoolBuffer>(default_memory_pool()); + buffer2->Resize(total_size); + test::random_bytes(total_size, 0, buffer2->mutable_data()); + + io::FixedSizeBufferWriter writer(buffer1); + writer.set_memcopy_threads(4); + writer.set_memcopy_threshold(1024 * 1024); + writer.Write(buffer2->data(), buffer2->size()); + + ASSERT_EQ(0, memcmp(buffer1->data(), buffer2->data(), buffer1->size())); + } +} + } // namespace io } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/de54eff1/cpp/src/arrow/io/memory.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc index 2e701e1..95c6206 100644 --- a/cpp/src/arrow/io/memory.cc +++ b/cpp/src/arrow/io/memory.cc @@ -29,6 +29,7 @@ #include "arrow/io/interfaces.h" #include "arrow/status.h" #include "arrow/util/logging.h" +#include "arrow/util/memory.h" namespace arrow { namespace io { @@ -80,7 +81,7 @@ Status BufferOutputStream::Tell(int64_t* position) { Status BufferOutputStream::Write(const uint8_t* data, int64_t nbytes) { DCHECK(buffer_); RETURN_NOT_OK(Reserve(nbytes)); - std::memcpy(mutable_data_ + position_, data, nbytes); + memcpy(mutable_data_ + position_, data, nbytes); 
position_ += nbytes; return Status::OK(); } @@ -101,8 +102,15 @@ Status BufferOutputStream::Reserve(int64_t nbytes) { // ---------------------------------------------------------------------- // In-memory buffer writer +static constexpr int kMemcopyDefaultNumThreads = 1; +static constexpr int64_t kMemcopyDefaultBlocksize = 64; +static constexpr int64_t kMemcopyDefaultThreshold = 1024 * 1024; + /// Input buffer must be mutable, will abort if not -FixedSizeBufferWriter::FixedSizeBufferWriter(const std::shared_ptr<Buffer>& buffer) { +FixedSizeBufferWriter::FixedSizeBufferWriter(const std::shared_ptr<Buffer>& buffer) + : memcopy_num_threads_(kMemcopyDefaultNumThreads), + memcopy_blocksize_(kMemcopyDefaultBlocksize), + memcopy_threshold_(kMemcopyDefaultThreshold) { buffer_ = buffer; DCHECK(buffer->is_mutable()) << "Must pass mutable buffer"; mutable_data_ = buffer->mutable_data(); @@ -131,7 +139,12 @@ Status FixedSizeBufferWriter::Tell(int64_t* position) { } Status FixedSizeBufferWriter::Write(const uint8_t* data, int64_t nbytes) { - std::memcpy(mutable_data_ + position_, data, nbytes); + if (nbytes > memcopy_threshold_ && memcopy_num_threads_ > 1) { + parallel_memcopy(mutable_data_ + position_, data, nbytes, + memcopy_blocksize_, memcopy_num_threads_); + } else { + memcpy(mutable_data_ + position_, data, nbytes); + } position_ += nbytes; return Status::OK(); } @@ -143,6 +156,18 @@ Status FixedSizeBufferWriter::WriteAt( return Write(data, nbytes); } +void FixedSizeBufferWriter::set_memcopy_threads(int num_threads) { + memcopy_num_threads_ = num_threads; +} + +void FixedSizeBufferWriter::set_memcopy_blocksize(int64_t blocksize) { + memcopy_blocksize_ = blocksize; +} + +void FixedSizeBufferWriter::set_memcopy_threshold(int64_t threshold) { + memcopy_threshold_ = threshold; +} + // ---------------------------------------------------------------------- // In-memory buffer reader http://git-wip-us.apache.org/repos/asf/arrow/blob/de54eff1/cpp/src/arrow/io/memory.h 
---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h index fbb186b..f1b5990 100644 --- a/cpp/src/arrow/io/memory.h +++ b/cpp/src/arrow/io/memory.h @@ -81,12 +81,20 @@ class ARROW_EXPORT FixedSizeBufferWriter : public WriteableFile { Status Write(const uint8_t* data, int64_t nbytes) override; Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) override; + void set_memcopy_threads(int num_threads); + void set_memcopy_blocksize(int64_t blocksize); + void set_memcopy_threshold(int64_t threshold); + private: std::mutex lock_; std::shared_ptr<Buffer> buffer_; uint8_t* mutable_data_; int64_t size_; int64_t position_; + + int memcopy_num_threads_; + int64_t memcopy_blocksize_; + int64_t memcopy_threshold_; }; class ARROW_EXPORT BufferReader : public RandomAccessFile { http://git-wip-us.apache.org/repos/asf/arrow/blob/de54eff1/cpp/src/arrow/ipc/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt index fc1d53e..41ab5d7 100644 --- a/cpp/src/arrow/ipc/CMakeLists.txt +++ b/cpp/src/arrow/ipc/CMakeLists.txt @@ -95,6 +95,7 @@ if(MSVC) else() set(UTIL_LINK_LIBS arrow_static + pthread ${BOOST_FILESYSTEM_LIBRARY} ${BOOST_SYSTEM_LIBRARY} dl) http://git-wip-us.apache.org/repos/asf/arrow/blob/de54eff1/cpp/src/arrow/util/memory.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/util/memory.h b/cpp/src/arrow/util/memory.h new file mode 100644 index 0000000..7feeb29 --- /dev/null +++ b/cpp/src/arrow/util/memory.h @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef ARROW_UTIL_MEMORY_H +#define ARROW_UTIL_MEMORY_H + +#include <thread> +#include <vector> + +namespace arrow { + +uint8_t* pointer_logical_and(const uint8_t* address, uintptr_t bits) { + uintptr_t value = reinterpret_cast<uintptr_t>(address); + return reinterpret_cast<uint8_t*>(value & bits); +} + +// A helper function for doing memcpy with multiple threads. This is required +// to saturate the memory bandwidth of modern cpus. +void parallel_memcopy(uint8_t* dst, const uint8_t* src, int64_t nbytes, + uintptr_t block_size, int num_threads) { + std::vector<std::thread> threadpool(num_threads); + uint8_t* left = pointer_logical_and(src + block_size - 1, ~(block_size - 1)); + uint8_t* right = pointer_logical_and(src + nbytes, ~(block_size - 1)); + int64_t num_blocks = (right - left) / block_size; + + // Update right address + right = right - (num_blocks % num_threads) * block_size; + + // Now we divide these blocks between available threads. The remainder is + // handled on the main thread. + int64_t chunk_size = (right - left) / num_threads; + int64_t prefix = left - src; + int64_t suffix = src + nbytes - right; + // Now the data layout is | prefix | k * num_threads * block_size | suffix |. + // We have chunk_size = k * block_size, therefore the data layout is + // | prefix | num_threads * chunk_size | suffix |. + // Each thread gets a "chunk" of k blocks. 
+ + // Start all threads first and handle leftovers while threads run. + for (int i = 0; i < num_threads; i++) { + threadpool[i] = std::thread(memcpy, dst + prefix + i * chunk_size, + left + i * chunk_size, chunk_size); + } + + memcpy(dst, src, prefix); + memcpy(dst + prefix + num_threads * chunk_size, right, suffix); + + for (auto& t : threadpool) { + if (t.joinable()) { t.join(); } + } +} + +} // namespace arrow + +#endif // ARROW_UTIL_MEMORY_H
