This is an automated email from the ASF dual-hosted git repository.

jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new 1d7c5499 feat: add merge stream (#1595)
1d7c5499 is described below

commit 1d7c549971df3e5fdf147c48367dd7552e637954
Author: Jiacai Liu <[email protected]>
AuthorDate: Wed Nov 20 17:11:01 2024 +0800

    feat: add merge stream (#1595)
    
    ## Rationale
    
    Part of Metric Engine.
    
    ## Detailed Changes
    - Scan SSTs in parallel based on segment
    - Sort SST using SortPreservingMergeExec, which is more efficient than
    SortExec
    - Add MergeExec to dedup record batch based on sorted batches, currently
    only `overwrite` semantics is supported.
    
    ## Test Plan
    
    Add two new UT.
---
 horaedb/Cargo.lock                                 | 120 ++++---
 horaedb/Cargo.toml                                 |   4 +-
 horaedb/metric_engine/Cargo.toml                   |   4 +
 horaedb/metric_engine/src/lib.rs                   |   3 +
 horaedb/metric_engine/src/{lib.rs => macros.rs}    |  23 +-
 horaedb/metric_engine/src/read.rs                  | 283 ++++++++++++++-
 horaedb/metric_engine/src/storage.rs               | 394 +++++++++++++++------
 horaedb/metric_engine/src/{lib.rs => test_util.rs} |  30 +-
 horaedb/metric_engine/src/types.rs                 |  14 +
 9 files changed, 704 insertions(+), 171 deletions(-)

diff --git a/horaedb/Cargo.lock b/horaedb/Cargo.lock
index 64912637..4c8aac43 100644
--- a/horaedb/Cargo.lock
+++ b/horaedb/Cargo.lock
@@ -421,9 +421,9 @@ dependencies = [
 
 [[package]]
 name = "brotli"
-version = "6.0.0"
+version = "7.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "74f7971dbd9326d58187408ab83117d8ac1bb9c17b085fdacd1cf2f598719b6b"
+checksum = "cc97b8f16f944bba54f0433f07e30be199b6dc2bd25937444bbad560bcea29bd"
 dependencies = [
  "alloc-no-stdlib",
  "alloc-stdlib",
@@ -649,9 +649,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion"
-version = "42.1.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "3e8053b4cedc24eb158e4c041b38cfa0677ef5f4a7ccaa31ee5dcad61dd7aa54"
+checksum = "cbba0799cf6913b456ed07a94f0f3b6e12c62a5d88b10809e2284a0f2b915c05"
 dependencies = [
  "ahash",
  "arrow",
@@ -706,9 +706,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion-catalog"
-version = "42.1.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "7d95efedb3a32f6f74df5bb8fda7b69fb9babe80e92137f25de6ddb15e8e8801"
+checksum = "7493c5c2d40eec435b13d92e5703554f4efc7059451fcb8d3a79580ff0e45560"
 dependencies = [
  "arrow-schema",
  "async-trait",
@@ -721,9 +721,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion-common"
-version = "42.1.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "b7d766e0d3dec01a0ab70b1b31678c286cddc0bd7afc9bd82504a1d9a70a7d94"
+checksum = "24953049ebbd6f8964f91f60aa3514e121b5e81e068e33b60e77815ab369b25c"
 dependencies = [
  "ahash",
  "arrow",
@@ -733,6 +733,7 @@ dependencies = [
  "chrono",
  "half",
  "hashbrown",
+ "indexmap",
  "instant",
  "libc",
  "num_cpus",
@@ -745,9 +746,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion-common-runtime"
-version = "42.1.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "4e55db6df319f9e7cf366d0d4ffae793c863823421b2f2b7314a0fefd8e8c11a"
+checksum = "f06df4ef76872e11c924d3c814fd2a8dd09905ed2e2195f71c857d78abd19685"
 dependencies = [
  "log",
  "tokio",
@@ -755,9 +756,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion-execution"
-version = "42.1.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "6d0c6dc013f955c382438a78fa3de8b0a8bf7b1a4cda5bc46335fe445ff3ff1a"
+checksum = "6bbdcb628d690f3ce5fea7de81642b514486d58ff9779a51f180a69a4eadb361"
 dependencies = [
  "arrow",
  "chrono",
@@ -776,9 +777,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion-expr"
-version = "42.1.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "7f31405c0bb854451d755b224d41dc466a8f7fd36f8c041c29d2d8432bd0c08c"
+checksum = "8036495980e3131f706b7d33ab00b4492d73dc714e3cb74d11b50f9602a73246"
 dependencies = [
  "ahash",
  "arrow",
@@ -788,7 +789,9 @@ dependencies = [
  "datafusion-common",
  "datafusion-expr-common",
  "datafusion-functions-aggregate-common",
+ "datafusion-functions-window-common",
  "datafusion-physical-expr-common",
+ "indexmap",
  "paste",
  "serde_json",
  "sqlparser",
@@ -798,20 +801,21 @@ dependencies = [
 
 [[package]]
 name = "datafusion-expr-common"
-version = "42.1.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "ebc8266b6627c8264c87bc7c82564e3d89ed5f0f9943b49a30dac1f1ac12e4c0"
+checksum = "4da0f3cb4669f9523b403d6b5a0ec85023e0ab3bf0183afd1517475b3e64fdd2"
 dependencies = [
  "arrow",
  "datafusion-common",
+ "itertools 0.13.0",
  "paste",
 ]
 
 [[package]]
 name = "datafusion-functions"
-version = "42.1.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "5712668780bc43666ecd10acd188b7df58e2a5501d4dbbd972bf209f1790138b"
+checksum = "f52c4012648b34853e40a2c6bcaa8772f837831019b68aca384fb38436dba162"
 dependencies = [
  "arrow",
  "arrow-buffer",
@@ -836,9 +840,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion-functions-aggregate"
-version = "42.1.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "8ec138af6b7482fb726f1bfeec010fc063b9614594c36a1051a4d3b365ba6a5f"
+checksum = "e5b8bb624597ba28ed7446df4a9bd7c7a7bde7c578b6b527da3f47371d5f6741"
 dependencies = [
  "ahash",
  "arrow",
@@ -850,16 +854,16 @@ dependencies = [
  "datafusion-physical-expr",
  "datafusion-physical-expr-common",
  "half",
+ "indexmap",
  "log",
  "paste",
- "sqlparser",
 ]
 
 [[package]]
 name = "datafusion-functions-aggregate-common"
-version = "42.1.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "564499c6bdd3ab9f76c7ad727e858bc6791e4de6c1a484d21d2bf49daaa658d6"
+checksum = "6fb06208fc470bc8cf1ce2d9a1159d42db591f2c7264a8c1776b53ad8f675143"
 dependencies = [
  "ahash",
  "arrow",
@@ -871,9 +875,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion-functions-nested"
-version = "42.1.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "9b55ea2221ae1c1e37d524f8330f763dcdc205edb74fe5f54cbdea475c17fd18"
+checksum = "fca25bbb87323716d05e54114666e942172ccca23c5a507e9c7851db6e965317"
 dependencies = [
  "arrow",
  "arrow-array",
@@ -894,21 +898,34 @@ dependencies = [
 
 [[package]]
 name = "datafusion-functions-window"
-version = "42.1.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "6932996c4407ee1ebf23ffd706e982729cb9b6f7a31a281abac51fe524c3a049"
+checksum = "5ae23356c634e54c59f7c51acb7a5b9f6240ffb2cf997049a1a24a8a88598dbe"
 dependencies = [
  "datafusion-common",
  "datafusion-expr",
+ "datafusion-functions-window-common",
+ "datafusion-physical-expr",
  "datafusion-physical-expr-common",
  "log",
+ "paste",
+]
+
+[[package]]
+name = "datafusion-functions-window-common"
+version = "43.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "d4b3d6ff7794acea026de36007077a06b18b89e4f9c3fea7f2215f9f7dd9059b"
+dependencies = [
+ "datafusion-common",
+ "datafusion-physical-expr-common",
 ]
 
 [[package]]
 name = "datafusion-optimizer"
-version = "42.1.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "f7d8afa1eb44e2f00cc8d82b88803e456a681474b8877ceecc04e9517d5c843c"
+checksum = "bec6241eb80c595fa0e1a8a6b69686b5cf3bd5fdacb8319582a0943b0bd788aa"
 dependencies = [
  "arrow",
  "async-trait",
@@ -926,9 +943,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion-physical-expr"
-version = "42.1.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "570666d84df483473626fab4e69997d048b40d0e7c67c540299714f244d99e73"
+checksum = "3370357b8fc75ec38577700644e5d1b0bc78f38babab99c0b8bd26bafb3e4335"
 dependencies = [
  "ahash",
  "arrow",
@@ -937,30 +954,26 @@ dependencies = [
  "arrow-ord",
  "arrow-schema",
  "arrow-string",
- "base64",
  "chrono",
  "datafusion-common",
- "datafusion-execution",
  "datafusion-expr",
  "datafusion-expr-common",
  "datafusion-functions-aggregate-common",
  "datafusion-physical-expr-common",
  "half",
  "hashbrown",
- "hex",
  "indexmap",
  "itertools 0.13.0",
  "log",
  "paste",
  "petgraph",
- "regex",
 ]
 
 [[package]]
 name = "datafusion-physical-expr-common"
-version = "42.1.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "3746cbdfb32d67399dcaad17042e419ac6da454a7e38ff098aa2fbf0a7388982"
+checksum = "b8b7734d94bf2fa6f6e570935b0ddddd8421179ce200065be97874e13d46a47b"
 dependencies = [
  "ahash",
  "arrow",
@@ -972,13 +985,15 @@ dependencies = [
 
 [[package]]
 name = "datafusion-physical-optimizer"
-version = "42.1.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "696f06e79d44f7c50f57cea23493881d86d9d9647884d38ce467c7f75c13e286"
+checksum = "7eee8c479522df21d7b395640dff88c5ed05361852dce6544d7c98e9dbcebffe"
 dependencies = [
+ "arrow",
  "arrow-schema",
  "datafusion-common",
  "datafusion-execution",
+ "datafusion-expr-common",
  "datafusion-physical-expr",
  "datafusion-physical-plan",
  "itertools 0.13.0",
@@ -986,9 +1001,9 @@ dependencies = [
 
 [[package]]
 name = "datafusion-physical-plan"
-version = "42.1.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "04e1d084224023e09cdea14d01ded0f2092c319c7b4594ebc821283b9c7c4a35"
+checksum = "17e1fc2e2c239d14e8556f2622b19a726bf6bc6962cc00c71fc52626274bee24"
 dependencies = [
  "ahash",
  "arrow",
@@ -1002,8 +1017,8 @@ dependencies = [
  "datafusion-common-runtime",
  "datafusion-execution",
  "datafusion-expr",
- "datafusion-functions-aggregate",
  "datafusion-functions-aggregate-common",
+ "datafusion-functions-window-common",
  "datafusion-physical-expr",
  "datafusion-physical-expr-common",
  "futures",
@@ -1021,15 +1036,16 @@ dependencies = [
 
 [[package]]
 name = "datafusion-sql"
-version = "42.1.0"
+version = "43.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "c105148357dcbd9e4c97eada2930a59f7923215461d9f47de6e76edd60eab2d5"
+checksum = "63e3a4ed41dbee20a5d947a59ca035c225d67dc9cbe869c10f66dcdf25e7ce51"
 dependencies = [
  "arrow",
  "arrow-array",
  "arrow-schema",
  "datafusion-common",
  "datafusion-expr",
+ "indexmap",
  "log",
  "regex",
  "sqlparser",
@@ -1538,6 +1554,7 @@ version = "2.0.0"
 dependencies = [
  "anyhow",
  "arrow",
+ "arrow-schema",
  "async-trait",
  "bytes",
  "datafusion",
@@ -1549,6 +1566,7 @@ dependencies = [
  "parquet",
  "pb_types",
  "prost",
+ "temp-dir",
  "thiserror",
  "tokio",
 ]
@@ -1750,9 +1768,9 @@ dependencies = [
 
 [[package]]
 name = "parquet"
-version = "53.1.0"
+version = "53.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "310c46a70a3ba90d98fec39fa2da6d9d731e544191da6fb56c9d199484d0dd3e"
+checksum = "dea02606ba6f5e856561d8d507dba8bac060aefca2a6c0f1aa1d361fed91ff3e"
 dependencies = [
  "ahash",
  "arrow-array",
@@ -2231,9 +2249,9 @@ dependencies = [
 
 [[package]]
 name = "sqlparser"
-version = "0.50.0"
+version = "0.51.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "b2e5b515a2bd5168426033e9efbfd05500114833916f1d5c268f938b4ee130ac"
+checksum = "5fe11944a61da0da3f592e19a45ebe5ab92dc14a779907ff1f08fbb797bfefc7"
 dependencies = [
  "log",
  "sqlparser_derive",
@@ -2295,6 +2313,12 @@ dependencies = [
  "unicode-ident",
 ]
 
+[[package]]
+name = "temp-dir"
+version = "0.1.14"
+source = "registry+https://github.com/rust-lang/crates.io-index";
+checksum = "bc1ee6eef34f12f765cb94725905c6312b6610ab2b0940889cfe58dae7bc3c72"
+
 [[package]]
 name = "tempfile"
 version = "3.12.0"
diff --git a/horaedb/Cargo.toml b/horaedb/Cargo.toml
index 628e47a1..6f56f090 100644
--- a/horaedb/Cargo.toml
+++ b/horaedb/Cargo.toml
@@ -30,17 +30,19 @@ anyhow = { version = "1.0" }
 metric_engine = { path = "metric_engine" }
 thiserror = "1"
 bytes = "1"
-datafusion = "42"
+datafusion = "43"
 parquet = { version = "53" }
 object_store = { version = "0.11" }
 macros = { path = "../src/components/macros" }
 pb_types = { path = "pb_types" }
 prost = { version = "0.13" }
 arrow = { version = "53", features = ["prettyprint"] }
+arrow-schema = "53"
 tokio = { version = "1", features = ["full"] }
 async-trait = "0.1"
 async-stream = "0.3"
 futures = "0.3"
+temp-dir = "0.1"
 itertools = "0.3"
 lazy_static = "1"
 tracing = "0.1"
diff --git a/horaedb/metric_engine/Cargo.toml b/horaedb/metric_engine/Cargo.toml
index d2ea85c8..095532bd 100644
--- a/horaedb/metric_engine/Cargo.toml
+++ b/horaedb/metric_engine/Cargo.toml
@@ -33,6 +33,7 @@ workspace = true
 [dependencies]
 anyhow = { workspace = true }
 arrow = { workspace = true }
+arrow-schema = { workspace = true }
 async-trait = { workspace = true }
 bytes = { workspace = true }
 datafusion = { workspace = true }
@@ -46,3 +47,6 @@ pb_types = { workspace = true }
 prost = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true }
+
+[dev-dependencies]
+temp-dir = { workspace = true }
diff --git a/horaedb/metric_engine/src/lib.rs b/horaedb/metric_engine/src/lib.rs
index be7ef2b4..e0dfdb3c 100644
--- a/horaedb/metric_engine/src/lib.rs
+++ b/horaedb/metric_engine/src/lib.rs
@@ -17,11 +17,14 @@
 
 //! Storage Engine for metrics.
 
+#![feature(duration_constructors)]
 pub mod error;
+mod macros;
 mod manifest;
 mod read;
 mod sst;
 pub mod storage;
+mod test_util;
 pub mod types;
 
 pub use error::{AnyhowError, Error, Result};
diff --git a/horaedb/metric_engine/src/lib.rs 
b/horaedb/metric_engine/src/macros.rs
similarity index 63%
copy from horaedb/metric_engine/src/lib.rs
copy to horaedb/metric_engine/src/macros.rs
index be7ef2b4..71c668db 100644
--- a/horaedb/metric_engine/src/lib.rs
+++ b/horaedb/metric_engine/src/macros.rs
@@ -15,13 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Storage Engine for metrics.
-
-pub mod error;
-mod manifest;
-mod read;
-mod sst;
-pub mod storage;
-pub mod types;
-
-pub use error::{AnyhowError, Error, Result};
+#[macro_export]
+macro_rules! compare_primitive_columns {
+    ($lhs_col:expr, $rhs_col:expr, $lhs_idx:expr, $rhs_idx:expr, 
$($type:ty),+) => {
+        $(
+            if let Some(lhs_col) = $lhs_col.as_primitive_opt::<$type>() {
+                let rhs_col = $rhs_col.as_primitive::<$type>();
+                if !lhs_col.value($lhs_idx).eq(&rhs_col.value($rhs_idx)) {
+                    return false;
+                }
+            }
+        )+
+    };
+}
diff --git a/horaedb/metric_engine/src/read.rs 
b/horaedb/metric_engine/src/read.rs
index 88522fe9..26564a1a 100644
--- a/horaedb/metric_engine/src/read.rs
+++ b/horaedb/metric_engine/src/read.rs
@@ -15,15 +15,41 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::{
+    any::Any,
+    pin::Pin,
+    sync::Arc,
+    task::{Context, Poll},
+};
+
+use arrow::{
+    array::{AsArray, RecordBatch},
+    compute::concat_batches,
+    datatypes::{
+        GenericBinaryType, Int32Type, Int64Type, Int8Type, Schema, UInt32Type, 
UInt64Type,
+        UInt8Type,
+    },
+};
+use arrow_schema::SchemaRef;
 use datafusion::{
+    common::internal_err,
     datasource::physical_plan::{FileMeta, ParquetFileReaderFactory},
-    error::Result as DfResult,
+    error::{DataFusionError, Result as DfResult},
+    execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext},
+    logical_expr::AggregateUDFImpl,
     parquet::arrow::async_reader::AsyncFileReader,
-    physical_plan::metrics::ExecutionPlanMetricsSet,
+    physical_plan::{
+        metrics::ExecutionPlanMetricsSet, DisplayAs, Distribution, 
ExecutionPlan, PlanProperties,
+    },
 };
+use futures::{Stream, StreamExt};
+use itertools::Itertools;
 use parquet::arrow::async_reader::ParquetObjectReader;
 
-use crate::types::ObjectStoreRef;
+use crate::{
+    compare_primitive_columns,
+    types::{ObjectStoreRef, SEQ_COLUMN_NAME},
+};
 
 #[derive(Debug, Clone)]
 pub struct DefaultParquetFileReaderFactory {
@@ -53,3 +79,254 @@ impl ParquetFileReaderFactory for 
DefaultParquetFileReaderFactory {
         Ok(Box::new(reader))
     }
 }
+
+/// Execution plan for merge RecordBatch values, like Merge Operator in 
RocksDB.
+///
+/// Input record batches are sorted by the primary key columns and seq
+/// column.
+#[derive(Debug)]
+pub(crate) struct MergeExec {
+    /// Input plan
+    input: Arc<dyn ExecutionPlan>,
+    /// (0..num_primary_keys) are primary key columns
+    num_primary_keys: usize,
+    /// Sequence column index
+    seq_idx: usize,
+    // (idx, merge_op)
+    value_idx: usize,
+    value_op: Arc<dyn AggregateUDFImpl>,
+}
+
+impl MergeExec {
+    pub fn new(
+        input: Arc<dyn ExecutionPlan>,
+        num_primary_keys: usize,
+        seq_idx: usize,
+        value_idx: usize,
+        value_op: Arc<dyn AggregateUDFImpl>,
+    ) -> Self {
+        Self {
+            input,
+            num_primary_keys,
+            seq_idx,
+            value_idx,
+            value_op,
+        }
+    }
+}
+impl DisplayAs for MergeExec {
+    fn fmt_as(
+        &self,
+        _t: datafusion::physical_plan::DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        write!(
+            f,
+            "MergeExec: [primary_keys: {}, seq_idx: {}]",
+            self.num_primary_keys, self.seq_idx
+        )?;
+        Ok(())
+    }
+}
+
+impl ExecutionPlan for MergeExec {
+    fn name(&self) -> &str {
+        "MergeExec"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        self.input.properties()
+    }
+
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        vec![Distribution::SinglePartition; self.children().len()]
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        vec![true; self.children().len()]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> DfResult<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(MergeExec::new(
+            Arc::clone(&children[0]),
+            self.num_primary_keys,
+            self.seq_idx,
+            self.value_idx,
+            self.value_op.clone(),
+        )))
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> DfResult<SendableRecordBatchStream> {
+        if 0 != partition {
+            return internal_err!("MergeExec invalid partition {partition}");
+        }
+
+        Ok(Box::pin(MergeStream::new(
+            self.input.execute(partition, context)?,
+            self.num_primary_keys,
+            self.seq_idx,
+            self.value_idx,
+            self.value_op.clone(),
+        )))
+    }
+}
+
+struct MergeStream {
+    stream: SendableRecordBatchStream,
+    num_primary_keys: usize,
+    seq_idx: usize,
+    value_idx: usize,
+    value_op: Arc<dyn AggregateUDFImpl>,
+
+    pending_batch: Option<RecordBatch>,
+    arrow_schema: SchemaRef,
+}
+
+impl MergeStream {
+    fn new(
+        stream: SendableRecordBatchStream,
+        num_primary_keys: usize,
+        seq_idx: usize,
+        value_idx: usize,
+        value_op: Arc<dyn AggregateUDFImpl>,
+    ) -> Self {
+        let fields = stream
+            .schema()
+            .fields()
+            .into_iter()
+            .filter_map(|f| {
+                if f.name() == SEQ_COLUMN_NAME {
+                    None
+                } else {
+                    Some(f.clone())
+                }
+            })
+            .collect_vec();
+        let arrow_schema = Arc::new(Schema::new_with_metadata(
+            fields,
+            stream.schema().metadata.clone(),
+        ));
+        Self {
+            stream,
+            num_primary_keys,
+            seq_idx,
+            value_idx,
+            value_op,
+            pending_batch: None,
+            arrow_schema,
+        }
+    }
+
+    fn primary_key_eq(
+        &self,
+        lhs: &RecordBatch,
+        lhs_idx: usize,
+        rhs: &RecordBatch,
+        rhs_idx: usize,
+    ) -> bool {
+        for k in 0..self.num_primary_keys {
+            let lhs_col = lhs.column(k);
+            let rhs_col = rhs.column(k);
+            compare_primitive_columns!(
+                lhs_col, rhs_col, lhs_idx, rhs_idx, // TODO: Add more types 
here
+                UInt8Type, Int8Type, UInt32Type, Int32Type, UInt64Type, 
Int64Type
+            );
+
+            if let Some(lhs_col) = 
lhs_col.as_bytes_opt::<GenericBinaryType<i32>>() {
+                let rhs_col = rhs_col.as_bytes::<GenericBinaryType<i32>>();
+                if !rhs_col.value(rhs_idx).eq(lhs_col.value(lhs_idx)) {
+                    return false;
+                }
+            }
+        }
+
+        true
+    }
+
+    // TODO: only support deduplication now, merge operation will be added 
later.
+    fn merge_batch(&mut self, batch: RecordBatch) -> DfResult<RecordBatch> {
+        let mut batches = vec![];
+        let mut start_idx = 0;
+        while start_idx < batch.num_rows() {
+            let mut end_idx = start_idx + 1;
+            while end_idx < batch.num_rows()
+                && self.primary_key_eq(&batch, start_idx, &batch, end_idx)
+            {
+                end_idx += 1;
+            }
+            let rows_with_same_primary_keys = batch.slice(start_idx, end_idx - 
start_idx);
+            if let Some(pending) = self.pending_batch.take() {
+                if !self.primary_key_eq(
+                    &pending,
+                    pending.num_rows() - 1,
+                    &rows_with_same_primary_keys,
+                    0,
+                ) {
+                    // only keep the last row in this batch
+                    batches.push(pending.slice(pending.num_rows() - 1, 1));
+                }
+            }
+            batches.push(
+                
rows_with_same_primary_keys.slice(rows_with_same_primary_keys.num_rows() - 1, 
1),
+            );
+
+            start_idx = end_idx;
+        }
+
+        // last batch may have overlapping rows with the next batch, so keep 
them in
+        // pending_batch
+        self.pending_batch = batches.pop();
+
+        concat_batches(&self.stream.schema(), batches.iter())
+            .map_err(|e| DataFusionError::ArrowError(e, None))
+            .map(|mut batch| {
+                // Remove seq column
+                batch.remove_column(self.seq_idx);
+                batch
+            })
+    }
+}
+
+impl Stream for MergeStream {
+    type Item = DfResult<RecordBatch>;
+
+    fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> 
Poll<Option<Self::Item>> {
+        match self.stream.poll_next_unpin(ctx) {
+            Poll::Pending => Poll::Pending,
+            Poll::Ready(None) => {
+                let value = if let Some(mut pending) = 
self.pending_batch.take() {
+                    pending.remove_column(self.seq_idx);
+                    Some(Ok(pending))
+                } else {
+                    None
+                };
+                Poll::Ready(value)
+            }
+            Poll::Ready(Some(v)) => Poll::Ready(Some(v.and_then(|batch| {
+                let batch = self.merge_batch(batch)?;
+                Ok(batch)
+            }))),
+        }
+    }
+}
+
+impl RecordBatchStream for MergeStream {
+    fn schema(&self) -> SchemaRef {
+        self.arrow_schema.clone()
+    }
+}
diff --git a/horaedb/metric_engine/src/storage.rs 
b/horaedb/metric_engine/src/storage.rs
index 1419503a..02816088 100644
--- a/horaedb/metric_engine/src/storage.rs
+++ b/horaedb/metric_engine/src/storage.rs
@@ -15,13 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{sync::Arc, vec};
+use std::{sync::Arc, time::Duration, vec};
 
 use anyhow::Context;
 use arrow::{
-    array::{Int64Array, RecordBatch},
+    array::{RecordBatch, UInt64Array},
     datatypes::SchemaRef,
 };
+use arrow_schema::{DataType, Field, Schema};
 use async_trait::async_trait;
 use datafusion::{
     common::DFSchema,
@@ -30,13 +31,21 @@ use datafusion::{
         physical_plan::{FileScanConfig, ParquetExec},
     },
     execution::{context::ExecutionProps, object_store::ObjectStoreUrl, 
SendableRecordBatchStream},
+    functions_aggregate::first_last::LastValue,
     logical_expr::{utils::conjunction, Expr},
     physical_expr::{create_physical_expr, LexOrdering},
-    physical_plan::{execute_stream, memory::MemoryExec, sorts::sort::SortExec},
+    physical_plan::{
+        execute_stream,
+        memory::MemoryExec,
+        sorts::{sort::SortExec, 
sort_preserving_merge::SortPreservingMergeExec},
+        union::UnionExec,
+        EmptyRecordBatchStream, ExecutionPlan,
+    },
     physical_planner::create_physical_sort_exprs,
     prelude::{ident, SessionContext},
 };
 use futures::StreamExt;
+use itertools::Itertools;
 use macros::ensure;
 use object_store::path::Path;
 use parquet::{
@@ -48,14 +57,17 @@ use parquet::{
 
 use crate::{
     manifest::Manifest,
-    read::DefaultParquetFileReaderFactory,
-    sst::{allocate_id, FileId, FileMeta},
-    types::{ObjectStoreRef, TimeRange, Timestamp, WriteOptions, WriteResult},
+    read::{DefaultParquetFileReaderFactory, MergeExec},
+    sst::{allocate_id, FileId, FileMeta, SstFile},
+    types::{ObjectStoreRef, TimeRange, WriteOptions, WriteResult, 
SEQ_COLUMN_NAME},
     Result,
 };
 
 pub struct WriteRequest {
-    batch: RecordBatch,
+    pub batch: RecordBatch,
+    pub time_range: TimeRange,
+    // Check data is valid if it's true.
+    pub enable_check: bool,
 }
 
 pub struct ScanRequest {
@@ -81,13 +93,17 @@ pub trait TimeMergeStorage {
     async fn compact(&self, req: CompactRequest) -> Result<()>;
 }
 
-/// `TimeMergeStorage` implementation using cloud object storage.
+/// `TimeMergeStorage` implementation using cloud object storage, it will split
+/// data into different segments(aka `segment_duration`) based time range.
+///
+/// Compaction will be done by merging segments within a segment, and segment
+/// will make it easy to support expiration.
 pub struct CloudObjectStorage {
+    segment_duration: Duration,
     path: String,
     store: ObjectStoreRef,
     arrow_schema: SchemaRef,
-    num_primary_key: usize,
-    timestamp_index: usize,
+    num_primary_keys: usize,
     manifest: Manifest,
 
     df_schema: DFSchema,
@@ -104,24 +120,35 @@ pub struct CloudObjectStorage {
 /// {root_path}/data/timestamp_b.sst
 /// {root_path}/data/...
 /// ```
+/// `root_path` is composed of `path` and `segment_duration`.
 impl CloudObjectStorage {
     pub async fn try_new(
-        root_path: String,
+        path: String,
+        segment_duration: Duration,
         store: ObjectStoreRef,
         arrow_schema: SchemaRef,
-        num_primary_key: usize,
-        timestamp_index: usize,
+        num_primary_keys: usize,
         write_options: WriteOptions,
     ) -> Result<Self> {
         let manifest_prefix = crate::manifest::PREFIX_PATH;
         let manifest =
-            Manifest::try_new(format!("{root_path}/{manifest_prefix}"), 
store.clone()).await?;
+            Manifest::try_new(format!("{path}/{manifest_prefix}"), 
store.clone()).await?;
+        let mut new_fields = arrow_schema.fields.clone().to_vec();
+        new_fields.push(Arc::new(Field::new(
+            SEQ_COLUMN_NAME,
+            DataType::UInt64,
+            true,
+        )));
+        let arrow_schema = Arc::new(Schema::new_with_metadata(
+            new_fields,
+            arrow_schema.metadata.clone(),
+        ));
         let df_schema = 
DFSchema::try_from(arrow_schema.clone()).context("build DFSchema")?;
-        let write_props = Self::build_write_props(write_options, 
num_primary_key);
+        let write_props = Self::build_write_props(write_options, 
num_primary_keys);
         Ok(Self {
-            path: root_path,
-            num_primary_key,
-            timestamp_index,
+            path,
+            num_primary_keys,
+            segment_duration,
             store,
             arrow_schema,
             manifest,
@@ -136,7 +163,7 @@ impl CloudObjectStorage {
         format!("{root}/{prefix}/{id}")
     }
 
-    async fn write_batch(&self, req: WriteRequest) -> Result<WriteResult> {
+    async fn write_batch(&self, batch: RecordBatch) -> Result<WriteResult> {
         let file_id = allocate_id();
         let file_path = self.build_file_path(file_id);
         let file_path = Path::from(file_path);
@@ -149,10 +176,21 @@ impl CloudObjectStorage {
         .context("create arrow writer")?;
 
         // sort record batch
-        let mut batches = self.sort_batch(req.batch).await?;
+        let mut batches = self.sort_batch(batch).await?;
         while let Some(batch) = batches.next().await {
             let batch = batch.context("get sorted batch")?;
-            writer.write(&batch).await.context("write arrow batch")?;
+            let batch_with_seq = {
+                let mut new_cols = batch.columns().to_vec();
+                // Since file_id in increasing order, we can use it as 
sequence.
+                let seq_column = Arc::new(UInt64Array::from(vec![file_id; 
batch.num_rows()]));
+                new_cols.push(seq_column);
+                RecordBatch::try_new(self.arrow_schema.clone(), new_cols)
+                    .context("construct record batch with seq column")?
+            };
+            writer
+                .write(&batch_with_seq)
+                .await
+                .context("write arrow batch")?;
         }
         writer.close().await.context("close arrow writer")?;
         let object_meta = self
@@ -163,17 +201,21 @@ impl CloudObjectStorage {
 
         Ok(WriteResult {
             id: file_id,
+            seq: file_id,
             size: object_meta.size,
         })
     }
 
-    fn build_sort_exprs(&self) -> Result<LexOrdering> {
-        let sort_exprs = (0..self.num_primary_key)
+    fn build_sort_exprs(&self, sort_seq: bool) -> Result<LexOrdering> {
+        let mut sort_exprs = (0..self.num_primary_keys)
             .map(|i| {
                 ident(self.schema().field(i).name())
                     .sort(true /* asc */, true /* nulls_first */)
             })
             .collect::<Vec<_>>();
+        if sort_seq {
+            sort_exprs.push(ident(SEQ_COLUMN_NAME).sort(true, true));
+        }
         let sort_exprs =
             create_physical_sort_exprs(&sort_exprs, &self.df_schema, 
&ExecutionProps::default())
                 .context("create physical sort exprs")?;
@@ -184,7 +226,7 @@ impl CloudObjectStorage {
     async fn sort_batch(&self, batch: RecordBatch) -> 
Result<SendableRecordBatchStream> {
         let ctx = SessionContext::default();
         let schema = batch.schema();
-        let sort_exprs = self.build_sort_exprs()?;
+        let sort_exprs = self.build_sort_exprs(false /* sort_seq */)?;
         let batch_plan =
             MemoryExec::try_new(&[vec![batch]], schema, None).context("build 
batch plan")?;
         let physical_plan = Arc::new(SortExec::new(sort_exprs, 
Arc::new(batch_plan)));
@@ -235,6 +277,57 @@ impl CloudObjectStorage {
 
         builder.build()
     }
+
+    async fn build_scan_plan(
+        &self,
+        ssts: Vec<SstFile>,
+        projections: Option<Vec<usize>>,
+        predicates: Vec<Expr>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        // we won't use url for selecting object_store.
+        let dummy_url = ObjectStoreUrl::parse("empty://").unwrap();
+        let sort_exprs = self.build_sort_exprs(true /* sort_seq */)?;
+
+        let file_groups = ssts
+            .into_iter()
+            .map(|f| {
+                vec![PartitionedFile::new(
+                    self.build_file_path(f.id),
+                    f.meta.size as u64,
+                )]
+            })
+            .collect::<Vec<_>>();
+        let scan_config = FileScanConfig::new(dummy_url, self.schema().clone())
+            .with_output_ordering(vec![sort_exprs.clone(); file_groups.len()])
+            .with_file_groups(file_groups)
+            .with_projection(projections);
+
+        let mut builder = 
ParquetExec::builder(scan_config).with_parquet_file_reader_factory(
+            Arc::new(DefaultParquetFileReaderFactory::new(self.store.clone())),
+        );
+        if let Some(expr) = conjunction(predicates) {
+            let filters = create_physical_expr(&expr, &self.df_schema, 
&ExecutionProps::new())
+                .context("create physical expr")?;
+            builder = builder.with_predicate(filters);
+        }
+
+        // TODO: fetch using multiple threads since read from parquet will 
incur CPU
+        // when convert between arrow and parquet.
+        let parquet_exec = builder.build();
+        let sort_exec = SortPreservingMergeExec::new(sort_exprs, 
Arc::new(parquet_exec))
+            // TODO: make fetch size configurable.
+            .with_fetch(Some(1024))
+            .with_round_robin_repartition(true);
+
+        let merge_exec = MergeExec::new(
+            Arc::new(sort_exec),
+            self.num_primary_keys,
+            self.schema().fields.len() - 1,
+            0, // TODO: value_idx, not used now.
+            Arc::new(LastValue::new()),
+        );
+        Ok(Arc::new(merge_exec))
+    }
 }
 
 #[async_trait]
@@ -244,32 +337,27 @@ impl TimeMergeStorage for CloudObjectStorage {
     }
 
     async fn write(&self, req: WriteRequest) -> Result<()> {
-        ensure!(req.batch.schema_ref().eq(self.schema()), "schema not match");
+        if req.enable_check {
+            let segment_duration = self.segment_duration.as_millis() as i64;
+            ensure!(
+                req.time_range.start.0 / segment_duration
+                    == (req.time_range.end.0 - 1) / segment_duration,
+                "time range can't cross segment, value:{:?}",
+                &req.time_range
+            );
+        }
 
         let num_rows = req.batch.num_rows();
-        let time_column = req
-            .batch
-            .column(self.timestamp_index)
-            .as_any()
-            .downcast_ref::<Int64Array>()
-            .context("timestamp column should be int64")?;
-
-        let mut start = Timestamp::MAX;
-        let mut end = Timestamp::MIN;
-        for v in time_column.values() {
-            start = start.min(Timestamp(*v));
-            end = end.max(Timestamp(*v));
-        }
-        let time_range = TimeRange::new(start, end + 1);
         let WriteResult {
             id: file_id,
+            seq,
             size: file_size,
-        } = self.write_batch(req).await?;
+        } = self.write_batch(req.batch).await?;
         let file_meta = FileMeta {
-            max_sequence: file_id, // Since file_id in increasing order, we 
can use it as sequence.
+            max_sequence: seq,
             num_rows: num_rows as u32,
             size: file_size as u32,
-            time_range,
+            time_range: req.time_range,
         };
         self.manifest.add_file(file_id, file_meta).await?;
 
@@ -277,39 +365,36 @@ impl TimeMergeStorage for CloudObjectStorage {
     }
 
     async fn scan(&self, req: ScanRequest) -> 
Result<SendableRecordBatchStream> {
-        let ssts = self.manifest.find_ssts(&req.range).await;
-        // we won't use url for selecting object_store.
-        let dummy_url = ObjectStoreUrl::parse("empty://").unwrap();
-        // TODO: we could group ssts based on time range.
-        // TODO: fetch using multiple threads since read from parquet will 
incur CPU
-        // when convert between arrow and parquet.
-        let file_groups = ssts
-            .iter()
-            .map(|f| PartitionedFile::new(self.build_file_path(f.id), 
f.meta.size as u64))
-            .collect::<Vec<_>>();
-        let scan_config = FileScanConfig::new(dummy_url, self.schema().clone())
-            .with_file_group(file_groups)
-            .with_projection(req.projections);
-
-        let mut builder = 
ParquetExec::builder(scan_config).with_parquet_file_reader_factory(
-            Arc::new(DefaultParquetFileReaderFactory::new(self.store.clone())),
-        );
-        if let Some(expr) = conjunction(req.predicate) {
-            let filters = create_physical_expr(&expr, &self.df_schema, 
&ExecutionProps::new())
-                .context("create pyhsical expr")?;
-            builder = builder.with_predicate(filters);
+        let total_ssts = self.manifest.find_ssts(&req.range).await;
+        if total_ssts.is_empty() {
+            return Ok(Box::pin(EmptyRecordBatchStream::new(
+                self.arrow_schema.clone(),
+            )));
         }
 
-        let parquet_exec = builder.build();
-        let sort_exprs = self.build_sort_exprs()?;
-        let physical_plan = Arc::new(SortExec::new(sort_exprs, 
Arc::new(parquet_exec)));
+        let ssts_by_segment = total_ssts.into_iter().group_by(|file| {
+            file.meta.time_range.start.0 / self.segment_duration.as_millis() 
as i64
+        });
+
+        let mut plan_for_all_segments = Vec::new();
+        for (_, ssts) in ssts_by_segment.sorted_by(|a, b| a.0.cmp(&b.0)) {
+            let plan = self
+                .build_scan_plan(ssts, req.projections.clone(), 
req.predicate.clone())
+                .await?;
+
+            plan_for_all_segments.push(plan);
+        }
 
         let ctx = SessionContext::default();
-        // TODO: dedup record batch based on primary keys and sequence number.
-        let res =
-            execute_stream(physical_plan, ctx.task_ctx()).context("execute 
sort physical plan")?;
+        if plan_for_all_segments.len() == 1 {
+            let res = execute_stream(plan_for_all_segments.remove(0), 
ctx.task_ctx())
+                .context("execute stream")?;
+            return Ok(res);
+        }
 
-        Ok(res)
+        let union_exec = Arc::new(UnionExec::new(plan_for_all_segments));
+        let res = execute_stream(union_exec, ctx.task_ctx()).context("execute 
stream")?;
+        return Ok(res);
     }
 
     async fn compact(&self, req: CompactRequest) -> Result<()> {
@@ -319,64 +404,171 @@ impl TimeMergeStorage for CloudObjectStorage {
 
 #[cfg(test)]
 mod tests {
-    use arrow::{
-        array::UInt8Array,
-        datatypes::{DataType, Field, Schema},
-    };
+    use arrow::array::{self as arrow_array};
+    use datafusion::common::record_batch;
     use object_store::local::LocalFileSystem;
 
     use super::*;
+    use crate::{arrow_schema, types::Timestamp};
 
     #[tokio::test]
-    async fn test_sort_batch() {
-        let schema = Arc::new(Schema::new(vec![
-            Field::new("a", DataType::UInt8, false),
-            Field::new("b", DataType::UInt8, false),
-            Field::new("c", DataType::UInt8, false),
-            Field::new("d", DataType::UInt8, false),
-        ]));
+    async fn test_build_scan_plan() {
+        let schema = arrow_schema!(("pk1", UInt8));
+        let store = Arc::new(LocalFileSystem::new());
+        let storage = CloudObjectStorage::try_new(
+            "mock".to_string(),
+            Duration::from_hours(2),
+            store,
+            schema.clone(),
+            1, // num_primary_keys
+            WriteOptions::default(),
+        )
+        .await
+        .unwrap();
+        let plan = storage
+            .build_scan_plan(
+                (100..103)
+                    .map(|id| SstFile {
+                        id,
+                        meta: FileMeta {
+                            max_sequence: id,
+                            num_rows: 1,
+                            size: 1,
+                            time_range: (1..10).into(),
+                        },
+                    })
+                    .collect(),
+                None,
+                vec![],
+            )
+            .await
+            .unwrap();
+        let display_plan =
+            
datafusion::physical_plan::display::DisplayableExecutionPlan::new(plan.as_ref())
+                .indent(true);
+        assert_eq!(
+            r#"MergeExec: [primary_keys: 1, seq_idx: 1]
+  SortPreservingMergeExec: [pk1@0 ASC, __seq__@1 ASC], fetch=1024
+    ParquetExec: file_groups={3 groups: [[mock/data/100], [mock/data/101], 
[mock/data/102]]}, projection=[pk1, __seq__], output_orderings=[[pk1@0 ASC, 
__seq__@1 ASC], [pk1@0 ASC, __seq__@1 ASC], [pk1@0 ASC, __seq__@1 ASC]]
+"#,
+            format!("{display_plan}")
+        );
+    }
 
+    #[tokio::test]
+    async fn test_storage_write_and_scan() {
+        let schema = arrow_schema!(("pk1", UInt8), ("pk2", UInt8), ("value", 
Int64));
+        let root_dir = temp_dir::TempDir::new().unwrap();
         let store = Arc::new(LocalFileSystem::new());
         let storage = CloudObjectStorage::try_new(
-            "/tmp/storage".to_string(),
+            root_dir.path().to_string_lossy().to_string(),
+            Duration::from_hours(2),
             store,
             schema.clone(),
-            1,
-            1,
+            2, // num_primary_keys
             WriteOptions::default(),
         )
         .await
         .unwrap();
 
-        let batch = RecordBatch::try_new(
+        let batch = record_batch!(
+            ("pk1", UInt8, vec![11, 11, 9, 10, 5]),
+            ("pk2", UInt8, vec![100, 100, 1, 2, 3]),
+            ("value", Int64, vec![2, 7, 4, 6, 1])
+        )
+        .unwrap();
+        storage
+            .write(WriteRequest {
+                batch,
+                time_range: (1..10).into(),
+                enable_check: true,
+            })
+            .await
+            .unwrap();
+
+        let batch = record_batch!(
+            ("pk1", UInt8, vec![11, 11, 9, 10]),
+            ("pk2", UInt8, vec![100, 99, 1, 2]),
+            ("value", Int64, vec![22, 77, 44, 66])
+        )
+        .unwrap();
+        storage
+            .write(WriteRequest {
+                batch,
+                time_range: (10..20).into(),
+                enable_check: true,
+            })
+            .await
+            .unwrap();
+
+        let mut result_stream = storage
+            .scan(ScanRequest {
+                range: TimeRange::new(Timestamp(0), Timestamp::MAX),
+                predicate: vec![],
+                projections: None,
+            })
+            .await
+            .unwrap();
+        let expected_batch = [
+            record_batch!(
+                ("pk1", UInt8, vec![5, 9, 10, 11]),
+                ("pk2", UInt8, vec![3, 1, 2, 99]),
+                ("value", Int64, vec![1, 44, 66, 77])
+            )
+            .unwrap(),
+            record_batch!(
+                ("pk1", UInt8, vec![11]),
+                ("pk2", UInt8, vec![100]),
+                ("value", Int64, vec![22])
+            )
+            .unwrap(),
+        ];
+        let mut idx = 0;
+        while let Some(batch) = result_stream.next().await {
+            let batch = batch.unwrap();
+            assert_eq!(expected_batch[idx], batch);
+            idx += 1;
+        }
+    }
+
+    #[tokio::test]
+    async fn test_storage_sort_batch() {
+        let schema = arrow_schema!(("a", UInt8), ("b", UInt8), ("c", UInt8), 
("c", UInt8));
+        let root_dir = temp_dir::TempDir::new().unwrap();
+        let store = Arc::new(LocalFileSystem::new());
+        let storage = CloudObjectStorage::try_new(
+            root_dir.path().to_string_lossy().to_string(),
+            Duration::from_hours(2),
+            store,
             schema.clone(),
-            vec![
-                Arc::new(UInt8Array::from(vec![2, 1, 3, 4, 8, 6, 5, 7])),
-                Arc::new(UInt8Array::from(vec![1, 3, 4, 8, 2, 6, 5, 7])),
-                Arc::new(UInt8Array::from(vec![8, 6, 2, 4, 3, 1, 5, 7])),
-                Arc::new(UInt8Array::from(vec![2, 7, 4, 6, 1, 3, 5, 8])),
-            ],
+            1,
+            WriteOptions::default(),
         )
+        .await
         .unwrap();
 
-        let mut sorted_batches = storage.sort_batch(batch).await.unwrap();
-        let expected_bacth = RecordBatch::try_new(
-            schema,
-            vec![
-                Arc::new(UInt8Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8])),
-                Arc::new(UInt8Array::from(vec![3, 1, 4, 8, 5, 6, 7, 2])),
-                Arc::new(UInt8Array::from(vec![6, 8, 2, 4, 5, 1, 7, 3])),
-                Arc::new(UInt8Array::from(vec![7, 2, 4, 6, 5, 3, 8, 1])),
-            ],
+        let batch = record_batch!(
+            ("a", UInt8, vec![2, 1, 3, 4, 8, 6, 5, 7]),
+            ("b", UInt8, vec![1, 3, 4, 8, 2, 6, 5, 7]),
+            ("c", UInt8, vec![8, 6, 2, 4, 3, 1, 5, 7]),
+            ("d", UInt8, vec![2, 7, 4, 6, 1, 3, 5, 8])
         )
         .unwrap();
 
+        let mut sorted_batches = storage.sort_batch(batch).await.unwrap();
+        let expected_bacth = record_batch!(
+            ("a", UInt8, vec![1, 2, 3, 4, 5, 6, 7, 8]),
+            ("b", UInt8, vec![3, 1, 4, 8, 5, 6, 7, 2]),
+            ("c", UInt8, vec![6, 8, 2, 4, 5, 1, 7, 3]),
+            ("d", UInt8, vec![7, 2, 4, 6, 5, 3, 8, 1])
+        )
+        .unwrap();
         let mut offset = 0;
         while let Some(sorted_batch) = sorted_batches.next().await {
             let sorted_batch = sorted_batch.unwrap();
             let length = sorted_batch.num_rows();
             let batch = expected_bacth.slice(offset, length);
-            assert!(sorted_batch.eq(&batch));
+            assert_eq!(sorted_batch, batch);
             offset += length;
         }
     }
diff --git a/horaedb/metric_engine/src/lib.rs 
b/horaedb/metric_engine/src/test_util.rs
similarity index 53%
copy from horaedb/metric_engine/src/lib.rs
copy to horaedb/metric_engine/src/test_util.rs
index be7ef2b4..9c378b0a 100644
--- a/horaedb/metric_engine/src/lib.rs
+++ b/horaedb/metric_engine/src/test_util.rs
@@ -15,13 +15,27 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Storage Engine for metrics.
+#[macro_export]
+macro_rules! arrow_schema {
+    ($(($field_name:expr, $data_type:ident)),* $(,)?) => {{
+        let fields = vec![
+            $(
+                arrow::datatypes::Field::new($field_name, 
arrow::datatypes::DataType::$data_type, true),
+            )*
+        ];
+        std::sync::Arc::new(arrow::datatypes::Schema::new(fields))
+    }};
+}
 
-pub mod error;
-mod manifest;
-mod read;
-mod sst;
-pub mod storage;
-pub mod types;
+#[cfg(test)]
+mod tests {
+    #[test]
+    fn test_arrow_schema_macro() {
+        let schema = arrow_schema![("a", UInt8), ("b", UInt8), ("c", UInt8), 
("d", UInt8),];
 
-pub use error::{AnyhowError, Error, Result};
+        let expected_names = ["a", "b", "c", "d"];
+        for (i, f) in schema.fields().iter().enumerate() {
+            assert_eq!(f.name(), expected_names[i]);
+        }
+    }
+}
diff --git a/horaedb/metric_engine/src/types.rs 
b/horaedb/metric_engine/src/types.rs
index 7bc9da18..624b8a2e 100644
--- a/horaedb/metric_engine/src/types.rs
+++ b/horaedb/metric_engine/src/types.rs
@@ -26,6 +26,10 @@ use parquet::basic::{Compression, Encoding, ZstdLevel};
 
 use crate::sst::FileId;
 
+// Seq column is a builtin column, and it will be appended to the end of
+// user-defined schema.
+pub const SEQ_COLUMN_NAME: &str = "__seq__";
+
 #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
 pub struct Timestamp(pub i64);
 
@@ -73,6 +77,15 @@ impl From<Range<Timestamp>> for TimeRange {
     }
 }
 
+impl From<Range<i64>> for TimeRange {
+    fn from(value: Range<i64>) -> Self {
+        Self(Range {
+            start: value.start.into(),
+            end: value.end.into(),
+        })
+    }
+}
+
 impl Deref for TimeRange {
     type Target = Range<Timestamp>;
 
@@ -95,6 +108,7 @@ pub type ObjectStoreRef = Arc<dyn ObjectStore>;
 
 pub struct WriteResult {
     pub id: FileId,
+    pub seq: u64,
     pub size: usize,
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to