This is an automated email from the ASF dual-hosted git repository. marong pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push: new 70ee0f411 [VL] Add BufferedOutputStream to track the memory usage in PrestoSerializer (#5785) 70ee0f411 is described below commit 70ee0f411cde6d6bbf3772ea3d623ef698b9f174 Author: Rong Ma <rong...@intel.com> AuthorDate: Fri May 17 16:28:35 2024 +0800 [VL] Add BufferedOutputStream to track the memory usage in PrestoSerializer (#5785) --- cpp/velox/CMakeLists.txt | 1 + cpp/velox/memory/BufferOutputStream.cc | 45 +++++++++++++++ cpp/velox/memory/BufferOutputStream.h | 42 ++++++++++++++ cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc | 25 +++------ cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h | 8 +-- cpp/velox/tests/BufferOutputStreamTest.cc | 70 ++++++++++++++++++++++++ cpp/velox/tests/CMakeLists.txt | 1 + 7 files changed, 171 insertions(+), 21 deletions(-) diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt index c058883b6..9bedfe45b 100644 --- a/cpp/velox/CMakeLists.txt +++ b/cpp/velox/CMakeLists.txt @@ -300,6 +300,7 @@ set(VELOX_SRCS jni/VeloxJniWrapper.cc jni/JniFileSystem.cc jni/JniUdf.cc + memory/BufferOutputStream.cc memory/VeloxColumnarBatch.cc memory/VeloxMemoryManager.cc operators/functions/RegistrationAllFunctions.cc diff --git a/cpp/velox/memory/BufferOutputStream.cc b/cpp/velox/memory/BufferOutputStream.cc new file mode 100644 index 000000000..31d7b0936 --- /dev/null +++ b/cpp/velox/memory/BufferOutputStream.cc @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "memory/BufferOutputStream.h" + +namespace gluten { +BufferOutputStream::BufferOutputStream( + facebook::velox::memory::MemoryPool* pool, + int32_t initialSize, + facebook::velox::OutputStreamListener* listener) + : facebook::velox::OutputStream(listener) { + buffer_ = facebook::velox::AlignedBuffer::allocate<char>(initialSize, pool); + buffer_->setSize(0); +} + +void BufferOutputStream::write(const char* s, std::streamsize count) { + facebook::velox::AlignedBuffer::appendTo(&buffer_, s, count); +} + +std::streampos BufferOutputStream::tellp() const { + return buffer_->size(); +} + +void BufferOutputStream::seekp(std::streampos pos) { + buffer_->setSize(pos); +} + +facebook::velox::BufferPtr BufferOutputStream::getBuffer() const { + return buffer_; +} +} // namespace gluten diff --git a/cpp/velox/memory/BufferOutputStream.h b/cpp/velox/memory/BufferOutputStream.h new file mode 100644 index 000000000..49774e09d --- /dev/null +++ b/cpp/velox/memory/BufferOutputStream.h @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/buffer/Buffer.h" +#include "velox/common/memory/ByteStream.h" + +namespace gluten { +class BufferOutputStream : public facebook::velox::OutputStream { + public: + BufferOutputStream( + facebook::velox::memory::MemoryPool* pool, + int32_t initialSize = facebook::velox::memory::AllocationTraits::kPageSize, + facebook::velox::OutputStreamListener* listener = nullptr); + + void write(const char* s, std::streamsize count); + + std::streampos tellp() const; + + void seekp(std::streampos pos); + + facebook::velox::BufferPtr getBuffer() const; + + private: + facebook::velox::BufferPtr buffer_; +}; +} // namespace gluten diff --git a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc index b0c2cc8ad..bd56bc62e 100644 --- a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc +++ b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc @@ -84,6 +84,7 @@ arrow::Status VeloxSortBasedShuffleWriter::init() { for (auto pid = 0; pid < numPartitions_; ++pid) { rowVectorIndexMap_[pid].reserve(options_.bufferSize); } + bufferOutputStream_ = std::make_unique<BufferOutputStream>(veloxPool_.get()); return arrow::Status::OK(); } @@ -153,18 +154,12 @@ arrow::Status VeloxSortBasedShuffleWriter::write(std::shared_ptr<ColumnarBatch> return arrow::Status::OK(); } -arrow::Status VeloxSortBasedShuffleWriter::evictBatch( - uint32_t partitionId, - std::ostringstream* output, - facebook::velox::OStreamOutputStream* out, - facebook::velox::RowTypePtr* rowTypePtr) { +arrow::Status VeloxSortBasedShuffleWriter::evictBatch(uint32_t partitionId, facebook::velox::RowTypePtr* rowTypePtr) { int64_t rawSize = batch_->size(); - batch_->flush(out); - const std::string& outputStr = output->str(); - RETURN_NOT_OK(partitionWriter_->evict(partitionId, rawSize, outputStr.c_str(), outputStr.size())); - batch_.reset(); - output->clear(); - output->str(""); + bufferOutputStream_->seekp(0); + batch_->flush(bufferOutputStream_.get()); + auto buffer = bufferOutputStream_->getBuffer(); + RETURN_NOT_OK(partitionWriter_->evict(partitionId, rawSize, buffer->as<char>(), buffer->size())); batch_ = std::make_unique<facebook::velox::VectorStreamGroup>(veloxPool_.get(), serde_.get()); batch_->createStreamTree(*rowTypePtr, options_.bufferSize, &serdeOptions_); return arrow::Status::OK(); @@ -174,8 +169,6 @@ arrow::Status VeloxSortBasedShuffleWriter::evictRowVector(uint32_t partitionId) int32_t rowNum = 0; const int32_t maxBatchNum = options_.bufferSize; auto rowTypePtr = std::static_pointer_cast<const facebook::velox::RowType>(rowType_.value()); - std::ostringstream output; - facebook::velox::OStreamOutputStream out(&output); if (options_.partitioning != Partitioning::kSingle) { if (auto it = rowVectorIndexMap_.find(partitionId); it != rowVectorIndexMap_.end()) { @@ -219,7 +212,7 @@ arrow::Status VeloxSortBasedShuffleWriter::evictRowVector(uint32_t partitionId) rowNum += groupedSize[pair.first]; if (rowNum >= maxBatchNum) { rowNum = 0; - RETURN_NOT_OK(evictBatch(partitionId, &output, &out, &rowTypePtr)); + RETURN_NOT_OK(evictBatch(partitionId, &rowTypePtr)); } } @@ -231,13 +224,13 @@ arrow::Status VeloxSortBasedShuffleWriter::evictRowVector(uint32_t partitionId) rowNum += rowVectorPtr->size(); batch_->append(rowVectorPtr); if (rowNum >= maxBatchNum) { - RETURN_NOT_OK(evictBatch(partitionId, &output, &out, &rowTypePtr)); + RETURN_NOT_OK(evictBatch(partitionId, &rowTypePtr)); rowNum = 0; } } } if (rowNum > 0) { - RETURN_NOT_OK(evictBatch(partitionId, &output, &out, &rowTypePtr)); + RETURN_NOT_OK(evictBatch(partitionId, &rowTypePtr)); } return arrow::Status::OK(); } diff --git a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h index e3ac07dfc..710590184 100644 --- a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h +++ b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h @@ -37,6 +37,7 @@ #include <arrow/type.h> #include "VeloxShuffleWriter.h" +#include "memory/BufferOutputStream.h" #include "memory/VeloxMemoryManager.h" #include "shuffle/PartitionWriter.h" #include "shuffle/Partitioner.h" @@ -65,11 +66,7 @@ class VeloxSortBasedShuffleWriter : public VeloxShuffleWriter { arrow::Status evictRowVector(uint32_t partitionId) override; - arrow::Status evictBatch( - uint32_t partitionId, - std::ostringstream* output, - facebook::velox::OStreamOutputStream* out, - facebook::velox::RowTypePtr* rowTypePtr); + arrow::Status evictBatch(uint32_t partitionId, facebook::velox::RowTypePtr* rowTypePtr); private: VeloxSortBasedShuffleWriter( @@ -93,6 +90,7 @@ class VeloxSortBasedShuffleWriter : public VeloxShuffleWriter { std::optional<facebook::velox::TypePtr> rowType_; std::unique_ptr<facebook::velox::VectorStreamGroup> batch_; + std::unique_ptr<BufferOutputStream> bufferOutputStream_; // Partition ID -> Row Count // subscript: Partition ID diff --git a/cpp/velox/tests/BufferOutputStreamTest.cc b/cpp/velox/tests/BufferOutputStreamTest.cc new file mode 100644 index 000000000..3b3f78cea --- /dev/null +++ b/cpp/velox/tests/BufferOutputStreamTest.cc @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "memory/BufferOutputStream.h" +#include "memory/VeloxColumnarBatch.h" +#include "velox/common/memory/ByteStream.h" +#include "velox/vector/tests/utils/VectorTestBase.h" + +using namespace facebook::velox; + +namespace gluten { +class BufferOutputStreamTest : public ::testing::Test, public test::VectorTestBase { + protected: + // Velox requires the mem manager to be instanced. + static void SetUpTestCase() { + memory::MemoryManager::testingSetInstance({}); + } + + std::shared_ptr<memory::MemoryPool> veloxPool_ = defaultLeafVeloxMemoryPool(); +}; + +TEST_F(BufferOutputStreamTest, outputStream) { + auto out = std::make_unique<BufferOutputStream>(veloxPool_.get(), 10000); + std::stringstream referenceSStream; + auto reference = std::make_unique<facebook::velox::OStreamOutputStream>(&referenceSStream); + for (auto i = 0; i < 100; ++i) { + std::string data; + data.resize(10000); + std::fill(data.begin(), data.end(), i); + out->write(data.data(), data.size()); + reference->write(data.data(), data.size()); + } + EXPECT_EQ(reference->tellp(), out->tellp()); + for (auto i = 0; i < 100; ++i) { + std::string data; + data.resize(6000); + std::fill(data.begin(), data.end(), i + 10); + out->seekp(i * 10000 + 5000); + reference->seekp(i * 10000 + 5000); + out->write(data.data(), data.size()); + reference->write(data.data(), data.size()); + } + auto str = referenceSStream.str(); + auto numBytes = veloxPool_->currentBytes(); + EXPECT_LT(0, numBytes); + { + auto buffer = out->getBuffer(); + EXPECT_EQ(numBytes, veloxPool_->currentBytes()); + EXPECT_EQ(str, std::string(buffer->as<char>(), buffer->size())); + } + + out.reset(); + // We expect dropping the stream frees the backing memory. + EXPECT_EQ(0, veloxPool_->currentBytes()); +} +} // namespace gluten diff --git a/cpp/velox/tests/CMakeLists.txt b/cpp/velox/tests/CMakeLists.txt index 58482fe15..a5bd5b4f7 100644 --- a/cpp/velox/tests/CMakeLists.txt +++ b/cpp/velox/tests/CMakeLists.txt @@ -61,3 +61,4 @@ add_velox_test( FilePathGenerator.cc) add_velox_test(spark_functions_test SOURCES SparkFunctionTest.cc) add_velox_test(execution_ctx_test SOURCES RuntimeTest.cc) +add_velox_test(buffer_outputstream_test SOURCES BufferOutputStreamTest.cc) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@gluten.apache.org For additional commands, e-mail: commits-h...@gluten.apache.org