This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 26b52cc17 feat(service): support b2 (#3604)
26b52cc17 is described below
commit 26b52cc17fc2c1bb080c131b5e3d2b1b78081359
Author: hoslo <[email protected]>
AuthorDate: Tue Nov 21 17:28:36 2023 +0800
feat(service): support b2 (#3604)
---
.env.example | 6 +
core/Cargo.toml | 1 +
core/src/services/b2/backend.rs | 554 +++++++++++++++++++++++++++++
core/src/services/b2/core.rs | 706 +++++++++++++++++++++++++++++++++++++
core/src/services/b2/docs.md | 56 +++
core/src/services/b2/error.rs | 138 ++++++++
core/src/services/b2/lister.rs | 104 ++++++
core/src/services/b2/mod.rs | 25 ++
core/src/services/b2/writer.rs | 163 +++++++++
core/src/services/mod.rs | 7 +
core/src/types/operator/builder.rs | 2 +
core/src/types/scheme.rs | 6 +
12 files changed, 1768 insertions(+)
diff --git a/.env.example b/.env.example
index 7f11ebf03..450c734f8 100644
--- a/.env.example
+++ b/.env.example
@@ -169,3 +169,9 @@ OPENDAL_GRIDFS_CHUNK_SIZE=<chunk_size>
# alluxio
OPENDAL_ALLUXIO_ENDPOINT=<endpoint>
OPENDAL_ALLUXIO_ROOT=/path/to/dor
+# b2
+OPENDAL_B2_ROOT=/path/to/dir
+OPENDAL_B2_BUCKET=<bucket>
+OPENDAL_B2_BUCKET_ID=<bucket_id>
+OPENDAL_B2_APPLICATION_KEY_ID=<key_id>
+OPENDAL_B2_APPLICATION_KEY=<application_key>
\ No newline at end of file
diff --git a/core/Cargo.toml b/core/Cargo.toml
index f5e261b00..d4833ecaa 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -125,6 +125,7 @@ services-azdls = [
"reqsign?/reqwest_request",
]
services-azfile = []
+services-b2 = []
services-cacache = ["dep:cacache"]
services-cloudflare-kv = []
services-cos = [
diff --git a/core/src/services/b2/backend.rs b/core/src/services/b2/backend.rs
new file mode 100644
index 000000000..0c09d55fc
--- /dev/null
+++ b/core/src/services/b2/backend.rs
@@ -0,0 +1,554 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use http::Request;
+use http::StatusCode;
+use log::debug;
+use serde::Deserialize;
+use tokio::sync::RwLock;
+
+use crate::raw::*;
+use crate::services::b2::core::B2Signer;
+use crate::services::b2::core::ListFileNamesResponse;
+use crate::*;
+
+use super::core::constants;
+use super::core::parse_file_info;
+use super::core::B2Core;
+use super::error::parse_error;
+use super::lister::B2Lister;
+use super::writer::B2Writer;
+use super::writer::B2Writers;
+
+/// Config for backblaze b2 services support.
+#[derive(Default, Deserialize)]
+#[serde(default)]
+#[non_exhaustive]
+pub struct B2Config {
+ /// root of this backend.
+ ///
+ /// All operations will happen under this root.
+ pub root: Option<String>,
+ /// keyID of this backend.
+ ///
+ /// - If application_key_id is set, we will take user's input first.
+ /// - If not, we will try to load it from environment.
+ pub application_key_id: Option<String>,
+ /// applicationKey of this backend.
+ ///
+ /// - If application_key is set, we will take user's input first.
+ /// - If not, we will try to load it from environment.
+ pub application_key: Option<String>,
+ /// bucket of this backend.
+ ///
+ /// required.
+ pub bucket: String,
+ /// bucket id of this backend.
+ ///
+ /// required.
+ pub bucket_id: String,
+}
+
+impl Debug for B2Config {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ let mut d = f.debug_struct("B2Config");
+
+ d.field("root", &self.root)
+ .field("application_key_id", &self.application_key_id)
+ .field("bucket_id", &self.bucket_id)
+ .field("bucket", &self.bucket);
+
+ d.finish_non_exhaustive()
+ }
+}
+
+/// [b2](https://www.backblaze.com/cloud-storage) services support.
+#[doc = include_str!("docs.md")]
+#[derive(Default)]
+pub struct B2Builder {
+ config: B2Config,
+
+ http_client: Option<HttpClient>,
+}
+
+impl Debug for B2Builder {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ let mut d = f.debug_struct("B2Builder");
+
+ d.field("config", &self.config);
+ d.finish_non_exhaustive()
+ }
+}
+
+impl B2Builder {
+ /// Set root of this backend.
+ ///
+ /// All operations will happen under this root.
+ pub fn root(&mut self, root: &str) -> &mut Self {
+ self.config.root = if root.is_empty() {
+ None
+ } else {
+ Some(root.to_string())
+ };
+
+ self
+ }
+
+ /// application_key_id of this backend.
+ pub fn application_key_id(&mut self, application_key_id: &str) -> &mut
Self {
+ self.config.application_key_id = if application_key_id.is_empty() {
+ None
+ } else {
+ Some(application_key_id.to_string())
+ };
+
+ self
+ }
+
+ /// application_key of this backend.
+ pub fn application_key(&mut self, application_key: &str) -> &mut Self {
+ self.config.application_key = if application_key.is_empty() {
+ None
+ } else {
+ Some(application_key.to_string())
+ };
+
+ self
+ }
+
+ /// Set bucket name of this backend.
+ /// You can find it in https://secure.backblaze.com/b2_buckets.html
+ pub fn bucket(&mut self, bucket: &str) -> &mut Self {
+ self.config.bucket = bucket.to_string();
+
+ self
+ }
+
+ /// Set bucket id of this backend.
+ /// You can find it in https://secure.backblaze.com/b2_buckets.html
+ pub fn bucket_id(&mut self, bucket_id: &str) -> &mut Self {
+ self.config.bucket_id = bucket_id.to_string();
+
+ self
+ }
+
+ /// Specify the http client that used by this service.
+ ///
+ /// # Notes
+ ///
+ /// This API is part of OpenDAL's Raw API. `HttpClient` could be changed
+ /// during minor updates.
+ pub fn http_client(&mut self, client: HttpClient) -> &mut Self {
+ self.http_client = Some(client);
+ self
+ }
+}
+
+impl Builder for B2Builder {
+ const SCHEME: Scheme = Scheme::B2;
+ type Accessor = B2Backend;
+
+ /// Converts a HashMap into an B2Builder instance.
+ ///
+ /// # Arguments
+ ///
+ /// * `map` - A HashMap containing the configuration values.
+ ///
+ /// # Returns
+ ///
+ /// Returns an instance of B2Builder.
+ fn from_map(map: HashMap<String, String>) -> Self {
+ // Deserialize the configuration from the HashMap.
+ let config = B2Config::deserialize(ConfigDeserializer::new(map))
+ .expect("config deserialize must succeed");
+
+ // Create an B2Builder instance with the deserialized config.
+ B2Builder {
+ config,
+ http_client: None,
+ }
+ }
+
+ /// Builds the backend and returns the result of B2Backend.
+ fn build(&mut self) -> Result<Self::Accessor> {
+ debug!("backend build started: {:?}", &self);
+
+ let root =
normalize_root(&self.config.root.clone().unwrap_or_default());
+ debug!("backend use root {}", &root);
+
+ // Handle bucket.
+ if self.config.bucket.is_empty() {
+ return Err(Error::new(ErrorKind::ConfigInvalid, "bucket is empty")
+ .with_operation("Builder::build")
+ .with_context("service", Scheme::B2));
+ }
+
+ debug!("backend use bucket {}", &self.config.bucket);
+
+ // Handle bucket_id.
+ if self.config.bucket_id.is_empty() {
+ return Err(Error::new(ErrorKind::ConfigInvalid, "bucket_id is
empty")
+ .with_operation("Builder::build")
+ .with_context("service", Scheme::B2));
+ }
+
+ debug!("backend bucket_id {}", &self.config.bucket_id);
+
+ let application_key_id = match &self.config.application_key_id {
+ Some(application_key_id) => Ok(application_key_id.clone()),
+ None => Err(
+ Error::new(ErrorKind::ConfigInvalid, "application_key_id is
empty")
+ .with_operation("Builder::build")
+ .with_context("service", Scheme::B2),
+ ),
+ }?;
+
+ let application_key = match &self.config.application_key {
+ Some(key_id) => Ok(key_id.clone()),
+ None => Err(
+ Error::new(ErrorKind::ConfigInvalid, "application_key is
empty")
+ .with_operation("Builder::build")
+ .with_context("service", Scheme::B2),
+ ),
+ }?;
+
+ let client = if let Some(client) = self.http_client.take() {
+ client
+ } else {
+ HttpClient::new().map_err(|err| {
+ err.with_operation("Builder::build")
+ .with_context("service", Scheme::B2)
+ })?
+ };
+
+ let signer = B2Signer {
+ application_key_id,
+ application_key,
+ ..Default::default()
+ };
+
+ Ok(B2Backend {
+ core: Arc::new(B2Core {
+ signer: Arc::new(RwLock::new(signer)),
+ root,
+
+ bucket: self.config.bucket.clone(),
+ bucket_id: self.config.bucket_id.clone(),
+ client,
+ }),
+ })
+ }
+}
+
+/// Backend for b2 services.
+#[derive(Debug, Clone)]
+pub struct B2Backend {
+ core: Arc<B2Core>,
+}
+
+#[async_trait]
+impl Accessor for B2Backend {
+ type Reader = IncomingAsyncBody;
+
+ type BlockingReader = ();
+
+ type Writer = B2Writers;
+
+ type BlockingWriter = ();
+
+ type Lister = oio::PageLister<B2Lister>;
+
+ type BlockingLister = ();
+
+ fn info(&self) -> AccessorInfo {
+ let mut am = AccessorInfo::default();
+ am.set_scheme(Scheme::B2)
+ .set_root(&self.core.root)
+ .set_native_capability(Capability {
+ stat: true,
+
+ read: true,
+ read_can_next: true,
+ read_with_range: true,
+
+ write: true,
+ write_can_empty: true,
+ write_can_multi: true,
+ write_with_content_type: true,
+ // The min multipart size of b2 is 5 MiB.
+ //
+ // ref:
<https://www.backblaze.com/docs/cloud-storage-large-files>
+ write_multi_min_size: Some(5 * 1024 * 1024),
+            // The max multipart size of b2 is 5 GiB.
+ //
+ // ref:
<https://www.backblaze.com/docs/cloud-storage-large-files>
+ write_multi_max_size: Some(5 * 1024 * 1024 * 1024),
+
+ create_dir: true,
+ delete: true,
+ copy: true,
+
+ list: true,
+ list_with_limit: true,
+ list_with_start_after: true,
+ list_with_recursive: true,
+ list_without_recursive: true,
+
+ presign: true,
+ presign_read: true,
+ presign_write: true,
+ presign_stat: true,
+
+ ..Default::default()
+ });
+
+ am
+ }
+
+ async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::Reader)> {
+ let resp = self.core.download_file_by_name(path, &args).await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK | StatusCode::PARTIAL_CONTENT => {
+ let size = parse_content_length(resp.headers())?;
+ Ok((RpRead::new().with_size(size), resp.into_body()))
+ }
+ StatusCode::RANGE_NOT_SATISFIABLE => Ok((RpRead::new(),
IncomingAsyncBody::empty())),
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn create_dir(&self, path: &str, _: OpCreateDir) ->
Result<RpCreateDir> {
+ let resp: http::Response<IncomingAsyncBody> = self
+ .core
+ .upload_file(path, Some(0), &OpWrite::default(), AsyncBody::Empty)
+ .await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK => {
+ resp.into_body().consume().await?;
+ Ok(RpCreateDir::default())
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
+ let writer = B2Writer::new(self.core.clone(), path, args);
+
+ let w = oio::MultipartUploadWriter::new(writer);
+
+ Ok((RpWrite::default(), w))
+ }
+
+    /// B2 has a get_file_info API, but it requires a file_id, and obtaining the
+    /// file_id itself requires calling the list API, which already returns the file info.
+    /// So we call the list API directly to get the file info.
+ async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
+ // Stat root always returns a DIR.
+ if path == "/" {
+ return Ok(RpStat::new(Metadata::new(EntryMode::DIR)));
+ }
+
+ let delimiter = if path.ends_with('/') { Some("/") } else { None };
+ let resp = self
+ .core
+ .list_file_names(Some(path), delimiter, None, None)
+ .await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK => {
+ let bs = resp.into_body().bytes().await?;
+
+ let resp: ListFileNamesResponse =
+
serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?;
+ if resp.files.is_empty() {
+ return Err(Error::new(ErrorKind::NotFound, "no such file
or directory"));
+ }
+ let meta = parse_file_info(&resp.files[0]);
+ Ok(RpStat::new(meta))
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn copy(&self, from: &str, to: &str, _args: OpCopy) ->
Result<RpCopy> {
+ let resp = self
+ .core
+ .list_file_names(Some(from), None, None, None)
+ .await?;
+
+ let status = resp.status();
+
+ let source_file_id = match status {
+ StatusCode::OK => {
+ let bs = resp.into_body().bytes().await?;
+
+ let resp: ListFileNamesResponse =
+
serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?;
+ if resp.files.is_empty() {
+ return Err(Error::new(ErrorKind::NotFound, "no such file
or directory"));
+ }
+
+ let file_id = resp.files[0].clone().file_id;
+ Ok(file_id)
+ }
+ _ => Err(parse_error(resp).await?),
+ }?;
+
+ let Some(source_file_id) = source_file_id else {
+ return Err(Error::new(ErrorKind::IsADirectory, "is a directory"));
+ };
+
+ let resp = self.core.copy_file(source_file_id, to).await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK => Ok(RpCopy::default()),
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn delete(&self, path: &str, _: OpDelete) -> Result<RpDelete> {
+ let resp = self.core.hide_file(path).await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK => Ok(RpDelete::default()),
+ _ => {
+ let err = parse_error(resp).await?;
+ match err.kind() {
+ ErrorKind::NotFound => Ok(RpDelete::default()),
+                    // AlreadyExists means the file is already hidden, so treat it as deleted.
+ ErrorKind::AlreadyExists => Ok(RpDelete::default()),
+ _ => Err(err),
+ }
+ }
+ }
+ }
+
+ async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Lister)> {
+ Ok((
+ RpList::default(),
+ oio::PageLister::new(B2Lister::new(
+ self.core.clone(),
+ path,
+ args.recursive(),
+ args.limit(),
+ args.start_after(),
+ )),
+ ))
+ }
+
+ async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
+ match args.operation() {
+ PresignOperation::Stat(_) => {
+ let resp = self
+ .core
+ .get_download_authorization(path, &OpRead::default(),
args.expire())
+ .await?;
+ let path = build_abs_path(&self.core.root, path);
+
+ let auth_info = self.core.get_auth_info().await?;
+
+ let url = format!(
+ "{}/file/{}/{}?Authorization={}",
+ auth_info.download_url, self.core.bucket, path,
resp.authorization_token
+ );
+
+ let req = Request::get(url);
+
+ let req = req
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ // We don't need this request anymore, consume
+ let (parts, _) = req.into_parts();
+
+ Ok(RpPresign::new(PresignedRequest::new(
+ parts.method,
+ parts.uri,
+ parts.headers,
+ )))
+ }
+ PresignOperation::Read(op) => {
+ let resp = self
+ .core
+ .get_download_authorization(path, op, args.expire())
+ .await?;
+ let path = build_abs_path(&self.core.root, path);
+
+ let auth_info = self.core.get_auth_info().await?;
+
+ let url = format!(
+ "{}/file/{}/{}?Authorization={}",
+ auth_info.download_url, self.core.bucket, path,
resp.authorization_token
+ );
+
+ let req = Request::get(url);
+
+ let req = req
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ // We don't need this request anymore, consume
+ let (parts, _) = req.into_parts();
+
+ Ok(RpPresign::new(PresignedRequest::new(
+ parts.method,
+ parts.uri,
+ parts.headers,
+ )))
+ }
+ PresignOperation::Write(_) => {
+ let resp = self.core.get_upload_url().await?;
+
+ let mut req = Request::post(&resp.upload_url);
+
+ req = req.header(http::header::AUTHORIZATION,
resp.authorization_token);
+ req = req.header("X-Bz-File-Name",
build_abs_path(&self.core.root, path));
+ req = req.header(http::header::CONTENT_TYPE, "b2/x-auto");
+ req = req.header(constants::X_BZ_CONTENT_SHA1,
"do_not_verify");
+
+ let req = req
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+ // We don't need this request anymore, consume it directly.
+ let (parts, _) = req.into_parts();
+
+ Ok(RpPresign::new(PresignedRequest::new(
+ parts.method,
+ parts.uri,
+ parts.headers,
+ )))
+ }
+ }
+ }
+}
diff --git a/core/src/services/b2/core.rs b/core/src/services/b2/core.rs
new file mode 100644
index 000000000..db706629a
--- /dev/null
+++ b/core/src/services/b2/core.rs
@@ -0,0 +1,706 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::{Debug, Formatter};
+use std::sync::Arc;
+use std::time::Duration;
+
+use chrono::{DateTime, Utc};
+use http::{header, Request, Response, StatusCode};
+use serde::{Deserialize, Serialize};
+use tokio::sync::RwLock;
+
+use crate::raw::*;
+use crate::services::b2::core::constants::X_BZ_PART_NUMBER;
+use crate::services::b2::error::parse_error;
+use crate::*;
+
+use self::constants::{X_BZ_CONTENT_SHA1, X_BZ_FILE_NAME};
+
+pub(super) mod constants {
+ pub const X_BZ_FILE_NAME: &str = "X-Bz-File-Name";
+ pub const X_BZ_CONTENT_SHA1: &str = "X-Bz-Content-Sha1";
+ pub const X_BZ_PART_NUMBER: &str = "X-Bz-Part-Number";
+}
+
+/// Core of [b2](https://www.backblaze.com/cloud-storage) services support.
+#[derive(Clone)]
+pub struct B2Core {
+ pub signer: Arc<RwLock<B2Signer>>,
+
+ /// The root of this core.
+ pub root: String,
+ /// The bucket name of this backend.
+ pub bucket: String,
+ /// The bucket id of this backend.
+ pub bucket_id: String,
+
+ pub client: HttpClient,
+}
+
+impl Debug for B2Core {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("Backend")
+ .field("root", &self.root)
+ .field("bucket", &self.bucket)
+ .field("bucket_id", &self.bucket_id)
+ .finish_non_exhaustive()
+ }
+}
+
+impl B2Core {
+ #[inline]
+ pub async fn send(&self, req: Request<AsyncBody>) ->
Result<Response<IncomingAsyncBody>> {
+ self.client.send(req).await
+ }
+
+ ///
[b2_authorize_account](https://www.backblaze.com/apidocs/b2-authorize-account)
+ pub async fn get_auth_info(&self) -> Result<AuthInfo> {
+ {
+ let signer = self.signer.read().await;
+
+ if !signer.auth_info.authorization_token.is_empty()
+ && signer.auth_info.expires_in > Utc::now()
+ {
+ let auth_info = signer.auth_info.clone();
+ return Ok(auth_info);
+ }
+ }
+
+ {
+ let mut signer = self.signer.write().await;
+ let req =
Request::get("https://api.backblazeb2.com/b2api/v3/b2_authorize_account")
+ .header(
+ header::AUTHORIZATION,
+ format_authorization_by_basic(
+ &signer.application_key_id,
+ &signer.application_key,
+ )?,
+ )
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ let resp = self.client.send(req).await?;
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK => {
+ let resp_body = &resp.into_body().bytes().await?;
+ let token =
serde_json::from_slice::<AuthorizeAccountResponse>(resp_body)
+ .map_err(new_json_deserialize_error)?;
+ signer.auth_info = AuthInfo {
+ authorization_token: token.authorization_token.clone(),
+ api_url: token.api_info.storage_api.api_url.clone(),
+ download_url:
token.api_info.storage_api.download_url.clone(),
+ // This authorization token is valid for at most 24
hours.
+ expires_in: Utc::now() + chrono::Duration::hours(20),
+ };
+ }
+ _ => {
+ return Err(parse_error(resp).await?);
+ }
+ }
+ Ok(signer.auth_info.clone())
+ }
+ }
+}
+
+impl B2Core {
+ pub async fn download_file_by_name(
+ &self,
+ path: &str,
+ args: &OpRead,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let path = build_abs_path(&self.root, path);
+
+ let auth_info = self.get_auth_info().await?;
+
+ // Construct headers to add to the request
+ let url = format!(
+ "{}/file/{}/{}",
+ auth_info.download_url,
+ self.bucket,
+ percent_encode_path(&path)
+ );
+
+ let mut req = Request::get(&url);
+
+ req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
+
+ let range = args.range();
+ if !range.is_full() {
+ req = req.header(http::header::RANGE, range.to_header());
+ }
+
+ let req = req
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ self.send(req).await
+ }
+
+ pub(super) async fn get_upload_url(&self) -> Result<GetUploadUrlResponse> {
+ let auth_info = self.get_auth_info().await?;
+
+ let url = format!(
+ "{}/b2api/v2/b2_get_upload_url?bucketId={}",
+ auth_info.api_url, self.bucket_id
+ );
+
+ let mut req = Request::get(&url);
+
+ req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
+
+ // Set body
+ let req = req
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ let resp = self.send(req).await?;
+ let status = resp.status();
+ match status {
+ StatusCode::OK => {
+ let resp_body = &resp.into_body().bytes().await?;
+ let resp =
serde_json::from_slice::<GetUploadUrlResponse>(resp_body)
+ .map_err(new_json_deserialize_error)?;
+ Ok(resp)
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ pub async fn get_download_authorization(
+ &self,
+ path: &str,
+ args: &OpRead,
+ expire: Duration,
+ ) -> Result<GetDownloadAuthorizationResponse> {
+ let path = build_abs_path(&self.root, path);
+
+ let auth_info = self.get_auth_info().await?;
+
+ // Construct headers to add to the request
+ let url = format!(
+ "{}/b2api/v2/b2_get_download_authorization",
+ auth_info.api_url
+ );
+ let mut req = Request::post(&url);
+
+ req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
+
+ let range = args.range();
+ if !range.is_full() {
+ req = req.header(http::header::RANGE, range.to_header());
+ }
+ let body = GetDownloadAuthorizationRequest {
+ bucket_id: self.bucket_id.clone(),
+ file_name_prefix: path,
+ valid_duration_in_seconds: expire.as_secs(),
+ };
+ let body =
serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
+ let body = bytes::Bytes::from(body);
+
+ let req = req
+ .body(AsyncBody::Bytes(body))
+ .map_err(new_request_build_error)?;
+
+ let resp = self.send(req).await?;
+
+ let status = resp.status();
+ match status {
+ StatusCode::OK => {
+ let resp_body = &resp.into_body().bytes().await?;
+ let resp =
serde_json::from_slice::<GetDownloadAuthorizationResponse>(resp_body)
+ .map_err(new_json_deserialize_error)?;
+ Ok(resp)
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ pub async fn upload_file(
+ &self,
+ path: &str,
+ size: Option<u64>,
+ args: &OpWrite,
+ body: AsyncBody,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let resp = self.get_upload_url().await?;
+
+ let p = build_abs_path(&self.root, path);
+
+ let mut req = Request::post(resp.upload_url);
+
+ req = req.header(X_BZ_FILE_NAME, percent_encode_path(&p));
+
+ req = req.header(header::AUTHORIZATION, resp.authorization_token);
+
+ req = req.header(X_BZ_CONTENT_SHA1, "do_not_verify");
+
+ if let Some(size) = size {
+ req = req.header(header::CONTENT_LENGTH, size.to_string())
+ }
+
+ if let Some(mime) = args.content_type() {
+ req = req.header(header::CONTENT_TYPE, mime)
+ } else {
+ req = req.header(header::CONTENT_TYPE, "b2/x-auto")
+ }
+
+ if let Some(pos) = args.content_disposition() {
+ req = req.header(header::CONTENT_DISPOSITION, pos)
+ }
+
+ // Set body
+ let req = req.body(body).map_err(new_request_build_error)?;
+
+ self.send(req).await
+ }
+
+ pub async fn start_large_file(
+ &self,
+ path: &str,
+ args: &OpWrite,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let p = build_abs_path(&self.root, path);
+
+ let auth_info = self.get_auth_info().await?;
+
+ let url = format!("{}/b2api/v2/b2_start_large_file",
auth_info.api_url);
+
+ let mut req = Request::post(&url);
+
+ req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
+
+ let mut start_large_file_request = StartLargeFileRequest {
+ bucket_id: self.bucket_id.clone(),
+ file_name: percent_encode_path(&p),
+ content_type: "b2/x-auto".to_owned(),
+ };
+
+ if let Some(mime) = args.content_type() {
+ start_large_file_request.content_type = mime.to_owned();
+ }
+
+ let body =
+
serde_json::to_vec(&start_large_file_request).map_err(new_json_serialize_error)?;
+ let body = bytes::Bytes::from(body);
+
+ let req = req
+ .body(AsyncBody::Bytes(body))
+ .map_err(new_request_build_error)?;
+
+ self.send(req).await
+ }
+
+ pub async fn get_upload_part_url(&self, file_id: &str) ->
Result<GetUploadPartUrlResponse> {
+ let auth_info = self.get_auth_info().await?;
+
+ let url = format!(
+ "{}/b2api/v2/b2_get_upload_part_url?fileId={}",
+ auth_info.api_url, file_id
+ );
+
+ let mut req = Request::get(&url);
+
+ req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
+
+ // Set body
+ let req = req
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ let resp = self.send(req).await?;
+
+ let status = resp.status();
+ match status {
+ StatusCode::OK => {
+ let resp_body = &resp.into_body().bytes().await?;
+ let resp =
serde_json::from_slice::<GetUploadPartUrlResponse>(resp_body)
+ .map_err(new_json_deserialize_error)?;
+ Ok(resp)
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ pub async fn upload_part(
+ &self,
+ file_id: &str,
+ part_number: usize,
+ size: u64,
+ body: AsyncBody,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let resp = self.get_upload_part_url(file_id).await?;
+
+ let mut req = Request::post(resp.upload_url);
+
+ req = req.header(X_BZ_PART_NUMBER, part_number.to_string());
+
+ req = req.header(header::CONTENT_LENGTH, size.to_string());
+
+ req = req.header(header::AUTHORIZATION, resp.authorization_token);
+
+ req = req.header(X_BZ_CONTENT_SHA1, "do_not_verify");
+
+ // Set body
+ let req = req.body(body).map_err(new_request_build_error)?;
+
+ self.send(req).await
+ }
+
+ pub async fn finish_large_file(
+ &self,
+ file_id: &str,
+ part_sha1_array: Vec<String>,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let auth_info = self.get_auth_info().await?;
+
+ let url = format!("{}/b2api/v2/b2_finish_large_file",
auth_info.api_url);
+
+ let mut req = Request::post(&url);
+
+ req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
+
+ let body = serde_json::to_vec(&FinishLargeFileRequest {
+ file_id: file_id.to_owned(),
+ part_sha1_array,
+ })
+ .map_err(new_json_serialize_error)?;
+ let body = bytes::Bytes::from(body);
+
+ // Set body
+ let req = req
+ .body(AsyncBody::Bytes(body))
+ .map_err(new_request_build_error)?;
+
+ self.send(req).await
+ }
+
+ pub async fn cancel_large_file(&self, file_id: &str) ->
Result<Response<IncomingAsyncBody>> {
+ let auth_info = self.get_auth_info().await?;
+
+ let url = format!("{}/b2api/v2/b2_cancel_large_file",
auth_info.api_url);
+
+ let mut req = Request::post(&url);
+
+ req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
+
+ let body = serde_json::to_vec(&CancelLargeFileRequest {
+ file_id: file_id.to_owned(),
+ })
+ .map_err(new_json_serialize_error)?;
+ let body = bytes::Bytes::from(body);
+
+ // Set body
+ let req = req
+ .body(AsyncBody::Bytes(body))
+ .map_err(new_request_build_error)?;
+
+ self.send(req).await
+ }
+
+ pub async fn list_file_names(
+ &self,
+ prefix: Option<&str>,
+ delimiter: Option<&str>,
+ limit: Option<usize>,
+ start_after: Option<String>,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let auth_info = self.get_auth_info().await?;
+
+ let mut url = format!(
+ "{}/b2api/v2/b2_list_file_names?bucketId={}",
+ auth_info.api_url, self.bucket_id
+ );
+
+ if let Some(prefix) = prefix {
+ let prefix = build_abs_path(&self.root, prefix);
+ url.push_str(&format!("&prefix={}", percent_encode_path(&prefix)));
+ }
+
+ if let Some(limit) = limit {
+ url.push_str(&format!("&maxFileCount={}", limit));
+ }
+
+ if let Some(start_after) = start_after {
+ let start_after = build_abs_path(&self.root, &start_after);
+ url.push_str(&format!(
+ "&startFileName={}",
+ percent_encode_path(&start_after)
+ ));
+ }
+
+ if let Some(delimiter) = delimiter {
+ url.push_str(&format!("&delimiter={}", delimiter));
+ }
+
+ let mut req = Request::get(&url);
+
+ req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
+
+ // Set body
+ let req = req
+ .body(AsyncBody::Empty)
+ .map_err(new_request_build_error)?;
+
+ self.send(req).await
+ }
+
+ pub async fn copy_file(
+ &self,
+ source_file_id: String,
+ to: &str,
+ ) -> Result<Response<IncomingAsyncBody>> {
+ let to = build_abs_path(&self.root, to);
+
+ let auth_info = self.get_auth_info().await?;
+
+ let url = format!("{}/b2api/v2/b2_copy_file", auth_info.api_url);
+
+ let mut req = Request::post(url);
+
+ req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
+
+ let body = CopyFileRequest {
+ source_file_id,
+ file_name: to,
+ };
+
+ let body =
serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
+ let body = bytes::Bytes::from(body);
+
+ // Set body
+ let req = req
+ .body(AsyncBody::Bytes(body))
+ .map_err(new_request_build_error)?;
+
+ self.send(req).await
+ }
+
+ pub async fn hide_file(&self, path: &str) ->
Result<Response<IncomingAsyncBody>> {
+ let path = build_abs_path(&self.root, path);
+
+ let auth_info = self.get_auth_info().await?;
+
+ let url = format!("{}/b2api/v2/b2_hide_file", auth_info.api_url);
+
+ let mut req = Request::post(url);
+
+ req = req.header(header::AUTHORIZATION, auth_info.authorization_token);
+
+ let body = HideFileRequest {
+ bucket_id: self.bucket_id.clone(),
+ file_name: path.to_string(),
+ };
+
+ let body =
serde_json::to_vec(&body).map_err(new_json_serialize_error)?;
+ let body = bytes::Bytes::from(body);
+
+ // Set body
+ let req = req
+ .body(AsyncBody::Bytes(body))
+ .map_err(new_request_build_error)?;
+
+ self.send(req).await
+ }
+}
+
+#[derive(Clone)]
+pub struct B2Signer {
+ /// The application_key_id of this core.
+ pub application_key_id: String,
+ /// The application_key of this core.
+ pub application_key: String,
+
+ pub auth_info: AuthInfo,
+}
+
+#[derive(Clone)]
+pub struct AuthInfo {
+ pub authorization_token: String,
+ /// The base URL to use for all API calls except for uploading and
downloading files.
+ pub api_url: String,
+ /// The base URL to use for downloading files.
+ pub download_url: String,
+
+ pub expires_in: DateTime<Utc>,
+}
+
+impl Default for B2Signer {
+ fn default() -> Self {
+ B2Signer {
+ application_key: String::new(),
+ application_key_id: String::new(),
+
+ auth_info: AuthInfo {
+ authorization_token: String::new(),
+ api_url: String::new(),
+ download_url: String::new(),
+ expires_in: DateTime::<Utc>::MIN_UTC,
+ },
+ }
+ }
+}
+
+/// Request of
[b2_start_large_file](https://www.backblaze.com/apidocs/b2-start-large-file).
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct StartLargeFileRequest {
+ pub bucket_id: String,
+ pub file_name: String,
+ pub content_type: String,
+}
+
+/// Response of
[b2_start_large_file](https://www.backblaze.com/apidocs/b2-start-large-file).
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct StartLargeFileResponse {
+ pub file_id: String,
+}
+
+/// Response of
[b2_authorize_account](https://www.backblaze.com/apidocs/b2-authorize-account).
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct AuthorizeAccountResponse {
+ /// An authorization token to use with all calls, other than
b2_authorize_account, that need an Authorization header. This authorization
token is valid for at most 24 hours.
+ /// So we should call b2_authorize_account every 24 hours.
+ pub authorization_token: String,
+ pub api_info: ApiInfo,
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ApiInfo {
+ pub storage_api: StorageApi,
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct StorageApi {
+ pub api_url: String,
+ pub download_url: String,
+}
+
+/// Response of
[b2_get_upload_url](https://www.backblaze.com/apidocs/b2-get-upload-url).
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct GetUploadUrlResponse {
+ /// The authorizationToken that must be used when uploading files to this
bucket.
+ /// This token is valid for 24 hours or until the uploadUrl endpoint
rejects an upload, see b2_upload_file
+ pub authorization_token: String,
+ pub upload_url: String,
+}
+
+/// Response of
[b2_get_upload_part_url](https://www.backblaze.com/apidocs/b2-get-upload-part-url).
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct GetUploadPartUrlResponse {
+ /// The authorizationToken that must be used when uploading files to this
bucket.
+ /// This token is valid for 24 hours or until the uploadUrl endpoint
rejects an upload, see b2_upload_file
+ pub authorization_token: String,
+ pub upload_url: String,
+}
+
+/// Response of
[b2_upload_part](https://www.backblaze.com/apidocs/b2-upload-part).
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct UploadPartResponse {
+ pub content_sha1: String,
+}
+
+/// Request of
[b2_finish_large_file](https://www.backblaze.com/apidocs/b2-finish-large-file).
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct FinishLargeFileRequest {
+ pub file_id: String,
+ pub part_sha1_array: Vec<String>,
+}
+
+/// Request of
[b2_cancel_large_file](https://www.backblaze.com/apidocs/b2-cancel-large-file).
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct CancelLargeFileRequest {
+ pub file_id: String,
+}
+
+/// Response of
[list_file_names](https://www.backblaze.com/apidocs/b2-list-file-names).
+#[derive(Debug, Clone, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ListFileNamesResponse {
+ pub files: Vec<File>,
+ pub next_file_name: Option<String>,
+}
+
+#[derive(Debug, Clone, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct File {
+ pub file_id: Option<String>,
+ pub content_length: u64,
+ pub content_md5: Option<String>,
+ pub content_type: Option<String>,
+ pub file_name: String,
+ pub action: String,
+}
+
+pub(super) fn parse_file_info(file: &File) -> Metadata {
+ if file.file_name.ends_with('/') {
+ return Metadata::new(EntryMode::DIR);
+ }
+
+ let mut metadata = Metadata::new(EntryMode::FILE);
+
+ metadata.set_content_length(file.content_length);
+
+ if let Some(content_md5) = &file.content_md5 {
+ metadata.set_content_md5(content_md5);
+ }
+
+ if let Some(content_type) = &file.content_type {
+ metadata.set_content_type(content_type);
+ }
+
+ metadata
+}
+
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct CopyFileRequest {
+ pub source_file_id: String,
+ pub file_name: String,
+}
+
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct HideFileRequest {
+ pub bucket_id: String,
+ pub file_name: String,
+}
+
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct GetDownloadAuthorizationRequest {
+ pub bucket_id: String,
+ pub file_name_prefix: String,
+ pub valid_duration_in_seconds: u64,
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct GetDownloadAuthorizationResponse {
+ pub authorization_token: String,
+}
diff --git a/core/src/services/b2/docs.md b/core/src/services/b2/docs.md
new file mode 100644
index 000000000..577bc82ff
--- /dev/null
+++ b/core/src/services/b2/docs.md
@@ -0,0 +1,56 @@
+## Capabilities
+
+This service can be used to:
+
+- [x] stat
+- [x] read
+- [x] write
+- [x] create_dir
+- [x] delete
+- [x] copy
+- [ ] rename
+- [x] list
+- [x] scan
+- [x] presign
+- [ ] blocking
+
+## Configuration
+
+- `root`: Set the work directory for backend
+- `key_id`: B2 application key keyID
+- `application_key`: B2 application key applicationKey
+- `bucket`: B2 bucket name
+- `bucket_id`: B2 bucket_id
+
+You can refer to [`B2Builder`]'s docs for more information
+
+## Example
+
+### Via Builder
+
+```rust
+use anyhow::Result;
+use opendal::services::B2;
+use opendal::Operator;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ // create backend builder
+ let mut builder = B2::default();
+
+ // set the working directory (root) for OpenDAL
+ builder.root("/");
+ // set the key_id for OpenDAL
+ builder.key_id("xxxxxxxxxx");
+ // set the application_key for OpenDAL
+ builder.application_key("xxxxxxxxxx");
+ // set the bucket name for OpenDAL
+ builder.bucket("opendal");
+ // set the bucket_id for OpenDAL
+ builder.bucket_id("xxxxxxxxxxxxx");
+
+ let op: Operator = Operator::new(builder)?.finish();
+
+ Ok(())
+}
+```
diff --git a/core/src/services/b2/error.rs b/core/src/services/b2/error.rs
new file mode 100644
index 000000000..2a3ead791
--- /dev/null
+++ b/core/src/services/b2/error.rs
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::Buf;
+use http::Response;
+use serde::Deserialize;
+
+use crate::raw::*;
+use crate::Error;
+use crate::ErrorKind;
+use crate::Result;
+
/// The structured error response body returned by b2.
#[derive(Default, Debug, Deserialize)]
#[allow(dead_code)]
struct B2Error {
    /// HTTP status echoed in the body; kept only for `Debug` output.
    status: u32,
    /// Machine-readable error code, e.g. "no_such_file".
    code: String,
    /// Human-readable description of the failure.
    message: String,
}
+
/// Parse an HTTP error response from b2 into an OpenDAL [`Error`].
///
/// The HTTP status gives a coarse kind/retryable classification first;
/// when the body is a valid B2 error JSON, its `code` may refine that
/// classification via [`parse_b2_error_code`].
pub async fn parse_error(resp: Response<IncomingAsyncBody>) -> Result<Error> {
    let (parts, body) = resp.into_parts();
    let bs = body.bytes().await?;

    let (mut kind, mut retryable) = match parts.status.as_u16() {
        403 => (ErrorKind::PermissionDenied, false),
        404 => (ErrorKind::NotFound, false),
        304 | 412 => (ErrorKind::ConditionNotMatch, false),
        // 401 (bad/expired auth token) is marked retryable — presumably so
        // a retry re-authorizes with a fresh token; confirm against the
        // token-refresh logic in core.
        401 => (ErrorKind::PermissionDenied, true),
        500 | 502 | 503 | 504 => (ErrorKind::Unexpected, true),
        _ => (ErrorKind::Unexpected, false),
    };

    // Try the structured B2 error body; fall back to the raw body text
    // when it is not valid JSON.
    let (message, b2_err) = serde_json::from_reader::<_, B2Error>(bs.clone().reader())
        .map(|b2_err| (format!("{b2_err:?}"), Some(b2_err)))
        .unwrap_or_else(|_| (String::from_utf8_lossy(&bs).into_owned(), None));

    if let Some(b2_err) = b2_err {
        // A recognized B2 error code overrides the status-based mapping.
        (kind, retryable) = parse_b2_error_code(b2_err.code.as_str()).unwrap_or((kind, retryable));
    };

    let mut err = Error::new(kind, &message);

    err = with_error_response_context(err, parts);

    if retryable {
        err = err.set_temporary();
    }

    Ok(err)
}
+
+/// Returns the `Error kind` of this code and whether the error is retryable.
+pub fn parse_b2_error_code(code: &str) -> Option<(ErrorKind, bool)> {
+ match code {
+ "already_hidden" => Some((ErrorKind::AlreadyExists, false)),
+ "no_such_file" => Some((ErrorKind::NotFound, false)),
+ _ => None,
+ }
+}
+
#[cfg(test)]
mod test {
    use futures::stream;
    use http::StatusCode;

    use super::*;

    /// Codes with a dedicated mapping resolve to a kind; unknown codes
    /// (even plausible-looking ones like "not_found") fall through to `None`.
    #[test]
    fn test_parse_b2_error_code() {
        let code = "already_hidden";
        assert_eq!(
            parse_b2_error_code(code),
            Some((crate::ErrorKind::AlreadyExists, false))
        );

        let code = "no_such_file";
        assert_eq!(
            parse_b2_error_code(code),
            Some((crate::ErrorKind::NotFound, false))
        );

        let code = "not_found";
        assert_eq!(parse_b2_error_code(code), None);
    }

    /// End-to-end check: a JSON error body plus an HTTP status maps to
    /// the expected `ErrorKind`.
    #[tokio::test]
    async fn test_parse_error() {
        // (body, expected kind, HTTP status)
        let err_res = vec![
            (
                r#"{"status": 403, "code": "access_denied", "message":"The provided customer-managed encryption key is wrong."}"#,
                ErrorKind::PermissionDenied,
                StatusCode::FORBIDDEN,
            ),
            (
                r#"{"status": 404, "code": "not_found", "message":"File is not in B2 Cloud Storage."}"#,
                ErrorKind::NotFound,
                StatusCode::NOT_FOUND,
            ),
            (
                r#"{"status": 401, "code": "bad_auth_token", "message":"The auth token used is not valid. Call b2_authorize_account again to either get a new one, or an error message describing the problem."}"#,
                ErrorKind::PermissionDenied,
                StatusCode::UNAUTHORIZED,
            ),
        ];

        for res in err_res {
            // Wrap the static body in a one-chunk async stream.
            let bs = bytes::Bytes::from(res.0);
            let body = IncomingAsyncBody::new(
                Box::new(oio::into_stream(stream::iter(vec![Ok(bs.clone())]))),
                None,
            );
            let resp = Response::builder().status(res.2).body(body).unwrap();

            let err = parse_error(resp).await;

            assert!(err.is_ok());
            assert_eq!(err.unwrap().kind(), res.1);
        }
    }
}
diff --git a/core/src/services/b2/lister.rs b/core/src/services/b2/lister.rs
new file mode 100644
index 000000000..4f084537a
--- /dev/null
+++ b/core/src/services/b2/lister.rs
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use bytes::Buf;
+
+use super::core::{parse_file_info, B2Core, ListFileNamesResponse};
+
+use crate::raw::*;
+use crate::services::b2::error::parse_error;
+use crate::*;
+
/// Pager over the entries below a path in a B2 bucket.
pub struct B2Lister {
    core: Arc<B2Core>,

    /// Path (relative to root) being listed.
    path: String,
    /// `Some("/")` for one-level listing, `None` for recursive.
    delimiter: Option<&'static str>,
    /// Maximum entries to request per page, when set.
    limit: Option<usize>,

    /// B2 starts listing **after** this specified key
    start_after: Option<String>,
}
+
+impl B2Lister {
+ pub fn new(
+ core: Arc<B2Core>,
+ path: &str,
+ recursive: bool,
+ limit: Option<usize>,
+ start_after: Option<&str>,
+ ) -> Self {
+ let delimiter = if recursive { None } else { Some("/") };
+ Self {
+ core,
+
+ path: path.to_string(),
+ delimiter,
+ limit,
+ start_after: start_after.map(String::from),
+ }
+ }
+}
+
+#[async_trait]
+impl oio::PageList for B2Lister {
+ async fn next_page(&self, ctx: &mut oio::PageContext) -> Result<()> {
+ let resp = self
+ .core
+ .list_file_names(
+ Some(&self.path),
+ self.delimiter,
+ self.limit,
+ self.start_after.clone(),
+ )
+ .await?;
+
+ if resp.status() != http::StatusCode::OK {
+ return Err(parse_error(resp).await?);
+ }
+
+ let bs = resp.into_body().bytes().await?;
+
+ let output: ListFileNamesResponse =
+
serde_json::from_reader(bs.reader()).map_err(new_json_deserialize_error)?;
+
+ ctx.done = output.next_file_name.is_none();
+
+ for file in output.files {
+ if let Some(start_after) = self.start_after.clone() {
+ if build_abs_path(&self.core.root, &start_after) ==
file.file_name {
+ continue;
+ }
+ }
+ if file.file_name == build_abs_path(&self.core.root, &self.path) {
+ continue;
+ }
+ let file_name = file.file_name.clone();
+ let metadata = parse_file_info(&file);
+
+ ctx.entries.push_back(oio::Entry::new(
+ &build_rel_path(&self.core.root, &file_name),
+ metadata,
+ ))
+ }
+
+ Ok(())
+ }
+}
diff --git a/core/src/services/b2/mod.rs b/core/src/services/b2/mod.rs
new file mode 100644
index 000000000..80745f01e
--- /dev/null
+++ b/core/src/services/b2/mod.rs
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
// Public surface: the builder is re-exported as `B2`.
mod backend;
pub use backend::B2Builder as B2;
pub use backend::B2Config;

// Internal implementation modules: HTTP core, error mapping, pager, writer.
mod core;
mod error;
mod lister;
mod writer;
diff --git a/core/src/services/b2/writer.rs b/core/src/services/b2/writer.rs
new file mode 100644
index 000000000..d4ed419ff
--- /dev/null
+++ b/core/src/services/b2/writer.rs
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use http::StatusCode;
+
+use crate::raw::*;
+use crate::*;
+
+use super::core::{B2Core, StartLargeFileResponse, UploadPartResponse};
+use super::error::parse_error;
+
/// Writer type exposed to the backend; the generic multipart-upload state
/// machine drives the [`B2Writer`] callbacks below.
pub type B2Writers = oio::MultipartUploadWriter<B2Writer>;

pub struct B2Writer {
    core: Arc<B2Core>,

    /// Write options captured when the writer was created.
    op: OpWrite,
    /// Path of the object being written.
    path: String,
}
+
+impl B2Writer {
+ pub fn new(core: Arc<B2Core>, path: &str, op: OpWrite) -> Self {
+ B2Writer {
+ core,
+ path: path.to_string(),
+ op,
+ }
+ }
+}
+
+#[async_trait]
+impl oio::MultipartUploadWrite for B2Writer {
+ async fn write_once(&self, size: u64, body: AsyncBody) -> Result<()> {
+ let resp = self
+ .core
+ .upload_file(&self.path, Some(size), &self.op, body)
+ .await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK => {
+ resp.into_body().consume().await?;
+ Ok(())
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn initiate_part(&self) -> Result<String> {
+ let resp = self.core.start_large_file(&self.path, &self.op).await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK => {
+ let bs = resp.into_body().bytes().await?;
+
+ let result: StartLargeFileResponse =
+
serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?;
+
+ Ok(result.file_id)
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn write_part(
+ &self,
+ upload_id: &str,
+ part_number: usize,
+ size: u64,
+ body: AsyncBody,
+ ) -> Result<oio::MultipartUploadPart> {
+ // B2 requires part number must between [1..=10000]
+ let part_number = part_number + 1;
+
+ let resp = self
+ .core
+ .upload_part(upload_id, part_number, size, body)
+ .await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK => {
+ let bs = resp.into_body().bytes().await?;
+
+ let result: UploadPartResponse =
+
serde_json::from_slice(&bs).map_err(new_json_deserialize_error)?;
+
+ Ok(oio::MultipartUploadPart {
+ etag: result.content_sha1,
+ part_number,
+ })
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn complete_part(
+ &self,
+ upload_id: &str,
+ parts: &[oio::MultipartUploadPart],
+ ) -> Result<()> {
+ let part_sha1_array = parts
+ .iter()
+ .map(|p| {
+ let binding = p.etag.clone();
+ let sha1 = binding.strip_prefix("unverified:");
+ let Some(sha1) = sha1 else {
+ return "".to_string();
+ };
+ sha1.to_string()
+ })
+ .collect();
+
+ let resp = self
+ .core
+ .finish_large_file(upload_id, part_sha1_array)
+ .await?;
+
+ let status = resp.status();
+
+ match status {
+ StatusCode::OK => {
+ resp.into_body().consume().await?;
+
+ Ok(())
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+
+ async fn abort_part(&self, upload_id: &str) -> Result<()> {
+ let resp = self.core.cancel_large_file(upload_id).await?;
+ match resp.status() {
+ // b2 returns code 200 if abort succeeds.
+ StatusCode::OK => {
+ resp.into_body().consume().await?;
+ Ok(())
+ }
+ _ => Err(parse_error(resp).await?),
+ }
+ }
+}
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index 9ba0246ca..947274a63 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -289,3 +289,10 @@ mod alluxio;
pub use alluxio::Alluxio;
#[cfg(feature = "services-alluxio")]
pub use alluxio::AlluxioConfig;
+
+#[cfg(feature = "services-b2")]
+mod b2;
+#[cfg(feature = "services-b2")]
+pub use b2::B2Config;
+#[cfg(feature = "services-b2")]
+pub use b2::B2;
diff --git a/core/src/types/operator/builder.rs
b/core/src/types/operator/builder.rs
index f926bd570..278a08e11 100644
--- a/core/src/types/operator/builder.rs
+++ b/core/src/types/operator/builder.rs
@@ -163,6 +163,8 @@ impl Operator {
Scheme::Azdls => Self::from_map::<services::Azdls>(map)?.finish(),
#[cfg(feature = "services-azfile")]
Scheme::Azfile =>
Self::from_map::<services::Azfile>(map)?.finish(),
+ #[cfg(feature = "services-b2")]
+ Scheme::B2 => Self::from_map::<services::B2>(map)?.finish(),
#[cfg(feature = "services-cacache")]
Scheme::Cacache =>
Self::from_map::<services::Cacache>(map)?.finish(),
#[cfg(feature = "services-cos")]
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index 54ea475f8..57093f510 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -38,6 +38,8 @@ pub enum Scheme {
Azblob,
/// [Azdls][crate::services::Azdls]: Azure Data Lake Storage Gen2.
Azdls,
+ /// [B2][crate::services::B2]: Backblaze B2 Services.
+ B2,
/// [cacache][crate::services::Cacache]: cacache backend support.
Cacache,
/// [cloudflare-kv][crate::services::CloudflareKv]: Cloudflare KV services.
@@ -173,6 +175,8 @@ impl Scheme {
Scheme::Azdls,
#[cfg(feature = "services-azfile")]
Scheme::Azfile,
+ #[cfg(feature = "services-b2")]
+ Scheme::B2,
#[cfg(feature = "services-cacache")]
Scheme::Cacache,
#[cfg(feature = "services-cos")]
@@ -283,6 +287,7 @@ impl FromStr for Scheme {
// OpenDAL used to call `azdls` as `azdfs`, we keep it for
backward compatibility.
// And abfs is widely used in hadoop ecosystem, keep it for easy
to use.
"azdls" | "azdfs" | "abfs" => Ok(Scheme::Azdls),
+ "b2" => Ok(Scheme::B2),
"cacache" => Ok(Scheme::Cacache),
"cloudflare_kv" => Ok(Scheme::CloudflareKv),
"cos" => Ok(Scheme::Cos),
@@ -338,6 +343,7 @@ impl From<Scheme> for &'static str {
Scheme::Atomicserver => "atomicserver",
Scheme::Azblob => "azblob",
Scheme::Azdls => "azdls",
+ Scheme::B2 => "b2",
Scheme::Cacache => "cacache",
Scheme::CloudflareKv => "cloudflare_kv",
Scheme::Cos => "cos",