This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 9086fcc18f refactor(core/raw): Align oio::BlockingRead API with oio::Read (#4349)
9086fcc18f is described below
commit 9086fcc18f49ed86b0140e23f083308a72a903a6
Author: Xuanwo <[email protected]>
AuthorDate: Wed Mar 13 01:09:38 2024 +0800
refactor(core/raw): Align oio::BlockingRead API with oio::Read (#4349)
* Remove `next` from the BlockingRead API
Signed-off-by: Xuanwo <[email protected]>
* Refactor core
Signed-off-by: Xuanwo <[email protected]>
* Fix C binding
Signed-off-by: Xuanwo <[email protected]>
* Fix C++ binding
Signed-off-by: Xuanwo <[email protected]>
* Address Python binding
Signed-off-by: Xuanwo <[email protected]>
* Fix test
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
bin/ofs/Cargo.lock | 20 +--
bin/oli/Cargo.lock | 107 ++++++++++++++-
bindings/c/src/reader.rs | 15 ++-
bindings/cpp/src/reader.rs | 5 +-
bindings/nodejs/src/lib.rs | 6 +-
bindings/ocaml/src/operator/reader.rs | 6 +-
bindings/python/src/file.rs | 8 +-
core/src/layers/blocking.rs | 18 +--
core/src/layers/chaos.rs | 12 +-
core/src/layers/concurrent_limit.rs | 8 +-
core/src/layers/dtrace.rs | 30 +----
core/src/layers/error_context.rs | 16 +--
core/src/layers/logging.rs | 46 +------
core/src/layers/metrics.rs | 23 +---
core/src/layers/minitrace.rs | 10 +-
core/src/layers/oteltrace.rs | 8 +-
core/src/layers/prometheus.rs | 31 +----
core/src/layers/prometheus_client.rs | 24 +---
core/src/layers/retry.rs | 23 +---
core/src/layers/throttle.rs | 8 +-
core/src/layers/tracing.rs | 12 +-
core/src/raw/enum_utils.rs | 48 ++-----
core/src/raw/oio/cursor.rs | 40 +++---
core/src/raw/oio/read/api.rs | 72 +---------
core/src/raw/oio/read/buffer_reader.rs | 181 +++++++++++---------------
core/src/raw/oio/read/file_read.rs | 109 +++-------------
core/src/raw/oio/read/into_streamable_read.rs | 32 ++---
core/src/raw/oio/read/lazy_read.rs | 13 +-
core/src/raw/oio/read/range_read.rs | 136 +++++++------------
core/src/raw/oio/read/std_read.rs | 42 +++---
core/src/raw/oio/read/tokio_read.rs | 16 +--
core/src/raw/tests/read.rs | 9 +-
core/src/types/operator/blocking_operator.rs | 6 +-
core/src/types/reader.rs | 109 ++++++++++++++--
34 files changed, 494 insertions(+), 755 deletions(-)
diff --git a/bin/ofs/Cargo.lock b/bin/ofs/Cargo.lock
index 591f06532f..a3d40c91b6 100644
--- a/bin/ofs/Cargo.lock
+++ b/bin/ofs/Cargo.lock
@@ -114,9 +114,9 @@ checksum =
"d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "backon"
-version = "0.4.1"
+version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0c1a6197b2120bb2185a267f6515038558b019e92b832bb0320e96d66268dcf9"
+checksum = "c491fa80d69c03084223a4e73c378dd9f9a1e612eb54051213f88b2d5249b458"
dependencies = [
"fastrand",
"futures-core",
@@ -364,12 +364,9 @@ dependencies = [
[[package]]
name = "fastrand"
-version = "1.9.0"
+version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be"
-dependencies = [
- "instant",
-]
+checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5"
[[package]]
name = "flagset"
@@ -686,15 +683,6 @@ dependencies = [
"hashbrown",
]
-[[package]]
-name = "instant"
-version = "0.1.12"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c"
-dependencies = [
- "cfg-if",
-]
-
[[package]]
name = "ipnet"
version = "2.9.0"
diff --git a/bin/oli/Cargo.lock b/bin/oli/Cargo.lock
index a8f048c0cd..f28a894261 100644
--- a/bin/oli/Cargo.lock
+++ b/bin/oli/Cargo.lock
@@ -17,6 +17,17 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
+[[package]]
+name = "aes"
+version = "0.8.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0"
+dependencies = [
+ "cfg-if",
+ "cipher",
+ "cpufeatures",
+]
+
[[package]]
name = "aho-corasick"
version = "1.1.2"
@@ -365,11 +376,11 @@ dependencies = [
[[package]]
name = "backon"
-version = "0.4.1"
+version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0c1a6197b2120bb2185a267f6515038558b019e92b832bb0320e96d66268dcf9"
+checksum = "c491fa80d69c03084223a4e73c378dd9f9a1e612eb54051213f88b2d5249b458"
dependencies = [
- "fastrand 1.9.0",
+ "fastrand 2.0.1",
"futures-core",
"pin-project",
"tokio",
@@ -457,6 +468,15 @@ dependencies = [
"generic-array",
]
+[[package]]
+name = "block-padding"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93"
+dependencies = [
+ "generic-array",
+]
+
[[package]]
name = "blocking"
version = "1.5.1"
@@ -550,6 +570,15 @@ dependencies = [
"serde_json",
]
+[[package]]
+name = "cbc"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6"
+dependencies = [
+ "cipher",
+]
+
[[package]]
name = "cc"
version = "1.0.83"
@@ -589,6 +618,16 @@ dependencies = [
"windows-targets 0.52.0",
]
+[[package]]
+name = "cipher"
+version = "0.4.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad"
+dependencies = [
+ "crypto-common",
+ "inout",
+]
+
[[package]]
name = "clang-sys"
version = "1.7.0"
@@ -1423,6 +1462,16 @@ dependencies = [
"hashbrown 0.14.3",
]
+[[package]]
+name = "inout"
+version = "0.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5"
+dependencies = [
+ "block-padding",
+ "generic-array",
+]
+
[[package]]
name = "instant"
version = "0.1.12"
@@ -1993,6 +2042,16 @@ dependencies = [
"windows-targets 0.48.5",
]
+[[package]]
+name = "pbkdf2"
+version = "0.12.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f8ed6a7761f76e3b9f92dfb0a60a6a6477c61024b775147ff0973a02653abaf2"
+dependencies = [
+ "digest",
+ "hmac",
+]
+
[[package]]
name = "peeking_take_while"
version = "0.1.2"
@@ -2088,6 +2147,21 @@ dependencies = [
"spki",
]
+[[package]]
+name = "pkcs5"
+version = "0.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e847e2c91a18bfa887dd028ec33f2fe6f25db77db3619024764914affe8b69a6"
+dependencies = [
+ "aes",
+ "cbc",
+ "der",
+ "pbkdf2",
+ "scrypt",
+ "sha2",
+ "spki",
+]
+
[[package]]
name = "pkcs8"
version = "0.10.2"
@@ -2095,6 +2169,8 @@ source =
"registry+https://github.com/rust-lang/crates.io-index"
checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7"
dependencies = [
"der",
+ "pkcs5",
+ "rand_core",
"spki",
]
@@ -2446,9 +2522,9 @@ checksum =
"c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
[[package]]
name = "reqsign"
-version = "0.14.7"
+version = "0.14.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ed08ac3aa0676637644b1b892202f1ae789c28c15ebfa906128d111ae8086062"
+checksum = "43e319d9de9ff4d941abf4ac718897118b0fe04577ea3f8e0f5788971784eef5"
dependencies = [
"anyhow",
"async-trait",
@@ -2556,6 +2632,7 @@ dependencies = [
"pkcs1",
"pkcs8",
"rand_core",
+ "sha2",
"signature",
"spki",
"subtle",
@@ -2675,6 +2752,15 @@ version = "1.0.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c"
+[[package]]
+name = "salsa20"
+version = "0.10.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "97a22f5af31f73a954c10289c93e8a50cc23d971e80ee446f1f6f7137a088213"
+dependencies = [
+ "cipher",
+]
+
[[package]]
name = "same-file"
version = "1.0.6"
@@ -2699,6 +2785,17 @@ version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
+[[package]]
+name = "scrypt"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0516a385866c09368f0b5bcd1caff3366aace790fcd46e2bb032697bb172fd1f"
+dependencies = [
+ "pbkdf2",
+ "salsa20",
+ "sha2",
+]
+
[[package]]
name = "sct"
version = "0.7.1"
diff --git a/bindings/c/src/reader.rs b/bindings/c/src/reader.rs
index d0604aeebb..8f233dd8e4 100644
--- a/bindings/c/src/reader.rs
+++ b/bindings/c/src/reader.rs
@@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use std::io::Read;
-
use ::opendal as core;
use super::*;
@@ -51,12 +49,15 @@ impl opendal_reader {
let buf = unsafe { std::slice::from_raw_parts_mut(buf, len) };
let inner = unsafe { &mut *(*reader).inner };
- let r = inner.read(buf);
+ let r = inner.read(buf.len());
match r {
- Ok(n) => opendal_result_reader_read {
- size: n,
- error: std::ptr::null_mut(),
- },
+ Ok(bs) => {
+ buf[..bs.len()].copy_from_slice(&bs);
+ opendal_result_reader_read {
+ size: bs.len(),
+ error: std::ptr::null_mut(),
+ }
+ }
Err(e) => opendal_result_reader_read {
size: 0,
error: opendal_error::new(
diff --git a/bindings/cpp/src/reader.rs b/bindings/cpp/src/reader.rs
index 6b56d1a4b5..6f9bfd8fe9 100644
--- a/bindings/cpp/src/reader.rs
+++ b/bindings/cpp/src/reader.rs
@@ -16,7 +16,6 @@
// under the License.
use anyhow::Result;
-use od::raw::oio::BlockingRead;
use opendal as od;
use super::ffi;
@@ -25,7 +24,9 @@ pub struct Reader(pub od::BlockingReader);
impl Reader {
pub fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
- Ok(self.0.read(buf)?)
+ let bs = self.0.read(buf.len())?;
+ buf[..bs.len()].copy_from_slice(&bs);
+ Ok(bs.len())
}
pub fn seek(&mut self, offset: u64, dir: ffi::SeekFrom) -> Result<u64> {
diff --git a/bindings/nodejs/src/lib.rs b/bindings/nodejs/src/lib.rs
index 4a0a9e188d..fec882b879 100644
--- a/bindings/nodejs/src/lib.rs
+++ b/bindings/nodejs/src/lib.rs
@@ -26,7 +26,6 @@ use std::time::Duration;
use futures::TryStreamExt;
use napi::bindgen_prelude::*;
-use opendal::raw::oio::BlockingRead;
#[napi]
pub struct Operator(opendal::Operator);
@@ -648,7 +647,10 @@ pub struct BlockingReader(opendal::BlockingReader);
impl BlockingReader {
#[napi]
pub fn read(&mut self, mut buf: Buffer) -> Result<usize> {
- self.0.read(buf.as_mut()).map_err(format_napi_error)
+ let buf = buf.as_mut();
+ let bs = self.0.read(buf.len()).map_err(format_napi_error)?;
+ buf[..bs.len()].copy_from_slice(&bs);
+ Ok(bs.len())
}
}
diff --git a/bindings/ocaml/src/operator/reader.rs
b/bindings/ocaml/src/operator/reader.rs
index b3bc7bc587..8ae7b4583b 100644
--- a/bindings/ocaml/src/operator/reader.rs
+++ b/bindings/ocaml/src/operator/reader.rs
@@ -17,14 +17,14 @@
use std::io;
-use opendal::raw::oio::BlockingRead;
-
use super::*;
#[ocaml::func]
#[ocaml::sig("reader -> bytes -> (int, string) Result.t ")]
pub fn reader_read(reader: &mut Reader, buf: &mut [u8]) -> Result<usize,
String> {
- map_res_error(reader.0.read(buf))
+ let bs = map_res_error(reader.0.read(buf.len()))?;
+ buf[..bs.len()].copy_from_slice(&bs);
+ Ok(bs.len())
}
#[ocaml::func]
diff --git a/bindings/python/src/file.rs b/bindings/python/src/file.rs
index 806306ca5a..f8a98c8f51 100644
--- a/bindings/python/src/file.rs
+++ b/bindings/python/src/file.rs
@@ -18,7 +18,6 @@
// Remove this `allow` after
<https://github.com/rust-lang/rust-clippy/issues/12039> fixed.
#![allow(clippy::unnecessary_fallible_conversions)]
-use std::io::Read;
use std::io::Seek;
use std::io::SeekFrom;
use std::io::Write;
@@ -77,11 +76,10 @@ impl File {
let buffer = match size {
Some(size) => {
- let mut buffer = vec![0; size];
- reader
- .read_exact(&mut buffer)
+ let bs = reader
+ .read_exact(size)
.map_err(|err| PyIOError::new_err(err.to_string()))?;
- buffer
+ bs.to_vec()
}
None => {
let mut buffer = Vec::new();
diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs
index 3476dc6e90..84a0c12292 100644
--- a/core/src/layers/blocking.rs
+++ b/core/src/layers/blocking.rs
@@ -17,7 +17,7 @@
use async_trait::async_trait;
use bytes;
-use bytes::{BufMut, Bytes};
+use bytes::Bytes;
use futures::future::poll_fn;
use tokio::runtime::Handle;
@@ -288,25 +288,13 @@ impl<I> BlockingWrapper<I> {
}
impl<I: oio::Read + 'static> oio::BlockingRead for BlockingWrapper<I> {
- fn read(&mut self, mut buf: &mut [u8]) -> Result<usize> {
- let bs = self.handle.block_on(self.inner.read(buf.len()));
- let bs = bs?;
- buf.put_slice(&bs);
- Ok(bs.len())
+ fn read(&mut self, limit: usize) -> Result<Bytes> {
+ self.handle.block_on(self.inner.read(limit))
}
fn seek(&mut self, pos: std::io::SeekFrom) -> Result<u64> {
self.handle.block_on(self.inner.seek(pos))
}
-
- fn next(&mut self) -> Option<Result<Bytes>> {
- let bs = self.handle.block_on(self.inner.read(4 * 1024 * 1024));
- match bs {
- Ok(bs) if bs.is_empty() => None,
- Ok(bs) => Some(Ok(bs)),
- Err(err) => Some(Err(err)),
- }
- }
}
impl<I: oio::Write + 'static> oio::BlockingWrite for BlockingWrapper<I> {
diff --git a/core/src/layers/chaos.rs b/core/src/layers/chaos.rs
index 079500c444..55a28a554a 100644
--- a/core/src/layers/chaos.rs
+++ b/core/src/layers/chaos.rs
@@ -192,9 +192,9 @@ impl<R: oio::Read> oio::Read for ChaosReader<R> {
}
impl<R: oio::BlockingRead> oio::BlockingRead for ChaosReader<R> {
- fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+ fn read(&mut self, limit: usize) -> Result<Bytes> {
if self.i_feel_lucky() {
- self.inner.read(buf)
+ self.inner.read(limit)
} else {
Err(Self::unexpected_eof())
}
@@ -207,12 +207,4 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
ChaosReader<R> {
Err(Self::unexpected_eof())
}
}
-
- fn next(&mut self) -> Option<Result<Bytes>> {
- if self.i_feel_lucky() {
- self.inner.next()
- } else {
- Some(Err(Self::unexpected_eof()))
- }
- }
}
diff --git a/core/src/layers/concurrent_limit.rs
b/core/src/layers/concurrent_limit.rs
index 36fbcb92e7..8a663cfc47 100644
--- a/core/src/layers/concurrent_limit.rs
+++ b/core/src/layers/concurrent_limit.rs
@@ -267,17 +267,13 @@ impl<R: oio::Read> oio::Read for
ConcurrentLimitWrapper<R> {
}
impl<R: oio::BlockingRead> oio::BlockingRead for ConcurrentLimitWrapper<R> {
- fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
- self.inner.read(buf)
+ fn read(&mut self, limit: usize) -> Result<Bytes> {
+ self.inner.read(limit)
}
fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
self.inner.seek(pos)
}
-
- fn next(&mut self) -> Option<Result<Bytes>> {
- self.inner.next()
- }
}
impl<R: oio::Write> oio::Write for ConcurrentLimitWrapper<R> {
diff --git a/core/src/layers/dtrace.rs b/core/src/layers/dtrace.rs
index 4e488dd499..4c4af268a2 100644
--- a/core/src/layers/dtrace.rs
+++ b/core/src/layers/dtrace.rs
@@ -376,14 +376,14 @@ impl<R: oio::Read> oio::Read for DtraceLayerWrapper<R> {
}
impl<R: oio::BlockingRead> oio::BlockingRead for DtraceLayerWrapper<R> {
- fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+ fn read(&mut self, limit: usize) -> Result<Bytes> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, blocking_reader_read_start, c_path.as_ptr());
self.inner
- .read(buf)
- .map(|n| {
- probe_lazy!(opendal, blocking_reader_read_ok, c_path.as_ptr(),
n);
- n
+ .read(limit)
+ .map(|bs| {
+ probe_lazy!(opendal, blocking_reader_read_ok, c_path.as_ptr(),
bs.len());
+ bs
})
.map_err(|e| {
probe_lazy!(opendal, blocking_reader_read_error,
c_path.as_ptr());
@@ -405,26 +405,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
DtraceLayerWrapper<R> {
e
})
}
-
- fn next(&mut self) -> Option<Result<Bytes>> {
- let c_path = CString::new(self.path.clone()).unwrap();
- probe_lazy!(opendal, blocking_reader_next_start, c_path.as_ptr());
- self.inner.next().map(|res| match res {
- Ok(bytes) => {
- probe_lazy!(
- opendal,
- blocking_reader_next_ok,
- c_path.as_ptr(),
- bytes.len()
- );
- Ok(bytes)
- }
- Err(e) => {
- probe_lazy!(opendal, blocking_reader_next_error,
c_path.as_ptr());
- Err(e)
- }
- })
- }
}
impl<R: oio::Write> oio::Write for DtraceLayerWrapper<R> {
diff --git a/core/src/layers/error_context.rs b/core/src/layers/error_context.rs
index 43b08ea540..f8112951a1 100644
--- a/core/src/layers/error_context.rs
+++ b/core/src/layers/error_context.rs
@@ -367,12 +367,12 @@ impl<T: oio::Read> oio::Read for ErrorContextWrapper<T> {
}
impl<T: oio::BlockingRead> oio::BlockingRead for ErrorContextWrapper<T> {
- fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
- self.inner.read(buf).map_err(|err| {
+ fn read(&mut self, limit: usize) -> Result<Bytes> {
+ self.inner.read(limit).map_err(|err| {
err.with_operation(ReadOperation::BlockingRead)
.with_context("service", self.scheme)
.with_context("path", &self.path)
- .with_context("read_buf", buf.len().to_string())
+ .with_context("limit", limit.to_string())
})
}
@@ -384,16 +384,6 @@ impl<T: oio::BlockingRead> oio::BlockingRead for
ErrorContextWrapper<T> {
.with_context("seek", format!("{pos:?}"))
})
}
-
- fn next(&mut self) -> Option<Result<Bytes>> {
- self.inner.next().map(|v| {
- v.map_err(|err| {
- err.with_operation(ReadOperation::BlockingNext)
- .with_context("service", self.scheme)
- .with_context("path", &self.path)
- })
- })
- }
}
#[async_trait::async_trait]
diff --git a/core/src/layers/logging.rs b/core/src/layers/logging.rs
index 2f9fed02b1..02abb3f3df 100644
--- a/core/src/layers/logging.rs
+++ b/core/src/layers/logging.rs
@@ -1057,10 +1057,10 @@ impl<R: oio::Read> oio::Read for LoggingReader<R> {
}
impl<R: oio::BlockingRead> oio::BlockingRead for LoggingReader<R> {
- fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
- match self.inner.read(buf) {
- Ok(n) => {
- self.read += n as u64;
+ fn read(&mut self, limit: usize) -> Result<Bytes> {
+ match self.inner.read(limit) {
+ Ok(bs) => {
+ self.read += bs.len() as u64;
trace!(
target: LOGGING_TARGET,
"service={} operation={} path={} read={} -> data read {}B",
@@ -1068,9 +1068,9 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
LoggingReader<R> {
ReadOperation::BlockingRead,
self.path,
self.read,
- n
+ bs.len()
);
- Ok(n)
+ Ok(bs)
}
Err(err) => {
if let Some(lvl) = self.ctx.error_level(&err) {
@@ -1121,40 +1121,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
LoggingReader<R> {
}
}
}
-
- fn next(&mut self) -> Option<Result<Bytes>> {
- match self.inner.next() {
- Some(Ok(bs)) => {
- self.read += bs.len() as u64;
- trace!(
- target: LOGGING_TARGET,
- "service={} operation={} path={} read={} -> data read {}B",
- self.ctx.scheme,
- ReadOperation::BlockingNext,
- self.path,
- self.read,
- bs.len()
- );
- Some(Ok(bs))
- }
- Some(Err(err)) => {
- if let Some(lvl) = self.ctx.error_level(&err) {
- log!(
- target: LOGGING_TARGET,
- lvl,
- "service={} operation={} path={} read={} -> data read
failed: {}",
- self.ctx.scheme,
- ReadOperation::BlockingNext,
- self.path,
- self.read,
- self.ctx.error_print(&err),
- )
- }
- Some(Err(err))
- }
- None => None,
- }
- }
}
pub struct LoggingWriter<W> {
diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs
index c6001b8678..43b24f803d 100644
--- a/core/src/layers/metrics.rs
+++ b/core/src/layers/metrics.rs
@@ -798,12 +798,12 @@ impl<R: oio::Read> oio::Read for MetricWrapper<R> {
}
impl<R: oio::BlockingRead> oio::BlockingRead for MetricWrapper<R> {
- fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+ fn read(&mut self, limit: usize) -> Result<Bytes> {
self.inner
- .read(buf)
- .map(|n| {
- self.bytes += n as u64;
- n
+ .read(limit)
+ .map(|bs| {
+ self.bytes += bs.len() as u64;
+ bs
})
.map_err(|e| {
self.handle.increment_errors_total(self.op, e.kind());
@@ -817,19 +817,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
MetricWrapper<R> {
err
})
}
-
- fn next(&mut self) -> Option<Result<Bytes>> {
- self.inner.next().map(|res| match res {
- Ok(bytes) => {
- self.bytes += bytes.len() as u64;
- Ok(bytes)
- }
- Err(e) => {
- self.handle.increment_errors_total(self.op, e.kind());
- Err(e)
- }
- })
- }
}
impl<R: oio::Write> oio::Write for MetricWrapper<R> {
diff --git a/core/src/layers/minitrace.rs b/core/src/layers/minitrace.rs
index fbf4b2f064..fdc6a1dc92 100644
--- a/core/src/layers/minitrace.rs
+++ b/core/src/layers/minitrace.rs
@@ -309,10 +309,10 @@ impl<R: oio::Read> oio::Read for MinitraceWrapper<R> {
}
impl<R: oio::BlockingRead> oio::BlockingRead for MinitraceWrapper<R> {
- fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+ fn read(&mut self, limit: usize) -> Result<Bytes> {
let _g = self.span.set_local_parent();
let _span =
LocalSpan::enter_with_local_parent(ReadOperation::BlockingRead.into_static());
- self.inner.read(buf)
+ self.inner.read(limit)
}
fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
@@ -320,12 +320,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
MinitraceWrapper<R> {
let _span =
LocalSpan::enter_with_local_parent(ReadOperation::BlockingSeek.into_static());
self.inner.seek(pos)
}
-
- fn next(&mut self) -> Option<Result<Bytes>> {
- let _g = self.span.set_local_parent();
- let _span =
LocalSpan::enter_with_local_parent(ReadOperation::BlockingNext.into_static());
- self.inner.next()
- }
}
impl<R: oio::Write> oio::Write for MinitraceWrapper<R> {
diff --git a/core/src/layers/oteltrace.rs b/core/src/layers/oteltrace.rs
index f8f85be255..90e8d65392 100644
--- a/core/src/layers/oteltrace.rs
+++ b/core/src/layers/oteltrace.rs
@@ -288,17 +288,13 @@ impl<R: oio::Read> oio::Read for OtelTraceWrapper<R> {
}
impl<R: oio::BlockingRead> oio::BlockingRead for OtelTraceWrapper<R> {
- fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
- self.inner.read(buf)
+ fn read(&mut self, limit: usize) -> Result<Bytes> {
+ self.inner.read(limit)
}
fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
self.inner.seek(pos)
}
-
- fn next(&mut self) -> Option<Result<Bytes>> {
- self.inner.next()
- }
}
impl<R: oio::Write> oio::Write for OtelTraceWrapper<R> {
diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs
index 0037217d99..a5179908eb 100644
--- a/core/src/layers/prometheus.rs
+++ b/core/src/layers/prometheus.rs
@@ -719,20 +719,20 @@ impl<R: oio::Read> oio::Read for
PrometheusMetricWrapper<R> {
}
impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
- fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+ fn read(&mut self, limit: usize) -> Result<Bytes> {
let labels = self.stats.generate_metric_label(
self.scheme.into_static(),
Operation::BlockingRead.into_static(),
&self.path,
);
self.inner
- .read(buf)
- .map(|n| {
+ .read(limit)
+ .map(|bs| {
self.stats
.bytes_total
.with_label_values(&labels)
- .observe(n as f64);
- n
+ .observe(bs.len() as f64);
+ bs
})
.map_err(|e| {
self.stats.increment_errors_total(self.op, e.kind());
@@ -746,27 +746,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
PrometheusMetricWrapper<R> {
err
})
}
-
- fn next(&mut self) -> Option<Result<Bytes>> {
- let labels = self.stats.generate_metric_label(
- self.scheme.into_static(),
- Operation::BlockingRead.into_static(),
- &self.path,
- );
- self.inner.next().map(|res| match res {
- Ok(bytes) => {
- self.stats
- .bytes_total
- .with_label_values(&labels)
- .observe(bytes.len() as f64);
- Ok(bytes)
- }
- Err(e) => {
- self.stats.increment_errors_total(self.op, e.kind());
- Err(e)
- }
- })
- }
}
impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
diff --git a/core/src/layers/prometheus_client.rs
b/core/src/layers/prometheus_client.rs
index 7c12f3ecfd..95e9bd1631 100644
--- a/core/src/layers/prometheus_client.rs
+++ b/core/src/layers/prometheus_client.rs
@@ -566,12 +566,12 @@ impl<R: oio::Read> oio::Read for
PrometheusMetricWrapper<R> {
}
impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
- fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+ fn read(&mut self, limit: usize) -> Result<Bytes> {
self.inner
- .read(buf)
- .map(|n| {
- self.bytes_total += n;
- n
+ .read(limit)
+ .map(|bs| {
+ self.bytes_total += bs.len();
+ bs
})
.map_err(|e| {
self.metrics
@@ -587,20 +587,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
PrometheusMetricWrapper<R> {
err
})
}
-
- fn next(&mut self) -> Option<Result<Bytes>> {
- self.inner.next().map(|res| match res {
- Ok(bytes) => {
- self.bytes_total += bytes.len();
- Ok(bytes)
- }
- Err(e) => {
- self.metrics
- .increment_errors_total(self.scheme, self.op, e.kind());
- Err(e)
- }
- })
- }
}
impl<R: oio::Write> oio::Write for PrometheusMetricWrapper<R> {
diff --git a/core/src/layers/retry.rs b/core/src/layers/retry.rs
index 5c4fce8dcd..e6582114e8 100644
--- a/core/src/layers/retry.rs
+++ b/core/src/layers/retry.rs
@@ -737,8 +737,8 @@ impl<R: oio::Read, I: RetryInterceptor> oio::Read for
RetryWrapper<R, I> {
}
impl<R: oio::BlockingRead, I: RetryInterceptor> oio::BlockingRead for
RetryWrapper<R, I> {
- fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
- { || self.inner.as_mut().unwrap().read(buf) }
+ fn read(&mut self, limit: usize) -> Result<Bytes> {
+ { || self.inner.as_mut().unwrap().read(limit) }
.retry(&self.builder)
.when(|e| e.is_temporary())
.notify(|err, dur| {
@@ -772,25 +772,6 @@ impl<R: oio::BlockingRead, I: RetryInterceptor>
oio::BlockingRead for RetryWrapp
.call()
.map_err(|e| e.set_persistent())
}
-
- fn next(&mut self) -> Option<Result<Bytes>> {
- { || self.inner.as_mut().unwrap().next().transpose() }
- .retry(&self.builder)
- .when(|e| e.is_temporary())
- .notify(|err, dur| {
- self.notify.intercept(
- err,
- dur,
- &[
- ("operation",
ReadOperation::BlockingNext.into_static()),
- ("path", &self.path),
- ],
- );
- })
- .call()
- .map_err(|e| e.set_persistent())
- .transpose()
- }
}
impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> {
diff --git a/core/src/layers/throttle.rs b/core/src/layers/throttle.rs
index 3e42839f46..0082a62735 100644
--- a/core/src/layers/throttle.rs
+++ b/core/src/layers/throttle.rs
@@ -196,18 +196,14 @@ impl<R: oio::Read> oio::Read for ThrottleWrapper<R> {
}
impl<R: oio::BlockingRead> oio::BlockingRead for ThrottleWrapper<R> {
- fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+ fn read(&mut self, limit: usize) -> Result<Bytes> {
// TODO: How can we handle buffer reads with a limiter?
- self.inner.read(buf)
+ self.inner.read(limit)
}
fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
self.inner.seek(pos)
}
-
- fn next(&mut self) -> Option<Result<Bytes>> {
- self.inner.next()
- }
}
impl<R: oio::Write> oio::Write for ThrottleWrapper<R> {
diff --git a/core/src/layers/tracing.rs b/core/src/layers/tracing.rs
index 6dfd34138d..017264cd32 100644
--- a/core/src/layers/tracing.rs
+++ b/core/src/layers/tracing.rs
@@ -290,8 +290,8 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
TracingWrapper<R> {
parent = &self.span,
level = "trace",
skip_all)]
- fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
- self.inner.read(buf)
+ fn read(&mut self, limit: usize) -> Result<Bytes> {
+ self.inner.read(limit)
}
#[tracing::instrument(
@@ -301,14 +301,6 @@ impl<R: oio::BlockingRead> oio::BlockingRead for
TracingWrapper<R> {
fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
self.inner.seek(pos)
}
-
- #[tracing::instrument(
- parent = &self.span,
- level = "trace",
- skip_all)]
- fn next(&mut self) -> Option<Result<Bytes>> {
- self.inner.next()
- }
}
impl<R: oio::Write> oio::Write for TracingWrapper<R> {
diff --git a/core/src/raw/enum_utils.rs b/core/src/raw/enum_utils.rs
index e2d2a10dff..430e513774 100644
--- a/core/src/raw/enum_utils.rs
+++ b/core/src/raw/enum_utils.rs
@@ -74,10 +74,10 @@ impl<ONE: oio::Read, TWO: oio::Read> oio::Read for
TwoWays<ONE, TWO> {
}
impl<ONE: oio::BlockingRead, TWO: oio::BlockingRead> oio::BlockingRead for
TwoWays<ONE, TWO> {
- fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+ fn read(&mut self, limit: usize) -> Result<Bytes> {
match self {
- Self::One(v) => v.read(buf),
- Self::Two(v) => v.read(buf),
+ Self::One(v) => v.read(limit),
+ Self::Two(v) => v.read(limit),
}
}
@@ -87,13 +87,6 @@ impl<ONE: oio::BlockingRead, TWO: oio::BlockingRead>
oio::BlockingRead for TwoWa
Self::Two(v) => v.seek(pos),
}
}
-
- fn next(&mut self) -> Option<Result<Bytes>> {
- match self {
- Self::One(v) => v.next(),
- Self::Two(v) => v.next(),
- }
- }
}
impl<ONE: oio::Write, TWO: oio::Write> oio::Write for TwoWays<ONE, TWO> {
@@ -152,11 +145,11 @@ impl<ONE: oio::Read, TWO: oio::Read, THREE: oio::Read>
oio::Read for ThreeWays<O
impl<ONE: oio::BlockingRead, TWO: oio::BlockingRead, THREE: oio::BlockingRead>
oio::BlockingRead
for ThreeWays<ONE, TWO, THREE>
{
- fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+ fn read(&mut self, limit: usize) -> Result<Bytes> {
match self {
- Self::One(v) => v.read(buf),
- Self::Two(v) => v.read(buf),
- Self::Three(v) => v.read(buf),
+ Self::One(v) => v.read(limit),
+ Self::Two(v) => v.read(limit),
+ Self::Three(v) => v.read(limit),
}
}
@@ -167,14 +160,6 @@ impl<ONE: oio::BlockingRead, TWO: oio::BlockingRead,
THREE: oio::BlockingRead> o
Self::Three(v) => v.seek(pos),
}
}
-
- fn next(&mut self) -> Option<Result<Bytes>> {
- match self {
- Self::One(v) => v.next(),
- Self::Two(v) => v.next(),
- Self::Three(v) => v.next(),
- }
- }
}
impl<ONE: oio::Write, TWO: oio::Write, THREE: oio::Write> oio::Write
@@ -252,12 +237,12 @@ where
THREE: oio::BlockingRead,
FOUR: oio::BlockingRead,
{
- fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+ fn read(&mut self, limit: usize) -> Result<Bytes> {
match self {
- Self::One(v) => v.read(buf),
- Self::Two(v) => v.read(buf),
- Self::Three(v) => v.read(buf),
- Self::Four(v) => v.read(buf),
+ Self::One(v) => v.read(limit),
+ Self::Two(v) => v.read(limit),
+ Self::Three(v) => v.read(limit),
+ Self::Four(v) => v.read(limit),
}
}
@@ -269,15 +254,6 @@ where
Self::Four(v) => v.seek(pos),
}
}
-
- fn next(&mut self) -> Option<Result<Bytes>> {
- match self {
- Self::One(v) => v.next(),
- Self::Two(v) => v.next(),
- Self::Three(v) => v.next(),
- Self::Four(v) => v.next(),
- }
- }
}
impl<ONE, TWO, THREE, FOUR> oio::List for FourWays<ONE, TWO, THREE, FOUR>
diff --git a/core/src/raw/oio/cursor.rs b/core/src/raw/oio/cursor.rs
index a920652451..75a034f102 100644
--- a/core/src/raw/oio/cursor.rs
+++ b/core/src/raw/oio/cursor.rs
@@ -16,7 +16,6 @@
// under the License.
use std::cmp::min;
-use std::io::Read;
use std::io::SeekFrom;
use std::task::Context;
use std::task::Poll;
@@ -72,6 +71,18 @@ impl From<Vec<u8>> for Cursor {
}
impl oio::Read for Cursor {
+ async fn read(&mut self, limit: usize) -> Result<Bytes> {
+ if self.is_empty() {
+ Ok(Bytes::new())
+ } else {
+ // The clone here is required as we don't want to change it.
+ let mut bs = self.inner.clone().split_off(self.pos as usize);
+ let bs = bs.split_to(min(bs.len(), limit));
+ self.pos += bs.len() as u64;
+ Ok(bs)
+ }
+ }
+
async fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
let (base, amt) = match pos {
SeekFrom::Start(n) => (0, n as i64),
@@ -91,8 +102,10 @@ impl oio::Read for Cursor {
self.pos = n;
Ok(n)
}
+}
- async fn read(&mut self, limit: usize) -> Result<Bytes> {
+impl oio::BlockingRead for Cursor {
+ fn read(&mut self, limit: usize) -> Result<Bytes> {
if self.is_empty() {
Ok(Bytes::new())
} else {
@@ -103,18 +116,6 @@ impl oio::Read for Cursor {
Ok(bs)
}
}
-}
-
-impl oio::BlockingRead for Cursor {
- fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
- let n = Read::read(&mut self.remaining_slice(), buf).map_err(|err| {
- Error::new(ErrorKind::Unexpected, "read data from Cursor")
- .with_context("source", "Cursor")
- .set_source(err)
- })?;
- self.pos += n as u64;
- Ok(n)
- }
fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
let (base, amt) = match pos {
@@ -135,17 +136,6 @@ impl oio::BlockingRead for Cursor {
self.pos = n;
Ok(n)
}
-
- fn next(&mut self) -> Option<Result<Bytes>> {
- if self.is_empty() {
- None
- } else {
- // The clone here is required as we don't want to change it.
- let bs = self.inner.clone().split_off(self.pos as usize);
- self.pos += bs.len() as u64;
- Some(Ok(bs))
- }
- }
}
impl oio::Stream for Cursor {
diff --git a/core/src/raw/oio/read/api.rs b/core/src/raw/oio/read/api.rs
index 22865375ab..d8ed05c4ec 100644
--- a/core/src/raw/oio/read/api.rs
+++ b/core/src/raw/oio/read/api.rs
@@ -22,7 +22,6 @@ use std::ops::DerefMut;
use bytes::Bytes;
use futures::Future;
-use tokio::io::ReadBuf;
use crate::raw::BoxedFuture;
use crate::*;
@@ -177,65 +176,15 @@ pub type BlockingReader = Box<dyn BlockingRead>;
/// is optional. We use `Read` to make users life easier.
pub trait BlockingRead: Send + Sync {
/// Read synchronously.
- fn read(&mut self, buf: &mut [u8]) -> Result<usize>;
+ fn read(&mut self, limit: usize) -> Result<Bytes>;
/// Seek synchronously.
fn seek(&mut self, pos: io::SeekFrom) -> Result<u64>;
-
- /// Iterating [`Bytes`] from underlying reader.
- fn next(&mut self) -> Option<Result<Bytes>>;
-
- /// Read all data of current reader to the end of buf.
- fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<usize> {
- let start_len = buf.len();
- let start_cap = buf.capacity();
-
- loop {
- if buf.len() == buf.capacity() {
- buf.reserve(32); // buf is full, need more space
- }
-
- let spare = buf.spare_capacity_mut();
- let mut read_buf: ReadBuf = ReadBuf::uninit(spare);
-
- // SAFETY: These bytes were initialized but not filled in the previous loop
- unsafe {
- read_buf.assume_init(read_buf.capacity());
- }
-
- match self.read(read_buf.initialize_unfilled()) {
- Ok(0) => return Ok(buf.len() - start_len),
- Ok(n) => {
- // SAFETY: Read API makes sure that returning `n` is correct.
- unsafe {
- buf.set_len(buf.len() + n);
- }
- }
- Err(e) => return Err(e),
- }
-
- // The buffer might be an exact fit. Let's read into a probe buffer
- // and see if it returns `Ok(0)`. If so, we've avoided an
- // unnecessary doubling of the capacity. But if not, append the
- // probe buffer to the primary buffer and let its capacity grow.
- if buf.len() == buf.capacity() && buf.capacity() == start_cap {
- let mut probe = [0u8; 32];
-
- match self.read(&mut probe) {
- Ok(0) => return Ok(buf.len() - start_len),
- Ok(n) => {
- buf.extend_from_slice(&probe[..n]);
- }
- Err(e) => return Err(e),
- }
- }
- }
- }
}
impl BlockingRead for () {
- fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
- let _ = buf;
+ fn read(&mut self, limit: usize) -> Result<Bytes> {
+ let _ = limit;
unimplemented!("read is required to be implemented for oio::BlockingRead")
}
@@ -248,27 +197,16 @@ impl BlockingRead for () {
"output blocking reader doesn't support seeking",
))
}
-
- fn next(&mut self) -> Option<Result<Bytes>> {
- Some(Err(Error::new(
- ErrorKind::Unsupported,
- "output reader doesn't support iterating",
- )))
- }
}
/// `Box<dyn BlockingRead>` won't implement `BlockingRead` automatically.
/// To make BlockingReader work as expected, we must add this impl.
impl<T: BlockingRead + ?Sized> BlockingRead for Box<T> {
- fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
- (**self).read(buf)
+ fn read(&mut self, limit: usize) -> Result<Bytes> {
+ (**self).read(limit)
}
fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
(**self).seek(pos)
}
-
- fn next(&mut self) -> Option<Result<Bytes>> {
- (**self).next()
- }
}
diff --git a/core/src/raw/oio/read/buffer_reader.rs b/core/src/raw/oio/read/buffer_reader.rs
index 33709b2355..733878d1cc 100644
--- a/core/src/raw/oio/read/buffer_reader.rs
+++ b/core/src/raw/oio/read/buffer_reader.rs
@@ -18,7 +18,6 @@
use std::cmp::min;
use std::io::SeekFrom;
-use bytes::BufMut;
use bytes::Bytes;
use tokio::io::ReadBuf;
@@ -136,30 +135,6 @@ impl<R> oio::Read for BufferReader<R>
where
R: oio::Read,
{
- async fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
- match pos {
- SeekFrom::Start(new_pos) => {
- // TODO(weny): Check the overflowing.
- let Some(offset) = (new_pos as i64).checked_sub(self.cur as i64) else {
- return self.inner_seek(pos).await;
- };
-
- match self.seek_relative(offset) {
- Some(cur) => Ok(cur),
- None => self.inner_seek(pos).await,
- }
- }
- SeekFrom::Current(offset) => match self.seek_relative(offset) {
- Some(cur) => Ok(cur),
- None => {
- self.inner_seek(SeekFrom::Current(offset - self.unconsumed_buffer_len()))
- .await
- }
- },
- SeekFrom::End(_) => self.inner_seek(pos).await,
- }
- }
-
async fn read(&mut self, limit: usize) -> Result<Bytes> {
if limit == 0 {
return Ok(Bytes::new());
@@ -190,6 +165,30 @@ where
self.consume(bytes.len());
Ok(bytes)
}
+
+ async fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
+ match pos {
+ SeekFrom::Start(new_pos) => {
+ // TODO(weny): Check the overflowing.
+ let Some(offset) = (new_pos as i64).checked_sub(self.cur as i64) else {
+ return self.inner_seek(pos).await;
+ };
+
+ match self.seek_relative(offset) {
+ Some(cur) => Ok(cur),
+ None => self.inner_seek(pos).await,
+ }
+ }
+ SeekFrom::Current(offset) => match self.seek_relative(offset) {
+ Some(cur) => Ok(cur),
+ None => {
+ self.inner_seek(SeekFrom::Current(offset - self.unconsumed_buffer_len()))
+ .await
+ }
+ },
+ SeekFrom::End(_) => self.inner_seek(pos).await,
+ }
+ }
}
impl<R> BufferReader<R>
@@ -210,11 +209,12 @@ where
let mut buf = ReadBuf::uninit(dst);
unsafe { buf.assume_init(cap) };
- let n = self.r.read(buf.initialized_mut())?;
- unsafe { self.buf.set_len(n) }
+ let bs = self.r.read(cap)?;
+ buf.put_slice(&bs);
+ unsafe { self.buf.set_len(bs.len()) }
self.pos = 0;
- self.filled = n;
+ self.filled = bs.len();
}
Ok(&self.buf[self.pos..self.filled])
@@ -233,32 +233,35 @@ impl<R> BlockingRead for BufferReader<R>
where
R: BlockingRead,
{
- fn read(&mut self, mut dst: &mut [u8]) -> Result<usize> {
- // Sanity check for normal cases.
- if dst.is_empty() {
- return Ok(0);
+ fn read(&mut self, limit: usize) -> Result<Bytes> {
+ if limit == 0 {
+ return Ok(Bytes::new());
}
// If we don't have any buffered data and we're doing a massive read
// (larger than our internal buffer), bypass our internal buffer
// entirely.
- if self.pos == self.filled && dst.len() >= self.capacity() {
- let res = self.r.read(dst);
+ if self.pos == self.filled && limit >= self.capacity() {
+ let res = self.r.read(limit);
self.discard_buffer();
return match res {
- Ok(nread) => {
- self.cur += nread as u64;
- Ok(nread)
+ Ok(bs) => {
+ self.cur += bs.len() as u64;
+ Ok(bs)
}
Err(err) => Err(err),
};
}
- let rem = self.blocking_fill_buf()?;
- let amt = min(rem.len(), dst.len());
- dst.put(&rem[..amt]);
- self.consume(amt);
- Ok(amt)
+ let bytes = self.blocking_fill_buf()?;
+
+ if bytes.is_empty() {
+ return Ok(Bytes::new());
+ }
+ let size = min(bytes.len(), limit);
+ let bytes = Bytes::copy_from_slice(&bytes[..size]);
+ self.consume(bytes.len());
+ Ok(bytes)
}
fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
@@ -282,21 +285,6 @@ where
SeekFrom::End(_) => self.blocking_inner_seek(pos),
}
}
-
- fn next(&mut self) -> Option<Result<Bytes>> {
- match self.blocking_fill_buf() {
- Ok(bytes) => {
- if bytes.is_empty() {
- return None;
- }
-
- let bytes = Bytes::copy_from_slice(bytes);
- self.consume(bytes.len());
- Some(Ok(bytes))
- }
- Err(err) => Some(Err(err)),
- }
- }
}
#[cfg(test)]
@@ -397,8 +385,8 @@ mod tests {
}
impl BlockingRead for MockReader {
- fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
- self.inner.read(buf)
+ fn read(&mut self, limit: usize) -> Result<Bytes> {
+ self.inner.read(limit)
}
fn seek(&mut self, _pos: SeekFrom) -> Result<u64> {
@@ -407,10 +395,6 @@ mod tests {
"output reader doesn't support seeking",
))
}
-
- fn next(&mut self) -> Option<Result<Bytes>> {
- self.inner.next()
- }
}
#[tokio::test]
@@ -634,20 +618,17 @@ mod tests {
let r = Box::new(BufferReader::new(r, buf_cap)) as oio::BlockingReader;
let mut r = BlockingReader::new(r);
- let mut dst = [0u8; 5];
- let nread = r.read(&mut dst)?;
- assert_eq!(nread, dst.len());
- assert_eq!(&dst, b"Hello");
+ let buf = r.read(5)?;
+ assert_eq!(buf.len(), 5);
+ assert_eq!(buf.as_ref(), b"Hello");
- let mut dst = [0u8; 5];
- let nread = r.read(&mut dst)?;
- assert_eq!(nread, dst.len());
- assert_eq!(&dst, b", Wor");
+ let buf = r.read(5)?;
+ assert_eq!(buf.len(), 5);
+ assert_eq!(buf.as_ref(), b", Wor");
- let mut dst = [0u8; 3];
- let nread = r.read(&mut dst)?;
- assert_eq!(nread, dst.len());
- assert_eq!(&dst, b"ld!");
+ let buf = r.read(3)?;
+ assert_eq!(buf.len(), 3);
+ assert_eq!(buf.as_ref(), b"ld!");
Ok(())
}
@@ -661,33 +642,29 @@ mod tests {
let mut r = BlockingReader::new(r);
// The underlying reader buffers the b"Hello, Wor".
- let mut dst = [0u8; 5];
- let nread = r.read(&mut dst)?;
- assert_eq!(nread, dst.len());
- assert_eq!(&dst, b"Hello");
+ let buf = r.read(5)?;
+ assert_eq!(buf.len(), 5);
+ assert_eq!(buf.as_ref(), b"Hello");
let pos = r.seek(SeekFrom::Start(7))?;
assert_eq!(pos, 7);
- let mut dst = [0u8; 5];
- let nread = r.read(&mut dst)?;
- assert_eq!(&dst[..nread], &bs[7..10]);
- assert_eq!(nread, 3);
+ let buf = r.read(5)?;
+ assert_eq!(&buf[..], &bs[7..10]);
+ assert_eq!(buf.len(), 3);
// Should perform a relative seek.
let pos = r.seek(SeekFrom::Start(0))?;
assert_eq!(pos, 0);
- let mut dst = [0u8; 9];
- let nread = r.read(&mut dst)?;
- assert_eq!(&dst[..nread], &bs[0..9]);
- assert_eq!(nread, 9);
+ let buf = r.read(9)?;
+ assert_eq!(&buf[..], &bs[0..9]);
+ assert_eq!(buf.len(), 9);
// Should perform a non-relative seek.
let pos = r.seek(SeekFrom::Start(11))?;
assert_eq!(pos, 11);
- let mut dst = [0u8; 9];
- let nread = r.read(&mut dst)?;
- assert_eq!(&dst[..nread], &bs[11..13]);
- assert_eq!(nread, 2);
+ let buf = r.read(9)?;
+ assert_eq!(&buf[..], &bs[11..13]);
+ assert_eq!(buf.len(), 2);
Ok(())
}
@@ -734,9 +711,8 @@ mod tests {
let mut cur = 0;
for _ in 0..3 {
- let mut dst = [0u8; 5];
- let nread = r.read(&mut dst)?;
- assert_eq!(nread, 5);
+ let bs = r.read(5)?;
+ assert_eq!(bs.len(), 5);
cur += 5;
}
@@ -757,9 +733,8 @@ mod tests {
let mut cur = 0;
for _ in 0..3 {
- let mut dst = [0u8; 6];
- let nread = r.read(&mut dst)?;
- assert_eq!(nread, 6);
+ let bs = r.read(6)?;
+ assert_eq!(bs.len(), 6);
cur += 6;
}
@@ -771,8 +746,6 @@ mod tests {
#[tokio::test]
async fn test_blocking_read_part() -> anyhow::Result<()> {
- use std::io::Read;
-
let (bs, _) = gen_bytes();
let acc = Arc::new(MockReadService::new(bs.clone()));
let r = Box::new(RangeReader::new(
@@ -784,7 +757,7 @@ mod tests {
let mut r = BlockingReader::new(r);
let mut buf = Vec::new();
- BlockingRead::read_to_end(&mut r, &mut buf)?;
+ r.read_to_end(&mut buf)?;
assert_eq!(4096, buf.len(), "read size");
assert_eq!(
format!("{:x}", Sha256::digest(&bs[4096..4096 + 4096])),
@@ -796,7 +769,7 @@ mod tests {
assert_eq!(n, 0, "seek position must be 0");
let mut buf = Vec::new();
- BlockingRead::read_to_end(&mut r, &mut buf)?;
+ r.read_to_end(&mut buf)?;
assert_eq!(4096, buf.len(), "read twice size");
assert_eq!(
format!("{:x}", Sha256::digest(&bs[4096..4096 + 4096])),
@@ -807,8 +780,7 @@ mod tests {
let n = r.seek(SeekFrom::Start(1024))?;
assert_eq!(1024, n, "seek to 1024");
- let mut buf = vec![0; 1024];
- r.read_exact(&mut buf)?;
+ let buf = r.read_exact(1024)?;
assert_eq!(
format!("{:x}", Sha256::digest(&bs[4096 + 1024..4096 + 2048])),
format!("{:x}", Sha256::digest(&buf)),
@@ -818,8 +790,7 @@ mod tests {
let n = r.seek(SeekFrom::Current(1024))?;
assert_eq!(3072, n, "seek to 3072");
- let mut buf = vec![0; 1024];
- r.read_exact(&mut buf)?;
+ let buf = r.read_exact(1024)?;
assert_eq!(
format!("{:x}", Sha256::digest(&bs[4096 + 3072..4096 + 3072 + 1024])),
format!("{:x}", Sha256::digest(&buf)),
diff --git a/core/src/raw/oio/read/file_read.rs b/core/src/raw/oio/read/file_read.rs
index 6a27733634..64596c754a 100644
--- a/core/src/raw/oio/read/file_read.rs
+++ b/core/src/raw/oio/read/file_read.rs
@@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use std::cmp;
use std::io::SeekFrom;
use std::sync::Arc;
@@ -40,7 +39,6 @@ pub struct FileReader<A: Accessor, R> {
cur: u64,
reader: Option<R>,
- buf: oio::AdaptiveBuf,
/// Do we need to reset our cursor?
seek_dirty: bool,
}
@@ -63,7 +61,6 @@ where
offset: None,
size: None,
cur: 0,
- buf: oio::AdaptiveBuf::default(),
reader: None,
seek_dirty: false,
}
@@ -213,6 +210,24 @@ where
A: Accessor<Reader = R>,
R: oio::Read,
{
+ async fn read(&mut self, limit: usize) -> Result<Bytes> {
+ if self.reader.is_none() {
+ // FileReader doesn't support range, we will always use full range to open a file.
+ let op = self.op.clone().with_range(BytesRange::from(..));
+ let (_, r) = self.acc.read(&self.path, op).await?;
+ self.reader = Some(r);
+ }
+
+ let r = self.reader.as_mut().expect("reader must be valid");
+
+ // We should know where to start read the data.
+ if self.offset.is_none() {
+ (self.offset, self.size) = Self::offset(r, self.op.range()).await?;
+ }
+
+ r.read(limit).await
+ }
+
async fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
if self.reader.is_none() {
// FileReader doesn't support range, we will always use full range to open a file.
@@ -245,24 +260,6 @@ where
self.cur = pos - self.offset.unwrap();
Ok(self.cur)
}
-
- async fn read(&mut self, limit: usize) -> Result<Bytes> {
- if self.reader.is_none() {
- // FileReader doesn't support range, we will always use full range to open a file.
- let op = self.op.clone().with_range(BytesRange::from(..));
- let (_, r) = self.acc.read(&self.path, op).await?;
- self.reader = Some(r);
- }
-
- let r = self.reader.as_mut().expect("reader must be valid");
-
- // We should know where to start read the data.
- if self.offset.is_none() {
- (self.offset, self.size) = Self::offset(r, self.op.range()).await?;
- }
-
- r.read(limit).await
- }
}
impl<A, R> oio::BlockingRead for FileReader<A, R>
@@ -270,7 +267,7 @@ where
A: Accessor<BlockingReader = R>,
R: oio::BlockingRead,
{
- fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
+ fn read(&mut self, limit: usize) -> Result<Bytes> {
if self.reader.is_none() {
// FileReader doesn't support range, we will always use full range to open a file.
let op = self.op.clone().with_range(BytesRange::from(..));
@@ -285,25 +282,7 @@ where
(self.offset, self.size) = Self::calculate_offset(r, self.op.range())?;
}
- let size = if let Some(size) = self.size {
- // Sanity check.
- if self.cur >= size {
- return Ok(0);
- }
- cmp::min(buf.len(), (size - self.cur) as usize)
- } else {
- buf.len()
- };
-
- match r.read(&mut buf[..size]) {
- Ok(0) => Ok(0),
- Ok(n) => {
- self.cur += n as u64;
- Ok(n)
- }
- // We don't need to reset state here since it's ok to poll the same reader.
- Err(err) => Err(err),
- }
+ r.read(limit)
}
fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
@@ -337,52 +316,4 @@ where
self.cur = pos - self.offset.unwrap();
Ok(self.cur)
}
-
- fn next(&mut self) -> Option<Result<Bytes>> {
- if self.reader.is_none() {
- // FileReader doesn't support range, we will always use full range to open a file.
- let op = self.op.clone().with_range(BytesRange::from(..));
- let (_, r) = match self.acc.blocking_read(&self.path, op) {
- Ok(v) => v,
- Err(err) => return Some(Err(err)),
- };
- self.reader = Some(r);
- }
-
- let r = self.reader.as_mut().expect("reader must be valid");
-
- // We should know where to start read the data.
- if self.offset.is_none() {
- (self.offset, self.size) = match Self::calculate_offset(r, self.op.range()) {
- Ok(v) => v,
- Err(err) => return Some(Err(err)),
- }
- }
-
- self.buf.reserve();
-
- let mut buf = self.buf.initialized_mut();
- let buf = buf.initialized_mut();
-
- let size = if let Some(size) = self.size {
- // Sanity check.
- if self.cur >= size {
- return None;
- }
- cmp::min(buf.len(), (size - self.cur) as usize)
- } else {
- buf.len()
- };
-
- match r.read(&mut buf[..size]) {
- Ok(0) => None,
- Ok(n) => {
- self.cur += n as u64;
- self.buf.record(n);
- Some(Ok(self.buf.split(n)))
- }
- // We don't need to reset state here since it's ok to poll the same reader.
- Err(err) => Some(Err(err)),
- }
- }
}
diff --git a/core/src/raw/oio/read/into_streamable_read.rs b/core/src/raw/oio/read/into_streamable_read.rs
index e234eb654c..6fe21a2c60 100644
--- a/core/src/raw/oio/read/into_streamable_read.rs
+++ b/core/src/raw/oio/read/into_streamable_read.rs
@@ -28,7 +28,6 @@ use crate::*;
pub fn into_streamable_read<R>(r: R, capacity: usize) -> StreamableReader<R> {
StreamableReader {
r,
- cap: capacity,
buf: Vec::with_capacity(capacity),
}
}
@@ -36,7 +35,6 @@ pub fn into_streamable_read<R>(r: R, capacity: usize) -> StreamableReader<R> {
/// Make given read streamable.
pub struct StreamableReader<R> {
r: R,
- cap: usize,
buf: Vec<u8>,
}
@@ -63,28 +61,13 @@ impl<R: oio::Read> oio::Read for StreamableReader<R> {
}
impl<R: oio::BlockingRead> oio::BlockingRead for StreamableReader<R> {
- fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
- self.r.read(buf)
+ fn read(&mut self, limit: usize) -> Result<Bytes> {
+ self.r.read(limit)
}
fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
self.r.seek(pos)
}
-
- fn next(&mut self) -> Option<Result<Bytes>> {
- let dst = self.buf.spare_capacity_mut();
- let mut buf = ReadBuf::uninit(dst);
- unsafe { buf.assume_init(self.cap) };
-
- match self.r.read(buf.initialized_mut()) {
- Err(err) => Some(Err(err)),
- Ok(0) => None,
- Ok(n) => {
- buf.set_filled(n);
- Some(Ok(Bytes::from(buf.filled().to_vec())))
- }
- }
- }
}
#[cfg(test)]
@@ -135,10 +118,13 @@ mod tests {
let r = oio::Cursor::from(content.clone());
let mut s = into_streamable_read(Box::new(r) as oio::BlockingReader, cap);
- let mut bs = BytesMut::new();
- while let Some(b) = s.next() {
- let b = b.expect("read must success");
- bs.put_slice(&b);
+ let mut bs = BytesMut::with_capacity(size);
+ loop {
+ let buf = s.read(size).expect("read must success");
+ if buf.is_empty() {
+ break;
+ }
+ bs.put_slice(&buf)
}
assert_eq!(bs.freeze().to_vec(), content)
}
diff --git a/core/src/raw/oio/read/lazy_read.rs b/core/src/raw/oio/read/lazy_read.rs
index 89be1a3b6f..1fd1c71b59 100644
--- a/core/src/raw/oio/read/lazy_read.rs
+++ b/core/src/raw/oio/read/lazy_read.rs
@@ -99,20 +99,11 @@ where
A: Accessor<BlockingReader = R>,
R: oio::BlockingRead,
{
- fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
- self.blocking_reader()?.read(buf)
+ fn read(&mut self, limit: usize) -> Result<Bytes> {
+ self.blocking_reader()?.read(limit)
}
fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
self.blocking_reader()?.seek(pos)
}
-
- fn next(&mut self) -> Option<Result<Bytes>> {
- let r = match self.blocking_reader() {
- Ok(r) => r,
- Err(err) => return Some(Err(err)),
- };
-
- r.next()
- }
}
diff --git a/core/src/raw/oio/read/range_read.rs b/core/src/raw/oio/read/range_read.rs
index d0551f86d2..56289d5341 100644
--- a/core/src/raw/oio/read/range_read.rs
+++ b/core/src/raw/oio/read/range_read.rs
@@ -252,6 +252,43 @@ where
A: Accessor<Reader = R>,
R: oio::Read,
{
+ async fn read(&mut self, limit: usize) -> Result<Bytes> {
+ // Sanity check for normal cases.
+ if self.cur >= self.size.unwrap_or(u64::MAX) {
+ return Ok(Bytes::new());
+ }
+
+ if self.offset.is_none() {
+ let rp = match self.stat_future().await {
+ Ok(v) => v,
+ Err(err) => return Err(err),
+ };
+ let length = rp.into_metadata().content_length();
+ self.ensure_offset(length)?
+ }
+ if self.reader.is_none() {
+ let (rp, r) = match self.read_future().await {
+ Ok((rp, r)) => (rp, r),
+ Err(err) => return Err(err),
+ };
+
+ self.ensure_size(rp.range().unwrap_or_default().size(), rp.size());
+ self.reader = Some(r);
+ }
+
+ let r = self.reader.as_mut().expect("reader must be valid");
+ match r.read(limit).await {
+ Ok(bs) => {
+ self.cur += bs.len() as u64;
+ Ok(bs)
+ }
+ Err(err) => {
+ self.reader = None;
+ Err(err)
+ }
+ }
+ }
+
async fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
// There is an optimization here that we can calculate if users trying to seek
// the same position, for example, `reader.seek(SeekFrom::Current(0))`.
@@ -292,15 +329,21 @@ where
self.cur = seek_pos;
Ok(self.cur)
}
+}
- async fn read(&mut self, limit: usize) -> Result<Bytes> {
+impl<A, R> oio::BlockingRead for RangeReader<A, R>
+where
+ A: Accessor<BlockingReader = R>,
+ R: oio::BlockingRead,
+{
+ fn read(&mut self, limit: usize) -> Result<Bytes> {
// Sanity check for normal cases.
if self.cur >= self.size.unwrap_or(u64::MAX) {
return Ok(Bytes::new());
}
if self.offset.is_none() {
- let rp = match self.stat_future().await {
+ let rp = match self.stat_action() {
Ok(v) => v,
Err(err) => return Err(err),
};
@@ -308,7 +351,7 @@ where
self.ensure_offset(length)?
}
if self.reader.is_none() {
- let (rp, r) = match self.read_future().await {
+ let (rp, r) = match self.read_action() {
Ok((rp, r)) => (rp, r),
Err(err) => return Err(err),
};
@@ -318,7 +361,7 @@ where
}
let r = self.reader.as_mut().expect("reader must be valid");
- match r.read(limit).await {
+ match r.read(limit) {
Ok(bs) => {
self.cur += bs.len() as u64;
Ok(bs)
@@ -329,48 +372,6 @@ where
}
}
}
-}
-
-impl<A, R> oio::BlockingRead for RangeReader<A, R>
-where
- A: Accessor<BlockingReader = R>,
- R: oio::BlockingRead,
-{
- fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
- // Sanity check for normal cases.
- if buf.is_empty() || self.cur >= self.size.unwrap_or(u64::MAX) {
- return Ok(0);
- }
-
- if self.offset.is_none() {
- let rp = self.stat_action()?;
- let length = rp.into_metadata().content_length();
- self.ensure_offset(length)?;
- }
- if self.reader.is_none() {
- let (rp, r) = self.read_action()?;
-
- self.ensure_size(rp.range().unwrap_or_default().size(), rp.size());
- self.reader = Some(r);
- }
-
- let r = self.reader.as_mut().expect("reader must be valid");
- match r.read(buf) {
- Ok(0) => {
- // Reset state to Idle after all data has been consumed.
- self.reader = None;
- Ok(0)
- }
- Ok(n) => {
- self.cur += n as u64;
- Ok(n)
- }
- Err(e) => {
- self.reader = None;
- Err(e)
- }
- }
- }
fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
// There is an optimization here that we can calculate if users trying to seek
@@ -412,49 +413,6 @@ where
self.cur = seek_pos;
Ok(self.cur)
}
-
- fn next(&mut self) -> Option<Result<Bytes>> {
- // Sanity check for normal cases.
- if self.cur >= self.size.unwrap_or(u64::MAX) {
- return None;
- }
-
- if self.offset.is_none() {
- let rp = match self.stat_action() {
- Ok(rp) => rp,
- Err(err) => return Some(Err(err)),
- };
- let length = rp.into_metadata().content_length();
- if let Err(err) = self.ensure_offset(length) {
- return Some(Err(err));
- }
- }
- if self.reader.is_none() {
- let (rp, r) = match self.read_action() {
- Ok((rp, r)) => (rp, r),
- Err(err) => return Some(Err(err)),
- };
-
- self.ensure_size(rp.range().unwrap_or_default().size(), rp.size());
- self.reader = Some(r);
- }
-
- let r = self.reader.as_mut().expect("reader must be valid");
- match r.next() {
- Some(Ok(bs)) => {
- self.cur += bs.len() as u64;
- Some(Ok(bs))
- }
- Some(Err(err)) => {
- self.reader = None;
- Some(Err(err))
- }
- None => {
- self.reader = None;
- None
- }
- }
- }
}
#[cfg(test)]
diff --git a/core/src/raw/oio/read/std_read.rs b/core/src/raw/oio/read/std_read.rs
index d28f8d5ca3..8726ebf249 100644
--- a/core/src/raw/oio/read/std_read.rs
+++ b/core/src/raw/oio/read/std_read.rs
@@ -15,11 +15,11 @@
// specific language governing permissions and limitations
// under the License.
+use bytes::Bytes;
use std::io::Read;
use std::io::Seek;
use std::io::SeekFrom;
-
-use bytes::Bytes;
+use tokio::io::ReadBuf;
use crate::raw::*;
use crate::*;
@@ -27,12 +27,16 @@ use crate::*;
/// FuturesReader implements [`oio::BlockingRead`] via [`Read`] + [`Seek`].
pub struct StdReader<R: Read + Seek> {
inner: R,
+ buf: Vec<u8>,
}
impl<R: Read + Seek> StdReader<R> {
/// Create a new std reader.
pub fn new(inner: R) -> Self {
- Self { inner }
+ Self {
+ inner,
+ buf: Vec::with_capacity(64 * 1024),
+ }
}
}
@@ -40,12 +44,27 @@ impl<R> oio::BlockingRead for StdReader<R>
where
R: Read + Seek + Send + Sync,
{
- fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
- self.inner.read(buf).map_err(|err| {
+ fn read(&mut self, limit: usize) -> Result<Bytes> {
+ // Make sure buf has enough space.
+ if self.buf.capacity() < limit {
+ self.buf.reserve(limit);
+ }
+ let buf = self.buf.spare_capacity_mut();
+ let mut read_buf: ReadBuf = ReadBuf::uninit(buf);
+
+ // SAFETY: Read at most `size` bytes into `read_buf`.
+ unsafe {
+ read_buf.assume_init(limit);
+ }
+
+ let n = self.inner.read(read_buf.initialized_mut()).map_err(|err| {
new_std_io_error(err)
- .with_operation(oio::ReadOperation::BlockingRead)
- .with_context("source", "StdReader")
- })
+ .with_operation(oio::ReadOperation::Read)
+ .with_context("source", "TokioReader")
+ })?;
+ read_buf.set_filled(n);
+
+ Ok(Bytes::copy_from_slice(read_buf.filled()))
}
fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
@@ -55,11 +74,4 @@ where
.with_context("source", "StdReader")
})
}
-
- fn next(&mut self) -> Option<Result<Bytes>> {
- Some(Err(Error::new(
- ErrorKind::Unsupported,
- "StdReader doesn't support poll_next",
- )))
- }
}
diff --git a/core/src/raw/oio/read/tokio_read.rs b/core/src/raw/oio/read/tokio_read.rs
index 6dea5cec39..f89e2ec5f3 100644
--- a/core/src/raw/oio/read/tokio_read.rs
+++ b/core/src/raw/oio/read/tokio_read.rs
@@ -47,14 +47,6 @@ impl<R> oio::Read for TokioReader<R>
where
R: AsyncRead + AsyncSeek + Unpin + Send + Sync,
{
- async fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
- self.inner.seek(pos).await.map_err(|err| {
- new_std_io_error(err)
- .with_operation(oio::ReadOperation::Seek)
- .with_context("source", "TokioReader")
- })
- }
-
async fn read(&mut self, limit: usize) -> Result<Bytes> {
// Make sure buf has enough space.
if self.buf.capacity() < limit {
@@ -81,4 +73,12 @@ where
Ok(Bytes::copy_from_slice(read_buf.filled()))
}
+
+ async fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
+ self.inner.seek(pos).await.map_err(|err| {
+ new_std_io_error(err)
+ .with_operation(oio::ReadOperation::Seek)
+ .with_context("source", "TokioReader")
+ })
+ }
}
diff --git a/core/src/raw/tests/read.rs b/core/src/raw/tests/read.rs
index 49b44b3682..b2503901a3 100644
--- a/core/src/raw/tests/read.rs
+++ b/core/src/raw/tests/read.rs
@@ -179,16 +179,11 @@ impl ReadChecker {
for action in actions {
match action {
ReadAction::Read(size) => {
- use oio::BlockingRead;
-
- let mut buf = vec![0; *size];
- let n = r.read(&mut buf).expect("read must success");
- self.check_read(*size, &buf[..n]);
+ let bs = r.read(*size).expect("read must success");
+ self.check_read(*size, &bs);
}
ReadAction::Seek(pos) => {
- use oio::BlockingRead;
-
let res = r.seek(*pos);
self.check_seek(*pos, res);
}
diff --git a/core/src/types/operator/blocking_operator.rs b/core/src/types/operator/blocking_operator.rs
index 9cb46efa30..662f3d770a 100644
--- a/core/src/types/operator/blocking_operator.rs
+++ b/core/src/types/operator/blocking_operator.rs
@@ -18,7 +18,6 @@
use bytes::Bytes;
use super::operator_functions::*;
-use crate::raw::oio::BlockingRead;
use crate::raw::oio::WriteBuf;
use crate::raw::*;
use crate::*;
@@ -410,9 +409,10 @@ impl BlockingOperator {
(range.size().unwrap(), range)
};
- let (_, mut s) = inner.blocking_read(&path, args.with_range(range))?;
+ let (_, r) = inner.blocking_read(&path, args.with_range(range))?;
+ let mut r = BlockingReader::new(r);
let mut buf = Vec::with_capacity(size_hint as usize);
- s.read_to_end(&mut buf)?;
+ r.read_to_end(&mut buf)?;
Ok(buf)
},
diff --git a/core/src/types/reader.rs b/core/src/types/reader.rs
index 2af5ca5144..8db4e9f9f4 100644
--- a/core/src/types/reader.rs
+++ b/core/src/types/reader.rs
@@ -26,6 +26,7 @@ use bytes::{BufMut, Bytes, BytesMut};
use futures::Stream;
use tokio::io::ReadBuf;
+use crate::raw::oio::BlockingRead;
use crate::raw::*;
use crate::*;
@@ -441,33 +442,107 @@ impl BlockingReader {
}
/// Create a new reader from an `oio::BlockingReader`.
- #[cfg(test)]
pub(crate) fn new(r: oio::BlockingReader) -> Self {
BlockingReader { inner: r }
}
-}
-impl oio::BlockingRead for BlockingReader {
+ /// Seek to the position of `pos` of reader.
#[inline]
- fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
- self.inner.read(buf)
+ pub fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
+ self.inner.seek(pos)
}
+ /// Read at most `size` bytes of data from reader.
#[inline]
- fn seek(&mut self, pos: io::SeekFrom) -> Result<u64> {
- self.inner.seek(pos)
+ pub fn read(&mut self, limit: usize) -> Result<Bytes> {
+ self.inner.read(limit)
}
- #[inline]
- fn next(&mut self) -> Option<Result<Bytes>> {
- oio::BlockingRead::next(&mut self.inner)
+ /// Read exact `size` bytes of data from reader.
+ pub fn read_exact(&mut self, size: usize) -> Result<Bytes> {
+ // Lucky path.
+ let bs1 = self.inner.read(size)?;
+ debug_assert!(
+ bs1.len() <= size,
+ "read should not return more bytes than expected"
+ );
+ if bs1.len() == size {
+ return Ok(bs1);
+ }
+ if bs1.is_empty() {
+ return Err(
+ Error::new(ErrorKind::ContentIncomplete, "reader got too little data")
+ .with_context("expect", size.to_string()),
+ );
+ }
+
+ let mut bs = BytesMut::with_capacity(size);
+ bs.put_slice(&bs1);
+
+ let mut remaining = size - bs.len();
+
+ loop {
+ let tmp = self.inner.read(remaining)?;
+ if tmp.is_empty() {
+ return Err(
+ Error::new(ErrorKind::ContentIncomplete, "reader got too little data")
+ .with_context("expect", size.to_string())
+ .with_context("actual", bs.len().to_string()),
+ );
+ }
+ bs.put_slice(&tmp);
+ debug_assert!(
+ tmp.len() <= remaining,
+ "read should not return more bytes than expected"
+ );
+
+ remaining -= tmp.len();
+ if remaining == 0 {
+ break;
+ }
+ }
+
+ Ok(bs.freeze())
+ }
+ /// Reads all bytes until EOF in this source, placing them into buf.
+ pub fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<usize> {
+ let start_len = buf.len();
+
+ loop {
+ if buf.len() == buf.capacity() {
+ buf.reserve(32); // buf is full, need more space
+ }
+
+ let spare = buf.spare_capacity_mut();
+ let mut read_buf: ReadBuf = ReadBuf::uninit(spare);
+
+ // SAFETY: These bytes were initialized but not filled in the previous loop
+ unsafe {
+ read_buf.assume_init(read_buf.capacity());
+ }
+
+ match self.read(read_buf.initialized_mut().len()) {
+ Ok(bs) if bs.is_empty() => return Ok(buf.len() - start_len),
+ Ok(bs) => {
+ read_buf.initialized_mut()[..bs.len()].copy_from_slice(&bs);
+
+ // SAFETY: Read API makes sure that returning `n` is correct.
+ unsafe {
+ buf.set_len(buf.len() + bs.len());
+ }
+ }
+ Err(e) => return Err(e),
+ }
+ }
}
}
impl io::Read for BlockingReader {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- self.inner.read(buf).map_err(format_std_io_error)
+ let bs = self.inner.read(buf.len()).map_err(format_std_io_error)?;
+ buf[..bs.len()].copy_from_slice(&bs);
+ Ok(bs.len())
}
}
@@ -483,9 +558,15 @@ impl Iterator for BlockingReader {
#[inline]
fn next(&mut self) -> Option<Self::Item> {
- self.inner
- .next()
- .map(|v| v.map_err(|err| io::Error::new(io::ErrorKind::Interrupted, err)))
+ match self
+ .inner
+ .read(4 * 1024 * 1024)
+ .map_err(format_std_io_error)
+ {
+ Ok(bs) if bs.is_empty() => None,
+ Ok(bs) => Some(Ok(bs)),
+ Err(err) => Some(Err(err)),
+ }
}
}