This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new 9321505f feat: use opendal to access  underlying storage (#1557)
9321505f is described below

commit 9321505f7adca61abef8b0c4b3529e8e9fba097e
Author: 鲍金日 <[email protected]>
AuthorDate: Tue Aug 27 11:59:28 2024 +0800

    feat: use opendal to access  underlying storage (#1557)
    
    ## Rationale
    Use opendal to access the object store, thus unifying the access method
    of the underlying storage.
    
    ## Detailed Changes
    - Use OpenDAL to access S3, OSS, and local file storage
    
    ## Test Plan
    - Existing tests
---
 Cargo.lock                                         | 265 +++++++++++++++------
 Cargo.toml                                         |  13 +-
 Makefile                                           |   4 +-
 src/analytic_engine/src/manifest/details.rs        |   6 +-
 src/analytic_engine/src/setup.rs                   |  31 ++-
 src/analytic_engine/src/sst/meta_data/cache.rs     |   6 +-
 src/analytic_engine/src/sst/parquet/writer.rs      |  16 +-
 src/analytic_engine/src/tests/util.rs              |   8 +
 src/benchmarks/src/merge_memtable_bench.rs         |   5 +-
 src/benchmarks/src/merge_sst_bench.rs              |   5 +-
 src/benchmarks/src/parquet_bench.rs                |   4 +-
 src/benchmarks/src/scan_memtable_bench.rs          |   4 +-
 src/benchmarks/src/sst_bench.rs                    |   4 +-
 src/benchmarks/src/sst_tools.rs                    |   9 +-
 src/benchmarks/src/util.rs                         |   6 +
 src/components/object_store/Cargo.toml             |   9 +-
 src/components/object_store/src/aliyun.rs          |  60 ++---
 src/components/object_store/src/config.rs          |  47 +++-
 src/components/object_store/src/disk_cache.rs      |  27 +--
 src/components/object_store/src/lib.rs             |  13 +-
 .../object_store/src/{lib.rs => local_file.rs}     |  45 ++--
 src/components/object_store/src/mem_cache.rs       |   6 +-
 src/components/object_store/src/multi_part.rs      |  15 +-
 src/components/object_store/src/prefix.rs          |   6 +-
 src/components/object_store/src/s3.rs              |  56 +++--
 src/tools/src/bin/sst-convert.rs                   |   9 +-
 src/tools/src/bin/sst-metadata.rs                  |   9 +-
 27 files changed, 441 insertions(+), 247 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 66815f7a..0a2d3a0f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -876,6 +876,18 @@ dependencies = [
  "tower-service",
 ]
 
+[[package]]
+name = "backon"
+version = "0.4.4"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "d67782c3f868daa71d3533538e98a8e13713231969def7536e8039606fc46bf0"
+dependencies = [
+ "fastrand 2.1.0",
+ "futures-core",
+ "pin-project",
+ "tokio",
+]
+
 [[package]]
 name = "backtrace"
 version = "0.3.67"
@@ -1208,9 +1220,9 @@ checksum = 
"14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
 
 [[package]]
 name = "bytes"
-version = "1.5.0"
+version = "1.7.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223"
+checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50"
 
 [[package]]
 name = "bytes_ext"
@@ -1713,9 +1725,9 @@ checksum = 
"9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484"
 
 [[package]]
 name = "crc32c"
-version = "0.6.3"
+version = "0.6.8"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "3dfea2db42e9927a3845fb268a10a72faed6d416065f77873f05e411457c363e"
+checksum = "3a47af21622d091a8f0fb295b88bc886ac74efcc613efc19f5d0b21de5c89e47"
 dependencies = [
  "rustc_version",
 ]
@@ -2381,6 +2393,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
 dependencies = [
  "block-buffer",
+ "const-oid",
  "crypto-common",
  "subtle",
 ]
@@ -2406,6 +2419,15 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "dlv-list"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f"
+dependencies = [
+ "const-random",
+]
+
 [[package]]
 name = "doc-comment"
 version = "0.3.3"
@@ -2548,6 +2570,12 @@ dependencies = [
  "instant",
 ]
 
+[[package]]
+name = "fastrand"
+version = "2.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a"
+
 [[package]]
 name = "filedescriptor"
 version = "0.8.2"
@@ -2577,6 +2605,12 @@ version = "0.4.2"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
 
+[[package]]
+name = "flagset"
+version = "0.4.6"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "b3ea1ec5f8307826a5b71094dd91fc04d4ae75d5709b20ad351c7fb4815c86ec"
+
 [[package]]
 name = "flatbuffers"
 version = "23.1.21"
@@ -2777,7 +2811,7 @@ version = "1.12.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48"
 dependencies = [
- "fastrand",
+ "fastrand 1.9.0",
  "futures-core",
  "futures-io",
  "memchr",
@@ -2878,8 +2912,10 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "190092ea657667030ac6a35e305e62fc4dd69fd98ac98631e5d3a2b1575a12b5"
 dependencies = [
  "cfg-if 1.0.0",
+ "js-sys",
  "libc",
  "wasi 0.11.0+wasi-snapshot-preview1",
+ "wasm-bindgen",
 ]
 
 [[package]]
@@ -3069,6 +3105,15 @@ dependencies = [
  "digest",
 ]
 
+[[package]]
+name = "home"
+version = "0.5.9"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "e3d1354bf6b7235cb4a0576c2619fd4ed18183f689b12b006a0ee7329eeff9a5"
+dependencies = [
+ "windows-sys 0.52.0",
+]
+
 [[package]]
 name = "horaectl"
 version = "2.0.0"
@@ -3078,7 +3123,7 @@ dependencies = [
  "clap",
  "lazy_static",
  "prettytable",
- "reqwest 0.11.24",
+ "reqwest 0.12.4",
  "serde",
  "shell-words",
  "tokio",
@@ -3143,7 +3188,7 @@ dependencies = [
  "async-trait",
  "horaedb-client",
  "local-ip-address",
- "reqwest 0.11.24",
+ "reqwest 0.12.4",
  "serde",
  "sqlness",
  "tokio",
@@ -4111,7 +4156,7 @@ dependencies = [
  "logger",
  "macros",
  "prost 0.11.8",
- "reqwest 0.11.24",
+ "reqwest 0.12.4",
  "serde",
  "serde_json",
  "snafu 0.6.10",
@@ -4620,22 +4665,13 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "fbebfd32c213ba1907fa7a9c9138015a8de2b43e30c5aa45b18f7deb46786ad6"
 dependencies = [
  "async-trait",
- "base64 0.22.1",
  "bytes",
  "chrono",
  "futures 0.3.28",
  "humantime 2.1.0",
- "hyper 1.3.1",
  "itertools 0.12.0",
- "md-5",
  "parking_lot 0.12.1",
  "percent-encoding",
- "quick-xml 0.31.0",
- "rand 0.8.5",
- "reqwest 0.12.4",
- "ring 0.17.7",
- "serde",
- "serde_json",
  "snafu 0.7.4",
  "tokio",
  "tracing",
@@ -4662,11 +4698,14 @@ dependencies = [
  "macros",
  "notifier",
  "object_store 0.10.1",
+ "object_store_opendal",
+ "opendal",
  "partitioned_lock",
  "prometheus 0.12.0",
  "prometheus-static-metric",
  "prost 0.11.8",
  "rand 0.8.5",
+ "reqwest 0.12.4",
  "runtime",
  "serde",
  "serde_json",
@@ -4680,6 +4719,23 @@ dependencies = [
  "uuid",
 ]
 
+[[package]]
+name = "object_store_opendal"
+version = "0.46.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "f7e5902fc99e9fb9e32c93f6a67dc5cc0772dc0fb348e2ef4ce258b03666d034"
+dependencies = [
+ "async-trait",
+ "bytes",
+ "flagset",
+ "futures 0.3.28",
+ "futures-util",
+ "object_store 0.10.1",
+ "opendal",
+ "pin-project",
+ "tokio",
+]
+
 [[package]]
 name = "obkv-table-client-rs"
 version = "0.1.0"
@@ -4739,6 +4795,36 @@ version = "11.1.3"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575"
 
+[[package]]
+name = "opendal"
+version = "0.49.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "39d516adf7db912c38af382c3e92c27cd62fbbc240e630920555d784c2ab1494"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "backon",
+ "base64 0.22.1",
+ "bytes",
+ "chrono",
+ "crc32c",
+ "flagset",
+ "futures 0.3.28",
+ "getrandom",
+ "http 1.1.0",
+ "log",
+ "md-5",
+ "once_cell",
+ "percent-encoding",
+ "quick-xml 0.36.1",
+ "reqsign",
+ "reqwest 0.12.4",
+ "serde",
+ "serde_json",
+ "tokio",
+ "uuid",
+]
+
 [[package]]
 name = "opensrv-mysql"
 version = "0.1.0"
@@ -4753,12 +4839,6 @@ dependencies = [
  "tokio",
 ]
 
-[[package]]
-name = "openssl-probe"
-version = "0.1.5"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
-
 [[package]]
 name = "ordered-float"
 version = "2.10.0"
@@ -4768,6 +4848,16 @@ dependencies = [
  "num-traits",
 ]
 
+[[package]]
+name = "ordered-multimap"
+version = "0.7.3"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "49203cdcae0030493bad186b28da2fa25645fa276a51b6fec8010d281e02ef79"
+dependencies = [
+ "dlv-list",
+ "hashbrown 0.14.0",
+]
+
 [[package]]
 name = "overload"
 version = "0.1.1"
@@ -5101,22 +5191,22 @@ dependencies = [
 
 [[package]]
 name = "pin-project"
-version = "1.0.12"
+version = "1.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc"
+checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3"
 dependencies = [
  "pin-project-internal",
 ]
 
 [[package]]
 name = "pin-project-internal"
-version = "1.0.12"
+version = "1.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55"
+checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 1.0.109",
+ "syn 2.0.48",
 ]
 
 [[package]]
@@ -5768,9 +5858,19 @@ dependencies = [
 
 [[package]]
 name = "quick-xml"
-version = "0.31.0"
+version = "0.35.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33"
+checksum = "86e446ed58cef1bbfe847bc2fda0e2e4ea9f0e57b90c507d4781292590d72a4e"
+dependencies = [
+ "memchr",
+ "serde",
+]
+
+[[package]]
+name = "quick-xml"
+version = "0.36.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "96a05e2e8efddfa51a84ca47cec303fac86c8541b686d37cac5efc0e094417bc"
 dependencies = [
  "memchr",
  "serde",
@@ -6015,6 +6115,35 @@ dependencies = [
  "bytecheck",
 ]
 
+[[package]]
+name = "reqsign"
+version = "0.16.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "03dd4ba7c3901dd43e6b8c7446a760d45bc1ea4301002e1a6fa48f97c3a796fa"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "base64 0.22.1",
+ "chrono",
+ "form_urlencoded",
+ "getrandom",
+ "hex",
+ "hmac",
+ "home",
+ "http 1.1.0",
+ "log",
+ "once_cell",
+ "percent-encoding",
+ "quick-xml 0.35.0",
+ "rand 0.8.5",
+ "reqwest 0.12.4",
+ "rust-ini",
+ "serde",
+ "serde_json",
+ "sha1",
+ "sha2",
+]
+
 [[package]]
 name = "reqwest"
 version = "0.11.24"
@@ -6081,7 +6210,6 @@ dependencies = [
  "percent-encoding",
  "pin-project-lite",
  "rustls 0.22.2",
- "rustls-native-certs",
  "rustls-pemfile 2.1.2",
  "rustls-pki-types",
  "serde",
@@ -6097,6 +6225,7 @@ dependencies = [
  "wasm-bindgen-futures",
  "wasm-streams",
  "web-sys",
+ "webpki-roots 0.26.3",
  "winreg 0.52.0",
 ]
 
@@ -6241,6 +6370,17 @@ dependencies = [
  "time 0.1.43",
 ]
 
+[[package]]
+name = "rust-ini"
+version = "0.21.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "4e310ef0e1b6eeb79169a1171daf9abcb87a2e17c03bee2c4bb100b55c75409f"
+dependencies = [
+ "cfg-if 1.0.0",
+ "ordered-multimap",
+ "trim-in-place",
+]
+
 [[package]]
 name = "rust-sdk-test"
 version = "2.0.0"
@@ -6346,19 +6486,6 @@ dependencies = [
  "zeroize",
 ]
 
-[[package]]
-name = "rustls-native-certs"
-version = "0.7.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792"
-dependencies = [
- "openssl-probe",
- "rustls-pemfile 2.1.2",
- "rustls-pki-types",
- "schannel",
- "security-framework",
-]
-
 [[package]]
 name = "rustls-pemfile"
 version = "0.2.1"
@@ -6464,15 +6591,6 @@ version = "0.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "ece8e78b2f38ec51c51f5d475df0a7187ba5111b2a28bdc761ee05b075d40a71"
 
-[[package]]
-name = "schannel"
-version = "0.1.23"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "fbc91545643bcf3a0bbb6569265615222618bdf33ce4ffbbd13c4bbd4c093534"
-dependencies = [
- "windows-sys 0.52.0",
-]
-
 [[package]]
 name = "scheduled-thread-pool"
 version = "0.2.7"
@@ -6529,29 +6647,6 @@ version = "4.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b"
 
-[[package]]
-name = "security-framework"
-version = "2.10.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "770452e37cad93e0a50d5abc3990d2bc351c36d0328f86cefec2f2fb206eaef6"
-dependencies = [
- "bitflags 1.3.2",
- "core-foundation",
- "core-foundation-sys",
- "libc",
- "security-framework-sys",
-]
-
-[[package]]
-name = "security-framework-sys"
-version = "2.11.0"
-source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "317936bbbd05227752583946b9e66d7ce3b489f84e11a94a510b4437fef407d7"
-dependencies = [
- "core-foundation-sys",
- "libc",
-]
-
 [[package]]
 name = "semver"
 version = "1.0.17"
@@ -7311,7 +7406,7 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "b9fbec84f381d5795b08656e4912bec604d162bff9291d6189a78f4c8ab87998"
 dependencies = [
  "cfg-if 1.0.0",
- "fastrand",
+ "fastrand 1.9.0",
  "redox_syscall 0.3.5",
  "rustix",
  "windows-sys 0.45.0",
@@ -7921,6 +8016,12 @@ dependencies = [
  "tracing-subscriber",
 ]
 
+[[package]]
+name = "trim-in-place"
+version = "0.1.7"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "343e926fc669bc8cde4fa3129ab681c63671bae288b1f1081ceee6d9d37904fc"
+
 [[package]]
 name = "triomphe"
 version = "0.1.8"
@@ -8063,6 +8164,7 @@ checksum = 
"5e395fcf16a7a3d8127ec99782007af141946b4795001f876d54fb0d55978560"
 dependencies = [
  "getrandom",
  "rand 0.8.5",
+ "serde",
  "uuid-macro-internal",
 ]
 
@@ -8326,6 +8428,15 @@ version = "0.25.4"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1"
 
+[[package]]
+name = "webpki-roots"
+version = "0.26.3"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "bd7c23921eeb1713a4e851530e9b9756e4fb0e89978582942612524cf09f01cd"
+dependencies = [
+ "rustls-pki-types",
+]
+
 [[package]]
 name = "which"
 version = "4.4.0"
diff --git a/Cargo.toml b/Cargo.toml
index 4563acf5..c0e62ebf 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -160,9 +160,10 @@ query_frontend = { path = "src/query_frontend" }
 rand = "0.8.5"
 regex = "1"
 remote_engine_client = { path = "src/remote_engine_client" }
-reqwest = { version = "0.11", default-features = false, features = [
+reqwest = { version = "0.12.4", default-features = false, features = [
     "rustls-tls",
     "json",
+    "http2",
 ] }
 router = { path = "src/router" }
 runtime = { path = "src/components/runtime" }
@@ -200,14 +201,14 @@ zstd = { version = "0.12", default-features = false }
 # This profile optimizes for good runtime performance.
 [profile.release]
 # reference: https://doc.rust-lang.org/rustc/codegen-options/index.html#codegen-units
-codegen-units   = 1
-debug           = true
+codegen-units = 1
+debug = true
 overflow-checks = true
 
 # This profile is used to produce a smaller (no symbols) binary with a little bit poorer performance,
 # but with a faster speed and low memory consumption required by compiling.
 [profile.release-slim]
-inherits      = "release"
+inherits = "release"
 codegen-units = 16
-debug         = false
-strip         = true
+debug = false
+strip = true
diff --git a/Makefile b/Makefile
index 6569ebf0..4480eede 100644
--- a/Makefile
+++ b/Makefile
@@ -126,7 +126,7 @@ dev-setup:
        echo "Installing dependencies using Homebrew..."
        HOMEBREW_NO_AUTO_UPDATE=1 brew install git openssl protobuf cmake 
pre-commit
        cargo install cargo-udeps
-       cargo install cargo-sort
+       cargo install --git https://github.com/DevinR528/cargo-sort --rev 
55ec890 --locked
 else ifeq ($(shell uname), Linux)
 dev-setup:
        echo "Detecting Linux system..."
@@ -137,7 +137,7 @@ dev-setup:
                sudo apt-get update; \
                sudo apt install -y git gcc g++ libssl-dev pkg-config 
protobuf-compiler cmake pre-commit; \
                cargo install cargo-udeps; \
-               cargo install cargo-sort; \
+               cargo install --git https://github.com/DevinR528/cargo-sort 
--rev 55ec890 --locked; \
        else \
                echo "Error: Unsupported Linux distribution. Exiting..."; \
                exit 1; \
diff --git a/src/analytic_engine/src/manifest/details.rs 
b/src/analytic_engine/src/manifest/details.rs
index 7df80a4f..e49c3d82 100644
--- a/src/analytic_engine/src/manifest/details.rs
+++ b/src/analytic_engine/src/manifest/details.rs
@@ -653,7 +653,7 @@ mod tests {
         column_schema, datum::DatumKind, schema, schema::Schema, 
table::DEFAULT_SHARD_ID,
     };
     use futures::future::BoxFuture;
-    use object_store::LocalFileSystem;
+    use object_store::local_file;
     use runtime::Runtime;
     use table_engine::table::{SchemaId, TableId, TableSeqGenerator};
     use wal::rocksdb_impl::manager::Builder as WalBuilder;
@@ -836,7 +836,9 @@ mod tests {
                 .build()
                 .unwrap();
 
-            let object_store = 
LocalFileSystem::new_with_prefix(&self.dir).unwrap();
+            let local_path = self.dir.to_string_lossy().to_string();
+            let object_store = 
local_file::try_new_with_default(local_path).unwrap();
+
             ManifestImpl::open(
                 self.options.clone(),
                 Arc::new(manifest_wal),
diff --git a/src/analytic_engine/src/setup.rs b/src/analytic_engine/src/setup.rs
index be0d9354..ee167729 100644
--- a/src/analytic_engine/src/setup.rs
+++ b/src/analytic_engine/src/setup.rs
@@ -25,10 +25,11 @@ use object_store::{
     aliyun,
     config::{ObjectStoreOptions, StorageOptions},
     disk_cache::DiskCacheStore,
+    local_file,
     mem_cache::{MemCache, MemCacheStore},
     metrics::StoreWithMetrics,
     prefix::StoreWithPrefix,
-    s3, LocalFileSystem, ObjectStoreRef,
+    s3, ObjectStoreRef,
 };
 use snafu::{ResultExt, Snafu};
 use table_engine::engine::{EngineRuntimes, TableEngineRef};
@@ -61,6 +62,9 @@ pub enum Error {
         source: object_store::ObjectStoreError,
     },
 
+    #[snafu(display("Failed to access object store by openDal , err:{}", 
source))]
+    OpenDal { source: object_store::OpenDalError },
+
     #[snafu(display("Failed to create dir for {}, err:{}", path, source))]
     CreateDir {
         path: String,
@@ -192,27 +196,32 @@ fn open_storage(
 ) -> Pin<Box<dyn Future<Output = Result<OpenedStorages>> + Send>> {
     Box::pin(async move {
         let mut store = match opts.object_store {
-            ObjectStoreOptions::Local(local_opts) => {
+            ObjectStoreOptions::Local(mut local_opts) => {
                 let data_path = Path::new(&local_opts.data_dir);
-                let sst_path = data_path.join(STORE_DIR_NAME);
+                let sst_path = data_path
+                    .join(STORE_DIR_NAME)
+                    .to_string_lossy()
+                    .into_owned();
                 tokio::fs::create_dir_all(&sst_path)
                     .await
                     .context(CreateDir {
-                        path: sst_path.to_string_lossy().into_owned(),
+                        path: sst_path.clone(),
                     })?;
-                let store = 
LocalFileSystem::new_with_prefix(sst_path).context(OpenObjectStore)?;
+                local_opts.data_dir = sst_path;
+
+                let store: ObjectStoreRef =
+                    
Arc::new(local_file::try_new(&local_opts).context(OpenDal)?);
                 Arc::new(store) as _
             }
             ObjectStoreOptions::Aliyun(aliyun_opts) => {
-                let oss: ObjectStoreRef =
-                    
Arc::new(aliyun::try_new(&aliyun_opts).context(OpenObjectStore)?);
-                let store_with_prefix = 
StoreWithPrefix::new(aliyun_opts.prefix, oss);
+                let store: ObjectStoreRef =
+                    Arc::new(aliyun::try_new(&aliyun_opts).context(OpenDal)?);
+                let store_with_prefix = 
StoreWithPrefix::new(aliyun_opts.prefix, store);
                 Arc::new(store_with_prefix.context(OpenObjectStore)?) as _
             }
             ObjectStoreOptions::S3(s3_option) => {
-                let oss: ObjectStoreRef =
-                    
Arc::new(s3::try_new(&s3_option).context(OpenObjectStore)?);
-                let store_with_prefix = StoreWithPrefix::new(s3_option.prefix, 
oss);
+                let store: ObjectStoreRef = 
Arc::new(s3::try_new(&s3_option).context(OpenDal)?);
+                let store_with_prefix = StoreWithPrefix::new(s3_option.prefix, 
store);
                 Arc::new(store_with_prefix.context(OpenObjectStore)?) as _
             }
         };
diff --git a/src/analytic_engine/src/sst/meta_data/cache.rs 
b/src/analytic_engine/src/sst/meta_data/cache.rs
index 8ddaf487..d90e71b0 100644
--- a/src/analytic_engine/src/sst/meta_data/cache.rs
+++ b/src/analytic_engine/src/sst/meta_data/cache.rs
@@ -180,7 +180,7 @@ mod tests {
         schema::Builder as CustomSchemaBuilder,
         time::{TimeRange, Timestamp},
     };
-    use object_store::{LocalFileSystem, ObjectStoreRef};
+    use object_store::{local_file, ObjectStoreRef};
     use parquet::{arrow::ArrowWriter, file::footer};
     use parquet_ext::ParquetMetaData;
 
@@ -329,7 +329,9 @@ mod tests {
             parquet_filter: None,
             column_values: None,
         };
-        let store = 
Arc::new(LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap());
+
+        let local_path = temp_dir.as_ref().to_string_lossy().to_string();
+        let store = 
Arc::new(local_file::try_new_with_default(local_path).unwrap());
         write_parquet_file_with_metadata(
             store.clone(),
             parquet_file_path.as_path(),
diff --git a/src/analytic_engine/src/sst/parquet/writer.rs 
b/src/analytic_engine/src/sst/parquet/writer.rs
index 5fe669c5..732753b7 100644
--- a/src/analytic_engine/src/sst/parquet/writer.rs
+++ b/src/analytic_engine/src/sst/parquet/writer.rs
@@ -28,7 +28,10 @@ use datafusion::parquet::basic::Compression;
 use futures::StreamExt;
 use generic_error::BoxError;
 use logger::{debug, error};
-use object_store::{MultiUploadWriter, ObjectStore, ObjectStoreRef, Path, 
WriteMultipartRef};
+use object_store::{
+    multi_part::{MultiUploadRef, MultiUploadWriter},
+    ObjectStore, ObjectStoreRef, Path,
+};
 use snafu::{OptionExt, ResultExt};
 use tokio::io::AsyncWrite;
 
@@ -417,7 +420,7 @@ async fn write_metadata(
     Ok(buf_size)
 }
 
-async fn multi_upload_abort(aborter: WriteMultipartRef) {
+async fn multi_upload_abort(aborter: MultiUploadRef) {
     // The uploading file will be leaked if failed to abort. A repair command
     // will be provided to clean up the leaked files.
     if let Err(e) = aborter.lock().await.abort().await {
@@ -589,7 +592,7 @@ mod tests {
         time::{TimeRange, Timestamp},
     };
     use futures::stream;
-    use object_store::LocalFileSystem;
+    use object_store::local_file;
     use runtime::{self, Runtime};
     use table_engine::predicate::Predicate;
     use tempfile::tempdir;
@@ -613,7 +616,7 @@ mod tests {
     fn test_parquet_build_and_read() {
         test_util::init_log_for_test();
 
-        let runtime = Arc::new(runtime::Builder::default().build().unwrap());
+        let runtime = 
Arc::new(runtime::Builder::default().enable_all().build().unwrap());
         parquet_write_and_then_read_back(runtime.clone(), 2, vec![2, 2, 2, 2, 
2, 2, 2, 2, 2, 2]);
         parquet_write_and_then_read_back(runtime.clone(), 3, vec![3, 3, 3, 3, 
3, 3, 2]);
         parquet_write_and_then_read_back(runtime.clone(), 4, vec![4, 4, 4, 4, 
4]);
@@ -635,9 +638,8 @@ mod tests {
                 column_stats: Default::default(),
             };
 
-            let dir = tempdir().unwrap();
-            let root = dir.path();
-            let store: ObjectStoreRef = 
Arc::new(LocalFileSystem::new_with_prefix(root).unwrap());
+            let root = 
tempdir().unwrap().as_ref().to_string_lossy().to_string();
+            let store: ObjectStoreRef = 
Arc::new(local_file::try_new_with_default(root).unwrap());
             let store_picker: ObjectStorePickerRef = Arc::new(store);
             let sst_file_path = Path::from("data.par");
 
diff --git a/src/analytic_engine/src/tests/util.rs 
b/src/analytic_engine/src/tests/util.rs
index 7eab3c1b..8fe07106 100644
--- a/src/analytic_engine/src/tests/util.rs
+++ b/src/analytic_engine/src/tests/util.rs
@@ -510,6 +510,8 @@ impl Builder {
                 disk_cache_partition_bits: 0,
                 object_store: ObjectStoreOptions::Local(LocalOptions {
                     data_dir: dir.path().to_str().unwrap().to_string(),
+                    max_retries: 3,
+                    timeout: Default::default(),
                 }),
             },
             wal: WalConfig {
@@ -588,6 +590,8 @@ impl Default for RocksDBEngineBuildContext {
                 disk_cache_partition_bits: 0,
                 object_store: ObjectStoreOptions::Local(LocalOptions {
                     data_dir: dir.path().to_str().unwrap().to_string(),
+                    max_retries: 3,
+                    timeout: Default::default(),
                 }),
             },
             wal: WalConfig {
@@ -621,6 +625,8 @@ impl Clone for RocksDBEngineBuildContext {
             disk_cache_partition_bits: 0,
             object_store: ObjectStoreOptions::Local(LocalOptions {
                 data_dir: dir.path().to_str().unwrap().to_string(),
+                max_retries: 3,
+                timeout: Default::default(),
             }),
         };
 
@@ -685,6 +691,8 @@ impl Default for MemoryEngineBuildContext {
                 disk_cache_partition_bits: 0,
                 object_store: ObjectStoreOptions::Local(LocalOptions {
                     data_dir: dir.path().to_str().unwrap().to_string(),
+                    max_retries: 3,
+                    timeout: Default::default(),
                 }),
             },
             wal: WalConfig {
diff --git a/src/benchmarks/src/merge_memtable_bench.rs 
b/src/benchmarks/src/merge_memtable_bench.rs
index 7c9d9ba4..abf5f8f4 100644
--- a/src/benchmarks/src/merge_memtable_bench.rs
+++ b/src/benchmarks/src/merge_memtable_bench.rs
@@ -45,7 +45,7 @@ use common_types::{
     projected_schema::ProjectedSchema, request_id::RequestId, schema::Schema, 
time::TimeRange,
 };
 use logger::info;
-use object_store::{LocalFileSystem, ObjectStoreRef};
+use object_store::{local_file, ObjectStoreRef};
 use runtime::Runtime;
 use table_engine::{predicate::Predicate, table::TableId};
 
@@ -69,7 +69,8 @@ impl MergeMemTableBench {
     pub fn new(config: MergeMemTableBenchConfig) -> Self {
         assert!(!config.sst_file_ids.is_empty());
 
-        let store = 
Arc::new(LocalFileSystem::new_with_prefix(config.store_path).unwrap()) as _;
+        let store = 
Arc::new(local_file::try_new_with_default(config.store_path).unwrap()) as _;
+
         let runtime = Arc::new(util::new_runtime(config.runtime_thread_num));
         let space_id = config.space_id;
         let table_id = config.table_id;
diff --git a/src/benchmarks/src/merge_sst_bench.rs 
b/src/benchmarks/src/merge_sst_bench.rs
index 0e7280f8..9a949438 100644
--- a/src/benchmarks/src/merge_sst_bench.rs
+++ b/src/benchmarks/src/merge_sst_bench.rs
@@ -38,7 +38,7 @@ use analytic_engine::{
 };
 use common_types::{projected_schema::ProjectedSchema, request_id::RequestId, 
schema::Schema};
 use logger::info;
-use object_store::{LocalFileSystem, ObjectStoreRef};
+use object_store::{local_file, ObjectStoreRef};
 use runtime::Runtime;
 use table_engine::{predicate::Predicate, table::TableId};
 use tokio::sync::mpsc::{self, UnboundedReceiver};
@@ -65,7 +65,8 @@ impl MergeSstBench {
     pub fn new(config: MergeSstBenchConfig) -> Self {
         assert!(!config.sst_file_ids.is_empty());
 
-        let store = 
Arc::new(LocalFileSystem::new_with_prefix(config.store_path).unwrap()) as _;
+        let store = 
Arc::new(local_file::try_new_with_default(config.store_path).unwrap()) as _;
+
         let runtime = Arc::new(util::new_runtime(config.runtime_thread_num));
         let space_id = config.space_id;
         let table_id = config.table_id;
diff --git a/src/benchmarks/src/parquet_bench.rs 
b/src/benchmarks/src/parquet_bench.rs
index 9ff18608..5bec32ba 100644
--- a/src/benchmarks/src/parquet_bench.rs
+++ b/src/benchmarks/src/parquet_bench.rs
@@ -23,7 +23,7 @@ use analytic_engine::sst::meta_data::cache::MetaCacheRef;
 use common_types::schema::Schema;
 use futures::StreamExt;
 use logger::info;
-use object_store::{LocalFileSystem, ObjectStoreRef, Path};
+use object_store::{local_file, ObjectStoreRef, Path};
 use parquet::arrow::{
     arrow_reader::ParquetRecordBatchReaderBuilder, 
ParquetRecordBatchStreamBuilder,
 };
@@ -46,7 +46,7 @@ pub struct ParquetBench {
 
 impl ParquetBench {
     pub fn new(config: SstBenchConfig) -> Self {
-        let store = 
Arc::new(LocalFileSystem::new_with_prefix(&config.store_path).unwrap()) as _;
+        let store = 
Arc::new(local_file::try_new_with_default(config.store_path).unwrap()) as _;
 
         let runtime = util::new_runtime(config.runtime_thread_num);
 
diff --git a/src/benchmarks/src/scan_memtable_bench.rs 
b/src/benchmarks/src/scan_memtable_bench.rs
index a51ceeb0..edd1ad32 100644
--- a/src/benchmarks/src/scan_memtable_bench.rs
+++ b/src/benchmarks/src/scan_memtable_bench.rs
@@ -33,7 +33,7 @@ use common_types::{
     time::TimeRange,
 };
 use logger::info;
-use object_store::{LocalFileSystem, Path};
+use object_store::{local_file, Path};
 
 use crate::{config::ScanMemTableBenchConfig, util};
 
@@ -45,7 +45,7 @@ pub struct ScanMemTableBench {
 
 impl ScanMemTableBench {
     pub fn new(config: ScanMemTableBenchConfig) -> Self {
-        let store = 
Arc::new(LocalFileSystem::new_with_prefix(config.store_path).unwrap()) as _;
+        let store = 
Arc::new(local_file::try_new_with_default(config.store_path).unwrap()) as _;
 
         let runtime = Arc::new(util::new_runtime(config.runtime_thread_num));
         let meta_cache: Option<MetaCacheRef> = None;
diff --git a/src/benchmarks/src/sst_bench.rs b/src/benchmarks/src/sst_bench.rs
index 2577c0a1..29afdd7f 100644
--- a/src/benchmarks/src/sst_bench.rs
+++ b/src/benchmarks/src/sst_bench.rs
@@ -31,7 +31,7 @@ use common_types::{
     schema::Schema,
 };
 use logger::info;
-use object_store::{LocalFileSystem, ObjectStoreRef, Path};
+use object_store::{local_file, ObjectStoreRef, Path};
 use runtime::Runtime;
 
 use crate::{config::SstBenchConfig, util};
@@ -50,7 +50,7 @@ impl SstBench {
     pub fn new(config: SstBenchConfig) -> Self {
         let runtime = Arc::new(util::new_runtime(config.runtime_thread_num));
 
-        let store = 
Arc::new(LocalFileSystem::new_with_prefix(config.store_path).unwrap()) as _;
+        let store = 
Arc::new(local_file::try_new_with_default(config.store_path).unwrap()) as _;
         let sst_path = Path::from(config.sst_file_name.clone());
         let meta_cache: Option<MetaCacheRef> = config
             .sst_meta_cache_cap
diff --git a/src/benchmarks/src/sst_tools.rs b/src/benchmarks/src/sst_tools.rs
index 664f89b0..4e274929 100644
--- a/src/benchmarks/src/sst_tools.rs
+++ b/src/benchmarks/src/sst_tools.rs
@@ -48,7 +48,7 @@ use common_types::{
 };
 use generic_error::BoxError;
 use logger::info;
-use object_store::{LocalFileSystem, ObjectStoreRef, Path};
+use object_store::{local_file, ObjectStoreRef, Path};
 use runtime::Runtime;
 use serde::Deserialize;
 use table_engine::{predicate::Predicate, table::TableId};
@@ -81,7 +81,7 @@ async fn create_sst_from_stream(config: SstConfig, 
record_batch_stream: RecordBa
     );
 
     let store: ObjectStoreRef =
-        Arc::new(LocalFileSystem::new_with_prefix(config.store_path).unwrap());
+        Arc::new(local_file::try_new_with_default(config.store_path).unwrap());
     let store_picker: ObjectStorePickerRef = Arc::new(store);
     let sst_file_path = Path::from(config.sst_file_name);
 
@@ -115,7 +115,7 @@ pub struct RebuildSstConfig {
 pub async fn rebuild_sst(config: RebuildSstConfig, runtime: Arc<Runtime>) {
     info!("Start rebuild sst, config:{:?}", config);
 
-    let store = 
Arc::new(LocalFileSystem::new_with_prefix(config.store_path.clone()).unwrap()) 
as _;
+    let store = 
Arc::new(local_file::try_new_with_default(config.store_path.clone()).unwrap()) 
as _;
     let input_path = Path::from(config.input_file_name);
 
     let parquet_metadata = util::parquet_metadata(&store, &input_path).await;
@@ -210,7 +210,8 @@ pub async fn merge_sst(config: MergeSstConfig, runtime: 
Arc<Runtime>) {
 
     let space_id = config.space_id;
     let table_id = config.table_id;
-    let store = 
Arc::new(LocalFileSystem::new_with_prefix(config.store_path.clone()).unwrap()) 
as _;
+    let store = 
Arc::new(local_file::try_new_with_default(config.store_path).unwrap()) as _;
+
     let (tx, _rx) = mpsc::unbounded_channel();
     let purge_queue = FilePurgeQueue::new(space_id, table_id, tx);
 
diff --git a/src/benchmarks/src/util.rs b/src/benchmarks/src/util.rs
index cb6d8de9..a7f86f08 100644
--- a/src/benchmarks/src/util.rs
+++ b/src/benchmarks/src/util.rs
@@ -308,6 +308,8 @@ impl Builder {
                 disk_cache_partition_bits: 0,
                 object_store: ObjectStoreOptions::Local(LocalOptions {
                     data_dir: dir.path().to_str().unwrap().to_string(),
+                    max_retries: 3,
+                    timeout: Default::default(),
                 }),
             },
             wal: WalConfig {
@@ -386,6 +388,8 @@ impl Default for RocksDBEngineBuildContext {
                 disk_cache_partition_bits: 0,
                 object_store: ObjectStoreOptions::Local(LocalOptions {
                     data_dir: dir.path().to_str().unwrap().to_string(),
+                    max_retries: 3,
+                    timeout: Default::default(),
                 }),
             },
             wal: WalConfig {
@@ -419,6 +423,8 @@ impl Clone for RocksDBEngineBuildContext {
             disk_cache_partition_bits: 0,
             object_store: ObjectStoreOptions::Local(LocalOptions {
                 data_dir: dir.path().to_str().unwrap().to_string(),
+                max_retries: 3,
+                timeout: Default::default(),
             }),
         };
 
diff --git a/src/components/object_store/Cargo.toml 
b/src/components/object_store/Cargo.toml
index f9221e1d..926e85f9 100644
--- a/src/components/object_store/Cargo.toml
+++ b/src/components/object_store/Cargo.toml
@@ -45,11 +45,18 @@ logger = { workspace = true }
 lru = { workspace = true }
 macros = { workspace = true }
 notifier = { workspace = true }
+object_store_opendal = "0.46.0"
+opendal = { version = "0.49.0", features = [
+    "services-oss",
+    "services-s3",
+    "services-fs",
+] }
 partitioned_lock = { workspace = true }
 prometheus = { workspace = true }
 prometheus-static-metric = { workspace = true }
 prost = { workspace = true }
 rand = { workspace = true }
+reqwest = { workspace = true }
 runtime = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
@@ -59,7 +66,7 @@ table_kv = { workspace = true }
 time_ext = { workspace = true }
 tokio = { workspace = true }
 twox-hash = "1.6"
-upstream = { package = "object_store", version = "0.10.1", features = [ "aws" 
] }
+upstream = { package = "object_store", version = "0.10.1" }
 uuid = { version = "1.3.3", features = ["v4"] }
 
 [dev-dependencies]
diff --git a/src/components/object_store/src/aliyun.rs 
b/src/components/object_store/src/aliyun.rs
index f2432d51..736c8755 100644
--- a/src/components/object_store/src/aliyun.rs
+++ b/src/components/object_store/src/aliyun.rs
@@ -15,9 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use upstream::{
-    aws::{AmazonS3, AmazonS3Builder},
-    ClientOptions, RetryConfig,
+use object_store_opendal::OpendalStore;
+use opendal::{
+    layers::{RetryLayer, TimeoutLayer},
+    raw::HttpClient,
+    services::Oss,
+    Operator, Result,
 };
 
 use crate::config::AliyunOptions;
@@ -34,36 +37,35 @@ fn normalize_endpoint(endpoint: &str, bucket: &str) -> 
String {
     }
 }
 
-pub fn try_new(aliyun_opts: &AliyunOptions) -> upstream::Result<AmazonS3> {
-    let cli_opt = ClientOptions::new()
-        .with_allow_http(true)
-        .with_pool_max_idle_per_host(aliyun_opts.http.pool_max_idle_per_host)
-        .with_http2_keep_alive_timeout(aliyun_opts.http.keep_alive_timeout.0)
-        .with_http2_keep_alive_while_idle()
-        .with_http2_keep_alive_interval(aliyun_opts.http.keep_alive_interval.0)
-        .with_timeout(aliyun_opts.http.timeout.0);
-    let retry_config = RetryConfig {
-        max_retries: aliyun_opts.retry.max_retries,
-        retry_timeout: aliyun_opts.retry.retry_timeout.0,
-        ..Default::default()
-    };
+pub fn try_new(aliyun_opts: &AliyunOptions) -> Result<OpendalStore> {
+    let http_builder = reqwest::ClientBuilder::new()
+        .pool_max_idle_per_host(aliyun_opts.http.pool_max_idle_per_host)
+        .http2_keep_alive_timeout(aliyun_opts.http.keep_alive_timeout.0)
+        .http2_keep_alive_while_idle(true)
+        .http2_keep_alive_interval(aliyun_opts.http.keep_alive_interval.0)
+        .timeout(aliyun_opts.http.timeout.0);
+    let http_client = HttpClient::build(http_builder)?;
 
     let endpoint = &aliyun_opts.endpoint;
     let bucket = &aliyun_opts.bucket;
     let endpoint = normalize_endpoint(endpoint, bucket);
-    AmazonS3Builder::new()
-        .with_virtual_hosted_style_request(true)
-        // region is not used when virtual_hosted_style is true,
-        // but is required, so dummy is used here
-        // https://github.com/apache/arrow-rs/issues/3827
-        .with_region("dummy")
-        .with_access_key_id(&aliyun_opts.key_id)
-        .with_secret_access_key(&aliyun_opts.key_secret)
-        .with_endpoint(endpoint)
-        .with_bucket_name(bucket)
-        .with_client_options(cli_opt)
-        .with_retry(retry_config)
-        .build()
+
+    let builder = Oss::default()
+        .access_key_id(&aliyun_opts.key_id)
+        .access_key_secret(&aliyun_opts.key_secret)
+        .endpoint(&endpoint)
+        .bucket(bucket)
+        .http_client(http_client);
+    let op = Operator::new(builder)?
+        .layer(
+            TimeoutLayer::new()
+                .with_timeout(aliyun_opts.timeout.timeout.0)
+                .with_io_timeout(aliyun_opts.timeout.io_timeout.0),
+        )
+        .layer(RetryLayer::new().with_max_times(aliyun_opts.max_retries))
+        .finish();
+
+    Ok(OpendalStore::new(op))
 }
 
 #[cfg(test)]
diff --git a/src/components/object_store/src/config.rs 
b/src/components/object_store/src/config.rs
index d0ecbfb0..072b9159 100644
--- a/src/components/object_store/src/config.rs
+++ b/src/components/object_store/src/config.rs
@@ -49,9 +49,7 @@ impl Default for StorageOptions {
             disk_cache_capacity: ReadableSize::gb(0),
             disk_cache_page_size: ReadableSize::mb(2),
             disk_cache_partition_bits: 4,
-            object_store: ObjectStoreOptions::Local(LocalOptions {
-                data_dir: root_path,
-            }),
+            object_store: 
ObjectStoreOptions::Local(LocalOptions::new_with_default(root_path)),
         }
     }
 }
@@ -68,6 +66,20 @@ pub enum ObjectStoreOptions {
 #[derive(Debug, Clone, Deserialize, Serialize)]
 pub struct LocalOptions {
     pub data_dir: String,
+    #[serde(default = "default_max_retries")]
+    pub max_retries: usize,
+    #[serde(default)]
+    pub timeout: TimeoutOptions,
+}
+
+impl LocalOptions {
+    pub fn new_with_default(data_dir: String) -> Self {
+        Self {
+            data_dir,
+            max_retries: default_max_retries(),
+            timeout: Default::default(),
+        }
+    }
 }
 
 #[derive(Debug, Clone, Deserialize, Serialize)]
@@ -77,10 +89,12 @@ pub struct AliyunOptions {
     pub endpoint: String,
     pub bucket: String,
     pub prefix: String,
+    #[serde(default = "default_max_retries")]
+    pub max_retries: usize,
     #[serde(default)]
     pub http: HttpOptions,
     #[serde(default)]
-    pub retry: RetryOptions,
+    pub timeout: TimeoutOptions,
 }
 
 #[derive(Debug, Clone, Deserialize, Serialize)]
@@ -91,10 +105,12 @@ pub struct S3Options {
     pub endpoint: String,
     pub bucket: String,
     pub prefix: String,
+    #[serde(default = "default_max_retries")]
+    pub max_retries: usize,
     #[serde(default)]
     pub http: HttpOptions,
     #[serde(default)]
-    pub retry: RetryOptions,
+    pub timeout: TimeoutOptions,
 }
 
 #[derive(Debug, Clone, Deserialize, Serialize)]
@@ -117,16 +133,25 @@ impl Default for HttpOptions {
 }
 
 #[derive(Debug, Clone, Deserialize, Serialize)]
-pub struct RetryOptions {
-    pub max_retries: usize,
-    pub retry_timeout: ReadableDuration,
+pub struct TimeoutOptions {
+    // Non-IO operations such as stat and delete operate on a single file; 
we bound them with
+    // `timeout`.
+    pub timeout: ReadableDuration,
+    // IO operations such as read and write operate on data directly; we 
bound them with
+    // `io_timeout`.
+    pub io_timeout: ReadableDuration,
 }
 
-impl Default for RetryOptions {
+impl Default for TimeoutOptions {
     fn default() -> Self {
         Self {
-            max_retries: 3,
-            retry_timeout: ReadableDuration::from(Duration::from_secs(3 * 60)),
+            timeout: ReadableDuration::from(Duration::from_secs(10)),
+            io_timeout: ReadableDuration::from(Duration::from_secs(10)),
         }
     }
 }
+
+#[inline]
+fn default_max_retries() -> usize {
+    3
+}
diff --git a/src/components/object_store/src/disk_cache.rs 
b/src/components/object_store/src/disk_cache.rs
index 33ab7776..a89fd428 100644
--- a/src/components/object_store/src/disk_cache.rs
+++ b/src/components/object_store/src/disk_cache.rs
@@ -1033,10 +1033,9 @@ impl ObjectStore for DiskCacheStore {
 mod test {
     use runtime::{Builder, RuntimeRef};
     use tempfile::{tempdir, TempDir};
-    use upstream::local::LocalFileSystem;
 
     use super::*;
-    use crate::test_util::MemoryStore;
+    use crate::{local_file, test_util::MemoryStore};
 
     struct StoreWithCacheDir {
         inner: DiskCacheStore,
@@ -1334,9 +1333,10 @@ mod test {
             let page_size = 8;
             let first_create_time = {
                 let _store = {
-                    let local_path = tempdir().unwrap();
+                    let local_path = 
tempdir().unwrap().as_ref().to_string_lossy().to_string();
                     let local_store =
-                        
Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
+                        
Arc::new(local_file::try_new_with_default(local_path).unwrap());
+
                     DiskCacheStore::try_new(
                         cache_root_dir.clone(),
                         160,
@@ -1361,9 +1361,9 @@ mod test {
             // open again
             {
                 let _store = {
-                    let local_path = tempdir().unwrap();
+                    let local_path = 
tempdir().unwrap().as_ref().to_string_lossy().to_string();
                     let local_store =
-                        
Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
+                        
Arc::new(local_file::try_new_with_default(local_path).unwrap());
                     DiskCacheStore::try_new(
                         cache_root_dir.clone(),
                         160,
@@ -1387,9 +1387,8 @@ mod test {
 
             // open again, but with different page_size
             {
-                let local_path = tempdir().unwrap();
-                let local_store =
-                    
Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
+                let local_path = 
tempdir().unwrap().as_ref().to_string_lossy().to_string();
+                let local_store = 
Arc::new(local_file::try_new_with_default(local_path).unwrap());
                 let store = DiskCacheStore::try_new(
                     cache_dir.as_ref().to_string_lossy().to_string(),
                     160,
@@ -1407,7 +1406,7 @@ mod test {
 
     #[test]
     fn test_disk_cache_recovery() {
-        let rt = Arc::new(Builder::default().build().unwrap());
+        let rt = Arc::new(Builder::default().enable_all().build().unwrap());
         rt.block_on(async {
             let cache_dir = tempdir().unwrap();
             let cache_root_dir = 
cache_dir.as_ref().to_string_lossy().to_string();
@@ -1415,9 +1414,9 @@ mod test {
             let location = Path::from("recovery.sst");
             {
                 let store = {
-                    let local_path = tempdir().unwrap();
+                    let local_path = 
tempdir().unwrap().as_ref().to_string_lossy().to_string();
                     let local_store =
-                        
Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
+                        
Arc::new(local_file::try_new_with_default(local_path).unwrap());
                     DiskCacheStore::try_new(
                         cache_root_dir.clone(),
                         10240,
@@ -1448,9 +1447,9 @@ mod test {
             // recover
             {
                 let store = {
-                    let local_path = tempdir().unwrap();
+                    let local_path = 
tempdir().unwrap().as_ref().to_string_lossy().to_string();
                     let local_store =
-                        
Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
+                        
Arc::new(local_file::try_new_with_default(local_path).unwrap());
                     DiskCacheStore::try_new(
                         cache_root_dir.clone(),
                         160,
diff --git a/src/components/object_store/src/lib.rs 
b/src/components/object_store/src/lib.rs
index 350ccfa0..4627dbae 100644
--- a/src/components/object_store/src/lib.rs
+++ b/src/components/object_store/src/lib.rs
@@ -19,25 +19,22 @@
 
 use std::sync::Arc;
 
-pub use multi_part::{ConcurrentMultipartUpload, MultiUploadWriter};
-use tokio::sync::Mutex;
+pub use opendal::Error as OpenDalError;
 pub use upstream::{
-    local::LocalFileSystem, path::Path, Error as ObjectStoreError, Error, 
GetResult, ListResult,
-    ObjectMeta, ObjectStore, PutPayloadMut,
+    path::Path, Error as ObjectStoreError, GetResult, ListResult, ObjectMeta, 
ObjectStore,
+    PutPayloadMut,
 };
 
 pub mod aliyun;
 pub mod config;
 pub mod disk_cache;
+pub mod local_file;
 pub mod mem_cache;
 pub mod metrics;
-mod multi_part;
+pub mod multi_part;
 pub mod prefix;
 pub mod s3;
 #[cfg(test)]
 pub mod test_util;
 
 pub type ObjectStoreRef = Arc<dyn ObjectStore>;
-
-// TODO: remove Mutex and make ConcurrentMultipartUpload thread-safe
-pub type WriteMultipartRef = Arc<Mutex<ConcurrentMultipartUpload>>;
diff --git a/src/components/object_store/src/lib.rs 
b/src/components/object_store/src/local_file.rs
similarity index 50%
copy from src/components/object_store/src/lib.rs
copy to src/components/object_store/src/local_file.rs
index 350ccfa0..4070b004 100644
--- a/src/components/object_store/src/lib.rs
+++ b/src/components/object_store/src/local_file.rs
@@ -15,29 +15,30 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Re-export of [object_store] crate.
-
-use std::sync::Arc;
-
-pub use multi_part::{ConcurrentMultipartUpload, MultiUploadWriter};
-use tokio::sync::Mutex;
-pub use upstream::{
-    local::LocalFileSystem, path::Path, Error as ObjectStoreError, Error, 
GetResult, ListResult,
-    ObjectMeta, ObjectStore, PutPayloadMut,
+use object_store_opendal::OpendalStore;
+use opendal::{
+    layers::{RetryLayer, TimeoutLayer},
+    services::Fs,
+    Operator, Result,
 };
 
-pub mod aliyun;
-pub mod config;
-pub mod disk_cache;
-pub mod mem_cache;
-pub mod metrics;
-mod multi_part;
-pub mod prefix;
-pub mod s3;
-#[cfg(test)]
-pub mod test_util;
+use crate::config::LocalOptions;
+
+pub fn try_new(local_opts: &LocalOptions) -> Result<OpendalStore> {
+    let builder = Fs::default().root(&local_opts.data_dir);
+    let op = Operator::new(builder)?
+        .layer(
+            TimeoutLayer::new()
+                .with_timeout(local_opts.timeout.timeout.0)
+                .with_io_timeout(local_opts.timeout.io_timeout.0),
+        )
+        .layer(RetryLayer::new().with_max_times(local_opts.max_retries))
+        .finish();
 
-pub type ObjectStoreRef = Arc<dyn ObjectStore>;
+    Ok(OpendalStore::new(op))
+}
 
-// TODO: remove Mutex and make ConcurrentMultipartUpload thread-safe
-pub type WriteMultipartRef = Arc<Mutex<ConcurrentMultipartUpload>>;
+pub fn try_new_with_default(data_dir: String) -> Result<OpendalStore> {
+    let local_opts = LocalOptions::new_with_default(data_dir);
+    try_new(&local_opts)
+}
diff --git a/src/components/object_store/src/mem_cache.rs 
b/src/components/object_store/src/mem_cache.rs
index 0fa8a912..9e40fb8e 100644
--- a/src/components/object_store/src/mem_cache.rs
+++ b/src/components/object_store/src/mem_cache.rs
@@ -294,13 +294,13 @@ impl ObjectStore for MemCacheStore {
 #[cfg(test)]
 mod test {
     use tempfile::tempdir;
-    use upstream::local::LocalFileSystem;
 
     use super::*;
+    use crate::local_file;
 
     fn prepare_store(bits: usize, mem_cap: usize) -> MemCacheStore {
-        let local_path = tempdir().unwrap();
-        let local_store = 
Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
+        let local_path = 
tempdir().unwrap().as_ref().to_string_lossy().to_string();
+        let local_store = 
Arc::new(local_file::try_new_with_default(local_path).unwrap());
 
         let mem_cache =
             Arc::new(MemCache::try_new(bits, 
NonZeroUsize::new(mem_cap).unwrap()).unwrap());
diff --git a/src/components/object_store/src/multi_part.rs 
b/src/components/object_store/src/multi_part.rs
index 871ffe2a..fb5b9dd9 100644
--- a/src/components/object_store/src/multi_part.rs
+++ b/src/components/object_store/src/multi_part.rs
@@ -28,7 +28,13 @@ use tokio::{io::AsyncWrite, sync::Mutex, task::JoinSet};
 pub use upstream::PutPayloadMut;
 use upstream::{path::Path, Error, MultipartUpload, PutPayload, PutResult};
 
-use crate::{ObjectStoreRef, WriteMultipartRef};
+use crate::ObjectStoreRef;
+
+// TODO: remove Mutex and make ConcurrentMultipartUpload thread-safe
+pub type MultiUploadRef = Arc<Mutex<ConcurrentMultipartUpload>>;
+
+const CHUNK_SIZE: usize = 5 * 1024 * 1024;
+const MAX_CONCURRENCY: usize = 10;
 
 #[derive(Debug)]
 pub struct ConcurrentMultipartUpload {
@@ -113,15 +119,12 @@ impl ConcurrentMultipartUpload {
 }
 
 pub struct MultiUploadWriter {
-    pub multi_upload: WriteMultipartRef,
+    pub multi_upload: MultiUploadRef,
     upload_task: Option<BoxFuture<'static, std::result::Result<usize, 
IoError>>>,
     flush_task: Option<BoxFuture<'static, std::result::Result<(), IoError>>>,
     completion_task: Option<BoxFuture<'static, std::result::Result<(), 
IoError>>>,
 }
 
-const CHUNK_SIZE: usize = 5 * 1024 * 1024;
-const MAX_CONCURRENCY: usize = 10;
-
 impl<'a> MultiUploadWriter {
     pub async fn new(object_store: &'a ObjectStoreRef, location: &'a Path) -> 
Result<Self, Error> {
         let upload_writer = object_store.put_multipart(location).await?;
@@ -141,7 +144,7 @@ impl<'a> MultiUploadWriter {
         Ok(multi_upload)
     }
 
-    pub fn aborter(&self) -> WriteMultipartRef {
+    pub fn aborter(&self) -> MultiUploadRef {
         self.multi_upload.clone()
     }
 }
diff --git a/src/components/object_store/src/prefix.rs 
b/src/components/object_store/src/prefix.rs
index 187b10a8..24233eeb 100644
--- a/src/components/object_store/src/prefix.rs
+++ b/src/components/object_store/src/prefix.rs
@@ -238,9 +238,9 @@ mod tests {
     use chrono::{DateTime, Utc};
     use futures::{stream, stream::StreamExt};
     use tempfile::tempdir;
-    use upstream::local::LocalFileSystem;
 
     use super::*;
+    use crate::local_file;
 
     #[derive(Debug, Clone)]
     struct PathPrefixChecker {
@@ -423,8 +423,8 @@ mod tests {
             ("/0/1/", "100/101.sst", "0/1/100/101.sst"),
         ];
 
-        let local_path = tempdir().unwrap();
-        let local_store = 
Arc::new(LocalFileSystem::new_with_prefix(local_path.path()).unwrap());
+        let local_path = 
tempdir().unwrap().as_ref().to_string_lossy().to_string();
+        let local_store = 
Arc::new(local_file::try_new_with_default(local_path).unwrap());
         for (prefix, filename, expect_loc) in cases.clone() {
             let prefix_store =
                 StoreWithPrefix::new(prefix.to_string(), 
local_store.clone()).unwrap();
diff --git a/src/components/object_store/src/s3.rs 
b/src/components/object_store/src/s3.rs
index fdbce027..2b81521f 100644
--- a/src/components/object_store/src/s3.rs
+++ b/src/components/object_store/src/s3.rs
@@ -15,34 +15,40 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use upstream::{
-    aws::{AmazonS3, AmazonS3Builder},
-    ClientOptions, RetryConfig,
+use object_store_opendal::OpendalStore;
+use opendal::{
+    layers::{RetryLayer, TimeoutLayer},
+    raw::HttpClient,
+    services::S3,
+    Operator, Result,
 };
 
 use crate::config::S3Options;
 
-pub fn try_new(s3_option: &S3Options) -> upstream::Result<AmazonS3> {
-    let cli_opt = ClientOptions::new()
-        .with_allow_http(true)
-        .with_pool_max_idle_per_host(s3_option.http.pool_max_idle_per_host)
-        .with_http2_keep_alive_timeout(s3_option.http.keep_alive_timeout.0)
-        .with_http2_keep_alive_while_idle()
-        .with_http2_keep_alive_interval(s3_option.http.keep_alive_interval.0)
-        .with_timeout(s3_option.http.timeout.0);
-    let retry_config = RetryConfig {
-        max_retries: s3_option.retry.max_retries,
-        retry_timeout: s3_option.retry.retry_timeout.0,
-        ..Default::default()
-    };
+pub fn try_new(s3_option: &S3Options) -> Result<OpendalStore> {
+    let http_builder = reqwest::ClientBuilder::new()
+        .pool_max_idle_per_host(s3_option.http.pool_max_idle_per_host)
+        .http2_keep_alive_timeout(s3_option.http.keep_alive_timeout.0)
+        .http2_keep_alive_while_idle(true)
+        .http2_keep_alive_interval(s3_option.http.keep_alive_interval.0)
+        .timeout(s3_option.http.timeout.0);
+    let http_client = HttpClient::build(http_builder)?;
 
-    AmazonS3Builder::new()
-        .with_region(&s3_option.region)
-        .with_access_key_id(&s3_option.key_id)
-        .with_secret_access_key(&s3_option.key_secret)
-        .with_endpoint(&s3_option.endpoint)
-        .with_bucket_name(&s3_option.bucket)
-        .with_client_options(cli_opt)
-        .with_retry(retry_config)
-        .build()
+    let builder = S3::default()
+        .region(&s3_option.region)
+        .access_key_id(&s3_option.key_id)
+        .secret_access_key(&s3_option.key_secret)
+        .endpoint(&s3_option.endpoint)
+        .bucket(&s3_option.bucket)
+        .http_client(http_client);
+    let op = Operator::new(builder)?
+        .layer(
+            TimeoutLayer::new()
+                .with_timeout(s3_option.timeout.timeout.0)
+                .with_io_timeout(s3_option.timeout.io_timeout.0),
+        )
+        .layer(RetryLayer::new().with_max_times(s3_option.max_retries))
+        .finish();
+
+    Ok(OpendalStore::new(op))
 }
diff --git a/src/tools/src/bin/sst-convert.rs b/src/tools/src/bin/sst-convert.rs
index c4c9935c..7c7856be 100644
--- a/src/tools/src/bin/sst-convert.rs
+++ b/src/tools/src/bin/sst-convert.rs
@@ -37,7 +37,7 @@ use common_types::{
     request_id::RequestId,
 };
 use generic_error::BoxError;
-use object_store::{LocalFileSystem, Path};
+use object_store::{config::LocalOptions, local_file, Path};
 use runtime::Runtime;
 use table_engine::predicate::Predicate;
 use tools::sst_util;
@@ -91,7 +91,12 @@ fn main() {
 }
 
 async fn run(args: Args, runtime: Arc<Runtime>) -> Result<()> {
-    let storage = 
LocalFileSystem::new_with_prefix(args.store_path).expect("invalid path");
+    let local_opts = LocalOptions {
+        data_dir: args.store_path,
+        max_retries: 3,
+        timeout: Default::default(),
+    };
+    let storage = local_file::try_new(&local_opts).expect("invalid path");
     let store = Arc::new(storage) as _;
     let input_path = Path::from(args.input);
     let sst_meta = sst_util::meta_from_sst(&store, &input_path).await;
diff --git a/src/tools/src/bin/sst-metadata.rs 
b/src/tools/src/bin/sst-metadata.rs
index bf659960..b48ca929 100644
--- a/src/tools/src/bin/sst-metadata.rs
+++ b/src/tools/src/bin/sst-metadata.rs
@@ -23,7 +23,7 @@ use analytic_engine::sst::{meta_data::cache::MetaData, 
parquet::async_reader::Ch
 use anyhow::{Context, Result};
 use clap::Parser;
 use futures::StreamExt;
-use object_store::{LocalFileSystem, ObjectMeta, ObjectStoreRef, Path};
+use object_store::{config::LocalOptions, local_file, ObjectMeta, 
ObjectStoreRef, Path};
 use parquet_ext::{meta_data::fetch_parquet_metadata, 
reader::ObjectStoreReader};
 use runtime::Runtime;
 use time_ext::format_as_ymdhms;
@@ -141,7 +141,12 @@ fn main() {
 
 async fn run(args: Args) -> Result<()> {
     let handle = Handle::current();
-    let storage = LocalFileSystem::new_with_prefix(&args.dir)?;
+    let local_opts = LocalOptions {
+        data_dir: args.dir,
+        max_retries: 3,
+        timeout: Default::default(),
+    };
+    let storage = local_file::try_new(&local_opts)?;
     let storage: ObjectStoreRef = Arc::new(storage);
 
     let mut join_set = JoinSet::new();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to