This is an automated email from the ASF dual-hosted git repository.
mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git
The following commit(s) were added to refs/heads/main by this push:
new 72cd136 Minimize futures dependency into relevant sub-crates (#646)
72cd136 is described below
commit 72cd13608ba9c2c566c609c377bd627281f83501
Author: Adam Gutglick <[email protected]>
AuthorDate: Thu Feb 19 16:30:28 2026 +0000
Minimize futures dependency into relevant sub-crates (#646)
* Minimize futures dependency into relevant sub-crates
* Fix docs
---
Cargo.toml | 5 ++++-
src/aws/mod.rs | 8 ++++----
src/azure/credential.rs | 2 +-
src/azure/mod.rs | 6 +++---
src/buffered.rs | 4 ++--
src/chunked.rs | 8 ++++----
src/client/get.rs | 8 ++++----
src/client/http/body.rs | 10 +++++-----
src/client/http/connection.rs | 6 ++----
src/client/http/spawn.rs | 2 +-
src/client/list.rs | 8 ++++----
src/client/mock_server.rs | 4 ++--
src/client/pagination.rs | 4 ++--
src/client/retry.rs | 2 +-
src/delimited.rs | 11 ++++++-----
src/gcp/credential.rs | 2 +-
src/gcp/mod.rs | 2 +-
src/http/mod.rs | 8 ++++----
src/integration.rs | 10 +++++-----
src/lib.rs | 18 +++++++++---------
src/limit.rs | 4 ++--
src/local.rs | 18 +++++++++---------
src/memory.rs | 8 ++++----
src/prefix.rs | 2 +-
src/throttle.rs | 12 ++++++------
src/upload.rs | 16 ++++++++--------
src/util.rs | 6 +++---
tests/get_range_file.rs | 2 +-
28 files changed, 99 insertions(+), 97 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index 5e9f046..81cdf66 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -34,7 +34,8 @@ all-features = true
async-trait = "0.1.53"
bytes = "1.0"
chrono = { version = "0.4.34", default-features = false, features = ["clock"] }
-futures = "0.3"
+futures-core = "0.3"
+futures-util = { version = "0.3", features = ["sink"] }
http = "1.2.0"
humantime = "2.1"
itertools = "0.14.0"
@@ -70,6 +71,7 @@ nix = { version = "0.31.1", features = ["fs"] }
[target.'cfg(all(target_arch = "wasm32", target_os = "unknown"))'.dependencies]
web-time = { version = "1.1.0" }
wasm-bindgen-futures = "0.4.18"
+futures-channel = {version = "0.3", features = ["sink"]}
[features]
default = ["fs"]
@@ -84,6 +86,7 @@ integration = ["rand", "tokio"]
tokio = ["dep:tokio", "dep:tracing"]
[dev-dependencies] # In alphabetical order
+futures-executor = "0.3"
hyper = { version = "1.2", features = ["server"] }
hyper-util = "0.1"
rand = "0.10"
diff --git a/src/aws/mod.rs b/src/aws/mod.rs
index 7c8dcf3..0a1c233 100644
--- a/src/aws/mod.rs
+++ b/src/aws/mod.rs
@@ -29,8 +29,8 @@
//! [automatic cleanup]: https://aws.amazon.com/blogs/aws/s3-lifecycle-management-update-support-for-multipart-uploads-and-delete-markers/
use async_trait::async_trait;
-use futures::stream::BoxStream;
-use futures::{StreamExt, TryStreamExt};
+use futures_util::stream::BoxStream;
+use futures_util::{StreamExt, TryStreamExt};
use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH};
use reqwest::{Method, StatusCode};
use std::{sync::Arc, time::Duration};
@@ -275,7 +275,7 @@ impl ObjectStore for AmazonS3 {
client
.bulk_delete_request(locations)
.await
- .map(futures::stream::iter)
+ .map(futures_util::stream::iter)
}
})
.buffered(20)
@@ -298,7 +298,7 @@ impl ObjectStore for AmazonS3 {
return self
.client
.list(prefix)
- .try_filter(move |f| futures::future::ready(f.location > offset))
+ .try_filter(move |f| futures_util::future::ready(f.location > offset))
.boxed();
}
diff --git a/src/azure/credential.rs b/src/azure/credential.rs
index 66ce655..1fbb101 100644
--- a/src/azure/credential.rs
+++ b/src/azure/credential.rs
@@ -1064,7 +1064,7 @@ impl CredentialProvider for AzureCliCredential {
#[cfg(test)]
mod tests {
- use futures::executor::block_on;
+ use futures_executor::block_on;
use http::{Response, StatusCode};
use http_body_util::BodyExt;
use reqwest::{Client, Method};
diff --git a/src/azure/mod.rs b/src/azure/mod.rs
index 8a2e5d1..4460040 100644
--- a/src/azure/mod.rs
+++ b/src/azure/mod.rs
@@ -32,7 +32,7 @@ use crate::{
signer::Signer,
};
use async_trait::async_trait;
-use futures::stream::{BoxStream, StreamExt, TryStreamExt};
+use futures_util::stream::{BoxStream, StreamExt, TryStreamExt};
use reqwest::Method;
use std::fmt::Debug;
use std::sync::Arc;
@@ -134,7 +134,7 @@ impl ObjectStore for MicrosoftAzure {
// See https://github.com/Azure/Azurite/issues/2619#issuecomment-3660701055
let offset = offset.clone();
self.list(prefix)
- .try_filter(move |f| futures::future::ready(f.location > offset))
+ .try_filter(move |f| futures_util::future::ready(f.location > offset))
.boxed()
} else {
self.client.list_with_offset(prefix, offset)
@@ -157,7 +157,7 @@ impl ObjectStore for MicrosoftAzure {
client
.bulk_delete_request(locations)
.await
- .map(futures::stream::iter)
+ .map(futures_util::stream::iter)
}
})
.buffered(20)
diff --git a/src/buffered.rs b/src/buffered.rs
index d22ecda..5a95343 100644
--- a/src/buffered.rs
+++ b/src/buffered.rs
@@ -23,8 +23,8 @@ use crate::{
PutOptions, PutPayloadMut, TagSet, WriteMultipart,
};
use bytes::Bytes;
-use futures::future::{BoxFuture, FutureExt};
-use futures::ready;
+use futures_util::future::{BoxFuture, FutureExt};
+use futures_util::ready;
use std::cmp::Ordering;
use std::io::{Error, ErrorKind, SeekFrom};
use std::pin::Pin;
diff --git a/src/chunked.rs b/src/chunked.rs
index b362366..870540a 100644
--- a/src/chunked.rs
+++ b/src/chunked.rs
@@ -23,8 +23,8 @@ use std::sync::Arc;
use async_trait::async_trait;
use bytes::{BufMut, Bytes, BytesMut};
-use futures::StreamExt;
-use futures::stream::BoxStream;
+use futures_util::StreamExt;
+use futures_util::stream::BoxStream;
use crate::path::Path;
use crate::{
@@ -89,7 +89,7 @@ impl ObjectStore for ChunkedStore {
}
GetResultPayload::Stream(stream) => {
let buffer = BytesMut::new();
- futures::stream::unfold(
+ futures_util::stream::unfold(
(stream, buffer, false, self.chunk_size),
|(mut stream, mut buffer, mut exhausted, chunk_size)|
async move {
// Keep accumulating bytes until we reach capacity as
long as
@@ -173,7 +173,7 @@ impl ObjectStore for ChunkedStore {
#[cfg(test)]
mod tests {
- use futures::StreamExt;
+ use futures_util::StreamExt;
use crate::ObjectStoreExt;
#[cfg(feature = "fs")]
diff --git a/src/client/get.rs b/src/client/get.rs
index 5a5ec37..80b3f15 100644
--- a/src/client/get.rs
+++ b/src/client/get.rs
@@ -25,8 +25,8 @@ use crate::{
};
use async_trait::async_trait;
use bytes::Bytes;
-use futures::StreamExt;
-use futures::stream::BoxStream;
+use futures_util::StreamExt;
+use futures_util::stream::BoxStream;
use http::StatusCode;
use http::header::{
CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE,
CONTENT_RANGE,
@@ -201,7 +201,7 @@ impl<T: GetClient> GetContext<T> {
etag: Option<String>,
range: Range<u64>,
) -> BoxStream<'static, Result<Bytes>> {
- futures::stream::try_unfold(
+ futures_util::stream::try_unfold(
(self, body, etag, range),
|(mut ctx, mut body, etag, mut range)| async move {
while let Some(ret) = body.frame().await {
@@ -489,7 +489,7 @@ mod http_tests {
use crate::path::Path;
use crate::{ClientOptions, ObjectStoreExt, RetryConfig};
use bytes::Bytes;
- use futures::FutureExt;
+ use futures_util::FutureExt;
use http::header::{CONNECTION, CONTENT_LENGTH, CONTENT_RANGE, ETAG, RANGE};
use http::{Response, StatusCode};
use hyper::body::Frame;
diff --git a/src/client/http/body.rs b/src/client/http/body.rs
index f59da97..e22ccea 100644
--- a/src/client/http/body.rs
+++ b/src/client/http/body.rs
@@ -18,8 +18,8 @@
use crate::client::{HttpError, HttpErrorKind};
use crate::{PutPayload, collect_bytes};
use bytes::Bytes;
-use futures::StreamExt;
-use futures::stream::BoxStream;
+use futures_util::StreamExt;
+use futures_util::stream::BoxStream;
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full};
use hyper::body::{Body, Frame, SizeHint};
@@ -43,9 +43,9 @@ impl HttpRequestBody {
pub(crate) fn into_reqwest(self) -> reqwest::Body {
match self.0 {
Inner::Bytes(b) => b.into(),
- Inner::PutPayload(_, payload) => reqwest::Body::wrap_stream(futures::stream::iter(
- payload.into_iter().map(Ok::<_, HttpError>),
- )),
+ Inner::PutPayload(_, payload) => reqwest::Body::wrap_stream(
+ futures_util::stream::iter(payload.into_iter().map(Ok::<_, HttpError>)),
+ ),
}
}
diff --git a/src/client/http/connection.rs b/src/client/http/connection.rs
index ae474ce..69c8436 100644
--- a/src/client/http/connection.rs
+++ b/src/client/http/connection.rs
@@ -231,10 +231,8 @@ impl HttpService for reqwest::Client {
#[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
impl HttpService for reqwest::Client {
async fn call(&self, req: HttpRequest) -> Result<HttpResponse, HttpError> {
- use futures::{
- SinkExt, StreamExt, TryStreamExt,
- channel::{mpsc, oneshot},
- };
+ use futures_channel::{mpsc, oneshot};
+ use futures_util::{SinkExt, StreamExt, TryStreamExt};
use http_body_util::{Empty, StreamBody};
use wasm_bindgen_futures::spawn_local;
diff --git a/src/client/http/spawn.rs b/src/client/http/spawn.rs
index 5450b63..80f3a87 100644
--- a/src/client/http/spawn.rs
+++ b/src/client/http/spawn.rs
@@ -142,7 +142,7 @@ mod tests {
let url = mock.url().to_string();
let thread = std::thread::spawn(|| {
- futures::executor::block_on(async move {
+ futures_executor::block_on(async move {
let retry = RetryConfig::default();
let ret = client.get(url).send_retry(&retry).await.unwrap();
let payload = ret.into_body().bytes().await.unwrap();
diff --git a/src/client/list.rs b/src/client/list.rs
index b7192bf..6ad87db 100644
--- a/src/client/list.rs
+++ b/src/client/list.rs
@@ -21,8 +21,8 @@ use crate::list::{PaginatedListOptions, PaginatedListResult};
use crate::path::{DELIMITER, Path};
use crate::{ListResult, ObjectMeta};
use async_trait::async_trait;
-use futures::stream::BoxStream;
-use futures::{StreamExt, TryStreamExt};
+use futures_util::stream::BoxStream;
+use futures_util::{StreamExt, TryStreamExt};
use std::borrow::Cow;
use std::collections::BTreeSet;
@@ -93,7 +93,7 @@ impl<T: ListClient + Clone> ListClientExt for T {
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static,
Result<ObjectMeta>> {
self.list_paginated(prefix, false, None)
- .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
+ .map_ok(|r| futures_util::stream::iter(r.objects.into_iter().map(Ok)))
.try_flatten()
.boxed()
}
@@ -104,7 +104,7 @@ impl<T: ListClient + Clone> ListClientExt for T {
offset: &Path,
) -> BoxStream<'static, Result<ObjectMeta>> {
self.list_paginated(prefix, false, Some(offset))
- .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
+ .map_ok(|r| futures_util::stream::iter(r.objects.into_iter().map(Ok)))
.try_flatten()
.boxed()
}
diff --git a/src/client/mock_server.rs b/src/client/mock_server.rs
index c27fc9f..0d0703d 100644
--- a/src/client/mock_server.rs
+++ b/src/client/mock_server.rs
@@ -16,8 +16,8 @@
// under the License.
use crate::client::{HttpResponse, HttpResponseBody};
-use futures::FutureExt;
-use futures::future::BoxFuture;
+use futures_util::FutureExt;
+use futures_util::future::BoxFuture;
use hyper::body::Incoming;
use hyper::server::conn::http1;
use hyper::service::service_fn;
diff --git a/src/client/pagination.rs b/src/client/pagination.rs
index d789c74..3d6a74a 100644
--- a/src/client/pagination.rs
+++ b/src/client/pagination.rs
@@ -16,7 +16,7 @@
// under the License.
use crate::Result;
-use futures::Stream;
+use futures_util::Stream;
use std::future::Future;
/// Takes a paginated operation `op` that when called with:
@@ -51,7 +51,7 @@ where
Done,
}
- futures::stream::unfold(PaginationState::Start(state), move |state| {
+ futures_util::stream::unfold(PaginationState::Start(state), move |state| {
let client = client.clone();
async move {
let (s, page_token) = match state {
diff --git a/src/client/retry.rs b/src/client/retry.rs
index 5437896..b0e6b0c 100644
--- a/src/client/retry.rs
+++ b/src/client/retry.rs
@@ -21,7 +21,7 @@ use crate::PutPayload;
use crate::client::backoff::{Backoff, BackoffConfig};
use crate::client::builder::HttpRequestBuilder;
use crate::client::{HttpClient, HttpError, HttpErrorKind, HttpRequest,
HttpResponse};
-use futures::future::BoxFuture;
+use futures_util::future::BoxFuture;
use http::{Method, Uri};
use reqwest::StatusCode;
use reqwest::header::LOCATION;
diff --git a/src/delimited.rs b/src/delimited.rs
index 0994b4f..b9f8842 100644
--- a/src/delimited.rs
+++ b/src/delimited.rs
@@ -20,7 +20,7 @@
use std::collections::VecDeque;
use bytes::Bytes;
-use futures::{Stream, StreamExt};
+use futures_util::{Stream, StreamExt};
use super::Result;
@@ -155,7 +155,7 @@ where
{
let delimiter = LineDelimiter::new();
- futures::stream::unfold(
+ futures_util::stream::unfold(
(s, delimiter, false),
|(mut s, mut delimiter, mut exhausted)| async move {
loop {
@@ -184,7 +184,7 @@ where
#[cfg(test)]
mod tests {
- use futures::stream::{BoxStream, TryStreamExt};
+ use futures_util::stream::{BoxStream, TryStreamExt};
use super::*;
@@ -231,7 +231,8 @@ mod tests {
#[tokio::test]
async fn test_delimiter_stream() {
let input = vec!["hello\nworld\nbin", "go\ncup", "cakes"];
- let input_stream = futures::stream::iter(input.into_iter().map(|s|
Ok(Bytes::from(s))));
+ let input_stream =
+ futures_util::stream::iter(input.into_iter().map(|s|
Ok(Bytes::from(s))));
let stream = newline_delimited_stream(input_stream);
let results: Vec<_> = stream.try_collect().await.unwrap();
@@ -246,7 +247,7 @@ mod tests {
}
#[tokio::test]
async fn test_delimiter_unfold_stream() {
- let input_stream: BoxStream<'static, Result<Bytes>> =
futures::stream::unfold(
+ let input_stream: BoxStream<'static, Result<Bytes>> =
futures_util::stream::unfold(
VecDeque::from(["hello\nworld\nbin", "go\ncup", "cakes"]),
|mut input| async move {
if !input.is_empty() {
diff --git a/src/gcp/credential.rs b/src/gcp/credential.rs
index 75de68c..dfba358 100644
--- a/src/gcp/credential.rs
+++ b/src/gcp/credential.rs
@@ -27,7 +27,7 @@ use async_trait::async_trait;
use base64::Engine;
use base64::prelude::BASE64_URL_SAFE_NO_PAD;
use chrono::{DateTime, Utc};
-use futures::TryFutureExt;
+use futures_util::TryFutureExt;
use http::{HeaderMap, Method};
use itertools::Itertools;
use percent_encoding::utf8_percent_encode;
diff --git a/src/gcp/mod.rs b/src/gcp/mod.rs
index 4e1679e..e1157a8 100644
--- a/src/gcp/mod.rs
+++ b/src/gcp/mod.rs
@@ -48,7 +48,7 @@ use crate::{
};
use async_trait::async_trait;
use client::GoogleCloudStorageClient;
-use futures::stream::{BoxStream, StreamExt};
+use futures_util::stream::{BoxStream, StreamExt};
use http::Method;
use url::Url;
diff --git a/src/http/mod.rs b/src/http/mod.rs
index 9e5af9a..4d4081b 100644
--- a/src/http/mod.rs
+++ b/src/http/mod.rs
@@ -34,8 +34,8 @@
use std::sync::Arc;
use async_trait::async_trait;
-use futures::stream::BoxStream;
-use futures::{StreamExt, TryStreamExt};
+use futures_util::stream::BoxStream;
+use futures_util::{StreamExt, TryStreamExt};
use itertools::Itertools;
use url::Url;
@@ -160,7 +160,7 @@ impl ObjectStore for HttpStore {
let prefix_len = prefix.map(|p| p.as_ref().len()).unwrap_or_default();
let prefix = prefix.cloned();
let client = Arc::clone(&self.client);
- futures::stream::once(async move {
+ futures_util::stream::once(async move {
let status = client.list(prefix.as_ref(), "infinity").await?;
let iter = status
@@ -174,7 +174,7 @@ impl ObjectStore for HttpStore {
// Filter out exact prefix matches
.filter_ok(move |r| r.location.as_ref().len() > prefix_len);
- Ok::<_, crate::Error>(futures::stream::iter(iter))
+ Ok::<_, crate::Error>(futures_util::stream::iter(iter))
})
.try_flatten()
.boxed()
diff --git a/src/integration.rs b/src/integration.rs
index 731f7d2..352bd4b 100644
--- a/src/integration.rs
+++ b/src/integration.rs
@@ -32,8 +32,8 @@ use crate::{
ObjectStore, ObjectStoreExt, PutMode, PutPayload, UpdateVersion,
WriteMultipart,
};
use bytes::Bytes;
-use futures::stream::FuturesUnordered;
-use futures::{StreamExt, TryStreamExt};
+use futures_util::stream::FuturesUnordered;
+use futures_util::{StreamExt, TryStreamExt};
use rand::{RngExt, rng};
use std::collections::HashSet;
use std::slice;
@@ -411,7 +411,7 @@ pub async fn put_get_delete_list(storage: &DynObjectStore) {
storage.put(&paths[4], "foo".into()).await.unwrap();
let out_paths = storage
- .delete_stream(futures::stream::iter(paths.clone()).map(Ok).boxed())
+ .delete_stream(futures_util::stream::iter(paths.clone()).map(Ok).boxed())
.collect::<Vec<_>>()
.await;
@@ -713,7 +713,7 @@ pub async fn stream_get(storage: &DynObjectStore) {
let bytes_expected = data.concat();
let mut upload = storage.put_multipart(&location).await.unwrap();
let uploads = data.into_iter().map(|x| upload.put_part(x.into()));
- futures::future::try_join_all(uploads).await.unwrap();
+ futures_util::future::try_join_all(uploads).await.unwrap();
// Object should not yet exist in store
let meta_res = storage.head(&location).await;
@@ -1020,7 +1020,7 @@ pub async fn multipart(storage: &dyn ObjectStore, multipart: &dyn MultipartStore
let id = multipart.create_multipart(&path).await.unwrap();
- let parts: Vec<_> = futures::stream::iter(chunks)
+ let parts: Vec<_> = futures_util::stream::iter(chunks)
.enumerate()
.map(|(idx, b)| multipart.put_part(&path, &id, idx, b.into()))
.buffered(2)
diff --git a/src/lib.rs b/src/lib.rs
index 288d169..8124d8f 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -174,7 +174,7 @@
//! # use object_store::local::LocalFileSystem;
//! # use std::sync::Arc;
//! # use object_store::{path::Path, ObjectStore};
-//! # use futures::stream::StreamExt;
+//! # use futures_util::stream::StreamExt;
//! # // use LocalFileSystem for example
//! # fn get_object_store() -> Arc<dyn ObjectStore> {
//! # Arc::new(LocalFileSystem::new())
@@ -215,7 +215,7 @@
//! from remote storage or files in the local filesystem as a stream.
//!
//! ```ignore-wasm32
-//! # use futures::TryStreamExt;
+//! # use futures_util::TryStreamExt;
//! # use object_store::local::LocalFileSystem;
//! # use std::sync::Arc;
//! # use bytes::Bytes;
@@ -610,7 +610,7 @@ use crate::util::maybe_spawn_blocking;
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
-use futures::{StreamExt, TryStreamExt, stream::BoxStream};
+use futures_util::{StreamExt, TryStreamExt, stream::BoxStream};
use std::fmt::{Debug, Formatter};
use std::ops::Range;
use std::sync::Arc;
@@ -931,14 +931,14 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// return Ok. If it is an error, it will be [`Error::NotFound`].
///
/// ```ignore-wasm32
- /// # use futures::{StreamExt, TryStreamExt};
+ /// # use futures_util::{StreamExt, TryStreamExt};
/// # use object_store::local::LocalFileSystem;
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// # let root = tempfile::TempDir::new().unwrap();
/// # let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();
/// # use object_store::{ObjectStore, ObjectStoreExt, ObjectMeta};
/// # use object_store::path::Path;
- /// # use futures::{StreamExt, TryStreamExt};
+ /// # use futures_util::{StreamExt, TryStreamExt};
/// #
/// // Create two objects
/// store.put(&Path::from("foo"), "foo".into()).await?;
@@ -964,7 +964,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync +
Debug + 'static {
///
/// ```
/// # use async_trait::async_trait;
- /// # use futures::stream::{BoxStream, StreamExt};
+ /// # use futures_util::stream::{BoxStream, StreamExt};
/// # use object_store::path::Path;
/// # use object_store::{
/// # CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload,
ObjectMeta, ObjectStore,
@@ -1049,7 +1049,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync +
Debug + 'static {
/// #
/// # async fn example() {
/// # let store = ExampleStore { client: Arc::new(ExampleClient) };
- /// # let paths = futures::stream::iter(vec![Ok(Path::from("foo")), Ok(Path::from("bar"))]).boxed();
+ /// # let paths = futures_util::stream::iter(vec![Ok(Path::from("foo")), Ok(Path::from("bar"))]).boxed();
/// # let results =
store.delete_stream(paths).collect::<Vec<_>>().await;
/// # assert_eq!(results.len(), 2);
/// # assert_eq!(results[0].as_ref().unwrap(), &Path::from("foo"));
@@ -1091,7 +1091,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync +
Debug + 'static {
) -> BoxStream<'static, Result<ObjectMeta>> {
let offset = offset.clone();
self.list(prefix)
- .try_filter(move |f| futures::future::ready(f.location > offset))
+ .try_filter(move |f| futures_util::future::ready(f.location >
offset))
.boxed()
}
@@ -1364,7 +1364,7 @@ where
async fn delete(&self, location: &Path) -> Result<()> {
let location = location.clone();
let mut stream =
- self.delete_stream(futures::stream::once(async move { Ok(location)
}).boxed());
+ self.delete_stream(futures_util::stream::once(async move {
Ok(location) }).boxed());
let _path = stream.try_next().await?.ok_or_else(|| Error::Generic {
store: "ext",
source: "`delete_stream` with one location should yield once but didn't".into(),
diff --git a/src/limit.rs b/src/limit.rs
index 30fe2b6..fa29d1b 100644
--- a/src/limit.rs
+++ b/src/limit.rs
@@ -24,7 +24,7 @@ use crate::{
};
use async_trait::async_trait;
use bytes::Bytes;
-use futures::{FutureExt, Stream};
+use futures_util::{FutureExt, Stream};
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
@@ -245,7 +245,7 @@ mod tests {
use crate::integration::*;
use crate::limit::LimitStore;
use crate::memory::InMemory;
- use futures::stream::StreamExt;
+ use futures_util::stream::StreamExt;
use std::pin::Pin;
use std::time::Duration;
use tokio::time::timeout;
diff --git a/src/local.rs b/src/local.rs
index 907b589..98dda95 100644
--- a/src/local.rs
+++ b/src/local.rs
@@ -31,8 +31,8 @@ use std::{collections::VecDeque, path::PathBuf};
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
-use futures::{FutureExt, TryStreamExt};
-use futures::{StreamExt, stream::BoxStream};
+use futures_util::{FutureExt, TryStreamExt};
+use futures_util::{StreamExt, stream::BoxStream};
use parking_lot::Mutex;
use url::Url;
use walkdir::{DirEntry, WalkDir};
@@ -709,7 +709,7 @@ impl LocalFileSystem {
let root_path = match prefix {
Some(prefix) => match config.prefix_to_filesystem(prefix) {
Ok(path) => path,
- Err(e) => return
futures::future::ready(Err(e)).into_stream().boxed(),
+ Err(e) => return
futures_util::future::ready(Err(e)).into_stream().boxed(),
},
None => config.root.to_file_path().unwrap(),
};
@@ -754,14 +754,14 @@ impl LocalFileSystem {
// If no tokio context, return iterator directly as no
// need to perform chunked spawn_blocking reads
if tokio::runtime::Handle::try_current().is_err() {
- return futures::stream::iter(s).boxed();
+ return futures_util::stream::iter(s).boxed();
}
// Otherwise list in batches of CHUNK_SIZE
const CHUNK_SIZE: usize = 1024;
let buffer = VecDeque::with_capacity(CHUNK_SIZE);
- futures::stream::try_unfold((s, buffer), |(mut s, mut buffer)| async
move {
+ futures_util::stream::try_unfold((s, buffer), |(mut s, mut buffer)|
async move {
if buffer.is_empty() {
(s, buffer) = tokio::task::spawn_blocking(move || {
for _ in 0..CHUNK_SIZE {
@@ -929,7 +929,7 @@ pub(crate) fn chunked_stream(
range: Range<u64>,
chunk_size: usize,
) -> BoxStream<'static, Result<Bytes, super::Error>> {
- futures::stream::once(async move {
+ futures_util::stream::once(async move {
let requested = range.end - range.start;
let (file, path) = maybe_spawn_blocking(move || {
@@ -939,7 +939,7 @@ pub(crate) fn chunked_stream(
})
.await?;
- let stream = futures::stream::try_unfold(
+ let stream = futures_util::stream::try_unfold(
(file, path, requested),
move |(mut file, path, remaining)| {
maybe_spawn_blocking(move || {
@@ -1254,7 +1254,7 @@ fn convert_walkdir_result(
mod tests {
use std::fs;
- use futures::TryStreamExt;
+ use futures_util::TryStreamExt;
use tempfile::TempDir;
#[cfg(target_family = "unix")]
@@ -1287,7 +1287,7 @@ mod tests {
fn test_non_tokio() {
let root = TempDir::new().unwrap();
let integration =
LocalFileSystem::new_with_prefix(root.path()).unwrap();
- futures::executor::block_on(async move {
+ futures_executor::block_on(async move {
put_get_delete_list(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
diff --git a/src/memory.rs b/src/memory.rs
index d6c7c0c..1af2584 100644
--- a/src/memory.rs
+++ b/src/memory.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
-use futures::{StreamExt, stream::BoxStream};
+use futures_util::{StreamExt, stream::BoxStream};
use parking_lot::RwLock;
use crate::multipart::{MultipartStore, PartId};
@@ -259,7 +259,7 @@ impl ObjectStore for InMemory {
}
None => (0..entry.data.len() as u64, entry.data),
};
- let stream = futures::stream::once(futures::future::ready(Ok(data)));
+ let stream = futures_util::stream::once(futures_util::future::ready(Ok(data)));
Ok(GetResult {
payload: GetResultPayload::Stream(stream.boxed()),
@@ -334,7 +334,7 @@ impl ObjectStore for InMemory {
})
.collect();
- futures::stream::iter(values).boxed()
+ futures_util::stream::iter(values).boxed()
}
/// The memory implementation returns all results, as opposed to the cloud
@@ -512,7 +512,7 @@ struct InMemoryUpload {
impl MultipartUpload for InMemoryUpload {
fn put_part(&mut self, payload: PutPayload) -> UploadPart {
self.parts.push(payload);
- Box::pin(futures::future::ready(Ok(())))
+ Box::pin(futures_util::future::ready(Ok(())))
}
async fn complete(&mut self) -> Result<PutResult> {
diff --git a/src/prefix.rs b/src/prefix.rs
index 67456fd..2850c52 100644
--- a/src/prefix.rs
+++ b/src/prefix.rs
@@ -17,7 +17,7 @@
//! An object store wrapper handling a constant path prefix
use bytes::Bytes;
-use futures::{StreamExt, TryStreamExt, stream::BoxStream};
+use futures_util::{StreamExt, TryStreamExt, stream::BoxStream};
use std::ops::Range;
use crate::multipart::{MultipartStore, PartId};
diff --git a/src/throttle.rs b/src/throttle.rs
index 1fc90d7..fb668f0 100644
--- a/src/throttle.rs
+++ b/src/throttle.rs
@@ -28,7 +28,7 @@ use crate::{
};
use async_trait::async_trait;
use bytes::Bytes;
-use futures::{FutureExt, StreamExt, stream::BoxStream};
+use futures_util::{FutureExt, StreamExt, stream::BoxStream};
use std::time::Duration;
/// Configuration settings for throttled store
@@ -212,7 +212,7 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static,
Result<ObjectMeta>> {
let stream = self.inner.list(prefix);
let config = Arc::clone(&self.config);
- futures::stream::once(async move {
+ futures_util::stream::once(async move {
let config = *config.lock();
let wait_list_per_entry = config.wait_list_per_entry;
sleep(config.wait_list_per_call).await;
@@ -229,7 +229,7 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
) -> BoxStream<'static, Result<ObjectMeta>> {
let stream = self.inner.list_with_offset(prefix, offset);
let config = Arc::clone(&self.config);
- futures::stream::once(async move {
+ futures_util::stream::once(async move {
let config = *config.lock();
let wait_list_per_entry = config.wait_list_per_entry;
sleep(config.wait_list_per_call).await;
@@ -299,7 +299,7 @@ where
stream
.then(move |result| {
let delay = result.as_ref().ok().map(&delay).unwrap_or_default();
- sleep(delay).then(|_| futures::future::ready(result))
+ sleep(delay).then(|_| futures_util::future::ready(result))
})
.boxed()
}
@@ -368,7 +368,7 @@ mod tests {
use crate::GetResultPayload;
use crate::ObjectStoreExt;
use crate::{integration::*, memory::InMemory};
- use futures::TryStreamExt;
+ use futures_util::TryStreamExt;
use tokio::time::Duration;
use tokio::time::Instant;
@@ -580,7 +580,7 @@ mod tests {
// materialize the paths so that the throttle time for listing is not
counted
let paths = store.list(Some(&prefix)).collect::<Vec<_>>().await;
- let paths = futures::stream::iter(paths)
+ let paths = futures_util::stream::iter(paths)
.map(|x| x.map(|m| m.location))
.boxed();
diff --git a/src/upload.rs b/src/upload.rs
index d7b760d..01e6b28 100644
--- a/src/upload.rs
+++ b/src/upload.rs
@@ -22,7 +22,7 @@ use std::task::{Context, Poll};
use crate::PutPayloadMut;
use crate::{PutPayload, PutResult, Result};
use async_trait::async_trait;
-use futures::future::BoxFuture;
+use futures_util::future::BoxFuture;
#[cfg(feature = "tokio")]
use tokio::task::JoinSet;
@@ -52,7 +52,7 @@ pub trait MultipartUpload: Send + std::fmt::Debug {
/// returned futures in parallel
///
/// ```no_run
- /// # use futures::StreamExt;
+ /// # use futures_util::StreamExt;
/// # use object_store::MultipartUpload;
/// #
/// # async fn test() {
@@ -60,7 +60,7 @@ pub trait MultipartUpload: Send + std::fmt::Debug {
/// let mut upload: Box<&dyn MultipartUpload> = todo!();
/// let p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into());
/// let p2 = upload.put_part(vec![1; 10 * 1024 * 1024].into());
- /// futures::future::try_join(p1, p2).await.unwrap();
+ /// futures_util::future::try_join(p1, p2).await.unwrap();
/// upload.complete().await.unwrap();
/// # }
/// ```
@@ -117,7 +117,7 @@ impl<W: MultipartUpload + ?Sized> MultipartUpload for
Box<W> {
/// allowing back pressure on producers, prior to buffering the next part.
However, unlike
/// [`Sink`] this back pressure is optional, allowing integration with
synchronous producers
///
-/// [`Sink`]: futures::sink::Sink
+/// [`Sink`]: futures_util::sink::Sink
#[cfg(feature = "tokio")]
#[derive(Debug)]
pub struct WriteMultipart {
@@ -156,7 +156,7 @@ impl WriteMultipart {
max_concurrency: usize,
) -> Poll<Result<()>> {
while !self.tasks.is_empty() && self.tasks.len() >= max_concurrency {
- futures::ready!(self.tasks.poll_join_next(cx)).unwrap()??
+ futures_core::ready!(self.tasks.poll_join_next(cx)).unwrap()??
}
Poll::Ready(Ok(()))
}
@@ -165,7 +165,7 @@ impl WriteMultipart {
///
/// See [`Self::poll_for_capacity`] for a [`Poll`] version of this function
pub async fn wait_for_capacity(&mut self, max_concurrency: usize) ->
Result<()> {
- futures::future::poll_fn(|cx| self.poll_for_capacity(cx,
max_concurrency)).await
+ futures_util::future::poll_fn(|cx| self.poll_for_capacity(cx,
max_concurrency)).await
}
/// Write data to this [`WriteMultipart`]
@@ -247,7 +247,7 @@ mod tests {
use std::sync::Arc;
use std::time::Duration;
- use futures::FutureExt;
+ use futures_util::FutureExt;
use parking_lot::Mutex;
use rand::prelude::StdRng;
use rand::{RngExt, SeedableRng};
@@ -287,7 +287,7 @@ mod tests {
impl MultipartUpload for InstrumentedUpload {
fn put_part(&mut self, data: PutPayload) -> UploadPart {
self.chunks.lock().push(data);
- futures::future::ready(Ok(())).boxed()
+ futures_util::future::ready(Ok(())).boxed()
}
async fn complete(&mut self) -> Result<PutResult> {
diff --git a/src/util.rs b/src/util.rs
index 565c1ce..162c077 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -23,7 +23,7 @@ use std::{
use super::Result;
use bytes::Bytes;
-use futures::{Stream, TryStreamExt, stream::StreamExt};
+use futures_util::{Stream, TryStreamExt, stream::StreamExt};
#[cfg(any(feature = "azure", feature = "http"))]
pub(crate) static RFC1123_FMT: &str = "%a, %d %h %Y %T GMT";
@@ -114,7 +114,7 @@ where
{
let fetch_ranges = merge_ranges(ranges, coalesce);
- let fetched: Vec<_> = futures::stream::iter(fetch_ranges.iter().cloned())
+ let fetched: Vec<_> =
futures_util::stream::iter(fetch_ranges.iter().cloned())
.map(fetch)
.buffered(OBJECT_STORE_COALESCE_PARALLEL)
.try_collect()
@@ -348,7 +348,7 @@ mod tests {
fetches.push(range.clone());
let start = usize::try_from(range.start).unwrap();
let end = usize::try_from(range.end).unwrap();
- futures::future::ready(Ok(Bytes::from(src[start..end].to_vec())))
+ futures_util::future::ready(Ok(Bytes::from(src[start..end].to_vec())))
},
coalesce,
)
diff --git a/tests/get_range_file.rs b/tests/get_range_file.rs
index 95027eb..819456a 100644
--- a/tests/get_range_file.rs
+++ b/tests/get_range_file.rs
@@ -19,7 +19,7 @@
use async_trait::async_trait;
use bytes::Bytes;
-use futures::stream::BoxStream;
+use futures_util::stream::BoxStream;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
use object_store::*;