alamb commented on code in PR #2242:
URL: https://github.com/apache/arrow-rs/pull/2242#discussion_r933967628


##########
object_store/src/limit.rs:
##########
@@ -0,0 +1,250 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An object store that limits the maximum concurrency of the wrapped 
implementation
+
+use crate::{
+    BoxStream, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, 
Path, Result,
+    StreamExt,
+};
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::Stream;
+use std::io::{Error, IoSlice};
+use std::ops::Range;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use tokio::io::AsyncWrite;
+use tokio::sync::{OwnedSemaphorePermit, Semaphore};
+
+/// Store wrapper that wraps an inner store and limits the maximum number of 
concurrent
+/// object store operations. Where each call to an [`ObjectStore`] member 
function is
+/// considered a single operation, even if it may result in more than one 
network call
+#[derive(Debug)]
+pub struct LimitStore<T: ObjectStore> {
+    inner: T,
+    semaphore: Arc<Semaphore>,
+}
+
+impl<T: ObjectStore> LimitStore<T> {
+    /// Create new limit store
+    pub fn new(inner: T, max_requests: usize) -> Self {
+        Self {
+            inner,
+            semaphore: Arc::new(Semaphore::new(max_requests)),
+        }
+    }
+}
+
+impl<T: ObjectStore> std::fmt::Display for LimitStore<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "LimitStore({})", self.inner)

Review Comment:
   Including `max_requests` in the output would also be helpful here.



##########
object_store/src/limit.rs:
##########
@@ -0,0 +1,250 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An object store that limits the maximum concurrency of the wrapped 
implementation
+
+use crate::{
+    BoxStream, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, 
Path, Result,
+    StreamExt,
+};
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::Stream;
+use std::io::{Error, IoSlice};
+use std::ops::Range;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use tokio::io::AsyncWrite;
+use tokio::sync::{OwnedSemaphorePermit, Semaphore};
+
+/// Store wrapper that wraps an inner store and limits the maximum number of 
concurrent
+/// object store operations. Where each call to an [`ObjectStore`] member 
function is
+/// considered a single operation, even if it may result in more than one 
network call
+#[derive(Debug)]
+pub struct LimitStore<T: ObjectStore> {
+    inner: T,
+    semaphore: Arc<Semaphore>,
+}
+
+impl<T: ObjectStore> LimitStore<T> {
+    /// Create new limit store
+    pub fn new(inner: T, max_requests: usize) -> Self {
+        Self {
+            inner,
+            semaphore: Arc::new(Semaphore::new(max_requests)),
+        }
+    }
+}
+
+impl<T: ObjectStore> std::fmt::Display for LimitStore<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "LimitStore({})", self.inner)
+    }
+}
+
+#[async_trait]
+impl<T: ObjectStore> ObjectStore for LimitStore<T> {
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+        let _permit = self.semaphore.acquire().await.unwrap();

Review Comment:
   I double-checked whether we need to handle the error here --
https://docs.rs/tokio/1.20.1/tokio/sync/struct.Semaphore.html#method.acquire
says that an error is only returned if the Semaphore has been closed, which this code
never does 👍



##########
object_store/src/limit.rs:
##########
@@ -0,0 +1,250 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An object store that limits the maximum concurrency of the wrapped 
implementation
+
+use crate::{
+    BoxStream, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, 
Path, Result,
+    StreamExt,
+};
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::Stream;
+use std::io::{Error, IoSlice};
+use std::ops::Range;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use tokio::io::AsyncWrite;
+use tokio::sync::{OwnedSemaphorePermit, Semaphore};
+
+/// Store wrapper that wraps an inner store and limits the maximum number of 
concurrent
+/// object store operations. Where each call to an [`ObjectStore`] member 
function is
+/// considered a single operation, even if it may result in more than one 
network call

Review Comment:
   An example in this doc comment showing how to construct a `LimitStore` might be helpful.



##########
object_store/src/limit.rs:
##########
@@ -0,0 +1,250 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An object store that limits the maximum concurrency of the wrapped 
implementation
+
+use crate::{
+    BoxStream, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, 
Path, Result,
+    StreamExt,
+};
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::Stream;
+use std::io::{Error, IoSlice};
+use std::ops::Range;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use tokio::io::AsyncWrite;
+use tokio::sync::{OwnedSemaphorePermit, Semaphore};
+
+/// Store wrapper that wraps an inner store and limits the maximum number of 
concurrent
+/// object store operations. Where each call to an [`ObjectStore`] member 
function is
+/// considered a single operation, even if it may result in more than one 
network call
+#[derive(Debug)]
+pub struct LimitStore<T: ObjectStore> {
+    inner: T,
+    semaphore: Arc<Semaphore>,
+}
+
+impl<T: ObjectStore> LimitStore<T> {
+    /// Create new limit store

Review Comment:
   ```suggestion
       /// Create new limit store that will limit the maximum
       /// number of outstanding concurrent requests to
       /// `max_requests` 
   ```



##########
object_store/src/limit.rs:
##########
@@ -0,0 +1,250 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! An object store that limits the maximum concurrency of the wrapped 
implementation
+
+use crate::{
+    BoxStream, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, 
Path, Result,
+    StreamExt,
+};
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::Stream;
+use std::io::{Error, IoSlice};
+use std::ops::Range;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use tokio::io::AsyncWrite;
+use tokio::sync::{OwnedSemaphorePermit, Semaphore};
+
+/// Store wrapper that wraps an inner store and limits the maximum number of 
concurrent
+/// object store operations. Where each call to an [`ObjectStore`] member 
function is
+/// considered a single operation, even if it may result in more than one 
network call
+#[derive(Debug)]
+pub struct LimitStore<T: ObjectStore> {
+    inner: T,
+    semaphore: Arc<Semaphore>,
+}
+
+impl<T: ObjectStore> LimitStore<T> {
+    /// Create new limit store
+    pub fn new(inner: T, max_requests: usize) -> Self {
+        Self {
+            inner,
+            semaphore: Arc::new(Semaphore::new(max_requests)),
+        }
+    }
+}
+
+impl<T: ObjectStore> std::fmt::Display for LimitStore<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "LimitStore({})", self.inner)
+    }
+}
+
+#[async_trait]
+impl<T: ObjectStore> ObjectStore for LimitStore<T> {
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+        let _permit = self.semaphore.acquire().await.unwrap();
+        self.inner.put(location, bytes).await
+    }
+
+    async fn put_multipart(
+        &self,
+        location: &Path,
+    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+        let permit = 
Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
+        let (id, write) = self.inner.put_multipart(location).await?;
+        Ok((id, Box::new(PermitWrapper::new(write, permit))))
+    }
+
+    async fn abort_multipart(
+        &self,
+        location: &Path,
+        multipart_id: &MultipartId,
+    ) -> Result<()> {
+        let _permit = self.semaphore.acquire().await.unwrap();
+        self.inner.abort_multipart(location, multipart_id).await
+    }
+
+    async fn get(&self, location: &Path) -> Result<GetResult> {
+        let permit = 
Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
+        match self.inner.get(location).await? {
+            r @ GetResult::File(_, _) => Ok(r),
+            GetResult::Stream(s) => {
+                Ok(GetResult::Stream(PermitWrapper::new(s, permit).boxed()))
+            }
+        }
+    }
+
+    async fn get_range(&self, location: &Path, range: Range<usize>) -> 
Result<Bytes> {
+        let _permit = self.semaphore.acquire().await.unwrap();
+        self.inner.get_range(location, range).await
+    }
+
+    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
+        let _permit = self.semaphore.acquire().await.unwrap();
+        self.inner.head(location).await
+    }
+
+    async fn delete(&self, location: &Path) -> Result<()> {
+        let _permit = self.semaphore.acquire().await.unwrap();
+        self.inner.delete(location).await
+    }
+
+    async fn list(
+        &self,
+        prefix: Option<&Path>,
+    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+        let permit = 
Arc::clone(&self.semaphore).acquire_owned().await.unwrap();
+        let s = self.inner.list(prefix).await?;
+        Ok(PermitWrapper::new(s, permit).boxed())
+    }
+
+    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> 
Result<ListResult> {
+        let _permit = self.semaphore.acquire().await.unwrap();
+        self.inner.list_with_delimiter(prefix).await
+    }
+
+    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+        let _permit = self.semaphore.acquire().await.unwrap();
+        self.inner.copy(from, to).await
+    }
+
+    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
+        let _permit = self.semaphore.acquire().await.unwrap();
+        self.inner.rename(from, to).await
+    }
+
+    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+        let _permit = self.semaphore.acquire().await.unwrap();
+        self.inner.copy_if_not_exists(from, to).await
+    }
+
+    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> 
{
+        let _permit = self.semaphore.acquire().await.unwrap();
+        self.inner.rename_if_not_exists(from, to).await
+    }
+}
+
+/// Combines an [`OwnedSemaphorePermit`] with some other type
+struct PermitWrapper<T> {
+    inner: T,
+    #[allow(dead_code)]
+    permit: OwnedSemaphorePermit,
+}
+
+impl<T> PermitWrapper<T> {
+    fn new(inner: T, permit: OwnedSemaphorePermit) -> Self {
+        Self { inner, permit }
+    }
+}
+
+impl<T: Stream + Unpin> Stream for PermitWrapper<T> {
+    type Item = T::Item;
+
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        Pin::new(&mut self.inner).poll_next(cx)
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.inner.size_hint()
+    }
+}
+
+impl<T: AsyncWrite + Unpin> AsyncWrite for PermitWrapper<T> {
+    fn poll_write(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &[u8],
+    ) -> Poll<std::result::Result<usize, Error>> {
+        Pin::new(&mut self.inner).poll_write(cx, buf)
+    }
+
+    fn poll_flush(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<std::result::Result<(), Error>> {
+        Pin::new(&mut self.inner).poll_flush(cx)
+    }
+
+    fn poll_shutdown(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<std::result::Result<(), Error>> {
+        Pin::new(&mut self.inner).poll_shutdown(cx)
+    }
+
+    fn poll_write_vectored(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        bufs: &[IoSlice<'_>],
+    ) -> Poll<std::result::Result<usize, Error>> {
+        Pin::new(&mut self.inner).poll_write_vectored(cx, bufs)
+    }
+
+    fn is_write_vectored(&self) -> bool {
+        self.inner.is_write_vectored()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::limit::LimitStore;
+    use crate::memory::InMemory;
+    use crate::tests::{
+        list_uses_directories_correctly, list_with_delimiter, 
put_get_delete_list,
+        rename_and_copy, stream_get,
+    };
+    use crate::ObjectStore;
+    use std::time::Duration;
+    use tokio::time::timeout;
+
+    #[tokio::test]
+    async fn limit_test() {
+        let max_requests = 10;
+        let memory = InMemory::new();
+        let integration = LimitStore::new(memory, max_requests);
+
+        put_get_delete_list(&integration).await.unwrap();
+        list_uses_directories_correctly(&integration).await.unwrap();
+        list_with_delimiter(&integration).await.unwrap();
+        rename_and_copy(&integration).await.unwrap();
+        stream_get(&integration).await.unwrap();
+
+        let mut streams = Vec::with_capacity(max_requests);
+        for _ in 0..max_requests {
+            let stream = integration.list(None).await.unwrap();
+            streams.push(stream);
+        }
+
+        let t = Duration::from_millis(1);
+
+        // Expect to not be able to make another request

Review Comment:
   👍 I am always a little wary of `sleep`-style waits in tests, since they can cause
intermittent failures due to timing differences, but this use
seems sound 👍
   
   Maybe we can bump the timeout up to 20ms or so to be safe on slow
machines.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to