This is an automated email from the ASF dual-hosted git repository.

mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git


The following commit(s) were added to refs/heads/main by this push:
     new 72cd136  Minimize futures dependency into relevant sub-crates (#646)
72cd136 is described below

commit 72cd13608ba9c2c566c609c377bd627281f83501
Author: Adam Gutglick <[email protected]>
AuthorDate: Thu Feb 19 16:30:28 2026 +0000

    Minimize futures dependency into relevant sub-crates (#646)
    
    * Minimize futures dependency into relevant sub-crates
    
    * Fix docs
---
 Cargo.toml                    |  5 ++++-
 src/aws/mod.rs                |  8 ++++----
 src/azure/credential.rs       |  2 +-
 src/azure/mod.rs              |  6 +++---
 src/buffered.rs               |  4 ++--
 src/chunked.rs                |  8 ++++----
 src/client/get.rs             |  8 ++++----
 src/client/http/body.rs       | 10 +++++-----
 src/client/http/connection.rs |  6 ++----
 src/client/http/spawn.rs      |  2 +-
 src/client/list.rs            |  8 ++++----
 src/client/mock_server.rs     |  4 ++--
 src/client/pagination.rs      |  4 ++--
 src/client/retry.rs           |  2 +-
 src/delimited.rs              | 11 ++++++-----
 src/gcp/credential.rs         |  2 +-
 src/gcp/mod.rs                |  2 +-
 src/http/mod.rs               |  8 ++++----
 src/integration.rs            | 10 +++++-----
 src/lib.rs                    | 18 +++++++++---------
 src/limit.rs                  |  4 ++--
 src/local.rs                  | 18 +++++++++---------
 src/memory.rs                 |  8 ++++----
 src/prefix.rs                 |  2 +-
 src/throttle.rs               | 12 ++++++------
 src/upload.rs                 | 16 ++++++++--------
 src/util.rs                   |  6 +++---
 tests/get_range_file.rs       |  2 +-
 28 files changed, 99 insertions(+), 97 deletions(-)
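The patch is a mechanical dependency reshuffle: every futures:: path in the hunks below is rewritten to the matching sub-crate (futures-util for the stream/future combinators, BoxStream and BoxFuture; futures-core for the ready! macro in WriteMultipart; futures-channel for the wasm mpsc/oneshot channels; futures-executor for block_on in the non-tokio tests), with no behavioural change. As a rough, illustrative sketch of the same pattern outside this repository (not code from this patch):

    // Illustrative only: the combinators previously reached through the
    // `futures` facade are re-exported by the sub-crates, so call sites
    // only need their `use` paths rewritten.
    use futures_util::stream::{self, BoxStream, StreamExt};

    fn squares() -> BoxStream<'static, u64> {
        // stream::iter, .map and .boxed come from futures-util.
        stream::iter(1u64..=4).map(|x| x * x).boxed()
    }

    fn main() {
        // futures-executor supplies block_on where no tokio runtime is used.
        let collected = futures_executor::block_on(squares().collect::<Vec<_>>());
        assert_eq!(collected, vec![1, 4, 9, 16]);
    }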

diff --git a/Cargo.toml b/Cargo.toml
index 5e9f046..81cdf66 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -34,7 +34,8 @@ all-features = true
 async-trait = "0.1.53"
 bytes = "1.0"
 chrono = { version = "0.4.34", default-features = false, features = ["clock"] }
-futures = "0.3"
+futures-core = "0.3"
+futures-util = { version = "0.3", features = ["sink"] }
 http = "1.2.0"
 humantime = "2.1"
 itertools = "0.14.0"
@@ -70,6 +71,7 @@ nix = { version = "0.31.1", features = ["fs"] }
 [target.'cfg(all(target_arch = "wasm32", target_os = "unknown"))'.dependencies]
 web-time = { version = "1.1.0" }
 wasm-bindgen-futures = "0.4.18"
+futures-channel = {version = "0.3", features = ["sink"]}
 
 [features]
 default = ["fs"]
@@ -84,6 +86,7 @@ integration = ["rand", "tokio"]
 tokio = ["dep:tokio", "dep:tracing"]
 
 [dev-dependencies] # In alphabetical order
+futures-executor = "0.3"
 hyper = { version = "1.2", features = ["server"] }
 hyper-util = "0.1"
 rand = "0.10"
diff --git a/src/aws/mod.rs b/src/aws/mod.rs
index 7c8dcf3..0a1c233 100644
--- a/src/aws/mod.rs
+++ b/src/aws/mod.rs
@@ -29,8 +29,8 @@
 //! [automatic cleanup]: https://aws.amazon.com/blogs/aws/s3-lifecycle-management-update-support-for-multipart-uploads-and-delete-markers/
 
 use async_trait::async_trait;
-use futures::stream::BoxStream;
-use futures::{StreamExt, TryStreamExt};
+use futures_util::stream::BoxStream;
+use futures_util::{StreamExt, TryStreamExt};
 use reqwest::header::{HeaderName, IF_MATCH, IF_NONE_MATCH};
 use reqwest::{Method, StatusCode};
 use std::{sync::Arc, time::Duration};
@@ -275,7 +275,7 @@ impl ObjectStore for AmazonS3 {
                     client
                         .bulk_delete_request(locations)
                         .await
-                        .map(futures::stream::iter)
+                        .map(futures_util::stream::iter)
                 }
             })
             .buffered(20)
@@ -298,7 +298,7 @@ impl ObjectStore for AmazonS3 {
             return self
                 .client
                 .list(prefix)
-                .try_filter(move |f| futures::future::ready(f.location > offset))
+                .try_filter(move |f| futures_util::future::ready(f.location > offset))
                 .boxed();
         }
 
diff --git a/src/azure/credential.rs b/src/azure/credential.rs
index 66ce655..1fbb101 100644
--- a/src/azure/credential.rs
+++ b/src/azure/credential.rs
@@ -1064,7 +1064,7 @@ impl CredentialProvider for AzureCliCredential {
 
 #[cfg(test)]
 mod tests {
-    use futures::executor::block_on;
+    use futures_executor::block_on;
     use http::{Response, StatusCode};
     use http_body_util::BodyExt;
     use reqwest::{Client, Method};
diff --git a/src/azure/mod.rs b/src/azure/mod.rs
index 8a2e5d1..4460040 100644
--- a/src/azure/mod.rs
+++ b/src/azure/mod.rs
@@ -32,7 +32,7 @@ use crate::{
     signer::Signer,
 };
 use async_trait::async_trait;
-use futures::stream::{BoxStream, StreamExt, TryStreamExt};
+use futures_util::stream::{BoxStream, StreamExt, TryStreamExt};
 use reqwest::Method;
 use std::fmt::Debug;
 use std::sync::Arc;
@@ -134,7 +134,7 @@ impl ObjectStore for MicrosoftAzure {
             // See https://github.com/Azure/Azurite/issues/2619#issuecomment-3660701055
             let offset = offset.clone();
             self.list(prefix)
-                .try_filter(move |f| futures::future::ready(f.location > offset))
+                .try_filter(move |f| futures_util::future::ready(f.location > offset))
                 .boxed()
         } else {
             self.client.list_with_offset(prefix, offset)
@@ -157,7 +157,7 @@ impl ObjectStore for MicrosoftAzure {
                     client
                         .bulk_delete_request(locations)
                         .await
-                        .map(futures::stream::iter)
+                        .map(futures_util::stream::iter)
                 }
             })
             .buffered(20)
diff --git a/src/buffered.rs b/src/buffered.rs
index d22ecda..5a95343 100644
--- a/src/buffered.rs
+++ b/src/buffered.rs
@@ -23,8 +23,8 @@ use crate::{
     PutOptions, PutPayloadMut, TagSet, WriteMultipart,
 };
 use bytes::Bytes;
-use futures::future::{BoxFuture, FutureExt};
-use futures::ready;
+use futures_util::future::{BoxFuture, FutureExt};
+use futures_util::ready;
 use std::cmp::Ordering;
 use std::io::{Error, ErrorKind, SeekFrom};
 use std::pin::Pin;
diff --git a/src/chunked.rs b/src/chunked.rs
index b362366..870540a 100644
--- a/src/chunked.rs
+++ b/src/chunked.rs
@@ -23,8 +23,8 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use bytes::{BufMut, Bytes, BytesMut};
-use futures::StreamExt;
-use futures::stream::BoxStream;
+use futures_util::StreamExt;
+use futures_util::stream::BoxStream;
 
 use crate::path::Path;
 use crate::{
@@ -89,7 +89,7 @@ impl ObjectStore for ChunkedStore {
             }
             GetResultPayload::Stream(stream) => {
                 let buffer = BytesMut::new();
-                futures::stream::unfold(
+                futures_util::stream::unfold(
                     (stream, buffer, false, self.chunk_size),
                     |(mut stream, mut buffer, mut exhausted, chunk_size)| async move {
                         // Keep accumulating bytes until we reach capacity as long as
@@ -173,7 +173,7 @@ impl ObjectStore for ChunkedStore {
 
 #[cfg(test)]
 mod tests {
-    use futures::StreamExt;
+    use futures_util::StreamExt;
 
     use crate::ObjectStoreExt;
     #[cfg(feature = "fs")]
diff --git a/src/client/get.rs b/src/client/get.rs
index 5a5ec37..80b3f15 100644
--- a/src/client/get.rs
+++ b/src/client/get.rs
@@ -25,8 +25,8 @@ use crate::{
 };
 use async_trait::async_trait;
 use bytes::Bytes;
-use futures::StreamExt;
-use futures::stream::BoxStream;
+use futures_util::StreamExt;
+use futures_util::stream::BoxStream;
 use http::StatusCode;
 use http::header::{
     CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE, CONTENT_RANGE,
@@ -201,7 +201,7 @@ impl<T: GetClient> GetContext<T> {
         etag: Option<String>,
         range: Range<u64>,
     ) -> BoxStream<'static, Result<Bytes>> {
-        futures::stream::try_unfold(
+        futures_util::stream::try_unfold(
             (self, body, etag, range),
             |(mut ctx, mut body, etag, mut range)| async move {
                 while let Some(ret) = body.frame().await {
@@ -489,7 +489,7 @@ mod http_tests {
     use crate::path::Path;
     use crate::{ClientOptions, ObjectStoreExt, RetryConfig};
     use bytes::Bytes;
-    use futures::FutureExt;
+    use futures_util::FutureExt;
     use http::header::{CONNECTION, CONTENT_LENGTH, CONTENT_RANGE, ETAG, RANGE};
     use http::{Response, StatusCode};
     use hyper::body::Frame;
diff --git a/src/client/http/body.rs b/src/client/http/body.rs
index f59da97..e22ccea 100644
--- a/src/client/http/body.rs
+++ b/src/client/http/body.rs
@@ -18,8 +18,8 @@
 use crate::client::{HttpError, HttpErrorKind};
 use crate::{PutPayload, collect_bytes};
 use bytes::Bytes;
-use futures::StreamExt;
-use futures::stream::BoxStream;
+use futures_util::StreamExt;
+use futures_util::stream::BoxStream;
 use http_body_util::combinators::BoxBody;
 use http_body_util::{BodyExt, Full};
 use hyper::body::{Body, Frame, SizeHint};
@@ -43,9 +43,9 @@ impl HttpRequestBody {
     pub(crate) fn into_reqwest(self) -> reqwest::Body {
         match self.0 {
             Inner::Bytes(b) => b.into(),
-            Inner::PutPayload(_, payload) => reqwest::Body::wrap_stream(futures::stream::iter(
-                payload.into_iter().map(Ok::<_, HttpError>),
-            )),
+            Inner::PutPayload(_, payload) => reqwest::Body::wrap_stream(
+                futures_util::stream::iter(payload.into_iter().map(Ok::<_, HttpError>)),
+            ),
         }
     }
 
diff --git a/src/client/http/connection.rs b/src/client/http/connection.rs
index ae474ce..69c8436 100644
--- a/src/client/http/connection.rs
+++ b/src/client/http/connection.rs
@@ -231,10 +231,8 @@ impl HttpService for reqwest::Client {
 #[cfg(all(target_arch = "wasm32", target_os = "unknown"))]
 impl HttpService for reqwest::Client {
     async fn call(&self, req: HttpRequest) -> Result<HttpResponse, HttpError> {
-        use futures::{
-            SinkExt, StreamExt, TryStreamExt,
-            channel::{mpsc, oneshot},
-        };
+        use futures_channel::{mpsc, oneshot};
+        use futures_util::{SinkExt, StreamExt, TryStreamExt};
         use http_body_util::{Empty, StreamBody};
         use wasm_bindgen_futures::spawn_local;
 
diff --git a/src/client/http/spawn.rs b/src/client/http/spawn.rs
index 5450b63..80f3a87 100644
--- a/src/client/http/spawn.rs
+++ b/src/client/http/spawn.rs
@@ -142,7 +142,7 @@ mod tests {
 
         let url = mock.url().to_string();
         let thread = std::thread::spawn(|| {
-            futures::executor::block_on(async move {
+            futures_executor::block_on(async move {
                 let retry = RetryConfig::default();
                 let ret = client.get(url).send_retry(&retry).await.unwrap();
                 let payload = ret.into_body().bytes().await.unwrap();
diff --git a/src/client/list.rs b/src/client/list.rs
index b7192bf..6ad87db 100644
--- a/src/client/list.rs
+++ b/src/client/list.rs
@@ -21,8 +21,8 @@ use crate::list::{PaginatedListOptions, PaginatedListResult};
 use crate::path::{DELIMITER, Path};
 use crate::{ListResult, ObjectMeta};
 use async_trait::async_trait;
-use futures::stream::BoxStream;
-use futures::{StreamExt, TryStreamExt};
+use futures_util::stream::BoxStream;
+use futures_util::{StreamExt, TryStreamExt};
 use std::borrow::Cow;
 use std::collections::BTreeSet;
 
@@ -93,7 +93,7 @@ impl<T: ListClient + Clone> ListClientExt for T {
 
     fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
         self.list_paginated(prefix, false, None)
-            .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
+            .map_ok(|r| futures_util::stream::iter(r.objects.into_iter().map(Ok)))
             .try_flatten()
             .boxed()
     }
@@ -104,7 +104,7 @@ impl<T: ListClient + Clone> ListClientExt for T {
         offset: &Path,
     ) -> BoxStream<'static, Result<ObjectMeta>> {
         self.list_paginated(prefix, false, Some(offset))
-            .map_ok(|r| futures::stream::iter(r.objects.into_iter().map(Ok)))
+            .map_ok(|r| futures_util::stream::iter(r.objects.into_iter().map(Ok)))
             .try_flatten()
             .boxed()
     }
diff --git a/src/client/mock_server.rs b/src/client/mock_server.rs
index c27fc9f..0d0703d 100644
--- a/src/client/mock_server.rs
+++ b/src/client/mock_server.rs
@@ -16,8 +16,8 @@
 // under the License.
 
 use crate::client::{HttpResponse, HttpResponseBody};
-use futures::FutureExt;
-use futures::future::BoxFuture;
+use futures_util::FutureExt;
+use futures_util::future::BoxFuture;
 use hyper::body::Incoming;
 use hyper::server::conn::http1;
 use hyper::service::service_fn;
diff --git a/src/client/pagination.rs b/src/client/pagination.rs
index d789c74..3d6a74a 100644
--- a/src/client/pagination.rs
+++ b/src/client/pagination.rs
@@ -16,7 +16,7 @@
 // under the License.
 
 use crate::Result;
-use futures::Stream;
+use futures_util::Stream;
 use std::future::Future;
 
 /// Takes a paginated operation `op` that when called with:
@@ -51,7 +51,7 @@ where
         Done,
     }
 
-    futures::stream::unfold(PaginationState::Start(state), move |state| {
+    futures_util::stream::unfold(PaginationState::Start(state), move |state| {
         let client = client.clone();
         async move {
             let (s, page_token) = match state {
diff --git a/src/client/retry.rs b/src/client/retry.rs
index 5437896..b0e6b0c 100644
--- a/src/client/retry.rs
+++ b/src/client/retry.rs
@@ -21,7 +21,7 @@ use crate::PutPayload;
 use crate::client::backoff::{Backoff, BackoffConfig};
 use crate::client::builder::HttpRequestBuilder;
 use crate::client::{HttpClient, HttpError, HttpErrorKind, HttpRequest, HttpResponse};
-use futures::future::BoxFuture;
+use futures_util::future::BoxFuture;
 use http::{Method, Uri};
 use reqwest::StatusCode;
 use reqwest::header::LOCATION;
diff --git a/src/delimited.rs b/src/delimited.rs
index 0994b4f..b9f8842 100644
--- a/src/delimited.rs
+++ b/src/delimited.rs
@@ -20,7 +20,7 @@
 use std::collections::VecDeque;
 
 use bytes::Bytes;
-use futures::{Stream, StreamExt};
+use futures_util::{Stream, StreamExt};
 
 use super::Result;
 
@@ -155,7 +155,7 @@ where
 {
     let delimiter = LineDelimiter::new();
 
-    futures::stream::unfold(
+    futures_util::stream::unfold(
         (s, delimiter, false),
         |(mut s, mut delimiter, mut exhausted)| async move {
             loop {
@@ -184,7 +184,7 @@ where
 
 #[cfg(test)]
 mod tests {
-    use futures::stream::{BoxStream, TryStreamExt};
+    use futures_util::stream::{BoxStream, TryStreamExt};
 
     use super::*;
 
@@ -231,7 +231,8 @@ mod tests {
     #[tokio::test]
     async fn test_delimiter_stream() {
         let input = vec!["hello\nworld\nbin", "go\ncup", "cakes"];
-        let input_stream = futures::stream::iter(input.into_iter().map(|s| Ok(Bytes::from(s))));
+        let input_stream =
+            futures_util::stream::iter(input.into_iter().map(|s| Ok(Bytes::from(s))));
         let stream = newline_delimited_stream(input_stream);
 
         let results: Vec<_> = stream.try_collect().await.unwrap();
@@ -246,7 +247,7 @@ mod tests {
     }
     #[tokio::test]
     async fn test_delimiter_unfold_stream() {
-        let input_stream: BoxStream<'static, Result<Bytes>> = futures::stream::unfold(
+        let input_stream: BoxStream<'static, Result<Bytes>> = futures_util::stream::unfold(
             VecDeque::from(["hello\nworld\nbin", "go\ncup", "cakes"]),
             |mut input| async move {
                 if !input.is_empty() {
diff --git a/src/gcp/credential.rs b/src/gcp/credential.rs
index 75de68c..dfba358 100644
--- a/src/gcp/credential.rs
+++ b/src/gcp/credential.rs
@@ -27,7 +27,7 @@ use async_trait::async_trait;
 use base64::Engine;
 use base64::prelude::BASE64_URL_SAFE_NO_PAD;
 use chrono::{DateTime, Utc};
-use futures::TryFutureExt;
+use futures_util::TryFutureExt;
 use http::{HeaderMap, Method};
 use itertools::Itertools;
 use percent_encoding::utf8_percent_encode;
diff --git a/src/gcp/mod.rs b/src/gcp/mod.rs
index 4e1679e..e1157a8 100644
--- a/src/gcp/mod.rs
+++ b/src/gcp/mod.rs
@@ -48,7 +48,7 @@ use crate::{
 };
 use async_trait::async_trait;
 use client::GoogleCloudStorageClient;
-use futures::stream::{BoxStream, StreamExt};
+use futures_util::stream::{BoxStream, StreamExt};
 use http::Method;
 use url::Url;
 
diff --git a/src/http/mod.rs b/src/http/mod.rs
index 9e5af9a..4d4081b 100644
--- a/src/http/mod.rs
+++ b/src/http/mod.rs
@@ -34,8 +34,8 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use futures::stream::BoxStream;
-use futures::{StreamExt, TryStreamExt};
+use futures_util::stream::BoxStream;
+use futures_util::{StreamExt, TryStreamExt};
 use itertools::Itertools;
 use url::Url;
 
@@ -160,7 +160,7 @@ impl ObjectStore for HttpStore {
         let prefix_len = prefix.map(|p| p.as_ref().len()).unwrap_or_default();
         let prefix = prefix.cloned();
         let client = Arc::clone(&self.client);
-        futures::stream::once(async move {
+        futures_util::stream::once(async move {
             let status = client.list(prefix.as_ref(), "infinity").await?;
 
             let iter = status
@@ -174,7 +174,7 @@ impl ObjectStore for HttpStore {
                 // Filter out exact prefix matches
                 .filter_ok(move |r| r.location.as_ref().len() > prefix_len);
 
-            Ok::<_, crate::Error>(futures::stream::iter(iter))
+            Ok::<_, crate::Error>(futures_util::stream::iter(iter))
         })
         .try_flatten()
         .boxed()
diff --git a/src/integration.rs b/src/integration.rs
index 731f7d2..352bd4b 100644
--- a/src/integration.rs
+++ b/src/integration.rs
@@ -32,8 +32,8 @@ use crate::{
     ObjectStore, ObjectStoreExt, PutMode, PutPayload, UpdateVersion, WriteMultipart,
 };
 use bytes::Bytes;
-use futures::stream::FuturesUnordered;
-use futures::{StreamExt, TryStreamExt};
+use futures_util::stream::FuturesUnordered;
+use futures_util::{StreamExt, TryStreamExt};
 use rand::{RngExt, rng};
 use std::collections::HashSet;
 use std::slice;
@@ -411,7 +411,7 @@ pub async fn put_get_delete_list(storage: &DynObjectStore) {
     storage.put(&paths[4], "foo".into()).await.unwrap();
 
     let out_paths = storage
-        .delete_stream(futures::stream::iter(paths.clone()).map(Ok).boxed())
+        .delete_stream(futures_util::stream::iter(paths.clone()).map(Ok).boxed())
         .collect::<Vec<_>>()
         .await;
 
@@ -713,7 +713,7 @@ pub async fn stream_get(storage: &DynObjectStore) {
     let bytes_expected = data.concat();
     let mut upload = storage.put_multipart(&location).await.unwrap();
     let uploads = data.into_iter().map(|x| upload.put_part(x.into()));
-    futures::future::try_join_all(uploads).await.unwrap();
+    futures_util::future::try_join_all(uploads).await.unwrap();
 
     // Object should not yet exist in store
     let meta_res = storage.head(&location).await;
@@ -1020,7 +1020,7 @@ pub async fn multipart(storage: &dyn ObjectStore, multipart: &dyn MultipartStore
 
     let id = multipart.create_multipart(&path).await.unwrap();
 
-    let parts: Vec<_> = futures::stream::iter(chunks)
+    let parts: Vec<_> = futures_util::stream::iter(chunks)
         .enumerate()
         .map(|(idx, b)| multipart.put_part(&path, &id, idx, b.into()))
         .buffered(2)
diff --git a/src/lib.rs b/src/lib.rs
index 288d169..8124d8f 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -174,7 +174,7 @@
 //! # use object_store::local::LocalFileSystem;
 //! # use std::sync::Arc;
 //! # use object_store::{path::Path, ObjectStore};
-//! # use futures::stream::StreamExt;
+//! # use futures_util::stream::StreamExt;
 //! # // use LocalFileSystem for example
 //! # fn get_object_store() -> Arc<dyn ObjectStore> {
 //! #   Arc::new(LocalFileSystem::new())
@@ -215,7 +215,7 @@
 //! from remote storage or files in the local filesystem as a stream.
 //!
 //! ```ignore-wasm32
-//! # use futures::TryStreamExt;
+//! # use futures_util::TryStreamExt;
 //! # use object_store::local::LocalFileSystem;
 //! # use std::sync::Arc;
 //! #  use bytes::Bytes;
@@ -610,7 +610,7 @@ use crate::util::maybe_spawn_blocking;
 use async_trait::async_trait;
 use bytes::Bytes;
 use chrono::{DateTime, Utc};
-use futures::{StreamExt, TryStreamExt, stream::BoxStream};
+use futures_util::{StreamExt, TryStreamExt, stream::BoxStream};
 use std::fmt::{Debug, Formatter};
 use std::ops::Range;
 use std::sync::Arc;
@@ -931,14 +931,14 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
     /// return Ok. If it is an error, it will be [`Error::NotFound`].
     ///
     /// ```ignore-wasm32
-    /// # use futures::{StreamExt, TryStreamExt};
+    /// # use futures_util::{StreamExt, TryStreamExt};
     /// # use object_store::local::LocalFileSystem;
     /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
     /// # let root = tempfile::TempDir::new().unwrap();
     /// # let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();
     /// # use object_store::{ObjectStore, ObjectStoreExt, ObjectMeta};
     /// # use object_store::path::Path;
-    /// # use futures::{StreamExt, TryStreamExt};
+    /// # use futures_util::{StreamExt, TryStreamExt};
     /// #
     /// // Create two objects
     /// store.put(&Path::from("foo"), "foo".into()).await?;
@@ -964,7 +964,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
     ///
     /// ```
     /// # use async_trait::async_trait;
-    /// # use futures::stream::{BoxStream, StreamExt};
+    /// # use futures_util::stream::{BoxStream, StreamExt};
     /// # use object_store::path::Path;
     /// # use object_store::{
     /// #     CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
@@ -1049,7 +1049,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
     /// #
     /// # async fn example() {
     /// #     let store = ExampleStore { client: Arc::new(ExampleClient) };
-    /// #     let paths = futures::stream::iter(vec![Ok(Path::from("foo")), Ok(Path::from("bar"))]).boxed();
+    /// #     let paths = futures_util::stream::iter(vec![Ok(Path::from("foo")), Ok(Path::from("bar"))]).boxed();
     /// #     let results = store.delete_stream(paths).collect::<Vec<_>>().await;
     /// #     assert_eq!(results.len(), 2);
     /// #     assert_eq!(results[0].as_ref().unwrap(), &Path::from("foo"));
@@ -1091,7 +1091,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
     ) -> BoxStream<'static, Result<ObjectMeta>> {
         let offset = offset.clone();
         self.list(prefix)
-            .try_filter(move |f| futures::future::ready(f.location > offset))
+            .try_filter(move |f| futures_util::future::ready(f.location > offset))
             .boxed()
     }
 
@@ -1364,7 +1364,7 @@ where
     async fn delete(&self, location: &Path) -> Result<()> {
         let location = location.clone();
         let mut stream =
-            self.delete_stream(futures::stream::once(async move { Ok(location) }).boxed());
+            self.delete_stream(futures_util::stream::once(async move { Ok(location) }).boxed());
         let _path = stream.try_next().await?.ok_or_else(|| Error::Generic {
             store: "ext",
             source: "`delete_stream` with one location should yield once but didn't".into(),
diff --git a/src/limit.rs b/src/limit.rs
index 30fe2b6..fa29d1b 100644
--- a/src/limit.rs
+++ b/src/limit.rs
@@ -24,7 +24,7 @@ use crate::{
 };
 use async_trait::async_trait;
 use bytes::Bytes;
-use futures::{FutureExt, Stream};
+use futures_util::{FutureExt, Stream};
 use std::ops::Range;
 use std::pin::Pin;
 use std::sync::Arc;
@@ -245,7 +245,7 @@ mod tests {
     use crate::integration::*;
     use crate::limit::LimitStore;
     use crate::memory::InMemory;
-    use futures::stream::StreamExt;
+    use futures_util::stream::StreamExt;
     use std::pin::Pin;
     use std::time::Duration;
     use tokio::time::timeout;
diff --git a/src/local.rs b/src/local.rs
index 907b589..98dda95 100644
--- a/src/local.rs
+++ b/src/local.rs
@@ -31,8 +31,8 @@ use std::{collections::VecDeque, path::PathBuf};
 use async_trait::async_trait;
 use bytes::Bytes;
 use chrono::{DateTime, Utc};
-use futures::{FutureExt, TryStreamExt};
-use futures::{StreamExt, stream::BoxStream};
+use futures_util::{FutureExt, TryStreamExt};
+use futures_util::{StreamExt, stream::BoxStream};
 use parking_lot::Mutex;
 use url::Url;
 use walkdir::{DirEntry, WalkDir};
@@ -709,7 +709,7 @@ impl LocalFileSystem {
         let root_path = match prefix {
             Some(prefix) => match config.prefix_to_filesystem(prefix) {
                 Ok(path) => path,
-                Err(e) => return futures::future::ready(Err(e)).into_stream().boxed(),
+                Err(e) => return futures_util::future::ready(Err(e)).into_stream().boxed(),
             },
             None => config.root.to_file_path().unwrap(),
         };
@@ -754,14 +754,14 @@ impl LocalFileSystem {
         // If no tokio context, return iterator directly as no
         // need to perform chunked spawn_blocking reads
         if tokio::runtime::Handle::try_current().is_err() {
-            return futures::stream::iter(s).boxed();
+            return futures_util::stream::iter(s).boxed();
         }
 
         // Otherwise list in batches of CHUNK_SIZE
         const CHUNK_SIZE: usize = 1024;
 
         let buffer = VecDeque::with_capacity(CHUNK_SIZE);
-        futures::stream::try_unfold((s, buffer), |(mut s, mut buffer)| async move {
+        futures_util::stream::try_unfold((s, buffer), |(mut s, mut buffer)| async move {
             if buffer.is_empty() {
                 (s, buffer) = tokio::task::spawn_blocking(move || {
                     for _ in 0..CHUNK_SIZE {
@@ -929,7 +929,7 @@ pub(crate) fn chunked_stream(
     range: Range<u64>,
     chunk_size: usize,
 ) -> BoxStream<'static, Result<Bytes, super::Error>> {
-    futures::stream::once(async move {
+    futures_util::stream::once(async move {
         let requested = range.end - range.start;
 
         let (file, path) = maybe_spawn_blocking(move || {
@@ -939,7 +939,7 @@ pub(crate) fn chunked_stream(
         })
         .await?;
 
-        let stream = futures::stream::try_unfold(
+        let stream = futures_util::stream::try_unfold(
             (file, path, requested),
             move |(mut file, path, remaining)| {
                 maybe_spawn_blocking(move || {
@@ -1254,7 +1254,7 @@ fn convert_walkdir_result(
 mod tests {
     use std::fs;
 
-    use futures::TryStreamExt;
+    use futures_util::TryStreamExt;
     use tempfile::TempDir;
 
     #[cfg(target_family = "unix")]
@@ -1287,7 +1287,7 @@ mod tests {
     fn test_non_tokio() {
         let root = TempDir::new().unwrap();
         let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
-        futures::executor::block_on(async move {
+        futures_executor::block_on(async move {
             put_get_delete_list(&integration).await;
             list_uses_directories_correctly(&integration).await;
             list_with_delimiter(&integration).await;
diff --git a/src/memory.rs b/src/memory.rs
index d6c7c0c..1af2584 100644
--- a/src/memory.rs
+++ b/src/memory.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;
 use async_trait::async_trait;
 use bytes::Bytes;
 use chrono::{DateTime, Utc};
-use futures::{StreamExt, stream::BoxStream};
+use futures_util::{StreamExt, stream::BoxStream};
 use parking_lot::RwLock;
 
 use crate::multipart::{MultipartStore, PartId};
@@ -259,7 +259,7 @@ impl ObjectStore for InMemory {
             }
             None => (0..entry.data.len() as u64, entry.data),
         };
-        let stream = futures::stream::once(futures::future::ready(Ok(data)));
+        let stream = futures_util::stream::once(futures_util::future::ready(Ok(data)));
 
         Ok(GetResult {
             payload: GetResultPayload::Stream(stream.boxed()),
@@ -334,7 +334,7 @@ impl ObjectStore for InMemory {
             })
             .collect();
 
-        futures::stream::iter(values).boxed()
+        futures_util::stream::iter(values).boxed()
     }
 
     /// The memory implementation returns all results, as opposed to the cloud
@@ -512,7 +512,7 @@ struct InMemoryUpload {
 impl MultipartUpload for InMemoryUpload {
     fn put_part(&mut self, payload: PutPayload) -> UploadPart {
         self.parts.push(payload);
-        Box::pin(futures::future::ready(Ok(())))
+        Box::pin(futures_util::future::ready(Ok(())))
     }
 
     async fn complete(&mut self) -> Result<PutResult> {
diff --git a/src/prefix.rs b/src/prefix.rs
index 67456fd..2850c52 100644
--- a/src/prefix.rs
+++ b/src/prefix.rs
@@ -17,7 +17,7 @@
 
 //! An object store wrapper handling a constant path prefix
 use bytes::Bytes;
-use futures::{StreamExt, TryStreamExt, stream::BoxStream};
+use futures_util::{StreamExt, TryStreamExt, stream::BoxStream};
 use std::ops::Range;
 
 use crate::multipart::{MultipartStore, PartId};
diff --git a/src/throttle.rs b/src/throttle.rs
index 1fc90d7..fb668f0 100644
--- a/src/throttle.rs
+++ b/src/throttle.rs
@@ -28,7 +28,7 @@ use crate::{
 };
 use async_trait::async_trait;
 use bytes::Bytes;
-use futures::{FutureExt, StreamExt, stream::BoxStream};
+use futures_util::{FutureExt, StreamExt, stream::BoxStream};
 use std::time::Duration;
 
 /// Configuration settings for throttled store
@@ -212,7 +212,7 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
     fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
         let stream = self.inner.list(prefix);
         let config = Arc::clone(&self.config);
-        futures::stream::once(async move {
+        futures_util::stream::once(async move {
             let config = *config.lock();
             let wait_list_per_entry = config.wait_list_per_entry;
             sleep(config.wait_list_per_call).await;
@@ -229,7 +229,7 @@ impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
     ) -> BoxStream<'static, Result<ObjectMeta>> {
         let stream = self.inner.list_with_offset(prefix, offset);
         let config = Arc::clone(&self.config);
-        futures::stream::once(async move {
+        futures_util::stream::once(async move {
             let config = *config.lock();
             let wait_list_per_entry = config.wait_list_per_entry;
             sleep(config.wait_list_per_call).await;
@@ -299,7 +299,7 @@ where
     stream
         .then(move |result| {
             let delay = result.as_ref().ok().map(&delay).unwrap_or_default();
-            sleep(delay).then(|_| futures::future::ready(result))
+            sleep(delay).then(|_| futures_util::future::ready(result))
         })
         .boxed()
 }
@@ -368,7 +368,7 @@ mod tests {
     use crate::GetResultPayload;
     use crate::ObjectStoreExt;
     use crate::{integration::*, memory::InMemory};
-    use futures::TryStreamExt;
+    use futures_util::TryStreamExt;
     use tokio::time::Duration;
     use tokio::time::Instant;
 
@@ -580,7 +580,7 @@ mod tests {
 
         // materialize the paths so that the throttle time for listing is not counted
         let paths = store.list(Some(&prefix)).collect::<Vec<_>>().await;
-        let paths = futures::stream::iter(paths)
+        let paths = futures_util::stream::iter(paths)
             .map(|x| x.map(|m| m.location))
             .boxed();
 
diff --git a/src/upload.rs b/src/upload.rs
index d7b760d..01e6b28 100644
--- a/src/upload.rs
+++ b/src/upload.rs
@@ -22,7 +22,7 @@ use std::task::{Context, Poll};
 use crate::PutPayloadMut;
 use crate::{PutPayload, PutResult, Result};
 use async_trait::async_trait;
-use futures::future::BoxFuture;
+use futures_util::future::BoxFuture;
 #[cfg(feature = "tokio")]
 use tokio::task::JoinSet;
 
@@ -52,7 +52,7 @@ pub trait MultipartUpload: Send + std::fmt::Debug {
     /// returned futures in parallel
     ///
     /// ```no_run
-    /// # use futures::StreamExt;
+    /// # use futures_util::StreamExt;
     /// # use object_store::MultipartUpload;
     /// #
     /// # async fn test() {
@@ -60,7 +60,7 @@ pub trait MultipartUpload: Send + std::fmt::Debug {
     /// let mut upload: Box<&dyn MultipartUpload> = todo!();
     /// let p1 = upload.put_part(vec![0; 10 * 1024 * 1024].into());
     /// let p2 = upload.put_part(vec![1; 10 * 1024 * 1024].into());
-    /// futures::future::try_join(p1, p2).await.unwrap();
+    /// futures_util::future::try_join(p1, p2).await.unwrap();
     /// upload.complete().await.unwrap();
     /// # }
     /// ```
@@ -117,7 +117,7 @@ impl<W: MultipartUpload + ?Sized> MultipartUpload for Box<W> {
 /// allowing back pressure on producers, prior to buffering the next part. However, unlike
 /// [`Sink`] this back pressure is optional, allowing integration with synchronous producers
 ///
-/// [`Sink`]: futures::sink::Sink
+/// [`Sink`]: futures_util::sink::Sink
 #[cfg(feature = "tokio")]
 #[derive(Debug)]
 pub struct WriteMultipart {
@@ -156,7 +156,7 @@ impl WriteMultipart {
         max_concurrency: usize,
     ) -> Poll<Result<()>> {
         while !self.tasks.is_empty() && self.tasks.len() >= max_concurrency {
-            futures::ready!(self.tasks.poll_join_next(cx)).unwrap()??
+            futures_core::ready!(self.tasks.poll_join_next(cx)).unwrap()??
         }
         Poll::Ready(Ok(()))
     }
@@ -165,7 +165,7 @@ impl WriteMultipart {
     ///
     /// See [`Self::poll_for_capacity`] for a [`Poll`] version of this function
     pub async fn wait_for_capacity(&mut self, max_concurrency: usize) -> Result<()> {
-        futures::future::poll_fn(|cx| self.poll_for_capacity(cx, max_concurrency)).await
+        futures_util::future::poll_fn(|cx| self.poll_for_capacity(cx, max_concurrency)).await
     }
 
     /// Write data to this [`WriteMultipart`]
@@ -247,7 +247,7 @@ mod tests {
     use std::sync::Arc;
     use std::time::Duration;
 
-    use futures::FutureExt;
+    use futures_util::FutureExt;
     use parking_lot::Mutex;
     use rand::prelude::StdRng;
     use rand::{RngExt, SeedableRng};
@@ -287,7 +287,7 @@ mod tests {
     impl MultipartUpload for InstrumentedUpload {
         fn put_part(&mut self, data: PutPayload) -> UploadPart {
             self.chunks.lock().push(data);
-            futures::future::ready(Ok(())).boxed()
+            futures_util::future::ready(Ok(())).boxed()
         }
 
         async fn complete(&mut self) -> Result<PutResult> {
diff --git a/src/util.rs b/src/util.rs
index 565c1ce..162c077 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -23,7 +23,7 @@ use std::{
 
 use super::Result;
 use bytes::Bytes;
-use futures::{Stream, TryStreamExt, stream::StreamExt};
+use futures_util::{Stream, TryStreamExt, stream::StreamExt};
 
 #[cfg(any(feature = "azure", feature = "http"))]
 pub(crate) static RFC1123_FMT: &str = "%a, %d %h %Y %T GMT";
@@ -114,7 +114,7 @@ where
 {
     let fetch_ranges = merge_ranges(ranges, coalesce);
 
-    let fetched: Vec<_> = futures::stream::iter(fetch_ranges.iter().cloned())
+    let fetched: Vec<_> = futures_util::stream::iter(fetch_ranges.iter().cloned())
         .map(fetch)
         .buffered(OBJECT_STORE_COALESCE_PARALLEL)
         .try_collect()
@@ -348,7 +348,7 @@ mod tests {
                 fetches.push(range.clone());
                 let start = usize::try_from(range.start).unwrap();
                 let end = usize::try_from(range.end).unwrap();
-                futures::future::ready(Ok(Bytes::from(src[start..end].to_vec())))
+                futures_util::future::ready(Ok(Bytes::from(src[start..end].to_vec())))
             },
             coalesce,
         )
diff --git a/tests/get_range_file.rs b/tests/get_range_file.rs
index 95027eb..819456a 100644
--- a/tests/get_range_file.rs
+++ b/tests/get_range_file.rs
@@ -19,7 +19,7 @@
 
 use async_trait::async_trait;
 use bytes::Bytes;
-use futures::stream::BoxStream;
+use futures_util::stream::BoxStream;
 use object_store::local::LocalFileSystem;
 use object_store::path::Path;
 use object_store::*;

