This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new 1d7c5499 feat: add merge stream (#1595)
1d7c5499 is described below
commit 1d7c549971df3e5fdf147c48367dd7552e637954
Author: Jiacai Liu <[email protected]>
AuthorDate: Wed Nov 20 17:11:01 2024 +0800
feat: add merge stream (#1595)
## Rationale
Part of Metric Engine.
## Detailed Changes
- Scan SSTs in parallel based on segments
- Sort SSTs using SortPreservingMergeExec, which is more efficient than
SortExec
- Add MergeExec to dedup record batches based on sorted batches; currently
only `overwrite` semantics is supported.
## Test Plan
Add two new unit tests.
---
horaedb/Cargo.lock | 120 ++++---
horaedb/Cargo.toml | 4 +-
horaedb/metric_engine/Cargo.toml | 4 +
horaedb/metric_engine/src/lib.rs | 3 +
horaedb/metric_engine/src/{lib.rs => macros.rs} | 23 +-
horaedb/metric_engine/src/read.rs | 283 ++++++++++++++-
horaedb/metric_engine/src/storage.rs | 394 +++++++++++++++------
horaedb/metric_engine/src/{lib.rs => test_util.rs} | 30 +-
horaedb/metric_engine/src/types.rs | 14 +
9 files changed, 704 insertions(+), 171 deletions(-)
diff --git a/horaedb/Cargo.lock b/horaedb/Cargo.lock
index 64912637..4c8aac43 100644
--- a/horaedb/Cargo.lock
+++ b/horaedb/Cargo.lock
@@ -421,9 +421,9 @@ dependencies = [
[[package]]
name = "brotli"
-version = "6.0.0"
+version = "7.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "74f7971dbd9326d58187408ab83117d8ac1bb9c17b085fdacd1cf2f598719b6b"
+checksum = "cc97b8f16f944bba54f0433f07e30be199b6dc2bd25937444bbad560bcea29bd"
dependencies = [
"alloc-no-stdlib",
"alloc-stdlib",
@@ -649,9 +649,9 @@ dependencies = [
[[package]]
name = "datafusion"
-version = "42.1.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3e8053b4cedc24eb158e4c041b38cfa0677ef5f4a7ccaa31ee5dcad61dd7aa54"
+checksum = "cbba0799cf6913b456ed07a94f0f3b6e12c62a5d88b10809e2284a0f2b915c05"
dependencies = [
"ahash",
"arrow",
@@ -706,9 +706,9 @@ dependencies = [
[[package]]
name = "datafusion-catalog"
-version = "42.1.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7d95efedb3a32f6f74df5bb8fda7b69fb9babe80e92137f25de6ddb15e8e8801"
+checksum = "7493c5c2d40eec435b13d92e5703554f4efc7059451fcb8d3a79580ff0e45560"
dependencies = [
"arrow-schema",
"async-trait",
@@ -721,9 +721,9 @@ dependencies = [
[[package]]
name = "datafusion-common"
-version = "42.1.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b7d766e0d3dec01a0ab70b1b31678c286cddc0bd7afc9bd82504a1d9a70a7d94"
+checksum = "24953049ebbd6f8964f91f60aa3514e121b5e81e068e33b60e77815ab369b25c"
dependencies = [
"ahash",
"arrow",
@@ -733,6 +733,7 @@ dependencies = [
"chrono",
"half",
"hashbrown",
+ "indexmap",
"instant",
"libc",
"num_cpus",
@@ -745,9 +746,9 @@ dependencies = [
[[package]]
name = "datafusion-common-runtime"
-version = "42.1.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4e55db6df319f9e7cf366d0d4ffae793c863823421b2f2b7314a0fefd8e8c11a"
+checksum = "f06df4ef76872e11c924d3c814fd2a8dd09905ed2e2195f71c857d78abd19685"
dependencies = [
"log",
"tokio",
@@ -755,9 +756,9 @@ dependencies = [
[[package]]
name = "datafusion-execution"
-version = "42.1.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6d0c6dc013f955c382438a78fa3de8b0a8bf7b1a4cda5bc46335fe445ff3ff1a"
+checksum = "6bbdcb628d690f3ce5fea7de81642b514486d58ff9779a51f180a69a4eadb361"
dependencies = [
"arrow",
"chrono",
@@ -776,9 +777,9 @@ dependencies = [
[[package]]
name = "datafusion-expr"
-version = "42.1.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7f31405c0bb854451d755b224d41dc466a8f7fd36f8c041c29d2d8432bd0c08c"
+checksum = "8036495980e3131f706b7d33ab00b4492d73dc714e3cb74d11b50f9602a73246"
dependencies = [
"ahash",
"arrow",
@@ -788,7 +789,9 @@ dependencies = [
"datafusion-common",
"datafusion-expr-common",
"datafusion-functions-aggregate-common",
+ "datafusion-functions-window-common",
"datafusion-physical-expr-common",
+ "indexmap",
"paste",
"serde_json",
"sqlparser",
@@ -798,20 +801,21 @@ dependencies = [
[[package]]
name = "datafusion-expr-common"
-version = "42.1.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ebc8266b6627c8264c87bc7c82564e3d89ed5f0f9943b49a30dac1f1ac12e4c0"
+checksum = "4da0f3cb4669f9523b403d6b5a0ec85023e0ab3bf0183afd1517475b3e64fdd2"
dependencies = [
"arrow",
"datafusion-common",
+ "itertools 0.13.0",
"paste",
]
[[package]]
name = "datafusion-functions"
-version = "42.1.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5712668780bc43666ecd10acd188b7df58e2a5501d4dbbd972bf209f1790138b"
+checksum = "f52c4012648b34853e40a2c6bcaa8772f837831019b68aca384fb38436dba162"
dependencies = [
"arrow",
"arrow-buffer",
@@ -836,9 +840,9 @@ dependencies = [
[[package]]
name = "datafusion-functions-aggregate"
-version = "42.1.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8ec138af6b7482fb726f1bfeec010fc063b9614594c36a1051a4d3b365ba6a5f"
+checksum = "e5b8bb624597ba28ed7446df4a9bd7c7a7bde7c578b6b527da3f47371d5f6741"
dependencies = [
"ahash",
"arrow",
@@ -850,16 +854,16 @@ dependencies = [
"datafusion-physical-expr",
"datafusion-physical-expr-common",
"half",
+ "indexmap",
"log",
"paste",
- "sqlparser",
]
[[package]]
name = "datafusion-functions-aggregate-common"
-version = "42.1.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "564499c6bdd3ab9f76c7ad727e858bc6791e4de6c1a484d21d2bf49daaa658d6"
+checksum = "6fb06208fc470bc8cf1ce2d9a1159d42db591f2c7264a8c1776b53ad8f675143"
dependencies = [
"ahash",
"arrow",
@@ -871,9 +875,9 @@ dependencies = [
[[package]]
name = "datafusion-functions-nested"
-version = "42.1.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9b55ea2221ae1c1e37d524f8330f763dcdc205edb74fe5f54cbdea475c17fd18"
+checksum = "fca25bbb87323716d05e54114666e942172ccca23c5a507e9c7851db6e965317"
dependencies = [
"arrow",
"arrow-array",
@@ -894,21 +898,34 @@ dependencies = [
[[package]]
name = "datafusion-functions-window"
-version = "42.1.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6932996c4407ee1ebf23ffd706e982729cb9b6f7a31a281abac51fe524c3a049"
+checksum = "5ae23356c634e54c59f7c51acb7a5b9f6240ffb2cf997049a1a24a8a88598dbe"
dependencies = [
"datafusion-common",
"datafusion-expr",
+ "datafusion-functions-window-common",
+ "datafusion-physical-expr",
"datafusion-physical-expr-common",
"log",
+ "paste",
+]
+
+[[package]]
+name = "datafusion-functions-window-common"
+version = "43.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d4b3d6ff7794acea026de36007077a06b18b89e4f9c3fea7f2215f9f7dd9059b"
+dependencies = [
+ "datafusion-common",
+ "datafusion-physical-expr-common",
]
[[package]]
name = "datafusion-optimizer"
-version = "42.1.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f7d8afa1eb44e2f00cc8d82b88803e456a681474b8877ceecc04e9517d5c843c"
+checksum = "bec6241eb80c595fa0e1a8a6b69686b5cf3bd5fdacb8319582a0943b0bd788aa"
dependencies = [
"arrow",
"async-trait",
@@ -926,9 +943,9 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
-version = "42.1.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "570666d84df483473626fab4e69997d048b40d0e7c67c540299714f244d99e73"
+checksum = "3370357b8fc75ec38577700644e5d1b0bc78f38babab99c0b8bd26bafb3e4335"
dependencies = [
"ahash",
"arrow",
@@ -937,30 +954,26 @@ dependencies = [
"arrow-ord",
"arrow-schema",
"arrow-string",
- "base64",
"chrono",
"datafusion-common",
- "datafusion-execution",
"datafusion-expr",
"datafusion-expr-common",
"datafusion-functions-aggregate-common",
"datafusion-physical-expr-common",
"half",
"hashbrown",
- "hex",
"indexmap",
"itertools 0.13.0",
"log",
"paste",
"petgraph",
- "regex",
]
[[package]]
name = "datafusion-physical-expr-common"
-version = "42.1.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3746cbdfb32d67399dcaad17042e419ac6da454a7e38ff098aa2fbf0a7388982"
+checksum = "b8b7734d94bf2fa6f6e570935b0ddddd8421179ce200065be97874e13d46a47b"
dependencies = [
"ahash",
"arrow",
@@ -972,13 +985,15 @@ dependencies = [
[[package]]
name = "datafusion-physical-optimizer"
-version = "42.1.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "696f06e79d44f7c50f57cea23493881d86d9d9647884d38ce467c7f75c13e286"
+checksum = "7eee8c479522df21d7b395640dff88c5ed05361852dce6544d7c98e9dbcebffe"
dependencies = [
+ "arrow",
"arrow-schema",
"datafusion-common",
"datafusion-execution",
+ "datafusion-expr-common",
"datafusion-physical-expr",
"datafusion-physical-plan",
"itertools 0.13.0",
@@ -986,9 +1001,9 @@ dependencies = [
[[package]]
name = "datafusion-physical-plan"
-version = "42.1.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "04e1d084224023e09cdea14d01ded0f2092c319c7b4594ebc821283b9c7c4a35"
+checksum = "17e1fc2e2c239d14e8556f2622b19a726bf6bc6962cc00c71fc52626274bee24"
dependencies = [
"ahash",
"arrow",
@@ -1002,8 +1017,8 @@ dependencies = [
"datafusion-common-runtime",
"datafusion-execution",
"datafusion-expr",
- "datafusion-functions-aggregate",
"datafusion-functions-aggregate-common",
+ "datafusion-functions-window-common",
"datafusion-physical-expr",
"datafusion-physical-expr-common",
"futures",
@@ -1021,15 +1036,16 @@ dependencies = [
[[package]]
name = "datafusion-sql"
-version = "42.1.0"
+version = "43.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c105148357dcbd9e4c97eada2930a59f7923215461d9f47de6e76edd60eab2d5"
+checksum = "63e3a4ed41dbee20a5d947a59ca035c225d67dc9cbe869c10f66dcdf25e7ce51"
dependencies = [
"arrow",
"arrow-array",
"arrow-schema",
"datafusion-common",
"datafusion-expr",
+ "indexmap",
"log",
"regex",
"sqlparser",
@@ -1538,6 +1554,7 @@ version = "2.0.0"
dependencies = [
"anyhow",
"arrow",
+ "arrow-schema",
"async-trait",
"bytes",
"datafusion",
@@ -1549,6 +1566,7 @@ dependencies = [
"parquet",
"pb_types",
"prost",
+ "temp-dir",
"thiserror",
"tokio",
]
@@ -1750,9 +1768,9 @@ dependencies = [
[[package]]
name = "parquet"
-version = "53.1.0"
+version = "53.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "310c46a70a3ba90d98fec39fa2da6d9d731e544191da6fb56c9d199484d0dd3e"
+checksum = "dea02606ba6f5e856561d8d507dba8bac060aefca2a6c0f1aa1d361fed91ff3e"
dependencies = [
"ahash",
"arrow-array",
@@ -2231,9 +2249,9 @@ dependencies = [
[[package]]
name = "sqlparser"
-version = "0.50.0"
+version = "0.51.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b2e5b515a2bd5168426033e9efbfd05500114833916f1d5c268f938b4ee130ac"
+checksum = "5fe11944a61da0da3f592e19a45ebe5ab92dc14a779907ff1f08fbb797bfefc7"
dependencies = [
"log",
"sqlparser_derive",
@@ -2295,6 +2313,12 @@ dependencies = [
"unicode-ident",
]
+[[package]]
+name = "temp-dir"
+version = "0.1.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bc1ee6eef34f12f765cb94725905c6312b6610ab2b0940889cfe58dae7bc3c72"
+
[[package]]
name = "tempfile"
version = "3.12.0"
diff --git a/horaedb/Cargo.toml b/horaedb/Cargo.toml
index 628e47a1..6f56f090 100644
--- a/horaedb/Cargo.toml
+++ b/horaedb/Cargo.toml
@@ -30,17 +30,19 @@ anyhow = { version = "1.0" }
metric_engine = { path = "metric_engine" }
thiserror = "1"
bytes = "1"
-datafusion = "42"
+datafusion = "43"
parquet = { version = "53" }
object_store = { version = "0.11" }
macros = { path = "../src/components/macros" }
pb_types = { path = "pb_types" }
prost = { version = "0.13" }
arrow = { version = "53", features = ["prettyprint"] }
+arrow-schema = "53"
tokio = { version = "1", features = ["full"] }
async-trait = "0.1"
async-stream = "0.3"
futures = "0.3"
+temp-dir = "0.1"
itertools = "0.3"
lazy_static = "1"
tracing = "0.1"
diff --git a/horaedb/metric_engine/Cargo.toml b/horaedb/metric_engine/Cargo.toml
index d2ea85c8..095532bd 100644
--- a/horaedb/metric_engine/Cargo.toml
+++ b/horaedb/metric_engine/Cargo.toml
@@ -33,6 +33,7 @@ workspace = true
[dependencies]
anyhow = { workspace = true }
arrow = { workspace = true }
+arrow-schema = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
datafusion = { workspace = true }
@@ -46,3 +47,6 @@ pb_types = { workspace = true }
prost = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
+
+[dev-dependencies]
+temp-dir = { workspace = true }
diff --git a/horaedb/metric_engine/src/lib.rs b/horaedb/metric_engine/src/lib.rs
index be7ef2b4..e0dfdb3c 100644
--- a/horaedb/metric_engine/src/lib.rs
+++ b/horaedb/metric_engine/src/lib.rs
@@ -17,11 +17,14 @@
//! Storage Engine for metrics.
+#![feature(duration_constructors)]
pub mod error;
+mod macros;
mod manifest;
mod read;
mod sst;
pub mod storage;
+mod test_util;
pub mod types;
pub use error::{AnyhowError, Error, Result};
diff --git a/horaedb/metric_engine/src/lib.rs
b/horaedb/metric_engine/src/macros.rs
similarity index 63%
copy from horaedb/metric_engine/src/lib.rs
copy to horaedb/metric_engine/src/macros.rs
index be7ef2b4..71c668db 100644
--- a/horaedb/metric_engine/src/lib.rs
+++ b/horaedb/metric_engine/src/macros.rs
@@ -15,13 +15,16 @@
// specific language governing permissions and limitations
// under the License.
-//! Storage Engine for metrics.
-
-pub mod error;
-mod manifest;
-mod read;
-mod sst;
-pub mod storage;
-pub mod types;
-
-pub use error::{AnyhowError, Error, Result};
+#[macro_export]
+macro_rules! compare_primitive_columns {
+ ($lhs_col:expr, $rhs_col:expr, $lhs_idx:expr, $rhs_idx:expr,
$($type:ty),+) => {
+ $(
+ if let Some(lhs_col) = $lhs_col.as_primitive_opt::<$type>() {
+ let rhs_col = $rhs_col.as_primitive::<$type>();
+ if !lhs_col.value($lhs_idx).eq(&rhs_col.value($rhs_idx)) {
+ return false;
+ }
+ }
+ )+
+ };
+}
diff --git a/horaedb/metric_engine/src/read.rs
b/horaedb/metric_engine/src/read.rs
index 88522fe9..26564a1a 100644
--- a/horaedb/metric_engine/src/read.rs
+++ b/horaedb/metric_engine/src/read.rs
@@ -15,15 +15,41 @@
// specific language governing permissions and limitations
// under the License.
+use std::{
+ any::Any,
+ pin::Pin,
+ sync::Arc,
+ task::{Context, Poll},
+};
+
+use arrow::{
+ array::{AsArray, RecordBatch},
+ compute::concat_batches,
+ datatypes::{
+ GenericBinaryType, Int32Type, Int64Type, Int8Type, Schema, UInt32Type,
UInt64Type,
+ UInt8Type,
+ },
+};
+use arrow_schema::SchemaRef;
use datafusion::{
+ common::internal_err,
datasource::physical_plan::{FileMeta, ParquetFileReaderFactory},
- error::Result as DfResult,
+ error::{DataFusionError, Result as DfResult},
+ execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext},
+ logical_expr::AggregateUDFImpl,
parquet::arrow::async_reader::AsyncFileReader,
- physical_plan::metrics::ExecutionPlanMetricsSet,
+ physical_plan::{
+ metrics::ExecutionPlanMetricsSet, DisplayAs, Distribution,
ExecutionPlan, PlanProperties,
+ },
};
+use futures::{Stream, StreamExt};
+use itertools::Itertools;
use parquet::arrow::async_reader::ParquetObjectReader;
-use crate::types::ObjectStoreRef;
+use crate::{
+ compare_primitive_columns,
+ types::{ObjectStoreRef, SEQ_COLUMN_NAME},
+};
#[derive(Debug, Clone)]
pub struct DefaultParquetFileReaderFactory {
@@ -53,3 +79,254 @@ impl ParquetFileReaderFactory for
DefaultParquetFileReaderFactory {
Ok(Box::new(reader))
}
}
+
+/// Execution plan for merge RecordBatch values, like Merge Operator in
RocksDB.
+///
+/// Input record batches are sorted by the primary key columns and seq
+/// column.
+#[derive(Debug)]
+pub(crate) struct MergeExec {
+ /// Input plan
+ input: Arc<dyn ExecutionPlan>,
+ /// (0..num_primary_keys) are primary key columns
+ num_primary_keys: usize,
+ /// Sequence column index
+ seq_idx: usize,
+ // (idx, merge_op)
+ value_idx: usize,
+ value_op: Arc<dyn AggregateUDFImpl>,
+}
+
+impl MergeExec {
+ pub fn new(
+ input: Arc<dyn ExecutionPlan>,
+ num_primary_keys: usize,
+ seq_idx: usize,
+ value_idx: usize,
+ value_op: Arc<dyn AggregateUDFImpl>,
+ ) -> Self {
+ Self {
+ input,
+ num_primary_keys,
+ seq_idx,
+ value_idx,
+ value_op,
+ }
+ }
+}
+impl DisplayAs for MergeExec {
+ fn fmt_as(
+ &self,
+ _t: datafusion::physical_plan::DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ write!(
+ f,
+ "MergeExec: [primary_keys: {}, seq_idx: {}]",
+ self.num_primary_keys, self.seq_idx
+ )?;
+ Ok(())
+ }
+}
+
+impl ExecutionPlan for MergeExec {
+ fn name(&self) -> &str {
+ "MergeExec"
+ }
+
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn properties(&self) -> &PlanProperties {
+ self.input.properties()
+ }
+
+ fn required_input_distribution(&self) -> Vec<Distribution> {
+ vec![Distribution::SinglePartition; self.children().len()]
+ }
+
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ vec![&self.input]
+ }
+
+ fn maintains_input_order(&self) -> Vec<bool> {
+ vec![true; self.children().len()]
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> DfResult<Arc<dyn ExecutionPlan>> {
+ Ok(Arc::new(MergeExec::new(
+ Arc::clone(&children[0]),
+ self.num_primary_keys,
+ self.seq_idx,
+ self.value_idx,
+ self.value_op.clone(),
+ )))
+ }
+
+ fn execute(
+ &self,
+ partition: usize,
+ context: Arc<TaskContext>,
+ ) -> DfResult<SendableRecordBatchStream> {
+ if 0 != partition {
+ return internal_err!("MergeExec invalid partition {partition}");
+ }
+
+ Ok(Box::pin(MergeStream::new(
+ self.input.execute(partition, context)?,
+ self.num_primary_keys,
+ self.seq_idx,
+ self.value_idx,
+ self.value_op.clone(),
+ )))
+ }
+}
+
+struct MergeStream {
+ stream: SendableRecordBatchStream,
+ num_primary_keys: usize,
+ seq_idx: usize,
+ value_idx: usize,
+ value_op: Arc<dyn AggregateUDFImpl>,
+
+ pending_batch: Option<RecordBatch>,
+ arrow_schema: SchemaRef,
+}
+
+impl MergeStream {
+ fn new(
+ stream: SendableRecordBatchStream,
+ num_primary_keys: usize,
+ seq_idx: usize,
+ value_idx: usize,
+ value_op: Arc<dyn AggregateUDFImpl>,
+ ) -> Self {
+ let fields = stream
+ .schema()
+ .fields()
+ .into_iter()
+ .filter_map(|f| {
+ if f.name() == SEQ_COLUMN_NAME {
+ None
+ } else {
+ Some(f.clone())
+ }
+ })
+ .collect_vec();
+ let arrow_schema = Arc::new(Schema::new_with_metadata(
+ fields,
+ stream.schema().metadata.clone(),
+ ));
+ Self {
+ stream,
+ num_primary_keys,
+ seq_idx,
+ value_idx,
+ value_op,
+ pending_batch: None,
+ arrow_schema,
+ }
+ }
+
+ fn primary_key_eq(
+ &self,
+ lhs: &RecordBatch,
+ lhs_idx: usize,
+ rhs: &RecordBatch,
+ rhs_idx: usize,
+ ) -> bool {
+ for k in 0..self.num_primary_keys {
+ let lhs_col = lhs.column(k);
+ let rhs_col = rhs.column(k);
+ compare_primitive_columns!(
+ lhs_col, rhs_col, lhs_idx, rhs_idx, // TODO: Add more types
here
+ UInt8Type, Int8Type, UInt32Type, Int32Type, UInt64Type,
Int64Type
+ );
+
+ if let Some(lhs_col) =
lhs_col.as_bytes_opt::<GenericBinaryType<i32>>() {
+ let rhs_col = rhs_col.as_bytes::<GenericBinaryType<i32>>();
+ if !rhs_col.value(rhs_idx).eq(lhs_col.value(lhs_idx)) {
+ return false;
+ }
+ }
+ }
+
+ true
+ }
+
+ // TODO: only support deduplication now, merge operation will be added
later.
+ fn merge_batch(&mut self, batch: RecordBatch) -> DfResult<RecordBatch> {
+ let mut batches = vec![];
+ let mut start_idx = 0;
+ while start_idx < batch.num_rows() {
+ let mut end_idx = start_idx + 1;
+ while end_idx < batch.num_rows()
+ && self.primary_key_eq(&batch, start_idx, &batch, end_idx)
+ {
+ end_idx += 1;
+ }
+ let rows_with_same_primary_keys = batch.slice(start_idx, end_idx -
start_idx);
+ if let Some(pending) = self.pending_batch.take() {
+ if !self.primary_key_eq(
+ &pending,
+ pending.num_rows() - 1,
+ &rows_with_same_primary_keys,
+ 0,
+ ) {
+ // only keep the last row in this batch
+ batches.push(pending.slice(pending.num_rows() - 1, 1));
+ }
+ }
+ batches.push(
+
rows_with_same_primary_keys.slice(rows_with_same_primary_keys.num_rows() - 1,
1),
+ );
+
+ start_idx = end_idx;
+ }
+
+ // last batch may have overlapping rows with the next batch, so keep
them in
+ // pending_batch
+ self.pending_batch = batches.pop();
+
+ concat_batches(&self.stream.schema(), batches.iter())
+ .map_err(|e| DataFusionError::ArrowError(e, None))
+ .map(|mut batch| {
+ // Remove seq column
+ batch.remove_column(self.seq_idx);
+ batch
+ })
+ }
+}
+
+impl Stream for MergeStream {
+ type Item = DfResult<RecordBatch>;
+
+ fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) ->
Poll<Option<Self::Item>> {
+ match self.stream.poll_next_unpin(ctx) {
+ Poll::Pending => Poll::Pending,
+ Poll::Ready(None) => {
+ let value = if let Some(mut pending) =
self.pending_batch.take() {
+ pending.remove_column(self.seq_idx);
+ Some(Ok(pending))
+ } else {
+ None
+ };
+ Poll::Ready(value)
+ }
+ Poll::Ready(Some(v)) => Poll::Ready(Some(v.and_then(|batch| {
+ let batch = self.merge_batch(batch)?;
+ Ok(batch)
+ }))),
+ }
+ }
+}
+
+impl RecordBatchStream for MergeStream {
+ fn schema(&self) -> SchemaRef {
+ self.arrow_schema.clone()
+ }
+}
diff --git a/horaedb/metric_engine/src/storage.rs
b/horaedb/metric_engine/src/storage.rs
index 1419503a..02816088 100644
--- a/horaedb/metric_engine/src/storage.rs
+++ b/horaedb/metric_engine/src/storage.rs
@@ -15,13 +15,14 @@
// specific language governing permissions and limitations
// under the License.
-use std::{sync::Arc, vec};
+use std::{sync::Arc, time::Duration, vec};
use anyhow::Context;
use arrow::{
- array::{Int64Array, RecordBatch},
+ array::{RecordBatch, UInt64Array},
datatypes::SchemaRef,
};
+use arrow_schema::{DataType, Field, Schema};
use async_trait::async_trait;
use datafusion::{
common::DFSchema,
@@ -30,13 +31,21 @@ use datafusion::{
physical_plan::{FileScanConfig, ParquetExec},
},
execution::{context::ExecutionProps, object_store::ObjectStoreUrl,
SendableRecordBatchStream},
+ functions_aggregate::first_last::LastValue,
logical_expr::{utils::conjunction, Expr},
physical_expr::{create_physical_expr, LexOrdering},
- physical_plan::{execute_stream, memory::MemoryExec, sorts::sort::SortExec},
+ physical_plan::{
+ execute_stream,
+ memory::MemoryExec,
+ sorts::{sort::SortExec,
sort_preserving_merge::SortPreservingMergeExec},
+ union::UnionExec,
+ EmptyRecordBatchStream, ExecutionPlan,
+ },
physical_planner::create_physical_sort_exprs,
prelude::{ident, SessionContext},
};
use futures::StreamExt;
+use itertools::Itertools;
use macros::ensure;
use object_store::path::Path;
use parquet::{
@@ -48,14 +57,17 @@ use parquet::{
use crate::{
manifest::Manifest,
- read::DefaultParquetFileReaderFactory,
- sst::{allocate_id, FileId, FileMeta},
- types::{ObjectStoreRef, TimeRange, Timestamp, WriteOptions, WriteResult},
+ read::{DefaultParquetFileReaderFactory, MergeExec},
+ sst::{allocate_id, FileId, FileMeta, SstFile},
+ types::{ObjectStoreRef, TimeRange, WriteOptions, WriteResult,
SEQ_COLUMN_NAME},
Result,
};
pub struct WriteRequest {
- batch: RecordBatch,
+ pub batch: RecordBatch,
+ pub time_range: TimeRange,
+ // Check data is valid if it's true.
+ pub enable_check: bool,
}
pub struct ScanRequest {
@@ -81,13 +93,17 @@ pub trait TimeMergeStorage {
async fn compact(&self, req: CompactRequest) -> Result<()>;
}
-/// `TimeMergeStorage` implementation using cloud object storage.
+/// `TimeMergeStorage` implementation using cloud object storage, it will split
+/// data into different segments(aka `segment_duration`) based time range.
+///
+/// Compaction will be done by merging segments within a segment, and segment
+/// will make it easy to support expiration.
pub struct CloudObjectStorage {
+ segment_duration: Duration,
path: String,
store: ObjectStoreRef,
arrow_schema: SchemaRef,
- num_primary_key: usize,
- timestamp_index: usize,
+ num_primary_keys: usize,
manifest: Manifest,
df_schema: DFSchema,
@@ -104,24 +120,35 @@ pub struct CloudObjectStorage {
/// {root_path}/data/timestamp_b.sst
/// {root_path}/data/...
/// ```
+/// `root_path` is composed of `path` and `segment_duration`.
impl CloudObjectStorage {
pub async fn try_new(
- root_path: String,
+ path: String,
+ segment_duration: Duration,
store: ObjectStoreRef,
arrow_schema: SchemaRef,
- num_primary_key: usize,
- timestamp_index: usize,
+ num_primary_keys: usize,
write_options: WriteOptions,
) -> Result<Self> {
let manifest_prefix = crate::manifest::PREFIX_PATH;
let manifest =
- Manifest::try_new(format!("{root_path}/{manifest_prefix}"),
store.clone()).await?;
+ Manifest::try_new(format!("{path}/{manifest_prefix}"),
store.clone()).await?;
+ let mut new_fields = arrow_schema.fields.clone().to_vec();
+ new_fields.push(Arc::new(Field::new(
+ SEQ_COLUMN_NAME,
+ DataType::UInt64,
+ true,
+ )));
+ let arrow_schema = Arc::new(Schema::new_with_metadata(
+ new_fields,
+ arrow_schema.metadata.clone(),
+ ));
let df_schema =
DFSchema::try_from(arrow_schema.clone()).context("build DFSchema")?;
- let write_props = Self::build_write_props(write_options,
num_primary_key);
+ let write_props = Self::build_write_props(write_options,
num_primary_keys);
Ok(Self {
- path: root_path,
- num_primary_key,
- timestamp_index,
+ path,
+ num_primary_keys,
+ segment_duration,
store,
arrow_schema,
manifest,
@@ -136,7 +163,7 @@ impl CloudObjectStorage {
format!("{root}/{prefix}/{id}")
}
- async fn write_batch(&self, req: WriteRequest) -> Result<WriteResult> {
+ async fn write_batch(&self, batch: RecordBatch) -> Result<WriteResult> {
let file_id = allocate_id();
let file_path = self.build_file_path(file_id);
let file_path = Path::from(file_path);
@@ -149,10 +176,21 @@ impl CloudObjectStorage {
.context("create arrow writer")?;
// sort record batch
- let mut batches = self.sort_batch(req.batch).await?;
+ let mut batches = self.sort_batch(batch).await?;
while let Some(batch) = batches.next().await {
let batch = batch.context("get sorted batch")?;
- writer.write(&batch).await.context("write arrow batch")?;
+ let batch_with_seq = {
+ let mut new_cols = batch.columns().to_vec();
+ // Since file_id in increasing order, we can use it as
sequence.
+ let seq_column = Arc::new(UInt64Array::from(vec![file_id;
batch.num_rows()]));
+ new_cols.push(seq_column);
+ RecordBatch::try_new(self.arrow_schema.clone(), new_cols)
+ .context("construct record batch with seq column")?
+ };
+ writer
+ .write(&batch_with_seq)
+ .await
+ .context("write arrow batch")?;
}
writer.close().await.context("close arrow writer")?;
let object_meta = self
@@ -163,17 +201,21 @@ impl CloudObjectStorage {
Ok(WriteResult {
id: file_id,
+ seq: file_id,
size: object_meta.size,
})
}
- fn build_sort_exprs(&self) -> Result<LexOrdering> {
- let sort_exprs = (0..self.num_primary_key)
+ fn build_sort_exprs(&self, sort_seq: bool) -> Result<LexOrdering> {
+ let mut sort_exprs = (0..self.num_primary_keys)
.map(|i| {
ident(self.schema().field(i).name())
.sort(true /* asc */, true /* nulls_first */)
})
.collect::<Vec<_>>();
+ if sort_seq {
+ sort_exprs.push(ident(SEQ_COLUMN_NAME).sort(true, true));
+ }
let sort_exprs =
create_physical_sort_exprs(&sort_exprs, &self.df_schema,
&ExecutionProps::default())
.context("create physical sort exprs")?;
@@ -184,7 +226,7 @@ impl CloudObjectStorage {
async fn sort_batch(&self, batch: RecordBatch) ->
Result<SendableRecordBatchStream> {
let ctx = SessionContext::default();
let schema = batch.schema();
- let sort_exprs = self.build_sort_exprs()?;
+ let sort_exprs = self.build_sort_exprs(false /* sort_seq */)?;
let batch_plan =
MemoryExec::try_new(&[vec![batch]], schema, None).context("build
batch plan")?;
let physical_plan = Arc::new(SortExec::new(sort_exprs,
Arc::new(batch_plan)));
@@ -235,6 +277,57 @@ impl CloudObjectStorage {
builder.build()
}
+
+ async fn build_scan_plan(
+ &self,
+ ssts: Vec<SstFile>,
+ projections: Option<Vec<usize>>,
+ predicates: Vec<Expr>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ // we won't use url for selecting object_store.
+ let dummy_url = ObjectStoreUrl::parse("empty://").unwrap();
+ let sort_exprs = self.build_sort_exprs(true /* sort_seq */)?;
+
+ let file_groups = ssts
+ .into_iter()
+ .map(|f| {
+ vec![PartitionedFile::new(
+ self.build_file_path(f.id),
+ f.meta.size as u64,
+ )]
+ })
+ .collect::<Vec<_>>();
+ let scan_config = FileScanConfig::new(dummy_url, self.schema().clone())
+ .with_output_ordering(vec![sort_exprs.clone(); file_groups.len()])
+ .with_file_groups(file_groups)
+ .with_projection(projections);
+
+ let mut builder =
ParquetExec::builder(scan_config).with_parquet_file_reader_factory(
+ Arc::new(DefaultParquetFileReaderFactory::new(self.store.clone())),
+ );
+ if let Some(expr) = conjunction(predicates) {
+ let filters = create_physical_expr(&expr, &self.df_schema,
&ExecutionProps::new())
+ .context("create physical expr")?;
+ builder = builder.with_predicate(filters);
+ }
+
+ // TODO: fetch using multiple threads since read from parquet will
incur CPU
+ // when convert between arrow and parquet.
+ let parquet_exec = builder.build();
+ let sort_exec = SortPreservingMergeExec::new(sort_exprs,
Arc::new(parquet_exec))
+ // TODO: make fetch size configurable.
+ .with_fetch(Some(1024))
+ .with_round_robin_repartition(true);
+
+ let merge_exec = MergeExec::new(
+ Arc::new(sort_exec),
+ self.num_primary_keys,
+ self.schema().fields.len() - 1,
+ 0, // TODO: value_idx, not used now.
+ Arc::new(LastValue::new()),
+ );
+ Ok(Arc::new(merge_exec))
+ }
}
#[async_trait]
@@ -244,32 +337,27 @@ impl TimeMergeStorage for CloudObjectStorage {
}
async fn write(&self, req: WriteRequest) -> Result<()> {
- ensure!(req.batch.schema_ref().eq(self.schema()), "schema not match");
+ if req.enable_check {
+ let segment_duration = self.segment_duration.as_millis() as i64;
+ ensure!(
+ req.time_range.start.0 / segment_duration
+ == (req.time_range.end.0 - 1) / segment_duration,
+ "time range can't cross segment, value:{:?}",
+ &req.time_range
+ );
+ }
let num_rows = req.batch.num_rows();
- let time_column = req
- .batch
- .column(self.timestamp_index)
- .as_any()
- .downcast_ref::<Int64Array>()
- .context("timestamp column should be int64")?;
-
- let mut start = Timestamp::MAX;
- let mut end = Timestamp::MIN;
- for v in time_column.values() {
- start = start.min(Timestamp(*v));
- end = end.max(Timestamp(*v));
- }
- let time_range = TimeRange::new(start, end + 1);
let WriteResult {
id: file_id,
+ seq,
size: file_size,
- } = self.write_batch(req).await?;
+ } = self.write_batch(req.batch).await?;
let file_meta = FileMeta {
- max_sequence: file_id, // Since file_id in increasing order, we
can use it as sequence.
+ max_sequence: seq,
num_rows: num_rows as u32,
size: file_size as u32,
- time_range,
+ time_range: req.time_range,
};
self.manifest.add_file(file_id, file_meta).await?;
@@ -277,39 +365,36 @@ impl TimeMergeStorage for CloudObjectStorage {
}
async fn scan(&self, req: ScanRequest) ->
Result<SendableRecordBatchStream> {
- let ssts = self.manifest.find_ssts(&req.range).await;
- // we won't use url for selecting object_store.
- let dummy_url = ObjectStoreUrl::parse("empty://").unwrap();
- // TODO: we could group ssts based on time range.
- // TODO: fetch using multiple threads since read from parquet will
incur CPU
- // when convert between arrow and parquet.
- let file_groups = ssts
- .iter()
- .map(|f| PartitionedFile::new(self.build_file_path(f.id),
f.meta.size as u64))
- .collect::<Vec<_>>();
- let scan_config = FileScanConfig::new(dummy_url, self.schema().clone())
- .with_file_group(file_groups)
- .with_projection(req.projections);
-
- let mut builder =
ParquetExec::builder(scan_config).with_parquet_file_reader_factory(
- Arc::new(DefaultParquetFileReaderFactory::new(self.store.clone())),
- );
- if let Some(expr) = conjunction(req.predicate) {
- let filters = create_physical_expr(&expr, &self.df_schema,
&ExecutionProps::new())
- .context("create pyhsical expr")?;
- builder = builder.with_predicate(filters);
+ let total_ssts = self.manifest.find_ssts(&req.range).await;
+ if total_ssts.is_empty() {
+ return Ok(Box::pin(EmptyRecordBatchStream::new(
+ self.arrow_schema.clone(),
+ )));
}
- let parquet_exec = builder.build();
- let sort_exprs = self.build_sort_exprs()?;
- let physical_plan = Arc::new(SortExec::new(sort_exprs,
Arc::new(parquet_exec)));
+ let ssts_by_segment = total_ssts.into_iter().group_by(|file| {
+ file.meta.time_range.start.0 / self.segment_duration.as_millis()
as i64
+ });
+
+ let mut plan_for_all_segments = Vec::new();
+ for (_, ssts) in ssts_by_segment.sorted_by(|a, b| a.0.cmp(&b.0)) {
+ let plan = self
+ .build_scan_plan(ssts, req.projections.clone(),
req.predicate.clone())
+ .await?;
+
+ plan_for_all_segments.push(plan);
+ }
let ctx = SessionContext::default();
- // TODO: dedup record batch based on primary keys and sequence number.
- let res =
- execute_stream(physical_plan, ctx.task_ctx()).context("execute
sort physical plan")?;
+ if plan_for_all_segments.len() == 1 {
+ let res = execute_stream(plan_for_all_segments.remove(0),
ctx.task_ctx())
+ .context("execute stream")?;
+ return Ok(res);
+ }
- Ok(res)
+ let union_exec = Arc::new(UnionExec::new(plan_for_all_segments));
+ let res = execute_stream(union_exec, ctx.task_ctx()).context("execute
stream")?;
+ return Ok(res);
}
async fn compact(&self, req: CompactRequest) -> Result<()> {
@@ -319,64 +404,171 @@ impl TimeMergeStorage for CloudObjectStorage {
#[cfg(test)]
mod tests {
- use arrow::{
- array::UInt8Array,
- datatypes::{DataType, Field, Schema},
- };
+ use arrow::array::{self as arrow_array};
+ use datafusion::common::record_batch;
use object_store::local::LocalFileSystem;
use super::*;
+ use crate::{arrow_schema, types::Timestamp};
#[tokio::test]
- async fn test_sort_batch() {
- let schema = Arc::new(Schema::new(vec![
- Field::new("a", DataType::UInt8, false),
- Field::new("b", DataType::UInt8, false),
- Field::new("c", DataType::UInt8, false),
- Field::new("d", DataType::UInt8, false),
- ]));
+ async fn test_build_scan_plan() {
+ let schema = arrow_schema!(("pk1", UInt8));
+ let store = Arc::new(LocalFileSystem::new());
+ let storage = CloudObjectStorage::try_new(
+ "mock".to_string(),
+ Duration::from_hours(2),
+ store,
+ schema.clone(),
+ 1, // num_primary_keys
+ WriteOptions::default(),
+ )
+ .await
+ .unwrap();
+ let plan = storage
+ .build_scan_plan(
+ (100..103)
+ .map(|id| SstFile {
+ id,
+ meta: FileMeta {
+ max_sequence: id,
+ num_rows: 1,
+ size: 1,
+ time_range: (1..10).into(),
+ },
+ })
+ .collect(),
+ None,
+ vec![],
+ )
+ .await
+ .unwrap();
+ let display_plan =
+
datafusion::physical_plan::display::DisplayableExecutionPlan::new(plan.as_ref())
+ .indent(true);
+ assert_eq!(
+ r#"MergeExec: [primary_keys: 1, seq_idx: 1]
+ SortPreservingMergeExec: [pk1@0 ASC, __seq__@1 ASC], fetch=1024
+ ParquetExec: file_groups={3 groups: [[mock/data/100], [mock/data/101],
[mock/data/102]]}, projection=[pk1, __seq__], output_orderings=[[pk1@0 ASC,
__seq__@1 ASC], [pk1@0 ASC, __seq__@1 ASC], [pk1@0 ASC, __seq__@1 ASC]]
+"#,
+ format!("{display_plan}")
+ );
+ }
+ #[tokio::test]
+ async fn test_storage_write_and_scan() {
+ let schema = arrow_schema!(("pk1", UInt8), ("pk2", UInt8), ("value",
Int64));
+ let root_dir = temp_dir::TempDir::new().unwrap();
let store = Arc::new(LocalFileSystem::new());
let storage = CloudObjectStorage::try_new(
- "/tmp/storage".to_string(),
+ root_dir.path().to_string_lossy().to_string(),
+ Duration::from_hours(2),
store,
schema.clone(),
- 1,
- 1,
+ 2, // num_primary_keys
WriteOptions::default(),
)
.await
.unwrap();
- let batch = RecordBatch::try_new(
+ let batch = record_batch!(
+ ("pk1", UInt8, vec![11, 11, 9, 10, 5]),
+ ("pk2", UInt8, vec![100, 100, 1, 2, 3]),
+ ("value", Int64, vec![2, 7, 4, 6, 1])
+ )
+ .unwrap();
+ storage
+ .write(WriteRequest {
+ batch,
+ time_range: (1..10).into(),
+ enable_check: true,
+ })
+ .await
+ .unwrap();
+
+ let batch = record_batch!(
+ ("pk1", UInt8, vec![11, 11, 9, 10]),
+ ("pk2", UInt8, vec![100, 99, 1, 2]),
+ ("value", Int64, vec![22, 77, 44, 66])
+ )
+ .unwrap();
+ storage
+ .write(WriteRequest {
+ batch,
+ time_range: (10..20).into(),
+ enable_check: true,
+ })
+ .await
+ .unwrap();
+
+ let mut result_stream = storage
+ .scan(ScanRequest {
+ range: TimeRange::new(Timestamp(0), Timestamp::MAX),
+ predicate: vec![],
+ projections: None,
+ })
+ .await
+ .unwrap();
+ let expected_batch = [
+ record_batch!(
+ ("pk1", UInt8, vec![5, 9, 10, 11]),
+ ("pk2", UInt8, vec![3, 1, 2, 99]),
+ ("value", Int64, vec![1, 44, 66, 77])
+ )
+ .unwrap(),
+ record_batch!(
+ ("pk1", UInt8, vec![11]),
+ ("pk2", UInt8, vec![100]),
+ ("value", Int64, vec![22])
+ )
+ .unwrap(),
+ ];
+ let mut idx = 0;
+ while let Some(batch) = result_stream.next().await {
+ let batch = batch.unwrap();
+ assert_eq!(expected_batch[idx], batch);
+ idx += 1;
+ }
+ }
+
+ #[tokio::test]
+ async fn test_storage_sort_batch() {
+ let schema = arrow_schema!(("a", UInt8), ("b", UInt8), ("c", UInt8),
("c", UInt8));
+ let root_dir = temp_dir::TempDir::new().unwrap();
+ let store = Arc::new(LocalFileSystem::new());
+ let storage = CloudObjectStorage::try_new(
+ root_dir.path().to_string_lossy().to_string(),
+ Duration::from_hours(2),
+ store,
schema.clone(),
- vec![
- Arc::new(UInt8Array::from(vec![2, 1, 3, 4, 8, 6, 5, 7])),
- Arc::new(UInt8Array::from(vec![1, 3, 4, 8, 2, 6, 5, 7])),
- Arc::new(UInt8Array::from(vec![8, 6, 2, 4, 3, 1, 5, 7])),
- Arc::new(UInt8Array::from(vec![2, 7, 4, 6, 1, 3, 5, 8])),
- ],
+ 1,
+ WriteOptions::default(),
)
+ .await
.unwrap();
- let mut sorted_batches = storage.sort_batch(batch).await.unwrap();
- let expected_bacth = RecordBatch::try_new(
- schema,
- vec![
- Arc::new(UInt8Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8])),
- Arc::new(UInt8Array::from(vec![3, 1, 4, 8, 5, 6, 7, 2])),
- Arc::new(UInt8Array::from(vec![6, 8, 2, 4, 5, 1, 7, 3])),
- Arc::new(UInt8Array::from(vec![7, 2, 4, 6, 5, 3, 8, 1])),
- ],
+ let batch = record_batch!(
+ ("a", UInt8, vec![2, 1, 3, 4, 8, 6, 5, 7]),
+ ("b", UInt8, vec![1, 3, 4, 8, 2, 6, 5, 7]),
+ ("c", UInt8, vec![8, 6, 2, 4, 3, 1, 5, 7]),
+ ("d", UInt8, vec![2, 7, 4, 6, 1, 3, 5, 8])
)
.unwrap();
+ let mut sorted_batches = storage.sort_batch(batch).await.unwrap();
+ let expected_bacth = record_batch!(
+ ("a", UInt8, vec![1, 2, 3, 4, 5, 6, 7, 8]),
+ ("b", UInt8, vec![3, 1, 4, 8, 5, 6, 7, 2]),
+ ("c", UInt8, vec![6, 8, 2, 4, 5, 1, 7, 3]),
+ ("d", UInt8, vec![7, 2, 4, 6, 5, 3, 8, 1])
+ )
+ .unwrap();
let mut offset = 0;
while let Some(sorted_batch) = sorted_batches.next().await {
let sorted_batch = sorted_batch.unwrap();
let length = sorted_batch.num_rows();
let batch = expected_bacth.slice(offset, length);
- assert!(sorted_batch.eq(&batch));
+ assert_eq!(sorted_batch, batch);
offset += length;
}
}
diff --git a/horaedb/metric_engine/src/lib.rs
b/horaedb/metric_engine/src/test_util.rs
similarity index 53%
copy from horaedb/metric_engine/src/lib.rs
copy to horaedb/metric_engine/src/test_util.rs
index be7ef2b4..9c378b0a 100644
--- a/horaedb/metric_engine/src/lib.rs
+++ b/horaedb/metric_engine/src/test_util.rs
@@ -15,13 +15,27 @@
// specific language governing permissions and limitations
// under the License.
-//! Storage Engine for metrics.
+#[macro_export]
+macro_rules! arrow_schema {
+ ($(($field_name:expr, $data_type:ident)),* $(,)?) => {{
+ let fields = vec![
+ $(
+ arrow::datatypes::Field::new($field_name,
arrow::datatypes::DataType::$data_type, true),
+ )*
+ ];
+ std::sync::Arc::new(arrow::datatypes::Schema::new(fields))
+ }};
+}
-pub mod error;
-mod manifest;
-mod read;
-mod sst;
-pub mod storage;
-pub mod types;
+#[cfg(test)]
+mod tests {
+ #[test]
+ fn test_arrow_schema_macro() {
+ let schema = arrow_schema![("a", UInt8), ("b", UInt8), ("c", UInt8),
("d", UInt8),];
-pub use error::{AnyhowError, Error, Result};
+ let expected_names = ["a", "b", "c", "d"];
+ for (i, f) in schema.fields().iter().enumerate() {
+ assert_eq!(f.name(), expected_names[i]);
+ }
+ }
+}
diff --git a/horaedb/metric_engine/src/types.rs
b/horaedb/metric_engine/src/types.rs
index 7bc9da18..624b8a2e 100644
--- a/horaedb/metric_engine/src/types.rs
+++ b/horaedb/metric_engine/src/types.rs
@@ -26,6 +26,10 @@ use parquet::basic::{Compression, Encoding, ZstdLevel};
use crate::sst::FileId;
+// Seq column is a built-in column, and it will be appended to the end of
+// the user-defined schema.
+
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Timestamp(pub i64);
@@ -73,6 +77,15 @@ impl From<Range<Timestamp>> for TimeRange {
}
}
+impl From<Range<i64>> for TimeRange {
+ fn from(value: Range<i64>) -> Self {
+ Self(Range {
+ start: value.start.into(),
+ end: value.end.into(),
+ })
+ }
+}
+
impl Deref for TimeRange {
type Target = Range<Timestamp>;
@@ -95,6 +108,7 @@ pub type ObjectStoreRef = Arc<dyn ObjectStore>;
pub struct WriteResult {
pub id: FileId,
+ pub seq: u64,
pub size: usize,
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]