This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new b94c99e7 chore: upgrade object store version (#1541)
b94c99e7 is described below

commit b94c99e79554f8c915d3b733ed628e4f837f0183
Author: 鲍金日 <[email protected]>
AuthorDate: Mon Aug 19 14:04:41 2024 +0800

    chore: upgrade object store version (#1541)
    
    ## Rationale
    The object store crate is upgraded to 0.10.1 in preparation for
    integrating with OpenDAL.
    
    ## Detailed Changes
    - Impl AsyncWrite for ObjectStoreMultiUpload
    - Impl MultipartUpload for ObkvMultiPartUpload
    - Adapt to the new API on the query write path
    
    ## Test Plan
    - Existing tests
    
    ---------
    
    Co-authored-by: jiacai2050 <[email protected]>
---
 Cargo.lock                                     |  367 +++++++--
 src/analytic_engine/src/setup.rs               |   25 -
 src/analytic_engine/src/sst/meta_data/cache.rs |    2 +-
 src/analytic_engine/src/sst/parquet/writer.rs  |  100 +--
 src/components/object_store/Cargo.toml         |    2 +-
 src/components/object_store/src/config.rs      |   35 -
 src/components/object_store/src/disk_cache.rs  |   62 +-
 src/components/object_store/src/lib.rs         |   12 +-
 src/components/object_store/src/mem_cache.rs   |   44 +-
 src/components/object_store/src/metrics.rs     |   85 +-
 src/components/object_store/src/multi_part.rs  |  221 ++++++
 src/components/object_store/src/multipart.rs   |  280 -------
 src/components/object_store/src/obkv/meta.rs   |  437 ----------
 src/components/object_store/src/obkv/mod.rs    | 1015 ------------------------
 src/components/object_store/src/obkv/util.rs   |  122 ---
 src/components/object_store/src/prefix.rs      |   94 ++-
 src/components/object_store/src/test_util.rs   |   66 +-
 src/tools/src/bin/sst-metadata.rs              |    2 +-
 18 files changed, 822 insertions(+), 2149 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index c904497a..66815f7a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -797,6 +797,12 @@ dependencies = [
  "syn 2.0.48",
 ]
 
+[[package]]
+name = "atomic-waker"
+version = "1.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
+
 [[package]]
 name = "atomic_enum"
 version = "0.2.0"
@@ -836,9 +842,9 @@ dependencies = [
  "bitflags 1.3.2",
  "bytes",
  "futures-util",
- "http",
- "http-body",
- "hyper",
+ "http 0.2.9",
+ "http-body 0.4.5",
+ "hyper 0.14.25",
  "itoa",
  "matchit",
  "memchr",
@@ -862,8 +868,8 @@ dependencies = [
  "async-trait",
  "bytes",
  "futures-util",
- "http",
- "http-body",
+ "http 0.2.9",
+ "http-body 0.4.5",
  "mime",
  "rustversion",
  "tower-layer",
@@ -897,6 +903,12 @@ version = "0.21.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "a4a4ddaa51a5bc52a6948f74c06d20aaaddb71924eab79b8c97a8c556e942d6a"
 
+[[package]]
+name = "base64"
+version = "0.22.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
+
 [[package]]
 name = "base64ct"
 version = "1.6.0"
@@ -1333,9 +1345,9 @@ checksum = 
"baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
 
 [[package]]
 name = "chrono"
-version = "0.4.33"
+version = "0.4.38"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "9f13690e35a5e4ace198e7beea2895d29f3a9cc55015fcebe6336bd2010af9eb"
+checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401"
 dependencies = [
  "android-tzdata",
  "iana-time-zone",
@@ -2494,7 +2506,7 @@ version = "0.10.4"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "4319dc0fb739a6e84cb8678b8cf50c9bcfa4712ae826b33ecf00cc0850550a58"
 dependencies = [
- "http",
+ "http 0.2.9",
  "prost 0.11.8",
  "tokio",
  "tokio-stream",
@@ -2893,7 +2905,26 @@ dependencies = [
  "futures-core",
  "futures-sink",
  "futures-util",
- "http",
+ "http 0.2.9",
+ "indexmap 2.0.0",
+ "slab",
+ "tokio",
+ "tokio-util",
+ "tracing",
+]
+
+[[package]]
+name = "h2"
+version = "0.4.5"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "fa82e28a107a8cc405f0839610bdc9b15f1e25ec7d696aa5cf173edbcb1486ab"
+dependencies = [
+ "atomic-waker",
+ "bytes",
+ "fnv",
+ "futures-core",
+ "futures-sink",
+ "http 1.1.0",
  "indexmap 2.0.0",
  "slab",
  "tokio",
@@ -2978,7 +3009,7 @@ dependencies = [
  "bitflags 1.3.2",
  "bytes",
  "headers-core",
- "http",
+ "http 0.2.9",
  "httpdate",
  "mime",
  "sha1",
@@ -2990,7 +3021,7 @@ version = "0.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429"
 dependencies = [
- "http",
+ "http 0.2.9",
 ]
 
 [[package]]
@@ -3047,7 +3078,7 @@ dependencies = [
  "clap",
  "lazy_static",
  "prettytable",
- "reqwest",
+ "reqwest 0.11.24",
  "serde",
  "shell-words",
  "tokio",
@@ -3112,7 +3143,7 @@ dependencies = [
  "async-trait",
  "horaedb-client",
  "local-ip-address",
- "reqwest",
+ "reqwest 0.11.24",
  "serde",
  "sqlness",
  "tokio",
@@ -3155,6 +3186,17 @@ dependencies = [
  "itoa",
 ]
 
+[[package]]
+name = "http"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258"
+dependencies = [
+ "bytes",
+ "fnv",
+ "itoa",
+]
+
 [[package]]
 name = "http-body"
 version = "0.4.5"
@@ -3162,7 +3204,30 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1"
 dependencies = [
  "bytes",
- "http",
+ "http 0.2.9",
+ "pin-project-lite",
+]
+
+[[package]]
+name = "http-body"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643"
+dependencies = [
+ "bytes",
+ "http 1.1.0",
+]
+
+[[package]]
+name = "http-body-util"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f"
+dependencies = [
+ "bytes",
+ "futures-util",
+ "http 1.1.0",
+ "http-body 1.0.0",
  "pin-project-lite",
 ]
 
@@ -3203,9 +3268,9 @@ dependencies = [
  "futures-channel",
  "futures-core",
  "futures-util",
- "h2",
- "http",
- "http-body",
+ "h2 0.3.26",
+ "http 0.2.9",
+ "http-body 0.4.5",
  "httparse",
  "httpdate",
  "itoa",
@@ -3217,6 +3282,26 @@ dependencies = [
  "want",
 ]
 
+[[package]]
+name = "hyper"
+version = "1.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "fe575dd17d0862a9a33781c8c4696a55c320909004a67a00fb286ba8b1bc496d"
+dependencies = [
+ "bytes",
+ "futures-channel",
+ "futures-util",
+ "h2 0.4.5",
+ "http 1.1.0",
+ "http-body 1.0.0",
+ "httparse",
+ "itoa",
+ "pin-project-lite",
+ "smallvec",
+ "tokio",
+ "want",
+]
+
 [[package]]
 name = "hyper-rustls"
 version = "0.24.2"
@@ -3224,25 +3309,62 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index";
 checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590"
 dependencies = [
  "futures-util",
- "http",
- "hyper",
+ "http 0.2.9",
+ "hyper 0.14.25",
  "rustls 0.21.6",
  "tokio",
  "tokio-rustls 0.24.1",
 ]
 
+[[package]]
+name = "hyper-rustls"
+version = "0.26.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c"
+dependencies = [
+ "futures-util",
+ "http 1.1.0",
+ "hyper 1.3.1",
+ "hyper-util",
+ "rustls 0.22.2",
+ "rustls-pki-types",
+ "tokio",
+ "tokio-rustls 0.25.0",
+ "tower-service",
+]
+
 [[package]]
 name = "hyper-timeout"
 version = "0.4.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1"
 dependencies = [
- "hyper",
+ "hyper 0.14.25",
  "pin-project-lite",
  "tokio",
  "tokio-io-timeout",
 ]
 
+[[package]]
+name = "hyper-util"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "7b875924a60b96e5d7b9ae7b066540b1dd1cbd90d1828f54c92e02a283351c56"
+dependencies = [
+ "bytes",
+ "futures-channel",
+ "futures-util",
+ "http 1.1.0",
+ "http-body 1.0.0",
+ "hyper 1.3.1",
+ "pin-project-lite",
+ "socket2 0.5.3",
+ "tokio",
+ "tower",
+ "tower-service",
+ "tracing",
+]
+
 [[package]]
 name = "hyperloglog"
 version = "1.0.2"
@@ -3904,10 +4026,11 @@ checksum = 
"b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40"
 
 [[package]]
 name = "md-5"
-version = "0.10.5"
+version = "0.10.6"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca"
+checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf"
 dependencies = [
+ "cfg-if 1.0.0",
  "digest",
 ]
 
@@ -3988,7 +4111,7 @@ dependencies = [
  "logger",
  "macros",
  "prost 0.11.8",
- "reqwest",
+ "reqwest 0.11.24",
  "serde",
  "serde_json",
  "snafu 0.6.10",
@@ -4471,24 +4594,18 @@ dependencies = [
 
 [[package]]
 name = "object_store"
-version = "0.5.6"
+version = "0.8.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "ec9cd6ca25e796a49fa242876d1c4de36a24a6da5258e9f0bc062dbf5e81c53b"
+checksum = "2524735495ea1268be33d200e1ee97455096a0846295a21548cd2f3541de7050"
 dependencies = [
  "async-trait",
- "base64 0.21.0",
  "bytes",
  "chrono",
  "futures 0.3.28",
- "itertools 0.10.5",
+ "humantime 2.1.0",
+ "itertools 0.11.0",
  "parking_lot 0.12.1",
  "percent-encoding",
- "quick-xml 0.28.2",
- "rand 0.8.5",
- "reqwest",
- "ring 0.16.20",
- "serde",
- "serde_json",
  "snafu 0.7.4",
  "tokio",
  "tracing",
@@ -4498,18 +4615,27 @@ dependencies = [
 
 [[package]]
 name = "object_store"
-version = "0.8.0"
+version = "0.10.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "2524735495ea1268be33d200e1ee97455096a0846295a21548cd2f3541de7050"
+checksum = "fbebfd32c213ba1907fa7a9c9138015a8de2b43e30c5aa45b18f7deb46786ad6"
 dependencies = [
  "async-trait",
+ "base64 0.22.1",
  "bytes",
  "chrono",
  "futures 0.3.28",
  "humantime 2.1.0",
- "itertools 0.11.0",
+ "hyper 1.3.1",
+ "itertools 0.12.0",
+ "md-5",
  "parking_lot 0.12.1",
  "percent-encoding",
+ "quick-xml 0.31.0",
+ "rand 0.8.5",
+ "reqwest 0.12.4",
+ "ring 0.17.7",
+ "serde",
+ "serde_json",
  "snafu 0.7.4",
  "tokio",
  "tracing",
@@ -4535,7 +4661,7 @@ dependencies = [
  "lru 0.7.8",
  "macros",
  "notifier",
- "object_store 0.5.6",
+ "object_store 0.10.1",
  "partitioned_lock",
  "prometheus 0.12.0",
  "prometheus-static-metric",
@@ -4576,7 +4702,7 @@ dependencies = [
  "quick-error",
  "r2d2",
  "rand 0.8.5",
- "reqwest",
+ "reqwest 0.11.24",
  "rust-crypto",
  "scheduled-thread-pool",
  "serde",
@@ -4627,6 +4753,12 @@ dependencies = [
  "tokio",
 ]
 
+[[package]]
+name = "openssl-probe"
+version = "0.1.5"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
+
 [[package]]
 name = "ordered-float"
 version = "2.10.0"
@@ -5450,7 +5582,7 @@ dependencies = [
  "futures 0.3.28",
  "generic_error",
  "horaedbproto 2.0.0",
- "http",
+ "http 0.2.9",
  "influxdb-line-protocol",
  "interpreters",
  "iox_query",
@@ -5636,9 +5768,9 @@ dependencies = [
 
 [[package]]
 name = "quick-xml"
-version = "0.28.2"
+version = "0.31.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "0ce5e73202a820a31f8a0ee32ada5e21029c81fd9e3ebf668a40832e4219d9d1"
+checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33"
 dependencies = [
  "memchr",
  "serde",
@@ -5894,11 +6026,11 @@ dependencies = [
  "encoding_rs",
  "futures-core",
  "futures-util",
- "h2",
- "http",
- "http-body",
- "hyper",
- "hyper-rustls",
+ "h2 0.3.26",
+ "http 0.2.9",
+ "http-body 0.4.5",
+ "hyper 0.14.25",
+ "hyper-rustls 0.24.2",
  "ipnet",
  "js-sys",
  "log",
@@ -5915,6 +6047,49 @@ dependencies = [
  "system-configuration",
  "tokio",
  "tokio-rustls 0.24.1",
+ "tower-service",
+ "url",
+ "wasm-bindgen",
+ "wasm-bindgen-futures",
+ "web-sys",
+ "webpki-roots 0.25.4",
+ "winreg 0.50.0",
+]
+
+[[package]]
+name = "reqwest"
+version = "0.12.4"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "566cafdd92868e0939d3fb961bd0dc25fcfaaed179291093b3d43e6b3150ea10"
+dependencies = [
+ "base64 0.22.1",
+ "bytes",
+ "futures-core",
+ "futures-util",
+ "h2 0.4.5",
+ "http 1.1.0",
+ "http-body 1.0.0",
+ "http-body-util",
+ "hyper 1.3.1",
+ "hyper-rustls 0.26.0",
+ "hyper-util",
+ "ipnet",
+ "js-sys",
+ "log",
+ "mime",
+ "once_cell",
+ "percent-encoding",
+ "pin-project-lite",
+ "rustls 0.22.2",
+ "rustls-native-certs",
+ "rustls-pemfile 2.1.2",
+ "rustls-pki-types",
+ "serde",
+ "serde_json",
+ "serde_urlencoded",
+ "sync_wrapper",
+ "tokio",
+ "tokio-rustls 0.25.0",
  "tokio-util",
  "tower-service",
  "url",
@@ -5922,8 +6097,7 @@ dependencies = [
  "wasm-bindgen-futures",
  "wasm-streams",
  "web-sys",
- "webpki-roots 0.25.4",
- "winreg",
+ "winreg 0.52.0",
 ]
 
 [[package]]
@@ -6172,6 +6346,19 @@ dependencies = [
  "zeroize",
 ]
 
+[[package]]
+name = "rustls-native-certs"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792"
+dependencies = [
+ "openssl-probe",
+ "rustls-pemfile 2.1.2",
+ "rustls-pki-types",
+ "schannel",
+ "security-framework",
+]
+
 [[package]]
 name = "rustls-pemfile"
 version = "0.2.1"
@@ -6190,11 +6377,21 @@ dependencies = [
  "base64 0.21.0",
 ]
 
+[[package]]
+name = "rustls-pemfile"
+version = "2.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "29993a25686778eb88d4189742cd713c9bce943bc54251a33509dc63cbacf73d"
+dependencies = [
+ "base64 0.22.1",
+ "rustls-pki-types",
+]
+
 [[package]]
 name = "rustls-pki-types"
-version = "1.1.0"
+version = "1.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "9e9d979b3ce68192e42760c7810125eb6cf2ea10efae545a156063e61f314e2a"
+checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d"
 
 [[package]]
 name = "rustls-webpki"
@@ -6267,6 +6464,15 @@ version = "0.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "ece8e78b2f38ec51c51f5d475df0a7187ba5111b2a28bdc761ee05b075d40a71"
 
+[[package]]
+name = "schannel"
+version = "0.1.23"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "fbc91545643bcf3a0bbb6569265615222618bdf33ce4ffbbd13c4bbd4c093534"
+dependencies = [
+ "windows-sys 0.52.0",
+]
+
 [[package]]
 name = "scheduled-thread-pool"
 version = "0.2.7"
@@ -6323,6 +6529,29 @@ version = "4.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
 checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b"
 
+[[package]]
+name = "security-framework"
+version = "2.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "770452e37cad93e0a50d5abc3990d2bc351c36d0328f86cefec2f2fb206eaef6"
+dependencies = [
+ "bitflags 1.3.2",
+ "core-foundation",
+ "core-foundation-sys",
+ "libc",
+ "security-framework-sys",
+]
+
+[[package]]
+name = "security-framework-sys"
+version = "2.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "317936bbbd05227752583946b9e66d7ce3b489f84e11a94a510b4437fef407d7"
+dependencies = [
+ "core-foundation-sys",
+ "libc",
+]
+
 [[package]]
 name = "semver"
 version = "1.0.17"
@@ -6421,7 +6650,7 @@ dependencies = [
  "futures 0.3.28",
  "generic_error",
  "horaedbproto 2.0.0",
- "http",
+ "http 0.2.9",
  "influxdb-line-protocol",
  "interpreters",
  "lazy_static",
@@ -6649,9 +6878,9 @@ dependencies = [
 
 [[package]]
 name = "smallvec"
-version = "1.10.0"
+version = "1.13.2"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0"
+checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67"
 
 [[package]]
 name = "snafu"
@@ -7459,10 +7688,10 @@ dependencies = [
  "bytes",
  "futures-core",
  "futures-util",
- "h2",
- "http",
- "http-body",
- "hyper",
+ "h2 0.3.26",
+ "http 0.2.9",
+ "http-body 0.4.5",
+ "hyper 0.14.25",
  "hyper-timeout",
  "percent-encoding",
  "pin-project",
@@ -7492,10 +7721,10 @@ dependencies = [
  "bytes",
  "futures-core",
  "futures-util",
- "h2",
- "http",
- "http-body",
- "hyper",
+ "h2 0.3.26",
+ "http 0.2.9",
+ "http-body 0.4.5",
+ "hyper 0.14.25",
  "hyper-timeout",
  "percent-encoding",
  "pin-project",
@@ -7713,7 +7942,7 @@ dependencies = [
  "base64 0.13.1",
  "byteorder",
  "bytes",
- "http",
+ "http 0.2.9",
  "httparse",
  "log",
  "rand 0.8.5",
@@ -7950,8 +8179,8 @@ dependencies = [
  "futures-channel",
  "futures-util",
  "headers",
- "http",
- "hyper",
+ "http 0.2.9",
+ "hyper 0.14.25",
  "log",
  "mime",
  "mime_guess",
@@ -8441,6 +8670,16 @@ dependencies = [
  "windows-sys 0.48.0",
 ]
 
+[[package]]
+name = "winreg"
+version = "0.52.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "a277a57398d4bfa075df44f501a17cfdf8542d224f0d36095a2adc7aee4ef0a5"
+dependencies = [
+ "cfg-if 1.0.0",
+ "windows-sys 0.48.0",
+]
+
 [[package]]
 name = "wyz"
 version = "0.5.1"
diff --git a/src/analytic_engine/src/setup.rs b/src/analytic_engine/src/setup.rs
index 1e25fb2e..be0d9354 100644
--- a/src/analytic_engine/src/setup.rs
+++ b/src/analytic_engine/src/setup.rs
@@ -27,13 +27,11 @@ use object_store::{
     disk_cache::DiskCacheStore,
     mem_cache::{MemCache, MemCacheStore},
     metrics::StoreWithMetrics,
-    obkv,
     prefix::StoreWithPrefix,
     s3, LocalFileSystem, ObjectStoreRef,
 };
 use snafu::{ResultExt, Snafu};
 use table_engine::engine::{EngineRuntimes, TableEngineRef};
-use table_kv::obkv::ObkvImpl;
 use wal::manager::{OpenedWals, WalManagerRef};
 
 use crate::{
@@ -55,9 +53,6 @@ pub enum Error {
         source: crate::instance::engine::Error,
     },
 
-    #[snafu(display("Failed to open obkv, err:{}", source))]
-    OpenObkv { source: table_kv::obkv::Error },
-
     #[snafu(display("Failed to execute in runtime, err:{}", source))]
     RuntimeExec { source: runtime::Error },
 
@@ -214,26 +209,6 @@ fn open_storage(
                 let store_with_prefix = 
StoreWithPrefix::new(aliyun_opts.prefix, oss);
                 Arc::new(store_with_prefix.context(OpenObjectStore)?) as _
             }
-            ObjectStoreOptions::Obkv(obkv_opts) => {
-                let obkv_config = obkv_opts.client;
-                let obkv = engine_runtimes
-                    .write_runtime
-                    .spawn_blocking(move || 
ObkvImpl::new(obkv_config).context(OpenObkv))
-                    .await
-                    .context(RuntimeExec)??;
-
-                let oss: ObjectStoreRef = Arc::new(
-                    obkv::ObkvObjectStore::try_new(
-                        Arc::new(obkv),
-                        obkv_opts.shard_num,
-                        obkv_opts.part_size.0 as usize,
-                        obkv_opts.max_object_size.0 as usize,
-                        obkv_opts.upload_parallelism,
-                    )
-                    .context(OpenObjectStore)?,
-                );
-                Arc::new(StoreWithPrefix::new(obkv_opts.prefix, 
oss).context(OpenObjectStore)?) as _
-            }
             ObjectStoreOptions::S3(s3_option) => {
                 let oss: ObjectStoreRef =
                     
Arc::new(s3::try_new(&s3_option).context(OpenObjectStore)?);
diff --git a/src/analytic_engine/src/sst/meta_data/cache.rs 
b/src/analytic_engine/src/sst/meta_data/cache.rs
index 016c1cbb..8ddaf487 100644
--- a/src/analytic_engine/src/sst/meta_data/cache.rs
+++ b/src/analytic_engine/src/sst/meta_data/cache.rs
@@ -290,7 +290,7 @@ mod tests {
 
         let bytes = 
encoding::encode_sst_meta_data(custom_meta_data.clone()).unwrap();
         let meta_path = object_store::Path::from(meta_path);
-        store.put(&meta_path, bytes).await.unwrap();
+        store.put(&meta_path, bytes.into()).await.unwrap();
     }
 
     #[tokio::test]
diff --git a/src/analytic_engine/src/sst/parquet/writer.rs 
b/src/analytic_engine/src/sst/parquet/writer.rs
index 59c68f42..5fe669c5 100644
--- a/src/analytic_engine/src/sst/parquet/writer.rs
+++ b/src/analytic_engine/src/sst/parquet/writer.rs
@@ -28,10 +28,9 @@ use datafusion::parquet::basic::Compression;
 use futures::StreamExt;
 use generic_error::BoxError;
 use logger::{debug, error};
-use object_store::{ObjectStoreRef, Path};
-use parquet::data_type::AsBytes;
+use object_store::{MultiUploadWriter, ObjectStore, ObjectStoreRef, Path, 
WriteMultipartRef};
 use snafu::{OptionExt, ResultExt};
-use tokio::io::{AsyncWrite, AsyncWriteExt};
+use tokio::io::AsyncWrite;
 
 use crate::{
     sst::{
@@ -45,8 +44,8 @@ use crate::{
             },
         },
         writer::{
-            self, BuildParquetFilter, EncodePbData, EncodeRecordBatch, 
ExpectTimestampColumn, Io,
-            MetaData, PollRecordBatch, RecordBatchStream, Result, SstInfo, 
SstWriter, Storage,
+            BuildParquetFilter, EncodePbData, EncodeRecordBatch, 
ExpectTimestampColumn, MetaData,
+            PollRecordBatch, RecordBatchStream, Result, SstInfo, SstWriter, 
Storage,
         },
     },
     table::sst_util,
@@ -405,67 +404,24 @@ impl<'a> RecordBatchGroupWriter<'a> {
     }
 }
 
-struct ObjectStoreMultiUploadAborter<'a> {
-    location: &'a Path,
-    session_id: String,
-    object_store: &'a ObjectStoreRef,
-}
-
-impl<'a> ObjectStoreMultiUploadAborter<'a> {
-    async fn initialize_upload(
-        object_store: &'a ObjectStoreRef,
-        location: &'a Path,
-    ) -> Result<(
-        ObjectStoreMultiUploadAborter<'a>,
-        Box<dyn AsyncWrite + Unpin + Send>,
-    )> {
-        let (session_id, upload_writer) = object_store
-            .put_multipart(location)
-            .await
-            .context(Storage)?;
-        let aborter = Self {
-            location,
-            session_id,
-            object_store,
-        };
-        Ok((aborter, upload_writer))
-    }
-
-    async fn abort(self) -> Result<()> {
-        self.object_store
-            .abort_multipart(self.location, &self.session_id)
-            .await
-            .context(Storage)
-    }
-}
-
-async fn write_metadata<W>(
-    mut meta_sink: W,
+async fn write_metadata(
+    meta_sink: MultiUploadWriter,
     parquet_metadata: ParquetMetaData,
-    meta_path: &object_store::Path,
-) -> writer::Result<usize>
-where
-    W: AsyncWrite + Send + Unpin,
-{
+) -> Result<usize> {
     let buf = encode_sst_meta_data(parquet_metadata).context(EncodePbData)?;
-    let bytes = buf.as_bytes();
-    let bytes_size = bytes.len();
-    meta_sink.write_all(bytes).await.with_context(|| Io {
-        file: meta_path.clone(),
-    })?;
-
-    meta_sink.shutdown().await.with_context(|| Io {
-        file: meta_path.clone(),
-    })?;
+    let buf_size = buf.len();
+    let mut uploader = meta_sink.multi_upload.lock().await;
+    uploader.put(buf);
+    uploader.finish().await.context(Storage)?;
 
-    Ok(bytes_size)
+    Ok(buf_size)
 }
 
-async fn multi_upload_abort(path: &Path, aborter: 
ObjectStoreMultiUploadAborter<'_>) {
-    // The uploading file will be leaked if failed to abort. A repair command 
will
-    // be provided to clean up the leaked files.
-    if let Err(e) = aborter.abort().await {
-        error!("Failed to abort multi-upload for sst:{}, err:{}", path, e);
+async fn multi_upload_abort(aborter: WriteMultipartRef) {
+    // The uploading file will be leaked if failed to abort. A repair command
+    // will be provided to clean up the leaked files.
+    if let Err(e) = aborter.lock().await.abort().await {
+        error!("Failed to abort multi-upload sst, err:{}", e);
     }
 }
 
@@ -476,7 +432,7 @@ impl<'a> SstWriter for ParquetSstWriter<'a> {
         request_id: RequestId,
         meta: &MetaData,
         input: RecordBatchStream,
-    ) -> writer::Result<SstInfo> {
+    ) -> Result<SstInfo> {
         debug!(
             "Build parquet file, request_id:{}, meta:{:?}, 
num_rows_per_row_group:{}",
             request_id, meta, self.options.num_rows_per_row_group
@@ -491,8 +447,10 @@ impl<'a> SstWriter for ParquetSstWriter<'a> {
         };
         let group_writer = RecordBatchGroupWriter::new(request_id, input, 
meta, write_options);
 
-        let (aborter, sink) =
-            ObjectStoreMultiUploadAborter::initialize_upload(self.store, 
self.path).await?;
+        let sink = MultiUploadWriter::new(self.store, self.path)
+            .await
+            .context(Storage)?;
+        let aborter = sink.aborter();
 
         let meta_path = 
Path::from(sst_util::new_metadata_path(self.path.as_ref()));
 
@@ -500,19 +458,21 @@ impl<'a> SstWriter for ParquetSstWriter<'a> {
             match group_writer.write_all(sink, &meta_path).await {
                 Ok(v) => v,
                 Err(e) => {
-                    multi_upload_abort(self.path, aborter).await;
+                    multi_upload_abort(aborter).await;
                     return Err(e);
                 }
             };
         let time_range = parquet_metadata.time_range;
 
-        let (meta_aborter, meta_sink) =
-            ObjectStoreMultiUploadAborter::initialize_upload(self.store, 
&meta_path).await?;
-        let meta_size = match write_metadata(meta_sink, parquet_metadata, 
&meta_path).await {
+        let meta_sink = MultiUploadWriter::new(self.store, &meta_path)
+            .await
+            .context(Storage)?;
+        let meta_aborter = meta_sink.aborter();
+        let meta_size = match write_metadata(meta_sink, 
parquet_metadata).await {
             Ok(v) => v,
             Err(e) => {
-                multi_upload_abort(self.path, aborter).await;
-                multi_upload_abort(&meta_path, meta_aborter).await;
+                multi_upload_abort(aborter).await;
+                multi_upload_abort(meta_aborter).await;
                 return Err(e);
             }
         };
diff --git a/src/components/object_store/Cargo.toml 
b/src/components/object_store/Cargo.toml
index 66c3437e..f9221e1d 100644
--- a/src/components/object_store/Cargo.toml
+++ b/src/components/object_store/Cargo.toml
@@ -59,7 +59,7 @@ table_kv = { workspace = true }
 time_ext = { workspace = true }
 tokio = { workspace = true }
 twox-hash = "1.6"
-upstream = { package = "object_store", version = "0.5.6", features = [ "aws" ] 
}
+upstream = { package = "object_store", version = "0.10.1", features = [ "aws" 
] }
 uuid = { version = "1.3.3", features = ["v4"] }
 
 [dev-dependencies]
diff --git a/src/components/object_store/src/config.rs 
b/src/components/object_store/src/config.rs
index 0fbae62b..d0ecbfb0 100644
--- a/src/components/object_store/src/config.rs
+++ b/src/components/object_store/src/config.rs
@@ -19,7 +19,6 @@ use std::time::Duration;
 
 use serde::{Deserialize, Serialize};
 use size_ext::ReadableSize;
-use table_kv::config::ObkvConfig;
 use time_ext::ReadableDuration;
 
 #[derive(Debug, Clone, Deserialize, Serialize)]
@@ -63,7 +62,6 @@ impl Default for StorageOptions {
 pub enum ObjectStoreOptions {
     Local(LocalOptions),
     Aliyun(AliyunOptions),
-    Obkv(ObkvOptions),
     S3(S3Options),
 }
 
@@ -85,39 +83,6 @@ pub struct AliyunOptions {
     pub retry: RetryOptions,
 }
 
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct ObkvOptions {
-    pub prefix: String,
-    #[serde(default = "ObkvOptions::default_shard_num")]
-    pub shard_num: usize,
-    #[serde(default = "ObkvOptions::default_part_size")]
-    pub part_size: ReadableSize,
-    #[serde(default = "ObkvOptions::default_max_object_size")]
-    pub max_object_size: ReadableSize,
-    #[serde(default = "ObkvOptions::default_upload_parallelism")]
-    pub upload_parallelism: usize,
-    /// Obkv client config
-    pub client: ObkvConfig,
-}
-
-impl ObkvOptions {
-    fn default_max_object_size() -> ReadableSize {
-        ReadableSize::gb(1)
-    }
-
-    fn default_part_size() -> ReadableSize {
-        ReadableSize::mb(1)
-    }
-
-    fn default_shard_num() -> usize {
-        512
-    }
-
-    fn default_upload_parallelism() -> usize {
-        8
-    }
-}
-
 #[derive(Debug, Clone, Deserialize, Serialize)]
 pub struct S3Options {
     pub region: String,
diff --git a/src/components/object_store/src/disk_cache.rs 
b/src/components/object_store/src/disk_cache.rs
index b0c7ba13..33ab7776 100644
--- a/src/components/object_store/src/disk_cache.rs
+++ b/src/components/object_store/src/disk_cache.rs
@@ -40,12 +40,12 @@ use snafu::{ensure, Backtrace, ResultExt, Snafu};
 use time_ext;
 use tokio::{
     fs::{self, File, OpenOptions},
-    io::{AsyncReadExt, AsyncSeekExt, AsyncWrite, AsyncWriteExt},
+    io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
     sync::oneshot::{self, error::RecvError, Receiver},
 };
 use upstream::{
-    path::Path, Error as ObjectStoreError, GetResult, ListResult, MultipartId, 
ObjectMeta,
-    ObjectStore, Result,
+    path::Path, Error as ObjectStoreError, GetOptions, GetResult, ListResult, 
MultipartUpload,
+    ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, 
PutResult, Result,
 };
 
 use crate::metrics::{
@@ -828,20 +828,32 @@ impl Display for DiskCacheStore {
 
 #[async_trait]
 impl ObjectStore for DiskCacheStore {
-    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
-        self.underlying_store.put(location, bytes).await
+    async fn put(&self, location: &Path, payload: PutPayload) -> 
Result<PutResult> {
+        self.underlying_store.put(location, payload).await
     }
 
-    async fn put_multipart(
+    async fn put_opts(
         &self,
         location: &Path,
-    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+        payload: PutPayload,
+        opts: PutOptions,
+    ) -> Result<PutResult> {
+        self.underlying_store
+            .put_opts(location, payload, opts)
+            .await
+    }
+
+    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
         self.underlying_store.put_multipart(location).await
     }
 
-    async fn abort_multipart(&self, location: &Path, multipart_id: 
&MultipartId) -> Result<()> {
+    async fn put_multipart_opts(
+        &self,
+        location: &Path,
+        opts: PutMultipartOpts,
+    ) -> Result<Box<dyn MultipartUpload>> {
         self.underlying_store
-            .abort_multipart(location, multipart_id)
+            .put_multipart_opts(location, opts)
             .await
     }
 
@@ -851,6 +863,10 @@ impl ObjectStore for DiskCacheStore {
         self.underlying_store.get(location).await
     }
 
+    async fn get_opts(&self, location: &Path, options: GetOptions) -> 
Result<GetResult> {
+        self.underlying_store.get_opts(location, options).await
+    }
+
     async fn get_range(&self, location: &Path, range: Range<usize>) -> 
Result<Bytes> {
         let file_meta = self.fetch_file_meta(location).await?;
         ensure!(
@@ -987,6 +1003,8 @@ impl ObjectStore for DiskCacheStore {
             location: location.clone(),
             last_modified: file_meta.last_modified,
             size: file_meta.size,
+            e_tag: None,
+            version: None,
         })
     }
 
@@ -994,8 +1012,8 @@ impl ObjectStore for DiskCacheStore {
         self.underlying_store.delete(location).await
     }
 
-    async fn list(&self, prefix: Option<&Path>) -> Result<BoxStream<'_, 
Result<ObjectMeta>>> {
-        self.underlying_store.list(prefix).await
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> 
{
+        self.underlying_store.list(prefix)
     }
 
     async fn list_with_delimiter(&self, prefix: Option<&Path>) -> 
Result<ListResult> {
@@ -1068,7 +1086,7 @@ mod test {
             let location = Path::from("out_of_range_test.sst");
             let store = prepare_store(page_size, 32, 0, rt.clone()).await;
             let buf = Bytes::from_static(data);
-            store.inner.put(&location, buf.clone()).await.unwrap();
+            store.inner.put(&location, buf.into()).await.unwrap();
 
             // Read one page out of range.
             let res = store.inner.get_range(&location, 48..54).await;
@@ -1095,7 +1113,7 @@ mod test {
             for _ in 0..4 {
                 buf.extend_from_slice(data);
             }
-            store.inner.put(&location, buf.freeze()).await.unwrap();
+            store.inner.put(&location, buf.freeze().into()).await.unwrap();
 
             let testcases = vec![
                 (0..6, "a b c "),
@@ -1164,7 +1182,7 @@ mod test {
             for _ in 0..4 {
                 buf.extend_from_slice(data);
             }
-            store.inner.put(&location, buf.freeze()).await.unwrap();
+            store.inner.put(&location, buf.freeze().into()).await.unwrap();
 
             let testcases = [
                 (0..6, "a b c "),
@@ -1212,7 +1230,11 @@ mod test {
             for _ in 0..4 {
                 buf.extend_from_slice(data);
             }
-            store.inner.put(&location, buf.freeze()).await.unwrap();
+            store
+                .inner
+                .put(&location, buf.freeze().into())
+                .await
+                .unwrap();
 
             let _ = store.inner.get_range(&location, 0..16).await.unwrap();
             let _ = store.inner.get_range(&location, 16..32).await.unwrap();
@@ -1247,7 +1269,11 @@ mod test {
             for _ in 0..8 {
                 buf.extend_from_slice(data);
             }
-            store.inner.put(&location, buf.freeze()).await.unwrap();
+            store
+                .inner
+                .put(&location, buf.freeze().into())
+                .await
+                .unwrap();
             // use seahash
             // 0..16: partition 1
             // 16..32 partition 1
@@ -1409,7 +1435,7 @@ mod test {
                     buf.extend_from_slice(data);
                 }
                 let buf = buf.freeze();
-                store.put(&location, buf.clone()).await.unwrap();
+                store.put(&location, buf.clone().into()).await.unwrap();
                 let read_range = 16..100;
                 let bytes = store
                     .get_range(&location, read_range.clone())
@@ -1477,7 +1503,7 @@ mod test {
 
         // Put data into store and get it to let the cache load the data.
         store
-            .put(&test_file_path, test_file_bytes.clone())
+            .put(&test_file_path, test_file_bytes.clone().into())
             .await
             .unwrap();
 
diff --git a/src/components/object_store/src/lib.rs 
b/src/components/object_store/src/lib.rs
index 3436cdcf..350ccfa0 100644
--- a/src/components/object_store/src/lib.rs
+++ b/src/components/object_store/src/lib.rs
@@ -19,9 +19,11 @@
 
 use std::sync::Arc;
 
+pub use multi_part::{ConcurrentMultipartUpload, MultiUploadWriter};
+use tokio::sync::Mutex;
 pub use upstream::{
-    local::LocalFileSystem, path::Path, Error as ObjectStoreError, GetResult, 
ListResult,
-    ObjectMeta, ObjectStore,
+    local::LocalFileSystem, path::Path, Error as ObjectStoreError, Error, 
GetResult, ListResult,
+    ObjectMeta, ObjectStore, PutPayloadMut,
 };
 
 pub mod aliyun;
@@ -29,11 +31,13 @@ pub mod config;
 pub mod disk_cache;
 pub mod mem_cache;
 pub mod metrics;
-pub mod multipart;
-pub mod obkv;
+mod multi_part;
 pub mod prefix;
 pub mod s3;
 #[cfg(test)]
 pub mod test_util;
 
 pub type ObjectStoreRef = Arc<dyn ObjectStore>;
+
+// TODO: remove Mutex and make ConcurrentMultipartUpload thread-safe
+pub type WriteMultipartRef = Arc<Mutex<ConcurrentMultipartUpload>>;
diff --git a/src/components/object_store/src/mem_cache.rs 
b/src/components/object_store/src/mem_cache.rs
index f602eee6..0fa8a912 100644
--- a/src/components/object_store/src/mem_cache.rs
+++ b/src/components/object_store/src/mem_cache.rs
@@ -34,10 +34,9 @@ use hash_ext::{ahash::RandomState, 
build_fixed_seed_ahasher_builder};
 use macros::define_result;
 use partitioned_lock::PartitionedMutex;
 use snafu::{OptionExt, Snafu};
-use tokio::io::AsyncWrite;
 use upstream::{
-    path::Path, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
-    Result as ObjectStoreResult,
+    path::Path, GetOptions, GetResult, ListResult, MultipartUpload, 
ObjectMeta, ObjectStore,
+    PutMultipartOpts, PutOptions, PutPayload, PutResult, Result as 
ObjectStoreResult,
 };
 
 use crate::{
@@ -219,24 +218,32 @@ impl fmt::Debug for MemCacheStore {
 
 #[async_trait]
 impl ObjectStore for MemCacheStore {
-    async fn put(&self, location: &Path, bytes: Bytes) -> 
ObjectStoreResult<()> {
-        self.underlying_store.put(location, bytes).await
+    async fn put(&self, location: &Path, payload: PutPayload) -> 
ObjectStoreResult<PutResult> {
+        self.underlying_store.put(location, payload).await
     }
 
-    async fn put_multipart(
+    async fn put_opts(
         &self,
         location: &Path,
-    ) -> ObjectStoreResult<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+        payload: PutPayload,
+        opts: PutOptions,
+    ) -> ObjectStoreResult<PutResult> {
+        self.underlying_store
+            .put_opts(location, payload, opts)
+            .await
+    }
+
+    async fn put_multipart(&self, location: &Path) -> 
ObjectStoreResult<Box<dyn MultipartUpload>> {
         self.underlying_store.put_multipart(location).await
     }
 
-    async fn abort_multipart(
+    async fn put_multipart_opts(
         &self,
         location: &Path,
-        multipart_id: &MultipartId,
-    ) -> ObjectStoreResult<()> {
+        opts: PutMultipartOpts,
+    ) -> ObjectStoreResult<Box<dyn MultipartUpload>> {
         self.underlying_store
-            .abort_multipart(location, multipart_id)
+            .put_multipart_opts(location, opts)
             .await
     }
 
@@ -247,6 +254,10 @@ impl ObjectStore for MemCacheStore {
         self.underlying_store.get(location).await
     }
 
+    async fn get_opts(&self, location: &Path, options: GetOptions) -> 
ObjectStoreResult<GetResult> {
+        self.underlying_store.get_opts(location, options).await
+    }
+
     async fn get_range(&self, location: &Path, range: Range<usize>) -> 
ObjectStoreResult<Bytes> {
         if self.readonly_cache {
             self.get_range_with_ro_cache(location, range).await
@@ -263,11 +274,8 @@ impl ObjectStore for MemCacheStore {
         self.underlying_store.delete(location).await
     }
 
-    async fn list(
-        &self,
-        prefix: Option<&Path>,
-    ) -> ObjectStoreResult<BoxStream<'_, ObjectStoreResult<ObjectMeta>>> {
-        self.underlying_store.list(prefix).await
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, 
ObjectStoreResult<ObjectMeta>> {
+        self.underlying_store.list(prefix)
     }
 
     async fn list_with_delimiter(&self, prefix: Option<&Path>) -> 
ObjectStoreResult<ListResult> {
@@ -307,7 +315,7 @@ mod test {
         // write date
         let location = Path::from("1.sst");
         store
-            .put(&location, Bytes::from_static(&[1; 1024]))
+            .put(&location, Bytes::from_static(&[1; 1024]).into())
             .await
             .unwrap();
 
@@ -358,7 +366,7 @@ mod test {
         let store = prepare_store(2, 100);
         let location = Path::from("partition.sst");
         store
-            .put(&location, Bytes::from_static(&[1; 1024]))
+            .put(&location, Bytes::from_static(&[1; 1024]).into())
             .await
             .unwrap();
 
diff --git a/src/components/object_store/src/metrics.rs 
b/src/components/object_store/src/metrics.rs
index 8000a9ac..2847d2bf 100644
--- a/src/components/object_store/src/metrics.rs
+++ b/src/components/object_store/src/metrics.rs
@@ -27,10 +27,9 @@ use prometheus::{
 };
 use prometheus_static_metric::make_static_metric;
 use runtime::Runtime;
-use tokio::io::AsyncWrite;
 use upstream::{
-    path::Path, Error as StoreError, GetResult, ListResult, MultipartId, 
ObjectMeta, ObjectStore,
-    Result,
+    path::Path, Error as StoreError, GetOptions, GetResult, ListResult, 
MultipartUpload,
+    ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, 
PutResult, Result,
 };
 
 use crate::ObjectStoreRef;
@@ -39,9 +38,12 @@ make_static_metric! {
     pub struct ObjectStoreDurationHistogram: Histogram {
         "op" => {
             put,
+            put_opts,
             put_multipart,
+            put_multipart_opts,
             abort_multipart,
             get,
+            get_opts,
             get_range,
             get_ranges,
             head,
@@ -58,6 +60,7 @@ make_static_metric! {
     pub struct ObjectStoreThroughputHistogram: Histogram {
         "op" => {
             put,
+            put_opts,
             get_range,
             get_ranges,
         },
@@ -142,16 +145,16 @@ impl Display for StoreWithMetrics {
 
 #[async_trait]
 impl ObjectStore for StoreWithMetrics {
-    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+    async fn put(&self, location: &Path, payload: PutPayload) -> 
Result<PutResult> {
         let _timer = OBJECT_STORE_DURATION_HISTOGRAM.put.start_timer();
         OBJECT_STORE_THROUGHPUT_HISTOGRAM
             .put
-            .observe(bytes.len() as f64);
+            .observe(payload.content_length() as f64);
 
         let loc = location.clone();
         let store = self.store.clone();
         self.runtime
-            .spawn(async move { store.put(&loc, bytes).await })
+            .spawn(async move { store.put(&loc, payload).await })
             .await
             .map_err(|source| StoreError::Generic {
                 store: METRICS,
@@ -159,10 +162,29 @@ impl ObjectStore for StoreWithMetrics {
             })?
     }
 
-    async fn put_multipart(
+    async fn put_opts(
         &self,
         location: &Path,
-    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+        payload: PutPayload,
+        opts: PutOptions,
+    ) -> Result<PutResult> {
+        let _timer = OBJECT_STORE_DURATION_HISTOGRAM.put_opts.start_timer();
+        OBJECT_STORE_THROUGHPUT_HISTOGRAM
+            .put_opts
+            .observe(payload.content_length() as f64);
+
+        let loc = location.clone();
+        let store = self.store.clone();
+        self.runtime
+            .spawn(async move { store.put_opts(&loc, payload, opts).await })
+            .await
+            .map_err(|source| StoreError::Generic {
+                store: METRICS,
+                source: Box::new(source),
+            })?
+    }
+
+    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
         let _timer = 
OBJECT_STORE_DURATION_HISTOGRAM.put_multipart.start_timer();
 
         let instant = Instant::now();
@@ -187,11 +209,35 @@ impl ObjectStore for StoreWithMetrics {
         res
     }
 
-    async fn abort_multipart(&self, location: &Path, multipart_id: 
&MultipartId) -> Result<()> {
+    async fn put_multipart_opts(
+        &self,
+        location: &Path,
+        opts: PutMultipartOpts,
+    ) -> Result<Box<dyn MultipartUpload>> {
         let _timer = OBJECT_STORE_DURATION_HISTOGRAM
-            .abort_multipart
+            .put_multipart_opts
             .start_timer();
-        self.store.abort_multipart(location, multipart_id).await
+
+        let instant = Instant::now();
+        let loc = location.clone();
+        let store = self.store.clone();
+        let res = self
+            .runtime
+            .spawn(async move { store.put_multipart_opts(&loc, opts).await })
+            .await
+            .map_err(|source| StoreError::Generic {
+                store: METRICS,
+                source: Box::new(source),
+            })?;
+
+        trace!(
+            "Object store with metrics put_multipart_opts cost:{}ms, 
location:{}, thread:{}-{:?}",
+            instant.elapsed().as_millis(),
+            location,
+            thread::current().name().unwrap_or("noname").to_string(),
+            thread::current().id()
+        );
+        res
     }
 
     async fn get(&self, location: &Path) -> Result<GetResult> {
@@ -207,6 +253,19 @@ impl ObjectStore for StoreWithMetrics {
             })?
     }
 
+    async fn get_opts(&self, location: &Path, options: GetOptions) -> 
Result<GetResult> {
+        let _timer = OBJECT_STORE_DURATION_HISTOGRAM.get_opts.start_timer();
+        let store = self.store.clone();
+        let loc = location.clone();
+        self.runtime
+            .spawn(async move { store.get_opts(&loc, options).await })
+            .await
+            .map_err(|source| StoreError::Generic {
+                store: METRICS,
+                source: Box::new(source),
+            })?
+    }
+
     async fn get_range(&self, location: &Path, range: Range<usize>) -> 
Result<Bytes> {
         let _timer = OBJECT_STORE_DURATION_HISTOGRAM.get_range.start_timer();
 
@@ -292,9 +351,9 @@ impl ObjectStore for StoreWithMetrics {
             })?
     }
 
-    async fn list(&self, prefix: Option<&Path>) -> Result<BoxStream<'_, 
Result<ObjectMeta>>> {
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> 
{
         let _timer = OBJECT_STORE_DURATION_HISTOGRAM.list.start_timer();
-        self.store.list(prefix).await
+        self.store.list(prefix)
     }
 
     async fn list_with_delimiter(&self, prefix: Option<&Path>) -> 
Result<ListResult> {
diff --git a/src/components/object_store/src/multi_part.rs 
b/src/components/object_store/src/multi_part.rs
new file mode 100644
index 00000000..871ffe2a
--- /dev/null
+++ b/src/components/object_store/src/multi_part.rs
@@ -0,0 +1,221 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    io::Error as IoError,
+    pin::Pin,
+    sync::Arc,
+    task::{Context, Poll},
+};
+
+use bytes::Bytes;
+use futures::{future::BoxFuture, ready, Future, FutureExt};
+use tokio::{io::AsyncWrite, sync::Mutex, task::JoinSet};
+pub use upstream::PutPayloadMut;
+use upstream::{path::Path, Error, MultipartUpload, PutPayload, PutResult};
+
+use crate::{ObjectStoreRef, WriteMultipartRef};
+
+#[derive(Debug)]
+pub struct ConcurrentMultipartUpload {
+    upload: Box<dyn MultipartUpload>,
+
+    buffer: PutPayloadMut,
+
+    chunk_size: usize,
+
+    tasks: JoinSet<Result<(), Error>>,
+}
+
+impl ConcurrentMultipartUpload {
+    pub fn new(upload: Box<dyn MultipartUpload>, chunk_size: usize) -> Self {
+        Self {
+            upload,
+            chunk_size,
+            buffer: PutPayloadMut::new(),
+            tasks: Default::default(),
+        }
+    }
+
+    pub fn poll_tasks(
+        &mut self,
+        cx: &mut Context<'_>,
+        max_concurrency: usize,
+    ) -> Poll<Result<(), Error>> {
+        while !self.tasks.is_empty() && self.tasks.len() >= max_concurrency {
+            ready!(self.tasks.poll_join_next(cx)).unwrap()??
+        }
+        Poll::Ready(Ok(()))
+    }
+
+    fn put_part(&mut self, part: PutPayload) {
+        self.tasks.spawn(self.upload.put_part(part));
+    }
+
+    pub fn put(&mut self, mut bytes: Bytes) {
+        while !bytes.is_empty() {
+            let remaining = self.chunk_size - self.buffer.content_length();
+            if bytes.len() < remaining {
+                self.buffer.push(bytes);
+                return;
+            }
+            self.buffer.push(bytes.split_to(remaining));
+            let buffer = std::mem::take(&mut self.buffer);
+            self.put_part(buffer.into())
+        }
+    }
+
+    pub fn write(&mut self, mut buf: &[u8]) {
+        while !buf.is_empty() {
+            let remaining = self.chunk_size - self.buffer.content_length();
+            let to_read = buf.len().min(remaining);
+            self.buffer.extend_from_slice(&buf[..to_read]);
+            if to_read == remaining {
+                let buffer = std::mem::take(&mut self.buffer);
+                self.put_part(buffer.into())
+            }
+            buf = &buf[to_read..]
+        }
+    }
+
+    pub async fn flush(&mut self, max_concurrency: usize) -> Result<(), Error> 
{
+        futures::future::poll_fn(|cx| self.poll_tasks(cx, 
max_concurrency)).await
+    }
+
+    pub async fn finish(&mut self) -> Result<PutResult, Error> {
+        if !self.buffer.is_empty() {
+            let part = std::mem::take(&mut self.buffer);
+            self.put_part(part.into())
+        }
+
+        self.flush(0).await?;
+        self.upload.complete().await
+    }
+
+    pub async fn abort(&mut self) -> Result<(), Error> {
+        self.tasks.shutdown().await;
+        self.upload.abort().await
+    }
+}
+
+pub struct MultiUploadWriter {
+    pub multi_upload: WriteMultipartRef,
+    upload_task: Option<BoxFuture<'static, std::result::Result<usize, 
IoError>>>,
+    flush_task: Option<BoxFuture<'static, std::result::Result<(), IoError>>>,
+    completion_task: Option<BoxFuture<'static, std::result::Result<(), 
IoError>>>,
+}
+
+const CHUNK_SIZE: usize = 5 * 1024 * 1024;
+const MAX_CONCURRENCY: usize = 10;
+
+impl<'a> MultiUploadWriter {
+    pub async fn new(object_store: &'a ObjectStoreRef, location: &'a Path) -> 
Result<Self, Error> {
+        let upload_writer = object_store.put_multipart(location).await?;
+
+        let multi_upload = Arc::new(Mutex::new(ConcurrentMultipartUpload::new(
+            upload_writer,
+            CHUNK_SIZE,
+        )));
+
+        let multi_upload = Self {
+            multi_upload,
+            upload_task: None,
+            flush_task: None,
+            completion_task: None,
+        };
+
+        Ok(multi_upload)
+    }
+
+    pub fn aborter(&self) -> WriteMultipartRef {
+        self.multi_upload.clone()
+    }
+}
+
+impl AsyncWrite for MultiUploadWriter {
+    fn poll_write(
+        mut self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &[u8],
+    ) -> Poll<Result<usize, IoError>> {
+        let multi_upload = self.multi_upload.clone();
+        let buf = buf.to_owned();
+
+        let upload_task = self.upload_task.insert(
+            async move {
+                multi_upload
+                    .lock()
+                    .await
+                    .flush(MAX_CONCURRENCY)
+                    .await
+                    .map_err(IoError::other)?;
+
+                multi_upload.lock().await.write(&buf);
+                Ok(buf.len())
+            }
+            .boxed(),
+        );
+
+        Pin::new(upload_task).poll(cx)
+    }
+
+    fn poll_flush(
+        mut self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Result<(), IoError>> {
+        let multi_upload = self.multi_upload.clone();
+
+        let flush_task = self.flush_task.insert(
+            async move {
+                multi_upload
+                    .lock()
+                    .await
+                    .flush(0)
+                    .await
+                    .map_err(IoError::other)?;
+
+                Ok(())
+            }
+            .boxed(),
+        );
+
+        Pin::new(flush_task).poll(cx)
+    }
+
+    fn poll_shutdown(
+        mut self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Result<(), IoError>> {
+        let multi_upload = self.multi_upload.clone();
+
+        let completion_task = self.completion_task.get_or_insert_with(|| {
+            async move {
+                multi_upload
+                    .lock()
+                    .await
+                    .finish()
+                    .await
+                    .map_err(IoError::other)?;
+
+                Ok(())
+            }
+            .boxed()
+        });
+
+        Pin::new(completion_task).poll(cx)
+    }
+}
diff --git a/src/components/object_store/src/multipart.rs 
b/src/components/object_store/src/multipart.rs
deleted file mode 100644
index cb5c7d7e..00000000
--- a/src/components/object_store/src/multipart.rs
+++ /dev/null
@@ -1,280 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Implement multipart upload of [ObjectStore](upstream::ObjectStore), and 
most
-//! of the codes are forked from 
`arrow-rs`:https://github.com/apache/arrow-rs/blob/master/object_store/src/multipart.rs
-
-use std::{io, pin::Pin, sync::Arc, task::Poll};
-
-use async_trait::async_trait;
-use futures::{stream::FuturesUnordered, Future, StreamExt};
-use tokio::io::AsyncWrite;
-use upstream::Result;
-
-type BoxedTryFuture<T> = Pin<Box<dyn Future<Output = Result<T, io::Error>> + 
Send>>;
-
-/// A trait that can be implemented by cloud-based object stores
-/// and used in combination with [`CloudMultiPartUpload`] to provide
-/// multipart upload support.
-#[async_trait]
-pub trait CloudMultiPartUploadImpl: 'static {
-    /// Upload a single part
-    async fn put_multipart_part(
-        &self,
-        buf: Vec<u8>,
-        part_idx: usize,
-    ) -> Result<UploadPart, io::Error>;
-
-    /// Complete the upload with the provided parts
-    ///
-    /// `completed_parts` is in order of part number
-    async fn complete(&self, completed_parts: Vec<UploadPart>) -> Result<(), 
io::Error>;
-}
-
-#[derive(Debug, Clone)]
-pub struct UploadPart {
-    pub content_id: String,
-}
-
-pub struct CloudMultiPartUpload<T>
-where
-    T: CloudMultiPartUploadImpl,
-{
-    inner: Arc<T>,
-    /// A list of completed parts, in sequential order.
-    completed_parts: Vec<Option<UploadPart>>,
-    /// Part upload tasks currently running.
-    ///
-    /// Every task uploads data with `part_size` to objectstore.
-    tasks: FuturesUnordered<BoxedTryFuture<(usize, UploadPart)>>,
-    /// Maximum number of upload tasks to run concurrently
-    max_concurrency: usize,
-    /// Buffer that will be sent in next upload.
-    ///
-    /// TODO: Maybe we can use a list of Vec<u8> to ensure every buffer is
-    /// aligned with the part_size to avoid any extra copy in the future.
-    current_buffer: Vec<u8>,
-    /// Size of a part in bytes, size of last part may be smaller than
-    /// `part_size`.
-    part_size: usize,
-    /// Index of current part
-    current_part_idx: usize,
-    /// The completion task
-    completion_task: Option<BoxedTryFuture<()>>,
-}
-
-impl<T> CloudMultiPartUpload<T>
-where
-    T: CloudMultiPartUploadImpl,
-{
-    pub fn new(inner: T, max_concurrency: usize, part_size: usize) -> Self {
-        Self {
-            inner: Arc::new(inner),
-            completed_parts: Vec::new(),
-            tasks: FuturesUnordered::new(),
-            max_concurrency,
-            current_buffer: Vec::new(),
-            part_size,
-            current_part_idx: 0,
-            completion_task: None,
-        }
-    }
-
-    pub fn poll_tasks(
-        mut self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> Result<(), io::Error> {
-        if self.tasks.is_empty() {
-            return Ok(());
-        }
-        while let Poll::Ready(Some(res)) = self.tasks.poll_next_unpin(cx) {
-            let (part_idx, part) = res?;
-            let total_parts = self.completed_parts.len();
-            self.completed_parts
-                .resize(std::cmp::max(part_idx + 1, total_parts), None);
-            self.completed_parts[part_idx] = Some(part);
-        }
-        Ok(())
-    }
-}
-
-/// **Note: Methods in this impl are added by horaedb, not included in the
-/// `object_store` crate.**
-impl<T> CloudMultiPartUpload<T>
-where
-    T: CloudMultiPartUploadImpl + Send + Sync,
-{
-    /// Send all buffer data to object store in final stage.
-    fn final_flush_buffer(mut self: Pin<&mut Self>) {
-        while !self.current_buffer.is_empty() {
-            let size = self.part_size.min(self.current_buffer.len());
-            let out_buffer = 
self.current_buffer.drain(0..size).collect::<Vec<_>>();
-
-            self.as_mut().submit_part_upload_task(out_buffer);
-        }
-    }
-
-    /// Send buffer data to object store in write stage.
-    fn flush_buffer(mut self: Pin<&mut Self>) {
-        let part_size = self.part_size;
-
-        // We will continuously submit tasks until size of the buffer is 
smaller than
-        // `part_size`.
-        while self.current_buffer.len() >= part_size {
-            let out_buffer = 
self.current_buffer.drain(0..part_size).collect::<Vec<_>>();
-            self.as_mut().submit_part_upload_task(out_buffer);
-        }
-    }
-
-    fn submit_part_upload_task(mut self: Pin<&mut Self>, out_buffer: Vec<u8>) {
-        let inner = Arc::clone(&self.inner);
-        let part_idx = self.current_part_idx;
-        self.tasks.push(Box::pin(async move {
-            let upload_part = inner.put_multipart_part(out_buffer, 
part_idx).await?;
-
-            Ok((part_idx, upload_part))
-        }));
-        self.current_part_idx += 1;
-    }
-}
-
-/// The process of ObjectStore write multipart upload is:
-/// - Obtain a `AsyncWrite` by `ObjectStore::multi_upload` to begin multipart
-///   upload;
-/// - Write all the data parts by `AsyncWrite::poll_write`;
-/// - Call `AsyncWrite::poll_shutdown` to finish current mulipart upload;
-///
-/// The `multi_upload` is used in
-/// [`analytic_engine::sst::parquet::writer::ParquetSstWriter::write`].
-impl<T> CloudMultiPartUpload<T>
-where
-    T: CloudMultiPartUploadImpl + Send + Sync,
-{
-    /// Compared with `poll_flush` which only flushes the in-progress tasks,
-    /// `final_flush` is called during `poll_shutdown`, and will flush the
-    /// `current_buffer` along with in-progress tasks.
-    fn final_flush(
-        mut self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> Poll<Result<(), io::Error>> {
-        // Poll current tasks
-        self.as_mut().poll_tasks(cx)?;
-
-        // If current_buffer is not empty, see if it can be submitted
-        if self.tasks.len() < self.max_concurrency {
-            self.as_mut().final_flush_buffer();
-        }
-
-        self.as_mut().poll_tasks(cx)?;
-
-        // If tasks and current_buffer are empty, return Ready
-        if self.tasks.is_empty() && self.current_buffer.is_empty() {
-            Poll::Ready(Ok(()))
-        } else {
-            Poll::Pending
-        }
-    }
-}
-
-impl<T> AsyncWrite for CloudMultiPartUpload<T>
-where
-    T: CloudMultiPartUploadImpl + Send + Sync,
-{
-    fn poll_write(
-        mut self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-        buf: &[u8],
-    ) -> Poll<Result<usize, io::Error>> {
-        // Poll current tasks
-        self.as_mut().poll_tasks(cx)?;
-
-        // If adding buf to pending buffer would trigger send, check
-        // whether we have capacity for another task.
-        let enough_to_send = (buf.len() + self.current_buffer.len()) >= 
self.part_size;
-        // The current buffer is not enough to send.
-        if !enough_to_send {
-            self.current_buffer.extend_from_slice(buf);
-            return Poll::Ready(Ok(buf.len()));
-        }
-
-        if self.tasks.len() < self.max_concurrency {
-            // If we do, copy into the buffer and submit the task, and return 
ready.
-            self.current_buffer.extend_from_slice(buf);
-            // Flush buffer data, use custom method
-            self.as_mut().flush_buffer();
-            // We need to poll immediately after adding to setup waker
-            self.as_mut().poll_tasks(cx)?;
-
-            Poll::Ready(Ok(buf.len()))
-        } else {
-            // Waker registered by call to poll_tasks at beginning
-            Poll::Pending
-        }
-    }
-
-    fn poll_flush(
-        mut self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> Poll<Result<(), io::Error>> {
-        // Poll current tasks
-        self.as_mut().poll_tasks(cx)?;
-
-        // If tasks is empty, return Ready
-        if self.tasks.is_empty() {
-            Poll::Ready(Ok(()))
-        } else {
-            Poll::Pending
-        }
-    }
-
-    fn poll_shutdown(
-        mut self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> Poll<Result<(), io::Error>> {
-        // First, poll flush all buffer data to object store.
-        match self.as_mut().final_flush(cx) {
-            Poll::Pending => return Poll::Pending,
-            Poll::Ready(res) => res?,
-        };
-
-        // If shutdown task is not set, set it.
-        let parts = std::mem::take(&mut self.completed_parts);
-        let parts = parts
-            .into_iter()
-            .enumerate()
-            .map(|(idx, part)| {
-                part.ok_or_else(|| {
-                    io::Error::new(
-                        io::ErrorKind::Other,
-                        format!("Missing information for upload part {idx}"),
-                    )
-                })
-            })
-            .collect::<Result<_, _>>()?;
-
-        let inner = Arc::clone(&self.inner);
-        // Last, do completion task in inner.
-        let completion_task = self.completion_task.get_or_insert_with(|| {
-            Box::pin(async move {
-                inner.complete(parts).await?;
-                Ok(())
-            })
-        });
-
-        Pin::new(completion_task).poll(cx)
-    }
-}
diff --git a/src/components/object_store/src/obkv/meta.rs 
b/src/components/object_store/src/obkv/meta.rs
deleted file mode 100644
index ad97f9f6..00000000
--- a/src/components/object_store/src/obkv/meta.rs
+++ /dev/null
@@ -1,437 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::{ops::Range, str, sync::Arc, time};
-
-use generic_error::{BoxError, GenericError};
-use macros::define_result;
-use serde::{Deserialize, Serialize};
-use snafu::{ensure, Backtrace, ResultExt, Snafu};
-use table_kv::{ScanContext, ScanIter, TableKv, WriteBatch, WriteContext};
-use upstream::{path::Path, Error as StoreError, Result as StoreResult};
-
-use crate::obkv::{util, OBKV};
-
-pub const HEADER: u8 = 0x00_u8;
-
-pub const SCAN_TIMEOUT_SECS: u64 = 10;
-
-pub const SCAN_BATCH_SIZE: i32 = 1000;
-
-#[derive(Debug, Snafu)]
-pub enum Error {
-    #[snafu(display("Invalid utf8 string, 
err:{source}.\nBacktrace:\n{backtrace}"))]
-    InvalidUtf8 {
-        source: std::str::Utf8Error,
-        backtrace: Backtrace,
-    },
-
-    #[snafu(display("Invalid json, err:{source}, 
json:{json}.\nBacktrace:\n{backtrace}"))]
-    InvalidJson {
-        json: String,
-        source: serde_json::Error,
-        backtrace: Backtrace,
-    },
-
-    #[snafu(display("Failed to encode json, 
err:{source}.\nBacktrace:\n{backtrace}"))]
-    EncodeJson {
-        source: serde_json::Error,
-        backtrace: Backtrace,
-    },
-
-    #[snafu(display("Failed to save meta, location:{location}, err:{source}"))]
-    SaveMeta {
-        location: String,
-        source: GenericError,
-    },
-
-    #[snafu(display("Failed to delete meta, location:{location}, 
err:{source}"))]
-    DeleteMeta {
-        location: String,
-        source: GenericError,
-    },
-
-    #[snafu(display("Failed to read meta, location:{location}, err:{source}"))]
-    ReadMeta {
-        location: String,
-        source: GenericError,
-    },
-
-    #[snafu(display(
-        "Invalid header found, header:{header}, 
expect:{expect}.\nBacktrace:\n{backtrace}"
-    ))]
-    InvalidHeader {
-        header: u8,
-        expect: u8,
-        backtrace: Backtrace,
-    },
-
-    #[snafu(display(
-        "Out of range occurs, end:{end}, 
object_size:{object_size}.\nBacktrace:\n{backtrace}"
-    ))]
-    OutOfRange {
-        end: usize,
-        object_size: usize,
-        backtrace: Backtrace,
-    },
-}
-
-define_result!(Error);
-
-pub const OBJECT_STORE_META: &str = "obkv_object_store_meta";
-
-/// The meta info of Obkv Object
-///
-/// **WARN: Do not change the field name, may lead to breaking changes!**
-#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
-#[serde(default)]
-pub struct ObkvObjectMeta {
-    /// The full path to the object
-    #[serde(rename = "location")]
-    pub location: String,
-    /// The last modified time in ms
-    #[serde(rename = "last_modified")]
-    pub last_modified: i64,
-    /// The size in bytes of the object
-    #[serde(rename = "size")]
-    pub size: usize,
-    /// The unique identifier for the object; For Obkv, it is composed with
-    /// table_name @ path @ upload_id
-    #[serde(rename = "unique_id")]
-    pub unique_id: Option<String>,
-    /// The size in bytes of one part. Note: maybe the size of last part less
-    /// than part_size.
-    #[serde(rename = "part_size")]
-    pub part_size: usize,
-    /// The paths of multi upload parts.
-    #[serde(rename = "parts")]
-    pub parts: Vec<String>,
-    /// The version of object, Now we use the upload_id as version.
-    #[serde(rename = "version")]
-    pub version: String,
-}
-
-impl ObkvObjectMeta {
-    #[inline]
-    pub fn decode(data: &[u8]) -> Result<Self> {
-        ensure!(
-            data[0] == HEADER,
-            InvalidHeader {
-                header: data[0],
-                expect: HEADER,
-            }
-        );
-        let json = str::from_utf8(&data[1..]).context(InvalidUtf8)?;
-        serde_json::from_str(json).context(InvalidJson { json })
-    }
-
-    #[inline]
-    pub fn encode(&self) -> Result<Vec<u8>> {
-        let size = self.estimate_size_of_json();
-        let mut encode_bytes = Vec::with_capacity(size + 1);
-        encode_bytes.push(HEADER);
-        serde_json::to_writer(&mut encode_bytes, self).context(EncodeJson)?;
-        Ok(encode_bytes)
-    }
-
-    /// Estimate the json string size of ObkvObjectMeta
-    #[inline]
-    pub fn estimate_size_of_json(&self) -> usize {
-        // {}
-        let mut size = 2;
-        // size of key name, `,`, `""` and `:`
-        size += (8 + 13 + 4 + 9 + 9 + 5 + 7) + 4 * 7;
-        size += self.location.len() + 2;
-        // last_modified
-        size += 8;
-        // size
-        size += 8;
-        // unique_id
-        if let Some(id) = &self.unique_id {
-            size += id.len() + 2;
-        } else {
-            size += 4;
-        }
-        // part_size
-        size += 8;
-        // parts
-        for part in &self.parts {
-            // part.len, `""`, `:`, and `,`
-            size += part.len() + 4;
-        }
-        //{}
-        size += 2;
-        // version
-        size += self.version.len();
-        size
-    }
-
-    /// Compute the convered parts based on given range parameter
-    pub fn compute_covered_parts(&self, range: Range<usize>) -> 
Result<Option<ConveredParts>> {
-        ensure!(
-            range.end <= self.size,
-            OutOfRange {
-                end: range.end,
-                object_size: self.size,
-            }
-        );
-
-        // if the range is empty, return empty parts
-        if range.is_empty() {
-            return Ok(None);
-        }
-
-        let batch_size = self.part_size;
-        let start_index = range.start / batch_size;
-        let start_offset = range.start % batch_size;
-
-        let inclusive_end = range.end - 1;
-
-        let end_index = inclusive_end / batch_size;
-        let end_offset = inclusive_end % batch_size;
-
-        Ok(Some(ConveredParts {
-            part_keys: &self.parts[start_index..=end_index],
-            start_offset,
-            end_offset,
-        }))
-    }
-}
-
-#[derive(Debug, Clone)]
-pub struct ConveredParts<'a> {
-    /// The table kv client
-    pub part_keys: &'a [String],
-    pub start_offset: usize,
-    pub end_offset: usize,
-}
-
-#[derive(Debug, Clone)]
-pub struct MetaManager<T> {
-    /// The table kv client
-    pub client: Arc<T>,
-}
-
-impl<T: TableKv> std::fmt::Display for MetaManager<T> {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "ObjectStore-Obkv-MetaManager({:?})", self.client)?;
-        Ok(())
-    }
-}
-
-impl<T: TableKv> MetaManager<T> {
-    pub async fn save(&self, meta: ObkvObjectMeta) -> Result<()> {
-        let mut batch = T::WriteBatch::default();
-        let encode_bytes = meta.encode()?;
-        batch.insert_or_update(meta.location.as_bytes(), &encode_bytes);
-        self.client
-            .as_ref()
-            .write(WriteContext::default(), OBJECT_STORE_META, batch)
-            .box_err()
-            .with_context(|| SaveMeta {
-                location: meta.location,
-            })?;
-        Ok(())
-    }
-
-    pub async fn read(&self, location: &Path) -> 
Result<Option<ObkvObjectMeta>> {
-        let value = self
-            .client
-            .as_ref()
-            .get(OBJECT_STORE_META, location.as_ref().as_bytes())
-            .box_err()
-            .context(ReadMeta {
-                location: location.as_ref().to_string(),
-            })?;
-
-        value.map(|v| ObkvObjectMeta::decode(&v)).transpose()
-    }
-
-    pub async fn delete(&self, meta: ObkvObjectMeta, location: &Path) -> 
Result<()> {
-        self.client
-            .as_ref()
-            .delete(OBJECT_STORE_META, location.as_ref().as_bytes())
-            .box_err()
-            .context(DeleteMeta {
-                location: meta.location,
-            })?;
-
-        Ok(())
-    }
-
-    pub async fn delete_with_version(&self, location: &Path, version: &str) -> 
Result<()> {
-        let meta_result = self.read(location).await?;
-        if let Some(meta) = meta_result {
-            if meta.version == version {
-                self.delete(meta, location).await?;
-            }
-        }
-        Ok(())
-    }
-
-    pub async fn list(&self, prefix: &Path) -> 
StoreResult<Vec<ObkvObjectMeta>, std::io::Error> {
-        let scan_context: ScanContext = ScanContext {
-            timeout: time::Duration::from_secs(SCAN_TIMEOUT_SECS),
-            batch_size: SCAN_BATCH_SIZE,
-        };
-
-        let scan_request = 
util::scan_request_with_prefix(prefix.as_ref().as_bytes());
-
-        let mut iter = self
-            .client
-            .scan(scan_context, OBJECT_STORE_META, scan_request)
-            .map_err(|source| StoreError::Generic {
-                store: OBKV,
-                source: Box::new(source),
-            })?;
-
-        let mut metas = vec![];
-        while iter.valid() {
-            let value = iter.value();
-            let meta = ObkvObjectMeta::decode(value).map_err(|source| 
StoreError::Generic {
-                store: OBKV,
-                source: Box::new(source),
-            })?;
-            metas.push(meta);
-            iter.next().map_err(|source| StoreError::Generic {
-                store: OBKV,
-                source: Box::new(source),
-            })?;
-        }
-        Ok(metas)
-    }
-}
-
-#[cfg(test)]
-mod test {
-
-    use std::ops::Range;
-
-    use crate::obkv::meta::ObkvObjectMeta;
-
-    #[test]
-    fn test_estimate_size() {
-        let meta = build_test_meta0();
-
-        let expect = meta.estimate_size_of_json();
-        let json = &serde_json::to_string(&meta).unwrap();
-        let real = json.len();
-        println!("expect:{expect},real:{real}");
-        assert!(expect.abs_diff(real) as f32 / (real as f32) < 0.1);
-    }
-
-    #[test]
-    fn test_compute_convered_parts() {
-        let meta = build_test_meta0();
-
-        let range1 = Range { start: 0, end: 1 };
-        let expect = meta.compute_covered_parts(range1).unwrap().unwrap();
-        assert!(expect.part_keys.len() == 1);
-        assert!(expect.start_offset == 0);
-        assert!(expect.end_offset == 0);
-
-        let range1 = Range {
-            start: 0,
-            end: 1024,
-        };
-        let expect = meta.compute_covered_parts(range1).unwrap().unwrap();
-        assert!(expect.part_keys.len() == 1);
-        assert!(expect.start_offset == 0);
-        assert!(expect.end_offset == 1023);
-
-        let range1 = Range {
-            start: 0,
-            end: 8190,
-        };
-        let expect = meta.compute_covered_parts(range1).unwrap().unwrap();
-        assert!(expect.part_keys.len() == 8);
-        assert!(expect.start_offset == 0);
-        assert!(expect.end_offset == 1021);
-
-        let range1 = Range {
-            start: 1023,
-            end: 1025,
-        };
-        let expect = meta.compute_covered_parts(range1).unwrap().unwrap();
-        assert!(expect.part_keys.len() == 2);
-        assert!(expect.start_offset == 1023);
-        assert!(expect.end_offset == 0);
-
-        let range1 = Range {
-            start: 8189,
-            end: 8190,
-        };
-        let expect = meta.compute_covered_parts(range1).unwrap().unwrap();
-        assert!(expect.part_keys.len() == 1);
-        assert!(expect.start_offset == 1021);
-        assert!(expect.end_offset == 1021);
-
-        let range1 = Range {
-            start: 8189,
-            end: 8199,
-        };
-        let expect = meta.compute_covered_parts(range1);
-        assert!(expect.is_err());
-
-        let meta = build_test_meta1();
-        let range1 = Range {
-            start: 0,
-            end: 1024,
-        };
-        let expect = meta.compute_covered_parts(range1).unwrap().unwrap();
-        assert!(expect.part_keys.len() == 1);
-        assert!(expect.start_offset == 0);
-        assert!(expect.end_offset == 1023);
-
-        let range1 = Range { start: 0, end: 0 };
-        let expect = meta.compute_covered_parts(range1).unwrap();
-        assert!(expect.is_none());
-    }
-
-    fn build_test_meta0() -> ObkvObjectMeta {
-        ObkvObjectMeta {
-            location: 
String::from("/test/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxfdsfjlajflk"),
-            last_modified: 123456789,
-            size: 8190,
-            unique_id: Some(String::from("1245689u438uferjalfjkda")),
-            part_size: 1024,
-            parts: vec![
-                String::from("/test/xx/0"),
-                String::from("/test/xx/1"),
-                String::from("/test/xx/4"),
-                String::from("/test/xx/5"),
-                String::from("/test/xx/0"),
-                String::from("/test/xx/1"),
-                String::from("/test/xx/4"),
-                String::from("/test/xx/5"),
-            ],
-            version: String::from("123456fsdalfkassa;l;kjfaklasadffsd"),
-        }
-    }
-
-    fn build_test_meta1() -> ObkvObjectMeta {
-        ObkvObjectMeta {
-            location: 
String::from("/test/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxfdsfjlajflk"),
-            last_modified: 123456789,
-            size: 1024,
-            unique_id: Some(String::from("1245689u438uferjalfjkda")),
-            part_size: 1024,
-            parts: vec![String::from("/test/xx/0")],
-            version: String::from("123456fsdalfkassa;l;kjfaklasadffsd"),
-        }
-    }
-}
diff --git a/src/components/object_store/src/obkv/mod.rs 
b/src/components/object_store/src/obkv/mod.rs
deleted file mode 100644
index ecff9ea0..00000000
--- a/src/components/object_store/src/obkv/mod.rs
+++ /dev/null
@@ -1,1015 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::{
-    collections::HashSet,
-    hash::{Hash, Hasher},
-    ops::Range,
-    sync::{
-        atomic::{AtomicU64, Ordering},
-        Arc,
-    },
-    time,
-    time::{SystemTime, UNIX_EPOCH},
-};
-
-use async_trait::async_trait;
-use bytes::Bytes;
-use chrono::{DateTime, TimeZone, Utc};
-use futures::{
-    stream::{BoxStream, FuturesOrdered},
-    StreamExt,
-};
-use generic_error::{BoxError, GenericError};
-use logger::debug;
-use snafu::{ensure, Backtrace, ResultExt, Snafu};
-use table_kv::{ScanContext, ScanIter, TableKv, WriteBatch, WriteContext};
-use tokio::{
-    io::{AsyncWrite, AsyncWriteExt},
-    time::Instant,
-};
-use twox_hash::XxHash64;
-use upstream::{
-    path::{Path, DELIMITER},
-    Error as StoreError, GetResult, ListResult, MultipartId, ObjectMeta, 
ObjectStore, Result,
-};
-use uuid::Uuid;
-
-use crate::{
-    multipart::{CloudMultiPartUpload, CloudMultiPartUploadImpl, UploadPart},
-    obkv::meta::{MetaManager, ObkvObjectMeta, OBJECT_STORE_META},
-};
-
-mod meta;
-mod util;
-
-/// The object store type of obkv
-pub const OBKV: &str = "OBKV";
-
-/// Hash seed to build hasher. Modify the seed will result in different route
-/// result!
-const HASH_SEED: u64 = 0;
-
-#[derive(Debug, Snafu)]
-pub enum Error {
-    #[snafu(display("Failed to scan data, namespace:{namespace}, 
err:{source}"))]
-    ScanData {
-        namespace: String,
-        source: GenericError,
-    },
-
-    #[snafu(display("Failed to put data, path:{path}, err:{source}"))]
-    PutData { path: String, source: GenericError },
-
-    #[snafu(display("Failed to create shard table, table_name:{table_name}, 
err:{source}"))]
-    CreateShardTable {
-        table_name: String,
-        source: GenericError,
-    },
-
-    #[snafu(display("Failed to read meta, path:{path}, err:{source}"))]
-    ReadMeta { path: String, source: GenericError },
-
-    #[snafu(display("Data part is not found in part_key:{part_key}. 
\nBacktrace:\n{backtrace}"))]
-    DataPartNotFound {
-        part_key: String,
-        backtrace: Backtrace,
-    },
-
-    #[snafu(display("No meta found, path:{path}. \nBacktrace:\n{backtrace}"))]
-    MetaNotExists { path: String, backtrace: Backtrace },
-
-    #[snafu(display(
-        "Data is too large to put, size:{size}, limit:{limit}. 
\nBacktrace:\n{backtrace}"
-    ))]
-    TooLargeData {
-        size: usize,
-        limit: usize,
-        backtrace: Backtrace,
-    },
-
-    #[snafu(display(
-        "Convert timestamp to date time fail, timestamp:{timestamp}. 
\nBacktrace:\n{backtrace}"
-    ))]
-    ConvertTimestamp {
-        timestamp: i64,
-        backtrace: Backtrace,
-    },
-
-    #[snafu(display(
-        "The length of data parts is inconsistent with the length of values, 
parts length:{part_len}, values length:{value_len} \nBacktrace:\n{backtrace}"
-    ))]
-    DataPartsLength {
-        part_len: usize,
-        value_len: usize,
-        backtrace: Backtrace,
-    },
-}
-
-impl<T: TableKv> MetaManager<T> {
-    fn try_new(client: Arc<T>) -> std::result::Result<Self, Error> {
-        create_table_if_not_exists(&client, OBJECT_STORE_META)?;
-        Ok(Self { client })
-    }
-}
-
-/// If table not exists, create shard table; Else, do nothing.
-fn create_table_if_not_exists<T: TableKv>(
-    table_kv: &Arc<T>,
-    table_name: &str,
-) -> std::result::Result<(), Error> {
-    let table_exists = table_kv
-        .table_exists(table_name)
-        .box_err()
-        .context(CreateShardTable { table_name })?;
-    if !table_exists {
-        table_kv
-            .create_table(table_name)
-            .box_err()
-            .context(CreateShardTable { table_name })?;
-    }
-
-    Ok(())
-}
-
-#[derive(Debug, Clone)]
-pub struct ShardManager {
-    shard_num: usize,
-    table_names: Vec<String>,
-}
-
-impl ShardManager {
-    fn try_new<T: TableKv>(client: Arc<T>, shard_num: usize) -> 
std::result::Result<Self, Error> {
-        let mut table_names = Vec::with_capacity(shard_num);
-
-        for shard_id in 0..shard_num {
-            let table_name = format!("object_store_{shard_id}");
-            create_table_if_not_exists(&client, &table_name)?;
-            table_names.push(table_name);
-        }
-
-        Ok(Self {
-            shard_num,
-            table_names,
-        })
-    }
-
-    #[inline]
-    pub fn pick_shard_table(&self, path: &Path) -> &str {
-        let mut hasher = XxHash64::with_seed(HASH_SEED);
-        path.as_ref().as_bytes().hash(&mut hasher);
-        let hash = hasher.finish();
-        let index = hash % (self.table_names.len() as u64);
-        &self.table_names[index as usize]
-    }
-}
-
-impl std::fmt::Display for ShardManager {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "ObjectStore ObkvShardManager({})", self.shard_num)?;
-        Ok(())
-    }
-}
-
-#[derive(Debug)]
-pub struct ObkvObjectStore<T> {
-    /// The manager to manage shard table in obkv
-    shard_manager: ShardManager,
-    /// The manager to manage object store meta, which persist in obkv
-    meta_manager: Arc<MetaManager<T>>,
-    client: Arc<T>,
-    /// The size of one object part persited in obkv
-    /// It may cause problem to save huge data in one obkv value, so we
-    /// need to split data into small parts.
-    part_size: usize,
-    /// The max size of bytes, default is 1GB
-    max_object_size: usize,
-    /// Maximum number of upload tasks to run concurrently
-    max_upload_concurrency: usize,
-}
-
-impl<T: TableKv> std::fmt::Display for ObkvObjectStore<T> {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(
-            f,
-            "ObkvObjectStore({:?},{:?})",
-            self.client, self.shard_manager
-        )?;
-        Ok(())
-    }
-}
-
-impl<T: TableKv> ObkvObjectStore<T> {
-    pub fn try_new(
-        client: Arc<T>,
-        shard_num: usize,
-        part_size: usize,
-        max_object_size: usize,
-        max_upload_concurrency: usize,
-    ) -> Result<Self> {
-        let shard_manager = ShardManager::try_new(client.clone(), 
shard_num).map_err(|source| {
-            StoreError::Generic {
-                store: OBKV,
-                source: Box::new(source),
-            }
-        })?;
-        let meta_manager: MetaManager<T> =
-            MetaManager::try_new(client.clone()).map_err(|source| 
StoreError::Generic {
-                store: OBKV,
-                source: Box::new(source),
-            })?;
-        Ok(Self {
-            shard_manager,
-            meta_manager: Arc::new(meta_manager),
-            client,
-            part_size,
-            max_object_size,
-            max_upload_concurrency,
-        })
-    }
-
-    #[inline]
-    fn check_size(&self, bytes: &Bytes) -> std::result::Result<(), Error> {
-        ensure!(
-            bytes.len() < self.max_object_size,
-            TooLargeData {
-                size: bytes.len(),
-                limit: self.max_object_size,
-            }
-        );
-
-        Ok(())
-    }
-
-    #[inline]
-    fn normalize_path(location: Option<&Path>) -> Path {
-        if let Some(path) = location {
-            if !path.as_ref().ends_with(DELIMITER) {
-                return Path::from(format!("{}{DELIMITER}", path.as_ref()));
-            }
-            path.clone()
-        } else {
-            Path::from("")
-        }
-    }
-
-    #[inline]
-    pub fn pick_shard_table(&self, path: &Path) -> &str {
-        self.shard_manager.pick_shard_table(path)
-    }
-}
-
-impl<T: TableKv> ObkvObjectStore<T> {
-    async fn read_meta(&self, location: &Path) -> 
std::result::Result<ObkvObjectMeta, Error> {
-        let meta = self
-            .meta_manager
-            .read(location)
-            .await
-            .box_err()
-            .context(ReadMeta {
-                path: location.as_ref().to_string(),
-            })?;
-
-        if let Some(m) = meta {
-            Ok(m)
-        } else {
-            MetaNotExists {
-                path: location.as_ref().to_string(),
-            }
-            .fail()
-        }
-    }
-
-    async fn get_internal(&self, location: &Path) -> 
std::result::Result<GetResult, Error> {
-        let meta = self.read_meta(location).await?;
-        let table_name = self.pick_shard_table(location);
-        // TODO: Let table_kv provide a api `get_batch` to avoid extra IO 
operations.
-        let mut futures = FuturesOrdered::new();
-        for part_key in meta.parts {
-            let client = self.client.clone();
-            let table_name = table_name.to_string();
-            let future = async move {
-                match client.get(&table_name, part_key.as_bytes()) {
-                    Ok(res) => Ok(Bytes::from(res.unwrap())),
-                    Err(err) => Err(StoreError::Generic {
-                        store: OBKV,
-                        source: Box::new(err),
-                    }),
-                }
-            };
-            futures.push_back(future);
-        }
-
-        let boxed = futures.boxed();
-
-        Ok(GetResult::Stream(boxed))
-    }
-
-    fn convert_datetime(&self, timestamp: i64) -> 
std::result::Result<DateTime<Utc>, Error> {
-        let timestamp_millis_opt = Utc.timestamp_millis_opt(timestamp);
-        if let Some(dt) = timestamp_millis_opt.single() {
-            Ok(dt)
-        } else {
-            ConvertTimestamp { timestamp }.fail()
-        }
-    }
-}
-
-#[async_trait]
-impl<T: TableKv> ObjectStore for ObkvObjectStore<T> {
-    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
-        let instant = Instant::now();
-
-        self.check_size(&bytes)
-            .map_err(|source| StoreError::Generic {
-                store: OBKV,
-                source: Box::new(source),
-            })?;
-
-        // Use `put_multipart` to implement `put`.
-        let (_upload_id, mut multipart) = self.put_multipart(location).await?;
-        multipart
-            .write(&bytes)
-            .await
-            .map_err(|source| StoreError::Generic {
-                store: OBKV,
-                source: Box::new(source),
-            })?;
-        // Complete stage: flush buffer data to obkv, and save meta data
-        multipart
-            .shutdown()
-            .await
-            .map_err(|source| StoreError::Generic {
-                store: OBKV,
-                source: Box::new(source),
-            })?;
-        debug!(
-            "ObkvObjectStore put operation, location:{location}, cost:{:?}",
-            instant.elapsed()
-        );
-        Ok(())
-    }
-
-    async fn put_multipart(
-        &self,
-        location: &Path,
-    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
-        let instant = Instant::now();
-
-        let upload_id = Uuid::new_v4().to_string();
-        let table_name = self.pick_shard_table(location);
-
-        let upload = ObkvMultiPartUpload {
-            location: location.clone(),
-            upload_id: upload_id.clone(),
-            table_name: table_name.to_string(),
-            size: AtomicU64::new(0),
-            client: Arc::clone(&self.client),
-            part_size: self.part_size,
-            meta_manager: self.meta_manager.clone(),
-        };
-        let multi_part_upload =
-            CloudMultiPartUpload::new(upload, self.max_upload_concurrency, 
self.part_size);
-
-        debug!(
-            "ObkvObjectStore put_multipart operation, location:{location}, 
table_name:{table_name}, cost:{:?}",
-            instant.elapsed()
-        );
-        Ok((upload_id, Box::new(multi_part_upload)))
-    }
-
-    async fn abort_multipart(&self, location: &Path, multipart_id: 
&MultipartId) -> Result<()> {
-        let instant = Instant::now();
-
-        let table_name = self.pick_shard_table(location);
-
-        // Before aborting multipart, we need to delete all data parts and 
meta info.
-        // Here to delete data with path `location` and multipart_id.
-        let scan_context: ScanContext = ScanContext {
-            timeout: time::Duration::from_secs(meta::SCAN_TIMEOUT_SECS),
-            batch_size: meta::SCAN_BATCH_SIZE,
-        };
-
-        let prefix = PathKeyEncoder::part_key_prefix(location, multipart_id);
-        let scan_request = util::scan_request_with_prefix(prefix.as_bytes());
-
-        let mut iter = self
-            .client
-            .scan(scan_context, table_name, scan_request)
-            .map_err(|source| StoreError::Generic {
-                store: OBKV,
-                source: Box::new(source),
-            })?;
-
-        let mut keys = vec![];
-        while iter.valid() {
-            keys.push(iter.key().to_vec());
-            iter.next().map_err(|source| StoreError::Generic {
-                store: OBKV,
-                source: Box::new(source),
-            })?;
-        }
-
-        self.client
-            .delete_batch(table_name, keys)
-            .map_err(|source| StoreError::Generic {
-                store: OBKV,
-                source: Box::new(source),
-            })?;
-
-        // Here to delete meta with path `location` and multipart_id
-        self.meta_manager
-            .delete_with_version(location, multipart_id)
-            .await
-            .map_err(|source| StoreError::Generic {
-                store: OBKV,
-                source: Box::new(source),
-            })?;
-
-        debug!(
-            "ObkvObjectStore abort_multipart operation, location:{location}, 
table_name:{table_name}, cost:{:?}",
-            instant.elapsed()
-        );
-        Ok(())
-    }
-
-    async fn get(&self, location: &Path) -> Result<GetResult> {
-        let instant = Instant::now();
-        let result = self.get_internal(location).await;
-
-        debug!(
-            "ObkvObjectStore get operation, location:{location}, cost:{:?}",
-            instant.elapsed()
-        );
-        result.box_err().map_err(|source| StoreError::NotFound {
-            path: location.to_string(),
-            source,
-        })
-    }
-
-    async fn get_range(&self, location: &Path, range: Range<usize>) -> 
Result<Bytes> {
-        let instant = Instant::now();
-
-        let table_name = self.pick_shard_table(location);
-        let meta =
-            self.read_meta(location)
-                .await
-                .box_err()
-                .map_err(|source| StoreError::NotFound {
-                    path: location.to_string(),
-                    source,
-                })?;
-
-        let covered_parts = meta
-            .compute_covered_parts(range.clone())
-            .box_err()
-            .map_err(|source| StoreError::NotFound {
-                path: location.to_string(),
-                source,
-            })?;
-
-        if let Some(covered_parts) = covered_parts {
-            let mut range_buffer = Vec::with_capacity(range.end - range.start);
-            let keys: Vec<&[u8]> = covered_parts
-                .part_keys
-                .iter()
-                .map(|key| key.as_bytes())
-                .collect();
-            let values =
-                self.client
-                    .get_batch(table_name, keys)
-                    .map_err(|source| StoreError::NotFound {
-                        path: location.to_string(),
-                        source: Box::new(source),
-                    })?;
-
-            if covered_parts.part_keys.len() != values.len() {
-                DataPartsLength {
-                    part_len: covered_parts.part_keys.len(),
-                    value_len: values.len(),
-                }
-                .fail()
-                .map_err(|source| StoreError::Generic {
-                    store: OBKV,
-                    source: Box::new(source),
-                })?
-            }
-
-            for (index, (key, value)) in 
covered_parts.part_keys.iter().zip(values).enumerate() {
-                if let Some(bytes) = value {
-                    let mut begin = 0;
-                    let mut end = bytes.len() - 1;
-                    if index == 0 {
-                        begin = covered_parts.start_offset;
-                    }
-                    // the last one
-                    if index == covered_parts.part_keys.len() - 1 {
-                        end = covered_parts.end_offset;
-                    }
-                    range_buffer.extend_from_slice(&bytes[begin..=end]);
-                } else {
-                    DataPartNotFound { part_key: key }
-                        .fail()
-                        .map_err(|source| StoreError::NotFound {
-                            path: location.to_string(),
-                            source: Box::new(source),
-                        })?;
-                }
-            }
-
-            debug!("ObkvObjectStore get_range operation, location:{location}, 
table:{table_name}, cost:{:?}", instant.elapsed());
-
-            return Ok(range_buffer.into());
-        } else {
-            return Ok(Bytes::new());
-        }
-    }
-
-    /// Return the metadata for the specified location
-    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
-        let instant = Instant::now();
-
-        let meta =
-            self.read_meta(location)
-                .await
-                .box_err()
-                .map_err(|source| StoreError::NotFound {
-                    path: location.to_string(),
-                    source,
-                })?;
-
-        debug!(
-            "ObkvObjectStore head operation, location:{location}, cost:{:?}",
-            instant.elapsed()
-        );
-
-        let last_modified = self
-            .convert_datetime(meta.last_modified)
-            .box_err()
-            .map_err(|source| StoreError::NotFound {
-                path: location.to_string(),
-                source,
-            })?;
-
-        Ok(ObjectMeta {
-            location: (*location).clone(),
-            last_modified,
-            size: meta.size,
-        })
-    }
-
-    /// Delete the object at the specified location.
-    async fn delete(&self, location: &Path) -> Result<()> {
-        let instant = Instant::now();
-
-        // TODO: possible corruption here, should not delete data while it is
being read.
-        let table_name = self.pick_shard_table(location);
-        let meta =
-            self.read_meta(location)
-                .await
-                .box_err()
-                .map_err(|source| StoreError::NotFound {
-                    path: location.to_string(),
-                    source,
-                })?;
-
-        // delete every part of data
-        for part in &meta.parts {
-            let key = part.as_bytes();
-            self.client
-                .delete(table_name, key)
-                .map_err(|source| StoreError::Generic {
-                    store: OBKV,
-                    source: Box::new(source),
-                })?;
-        }
-        // delete meta info
-        self.meta_manager
-            .delete(meta, location)
-            .await
-            .map_err(|source| StoreError::Generic {
-                store: OBKV,
-                source: Box::new(source),
-            })?;
-
-        debug!(
-            "ObkvObjectStore delete operation, location:{location}, 
table:{table_name}, cost:{:?}",
-            instant.elapsed()
-        );
-
-        Ok(())
-    }
-
-    /// List all the objects with the given prefix.
-    /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a
-    /// prefix of `foo/bar/x` but not of `foo/bar_baz/x`.
-    /// TODO: Currently this method may return lots of object meta, we should
-    /// limit the count of returned objects in the future. Maybe a better
-    /// implementation is to fetch and send the list results in a stream way.
-    async fn list(&self, prefix: Option<&Path>) -> Result<BoxStream<'_, 
Result<ObjectMeta>>> {
-        let instant = Instant::now();
-
-        let path = Self::normalize_path(prefix);
-        let raw_metas =
-            self.meta_manager
-                .list(&path)
-                .await
-                .map_err(|source| StoreError::Generic {
-                    store: OBKV,
-                    source: Box::new(source),
-                })?;
-
-        let mut meta_list = Vec::new();
-        for meta in raw_metas {
-            meta_list.push(Ok(ObjectMeta {
-                location: Path::from(meta.location),
-                last_modified: 
Utc.timestamp_millis_opt(meta.last_modified).unwrap(),
-                size: meta.size,
-            }));
-        }
-
-        let iter = futures::stream::iter(meta_list);
-        debug!(
-            "ObkvObjectStore list operation, prefix:{path}, cost:{:?}",
-            instant.elapsed()
-        );
-        Ok(iter.boxed())
-    }
-
-    /// List all the objects and common paths(directories) with the given
-    /// prefix. Prefixes are evaluated on a path segment basis, i.e.
-    /// `foo/bar/` is a prefix of `foo/bar/x` but not of `foo/bar_baz/x`.
-    /// see detail in: 
https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html
-    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> 
Result<ListResult> {
-        let instant = Instant::now();
-
-        let path = Self::normalize_path(prefix);
-        let metas = self
-            .meta_manager
-            .list(&path)
-            .await
-            .map_err(|source| StoreError::Generic {
-                store: OBKV,
-                source: Box::new(source),
-            })?;
-
-        let mut common_prefixes = HashSet::new();
-        let mut objects = Vec::new();
-        for meta in metas {
-            let location = meta.location;
-            let subfix = &location[path.as_ref().len()..];
-            if let Some(pos) = subfix.find(DELIMITER) {
-                // common_prefix ends with '/'
-                let common_prefix = &subfix[0..pos + 1];
-                common_prefixes.insert(Path::from(common_prefix));
-            } else {
-                objects.push(ObjectMeta {
-                    location: Path::from(location),
-                    last_modified: 
Utc.timestamp_millis_opt(meta.last_modified).unwrap(),
-                    size: meta.size,
-                })
-            }
-        }
-
-        let common_prefixes = Vec::from_iter(common_prefixes);
-        debug!(
-            "ObkvObjectStore list_with_delimiter operation, prefix:{path}, 
cost:{:?}",
-            instant.elapsed()
-        );
-        Ok(ListResult {
-            common_prefixes,
-            objects,
-        })
-    }
-
-    async fn copy(&self, _from: &Path, _to: &Path) -> Result<()> {
-        // TODO:
-        Err(StoreError::NotImplemented)
-    }
-
-    async fn copy_if_not_exists(&self, _from: &Path, _to: &Path) -> Result<()> 
{
-        // TODO:
-        Err(StoreError::NotImplemented)
-    }
-}
-
-struct ObkvMultiPartUpload<T> {
-    /// The full path to the object.
-    location: Path,
-    /// The id of multi upload tasks, we use this id as object version.
-    upload_id: String,
-    /// The table name of obkv to save data.
-    table_name: String,
-    /// The client of object store.
-    client: Arc<T>,
-    /// The size of object.
-    size: AtomicU64,
-    /// The size in bytes of one part. Note: the size of the last part may be
-    /// less than part_size.
-    part_size: usize,
-    /// The manager to process meta info.
-    meta_manager: Arc<MetaManager<T>>,
-}
-
-struct PathKeyEncoder;
-
-impl PathKeyEncoder {
-    #[inline]
-    fn part_key(location: &Path, upload_id: &str, part_idx: usize) -> String {
-        format!("{location}@{upload_id}@{part_idx}")
-    }
-
-    #[inline]
-    fn part_key_prefix(location: &Path, upload_id: &str) -> String {
-        format!("{location}@{upload_id}@")
-    }
-
-    #[inline]
-    fn unique_id(table: &str, location: &Path, upload_id: &str) -> String {
-        format!("{table}@{location}@{upload_id}")
-    }
-}
-
-#[async_trait]
-impl<T: TableKv> CloudMultiPartUploadImpl for ObkvMultiPartUpload<T> {
-    async fn put_multipart_part(
-        &self,
-        buf: Vec<u8>,
-        part_idx: usize,
-    ) -> Result<UploadPart, std::io::Error> {
-        let mut batch = T::WriteBatch::default();
-        let part_key = PathKeyEncoder::part_key(&self.location, 
&self.upload_id, part_idx);
-        batch.insert(part_key.as_bytes(), buf.as_ref());
-
-        self.client
-            .write(WriteContext::default(), &self.table_name, batch)
-            .map_err(|source| StoreError::Generic {
-                store: OBKV,
-                source: Box::new(source),
-            })?;
-        // Record size of object.
-        self.size.fetch_add(buf.len() as u64, Ordering::Relaxed);
-        Ok(UploadPart {
-            content_id: part_key,
-        })
-    }
-
-    async fn complete(&self, completed_parts: Vec<UploadPart>) -> Result<(), 
std::io::Error> {
-        // We should save meta info after finish save data.
-        let mut paths = Vec::with_capacity(completed_parts.len());
-        for upload_part in completed_parts {
-            paths.push(upload_part.content_id);
-        }
-
-        let now = SystemTime::now();
-        let since_epoch = now.duration_since(UNIX_EPOCH).expect("Time went 
backwards");
-        let last_modified = since_epoch.as_millis() as i64;
-
-        let meta = ObkvObjectMeta {
-            location: self.location.as_ref().to_string(),
-            last_modified,
-            size: self.size.load(Ordering::SeqCst) as usize,
-            unique_id: Some(PathKeyEncoder::unique_id(
-                &self.table_name,
-                &self.location,
-                &self.upload_id,
-            )),
-            part_size: self.part_size,
-            parts: paths,
-            version: self.upload_id.clone(),
-        };
-
-        // Save meta info to specify obkv table.
-        // TODO: We should remove the previous object data when update object.
-        self.meta_manager
-            .save(meta)
-            .await
-            .map_err(|source| StoreError::Generic {
-                store: OBKV,
-                source: Box::new(source),
-            })?;
-        Ok(())
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use std::sync::Arc;
-
-    use bytes::Bytes;
-    use futures::StreamExt;
-    use rand::{thread_rng, Rng};
-    use runtime::{Builder, Runtime};
-    use table_kv::memory::MemoryImpl;
-    use tokio::io::AsyncWriteExt;
-    use upstream::{path::Path, ObjectStore};
-
-    use crate::obkv::ObkvObjectStore;
-
-    const TEST_PART_SIZE: usize = 1024;
-
-    fn new_runtime() -> Arc<Runtime> {
-        let runtime = Builder::default()
-            .worker_threads(4)
-            .enable_all()
-            .build()
-            .unwrap();
-
-        Arc::new(runtime)
-    }
-
-    #[test]
-    #[warn(unused_must_use)]
-    fn test_with_memory_table_kv() {
-        let runtime = new_runtime();
-        runtime.block_on(async move {
-            let random_str1 = generate_random_string(1000);
-            let input1 = random_str1.as_bytes();
-            let random_str2 = generate_random_string(1000);
-            let input2 = random_str2.as_bytes();
-
-            let oss = init_object_store();
-
-            // write data in multi part
-            let location = Path::from("test/data/1");
-            write_data(oss.clone(), &location, input1, input2).await;
-            test_list(oss.clone(), 1).await;
-
-            let mut expect = vec![];
-            expect.extend_from_slice(input1);
-            expect.extend_from_slice(input2);
-
-            test_simple_read(oss.clone(), &location, &expect).await;
-
-            test_get_range(oss.clone(), &location, &expect).await;
-
-            test_head(oss.clone(), &location).await;
-
-            // test list multi path
-            let location2 = Path::from("test/data/2");
-            write_data(oss.clone(), &location2, input1, input2).await;
-            test_list(oss.clone(), 2).await;
-
-            // test delete by path
-            oss.delete(&location).await.unwrap();
-            test_list(oss.clone(), 1).await;
-
-            // test abort multi part
-            test_abort_upload(oss.clone(), input1, input2).await;
-
-            // test put data
-            test_put_data(oss.clone()).await;
-        });
-    }
-
-    async fn test_abort_upload(
-        oss: Arc<ObkvObjectStore<MemoryImpl>>,
-        input1: &[u8],
-        input2: &[u8],
-    ) {
-        let location3 = Path::from("test/data/3");
-        let multipart_id = write_data(oss.clone(), &location3, input1, 
input2).await;
-        test_list(oss.clone(), 2).await;
-        oss.abort_multipart(&location3, &multipart_id)
-            .await
-            .unwrap();
-        test_list(oss.clone(), 1).await;
-    }
-
-    async fn test_list(oss: Arc<ObkvObjectStore<MemoryImpl>>, expect_len: 
usize) {
-        let prefix = Path::from("test/");
-        let stream = oss.list(Some(&prefix)).await.unwrap();
-        let meta_vec = stream
-            .fold(Vec::new(), |mut acc, item| async {
-                let object_meta = item.unwrap();
-                
assert!(object_meta.location.as_ref().starts_with(prefix.as_ref()));
-                acc.push(object_meta);
-                acc
-            })
-            .await;
-
-        assert_eq!(meta_vec.len(), expect_len);
-    }
-
-    async fn test_head(oss: Arc<ObkvObjectStore<MemoryImpl>>, location: &Path) 
{
-        let object_meta = oss.head(location).await.unwrap();
-        assert_eq!(object_meta.location.as_ref(), location.as_ref());
-        assert_eq!(object_meta.size, 2000);
-    }
-
-    async fn test_get_range(oss: Arc<ObkvObjectStore<MemoryImpl>>, location: 
&Path, expect: &[u8]) {
-        let get = oss
-            .get_range(
-                location,
-                std::ops::Range {
-                    start: 100,
-                    end: 200,
-                },
-            )
-            .await
-            .unwrap();
-        assert!(get.len() == 100);
-        assert_eq!(expect[100..200], get);
-
-        let bytes = oss
-            .get_range(
-                location,
-                std::ops::Range {
-                    start: 500,
-                    end: 1500,
-                },
-            )
-            .await
-            .unwrap();
-        assert!(bytes.len() == 1000);
-        assert_eq!(expect[500..1500], bytes);
-    }
-
-    async fn test_simple_read(
-        oss: Arc<ObkvObjectStore<MemoryImpl>>,
-        location: &Path,
-        expect: &[u8],
-    ) {
-        // read data
-        let get = oss.get(location).await.unwrap();
-        assert_eq!(expect, get.bytes().await.unwrap());
-    }
-
-    #[allow(clippy::unused_io_amount)]
-    async fn write_data(
-        oss: Arc<dyn ObjectStore>,
-        location: &Path,
-        input1: &[u8],
-        input2: &[u8],
-    ) -> String {
-        let (multipart_id, mut async_writer) = 
oss.put_multipart(location).await.unwrap();
-
-        async_writer.write(input1).await.unwrap();
-        async_writer.write(input2).await.unwrap();
-        async_writer.shutdown().await.unwrap();
-        multipart_id
-    }
-
-    fn init_object_store() -> Arc<ObkvObjectStore<MemoryImpl>> {
-        let table_kv = Arc::new(MemoryImpl::default());
-        let obkv_object =
-            ObkvObjectStore::try_new(table_kv, 128, TEST_PART_SIZE, 1024 * 
1024 * 1024, 8).unwrap();
-        Arc::new(obkv_object)
-    }
-
-    fn generate_random_string(length: usize) -> String {
-        let mut rng = thread_rng();
-        let chars: Vec<char> = 
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
-            .chars()
-            .collect();
-        (0..length)
-            .map(|_| rng.gen::<char>())
-            .map(|c| chars[(c as usize) % chars.len()])
-            .collect()
-    }
-
-    async fn test_put_data(oss: Arc<ObkvObjectStore<MemoryImpl>>) {
-        let length_vec = vec![
-            TEST_PART_SIZE - 10,
-            TEST_PART_SIZE,
-            2 * TEST_PART_SIZE,
-            4 * TEST_PART_SIZE,
-            4 * TEST_PART_SIZE + 10,
-        ];
-        for length in length_vec {
-            let location = Path::from("test/data/4");
-            let rand_str = generate_random_string(length);
-            let buffer = Bytes::from(rand_str);
-            oss.put(&location, buffer.clone()).await.unwrap();
-            let meta = oss.head(&location).await.unwrap();
-            assert_eq!(meta.location, location);
-            assert_eq!(meta.size, length);
-            let body = oss.get(&location).await.unwrap();
-            assert_eq!(buffer, body.bytes().await.unwrap());
-            let inner_meta = oss.meta_manager.read(&location).await.unwrap();
-            assert!(inner_meta.is_some());
-            if let Some(m) = inner_meta {
-                assert_eq!(m.location, location.as_ref());
-                assert_eq!(m.part_size, oss.part_size);
-                let expect_size =
-                    length / TEST_PART_SIZE + if length % TEST_PART_SIZE != 0 
{ 1 } else { 0 };
-                assert_eq!(m.parts.len(), expect_size);
-            }
-            oss.delete(&location).await.unwrap();
-        }
-    }
-}
diff --git a/src/components/object_store/src/obkv/util.rs 
b/src/components/object_store/src/obkv/util.rs
deleted file mode 100644
index 640198c4..00000000
--- a/src/components/object_store/src/obkv/util.rs
+++ /dev/null
@@ -1,122 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use table_kv::{KeyBoundary, ScanRequest};
-
-/// Generate ScanRequest with prefix
-pub fn scan_request_with_prefix(prefix_bytes: &[u8]) -> ScanRequest {
-    let mut start_key = Vec::with_capacity(prefix_bytes.len());
-    start_key.extend(prefix_bytes);
-    let start = KeyBoundary::included(start_key.as_ref());
-
-    let mut end_key = Vec::with_capacity(prefix_bytes.len());
-    end_key.extend(prefix_bytes);
-    let carry = inc_by_one(&mut end_key);
-    // Check add one operation overflow.
-    let end = if carry == 1 {
-        KeyBoundary::MaxIncluded
-    } else {
-        KeyBoundary::excluded(end_key.as_ref())
-    };
-    table_kv::ScanRequest {
-        start,
-        end,
-        reverse: false,
-    }
-}
-
-/// Increment one to the byte array, and return the carry.
-fn inc_by_one(nums: &mut [u8]) -> u8 {
-    let mut carry = 1;
-    for i in (0..nums.len()).rev() {
-        let sum = nums[i].wrapping_add(carry);
-        nums[i] = sum;
-        if sum == 0 {
-            carry = 1;
-        } else {
-            carry = 0;
-            break;
-        }
-    }
-    carry
-}
-
-#[cfg(test)]
-mod test {
-
-    use crate::obkv::util::{inc_by_one, scan_request_with_prefix};
-
-    #[test]
-    fn test_add_one() {
-        let mut case0 = vec![0xff_u8, 0xff, 0xff];
-        let case0_expect = vec![0x00, 0x00, 0x00];
-        assert_eq!(1, inc_by_one(&mut case0));
-        assert_eq!(case0, case0_expect);
-
-        let mut case1 = vec![0x00_u8, 0xff, 0xff];
-        let case1_expect = vec![0x01, 0x00, 0x00];
-        assert_eq!(0, inc_by_one(&mut case1));
-        assert_eq!(case1, case1_expect);
-
-        let mut case2 = vec![0x00_u8, 0x00, 0x00];
-        let case2_expect = vec![0x00, 0x00, 0x01];
-        assert_eq!(0, inc_by_one(&mut case2));
-        assert_eq!(case2, case2_expect);
-    }
-
-    #[test]
-    fn test_scan_request_with_prefix() {
-        let case0 = vec![0xff_u8, 0xff, 0xff];
-        let case0_expect = table_kv::ScanRequest {
-            start: table_kv::KeyBoundary::included(&case0),
-            end: table_kv::KeyBoundary::MaxIncluded,
-            reverse: false,
-        };
-        let case0_actual = scan_request_with_prefix(&case0);
-        assert_eq!(case0_expect, case0_actual);
-
-        let case1 = "abc".as_bytes();
-        let case1_expect_bytes = "abd".as_bytes();
-        let case1_expect = table_kv::ScanRequest {
-            start: table_kv::KeyBoundary::included(case1),
-            end: table_kv::KeyBoundary::excluded(case1_expect_bytes),
-            reverse: false,
-        };
-        let case1_actual = scan_request_with_prefix(case1);
-        assert_eq!(case1_expect, case1_actual);
-
-        let case2 = vec![0x00_u8, 0x00, 0x00];
-        let case2_expect_bytes = vec![0x00_u8, 0x00, 0x01];
-        let case2_expect = table_kv::ScanRequest {
-            start: table_kv::KeyBoundary::included(&case2),
-            end: table_kv::KeyBoundary::excluded(&case2_expect_bytes),
-            reverse: false,
-        };
-        let case2_actual = scan_request_with_prefix(&case2);
-        assert_eq!(case2_expect, case2_actual);
-
-        let case3 = vec![0x00_u8, 0x00, 0xff];
-        let case3_expect_bytes = vec![0x00_u8, 0x01, 0x00];
-        let case3_expect = table_kv::ScanRequest {
-            start: table_kv::KeyBoundary::included(&case3),
-            end: table_kv::KeyBoundary::excluded(&case3_expect_bytes),
-            reverse: false,
-        };
-        let case3_actual = scan_request_with_prefix(&case3);
-        assert_eq!(case3_expect, case3_actual);
-    }
-}
diff --git a/src/components/object_store/src/prefix.rs 
b/src/components/object_store/src/prefix.rs
index 42771c32..187b10a8 100644
--- a/src/components/object_store/src/prefix.rs
+++ b/src/components/object_store/src/prefix.rs
@@ -20,10 +20,10 @@ use std::{fmt::Display, ops::Range};
 use async_trait::async_trait;
 use bytes::Bytes;
 use futures::{stream::BoxStream, StreamExt};
-use tokio::io::AsyncWrite;
 use upstream::{
     path::{self, Path, DELIMITER},
-    Error, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
+    Error, GetOptions, GetResult, GetResultPayload, ListResult, 
MultipartUpload, ObjectMeta,
+    ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result,
 };
 
 use crate::ObjectStoreRef;
@@ -96,28 +96,55 @@ impl StoreWithPrefix {
 
 #[async_trait]
 impl ObjectStore for StoreWithPrefix {
-    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+    async fn put(&self, location: &Path, payload: PutPayload) -> 
Result<PutResult> {
         let new_loc = self.add_prefix_to_loc(location);
-        self.store.put(&new_loc, bytes).await
+        self.store.put(&new_loc, payload).await
     }
 
-    async fn put_multipart(
+    async fn put_opts(
         &self,
         location: &Path,
-    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
+        payload: PutPayload,
+        opts: PutOptions,
+    ) -> Result<PutResult> {
+        let new_loc = self.add_prefix_to_loc(location);
+        self.store.put_opts(&new_loc, payload, opts).await
+    }
+
+    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
         let new_loc = self.add_prefix_to_loc(location);
         self.store.put_multipart(&new_loc).await
     }
 
-    async fn abort_multipart(&self, location: &Path, multipart_id: 
&MultipartId) -> Result<()> {
+    async fn put_multipart_opts(
+        &self,
+        location: &Path,
+        opts: PutMultipartOpts,
+    ) -> Result<Box<dyn MultipartUpload>> {
         let new_loc = self.add_prefix_to_loc(location);
-        self.store.abort_multipart(&new_loc, multipart_id).await
+        self.store.put_multipart_opts(&new_loc, opts).await
     }
 
     async fn get(&self, location: &Path) -> Result<GetResult> {
         let new_loc = self.add_prefix_to_loc(location);
         let res = self.store.get(&new_loc).await?;
-        if let GetResult::File(_, _) = &res {
+        if let GetResultPayload::File(_, _) = &res.payload {
+            let err = ErrorWithMsg {
+                msg: "StoreWithPrefix doesn't support object store based on 
local file system"
+                    .to_string(),
+            };
+            return Err(Error::NotSupported {
+                source: Box::new(err),
+            });
+        }
+
+        Ok(res)
+    }
+
+    async fn get_opts(&self, location: &Path, options: GetOptions) -> 
Result<GetResult> {
+        let new_loc = self.add_prefix_to_loc(location);
+        let res = self.store.get_opts(&new_loc, options).await?;
+        if let GetResultPayload::File(_, _) = &res.payload {
             let err = ErrorWithMsg {
                 msg: "StoreWithPrefix doesn't support object store based on 
local file system"
                     .to_string(),
@@ -154,12 +181,12 @@ impl ObjectStore for StoreWithPrefix {
         self.store.delete(&new_loc).await
     }
 
-    async fn list(&self, prefix: Option<&Path>) -> Result<BoxStream<'_, 
Result<ObjectMeta>>> {
+    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> 
{
         let objects = if let Some(loc) = prefix {
             let new_loc = self.add_prefix_to_loc(loc);
-            self.store.list(Some(&new_loc)).await?
+            self.store.list(Some(&new_loc))
         } else {
-            self.store.list(Some(&self.prefix)).await?
+            self.store.list(Some(&self.prefix))
         };
 
         let new_objects = objects.map(|mut obj| {
@@ -169,7 +196,7 @@ impl ObjectStore for StoreWithPrefix {
 
             obj
         });
-        Ok(new_objects.boxed())
+        new_objects.boxed()
     }
 
     async fn list_with_delimiter(&self, prefix: Option<&Path>) -> 
Result<ListResult> {
@@ -209,7 +236,7 @@ mod tests {
     use std::sync::Arc;
 
     use chrono::{DateTime, Utc};
-    use futures::stream;
+    use futures::{stream, stream::StreamExt};
     use tempfile::tempdir;
     use upstream::local::LocalFileSystem;
 
@@ -242,24 +269,29 @@ mod tests {
 
     #[async_trait]
     impl ObjectStore for MockObjectStore {
-        async fn put(&self, location: &Path, _bytes: Bytes) -> Result<()> {
+        async fn put(&self, location: &Path, _payload: PutPayload) -> 
Result<PutResult> {
             self.prefix_checker.check(location);
-            Ok(())
+            Ok(PutResult {
+                e_tag: None,
+                version: None,
+            })
         }
 
-        async fn put_multipart(
+        async fn put_opts(
             &self,
             _location: &Path,
-        ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
-            todo!()
+            _payload: PutPayload,
+            _opts: PutOptions,
+        ) -> Result<PutResult> {
+            Err(Error::NotImplemented)
         }
 
-        async fn abort_multipart(
+        async fn put_multipart_opts(
             &self,
             _location: &Path,
-            _multipart_id: &MultipartId,
-        ) -> Result<()> {
-            todo!()
+            _opts: PutMultipartOpts,
+        ) -> Result<Box<dyn MultipartUpload>> {
+            Err(Error::NotImplemented)
         }
 
         async fn get(&self, location: &Path) -> Result<GetResult> {
@@ -267,6 +299,10 @@ mod tests {
             Err(Error::NotImplemented)
         }
 
+        async fn get_opts(&self, _location: &Path, _options: GetOptions) -> 
Result<GetResult> {
+            Err(Error::NotImplemented)
+        }
+
         async fn get_range(&self, location: &Path, _range: Range<usize>) -> 
Result<Bytes> {
             self.prefix_checker.check(location);
             Ok(self.content.clone())
@@ -279,6 +315,8 @@ mod tests {
                 location: location.clone(),
                 last_modified: DateTime::<Utc>::default(),
                 size: 0,
+                e_tag: None,
+                version: None,
             })
         }
 
@@ -288,7 +326,7 @@ mod tests {
             Err(Error::NotImplemented)
         }
 
-        async fn list(&self, prefix: Option<&Path>) -> Result<BoxStream<'_, 
Result<ObjectMeta>>> {
+        fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, 
Result<ObjectMeta>> {
             if let Some(loc) = prefix {
                 self.prefix_checker.check(loc);
             }
@@ -301,11 +339,13 @@ mod tests {
                     location: filepath,
                     last_modified: DateTime::<Utc>::default(),
                     size: 0,
+                    e_tag: None,
+                    version: None,
                 };
                 objects.push(Ok(object));
             }
 
-            Ok(stream::iter(objects).boxed())
+            stream::iter(objects).boxed()
         }
 
         async fn list_with_delimiter(&self, prefix: Option<&Path>) -> 
Result<ListResult> {
@@ -346,7 +386,7 @@ mod tests {
         // Ignore the result and let the `prefix_checker` in the 
`MockObjectStore` to do
         // the assertion.
         let _ = prefix_store
-            .put(&test_filepath, Bytes::from_static(b"1111"))
+            .put(&test_filepath, Bytes::from_static(b"1111").into())
             .await;
 
         let _ = prefix_store.get(&test_filepath).await;
@@ -360,8 +400,6 @@ mod tests {
 
         for meta in prefix_store
             .list(Some(&test_filepath))
-            .await
-            .unwrap()
             .collect::<Vec<_>>()
             .await
         {
diff --git a/src/components/object_store/src/test_util.rs 
b/src/components/object_store/src/test_util.rs
index 68631991..ca643e4e 100644
--- a/src/components/object_store/src/test_util.rs
+++ b/src/components/object_store/src/test_util.rs
@@ -20,8 +20,10 @@ use std::{collections::HashMap, fmt::Display, ops::Range, 
sync::RwLock};
 use async_trait::async_trait;
 use bytes::Bytes;
 use futures::stream::{self, BoxStream};
-use tokio::io::AsyncWrite;
-use upstream::{path::Path, GetResult, ListResult, MultipartId, ObjectMeta, 
ObjectStore, Result};
+use upstream::{
+    path::Path, GetOptions, GetResult, GetResultPayload, ListResult, 
MultipartUpload, ObjectMeta,
+    ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result,
+};
 
 #[derive(Debug)]
 struct StoreError {
@@ -64,19 +66,33 @@ impl MemoryStore {
 
 #[async_trait]
 impl ObjectStore for MemoryStore {
-    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+    async fn put(&self, location: &Path, payload: PutPayload) -> 
Result<PutResult> {
         let mut files = self.files.write().unwrap();
-        files.insert(location.clone(), bytes);
-        Ok(())
+        files.insert(location.clone(), Bytes::from(payload));
+        Ok(PutResult {
+            e_tag: None,
+            version: None,
+        })
     }
 
     async fn get(&self, location: &Path) -> Result<GetResult> {
         let files = self.files.read().unwrap();
         if let Some(bs) = files.get(location) {
             let bs = bs.clone();
-            Ok(GetResult::Stream(Box::pin(stream::once(
-                async move { Ok(bs) },
-            ))))
+            let size = bs.len();
+            let payload = GetResultPayload::Stream(Box::pin(stream::once(async 
move { Ok(bs) })));
+            Ok(GetResult {
+                payload,
+                meta: ObjectMeta {
+                    location: location.clone(),
+                    last_modified: Default::default(),
+                    size,
+                    e_tag: None,
+                    version: None,
+                },
+                range: Default::default(),
+                attributes: Default::default(),
+            })
         } else {
             let source = Box::new(StoreError {
                 msg: "not found".to_string(),
@@ -120,7 +136,9 @@ impl ObjectStore for MemoryStore {
             Ok(ObjectMeta {
                 location: location.clone(),
                 size: bs.len(),
+                e_tag: None,
                 last_modified: Default::default(),
+                version: None,
             })
         } else {
             let source = Box::new(StoreError {
@@ -134,14 +152,7 @@ impl ObjectStore for MemoryStore {
         }
     }
 
-    async fn put_multipart(
-        &self,
-        _location: &Path,
-    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
-        unimplemented!()
-    }
-
-    async fn abort_multipart(&self, _location: &Path, _multipart_id: 
&MultipartId) -> Result<()> {
+    async fn put_multipart(&self, _location: &Path) -> Result<Box<dyn 
MultipartUpload>> {
         unimplemented!()
     }
 
@@ -149,7 +160,7 @@ impl ObjectStore for MemoryStore {
         unimplemented!()
     }
 
-    async fn list(&self, _prefix: Option<&Path>) -> Result<BoxStream<'_, 
Result<ObjectMeta>>> {
+    fn list(&self, _prefix: Option<&Path>) -> BoxStream<'_, 
Result<ObjectMeta>> {
         unimplemented!()
     }
 
@@ -172,4 +183,25 @@ impl ObjectStore for MemoryStore {
     async fn rename_if_not_exists(&self, _from: &Path, _to: &Path) -> 
Result<()> {
         unimplemented!()
     }
+
+    async fn put_opts(
+        &self,
+        _location: &Path,
+        _payload: PutPayload,
+        _opts: PutOptions,
+    ) -> Result<PutResult> {
+        unimplemented!()
+    }
+
+    async fn put_multipart_opts(
+        &self,
+        _location: &Path,
+        _opts: PutMultipartOpts,
+    ) -> Result<Box<dyn MultipartUpload>> {
+        unimplemented!()
+    }
+
+    async fn get_opts(&self, _location: &Path, _options: GetOptions) -> 
Result<GetResult> {
+        unimplemented!()
+    }
 }
diff --git a/src/tools/src/bin/sst-metadata.rs 
b/src/tools/src/bin/sst-metadata.rs
index 4199ba8d..bf659960 100644
--- a/src/tools/src/bin/sst-metadata.rs
+++ b/src/tools/src/bin/sst-metadata.rs
@@ -145,7 +145,7 @@ async fn run(args: Args) -> Result<()> {
     let storage: ObjectStoreRef = Arc::new(storage);
 
     let mut join_set = JoinSet::new();
-    let mut ssts = storage.list(None).await?;
+    let mut ssts = storage.list(None);
     let verbose = args.verbose;
     let page_indexes = args.page_indexes;
     while let Some(object_meta) = ssts.next().await {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to