This is an automated email from the ASF dual-hosted git repository.
pitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 5a331a9513 GH-49896: [C++] Reject short buffer reads in IPC reader
(#49897)
5a331a9513 is described below
commit 5a331a9513018141fbfabf94e9331cc98953c353
Author: Antoine Pitrou <[email protected]>
AuthorDate: Thu Apr 30 15:29:57 2026 +0200
GH-49896: [C++] Reject short buffer reads in IPC reader (#49897)
### Rationale for this change
IO methods like `ReadAt` can return fewer bytes than requested if the file
is too short, but the IPC reader doesn't always detect this situation. On
invalid IPC files, this can cause downstream issues such as
half-initialized buffers and excessive processing times (potentially enabling a
denial of service).
This issue was detected by OSS-Fuzz:
https://issues.oss-fuzz.com/issues/489758017
### What changes are included in this PR?
1. Add `ReadAt` and `ReadAsync` overloads that accept a
`bool allow_short_read` argument
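2. Pass `allow_short_read = false` in all suitable places in the IPC and
Parquet readers, as well as in `ReadRangeCache` and the default
`RandomAccessFile::ReadManyAsync` implementation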
### Are these changes tested?
Yes, by existing tests and a new fuzz regression file.
### Are there any user-facing changes?
No, except potentially better detection of invalid IPC streams and files.
* GitHub Issue: #49896
Authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
cpp/src/arrow/dataset/file_parquet_test.cc | 2 +-
cpp/src/arrow/gpu/cuda_memory.cc | 26 ++++++----
cpp/src/arrow/gpu/cuda_memory.h | 6 ++-
cpp/src/arrow/io/caching.cc | 9 ++--
cpp/src/arrow/io/concurrency.h | 14 +++++-
cpp/src/arrow/io/file.cc | 36 ++++++++------
cpp/src/arrow/io/file.h | 6 ++-
cpp/src/arrow/io/file_test.cc | 11 ++++-
cpp/src/arrow/io/interfaces.cc | 45 ++++++++++++++++--
cpp/src/arrow/io/interfaces.h | 39 +++++++++++++--
cpp/src/arrow/io/memory.cc | 50 ++++++++++++++------
cpp/src/arrow/io/memory.h | 9 +++-
cpp/src/arrow/io/memory_test.cc | 5 +-
cpp/src/arrow/io/slow.cc | 13 +++++
cpp/src/arrow/io/slow.h | 4 ++
cpp/src/arrow/io/test_common.cc | 13 ++++-
cpp/src/arrow/io/type_fwd.h | 1 +
cpp/src/arrow/ipc/feather.cc | 17 ++++---
cpp/src/arrow/ipc/message.cc | 42 +++++++----------
cpp/src/arrow/ipc/reader.cc | 55 ++++++++++++----------
cpp/src/arrow/ipc/reader_internal.h | 16 +++++--
cpp/src/parquet/bloom_filter.h | 1 +
cpp/src/parquet/bloom_filter_reader.h | 2 +-
cpp/src/parquet/column_page.h | 2 +
.../encryption/file_system_key_material_store.cc | 1 +
.../parquet/encryption/internal_file_decryptor.h | 1 +
cpp/src/parquet/file_reader.cc | 15 +++---
cpp/src/parquet/page_index.cc | 24 +++++-----
cpp/src/parquet/platform.cc | 1 +
cpp/src/parquet/platform.h | 4 +-
cpp/src/parquet/properties.cc | 10 +---
cpp/src/parquet/properties.h | 1 +
cpp/src/parquet/properties_test.cc | 3 +-
cpp/src/parquet/statistics.cc | 1 -
cpp/src/parquet/statistics.h | 1 +
testing | 2 +-
36 files changed, 332 insertions(+), 156 deletions(-)
diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc
b/cpp/src/arrow/dataset/file_parquet_test.cc
index 696bda1935..0d86f5eaa0 100644
--- a/cpp/src/arrow/dataset/file_parquet_test.cc
+++ b/cpp/src/arrow/dataset/file_parquet_test.cc
@@ -148,7 +148,7 @@ class DelayedBufferReader : public
::arrow::io::BufferReader {
return DeferNotOk(::arrow::io::internal::SubmitIO(
io_context, [self, position, nbytes]() ->
Result<std::shared_ptr<Buffer>> {
std::this_thread::sleep_for(std::chrono::seconds(1));
- return self->DoReadAt(position, nbytes);
+ return self->DoReadAt(position, nbytes, /*allow_short_read=*/false);
}));
}
diff --git a/cpp/src/arrow/gpu/cuda_memory.cc b/cpp/src/arrow/gpu/cuda_memory.cc
index 87d985a0fc..048455ce3c 100644
--- a/cpp/src/arrow/gpu/cuda_memory.cc
+++ b/cpp/src/arrow/gpu/cuda_memory.cc
@@ -260,28 +260,38 @@ Status CudaBufferReader::DoSeek(int64_t position) {
}
Result<int64_t> CudaBufferReader::DoReadAt(int64_t position, int64_t nbytes,
- void* buffer) {
+ bool allow_short_read, void*
buffer) {
RETURN_NOT_OK(CheckClosed());
- nbytes = std::min(nbytes, size_ - position);
- RETURN_NOT_OK(context_->CopyDeviceToHost(buffer, address_ + position,
nbytes));
- return nbytes;
+ auto real_nbytes = std::min(nbytes, size_ - position);
+ if (!allow_short_read && real_nbytes != nbytes) {
+ return Status::IOError("Cuda buffer too short: expected to be able to read
", nbytes,
+ " bytes, got ", real_nbytes);
+ }
+ RETURN_NOT_OK(context_->CopyDeviceToHost(buffer, address_ + position,
real_nbytes));
+ return real_nbytes;
}
Result<int64_t> CudaBufferReader::DoRead(int64_t nbytes, void* buffer) {
RETURN_NOT_OK(CheckClosed());
- ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, DoReadAt(position_, nbytes,
buffer));
+ ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
+ DoReadAt(position_, nbytes, /*allow_short_read=*/true,
buffer));
position_ += bytes_read;
return bytes_read;
}
Result<std::shared_ptr<Buffer>> CudaBufferReader::DoReadAt(int64_t position,
- int64_t nbytes) {
+ int64_t nbytes,
+ bool
allow_short_read) {
RETURN_NOT_OK(CheckClosed());
- int64_t size = std::min(nbytes, size_ - position);
- return std::make_shared<CudaBuffer>(buffer_, position, size);
+ auto real_nbytes = std::min(nbytes, size_ - position);
+ if (!allow_short_read && real_nbytes != nbytes) {
+ return Status::IOError("Cuda buffer too short: expected to be able to read
", nbytes,
+ " bytes, got ", real_nbytes);
+ }
+ return std::make_shared<CudaBuffer>(buffer_, position, real_nbytes);
}
Result<std::shared_ptr<Buffer>> CudaBufferReader::DoRead(int64_t nbytes) {
diff --git a/cpp/src/arrow/gpu/cuda_memory.h b/cpp/src/arrow/gpu/cuda_memory.h
index 83d65b0e7f..a25b6afd91 100644
--- a/cpp/src/arrow/gpu/cuda_memory.h
+++ b/cpp/src/arrow/gpu/cuda_memory.h
@@ -177,8 +177,10 @@ class ARROW_CUDA_EXPORT CudaBufferReader
Result<int64_t> DoRead(int64_t nbytes, void* buffer);
Result<std::shared_ptr<Buffer>> DoRead(int64_t nbytes);
- Result<int64_t> DoReadAt(int64_t position, int64_t nbytes, void* out);
- Result<std::shared_ptr<Buffer>> DoReadAt(int64_t position, int64_t nbytes);
+ Result<int64_t> DoReadAt(int64_t position, int64_t nbytes, bool
allow_short_read,
+ void* out);
+ Result<std::shared_ptr<Buffer>> DoReadAt(int64_t position, int64_t nbytes,
+ bool allow_short_read);
Result<int64_t> DoTell() const;
Status DoSeek(int64_t position);
diff --git a/cpp/src/arrow/io/caching.cc b/cpp/src/arrow/io/caching.cc
index 74e98170ad..41fdd9f781 100644
--- a/cpp/src/arrow/io/caching.cc
+++ b/cpp/src/arrow/io/caching.cc
@@ -167,7 +167,8 @@ struct ReadRangeCache::Impl {
std::vector<RangeCacheEntry> new_entries;
new_entries.reserve(ranges.size());
for (const auto& range : ranges) {
- new_entries.emplace_back(range, file->ReadAsync(ctx, range.offset,
range.length));
+ new_entries.emplace_back(range, file->ReadAsync(ctx, range.offset,
range.length,
+
/*allow_short_read=*/false));
}
return new_entries;
}
@@ -219,7 +220,8 @@ struct ReadRangeCache::Impl {
++next_it) {
if (!next_it->future.is_valid()) {
next_it->future =
- file->ReadAsync(ctx, next_it->range.offset,
next_it->range.length);
+ file->ReadAsync(ctx, next_it->range.offset,
next_it->range.length,
+ /*allow_short_read=*/false);
}
++num_prefetched;
}
@@ -272,7 +274,8 @@ struct ReadRangeCache::LazyImpl : public
ReadRangeCache::Impl {
Future<std::shared_ptr<Buffer>> MaybeRead(RangeCacheEntry* entry) override {
// Called by superclass Read()/WaitFor() so we have the lock
if (!entry->future.is_valid()) {
- entry->future = file->ReadAsync(ctx, entry->range.offset,
entry->range.length);
+ entry->future = file->ReadAsync(ctx, entry->range.offset,
entry->range.length,
+ /*allow_short_read=*/false);
}
return entry->future;
}
diff --git a/cpp/src/arrow/io/concurrency.h b/cpp/src/arrow/io/concurrency.h
index 35c2aac6a7..8b5d2cb1fc 100644
--- a/cpp/src/arrow/io/concurrency.h
+++ b/cpp/src/arrow/io/concurrency.h
@@ -208,13 +208,23 @@ class RandomAccessFileConcurrencyWrapper : public
RandomAccessFile {
// to use the exclusive_guard.
Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) final {
+ return ReadAt(position, nbytes, /*allow_short_read=*/true, out);
+ }
+
+ Result<int64_t> ReadAt(int64_t position, int64_t nbytes, bool
allow_short_read,
+ void* out) final {
auto guard = lock_.shared_guard();
- return derived()->DoReadAt(position, nbytes, out);
+ return derived()->DoReadAt(position, nbytes, allow_short_read, out);
}
Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes)
final {
+ return ReadAt(position, nbytes, /*allow_short_read=*/true);
+ }
+
+ Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes,
+ bool allow_short_read) final {
auto guard = lock_.shared_guard();
- return derived()->DoReadAt(position, nbytes);
+ return derived()->DoReadAt(position, nbytes, allow_short_read);
}
/*
diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc
index 4ce0b6d662..ed2c8c9a24 100644
--- a/cpp/src/arrow/io/file.cc
+++ b/cpp/src/arrow/io/file.cc
@@ -133,14 +133,21 @@ class OSFile {
return ::arrow::internal::FileRead(fd_.fd(),
reinterpret_cast<uint8_t*>(out), nbytes);
}
- Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) {
+ Result<int64_t> ReadAt(int64_t position, int64_t nbytes, bool
allow_short_read,
+ void* out) {
RETURN_NOT_OK(CheckClosed());
RETURN_NOT_OK(internal::ValidateRange(position, nbytes));
// ReadAt() leaves the file position undefined, so require that we seek
// before calling Read() or Write().
need_seeking_.store(true);
- return ::arrow::internal::FileReadAt(fd_.fd(),
reinterpret_cast<uint8_t*>(out),
- position, nbytes);
+ ARROW_ASSIGN_OR_RAISE(auto real_nbytes, ::arrow::internal::FileReadAt(
+ fd_.fd(),
reinterpret_cast<uint8_t*>(out),
+ position, nbytes));
+ if (!allow_short_read && real_nbytes != nbytes) {
+ return Status::IOError("File too short: expected to be able to read ",
nbytes,
+ " bytes, got ", real_nbytes);
+ }
+ return real_nbytes;
}
Status Seek(int64_t pos) {
@@ -230,21 +237,20 @@ class ReadableFile::ReadableFileImpl : public OSFile {
RETURN_NOT_OK(buffer->Resize(bytes_read));
buffer->ZeroPadding();
}
- // R build with openSUSE155 requires an explicit shared_ptr construction
- return std::shared_ptr<Buffer>(std::move(buffer));
+ return buffer;
}
- Result<std::shared_ptr<Buffer>> ReadBufferAt(int64_t position, int64_t
nbytes) {
+ Result<std::shared_ptr<Buffer>> ReadBufferAt(int64_t position, int64_t
nbytes,
+ bool allow_short_read) {
ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes, pool_));
- ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
- ReadAt(position, nbytes, buffer->mutable_data()));
+ ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(position, nbytes,
allow_short_read,
+ buffer->mutable_data()));
if (bytes_read < nbytes) {
RETURN_NOT_OK(buffer->Resize(bytes_read));
buffer->ZeroPadding();
}
- // R build with openSUSE155 requires an explicit shared_ptr construction
- return std::shared_ptr<Buffer>(std::move(buffer));
+ return buffer;
}
Status WillNeed(const std::vector<ReadRange>& ranges) {
@@ -322,12 +328,14 @@ Result<int64_t> ReadableFile::DoRead(int64_t nbytes,
void* out) {
return impl_->Read(nbytes, out);
}
-Result<int64_t> ReadableFile::DoReadAt(int64_t position, int64_t nbytes, void*
out) {
- return impl_->ReadAt(position, nbytes, out);
+Result<int64_t> ReadableFile::DoReadAt(int64_t position, int64_t nbytes,
+ bool allow_short_read, void* out) {
+ return impl_->ReadAt(position, nbytes, allow_short_read, out);
}
-Result<std::shared_ptr<Buffer>> ReadableFile::DoReadAt(int64_t position,
int64_t nbytes) {
- return impl_->ReadBufferAt(position, nbytes);
+Result<std::shared_ptr<Buffer>> ReadableFile::DoReadAt(int64_t position,
int64_t nbytes,
+ bool allow_short_read) {
+ return impl_->ReadBufferAt(position, nbytes, allow_short_read);
}
Result<std::shared_ptr<Buffer>> ReadableFile::DoRead(int64_t nbytes) {
diff --git a/cpp/src/arrow/io/file.h b/cpp/src/arrow/io/file.h
index 50d4f2c4df..a25305b036 100644
--- a/cpp/src/arrow/io/file.h
+++ b/cpp/src/arrow/io/file.h
@@ -124,10 +124,12 @@ class ARROW_EXPORT ReadableFile
Result<std::shared_ptr<Buffer>> DoRead(int64_t nbytes);
/// \brief Thread-safe implementation of ReadAt
- Result<int64_t> DoReadAt(int64_t position, int64_t nbytes, void* out);
+ Result<int64_t> DoReadAt(int64_t position, int64_t nbytes, bool
allow_short_read,
+ void* out);
/// \brief Thread-safe implementation of ReadAt
- Result<std::shared_ptr<Buffer>> DoReadAt(int64_t position, int64_t nbytes);
+ Result<std::shared_ptr<Buffer>> DoReadAt(int64_t position, int64_t nbytes,
+ bool allow_short_read);
Result<int64_t> DoGetSize();
Status DoSeek(int64_t position);
diff --git a/cpp/src/arrow/io/file_test.cc b/cpp/src/arrow/io/file_test.cc
index 8970dfe7cc..1e2de3f07a 100644
--- a/cpp/src/arrow/io/file_test.cc
+++ b/cpp/src/arrow/io/file_test.cc
@@ -31,6 +31,7 @@
#include <thread>
#include <vector>
+#include <gmock/gmock-matchers.h>
#include <gtest/gtest.h>
#include "arrow/buffer.h"
@@ -399,12 +400,18 @@ TEST_F(TestReadableFile, ReadAsync) {
MakeTestFile();
OpenFile();
- auto fut1 = file_->ReadAsync({}, 1, 10);
- auto fut2 = file_->ReadAsync({}, 0, 4);
+ auto fut1 = file_->ReadAsync(default_io_context(), 1, 10);
+ auto fut2 = file_->ReadAsync(default_io_context(), 0, 4);
+ auto fut3 = file_->ReadAsync(default_io_context(), 1, 10,
/*allow_short_read=*/false);
+ auto fut4 = file_->ReadAsync(default_io_context(), 0, 4,
/*allow_short_read=*/false);
ASSERT_OK_AND_ASSIGN(auto buf1, fut1.result());
ASSERT_OK_AND_ASSIGN(auto buf2, fut2.result());
+ EXPECT_RAISES_WITH_MESSAGE_THAT(IOError, ::testing::HasSubstr("File too
short"),
+ fut3.result());
+ ASSERT_OK_AND_ASSIGN(auto buf4, fut4.result());
AssertBufferEqual(*buf1, "estdata");
AssertBufferEqual(*buf2, "test");
+ AssertBufferEqual(*buf4, "test");
}
TEST_F(TestReadableFile, ReadManyAsync) {
diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc
index cdd2470b62..41a7fdecb2 100644
--- a/cpp/src/arrow/io/interfaces.cc
+++ b/cpp/src/arrow/io/interfaces.cc
@@ -149,12 +149,33 @@ RandomAccessFile::~RandomAccessFile() = default;
RandomAccessFile::RandomAccessFile() : interface_impl_(new Impl()) {}
+Result<int64_t> RandomAccessFile::ReadAt(int64_t position, int64_t nbytes,
+ bool allow_short_read, void* out) {
+ ARROW_ASSIGN_OR_RAISE(auto real_nbytes, ReadAt(position, nbytes, out));
+ if (!allow_short_read && real_nbytes != nbytes) {
+ return Status::IOError("File too short: expected to be able to read ",
nbytes,
+ " bytes, got ", real_nbytes);
+ }
+ return real_nbytes;
+}
+
Result<int64_t> RandomAccessFile::ReadAt(int64_t position, int64_t nbytes,
void* out) {
std::lock_guard<std::mutex> lock(interface_impl_->lock_);
RETURN_NOT_OK(Seek(position));
return Read(nbytes, out);
}
+Result<std::shared_ptr<Buffer>> RandomAccessFile::ReadAt(int64_t position,
int64_t nbytes,
+ bool
allow_short_read) {
+ ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(position, nbytes));
+ // XXX the internal `IoRecordedRandomAccessFile` can return a null buffer
+ if (!allow_short_read && buffer && buffer->size() != nbytes) {
+ return Status::IOError("File too short: expected to be able to read ",
nbytes,
+ " bytes, got ", buffer->size());
+ }
+ return buffer;
+}
+
Result<std::shared_ptr<Buffer>> RandomAccessFile::ReadAt(int64_t position,
int64_t nbytes) {
std::lock_guard<std::mutex> lock(interface_impl_->lock_);
@@ -162,25 +183,39 @@ Result<std::shared_ptr<Buffer>>
RandomAccessFile::ReadAt(int64_t position,
return Read(nbytes);
}
-// Default ReadAsync() implementation: simply issue the read on the context's
executor
Future<std::shared_ptr<Buffer>> RandomAccessFile::ReadAsync(const IOContext&
ctx,
int64_t position,
int64_t nbytes) {
+ return ReadAsync(ctx, position, nbytes, /*allow_short_read=*/true);
+}
+
+// Default ReadAsync() implementation: simply issue the read on the context's
executor
+Future<std::shared_ptr<Buffer>> RandomAccessFile::ReadAsync(const IOContext&
ctx,
+ int64_t position,
+ int64_t nbytes,
+ bool
allow_short_read) {
auto self = std::dynamic_pointer_cast<RandomAccessFile>(shared_from_this());
- return DeferNotOk(internal::SubmitIO(
- ctx, [self, position, nbytes] { return self->ReadAt(position, nbytes);
}));
+ return DeferNotOk(internal::SubmitIO(ctx, [self, position, nbytes,
allow_short_read] {
+ return self->ReadAt(position, nbytes, allow_short_read);
+ }));
}
Future<std::shared_ptr<Buffer>> RandomAccessFile::ReadAsync(int64_t position,
int64_t nbytes) {
- return ReadAsync(io_context(), position, nbytes);
+ return ReadAsync(io_context(), position, nbytes, /*allow_short_read=*/true);
+}
+
+Future<std::shared_ptr<Buffer>> RandomAccessFile::ReadAsync(int64_t position,
+ int64_t nbytes,
+ bool
allow_short_read) {
+ return ReadAsync(io_context(), position, nbytes, allow_short_read);
}
std::vector<Future<std::shared_ptr<Buffer>>> RandomAccessFile::ReadManyAsync(
const IOContext& ctx, const std::vector<ReadRange>& ranges) {
std::vector<Future<std::shared_ptr<Buffer>>> ret;
for (auto r : ranges) {
- ret.push_back(this->ReadAsync(ctx, r.offset, r.length));
+ ret.push_back(this->ReadAsync(ctx, r.offset, r.length,
/*allow_short_read=*/false));
}
return ret;
}
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
index b36c38c6d4..8cb9824707 100644
--- a/cpp/src/arrow/io/interfaces.h
+++ b/cpp/src/arrow/io/interfaces.h
@@ -267,8 +267,9 @@ class ARROW_EXPORT RandomAccessFile : public InputStream,
public Seekable {
/// \brief Read data from given file position.
///
- /// At most `nbytes` bytes are read. The number of bytes read is returned
- /// (it can be less than `nbytes` if EOF is reached).
+ /// At most `nbytes` bytes are read. The number of bytes read is returned.
+ /// If `allow_short_read` is true, the number of bytes read can be less than
+ /// `nbytes` if EOF is reached, otherwise an error is returned.
///
/// This method can be safely called from multiple threads concurrently.
/// It is unspecified whether this method updates the file position or not.
@@ -279,13 +280,40 @@ class ARROW_EXPORT RandomAccessFile : public InputStream,
public Seekable {
///
/// \param[in] position Where to read bytes from
/// \param[in] nbytes The number of bytes to read
+ /// \param[in] allow_short_read Whether to allow reading less than `nbytes`
+ /// \param[out] out The buffer to read bytes into
+ /// \return The number of bytes read, or an error
+ virtual Result<int64_t> ReadAt(int64_t position, int64_t nbytes, bool
allow_short_read,
+ void* out);
+
+ /// \brief Read data from given file position.
+ ///
+ /// Like `ReadAt(position, nbytes, allow_short_read, out)` with
`allow_short_read`
+ /// set to true.
+ ///
+ /// \param[in] position Where to read bytes from
+ /// \param[in] nbytes The number of bytes to read
/// \param[out] out The buffer to read bytes into
/// \return The number of bytes read, or an error
virtual Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out);
/// \brief Read data from given file position.
///
- /// At most `nbytes` bytes are read, but it can be less if EOF is reached.
+ /// At most `nbytes` bytes are read. If `allow_short_read` is true, the
+ /// number of bytes read can be less than `nbytes` if EOF is reached,
+ /// otherwise an error is returned.
+ ///
+ /// \param[in] position Where to read bytes from
+ /// \param[in] nbytes The number of bytes to read
+ /// \param[in] allow_short_read Whether to allow reading less than `nbytes`
+ /// \return A buffer containing the bytes read, or an error
+ virtual Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t
nbytes,
+ bool allow_short_read);
+
+ /// \brief Read data from given file position.
+ ///
+ /// Like `ReadAt(position, nbytes, allow_short_read)` with `allow_short_read`
+ /// set to true.
///
/// \param[in] position Where to read bytes from
/// \param[in] nbytes The number of bytes to read
@@ -293,10 +321,15 @@ class ARROW_EXPORT RandomAccessFile : public InputStream,
public Seekable {
virtual Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t
nbytes);
/// EXPERIMENTAL: Read data asynchronously.
+ virtual Future<std::shared_ptr<Buffer>> ReadAsync(const IOContext&, int64_t
position,
+ int64_t nbytes,
+ bool allow_short_read);
virtual Future<std::shared_ptr<Buffer>> ReadAsync(const IOContext&, int64_t
position,
int64_t nbytes);
/// EXPERIMENTAL: Read data asynchronously, using the file's IOContext.
+ Future<std::shared_ptr<Buffer>> ReadAsync(int64_t position, int64_t nbytes,
+ bool allow_short_read);
Future<std::shared_ptr<Buffer>> ReadAsync(int64_t position, int64_t nbytes);
/// EXPERIMENTAL: Explicit multi-read.
diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc
index d7b118b398..dcbb048c7e 100644
--- a/cpp/src/arrow/io/memory.cc
+++ b/cpp/src/arrow/io/memory.cc
@@ -317,47 +317,69 @@ Status BufferReader::WillNeed(const
std::vector<ReadRange>& ranges) {
Future<std::shared_ptr<Buffer>> BufferReader::ReadAsync(const IOContext&,
int64_t position,
int64_t nbytes) {
- return Future<std::shared_ptr<Buffer>>::MakeFinished(DoReadAt(position,
nbytes));
+ return Future<std::shared_ptr<Buffer>>::MakeFinished(
+ DoReadAt(position, nbytes, /*allow_short_read=*/true));
}
-Result<int64_t> BufferReader::DoReadAt(int64_t position, int64_t nbytes, void*
buffer) {
+Future<std::shared_ptr<Buffer>> BufferReader::ReadAsync(const IOContext&,
+ int64_t position,
int64_t nbytes,
+ bool allow_short_read)
{
+ return Future<std::shared_ptr<Buffer>>::MakeFinished(
+ DoReadAt(position, nbytes, allow_short_read));
+}
+
+Result<int64_t> BufferReader::DoReadAt(int64_t position, int64_t nbytes,
+ bool allow_short_read, void* buffer) {
RETURN_NOT_OK(CheckClosed());
- ARROW_ASSIGN_OR_RAISE(nbytes, internal::ValidateReadRange(position, nbytes,
size_));
+ ARROW_ASSIGN_OR_RAISE(auto real_nbytes,
+ internal::ValidateReadRange(position, nbytes, size_));
DCHECK_GE(nbytes, 0);
- if (nbytes) {
- memcpy(buffer, data_ + position, nbytes);
+ if (!allow_short_read && real_nbytes != nbytes) {
+ return Status::IOError("File too short: expected to be able to read ",
nbytes,
+ " bytes, got ", real_nbytes);
}
- return nbytes;
+ if (real_nbytes) {
+ memcpy(buffer, data_ + position, real_nbytes);
+ }
+ return real_nbytes;
}
-Result<std::shared_ptr<Buffer>> BufferReader::DoReadAt(int64_t position,
int64_t nbytes) {
+Result<std::shared_ptr<Buffer>> BufferReader::DoReadAt(int64_t position,
int64_t nbytes,
+ bool allow_short_read) {
RETURN_NOT_OK(CheckClosed());
- ARROW_ASSIGN_OR_RAISE(nbytes, internal::ValidateReadRange(position, nbytes,
size_));
- DCHECK_GE(nbytes, 0);
+ ARROW_ASSIGN_OR_RAISE(auto real_nbytes,
+ internal::ValidateReadRange(position, nbytes, size_));
+ DCHECK_GE(real_nbytes, 0);
+ if (!allow_short_read && real_nbytes != nbytes) {
+ return Status::IOError("File too short: expected to be able to read ",
nbytes,
+ " bytes, got ", real_nbytes);
+ }
// Arrange for data to be paged in
// RETURN_NOT_OK(::arrow::internal::MemoryAdviseWillNeed(
// {{const_cast<uint8_t*>(data_ + position),
static_cast<size_t>(nbytes)}}));
- if (nbytes > 0 && buffer_ != nullptr) {
- return SliceBuffer(buffer_, position, nbytes);
+ if (real_nbytes > 0 && buffer_ != nullptr) {
+ return SliceBuffer(buffer_, position, real_nbytes);
} else {
- return std::make_shared<Buffer>(data_ + position, nbytes);
+ return std::make_shared<Buffer>(data_ + position, real_nbytes);
}
}
Result<int64_t> BufferReader::DoRead(int64_t nbytes, void* out) {
RETURN_NOT_OK(CheckClosed());
- ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, DoReadAt(position_, nbytes, out));
+ ARROW_ASSIGN_OR_RAISE(int64_t bytes_read,
+ DoReadAt(position_, nbytes, /*allow_short_read=*/true,
out));
position_ += bytes_read;
return bytes_read;
}
Result<std::shared_ptr<Buffer>> BufferReader::DoRead(int64_t nbytes) {
RETURN_NOT_OK(CheckClosed());
- ARROW_ASSIGN_OR_RAISE(auto buffer, DoReadAt(position_, nbytes));
+ ARROW_ASSIGN_OR_RAISE(auto buffer,
+ DoReadAt(position_, nbytes,
/*allow_short_read=*/true));
position_ += buffer->size();
return buffer;
}
diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h
index 5ce0204654..77197bbe6e 100644
--- a/cpp/src/arrow/io/memory.h
+++ b/cpp/src/arrow/io/memory.h
@@ -162,6 +162,9 @@ class ARROW_EXPORT BufferReader
// Synchronous ReadAsync override
Future<std::shared_ptr<Buffer>> ReadAsync(const IOContext&, int64_t position,
int64_t nbytes) override;
+ Future<std::shared_ptr<Buffer>> ReadAsync(const IOContext&, int64_t position,
+ int64_t nbytes,
+ bool allow_short_read) override;
Status WillNeed(const std::vector<ReadRange>& ranges) override;
protected:
@@ -171,8 +174,10 @@ class ARROW_EXPORT BufferReader
Result<int64_t> DoRead(int64_t nbytes, void* buffer);
Result<std::shared_ptr<Buffer>> DoRead(int64_t nbytes);
- Result<int64_t> DoReadAt(int64_t position, int64_t nbytes, void* out);
- Result<std::shared_ptr<Buffer>> DoReadAt(int64_t position, int64_t nbytes);
+ Result<int64_t> DoReadAt(int64_t position, int64_t nbytes, bool
allow_short_read,
+ void* out);
+ Result<std::shared_ptr<Buffer>> DoReadAt(int64_t position, int64_t nbytes,
+ bool allow_short_read);
Result<std::string_view> DoPeek(int64_t nbytes) override;
Result<int64_t> DoTell() const;
diff --git a/cpp/src/arrow/io/memory_test.cc b/cpp/src/arrow/io/memory_test.cc
index 1b2c7bdbf3..3e95e5257a 100644
--- a/cpp/src/arrow/io/memory_test.cc
+++ b/cpp/src/arrow/io/memory_test.cc
@@ -733,9 +733,10 @@ class CountingBufferReader : public BufferReader {
public:
using BufferReader::BufferReader;
Future<std::shared_ptr<Buffer>> ReadAsync(const IOContext& context, int64_t
position,
- int64_t nbytes) override {
+ int64_t nbytes,
+ bool allow_short_read) override {
read_count_++;
- return BufferReader::ReadAsync(context, position, nbytes);
+ return BufferReader::ReadAsync(context, position, nbytes,
allow_short_read);
}
int64_t read_count() const { return read_count_; }
diff --git a/cpp/src/arrow/io/slow.cc b/cpp/src/arrow/io/slow.cc
index 7c11a484fc..44a50c5c25 100644
--- a/cpp/src/arrow/io/slow.cc
+++ b/cpp/src/arrow/io/slow.cc
@@ -134,12 +134,25 @@ Result<int64_t> SlowRandomAccessFile::ReadAt(int64_t
position, int64_t nbytes,
return stream_->ReadAt(position, nbytes, out);
}
+Result<int64_t> SlowRandomAccessFile::ReadAt(int64_t position, int64_t nbytes,
+ bool allow_short_read, void* out)
{
+ latencies_->Sleep();
+ return stream_->ReadAt(position, nbytes, allow_short_read, out);
+}
+
Result<std::shared_ptr<Buffer>> SlowRandomAccessFile::ReadAt(int64_t position,
int64_t nbytes) {
latencies_->Sleep();
return stream_->ReadAt(position, nbytes);
}
+Result<std::shared_ptr<Buffer>> SlowRandomAccessFile::ReadAt(int64_t position,
+ int64_t nbytes,
+ bool
allow_short_read) {
+ latencies_->Sleep();
+ return stream_->ReadAt(position, nbytes, allow_short_read);
+}
+
Result<std::string_view> SlowRandomAccessFile::Peek(int64_t nbytes) {
return stream_->Peek(nbytes);
}
diff --git a/cpp/src/arrow/io/slow.h b/cpp/src/arrow/io/slow.h
index fdcc56dfa6..cf8c02c88c 100644
--- a/cpp/src/arrow/io/slow.h
+++ b/cpp/src/arrow/io/slow.h
@@ -106,7 +106,11 @@ class ARROW_EXPORT SlowRandomAccessFile : public
SlowInputStreamBase<RandomAcces
Result<int64_t> Read(int64_t nbytes, void* out) override;
Result<std::shared_ptr<Buffer>> Read(int64_t nbytes) override;
Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override;
+ Result<int64_t> ReadAt(int64_t position, int64_t nbytes, bool
allow_short_read,
+ void* out) override;
Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes)
override;
+ Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes,
+ bool allow_short_read) override;
Result<std::string_view> Peek(int64_t nbytes) override;
Result<int64_t> GetSize() override;
diff --git a/cpp/src/arrow/io/test_common.cc b/cpp/src/arrow/io/test_common.cc
index b3cfdd0eb2..f3f5073fd6 100644
--- a/cpp/src/arrow/io/test_common.cc
+++ b/cpp/src/arrow/io/test_common.cc
@@ -136,13 +136,22 @@ class TrackedRandomAccessFileImpl : public
TrackedRandomAccessFile {
return delegate_->ReadAt(position, nbytes, out);
}
Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes)
override {
+ return ReadAt(position, nbytes, /*allow_short_read=*/true);
+ }
+ Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes,
+ bool allow_short_read) override {
SaveReadRange(position, nbytes);
- return delegate_->ReadAt(position, nbytes);
+ return delegate_->ReadAt(position, nbytes, allow_short_read);
}
Future<std::shared_ptr<Buffer>> ReadAsync(const io::IOContext& io_context,
int64_t position, int64_t nbytes)
override {
+ return ReadAsync(io_context, position, nbytes, /*allow_short_read=*/true);
+ }
+ Future<std::shared_ptr<Buffer>> ReadAsync(const io::IOContext& io_context,
+ int64_t position, int64_t nbytes,
+ bool allow_short_read) override {
SaveReadRange(position, nbytes);
- return delegate_->ReadAsync(io_context, position, nbytes);
+ return delegate_->ReadAsync(io_context, position, nbytes,
allow_short_read);
}
int64_t num_reads() const override { return read_ranges_.size(); }
diff --git a/cpp/src/arrow/io/type_fwd.h b/cpp/src/arrow/io/type_fwd.h
index a1b9e626bb..40775eeaf4 100644
--- a/cpp/src/arrow/io/type_fwd.h
+++ b/cpp/src/arrow/io/type_fwd.h
@@ -29,6 +29,7 @@ struct FileMode {
struct IOContext;
struct CacheOptions;
+struct ReadRange;
/// EXPERIMENTAL: convenience global singleton for default IOContext settings
ARROW_EXPORT
diff --git a/cpp/src/arrow/ipc/feather.cc b/cpp/src/arrow/ipc/feather.cc
index 6aceaa7f44..54f16103ae 100644
--- a/cpp/src/arrow/ipc/feather.cc
+++ b/cpp/src/arrow/ipc/feather.cc
@@ -154,7 +154,8 @@ class ReaderV1 : public Reader {
int footer_size = magic_size + static_cast<int>(sizeof(uint32_t));
// Now get the footer and verify
- ARROW_ASSIGN_OR_RAISE(auto buffer, source->ReadAt(size - footer_size,
footer_size));
+ ARROW_ASSIGN_OR_RAISE(auto buffer, source->ReadAt(size - footer_size,
footer_size,
+
/*allow_short_read=*/false));
if (memcmp(buffer->data() + sizeof(uint32_t), kFeatherV1MagicBytes,
magic_size)) {
return Status::Invalid("Feather file footer incomplete");
@@ -164,9 +165,9 @@ class ReaderV1 : public Reader {
if (size < magic_size + footer_size + metadata_length) {
return Status::Invalid("File is smaller than indicated metadata size");
}
- ARROW_ASSIGN_OR_RAISE(
- metadata_buffer_,
- source->ReadAt(size - footer_size - metadata_length, metadata_length));
+ ARROW_ASSIGN_OR_RAISE(metadata_buffer_,
+ source->ReadAt(size - footer_size - metadata_length,
+ metadata_length,
/*allow_short_read=*/false));
metadata_ = fbs::GetCTable(metadata_buffer_->data());
return ReadSchema();
@@ -273,8 +274,9 @@ class ReaderV1 : public Reader {
// Buffer data from the source (may or may not perform a copy depending on
// input source)
- ARROW_ASSIGN_OR_RAISE(auto buffer,
- source_->ReadAt(meta->offset(),
meta->total_bytes()));
+ ARROW_ASSIGN_OR_RAISE(
+ auto buffer,
+ source_->ReadAt(meta->offset(), meta->total_bytes(),
/*allow_short_read=*/false));
int64_t offset = 0;
@@ -783,7 +785,8 @@ Result<std::shared_ptr<Reader>> Reader::Open(
// Determine what kind of file we have. 6 is the max of len(FEA1) and
// len(ARROW1)
constexpr int magic_size = 6;
- ARROW_ASSIGN_OR_RAISE(auto buffer, source->ReadAt(0, magic_size));
+ ARROW_ASSIGN_OR_RAISE(auto buffer,
+ source->ReadAt(0, magic_size,
/*allow_short_read=*/false));
if (memcmp(buffer->data(), kFeatherV1MagicBytes,
strlen(kFeatherV1MagicBytes)) == 0) {
std::shared_ptr<ReaderV1> result = std::make_shared<ReaderV1>();
diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
index 84ee62fe9e..1fef961ff8 100644
--- a/cpp/src/arrow/ipc/message.cc
+++ b/cpp/src/arrow/ipc/message.cc
@@ -233,11 +233,8 @@ Result<std::unique_ptr<Message>> Message::ReadFrom(const
int64_t offset,
MessageDecoder decoder(listener, MessageDecoder::State::METADATA,
metadata->size());
ARROW_RETURN_NOT_OK(decoder.Consume(metadata));
- ARROW_ASSIGN_OR_RAISE(auto body, file->ReadAt(offset,
decoder.next_required_size()));
- if (body->size() < decoder.next_required_size()) {
- return Status::IOError("Expected to be able to read ",
decoder.next_required_size(),
- " bytes for message body, got ", body->size());
- }
+ ARROW_ASSIGN_OR_RAISE(auto body, file->ReadAt(offset,
decoder.next_required_size(),
+ /*allow_short_read=*/false));
RETURN_NOT_OK(decoder.Consume(body));
return result;
}
@@ -383,13 +380,8 @@ static Result<std::unique_ptr<Message>>
ReadMessageInternal(
// When body_length is known, read metadata + body in one IO call.
// Otherwise, read only metadata first.
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> metadata,
- file->ReadAt(offset, metadata_length +
body_length.value_or(0)));
-
- if (metadata->size() < metadata_length) {
- return Status::Invalid("Expected to read ", metadata_length,
- " metadata bytes at offset ", offset, " but got ",
- metadata->size());
- }
+ file->ReadAt(offset, metadata_length +
body_length.value_or(0),
+ /*allow_short_read=*/false));
ARROW_RETURN_NOT_OK(decoder.Consume(SliceBuffer(metadata, 0,
metadata_length)));
@@ -415,21 +407,22 @@ static Result<std::unique_ptr<Message>>
ReadMessageInternal(
decoder.next_required_size(), body));
} else if (body_length.has_value()) {
// Body was already read as part of the combined IO; just slice it out.
+ if (*body_length != decoder.next_required_size()) {
+ // The streaming decoder got out of sync with the actual advertised
+ // metadata and body size, which signals an invalid IPC file.
+ return Status::IOError("Invalid IPC file: advertised body size is ",
+ *body_length, ", but message decoder expects
to read ",
+ decoder.next_required_size(), " bytes
instead");
+ }
body = SliceBuffer(metadata, metadata_length,
std::min(*body_length, metadata->size() -
metadata_length));
} else {
// Body length was unknown; do a separate IO to read the body.
ARROW_ASSIGN_OR_RAISE(
- body, file->ReadAt(offset + metadata_length,
decoder.next_required_size()));
+ body, file->ReadAt(offset + metadata_length,
decoder.next_required_size(),
+ /*allow_short_read=*/false));
}
- if (body->size() != decoder.next_required_size()) {
- // The streaming decoder got out of sync with the actual advertised
- // metadata and body size, which signals an invalid IPC file.
- return Status::IOError("Invalid IPC file: advertised body size is ",
body->size(),
- ", but message decoder expects to read ",
- decoder.next_required_size(), " bytes instead");
- }
RETURN_NOT_OK(decoder.Consume(body));
return result;
}
@@ -472,12 +465,11 @@ Future<std::shared_ptr<Message>> ReadMessageAsync(int64_t
offset, int32_t metada
return Status::Invalid("metadata_length should be at least ",
state->decoder->next_required_size());
}
- return file->ReadAsync(context, offset, metadata_length + body_length)
+ return file
+ ->ReadAsync(context, offset, metadata_length + body_length,
+ /*allow_short_read=*/false)
.Then([=](std::shared_ptr<Buffer> metadata) ->
Result<std::shared_ptr<Message>> {
- if (metadata->size() < metadata_length) {
- return Status::Invalid("Expected to read ", metadata_length,
- " metadata bytes but got ", metadata->size());
- }
+ DCHECK_EQ(metadata->size(), metadata_length + body_length);
ARROW_RETURN_NOT_OK(
state->decoder->Consume(SliceBuffer(metadata, 0,
metadata_length)));
switch (state->decoder->state()) {
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index f7b9c779ab..512305d657 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -210,7 +210,7 @@ class ArrayLoader {
" did not start on 8-byte aligned offset: ",
offset);
}
if (file_) {
- return file_->ReadAt(offset, length).Value(out);
+ return file_->ReadAt(offset, length,
/*allow_short_read=*/false).Value(out);
} else {
if (!AddWithOverflow({read_end.value(), file_offset_}).has_value()) {
return Status::Invalid("Buffer ", buffer_index_, " exceeds IPC file
area");
@@ -1874,19 +1874,15 @@ class RecordBatchFileReaderImpl : public
RecordBatchFileReader {
return Status::Invalid("File is too small: ", footer_offset_);
}
- int file_end_size = static_cast<int>(kMagicSize + sizeof(int32_t));
+ constexpr int64_t kTotalMagicSize = kMagicSize + sizeof(int32_t);
auto self =
std::dynamic_pointer_cast<RecordBatchFileReaderImpl>(shared_from_this());
- auto read_magic = file_->ReadAsync(footer_offset_ - file_end_size,
file_end_size);
+ auto read_magic = file_->ReadAsync(footer_offset_ - kTotalMagicSize,
kTotalMagicSize,
+ /*allow_short_read=*/false);
if (executor) read_magic = executor->Transfer(std::move(read_magic));
return read_magic
.Then([=](const std::shared_ptr<Buffer>& buffer)
-> Future<std::shared_ptr<Buffer>> {
- const int64_t expected_footer_size = kMagicSize + sizeof(int32_t);
- if (buffer->size() < expected_footer_size) {
- return Status::Invalid("Unable to read ", expected_footer_size,
- "from end of file");
- }
-
+ DCHECK_EQ(buffer->size(), kTotalMagicSize);
const auto magic_start = buffer->data() + sizeof(int32_t);
if (std::string_view(reinterpret_cast<const char*>(magic_start),
kMagicSize) !=
kArrowMagicBytes) {
@@ -1903,7 +1899,8 @@ class RecordBatchFileReaderImpl : public
RecordBatchFileReader {
// Now read the footer
auto read_footer = self->file_->ReadAsync(
- self->footer_offset_ - footer_length - file_end_size,
footer_length);
+ self->footer_offset_ - footer_length - kTotalMagicSize,
footer_length,
+ /*allow_short_read=*/false);
if (executor) read_footer =
executor->Transfer(std::move(read_footer));
return read_footer;
})
@@ -2293,7 +2290,8 @@ Result<std::shared_ptr<SparseIndex>> ReadSparseCOOIndex(
auto* indices_buffer = sparse_index->indicesBuffer();
ARROW_ASSIGN_OR_RAISE(auto indices_data,
- file->ReadAt(indices_buffer->offset(),
indices_buffer->length()));
+ file->ReadAt(indices_buffer->offset(),
indices_buffer->length(),
+ /*allow_short_read=*/false));
std::vector<int64_t> indices_shape({non_zero_length, ndim});
auto* indices_strides = sparse_index->indicesStrides();
std::vector<int64_t> strides(2);
@@ -2329,11 +2327,13 @@ Result<std::shared_ptr<SparseIndex>> ReadSparseCSXIndex(
auto* indptr_buffer = sparse_index->indptrBuffer();
ARROW_ASSIGN_OR_RAISE(auto indptr_data,
- file->ReadAt(indptr_buffer->offset(),
indptr_buffer->length()));
+ file->ReadAt(indptr_buffer->offset(),
indptr_buffer->length(),
+ /*allow_short_read=*/false));
auto* indices_buffer = sparse_index->indicesBuffer();
ARROW_ASSIGN_OR_RAISE(auto indices_data,
- file->ReadAt(indices_buffer->offset(),
indices_buffer->length()));
+ file->ReadAt(indices_buffer->offset(),
indices_buffer->length(),
+ /*allow_short_read=*/false));
std::vector<int64_t> indices_shape({non_zero_length});
const auto indices_minimum_bytes = indices_shape[0] *
indices_type->byte_width();
@@ -2384,12 +2384,13 @@ Result<std::shared_ptr<SparseIndex>> ReadSparseCSFIndex(
sparse_index, &axis_order, &indices_size, &indptr_type, &indices_type));
for (int i = 0; i < static_cast<int>(indptr_buffers->size()); ++i) {
ARROW_ASSIGN_OR_RAISE(indptr_data[i],
file->ReadAt(indptr_buffers->Get(i)->offset(),
-
indptr_buffers->Get(i)->length()));
+
indptr_buffers->Get(i)->length(),
+
/*allow_short_read=*/false));
}
for (int i = 0; i < static_cast<int>(indices_buffers->size()); ++i) {
- ARROW_ASSIGN_OR_RAISE(indices_data[i],
- file->ReadAt(indices_buffers->Get(i)->offset(),
- indices_buffers->Get(i)->length()));
+ ARROW_ASSIGN_OR_RAISE(indices_data[i],
file->ReadAt(indices_buffers->Get(i)->offset(),
+
indices_buffers->Get(i)->length(),
+
/*allow_short_read=*/false));
}
return SparseCSFIndex::Make(indptr_type, indices_type, indices_size,
axis_order,
@@ -2619,7 +2620,8 @@ Result<std::shared_ptr<SparseTensor>>
ReadSparseTensor(const Buffer& metadata,
&non_zero_length,
&sparse_tensor_format_id,
&sparse_tensor, &buffer));
- ARROW_ASSIGN_OR_RAISE(auto data, file->ReadAt(buffer->offset(),
buffer->length()));
+ ARROW_ASSIGN_OR_RAISE(auto data, file->ReadAt(buffer->offset(),
buffer->length(),
+ /*allow_short_read=*/false));
std::shared_ptr<SparseIndex> sparse_index;
switch (sparse_tensor_format_id) {
@@ -2913,8 +2915,12 @@ Status FuzzIpcTensorStream(const uint8_t* data, int64_t
size) {
Result<int64_t> IoRecordedRandomAccessFile::GetSize() { return file_size_; }
Result<int64_t> IoRecordedRandomAccessFile::ReadAt(int64_t position, int64_t
nbytes,
- void* out) {
+ bool allow_short_read,
void* out) {
auto num_bytes_read = std::min(file_size_, position + nbytes) - position;
+ if (!allow_short_read && num_bytes_read != nbytes) {
+ return Status::IOError("File too short: expected to be able to read ",
nbytes,
+ " bytes, got ", num_bytes_read);
+ }
if (!read_ranges_.empty() &&
position == read_ranges_.back().offset + read_ranges_.back().length) {
@@ -2927,11 +2933,11 @@ Result<int64_t>
IoRecordedRandomAccessFile::ReadAt(int64_t position, int64_t nby
return num_bytes_read;
}
-Result<std::shared_ptr<Buffer>> IoRecordedRandomAccessFile::ReadAt(int64_t
position,
- int64_t
nbytes) {
- std::shared_ptr<Buffer> out;
- auto result = ReadAt(position, nbytes, &out);
- return out;
+Result<std::shared_ptr<Buffer>> IoRecordedRandomAccessFile::ReadAt(
+ int64_t position, int64_t nbytes, bool allow_short_read) {
+ // We're not supposed to actually read anything, so pass a null output
pointer.
+ RETURN_NOT_OK(ReadAt(position, nbytes, allow_short_read, /*out=*/nullptr));
+ return nullptr;
}
Status IoRecordedRandomAccessFile::Close() {
@@ -2958,6 +2964,7 @@ Result<int64_t> IoRecordedRandomAccessFile::Read(int64_t
nbytes, void* out) {
Result<std::shared_ptr<Buffer>> IoRecordedRandomAccessFile::Read(int64_t
nbytes) {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> buffer, ReadAt(position_,
nbytes));
+ // Cannot use buffer->size() since a null buffer is returned...
auto num_bytes_read = std::min(file_size_, position_ + nbytes) - position_;
position_ += num_bytes_read;
return buffer;
diff --git a/cpp/src/arrow/ipc/reader_internal.h
b/cpp/src/arrow/ipc/reader_internal.h
index a71d070bb0..d61ac96f0f 100644
--- a/cpp/src/arrow/ipc/reader_internal.h
+++ b/cpp/src/arrow/ipc/reader_internal.h
@@ -36,7 +36,7 @@ namespace internal {
/// \brief An RandomAccessFile that doesn't perform real IO, but only save all
the IO
/// operations it receives, including read operation's <offset, length>, for
replaying
/// later
-class ARROW_EXPORT IoRecordedRandomAccessFile : public io::RandomAccessFile {
+class ARROW_EXPORT IoRecordedRandomAccessFile final : public
io::RandomAccessFile {
public:
explicit IoRecordedRandomAccessFile(const int64_t file_size)
: file_size_(file_size), position_(0) {}
@@ -55,9 +55,17 @@ class ARROW_EXPORT IoRecordedRandomAccessFile : public
io::RandomAccessFile {
Result<int64_t> GetSize() override;
- Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override;
-
- Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes)
override;
+ Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override
{
+ return ReadAt(position, nbytes, /*allow_short_read=*/true, out);
+ }
+ Result<int64_t> ReadAt(int64_t position, int64_t nbytes, bool
allow_short_read,
+ void* out) override;
+
+ Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes)
override {
+ return ReadAt(position, nbytes, /*allow_short_read=*/true);
+ }
+ Result<std::shared_ptr<Buffer>> ReadAt(int64_t position, int64_t nbytes,
+ bool allow_short_read) override;
Result<int64_t> Read(int64_t nbytes, void* out) override;
diff --git a/cpp/src/parquet/bloom_filter.h b/cpp/src/parquet/bloom_filter.h
index e14e0558d3..cabcd5b4a5 100644
--- a/cpp/src/parquet/bloom_filter.h
+++ b/cpp/src/parquet/bloom_filter.h
@@ -20,6 +20,7 @@
#include <cmath>
#include <cstdint>
#include <memory>
+#include <optional>
#include "arrow/util/bit_util.h"
#include "arrow/util/logging.h"
diff --git a/cpp/src/parquet/bloom_filter_reader.h
b/cpp/src/parquet/bloom_filter_reader.h
index cbd267dd19..46e046156d 100644
--- a/cpp/src/parquet/bloom_filter_reader.h
+++ b/cpp/src/parquet/bloom_filter_reader.h
@@ -17,7 +17,7 @@
#pragma once
-#include "arrow/io/interfaces.h"
+#include "arrow/io/type_fwd.h"
#include "parquet/properties.h"
#include "parquet/type_fwd.h"
diff --git a/cpp/src/parquet/column_page.h b/cpp/src/parquet/column_page.h
index 1b3d4cfd41..f7dbb2526a 100644
--- a/cpp/src/parquet/column_page.h
+++ b/cpp/src/parquet/column_page.h
@@ -26,6 +26,8 @@
#include <optional>
#include <string>
+#include "arrow/buffer.h"
+
#include "parquet/size_statistics.h"
#include "parquet/statistics.h"
#include "parquet/types.h"
diff --git a/cpp/src/parquet/encryption/file_system_key_material_store.cc
b/cpp/src/parquet/encryption/file_system_key_material_store.cc
index 7a7db3fa62..fb8c92ceaf 100644
--- a/cpp/src/parquet/encryption/file_system_key_material_store.cc
+++ b/cpp/src/parquet/encryption/file_system_key_material_store.cc
@@ -17,6 +17,7 @@
#include <string_view>
+#include "arrow/buffer.h"
#include "arrow/filesystem/filesystem.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/json/object_parser.h"
diff --git a/cpp/src/parquet/encryption/internal_file_decryptor.h
b/cpp/src/parquet/encryption/internal_file_decryptor.h
index 1343769ef3..baf2fb646a 100644
--- a/cpp/src/parquet/encryption/internal_file_decryptor.h
+++ b/cpp/src/parquet/encryption/internal_file_decryptor.h
@@ -17,6 +17,7 @@
#pragma once
+#include <functional>
#include <memory>
#include <mutex>
#include <span>
diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc
index af7ccfd7ad..d0552adcee 100644
--- a/cpp/src/parquet/file_reader.cc
+++ b/cpp/src/parquet/file_reader.cc
@@ -448,8 +448,9 @@ class SerializedFile : public ParquetFileReader::Contents {
metadata_buffer = SliceBuffer(
footer_buffer, footer_read_size - metadata_len - kFooterSize,
metadata_len);
} else {
- PARQUET_ASSIGN_OR_THROW(metadata_buffer,
- source_->ReadAt(metadata_start, metadata_len));
+ PARQUET_ASSIGN_OR_THROW(
+ metadata_buffer,
+ source_->ReadAt(metadata_start, metadata_len,
/*allow_short_read=*/false));
}
// Parse the footer depending on encryption type
@@ -464,8 +465,9 @@ class SerializedFile : public ParquetFileReader::Contents {
// Read the actual footer
metadata_start = read_size.first;
metadata_len = read_size.second;
- PARQUET_ASSIGN_OR_THROW(metadata_buffer,
- source_->ReadAt(metadata_start, metadata_len));
+ PARQUET_ASSIGN_OR_THROW(
+ metadata_buffer,
+ source_->ReadAt(metadata_start, metadata_len,
/*allow_short_read=*/false));
// Fall through
}
@@ -535,7 +537,8 @@ class SerializedFile : public ParquetFileReader::Contents {
std::move(metadata_buffer),
footer_read_size,
metadata_len);
}
- return source_->ReadAsync(metadata_start, metadata_len)
+ return source_
+ ->ReadAsync(metadata_start, metadata_len,
/*allow_short_read=*/false)
.Then([this, footer_buffer, footer_read_size, metadata_len](
const std::shared_ptr<::arrow::Buffer>&
metadata_buffer) {
return ParseMaybeEncryptedMetaDataAsync(footer_buffer,
metadata_buffer,
@@ -563,7 +566,7 @@ class SerializedFile : public ParquetFileReader::Contents {
// Read the actual footer
int64_t metadata_start = read_size.first;
metadata_len = read_size.second;
- return source_->ReadAsync(metadata_start, metadata_len)
+ return source_->ReadAsync(metadata_start, metadata_len,
/*allow_short_read=*/false)
.Then([this, metadata_len, is_encrypted_footer,
file_decryptor = std::move(file_decryptor)](
const std::shared_ptr<::arrow::Buffer>& metadata_buffer) {
diff --git a/cpp/src/parquet/page_index.cc b/cpp/src/parquet/page_index.cc
index c06fc77dc5..7434f2828d 100644
--- a/cpp/src/parquet/page_index.cc
+++ b/cpp/src/parquet/page_index.cc
@@ -15,24 +15,25 @@
// specific language governing permissions and limitations
// under the License.
-#include "parquet/page_index.h"
+#include <limits>
+#include <numeric>
+
+#include "arrow/io/interfaces.h"
+#include "arrow/util/int_util_overflow.h"
+#include "arrow/util/logging_internal.h"
+#include "arrow/util/unreachable.h"
+
#include "parquet/encoding.h"
#include "parquet/encryption/encryption_internal.h"
#include "parquet/encryption/internal_file_decryptor.h"
#include "parquet/encryption/internal_file_encryptor.h"
#include "parquet/exception.h"
#include "parquet/metadata.h"
+#include "parquet/page_index.h"
#include "parquet/schema.h"
#include "parquet/statistics.h"
#include "parquet/thrift_internal.h"
-#include "arrow/util/int_util_overflow.h"
-#include "arrow/util/logging_internal.h"
-#include "arrow/util/unreachable.h"
-
-#include <limits>
-#include <numeric>
-
namespace parquet {
namespace {
@@ -348,11 +349,8 @@ class RowGroupPageIndexReaderImpl : public
RowGroupPageIndexReader {
std::shared_ptr<Buffer> ReadIndexBuffer(int64_t offset, int64_t length,
const char* offset_kind) {
- PARQUET_ASSIGN_OR_THROW(auto buffer, input_->ReadAt(offset, length));
- if (buffer->size() < length) {
- throw ParquetException("Invalid or truncated ", offset_kind, ":
attempted to read ",
- length, " bytes but got only ", buffer->size(), "
bytes");
- }
+ PARQUET_ASSIGN_OR_THROW(auto buffer,
+ input_->ReadAt(offset, length,
/*allow_short_read=*/false));
return buffer;
}
diff --git a/cpp/src/parquet/platform.cc b/cpp/src/parquet/platform.cc
index 98946029fb..5e1eac6f12 100644
--- a/cpp/src/parquet/platform.cc
+++ b/cpp/src/parquet/platform.cc
@@ -21,6 +21,7 @@
#include <memory>
#include <utility>
+#include "arrow/buffer.h"
#include "arrow/io/memory.h"
#include "parquet/exception.h"
diff --git a/cpp/src/parquet/platform.h b/cpp/src/parquet/platform.h
index 92849347d4..e3d3d6691a 100644
--- a/cpp/src/parquet/platform.h
+++ b/cpp/src/parquet/platform.h
@@ -20,11 +20,11 @@
#include <cstdint>
#include <memory>
-#include "arrow/buffer.h" // IWYU pragma: export
-#include "arrow/io/interfaces.h" // IWYU pragma: export
+#include "arrow/io/type_fwd.h" // IWYU pragma: export
#include "arrow/status.h" // IWYU pragma: export
#include "arrow/type_fwd.h" // IWYU pragma: export
#include "arrow/util/macros.h" // IWYU pragma: export
+#include "arrow/util/type_fwd.h" // IWYU pragma: export
#if defined(_WIN32) || defined(__CYGWIN__)
diff --git a/cpp/src/parquet/properties.cc b/cpp/src/parquet/properties.cc
index 94024ad403..6e3cfddd7b 100644
--- a/cpp/src/parquet/properties.cc
+++ b/cpp/src/parquet/properties.cc
@@ -40,14 +40,8 @@ std::shared_ptr<ArrowInputStream>
ReaderProperties::GetStream(
safe_stream,
num_bytes));
return stream;
} else {
- PARQUET_ASSIGN_OR_THROW(auto data, source->ReadAt(start, num_bytes));
-
- if (data->size() != num_bytes) {
- std::stringstream ss;
- ss << "Tried reading " << num_bytes << " bytes starting at position " <<
start
- << " from file but only got " << data->size();
- throw ParquetException(ss.str());
- }
+ PARQUET_ASSIGN_OR_THROW(auto data,
+ source->ReadAt(start, num_bytes,
/*allow_short_read=*/false));
return std::make_shared<::arrow::io::BufferReader>(data);
}
}
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index ee829c4dbc..43657410df 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -23,6 +23,7 @@
#include <unordered_set>
#include <utility>
+#include "arrow/buffer.h"
#include "arrow/io/caching.h"
#include "arrow/type_fwd.h"
#include "arrow/util/compression.h"
diff --git a/cpp/src/parquet/properties_test.cc
b/cpp/src/parquet/properties_test.cc
index a0df0d3048..0743b7ad4d 100644
--- a/cpp/src/parquet/properties_test.cc
+++ b/cpp/src/parquet/properties_test.cc
@@ -147,8 +147,7 @@ TEST(TestReaderProperties, GetStreamInsufficientData) {
FAIL() << "No exception raised";
} catch (const ParquetException& e) {
std::string ex_what =
- ("Tried reading 15 bytes starting at position 12"
- " from file but only got 9");
+ "IOError: File too short: expected to be able to read 15 bytes, got 9";
ASSERT_EQ(ex_what, e.what());
}
}
diff --git a/cpp/src/parquet/statistics.cc b/cpp/src/parquet/statistics.cc
index 9b5435e026..09c48ef0ff 100644
--- a/cpp/src/parquet/statistics.cc
+++ b/cpp/src/parquet/statistics.cc
@@ -21,7 +21,6 @@
#include <cmath>
#include <cstring>
#include <limits>
-#include <optional>
#include <type_traits>
#include <utility>
diff --git a/cpp/src/parquet/statistics.h b/cpp/src/parquet/statistics.h
index c80fb8e3b5..796a889c3f 100644
--- a/cpp/src/parquet/statistics.h
+++ b/cpp/src/parquet/statistics.h
@@ -21,6 +21,7 @@
#include <cstddef>
#include <cstdint>
#include <memory>
+#include <optional>
#include <string>
#include <utility>
diff --git a/testing b/testing
index 190638e1b1..4080a40f57 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit 190638e1b14af926601dbe0a95caa1940dafedd8
+Subproject commit 4080a40f573c18beca92fc0db68fb322710f5ebb