This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 01227cfb refactor(services/memcached): Rewrite memecached connection entirely (#2204)
01227cfb is described below

commit 01227cfbf06bb1f37694478f0b0c2f4a5f696131
Author: Xuanwo <[email protected]>
AuthorDate: Thu May 4 18:54:06 2023 +0800

    refactor(services/memcached): Rewrite memecached connection entirely (#2204)
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/services/memcached/MIT-ascii.txt |  20 ----
 core/src/services/memcached/ascii.rs      | 171 +++++++++++++++++-------------
 core/src/services/memcached/backend.rs    |  54 +++-------
 3 files changed, 113 insertions(+), 132 deletions(-)

diff --git a/core/src/services/memcached/MIT-ascii.txt b/core/src/services/memcached/MIT-ascii.txt
deleted file mode 100644
index c176da35..00000000
--- a/core/src/services/memcached/MIT-ascii.txt
+++ /dev/null
@@ -1,20 +0,0 @@
-The MIT License (MIT)
-
-Copyright (c) 2017 An Long
-
-Permission is hereby granted, free of charge, to any person obtaining a copy of
-this software and associated documentation files (the "Software"), to deal in
-the Software without restriction, including without limitation the rights to
-use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
-the Software, and to permit persons to whom the Software is furnished to do so,
-subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in all
-copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
-FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
-COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
-IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
-CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/core/src/services/memcached/ascii.rs b/core/src/services/memcached/ascii.rs
index 85c74178..d5d8aa6f 100644
--- a/core/src/services/memcached/ascii.rs
+++ b/core/src/services/memcached/ascii.rs
@@ -1,55 +1,62 @@
-// Copyright 2017 vavrusa <[email protected]>
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
 //
-// Licensed under the MIT License (see MIT-ascii.txt);
-
-use core::fmt::Display;
-use std::io::Error;
-use std::io::ErrorKind;
-use std::marker::Unpin;
-
-use futures::io::AsyncBufReadExt;
-use futures::io::AsyncRead;
-use futures::io::AsyncReadExt;
-use futures::io::AsyncWrite;
-use futures::io::AsyncWriteExt;
-use futures::io::BufReader;
-
-/// Memcache ASCII protocol implementation.
-pub struct Protocol<S> {
-    io: BufReader<S>,
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::*;
+
+use super::backend::parse_io_error;
+use tokio::io::AsyncBufReadExt;
+use tokio::io::AsyncReadExt;
+use tokio::io::AsyncWriteExt;
+use tokio::io::BufReader;
+use tokio::net::TcpStream;
+
+pub struct Connection {
+    io: BufReader<TcpStream>,
     buf: Vec<u8>,
 }
 
-impl<S> Protocol<S>
-where
-    S: AsyncRead + AsyncWrite + Unpin,
-{
-    /// Creates the ASCII protocol on a stream.
-    pub fn new(io: S) -> Self {
+impl Connection {
+    pub fn new(io: TcpStream) -> Self {
         Self {
             io: BufReader::new(io),
             buf: Vec::new(),
         }
     }
 
-    /// Returns the value for given key as bytes. If the value doesn't exist, [`ErrorKind::NotFound`] is returned.
-    pub async fn get<K: AsRef<[u8]>>(&mut self, key: K) -> Result<Vec<u8>, Error> {
+    pub async fn get(&mut self, key: &str) -> Result<Option<Vec<u8>>> {
         // Send command
         let writer = self.io.get_mut();
         writer
-            .write_all(&[b"get ", key.as_ref(), b"\r\n"].concat())
-            .await?;
-        writer.flush().await?;
+            .write_all(&[b"get ", key.as_bytes(), b"\r\n"].concat())
+            .await
+            .map_err(parse_io_error)?;
+        writer.flush().await.map_err(parse_io_error)?;
 
         // Read response header
-        let header = self.read_line().await?;
-        let header = std::str::from_utf8(header).map_err(|_| ErrorKind::InvalidData)?;
+        let header = self.read_header().await?;
 
         // Check response header and parse value length
         if header.contains("ERROR") {
-            return Err(Error::new(ErrorKind::Other, header));
+            return Err(
+                Error::new(ErrorKind::Unexpected, "unexpected data received")
+                    .with_context("message", header),
+            );
         } else if header.starts_with("END") {
-            return Err(ErrorKind::NotFound.into());
+            return Ok(None);
         }
 
         // VALUE <key> <flags> <bytes> [<cas unique>]\r\n
@@ -57,89 +64,109 @@ where
             .split(' ')
             .nth(3)
             .and_then(|len| len.trim_end().parse().ok())
-            .ok_or(ErrorKind::InvalidData)?;
+            .ok_or_else(|| Error::new(ErrorKind::Unexpected, "invalid data received"))?;
 
         // Read value
         let mut buffer: Vec<u8> = vec![0; length];
-        self.io.read_exact(&mut buffer).await?;
+        self.io
+            .read_exact(&mut buffer)
+            .await
+            .map_err(parse_io_error)?;
 
         // Read the trailing header
         self.read_line().await?; // \r\n
         self.read_line().await?; // END\r\n
 
-        Ok(buffer)
+        Ok(Some(buffer))
     }
 
-    /// Set key to given value and don't wait for response.
-    pub async fn set<K: Display>(
-        &mut self,
-        key: K,
-        val: &[u8],
-        expiration: u32,
-    ) -> Result<(), Error> {
+    pub async fn set(&mut self, key: &str, val: &[u8], expiration: u32) -> Result<()> {
         let header = format!("set {} 0 {} {}\r\n", key, expiration, val.len());
-        self.io.write_all(header.as_bytes()).await?;
-        self.io.write_all(val).await?;
-        self.io.write_all(b"\r\n").await?;
-        self.io.flush().await?;
+        self.io
+            .write_all(header.as_bytes())
+            .await
+            .map_err(parse_io_error)?;
+        self.io.write_all(val).await.map_err(parse_io_error)?;
+        self.io.write_all(b"\r\n").await.map_err(parse_io_error)?;
+        self.io.flush().await.map_err(parse_io_error)?;
 
         // Read response header
-        let header = self.read_line().await?;
-        let header = std::str::from_utf8(header).map_err(|_| ErrorKind::InvalidData)?;
+        let header = self.read_header().await?;
+
         // Check response header and make sure we got a `STORED`
         if header.contains("STORED") {
             return Ok(());
         } else if header.contains("ERROR") {
-            return Err(Error::new(ErrorKind::Other, header));
+            return Err(
+                Error::new(ErrorKind::Unexpected, "unexpected data received")
+                    .with_context("message", header),
+            );
         }
         Ok(())
     }
 
-    /// Delete a key and don't wait for response.
-    pub async fn delete<K: Display>(&mut self, key: K) -> Result<(), Error> {
+    pub async fn delete(&mut self, key: &str) -> Result<()> {
         let header = format!("delete {}\r\n", key);
-        self.io.write_all(header.as_bytes()).await?;
-        self.io.flush().await?;
+        self.io
+            .write_all(header.as_bytes())
+            .await
+            .map_err(parse_io_error)?;
+        self.io.flush().await.map_err(parse_io_error)?;
 
         // Read response header
-        let header = self.read_line().await?;
-        let header = std::str::from_utf8(header).map_err(|_| ErrorKind::InvalidData)?;
+        let header = self.read_header().await?;
+
         // Check response header and parse value length
-        if header.contains("NOT_FOUND") {
+        if header.contains("NOT_FOUND") || header.starts_with("END") {
             return Ok(());
-        } else if header.starts_with("END") {
-            return Err(ErrorKind::NotFound.into());
         } else if header.contains("ERROR") || !header.contains("DELETED") {
-            return Err(Error::new(ErrorKind::Other, header));
+            return Err(
+                Error::new(ErrorKind::Unexpected, "unexpected data received")
+                    .with_context("message", header),
+            );
         }
         Ok(())
     }
 
-    /// Return the version of the remote server.
-    pub async fn version(&mut self) -> Result<String, Error> {
-        self.io.write_all(b"version\r\n").await?;
-        self.io.flush().await?;
+    pub async fn version(&mut self) -> Result<String> {
+        self.io
+            .write_all(b"version\r\n")
+            .await
+            .map_err(parse_io_error)?;
+        self.io.flush().await.map_err(parse_io_error)?;
 
         // Read response header
-        let header = {
-            let buf = self.read_line().await?;
-            std::str::from_utf8(buf).map_err(|_| Error::from(ErrorKind::InvalidData))?
-        };
+        let header = self.read_header().await?;
 
         if !header.starts_with("VERSION") {
-            return Err(Error::new(ErrorKind::Other, header));
+            return Err(
+                Error::new(ErrorKind::Unexpected, "unexpected data received")
+                    .with_context("message", header),
+            );
         }
         let version = header.trim_start_matches("VERSION ").trim_end();
         Ok(version.to_string())
     }
 
-    async fn read_line(&mut self) -> Result<&[u8], Error> {
+    async fn read_line(&mut self) -> Result<&[u8]> {
         let Self { io, buf } = self;
         buf.clear();
-        io.read_until(b'\n', buf).await?;
+        io.read_until(b'\n', buf).await.map_err(parse_io_error)?;
         if buf.last().copied() != Some(b'\n') {
-            return Err(ErrorKind::UnexpectedEof.into());
+            return Err(Error::new(
+                ErrorKind::ContentIncomplete,
+                "unexpected eof, the response must be incomplete",
+            ));
         }
         Ok(&buf[..])
     }
+
+    async fn read_header(&mut self) -> Result<&str> {
+        let header = self.read_line().await?;
+        let header = std::str::from_utf8(header).map_err(|err| {
+            Error::new(ErrorKind::Unexpected, "invalid data received").set_source(err)
+        })?;
+
+        Ok(header)
+    }
 }
diff --git a/core/src/services/memcached/backend.rs b/core/src/services/memcached/backend.rs
index 3920b239..ee6b30e7 100644
--- a/core/src/services/memcached/backend.rs
+++ b/core/src/services/memcached/backend.rs
@@ -18,7 +18,6 @@
 use std::collections::HashMap;
 use std::time::Duration;
 
-use async_compat::Compat;
 use async_trait::async_trait;
 use bb8::RunError;
 use tokio::net::TcpStream;
@@ -220,7 +219,7 @@ impl Adapter {
             RunError::TimedOut => {
                 Error::new(ErrorKind::Unexpected, "get connection from pool failed").set_temporary()
             }
-            RunError::User(err) => parse_io_error(err),
+            RunError::User(err) => err,
         })
     }
 }
@@ -243,12 +242,8 @@ impl kv::Adapter for Adapter {
 
     async fn get(&self, key: &str) -> Result<Option<Vec<u8>>> {
         let mut conn = self.conn().await?;
-        // TODO: memcache-async have `Sized` limit on key, can we remove it?
-        match conn.get(&percent_encode_path(key)).await {
-            Ok(bs) => Ok(Some(bs)),
-            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(None),
-            Err(err) => Err(parse_io_error(err)),
-        }
+
+        conn.get(&percent_encode_path(key)).await
     }
 
     async fn set(&self, key: &str, value: &[u8]) -> Result<()> {
@@ -263,40 +258,13 @@ impl kv::Adapter for Adapter {
                 .unwrap_or_default(),
         )
         .await
-        .map_err(parse_io_error)?;
-
-        Ok(())
     }
 
     async fn delete(&self, key: &str) -> Result<()> {
         let mut conn = self.conn().await?;
 
-        let _: () = conn
-            .delete(&percent_encode_path(key))
-            .await
-            .map_err(parse_io_error)?;
-        Ok(())
-    }
-}
-
-fn parse_io_error(err: std::io::Error) -> Error {
-    use std::io::ErrorKind::*;
-
-    let (kind, retryable) = match err.kind() {
-        NotFound => (ErrorKind::NotFound, false),
-        AlreadyExists => (ErrorKind::NotFound, false),
-        PermissionDenied => (ErrorKind::PermissionDenied, false),
-        Interrupted | UnexpectedEof | TimedOut | WouldBlock => (ErrorKind::Unexpected, true),
-        _ => (ErrorKind::Unexpected, true),
-    };
-
-    let mut err = Error::new(kind, &err.kind().to_string()).set_source(err);
-
-    if retryable {
-        err = err.set_temporary();
+        conn.delete(&percent_encode_path(key)).await
     }
-
-    err
 }
 
 /// A `bb8::ManageConnection` for `memcache_async::ascii::Protocol`.
@@ -317,13 +285,15 @@ impl MemcacheConnectionManager {
 
 #[async_trait]
 impl bb8::ManageConnection for MemcacheConnectionManager {
-    type Connection = ascii::Protocol<Compat<TcpStream>>;
-    type Error = std::io::Error;
+    type Connection = ascii::Connection;
+    type Error = Error;
 
     /// TODO: Implement unix stream support.
     async fn connect(&self) -> std::result::Result<Self::Connection, Self::Error> {
-        let sock = TcpStream::connect(&self.address).await?;
-        Ok(ascii::Protocol::new(Compat::new(sock)))
+        let conn = TcpStream::connect(&self.address)
+            .await
+            .map_err(parse_io_error)?;
+        Ok(ascii::Connection::new(conn))
     }
 
     async fn is_valid(&self, conn: &mut Self::Connection) -> std::result::Result<(), Self::Error> {
@@ -334,3 +304,7 @@ impl bb8::ManageConnection for MemcacheConnectionManager {
         false
     }
 }
+
+pub fn parse_io_error(err: std::io::Error) -> Error {
+    Error::new(ErrorKind::Unexpected, &err.kind().to_string()).set_source(err)
+}

Reply via email to