This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 70ee0f411 [VL] Add BufferedOutputStream to track the memory usage in PrestoSerializer (#5785)
70ee0f411 is described below

commit 70ee0f411cde6d6bbf3772ea3d623ef698b9f174
Author: Rong Ma <rong...@intel.com>
AuthorDate: Fri May 17 16:28:35 2024 +0800

    [VL] Add BufferedOutputStream to track the memory usage in PrestoSerializer (#5785)
---
 cpp/velox/CMakeLists.txt                         |  1 +
 cpp/velox/memory/BufferOutputStream.cc           | 45 +++++++++++++++
 cpp/velox/memory/BufferOutputStream.h            | 42 ++++++++++++++
 cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc | 25 +++------
 cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h  |  8 +--
 cpp/velox/tests/BufferOutputStreamTest.cc        | 70 ++++++++++++++++++++++++
 cpp/velox/tests/CMakeLists.txt                   |  1 +
 7 files changed, 171 insertions(+), 21 deletions(-)

diff --git a/cpp/velox/CMakeLists.txt b/cpp/velox/CMakeLists.txt
index c058883b6..9bedfe45b 100644
--- a/cpp/velox/CMakeLists.txt
+++ b/cpp/velox/CMakeLists.txt
@@ -300,6 +300,7 @@ set(VELOX_SRCS
     jni/VeloxJniWrapper.cc
     jni/JniFileSystem.cc
     jni/JniUdf.cc
+    memory/BufferOutputStream.cc
     memory/VeloxColumnarBatch.cc
     memory/VeloxMemoryManager.cc
     operators/functions/RegistrationAllFunctions.cc
diff --git a/cpp/velox/memory/BufferOutputStream.cc b/cpp/velox/memory/BufferOutputStream.cc
new file mode 100644
index 000000000..31d7b0936
--- /dev/null
+++ b/cpp/velox/memory/BufferOutputStream.cc
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "memory/BufferOutputStream.h"
+
+namespace gluten {
+// Allocates room for `initialSize` chars from `pool`, then resets the buffer
+// size to 0 so that the stream starts out empty.
+BufferOutputStream::BufferOutputStream(
+    facebook::velox::memory::MemoryPool* pool,
+    int32_t initialSize,
+    facebook::velox::OutputStreamListener* listener)
+    : facebook::velox::OutputStream(listener),
+      buffer_(facebook::velox::AlignedBuffer::allocate<char>(initialSize, pool)) {
+  buffer_->setSize(0);
+}
+
+// Appends `count` bytes read from `s` to the buffer via AlignedBuffer::appendTo.
+void BufferOutputStream::write(const char* s, std::streamsize count) {
+  using facebook::velox::AlignedBuffer;
+  AlignedBuffer::appendTo(&buffer_, s, count);
+}
+
+// Reports the buffer's current size as the stream position.
+std::streampos BufferOutputStream::tellp() const {
+  return buffer_->size();
+}
+
+// Repositions the stream by setting the buffer's size to `pos`.
+void BufferOutputStream::seekp(std::streampos pos) {
+  buffer_->setSize(pos);
+}
+
+// Returns the BufferPtr that holds the bytes written so far.
+facebook::velox::BufferPtr BufferOutputStream::getBuffer() const {
+  return buffer_;
+}
+} // namespace gluten
diff --git a/cpp/velox/memory/BufferOutputStream.h b/cpp/velox/memory/BufferOutputStream.h
new file mode 100644
index 000000000..49774e09d
--- /dev/null
+++ b/cpp/velox/memory/BufferOutputStream.h
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "velox/buffer/Buffer.h"
+#include "velox/common/memory/ByteStream.h"
+
+namespace gluten {
+// An OutputStream that collects everything written to it in one Velox buffer
+// allocated from a caller-supplied memory pool.
+class BufferOutputStream : public facebook::velox::OutputStream {
+ public:
+  // Allocates the backing buffer (room for `initialSize` chars) from `pool`;
+  // `listener` is passed through to the OutputStream base.
+  BufferOutputStream(
+      facebook::velox::memory::MemoryPool* pool,
+      int32_t initialSize = facebook::velox::memory::AllocationTraits::kPageSize,
+      facebook::velox::OutputStreamListener* listener = nullptr);
+
+  // Appends `count` bytes starting at `s` to the buffer.
+  void write(const char* s, std::streamsize count) override;
+
+  // Returns the current position, i.e. the buffer's current size.
+  std::streampos tellp() const override;
+
+  // Moves the position by setting the buffer's size to `pos`.
+  void seekp(std::streampos pos) override;
+
+  // Returns the buffer holding the bytes written so far.
+  facebook::velox::BufferPtr getBuffer() const;
+
+ private:
+  facebook::velox::BufferPtr buffer_;
+};
+} // namespace gluten
diff --git a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc
index b0c2cc8ad..bd56bc62e 100644
--- a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.cc
@@ -84,6 +84,7 @@ arrow::Status VeloxSortBasedShuffleWriter::init() {
   for (auto pid = 0; pid < numPartitions_; ++pid) {
     rowVectorIndexMap_[pid].reserve(options_.bufferSize);
   }
+  bufferOutputStream_ = std::make_unique<BufferOutputStream>(veloxPool_.get());
 
   return arrow::Status::OK();
 }
@@ -153,18 +154,12 @@ arrow::Status VeloxSortBasedShuffleWriter::write(std::shared_ptr<ColumnarBatch>
   return arrow::Status::OK();
 }
 
-arrow::Status VeloxSortBasedShuffleWriter::evictBatch(
-    uint32_t partitionId,
-    std::ostringstream* output,
-    facebook::velox::OStreamOutputStream* out,
-    facebook::velox::RowTypePtr* rowTypePtr) {
+arrow::Status VeloxSortBasedShuffleWriter::evictBatch(uint32_t partitionId, facebook::velox::RowTypePtr* rowTypePtr) {
   int64_t rawSize = batch_->size();
-  batch_->flush(out);
-  const std::string& outputStr = output->str();
-  RETURN_NOT_OK(partitionWriter_->evict(partitionId, rawSize, outputStr.c_str(), outputStr.size()));
-  batch_.reset();
-  output->clear();
-  output->str("");
+  bufferOutputStream_->seekp(0);
+  batch_->flush(bufferOutputStream_.get());
+  auto buffer = bufferOutputStream_->getBuffer();
+  RETURN_NOT_OK(partitionWriter_->evict(partitionId, rawSize, buffer->as<char>(), buffer->size()));
   batch_ = std::make_unique<facebook::velox::VectorStreamGroup>(veloxPool_.get(), serde_.get());
   batch_->createStreamTree(*rowTypePtr, options_.bufferSize, &serdeOptions_);
   return arrow::Status::OK();
@@ -174,8 +169,6 @@ arrow::Status VeloxSortBasedShuffleWriter::evictRowVector(uint32_t partitionId)
   int32_t rowNum = 0;
   const int32_t maxBatchNum = options_.bufferSize;
   auto rowTypePtr = std::static_pointer_cast<const facebook::velox::RowType>(rowType_.value());
-  std::ostringstream output;
-  facebook::velox::OStreamOutputStream out(&output);
 
   if (options_.partitioning != Partitioning::kSingle) {
     if (auto it = rowVectorIndexMap_.find(partitionId); it != rowVectorIndexMap_.end()) {
@@ -219,7 +212,7 @@ arrow::Status VeloxSortBasedShuffleWriter::evictRowVector(uint32_t partitionId)
         rowNum += groupedSize[pair.first];
         if (rowNum >= maxBatchNum) {
           rowNum = 0;
-          RETURN_NOT_OK(evictBatch(partitionId, &output, &out, &rowTypePtr));
+          RETURN_NOT_OK(evictBatch(partitionId, &rowTypePtr));
         }
       }
 
@@ -231,13 +224,13 @@ arrow::Status VeloxSortBasedShuffleWriter::evictRowVector(uint32_t partitionId)
       rowNum += rowVectorPtr->size();
       batch_->append(rowVectorPtr);
       if (rowNum >= maxBatchNum) {
-        RETURN_NOT_OK(evictBatch(partitionId, &output, &out, &rowTypePtr));
+        RETURN_NOT_OK(evictBatch(partitionId, &rowTypePtr));
         rowNum = 0;
       }
     }
   }
   if (rowNum > 0) {
-    RETURN_NOT_OK(evictBatch(partitionId, &output, &out, &rowTypePtr));
+    RETURN_NOT_OK(evictBatch(partitionId, &rowTypePtr));
   }
   return arrow::Status::OK();
 }
diff --git a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h
index e3ac07dfc..710590184 100644
--- a/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxSortBasedShuffleWriter.h
@@ -37,6 +37,7 @@
 #include <arrow/type.h>
 
 #include "VeloxShuffleWriter.h"
+#include "memory/BufferOutputStream.h"
 #include "memory/VeloxMemoryManager.h"
 #include "shuffle/PartitionWriter.h"
 #include "shuffle/Partitioner.h"
@@ -65,11 +66,7 @@ class VeloxSortBasedShuffleWriter : public VeloxShuffleWriter {
 
   arrow::Status evictRowVector(uint32_t partitionId) override;
 
-  arrow::Status evictBatch(
-      uint32_t partitionId,
-      std::ostringstream* output,
-      facebook::velox::OStreamOutputStream* out,
-      facebook::velox::RowTypePtr* rowTypePtr);
+  arrow::Status evictBatch(uint32_t partitionId, facebook::velox::RowTypePtr* rowTypePtr);
 
  private:
   VeloxSortBasedShuffleWriter(
@@ -93,6 +90,7 @@ class VeloxSortBasedShuffleWriter : public VeloxShuffleWriter {
   std::optional<facebook::velox::TypePtr> rowType_;
 
   std::unique_ptr<facebook::velox::VectorStreamGroup> batch_;
+  std::unique_ptr<BufferOutputStream> bufferOutputStream_;
 
   // Partition ID -> Row Count
   // subscript: Partition ID
diff --git a/cpp/velox/tests/BufferOutputStreamTest.cc b/cpp/velox/tests/BufferOutputStreamTest.cc
new file mode 100644
index 000000000..3b3f78cea
--- /dev/null
+++ b/cpp/velox/tests/BufferOutputStreamTest.cc
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "memory/BufferOutputStream.h"
+#include "memory/VeloxColumnarBatch.h"
+#include "velox/common/memory/ByteStream.h"
+#include "velox/vector/tests/utils/VectorTestBase.h"
+
+using namespace facebook::velox;
+
+namespace gluten {
+// Fixture that holds the Velox memory pool used by the stream under test.
+class BufferOutputStreamTest : public ::testing::Test, public test::VectorTestBase {
+ protected:
+  // Velox needs a MemoryManager instance to exist before the tests run.
+  static void SetUpTestCase() {
+    memory::MemoryManager::testingSetInstance({});
+  }
+
+  std::shared_ptr<memory::MemoryPool> veloxPool_ = defaultLeafVeloxMemoryPool();
+};
+
+// Replays the same writes and seeks on a BufferOutputStream and on a
+// std::stringstream-backed reference stream, then compares positions and
+// contents and checks the pool's byte accounting.
+TEST_F(BufferOutputStreamTest, outputStream) {
+  constexpr int kChunkSize = 10000;
+  constexpr int kNumChunks = 100;
+  constexpr int kPatchSize = 6000;
+
+  auto out = std::make_unique<BufferOutputStream>(veloxPool_.get(), kChunkSize);
+  std::stringstream referenceSStream;
+  auto reference = std::make_unique<facebook::velox::OStreamOutputStream>(&referenceSStream);
+
+  // Sequential phase: append kNumChunks chunks, each filled with its own index.
+  for (auto chunk = 0; chunk < kNumChunks; ++chunk) {
+    const std::string data(kChunkSize, static_cast<char>(chunk));
+    out->write(data.data(), data.size());
+    reference->write(data.data(), data.size());
+  }
+  EXPECT_EQ(reference->tellp(), out->tellp());
+
+  // Seek phase: jump to the middle of every chunk and write kPatchSize bytes,
+  // which runs 1000 bytes past the end of that chunk.
+  for (auto chunk = 0; chunk < kNumChunks; ++chunk) {
+    const std::string data(kPatchSize, static_cast<char>(chunk + 10));
+    const auto offset = chunk * kChunkSize + kChunkSize / 2;
+    out->seekp(offset);
+    reference->seekp(offset);
+    out->write(data.data(), data.size());
+    reference->write(data.data(), data.size());
+  }
+
+  const auto expected = referenceSStream.str();
+  const auto numBytes = veloxPool_->currentBytes();
+  EXPECT_LT(0, numBytes);
+  {
+    // Fetching the buffer is expected to leave the pool's usage unchanged.
+    auto buffer = out->getBuffer();
+    EXPECT_EQ(numBytes, veloxPool_->currentBytes());
+    EXPECT_EQ(expected, std::string(buffer->as<char>(), buffer->size()));
+  }
+
+  out.reset();
+  // We expect dropping the stream frees the backing memory.
+  EXPECT_EQ(0, veloxPool_->currentBytes());
+}
+} // namespace gluten
diff --git a/cpp/velox/tests/CMakeLists.txt b/cpp/velox/tests/CMakeLists.txt
index 58482fe15..a5bd5b4f7 100644
--- a/cpp/velox/tests/CMakeLists.txt
+++ b/cpp/velox/tests/CMakeLists.txt
@@ -61,3 +61,4 @@ add_velox_test(
     FilePathGenerator.cc)
 add_velox_test(spark_functions_test SOURCES SparkFunctionTest.cc)
 add_velox_test(execution_ctx_test SOURCES RuntimeTest.cc)
+add_velox_test(buffer_outputstream_test SOURCES BufferOutputStreamTest.cc)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@gluten.apache.org
For additional commands, e-mail: commits-h...@gluten.apache.org

Reply via email to