This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 26b52cc17 feat(service): support b2 (#3604)
26b52cc17 is described below

commit 26b52cc17fc2c1bb080c131b5e3d2b1b78081359
Author: hoslo <[email protected]>
AuthorDate: Tue Nov 21 17:28:36 2023 +0800

    feat(service): support b2 (#3604)
---
 .env.example                       |   6 +
 core/Cargo.toml                    |   1 +
 core/src/services/b2/backend.rs    | 554 +++++++++++++++++++++++++++++
 core/src/services/b2/core.rs       | 706 +++++++++++++++++++++++++++++++++++++
 core/src/services/b2/docs.md       |  56 +++
 core/src/services/b2/error.rs      | 138 ++++++++
 core/src/services/b2/lister.rs     | 104 ++++++
 core/src/services/b2/mod.rs        |  25 ++
 core/src/services/b2/writer.rs     | 163 +++++++++
 core/src/services/mod.rs           |   7 +
 core/src/types/operator/builder.rs |   2 +
 core/src/types/scheme.rs           |   6 +
 12 files changed, 1768 insertions(+)

diff --git a/.env.example b/.env.example
index 7f11ebf03..450c734f8 100644
--- a/.env.example
+++ b/.env.example
@@ -169,3 +169,9 @@ OPENDAL_GRIDFS_CHUNK_SIZE=<chunk_size>
 # alluxio
 OPENDAL_ALLUXIO_ENDPOINT=<endpoint>
 OPENDAL_ALLUXIO_ROOT=/path/to/dor
+# b2
+OPENDAL_B2_ROOT=/path/to/dir
+OPENDAL_B2_BUCKET=<bucket>
+OPENDAL_B2_BUCKET_ID=<bucket_id>
+OPENDAL_B2_APPLICATION_KEY_ID=<key_id>
+OPENDAL_B2_APPLICATION_KEY=<application_key>
\ No newline at end of file
diff --git a/core/Cargo.toml b/core/Cargo.toml
index f5e261b00..d4833ecaa 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -125,6 +125,7 @@ services-azdls = [
   "reqsign?/reqwest_request",
 ]
 services-azfile = []
+services-b2 = []
 services-cacache = ["dep:cacache"]
 services-cloudflare-kv = []
 services-cos = [
diff --git a/core/src/services/b2/backend.rs b/core/src/services/b2/backend.rs
new file mode 100644
index 000000000..0c09d55fc
--- /dev/null
+++ b/core/src/services/b2/backend.rs
@@ -0,0 +1,554 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use http::Request;
+use http::StatusCode;
+use log::debug;
+use serde::Deserialize;
+use tokio::sync::RwLock;
+
+use crate::raw::*;
+use crate::services::b2::core::B2Signer;
+use crate::services::b2::core::ListFileNamesResponse;
+use crate::*;
+
+use super::core::constants;
+use super::core::parse_file_info;
+use super::core::B2Core;
+use super::error::parse_error;
+use super::lister::B2Lister;
+use super::writer::B2Writer;
+use super::writer::B2Writers;
+
+/// Config for backblaze b2 services support.
+#[derive(Default, Deserialize)]
+#[serde(default)]
+#[non_exhaustive]
+pub struct B2Config {
+    /// root of this backend.
+    ///
+    /// All operations will happen under this root.
+    pub root: Option<String>,
+    /// keyID of this backend.
+    ///
+    /// - If application_key_id is set, we will take user's input first.
+    /// - If not, we will try to load it from environment.
+    pub application_key_id: Option<String>,
+    /// applicationKey of this backend.
+    ///
+    /// - If application_key is set, we will take user's input first.
+    /// - If not, we will try to load it from environment.
+    pub application_key: Option<String>,
+    /// bucket of this backend.
+    ///
+    /// required.
+    pub bucket: String,
+    /// bucket id of this backend.
+    ///
+    /// required.
+    pub bucket_id: String,
+}
+
+impl Debug for B2Config {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut d = f.debug_struct("B2Config");
+
+        d.field("root", &self.root)
+            .field("application_key_id", &self.application_key_id)
+            .field("bucket_id", &self.bucket_id)
+            .field("bucket", &self.bucket);
+
+        d.finish_non_exhaustive()
+    }
+}
+
+/// [b2](https://www.backblaze.com/cloud-storage) services support.
+#[doc = include_str!("docs.md")]
+#[derive(Default)]
+pub struct B2Builder {
+    config: B2Config,
+
+    http_client: Option<HttpClient>,
+}
+
+impl Debug for B2Builder {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut d = f.debug_struct("B2Builder");
+
+        d.field("config", &self.config);
+        d.finish_non_exhaustive()
+    }
+}
+
+impl B2Builder {
+    /// Set root of this backend.
+    ///
+    /// All operations will happen under this root.
+    pub fn root(&mut self, root: &str) -> &mut Self {
+        self.config.root = if root.is_empty() {
+            None
+        } else {
+            Some(root.to_string())
+        };
+
+        self
+    }
+
+    /// application_key_id of this backend.
+    pub fn application_key_id(&mut self, application_key_id: &str) -> &mut 
Self {
+        self.config.application_key_id = if application_key_id.is_empty() {
+            None
+        } else {
+            Some(application_key_id.to_string())
+        };
+
+        self
+    }
+
+    /// application_key of this backend.
+    pub fn application_key(&mut self, application_key: &str) -> &mut Self {
+        self.config.application_key = if application_key.is_empty() {
+            None
+        } else {
+            Some(application_key.to_string())
+        };
+
+        self
+    }
+
+    /// Set bucket name of this backend.
+    /// You can find it in https://secure.backblaze.com/b2_buckets.html
+    pub fn bucket(&mut self, bucket: &str) -> &mut Self {
+        self.config.bucket = bucket.to_string();
+
+        self
+    }
+
+    /// Set bucket id of this backend.
+    /// You can find it in https://secure.backblaze.com/b2_buckets.html
+    pub fn bucket_id(&mut self, bucket_id: &str) -> &mut Self {
+        self.config.bucket_id = bucket_id.to_string();
+
+        self
+    }
+
+    /// Specify the http client that used by this service.
+    ///
+    /// # Notes
+    ///
+    /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
+    /// during minor updates.
+    pub fn http_client(&mut self, client: HttpClient) -> &mut Self {
+        self.http_client = Some(client);
+        self
+    }
+}
+
+impl Builder for B2Builder {
+    const SCHEME: Scheme = Scheme::B2;
+    type Accessor = B2Backend;
+
+    /// Converts a HashMap into an B2Builder instance.
+    ///
+    /// # Arguments
+    ///
+    /// * `map` - A HashMap containing the configuration values.
+    ///
+    /// # Returns
+    ///
+    /// Returns an instance of B2Builder.
+    fn from_map(map: HashMap<String, String>) -> Self {
+        // Deserialize the configuration from the HashMap.
+        let config = B2Config::deserialize(ConfigDeserializer::new(map))
+            .expect("config deserialize must succeed");
+
+        // Create an B2Builder instance with the deserialized config.
+        B2Builder {
+            config,
+            http_client: None,
+        }
+    }
+
+    /// Builds the backend and returns the result of B2Backend.
+    fn build(&mut self) -> Result<Self::Accessor> {
+        debug!("backend build started: {:?}", &self);
+
+        let root = 
normalize_root(&self.config.root.clone().unwrap_or_default());
+        debug!("backend use root {}", &root);
+
+        // Handle bucket.
+        if self.config.bucket.is_empty() {
+            return Err(Error::new(ErrorKind::ConfigInvalid, "bucket is empty")
+                .with_operation("Builder::build")
+                .with_context("service", Scheme::B2));
+        }
+
+        debug!("backend use bucket {}", &self.config.bucket);
+
+        // Handle bucket_id.
+        if self.config.bucket_id.is_empty() {
+            return Err(Error::new(ErrorKind::ConfigInvalid, "bucket_id is 
empty")
+                .with_operation("Builder::build")
+                .with_context("service", Scheme::B2));
+        }
+
+        debug!("backend bucket_id {}", &self.config.bucket_id);
+
+        let application_key_id = match &self.config.application_key_id {
+            Some(application_key_id) => Ok(application_key_id.clone()),
+            None => Err(
+                Error::new(ErrorKind::ConfigInvalid, "application_key_id is 
empty")
+                    .with_operation("Builder::build")
+                    .with_context("service", Scheme::B2),
+            ),
+        }?;
+
+        let application_key = match &self.config.application_key {
+            Some(key_id) => Ok(key_id.clone()),
+            None => Err(
+                Error::new(ErrorKind::ConfigInvalid, "application_key is 
empty")
+                    .with_operation("Builder::build")
+                    .with_context("service", Scheme::B2),
+            ),
+        }?;
+
+        let client = if let Some(client) = self.http_client.take() {
+            client
+        } else {
+            HttpClient::new().map_err(|err| {
+                err.with_operation("Builder::build")
+                    .with_context("service", Scheme::B2)
+            })?
+        };
+
+        let signer = B2Signer {
+            application_key_id,
+            application_key,
+            ..Default::default()
+        };
+
+        Ok(B2Backend {
+            core: Arc::new(B2Core {
+                signer: Arc::new(RwLock::new(signer)),
+                root,
+
+                bucket: self.config.bucket.clone(),
+                bucket_id: self.config.bucket_id.clone(),
+                client,
+            }),
+        })
+    }
+}
+
+/// Backend for b2 services.
+#[derive(Debug, Clone)]
+pub struct B2Backend {
+    core: Arc<B2Core>,
+}
+
+#[async_trait]
+impl Accessor for B2Backend {
+    type Reader = IncomingAsyncBody;
+
+    type BlockingReader = ();
+
+    type Writer = B2Writers;
+
+    type BlockingWriter = ();
+
+    type Lister = oio::PageLister<B2Lister>;
+
+    type BlockingLister = ();
+
+    fn info(&self) -> AccessorInfo {
+        let mut am = AccessorInfo::default();
+        am.set_scheme(Scheme::B2)
+            .set_root(&self.core.root)
+            .set_native_capability(Capability {
+                stat: true,
+
+                read: true,
+                read_can_next: true,
+                read_with_range: true,
+
+                write: true,
+                write_can_empty: true,
+                write_can_multi: true,
+                write_with_content_type: true,
+                // The min multipart size of b2 is 5 MiB.
+                //
+                // ref: 
<https://www.backblaze.com/docs/cloud-storage-large-files>
+                write_multi_min_size: Some(5 * 1024 * 1024),
+                // The max multipart size of b2 is 5 Gb.
+                //
+                // ref: 
<https://www.backblaze.com/docs/cloud-storage-large-files>
+                write_multi_max_size: Some(5 * 1024 * 1024 * 1024),
+
+                create_dir: true,
+                delete: true,
+                copy: true,
+
+                list: true,
+                list_with_limit: true,
+                list_with_start_after: true,
+                list_with_recursive: true,
+                list_without_recursive: true,
+
+                presign: true,
+                presign_read: true,
+                presign_write: true,
+                presign_stat: true,
+
+                ..Default::default()
+            });
+
+        am
+    }
+
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
+        let resp = self.core.download_file_by_name(path, &args).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
+                let size = parse_content_length(resp.headers())?;
+                Ok((RpRead::new().with_size(size), resp.into_body()))
+            }
+            StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(), 
IncomingAsyncBody::empty())),
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn create_dir(&self, path: &str, _: OpCreateDir) -> 
Result<RpCreateDir> {
+        let resp: http::Response<IncomingAsyncBody> = self
+            .core
+            .upload_file(path, Some(0), &OpWrite::default(), AsyncBody::Empty)
+            .await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                resp.into_body().consume().await?;
+                Ok(RpCreateDir::default())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
+        let writer = B2Writer::new(self.core.clone(), path, args);
+
+        let w = oio::MultipartUploadWriter::new(writer);
+
+        Ok((RpWrite::default(), w))
+    }
+
+    /// B2 have a get_file_info api required a file_id field, but field_id 
need call list api, list api also return file info
+    /// So we call list api to get file info
+    async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
+        // Stat root always returns a DIR.
+        if path == "/" {
+            return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
+        }
+
+        let delimiter = if path.ends_with('/') { Some("/") } else { None };
+        let resp = self
+            .core
+            .list_file_names(Some(path), delimiter, None, None)
+            .await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                let bs = resp.into_body().bytes().await?;
+
+                let resp: ListFileNamesResponse =
+                    
serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?;
+                if resp.files.is_empty() {
+                    return Err(Error::new(ErrorKind::NotFound, "no such file 
or directory"));
+                }
+                let meta = parse_file_info(&resp.files[0]);
+                Ok(RpStat::new(meta))
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> 
Result<RpCopy> {
+        let resp = self
+            .core
+            .list_file_names(Some(from), None, None, None)
+            .await?;
+
+        let status = resp.status();
+
+        let source_file_id = match status {
+            StatusCode::OK => {
+                let bs = resp.into_body().bytes().await?;
+
+                let resp: ListFileNamesResponse =
+                    
serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?;
+                if resp.files.is_empty() {
+                    return Err(Error::new(ErrorKind::NotFound, "no such file 
or directory"));
+                }
+
+                let file_id = resp.files[0].clone().file_id;
+                Ok(file_id)
+            }
+            _ => Err(parse_error(resp).await?),
+        }?;
+
+        let Some(source_file_id) = source_file_id else {
+            return Err(Error::new(ErrorKind::IsADirectory, "is a directory"));
+        };
+
+        let resp = self.core.copy_file(source_file_id, to).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => Ok(RpCopy::default()),
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
+        let resp = self.core.hide_file(path).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => Ok(RpDelete::default()),
+            _ => {
+                let err = parse_error(resp).await?;
+                match err.kind() {
+                    ErrorKind::NotFound => Ok(RpDelete::default()),
+                    // Representative deleted
+                    ErrorKind::AlreadyExists => Ok(RpDelete::default()),
+                    _ => Err(err),
+                }
+            }
+        }
+    }
+
+    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::Lister)> {
+        Ok((
+            RpList::default(),
+            oio::PageLister::new(B2Lister::new(
+                self.core.clone(),
+                path,
+                args.recursive(),
+                args.limit(),
+                args.start_after(),
+            )),
+        ))
+    }
+
+    async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
+        match args.operation() {
+            PresignOperation::Stat(_) => {
+                let resp = self
+                    .core
+                    .get_download_authorization(path, &OpRead::default(), 
args.expire())
+                    .await?;
+                let path = build_abs_path(&self.core.root, path);
+
+                let auth_info = self.core.get_auth_info().await?;
+
+                let url = format!(
+                    "{}/file/{}/{}?Authorization={}",
+                    auth_info.download_url, self.core.bucket, path, 
resp.authorization_token
+                );
+
+                let req = Request::get(url);
+
+                let req = req
+                    .body(AsyncBody::Empty)
+                    .map_err(new_request_build_error)?;
+
+                // We don't need this request anymore, consume
+                let (parts, _) = req.into_parts();
+
+                Ok(RpPresign::new(PresignedRequest::new(
+                    parts.method,
+                    parts.uri,
+                    parts.headers,
+                )))
+            }
+            PresignOperation::Read(op) => {
+                let resp = self
+                    .core
+                    .get_download_authorization(path, op, args.expire())
+                    .await?;
+                let path = build_abs_path(&self.core.root, path);
+
+                let auth_info = self.core.get_auth_info().await?;
+
+                let url = format!(
+                    "{}/file/{}/{}?Authorization={}",
+                    auth_info.download_url, self.core.bucket, path, 
resp.authorization_token
+                );
+
+                let req = Request::get(url);
+
+                let req = req
+                    .body(AsyncBody::Empty)
+                    .map_err(new_request_build_error)?;
+
+                // We don't need this request anymore, consume
+                let (parts, _) = req.into_parts();
+
+                Ok(RpPresign::new(PresignedRequest::new(
+                    parts.method,
+                    parts.uri,
+                    parts.headers,
+                )))
+            }
+            PresignOperation::Write(_) => {
+                let resp = self.core.get_upload_url().await?;
+
+                let mut req = Request::post(&resp.upload_url);
+
+                req = req.header(http::header::AUTHORIZATION, 
resp.authorization_token);
+                req = req.header("X-Bz-File-Name", 
build_abs_path(&self.core.root, path));
+                req = req.header(http::header::CONTENT_TYPE, "b2/x-auto");
+                req = req.header(constants::X_BZ_CONTENT_SHA1, 
"do_not_verify");
+
+                let req = req
+                    .body(AsyncBody::Empty)
+                    .map_err(new_request_build_error)?;
+                // We don't need this request anymore, consume it directly.
+                let (parts, _) = req.into_parts();
+
+                Ok(RpPresign::new(PresignedRequest::new(
+                    parts.method,
+                    parts.uri,
+                    parts.headers,
+                )))
+            }
+        }
+    }
+}
diff --git a/core/src/services/b2/core.rs b/core/src/services/b2/core.rs
new file mode 100644
index 000000000..db706629a
--- /dev/null
+++ b/core/src/services/b2/core.rs
@@ -0,0 +1,706 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::{Debug, Formatter};
+use std::sync::Arc;
+use std::time::Duration;
+
+use chrono::{DateTime, Utc};
+use http::{header, Request, Response, StatusCode};
+use serde::{Deserialize, Serialize};
+use tokio::sync::RwLock;
+
+use crate::raw::*;
+use crate::services::b2::core::constants::X_BZ_PART_NUMBER;
+use crate::services::b2::error::parse_error;
+use crate::*;
+
+use self::constants::{X_BZ_CONTENT_SHA1, X_BZ_FILE_NAME};
+
+pub(super) mod constants {
+    pub const X_BZ_FILE_NAME: &str = "X-Bz-File-Name";
+    pub const X_BZ_CONTENT_SHA1: &str = "X-Bz-Content-Sha1";
+    pub const X_BZ_PART_NUMBER: &str = "X-Bz-Part-Number";
+}
+
+/// Core of [b2](https://www.backblaze.com/cloud-storage) services support.
+#[derive(Clone)]
+pub struct B2Core {
+    pub signer: Arc<RwLock<B2Signer>>,
+
+    /// The root of this core.
+    pub root: String,
+    /// The bucket name of this backend.
+    pub bucket: String,
+    /// The bucket id of this backend.
+    pub bucket_id: String,
+
+    pub client: HttpClient,
+}
+
+impl Debug for B2Core {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Backend")
+            .field("root", &self.root)
+            .field("bucket", &self.bucket)
+            .field("bucket_id", &self.bucket_id)
+            .finish_non_exhaustive()
+    }
+}
+
+impl B2Core {
+    #[inline]
+    pub async fn send(&self, req: Request<AsyncBody>) -> 
Result<Response<IncomingAsyncBody>> {
+        self.client.send(req).await
+    }
+
+    /// 
[b2_authorize_account](https://www.backblaze.com/apidocs/b2-authorize-account)
+    pub async fn get_auth_info(&self) -> Result<AuthInfo> {
+        {
+            let signer = self.signer.read().await;
+
+            if !signer.auth_info.authorization_token.is_empty()
+                && signer.auth_info.expires_in > Utc::now()
+            {
+                let auth_info = signer.auth_info.clone();
+                return Ok(auth_info);
+            }
+        }
+
+        {
+            let mut signer = self.signer.write().await;
+            let req = 
Request::get("https://api.backblazeb2.com/b2api/v3/b2_authorize_account";)
+                .header(
+                    header::AUTHORIZATION,
+                    format_authorization_by_basic(
+                        &signer.application_key_id,
+                        &signer.application_key,
+                    )?,
+                )
+                .body(AsyncBody::Empty)
+                .map_err(new_request_build_error)?;
+
+            let resp = self.client.send(req).await?;
+            let status = resp.status();
+
+            match status {
+                StatusCode::OK => {
+                    let resp_body = &resp.into_body().bytes().await?;
+                    let token = 
serde_json::from_slice::<AuthorizeAccountResponse>(resp_body)
+                        .map_err(new_json_deserialize_error)?;
+                    signer.auth_info = AuthInfo {
+                        authorization_token: token.authorization_token.clone(),
+                        api_url: token.api_info.storage_api.api_url.clone(),
+                        download_url: 
token.api_info.storage_api.download_url.clone(),
+                        // This authorization token is valid for at most 24 
hours.
+                        expires_in: Utc::now() + chrono::Duration::hours(20),
+                    };
+                }
+                _ => {
+                    return Err(parse_error(resp).await?);
+                }
+            }
+            Ok(signer.auth_info.clone())
+        }
+    }
+}
+
+impl B2Core {
+    pub async fn download_file_by_name(
+        &self,
+        path: &str,
+        args: &OpRead,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let path = build_abs_path(&self.root, path);
+
+        let auth_info = self.get_auth_info().await?;
+
+        // Construct headers to add to the request
+        let url = format!(
+            "{}/file/{}/{}",
+            auth_info.download_url,
+            self.bucket,
+            percent_encode_path(&path)
+        );
+
+        let mut req = Request::get(&url);
+
+        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
+
+        let range = args.range();
+        if !range.is_full() {
+            req = req.header(http::header::RANGE, range.to_header());
+        }
+
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.send(req).await
+    }
+
+    pub(super) async fn get_upload_url(&self) -> Result<GetUploadUrlResponse> {
+        let auth_info = self.get_auth_info().await?;
+
+        let url = format!(
+            "{}/b2api/v2/b2_get_upload_url?bucketId={}",
+            auth_info.api_url, self.bucket_id
+        );
+
+        let mut req = Request::get(&url);
+
+        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
+
+        // Set body
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        let resp = self.send(req).await?;
+        let status = resp.status();
+        match status {
+            StatusCode::OK => {
+                let resp_body = &resp.into_body().bytes().await?;
+                let resp = 
serde_json::from_slice::<GetUploadUrlResponse>(resp_body)
+                    .map_err(new_json_deserialize_error)?;
+                Ok(resp)
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    pub async fn get_download_authorization(
+        &self,
+        path: &str,
+        args: &OpRead,
+        expire: Duration,
+    ) -> Result<GetDownloadAuthorizationResponse> {
+        let path = build_abs_path(&self.root, path);
+
+        let auth_info = self.get_auth_info().await?;
+
+        // Construct headers to add to the request
+        let url = format!(
+            "{}/b2api/v2/b2_get_download_authorization",
+            auth_info.api_url
+        );
+        let mut req = Request::post(&url);
+
+        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
+
+        let range = args.range();
+        if !range.is_full() {
+            req = req.header(http::header::RANGE, range.to_header());
+        }
+        let body = GetDownloadAuthorizationRequest {
+            bucket_id: self.bucket_id.clone(),
+            file_name_prefix: path,
+            valid_duration_in_seconds: expire.as_secs(),
+        };
+        let body = 
serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
+        let body = bytes::Bytes::from(body);
+
+        let req = req
+            .body(AsyncBody::Bytes(body))
+            .map_err(new_request_build_error)?;
+
+        let resp = self.send(req).await?;
+
+        let status = resp.status();
+        match status {
+            StatusCode::OK => {
+                let resp_body = &resp.into_body().bytes().await?;
+                let resp = 
serde_json::from_slice::<GetDownloadAuthorizationResponse>(resp_body)
+                    .map_err(new_json_deserialize_error)?;
+                Ok(resp)
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    pub async fn upload_file(
+        &self,
+        path: &str,
+        size: Option<u64>,
+        args: &OpWrite,
+        body: AsyncBody,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let resp = self.get_upload_url().await?;
+
+        let p = build_abs_path(&self.root, path);
+
+        let mut req = Request::post(resp.upload_url);
+
+        req = req.header(X_BZ_FILE_NAME, percent_encode_path(&p));
+
+        req = req.header(header::AUTHORIZATION, resp.authorization_token);
+
+        req = req.header(X_BZ_CONTENT_SHA1, "do_not_verify");
+
+        if let Some(size) = size {
+            req = req.header(header::CONTENT_LENGTH, size.to_string())
+        }
+
+        if let Some(mime) = args.content_type() {
+            req = req.header(header::CONTENT_TYPE, mime)
+        } else {
+            req = req.header(header::CONTENT_TYPE, "b2/x-auto")
+        }
+
+        if let Some(pos) = args.content_disposition() {
+            req = req.header(header::CONTENT_DISPOSITION, pos)
+        }
+
+        // Set body
+        let req = req.body(body).map_err(new_request_build_error)?;
+
+        self.send(req).await
+    }
+
+    pub async fn start_large_file(
+        &self,
+        path: &str,
+        args: &OpWrite,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let p = build_abs_path(&self.root, path);
+
+        let auth_info = self.get_auth_info().await?;
+
+        let url = format!("{}/b2api/v2/b2_start_large_file", 
auth_info.api_url);
+
+        let mut req = Request::post(&url);
+
+        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
+
+        let mut start_large_file_request = StartLargeFileRequest {
+            bucket_id: self.bucket_id.clone(),
+            file_name: percent_encode_path(&p),
+            content_type: "b2/x-auto".to_owned(),
+        };
+
+        if let Some(mime) = args.content_type() {
+            start_large_file_request.content_type = mime.to_owned();
+        }
+
+        let body =
+            
serde_json::to_vec(&start_large_file_request).map_err(new_json_serialize_error)?;
+        let body = bytes::Bytes::from(body);
+
+        let req = req
+            .body(AsyncBody::Bytes(body))
+            .map_err(new_request_build_error)?;
+
+        self.send(req).await
+    }
+
+    pub async fn get_upload_part_url(&self, file_id: &str) -> 
Result<GetUploadPartUrlResponse> {
+        let auth_info = self.get_auth_info().await?;
+
+        let url = format!(
+            "{}/b2api/v2/b2_get_upload_part_url?fileId={}",
+            auth_info.api_url, file_id
+        );
+
+        let mut req = Request::get(&url);
+
+        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
+
+        // Set body
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        let resp = self.send(req).await?;
+
+        let status = resp.status();
+        match status {
+            StatusCode::OK => {
+                let resp_body = &resp.into_body().bytes().await?;
+                let resp = 
serde_json::from_slice::<GetUploadPartUrlResponse>(resp_body)
+                    .map_err(new_json_deserialize_error)?;
+                Ok(resp)
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    pub async fn upload_part(
+        &self,
+        file_id: &str,
+        part_number: usize,
+        size: u64,
+        body: AsyncBody,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let resp = self.get_upload_part_url(file_id).await?;
+
+        let mut req = Request::post(resp.upload_url);
+
+        req = req.header(X_BZ_PART_NUMBER, part_number.to_string());
+
+        req = req.header(header::CONTENT_LENGTH, size.to_string());
+
+        req = req.header(header::AUTHORIZATION, resp.authorization_token);
+
+        req = req.header(X_BZ_CONTENT_SHA1, "do_not_verify");
+
+        // Set body
+        let req = req.body(body).map_err(new_request_build_error)?;
+
+        self.send(req).await
+    }
+
+    pub async fn finish_large_file(
+        &self,
+        file_id: &str,
+        part_sha1_array: Vec<String>,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let auth_info = self.get_auth_info().await?;
+
+        let url = format!("{}/b2api/v2/b2_finish_large_file", 
auth_info.api_url);
+
+        let mut req = Request::post(&url);
+
+        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
+
+        let body = serde_json::to_vec(&FinishLargeFileRequest {
+            file_id: file_id.to_owned(),
+            part_sha1_array,
+        })
+        .map_err(new_json_serialize_error)?;
+        let body = bytes::Bytes::from(body);
+
+        // Set body
+        let req = req
+            .body(AsyncBody::Bytes(body))
+            .map_err(new_request_build_error)?;
+
+        self.send(req).await
+    }
+
+    pub async fn cancel_large_file(&self, file_id: &str) -> 
Result<Response<IncomingAsyncBody>> {
+        let auth_info = self.get_auth_info().await?;
+
+        let url = format!("{}/b2api/v2/b2_cancel_large_file", 
auth_info.api_url);
+
+        let mut req = Request::post(&url);
+
+        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
+
+        let body = serde_json::to_vec(&CancelLargeFileRequest {
+            file_id: file_id.to_owned(),
+        })
+        .map_err(new_json_serialize_error)?;
+        let body = bytes::Bytes::from(body);
+
+        // Set body
+        let req = req
+            .body(AsyncBody::Bytes(body))
+            .map_err(new_request_build_error)?;
+
+        self.send(req).await
+    }
+
+    pub async fn list_file_names(
+        &self,
+        prefix: Option<&str>,
+        delimiter: Option<&str>,
+        limit: Option<usize>,
+        start_after: Option<String>,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let auth_info = self.get_auth_info().await?;
+
+        let mut url = format!(
+            "{}/b2api/v2/b2_list_file_names?bucketId={}",
+            auth_info.api_url, self.bucket_id
+        );
+
+        if let Some(prefix) = prefix {
+            let prefix = build_abs_path(&self.root, prefix);
+            url.push_str(&format!("&prefix={}", percent_encode_path(&prefix)));
+        }
+
+        if let Some(limit) = limit {
+            url.push_str(&format!("&maxFileCount={}", limit));
+        }
+
+        if let Some(start_after) = start_after {
+            let start_after = build_abs_path(&self.root, &start_after);
+            url.push_str(&format!(
+                "&startFileName={}",
+                percent_encode_path(&start_after)
+            ));
+        }
+
+        if let Some(delimiter) = delimiter {
+            url.push_str(&format!("&delimiter={}", delimiter));
+        }
+
+        let mut req = Request::get(&url);
+
+        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
+
+        // Set body
+        let req = req
+            .body(AsyncBody::Empty)
+            .map_err(new_request_build_error)?;
+
+        self.send(req).await
+    }
+
+    pub async fn copy_file(
+        &self,
+        source_file_id: String,
+        to: &str,
+    ) -> Result<Response<IncomingAsyncBody>> {
+        let to = build_abs_path(&self.root, to);
+
+        let auth_info = self.get_auth_info().await?;
+
+        let url = format!("{}/b2api/v2/b2_copy_file", auth_info.api_url);
+
+        let mut req = Request::post(url);
+
+        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
+
+        let body = CopyFileRequest {
+            source_file_id,
+            file_name: to,
+        };
+
+        let body = 
serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
+        let body = bytes::Bytes::from(body);
+
+        // Set body
+        let req = req
+            .body(AsyncBody::Bytes(body))
+            .map_err(new_request_build_error)?;
+
+        self.send(req).await
+    }
+
+    pub async fn hide_file(&self, path: &str) -> 
Result<Response<IncomingAsyncBody>> {
+        let path = build_abs_path(&self.root, path);
+
+        let auth_info = self.get_auth_info().await?;
+
+        let url = format!("{}/b2api/v2/b2_hide_file", auth_info.api_url);
+
+        let mut req = Request::post(url);
+
+        req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
+
+        let body = HideFileRequest {
+            bucket_id: self.bucket_id.clone(),
+            file_name: path.to_string(),
+        };
+
+        let body = 
serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
+        let body = bytes::Bytes::from(body);
+
+        // Set body
+        let req = req
+            .body(AsyncBody::Bytes(body))
+            .map_err(new_request_build_error)?;
+
+        self.send(req).await
+    }
+}
+
+#[derive(Clone)]
+pub struct B2Signer {
+    /// The application_key_id of this core.
+    pub application_key_id: String,
+    /// The application_key of this core.
+    pub application_key: String,
+
+    pub auth_info: AuthInfo,
+}
+
+#[derive(Clone)]
+pub struct AuthInfo {
+    pub authorization_token: String,
+    /// The base URL to use for all API calls except for uploading and 
downloading files.
+    pub api_url: String,
+    /// The base URL to use for downloading files.
+    pub download_url: String,
+
+    pub expires_in: DateTime<Utc>,
+}
+
+impl Default for B2Signer {
+    fn default() -> Self {
+        B2Signer {
+            application_key: String::new(),
+            application_key_id: String::new(),
+
+            auth_info: AuthInfo {
+                authorization_token: String::new(),
+                api_url: String::new(),
+                download_url: String::new(),
+                expires_in: DateTime::<Utc>::MIN_UTC,
+            },
+        }
+    }
+}
+
+/// Request of 
[b2_start_large_file](https://www.backblaze.com/apidocs/b2-start-large-file).
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct StartLargeFileRequest {
+    pub bucket_id: String,
+    pub file_name: String,
+    pub content_type: String,
+}
+
+/// Response of 
[b2_start_large_file](https://www.backblaze.com/apidocs/b2-start-large-file).
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct StartLargeFileResponse {
+    pub file_id: String,
+}
+
+/// Response of 
[b2_authorize_account](https://www.backblaze.com/apidocs/b2-authorize-account).
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct AuthorizeAccountResponse {
+    /// An authorization token to use with all calls, other than 
b2_authorize_account, that need an Authorization header. This authorization 
token is valid for at most 24 hours.
+    /// So we should call b2_authorize_account every 24 hours.
+    pub authorization_token: String,
+    pub api_info: ApiInfo,
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ApiInfo {
+    pub storage_api: StorageApi,
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct StorageApi {
+    pub api_url: String,
+    pub download_url: String,
+}
+
+/// Response of 
[b2_get_upload_url](https://www.backblaze.com/apidocs/b2-get-upload-url).
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct GetUploadUrlResponse {
+    /// The authorizationToken that must be used when uploading files to this 
bucket.
+    /// This token is valid for 24 hours or until the uploadUrl endpoint 
rejects an upload, see b2_upload_file
+    pub authorization_token: String,
+    pub upload_url: String,
+}
+
+/// Response of 
[b2_get_upload_url](https://www.backblaze.com/apidocs/b2-get-upload-part-url).
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct GetUploadPartUrlResponse {
+    /// The authorizationToken that must be used when uploading files to this 
bucket.
+    /// This token is valid for 24 hours or until the uploadUrl endpoint 
rejects an upload, see b2_upload_file
+    pub authorization_token: String,
+    pub upload_url: String,
+}
+
+/// Response of 
[b2_upload_part](https://www.backblaze.com/apidocs/b2-upload-part).
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct UploadPartResponse {
+    pub content_sha1: String,
+}
+
+/// Response of 
[b2_finish_large_file](https://www.backblaze.com/apidocs/b2-finish-large-file).
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct FinishLargeFileRequest {
+    pub file_id: String,
+    pub part_sha1_array: Vec<String>,
+}
+
+/// Response of 
[b2_cancel_large_file](https://www.backblaze.com/apidocs/b2-cancel-large-file).
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct CancelLargeFileRequest {
+    pub file_id: String,
+}
+
+/// Response of 
[list_file_names](https://www.backblaze.com/apidocs/b2-list-file-names).
+#[derive(Debug, Clone, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ListFileNamesResponse {
+    pub files: Vec<File>,
+    pub next_file_name: Option<String>,
+}
+
+#[derive(Debug, Clone, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct File {
+    pub file_id: Option<String>,
+    pub content_length: u64,
+    pub content_md5: Option<String>,
+    pub content_type: Option<String>,
+    pub file_name: String,
+    pub action: String,
+}
+
+pub(super) fn parse_file_info(file: &File) -> Metadata {
+    if file.file_name.ends_with('/') {
+        return Metadata::new(EntryMode::DIR);
+    }
+
+    let mut metadata = Metadata::new(EntryMode::FILE);
+
+    metadata.set_content_length(file.content_length);
+
+    if let Some(content_md5) = &file.content_md5 {
+        metadata.set_content_md5(content_md5);
+    }
+
+    if let Some(content_type) = &file.content_type {
+        metadata.set_content_type(content_type);
+    }
+
+    metadata
+}
+
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct CopyFileRequest {
+    pub source_file_id: String,
+    pub file_name: String,
+}
+
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct HideFileRequest {
+    pub bucket_id: String,
+    pub file_name: String,
+}
+
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct GetDownloadAuthorizationRequest {
+    pub bucket_id: String,
+    pub file_name_prefix: String,
+    pub valid_duration_in_seconds: u64,
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct GetDownloadAuthorizationResponse {
+    pub authorization_token: String,
+}
diff --git a/core/src/services/b2/docs.md b/core/src/services/b2/docs.md
new file mode 100644
index 000000000..577bc82ff
--- /dev/null
+++ b/core/src/services/b2/docs.md
@@ -0,0 +1,56 @@
+## Capabilities
+
+This service can be used to:
+
+- [x] stat
+- [x] read
+- [x] write
+- [x] create_dir
+- [x] delete
+- [x] copy
+- [ ] rename
+- [x] list
+- [x] scan
+- [x] presign
+- [ ] blocking
+
+## Configuration
+
+- `root`: Set the work directory for backend
+- `key_id`: B2 application key keyID
+- `application_key` B2 application key applicationKey
+- `bucket` B2 bucket name
+- `bucket_id` B2 bucket_id
+
+You can refer to [`B2Builder`]'s docs for more information
+
+## Example
+
+### Via Builder
+
+```rust
+use anyhow::Result;
+use opendal::services::B2;
+use opendal::Operator;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    // create backend builder
+    let mut builder = B2::default();
+
+    // set the storage bucket for OpenDAL
+    builder.root("/");
+    // set the key_id for OpenDAL
+    builder.key_id("xxxxxxxxxx");
+    // set the key_id for OpenDAL
+    builder.application_key("xxxxxxxxxx");
+    // set the bucket name for OpenDAL
+    builder.bucket("opendal");
+    // set the bucket_id for OpenDAL
+    builder.bucket_id("xxxxxxxxxxxxx");
+
+    let op: Operator = Operator::new(builder)?.finish();
+
+    Ok(())
+}
+```
diff --git a/core/src/services/b2/error.rs b/core/src/services/b2/error.rs
new file mode 100644
index 000000000..2a3ead791
--- /dev/null
+++ b/core/src/services/b2/error.rs
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Buf;
+use http::Response;
+use serde::Deserialize;
+
+use crate::raw::*;
+use crate::Error;
+use crate::ErrorKind;
+use crate::Result;
+
+/// the error response of b2
+#[derive(Default, Debug, Deserialize)]
+#[allow(dead_code)]
+struct B2Error {
+    status: u32,
+    code: String,
+    message: String,
+}
+
+/// Parse error response into Error.
+pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
+    let (parts, body) = resp.into_parts();
+    let bs = body.bytes().await?;
+
+    let (mut kind, mut retryable) = match parts.status.as_u16() {
+        403 => (ErrorKind::PermissionDenied, false),
+        404 => (ErrorKind::NotFound, false),
+        304 | 412 => (ErrorKind::ConditionNotMatch, false),
+        // Service b2 could return 403, show the authorization error
+        401 => (ErrorKind::PermissionDenied, true),
+        500 | 502 | 503 | 504 => (ErrorKind::Unexpected, true),
+        _ => (ErrorKind::Unexpected, false),
+    };
+
+    let (message, b2_err) = serde_json::from_reader::<_, 
B2Error>(bs.clone().reader())
+        .map(|b2_err| (format!("{b2_err:?}"), Some(b2_err)))
+        .unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None));
+
+    if let Some(b2_err) = b2_err {
+        (kind, retryable) = 
parse_b2_error_code(b2_err.code.as_str()).unwrap_or((kind, retryable));
+    };
+
+    let mut err = Error::new(kind, &message);
+
+    err = with_error_response_context(err, parts);
+
+    if retryable {
+        err = err.set_temporary();
+    }
+
+    Ok(err)
+}
+
+/// Returns the `Error kind` of this code and whether the error is retryable.
+pub fn parse_b2_error_code(code: &str) -> Option<(ErrorKind, bool)> {
+    match code {
+        "already_hidden" => Some((ErrorKind::AlreadyExists, false)),
+        "no_such_file" => Some((ErrorKind::NotFound, false)),
+        _ => None,
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use futures::stream;
+    use http::StatusCode;
+
+    use super::*;
+
+    #[test]
+    fn test_parse_b2_error_code() {
+        let code = "already_hidden";
+        assert_eq!(
+            parse_b2_error_code(code),
+            Some((crate::ErrorKind::AlreadyExists, false))
+        );
+
+        let code = "no_such_file";
+        assert_eq!(
+            parse_b2_error_code(code),
+            Some((crate::ErrorKind::NotFound, false))
+        );
+
+        let code = "not_found";
+        assert_eq!(parse_b2_error_code(code), None);
+    }
+
+    #[tokio::test]
+    async fn test_parse_error() {
+        let err_res = vec![
+            (
+                r#"{"status": 403, "code": "access_denied", "message":"The 
provided customer-managed encryption key is wrong."}"#,
+                ErrorKind::PermissionDenied,
+                StatusCode::FORBIDDEN,
+            ),
+            (
+                r#"{"status": 404, "code": "not_found", "message":"File is not 
in B2 Cloud Storage."}"#,
+                ErrorKind::NotFound,
+                StatusCode::NOT_FOUND,
+            ),
+            (
+                r#"{"status": 401, "code": "bad_auth_token", "message":"The 
auth token used is not valid. Call b2_authorize_account again to either get a 
new one, or an error message describing the problem."}"#,
+                ErrorKind::PermissionDenied,
+                StatusCode::UNAUTHORIZED,
+            ),
+        ];
+
+        for res in err_res {
+            let bs = bytes::Bytes::from(res.0);
+            let body = IncomingAsyncBody::new(
+                Box::new(oio::into_stream(stream::iter(vec![Ok(bs.clone())]))),
+                None,
+            );
+            let resp = Response::builder().status(res.2).body(body).unwrap();
+
+            let err = parse_error(resp).await;
+
+            assert!(err.is_ok());
+            assert_eq!(err.unwrap().kind(), res.1);
+        }
+    }
+}
diff --git a/core/src/services/b2/lister.rs b/core/src/services/b2/lister.rs
new file mode 100644
index 000000000..4f084537a
--- /dev/null
+++ b/core/src/services/b2/lister.rs
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use bytes::Buf;
+
+use super::core::{parse_file_info, B2Core, ListFileNamesResponse};
+
+use crate::raw::*;
+use crate::services::b2::error::parse_error;
+use crate::*;
+
+pub struct B2Lister {
+    core: Arc<B2Core>,
+
+    path: String,
+    delimiter: Option<&'static str>,
+    limit: Option<usize>,
+
+    /// B2 starts listing **after** this specified key
+    start_after: Option<String>,
+}
+
+impl B2Lister {
+    pub fn new(
+        core: Arc<B2Core>,
+        path: &str,
+        recursive: bool,
+        limit: Option<usize>,
+        start_after: Option<&str>,
+    ) -> Self {
+        let delimiter = if recursive { None } else { Some("/") };
+        Self {
+            core,
+
+            path: path.to_string(),
+            delimiter,
+            limit,
+            start_after: start_after.map(String::from),
+        }
+    }
+}
+
+#[async_trait]
+impl oio::PageList for B2Lister {
+    async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
+        let resp = self
+            .core
+            .list_file_names(
+                Some(&self.path),
+                self.delimiter,
+                self.limit,
+                self.start_after.clone(),
+            )
+            .await?;
+
+        if resp.status() != http::StatusCode::OK {
+            return Err(parse_error(resp).await?);
+        }
+
+        let bs = resp.into_body().bytes().await?;
+
+        let output: ListFileNamesResponse =
+            
serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
+
+        ctx.done = output.next_file_name.is_none();
+
+        for file in output.files {
+            if let Some(start_after) = self.start_after.clone() {
+                if build_abs_path(&self.core.root, &start_after) == 
file.file_name {
+                    continue;
+                }
+            }
+            if file.file_name == build_abs_path(&self.core.root, &self.path) {
+                continue;
+            }
+            let file_name = file.file_name.clone();
+            let metadata = parse_file_info(&file);
+
+            ctx.entries.push_back(oio::Entry::new(
+                &build_rel_path(&self.core.root, &file_name),
+                metadata,
+            ))
+        }
+
+        Ok(())
+    }
+}
diff --git a/core/src/services/b2/mod.rs b/core/src/services/b2/mod.rs
new file mode 100644
index 000000000..80745f01e
--- /dev/null
+++ b/core/src/services/b2/mod.rs
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod backend;
+pub use backend::B2Builder as B2;
+pub use backend::B2Config;
+
+mod core;
+mod error;
+mod lister;
+mod writer;
diff --git a/core/src/services/b2/writer.rs b/core/src/services/b2/writer.rs
new file mode 100644
index 000000000..d4ed419ff
--- /dev/null
+++ b/core/src/services/b2/writer.rs
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use http::StatusCode;
+
+use crate::raw::*;
+use crate::*;
+
+use super::core::{B2Core, StartLargeFileResponse, UploadPartResponse};
+use super::error::parse_error;
+
+pub type B2Writers = oio::MultipartUploadWriter<B2Writer>;
+
+pub struct B2Writer {
+    core: Arc<B2Core>,
+
+    op: OpWrite,
+    path: String,
+}
+
+impl B2Writer {
+    pub fn new(core: Arc<B2Core>, path: &str, op: OpWrite) -> Self {
+        B2Writer {
+            core,
+            path: path.to_string(),
+            op,
+        }
+    }
+}
+
+#[async_trait]
+impl oio::MultipartUploadWrite for B2Writer {
+    async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
+        let resp = self
+            .core
+            .upload_file(&self.path, Some(size), &self.op, body)
+            .await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                resp.into_body().consume().await?;
+                Ok(())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn initiate_part(&self) -> Result<String> {
+        let resp = self.core.start_large_file(&self.path, &self.op).await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                let bs = resp.into_body().bytes().await?;
+
+                let result: StartLargeFileResponse =
+                    
serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?;
+
+                Ok(result.file_id)
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn write_part(
+        &self,
+        upload_id: &str,
+        part_number: usize,
+        size: u64,
+        body: AsyncBody,
+    ) -> Result<oio::MultipartUploadPart> {
+        // B2 requires part number must between [1..=10000]
+        let part_number = part_number + 1;
+
+        let resp = self
+            .core
+            .upload_part(upload_id, part_number, size, body)
+            .await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                let bs = resp.into_body().bytes().await?;
+
+                let result: UploadPartResponse =
+                    
serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?;
+
+                Ok(oio::MultipartUploadPart {
+                    etag: result.content_sha1,
+                    part_number,
+                })
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn complete_part(
+        &self,
+        upload_id: &str,
+        parts: &[oio::MultipartUploadPart],
+    ) -> Result<()> {
+        let part_sha1_array = parts
+            .iter()
+            .map(|p| {
+                let binding = p.etag.clone();
+                let sha1 = binding.strip_prefix("unverified:");
+                let Some(sha1) = sha1 else {
+                    return "".to_string();
+                };
+                sha1.to_string()
+            })
+            .collect();
+
+        let resp = self
+            .core
+            .finish_large_file(upload_id, part_sha1_array)
+            .await?;
+
+        let status = resp.status();
+
+        match status {
+            StatusCode::OK => {
+                resp.into_body().consume().await?;
+
+                Ok(())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+
+    async fn abort_part(&self, upload_id: &str) -> Result<()> {
+        let resp = self.core.cancel_large_file(upload_id).await?;
+        match resp.status() {
+            // b2 returns code 200 if abort succeeds.
+            StatusCode::OK => {
+                resp.into_body().consume().await?;
+                Ok(())
+            }
+            _ => Err(parse_error(resp).await?),
+        }
+    }
+}
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index 9ba0246ca..947274a63 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -289,3 +289,10 @@ mod alluxio;
 pub use alluxio::Alluxio;
 #[cfg(feature = "services-alluxio")]
 pub use alluxio::AlluxioConfig;
+
+#[cfg(feature = "services-b2")]
+mod b2;
+#[cfg(feature = "services-b2")]
+pub use b2::B2Config;
+#[cfg(feature = "services-b2")]
+pub use b2::B2;
diff --git a/core/src/types/operator/builder.rs 
b/core/src/types/operator/builder.rs
index f926bd570..278a08e11 100644
--- a/core/src/types/operator/builder.rs
+++ b/core/src/types/operator/builder.rs
@@ -163,6 +163,8 @@ impl Operator {
             Scheme::Azdls => Self::from_map::<services::Azdls>(map)?.finish(),
             #[cfg(feature = "services-azfile")]
             Scheme::Azfile => 
Self::from_map::<services::Azfile>(map)?.finish(),
+            #[cfg(feature = "services-b2")]
+            Scheme::B2 => Self::from_map::<services::B2>(map)?.finish(),
             #[cfg(feature = "services-cacache")]
             Scheme::Cacache => 
Self::from_map::<services::Cacache>(map)?.finish(),
             #[cfg(feature = "services-cos")]
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index 54ea475f8..57093f510 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -38,6 +38,8 @@ pub enum Scheme {
     Azblob,
     /// [Azdls][crate::services::Azdls]: Azure Data Lake Storage Gen2.
     Azdls,
+    /// [B2][crate::services::B2]: Backblaze B2 Services.
+    B2,
     /// [cacache][crate::services::Cacache]: cacache backend support.
     Cacache,
     /// [cloudflare-kv][crate::services::CloudflareKv]: Cloudflare KV services.
@@ -173,6 +175,8 @@ impl Scheme {
             Scheme::Azdls,
             #[cfg(feature = "services-azfile")]
             Scheme::Azfile,
+            #[cfg(feature = "services-b2")]
+            Scheme::B2,
             #[cfg(feature = "services-cacache")]
             Scheme::Cacache,
             #[cfg(feature = "services-cos")]
@@ -283,6 +287,7 @@ impl FromStr for Scheme {
             // OpenDAL used to call `azdls` as `azdfs`, we keep it for 
backward compatibility.
             // And abfs is widely used in hadoop ecosystem, keep it for easy 
to use.
             "azdls" | "azdfs" | "abfs" => Ok(Scheme::Azdls),
+            "b2" => Ok(Scheme::B2),
             "cacache" => Ok(Scheme::Cacache),
             "cloudflare_kv" => Ok(Scheme::CloudflareKv),
             "cos" => Ok(Scheme::Cos),
@@ -338,6 +343,7 @@ impl From<Scheme> for &'static str {
             Scheme::Atomicserver => "atomicserver",
             Scheme::Azblob => "azblob",
             Scheme::Azdls => "azdls",
+            Scheme::B2 => "b2",
             Scheme::Cacache => "cacache",
             Scheme::CloudflareKv => "cloudflare_kv",
             Scheme::Cos => "cos",

Reply via email to