kou commented on code in PR #39164:
URL: https://github.com/apache/arrow/pull/39164#discussion_r1428844231
##########
cpp/src/arrow/ipc/message.cc:
##########
@@ -626,10 +626,24 @@ class MessageDecoder::MessageDecoderImpl {
RETURN_NOT_OK(ConsumeMetadataLengthData(data,
next_required_size_));
break;
case State::METADATA: {
- auto buffer = std::make_shared<Buffer>(data, next_required_size_);
+ // We need to copy metadata because it's used in
+ // ConsumeBody(). ConsumeBody() may be called from another
+ // ConsumeData(). We can't assume that the given data for
+ // the current ConsumeData() call is still valid in the
+ // next ConsumeData() call. So we need to copy metadata
+ // here.
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> buffer,
+ AllocateBuffer(next_required_size_, pool_));
+ memcpy(buffer->mutable_data(), data, next_required_size_);
Review Comment:
It works, but it causes a needless copy with the `Consume(buffer)` API.
If we use `Buffer::ViewOrCopy()` to copy the buffer, we should do this in
`Consume(data, size)`:
```diff
diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
index fbcd6f139b..351fa6c6db 100644
--- a/cpp/src/arrow/ipc/message.cc
+++ b/cpp/src/arrow/ipc/message.cc
@@ -632,9 +632,10 @@ class MessageDecoder::MessageDecoderImpl {
// the current ConsumeData() call is still valid in the
// next ConsumeData() call. So we need to copy metadata
// here.
- ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> buffer,
- AllocateBuffer(next_required_size_,
pool_));
- memcpy(buffer->mutable_data(), data, next_required_size_);
+ ARROW_ASSIGN_OR_RAISE(
+ auto buffer,
+ Buffer::ViewOrCopy(std::make_shared<Buffer>(data,
next_required_size_),
+ CPUDevice::memory_manager(pool_)));
RETURN_NOT_OK(ConsumeMetadataBuffer(buffer));
} break;
case State::BODY: {
```
BTW, we should use `arrow::default_cpu_memory_manager()` when the pool is
`arrow::default_memory_pool()`... I'll open an issue for it later.
```diff
diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
index fbcd6f139b..a0005a0e59 100644
--- a/cpp/src/arrow/ipc/message.cc
+++ b/cpp/src/arrow/ipc/message.cc
@@ -607,6 +607,7 @@ class MessageDecoder::MessageDecoderImpl {
MemoryPool* pool, bool skip_body)
: listener_(std::move(listener)),
pool_(pool),
+ memory_manager_(pool_ == default_memory_pool() ?
default_cpu_memory_manager() : CPUDevice::memory_manager(pool_)),
state_(initial_state),
next_required_size_(initial_next_required_size),
chunks_(),
@@ -823,7 +824,7 @@ class MessageDecoder::MessageDecoderImpl {
metadata_ = buffer;
} else {
ARROW_ASSIGN_OR_RAISE(metadata_,
- Buffer::ViewOrCopy(buffer,
CPUDevice::memory_manager(pool_)));
+ Buffer::ViewOrCopy(buffer, memory_manager_));
}
return ConsumeMetadata();
}
@@ -836,14 +837,14 @@ class MessageDecoder::MessageDecoderImpl {
} else {
ARROW_ASSIGN_OR_RAISE(
metadata_,
- Buffer::ViewOrCopy(chunks_[0],
CPUDevice::memory_manager(pool_)));
+ Buffer::ViewOrCopy(chunks_[0], memory_manager_));
}
chunks_.erase(chunks_.begin());
} else {
metadata_ = SliceBuffer(chunks_[0], 0, next_required_size_);
if (!chunks_[0]->is_cpu()) {
ARROW_ASSIGN_OR_RAISE(
- metadata_, Buffer::ViewOrCopy(metadata_,
CPUDevice::memory_manager(pool_)));
+ metadata_, Buffer::ViewOrCopy(metadata_, memory_manager_));
}
chunks_[0] = SliceBuffer(chunks_[0], next_required_size_);
}
@@ -912,7 +913,7 @@ class MessageDecoder::MessageDecoderImpl {
return util::SafeLoadAs<int32_t>(buffer->data());
} else {
ARROW_ASSIGN_OR_RAISE(auto cpu_buffer,
- Buffer::ViewOrCopy(buffer,
CPUDevice::memory_manager(pool_)));
+ Buffer::ViewOrCopy(buffer, memory_manager_));
return util::SafeLoadAs<int32_t>(cpu_buffer->data());
}
}
@@ -925,7 +926,7 @@ class MessageDecoder::MessageDecoderImpl {
for (auto& chunk : chunks_) {
if (!chunk->is_cpu()) {
ARROW_ASSIGN_OR_RAISE(
- chunk, Buffer::ViewOrCopy(chunk,
CPUDevice::memory_manager(pool_)));
+ chunk, Buffer::ViewOrCopy(chunk, memory_manager_));
}
auto data = chunk->data();
auto data_size = chunk->size();
@@ -951,6 +952,7 @@ class MessageDecoder::MessageDecoderImpl {
std::shared_ptr<MessageDecoderListener> listener_;
MemoryPool* pool_;
+ std::shared_ptr<MemoryManager> memory_manager_;
State state_;
int64_t next_required_size_;
std::vector<std::shared_ptr<Buffer>> chunks_;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]