This is an automated email from the ASF dual-hosted git repository.
jiacai2050 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new 106197d2 fix: missing filter index over the primary keys (#1456)
106197d2 is described below
commit 106197d25d8e42df56f82aa27940b396a8becda7
Author: WEI Xikai <[email protected]>
AuthorDate: Tue Jan 23 15:42:17 2024 +0800
fix: missing filter index over the primary keys (#1456)
## Rationale
Currently, the filter index is not created over the primary key columns, which
leads to an unexpectedly low prune rate in some cases.
## Detailed Changes
- Move the filter logic into a sub-module of the meta_data module
- Create the filter index over the primary key columns, except for the tsid
and timestamp columns.
## Test Plan
Should pass the CI.
---
.../src/sst/parquet/async_reader.rs | 2 +-
.../parquet/{meta_data.rs => meta_data/filter.rs} | 259 ++-------------------
.../src/sst/parquet/meta_data/mod.rs | 247 ++++++++++++++++++++
.../src/sst/parquet/row_group_pruner.rs | 2 +-
src/analytic_engine/src/sst/parquet/writer.rs | 25 +-
5 files changed, 280 insertions(+), 255 deletions(-)
diff --git a/src/analytic_engine/src/sst/parquet/async_reader.rs
b/src/analytic_engine/src/sst/parquet/async_reader.rs
index c56836c9..94feeab2 100644
--- a/src/analytic_engine/src/sst/parquet/async_reader.rs
+++ b/src/analytic_engine/src/sst/parquet/async_reader.rs
@@ -71,7 +71,7 @@ use crate::{
metrics::MaybeTableLevelMetrics,
parquet::{
encoding::ParquetDecoder,
- meta_data::{ColumnValueSet, ParquetFilter},
+ meta_data::{filter::ParquetFilter, ColumnValueSet},
row_group_pruner::RowGroupPruner,
},
reader::{error::*, Result, SstReader},
diff --git a/src/analytic_engine/src/sst/parquet/meta_data.rs
b/src/analytic_engine/src/sst/parquet/meta_data/filter.rs
similarity index 59%
rename from src/analytic_engine/src/sst/parquet/meta_data.rs
rename to src/analytic_engine/src/sst/parquet/meta_data/filter.rs
index 4c8b51ce..d64ad974 100644
--- a/src/analytic_engine/src/sst/parquet/meta_data.rs
+++ b/src/analytic_engine/src/sst/parquet/meta_data/filter.rs
@@ -15,66 +15,21 @@
// specific language governing permissions and limitations
// under the License.
-// MetaData for SST based on parquet.
-
-use std::{collections::HashSet, fmt, ops::Index, sync::Arc};
-
-use bytes_ext::Bytes;
-use common_types::{
- datum::DatumKind,
- schema::{RecordSchemaWithKey, Schema},
- time::TimeRange,
- SequenceNumber,
-};
-use horaedbproto::{schema as schema_pb, sst as sst_pb};
-use macros::define_result;
-use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
-use xorfilter::xor8::{Xor8, Xor8Builder};
+// TODO: Better module name should be index.
-use crate::sst::writer::MetaData;
-
-/// Error of sst file.
-#[derive(Debug, Snafu)]
-pub enum Error {
- #[snafu(display("Time range is not found.\nBacktrace\n:{}", backtrace))]
- TimeRangeNotFound { backtrace: Backtrace },
-
- #[snafu(display("Table schema is not found.\nBacktrace\n:{}", backtrace))]
- TableSchemaNotFound { backtrace: Backtrace },
-
- #[snafu(display(
- "Failed to parse Xor8Filter from bytes, err:{}.\nBacktrace\n:{}",
- source,
- backtrace
- ))]
- ParseXor8Filter {
- source: std::io::Error,
- backtrace: Backtrace,
- },
-
- #[snafu(display(
- "Failed to build Xor8Filter, err:{}.\nBacktrace\n:{}",
- source,
- backtrace
- ))]
- BuildXor8Filter {
- source: xorfilter::Error,
- backtrace: Backtrace,
- },
-
- #[snafu(display("Failed to convert time range, err:{}", source))]
- ConvertTimeRange { source: common_types::time::Error },
-
- #[snafu(display("Failed to convert table schema, err:{}", source))]
- ConvertTableSchema { source: common_types::schema::Error },
-}
+use std::{fmt, ops::Index};
-define_result!(Error);
+use common_types::{datum::DatumKind, schema::Schema};
+use horaedbproto::sst as sst_pb;
+use snafu::ResultExt;
+use xorfilter::xor8::{Xor8, Xor8Builder};
+use crate::sst::parquet::meta_data::{BuildXor8Filter, Error, ParseXor8Filter,
Result};
+
+// TODO: move this to sst module, and add a FilterBuild trait
/// Filter can be used to test whether an element is a member of a set.
/// False positive matches are possible if space-efficient probabilistic data
/// structure are used.
-// TODO: move this to sst module, and add a FilterBuild trait
trait Filter: fmt::Debug {
fn r#type(&self) -> FilterType;
@@ -89,7 +44,7 @@ trait Filter: fmt::Debug {
self.to_bytes().len()
}
- /// Deserialize the binary array to bitmap index.
+ /// Deserialize the binary array to specific filter.
fn from_bytes(buf: Vec<u8>) -> Result<Self>
where
Self: Sized;
@@ -140,13 +95,19 @@ pub struct RowGroupFilterBuilder {
}
impl RowGroupFilterBuilder {
- pub(crate) fn new(record_schema: &RecordSchemaWithKey) -> Self {
- let builders = record_schema
+ pub(crate) fn new(schema: &Schema) -> Self {
+ let builders = schema
.columns()
.iter()
.enumerate()
.map(|(i, col)| {
- if record_schema.is_primary_key_index(i) {
+ // No need to create filter index over the timestamp column.
+ if schema.timestamp_index() == i {
+ return None;
+ }
+
+ // No need to create filter index over the tsid column.
+ if schema.index_of_tsid().map(|idx| idx == i).unwrap_or(false)
{
return None;
}
@@ -340,185 +301,6 @@ impl TryFrom<sst_pb::ParquetFilter> for ParquetFilter {
}
}
-/// Meta data of a sst file
-#[derive(Clone, PartialEq)]
-pub struct ParquetMetaData {
- pub min_key: Bytes,
- pub max_key: Bytes,
- /// Time Range of the sst
- pub time_range: TimeRange,
- /// Max sequence number in the sst
- pub max_sequence: SequenceNumber,
- pub schema: Schema,
- pub parquet_filter: Option<ParquetFilter>,
- pub column_values: Option<Vec<Option<ColumnValueSet>>>,
-}
-
-pub type ParquetMetaDataRef = Arc<ParquetMetaData>;
-
-impl From<&MetaData> for ParquetMetaData {
- fn from(meta: &MetaData) -> Self {
- Self {
- min_key: meta.min_key.clone(),
- max_key: meta.max_key.clone(),
- time_range: meta.time_range,
- max_sequence: meta.max_sequence,
- schema: meta.schema.clone(),
- parquet_filter: None,
- column_values: None,
- }
- }
-}
-
-impl From<ParquetMetaData> for MetaData {
- fn from(meta: ParquetMetaData) -> Self {
- Self {
- min_key: meta.min_key,
- max_key: meta.max_key,
- time_range: meta.time_range,
- max_sequence: meta.max_sequence,
- schema: meta.schema,
- }
- }
-}
-
-impl From<Arc<ParquetMetaData>> for MetaData {
- fn from(meta: Arc<ParquetMetaData>) -> Self {
- Self {
- min_key: meta.min_key.clone(),
- max_key: meta.max_key.clone(),
- time_range: meta.time_range,
- max_sequence: meta.max_sequence,
- schema: meta.schema.clone(),
- }
- }
-}
-
-impl fmt::Debug for ParquetMetaData {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("ParquetMetaData")
- .field("min_key", &hex::encode(&self.min_key))
- .field("max_key", &hex::encode(&self.max_key))
- .field("time_range", &self.time_range)
- .field("max_sequence", &self.max_sequence)
- .field("schema", &self.schema)
- .field("column_values", &self.column_values)
- .field(
- "filter_size",
- &self
- .parquet_filter
- .as_ref()
- .map(|filter| filter.size())
- .unwrap_or(0),
- )
- .finish()
- }
-}
-
-impl From<ParquetMetaData> for sst_pb::ParquetMetaData {
- fn from(src: ParquetMetaData) -> Self {
- let column_values = if let Some(v) = src.column_values {
- v.into_iter()
- .map(|col| sst_pb::ColumnValueSet {
- value: col.map(|col| col.into()),
- })
- .collect()
- } else {
- Vec::new()
- };
- sst_pb::ParquetMetaData {
- min_key: src.min_key.to_vec(),
- max_key: src.max_key.to_vec(),
- max_sequence: src.max_sequence,
- time_range: Some(src.time_range.into()),
- schema: Some(schema_pb::TableSchema::from(&src.schema)),
- filter: src.parquet_filter.map(|v| v.into()),
- // collapsible_cols_idx is used in hybrid format ,and it's
deprecated.
- collapsible_cols_idx: Vec::new(),
- column_values,
- }
- }
-}
-
-impl TryFrom<sst_pb::ParquetMetaData> for ParquetMetaData {
- type Error = Error;
-
- fn try_from(src: sst_pb::ParquetMetaData) -> Result<Self> {
- let time_range = {
- let time_range = src.time_range.context(TimeRangeNotFound)?;
- TimeRange::try_from(time_range).context(ConvertTimeRange)?
- };
- let schema = {
- let schema = src.schema.context(TableSchemaNotFound)?;
- Schema::try_from(schema).context(ConvertTableSchema)?
- };
- let parquet_filter =
src.filter.map(ParquetFilter::try_from).transpose()?;
- let column_values = if src.column_values.is_empty() {
- // Old version sst don't has this, so set to none.
- None
- } else {
- Some(
- src.column_values
- .into_iter()
- .map(|v| v.value.map(|v| v.into()))
- .collect(),
- )
- };
-
- Ok(Self {
- min_key: src.min_key.into(),
- max_key: src.max_key.into(),
- time_range,
- max_sequence: src.max_sequence,
- schema,
- parquet_filter,
- column_values,
- })
- }
-}
-
-#[derive(Debug, PartialEq, Clone)]
-pub enum ColumnValueSet {
- StringValue(HashSet<String>),
-}
-
-impl ColumnValueSet {
- pub fn is_empty(&self) -> bool {
- match self {
- Self::StringValue(sv) => sv.is_empty(),
- }
- }
-
- pub fn len(&self) -> usize {
- match self {
- Self::StringValue(sv) => sv.len(),
- }
- }
-}
-
-impl From<ColumnValueSet> for sst_pb::column_value_set::Value {
- fn from(value: ColumnValueSet) -> Self {
- match value {
- ColumnValueSet::StringValue(values) => {
- let values = values.into_iter().collect();
-
sst_pb::column_value_set::Value::StringSet(sst_pb::column_value_set::StringSet {
- values,
- })
- }
- }
- }
-}
-
-impl From<sst_pb::column_value_set::Value> for ColumnValueSet {
- fn from(value: sst_pb::column_value_set::Value) -> Self {
- match value {
- sst_pb::column_value_set::Value::StringSet(ss) => {
- ColumnValueSet::StringValue(HashSet::from_iter(ss.values))
- }
- }
- }
-}
-
#[cfg(test)]
mod tests {
use common_types::tests::build_schema;
@@ -569,8 +351,7 @@ mod tests {
fn test_row_group_filter_builder() {
// (key1(varbinary), key2(timestamp), field1(double), field2(string))
let schema = build_schema();
- let record_schema = schema.to_record_schema_with_key();
- let mut builders = RowGroupFilterBuilder::new(&record_schema);
+ let mut builders = RowGroupFilterBuilder::new(&schema);
for key in ["host-123", "host-456", "host-789"] {
builders.add_key(3, key.as_bytes());
}
diff --git a/src/analytic_engine/src/sst/parquet/meta_data/mod.rs
b/src/analytic_engine/src/sst/parquet/meta_data/mod.rs
new file mode 100644
index 00000000..0120d649
--- /dev/null
+++ b/src/analytic_engine/src/sst/parquet/meta_data/mod.rs
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// MetaData for SST based on parquet.
+
+use std::{collections::HashSet, fmt, sync::Arc};
+
+use bytes_ext::Bytes;
+use common_types::{schema::Schema, time::TimeRange, SequenceNumber};
+use horaedbproto::{schema as schema_pb, sst as sst_pb};
+use macros::define_result;
+use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
+
+use crate::sst::{parquet::meta_data::filter::ParquetFilter, writer::MetaData};
+
+pub mod filter;
+
+/// Error of sst file.
+#[derive(Debug, Snafu)]
+pub enum Error {
+ #[snafu(display("Time range is not found.\nBacktrace\n:{}", backtrace))]
+ TimeRangeNotFound { backtrace: Backtrace },
+
+ #[snafu(display("Table schema is not found.\nBacktrace\n:{}", backtrace))]
+ TableSchemaNotFound { backtrace: Backtrace },
+
+ #[snafu(display(
+ "Failed to parse Xor8Filter from bytes, err:{}.\nBacktrace\n:{}",
+ source,
+ backtrace
+ ))]
+ ParseXor8Filter {
+ source: std::io::Error,
+ backtrace: Backtrace,
+ },
+
+ #[snafu(display(
+ "Failed to build Xor8Filter, err:{}.\nBacktrace\n:{}",
+ source,
+ backtrace
+ ))]
+ BuildXor8Filter {
+ source: xorfilter::Error,
+ backtrace: Backtrace,
+ },
+
+ #[snafu(display("Failed to convert time range, err:{}", source))]
+ ConvertTimeRange { source: common_types::time::Error },
+
+ #[snafu(display("Failed to convert table schema, err:{}", source))]
+ ConvertTableSchema { source: common_types::schema::Error },
+}
+
+define_result!(Error);
+
+/// Meta data of a sst file
+#[derive(Clone, PartialEq)]
+pub struct ParquetMetaData {
+ pub min_key: Bytes,
+ pub max_key: Bytes,
+ /// Time Range of the sst
+ pub time_range: TimeRange,
+ /// Max sequence number in the sst
+ pub max_sequence: SequenceNumber,
+ pub schema: Schema,
+ pub parquet_filter: Option<ParquetFilter>,
+ pub column_values: Option<Vec<Option<ColumnValueSet>>>,
+}
+
+pub type ParquetMetaDataRef = Arc<ParquetMetaData>;
+
+impl From<&MetaData> for ParquetMetaData {
+ fn from(meta: &MetaData) -> Self {
+ Self {
+ min_key: meta.min_key.clone(),
+ max_key: meta.max_key.clone(),
+ time_range: meta.time_range,
+ max_sequence: meta.max_sequence,
+ schema: meta.schema.clone(),
+ parquet_filter: None,
+ column_values: None,
+ }
+ }
+}
+
+impl From<ParquetMetaData> for MetaData {
+ fn from(meta: ParquetMetaData) -> Self {
+ Self {
+ min_key: meta.min_key,
+ max_key: meta.max_key,
+ time_range: meta.time_range,
+ max_sequence: meta.max_sequence,
+ schema: meta.schema,
+ }
+ }
+}
+
+impl From<Arc<ParquetMetaData>> for MetaData {
+ fn from(meta: Arc<ParquetMetaData>) -> Self {
+ Self {
+ min_key: meta.min_key.clone(),
+ max_key: meta.max_key.clone(),
+ time_range: meta.time_range,
+ max_sequence: meta.max_sequence,
+ schema: meta.schema.clone(),
+ }
+ }
+}
+
+impl fmt::Debug for ParquetMetaData {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("ParquetMetaData")
+ .field("min_key", &hex::encode(&self.min_key))
+ .field("max_key", &hex::encode(&self.max_key))
+ .field("time_range", &self.time_range)
+ .field("max_sequence", &self.max_sequence)
+ .field("schema", &self.schema)
+ .field("column_values", &self.column_values)
+ .field(
+ "filter_size",
+ &self
+ .parquet_filter
+ .as_ref()
+ .map(|filter| filter.size())
+ .unwrap_or(0),
+ )
+ .finish()
+ }
+}
+
+impl From<ParquetMetaData> for sst_pb::ParquetMetaData {
+ fn from(src: ParquetMetaData) -> Self {
+ let column_values = if let Some(v) = src.column_values {
+ v.into_iter()
+ .map(|col| sst_pb::ColumnValueSet {
+ value: col.map(|col| col.into()),
+ })
+ .collect()
+ } else {
+ Vec::new()
+ };
+ sst_pb::ParquetMetaData {
+ min_key: src.min_key.to_vec(),
+ max_key: src.max_key.to_vec(),
+ max_sequence: src.max_sequence,
+ time_range: Some(src.time_range.into()),
+ schema: Some(schema_pb::TableSchema::from(&src.schema)),
+ filter: src.parquet_filter.map(|v| v.into()),
+ // collapsible_cols_idx is used in hybrid format ,and it's
deprecated.
+ collapsible_cols_idx: Vec::new(),
+ column_values,
+ }
+ }
+}
+
+impl TryFrom<sst_pb::ParquetMetaData> for ParquetMetaData {
+ type Error = Error;
+
+ fn try_from(src: sst_pb::ParquetMetaData) -> Result<Self> {
+ let time_range = {
+ let time_range = src.time_range.context(TimeRangeNotFound)?;
+ TimeRange::try_from(time_range).context(ConvertTimeRange)?
+ };
+ let schema = {
+ let schema = src.schema.context(TableSchemaNotFound)?;
+ Schema::try_from(schema).context(ConvertTableSchema)?
+ };
+ let parquet_filter =
src.filter.map(ParquetFilter::try_from).transpose()?;
+ let column_values = if src.column_values.is_empty() {
+ // Old version sst don't has this, so set to none.
+ None
+ } else {
+ Some(
+ src.column_values
+ .into_iter()
+ .map(|v| v.value.map(|v| v.into()))
+ .collect(),
+ )
+ };
+
+ Ok(Self {
+ min_key: src.min_key.into(),
+ max_key: src.max_key.into(),
+ time_range,
+ max_sequence: src.max_sequence,
+ schema,
+ parquet_filter,
+ column_values,
+ })
+ }
+}
+
+#[derive(Debug, PartialEq, Clone)]
+pub enum ColumnValueSet {
+ StringValue(HashSet<String>),
+}
+
+impl ColumnValueSet {
+ pub fn is_empty(&self) -> bool {
+ match self {
+ Self::StringValue(sv) => sv.is_empty(),
+ }
+ }
+
+ pub fn len(&self) -> usize {
+ match self {
+ Self::StringValue(sv) => sv.len(),
+ }
+ }
+}
+
+impl From<ColumnValueSet> for sst_pb::column_value_set::Value {
+ fn from(value: ColumnValueSet) -> Self {
+ match value {
+ ColumnValueSet::StringValue(values) => {
+ let values = values.into_iter().collect();
+
sst_pb::column_value_set::Value::StringSet(sst_pb::column_value_set::StringSet {
+ values,
+ })
+ }
+ }
+ }
+}
+
+impl From<sst_pb::column_value_set::Value> for ColumnValueSet {
+ fn from(value: sst_pb::column_value_set::Value) -> Self {
+ match value {
+ sst_pb::column_value_set::Value::StringSet(ss) => {
+ ColumnValueSet::StringValue(HashSet::from_iter(ss.values))
+ }
+ }
+ }
+}
diff --git a/src/analytic_engine/src/sst/parquet/row_group_pruner.rs
b/src/analytic_engine/src/sst/parquet/row_group_pruner.rs
index a101ff05..3aa0c43c 100644
--- a/src/analytic_engine/src/sst/parquet/row_group_pruner.rs
+++ b/src/analytic_engine/src/sst/parquet/row_group_pruner.rs
@@ -40,7 +40,7 @@ use snafu::ensure;
use trace_metric::{MetricsCollector, TraceMetricWhenDrop};
use crate::sst::{
- parquet::meta_data::{ColumnValueSet, ParquetFilter},
+ parquet::meta_data::{filter::ParquetFilter, ColumnValueSet},
reader::error::{OtherNoCause, Result},
};
diff --git a/src/analytic_engine/src/sst/parquet/writer.rs
b/src/analytic_engine/src/sst/parquet/writer.rs
index 5be8712c..88293497 100644
--- a/src/analytic_engine/src/sst/parquet/writer.rs
+++ b/src/analytic_engine/src/sst/parquet/writer.rs
@@ -21,7 +21,8 @@ use std::collections::{HashMap, HashSet};
use async_trait::async_trait;
use common_types::{
- datum::DatumKind, record_batch::FetchedRecordBatch, request_id::RequestId,
time::TimeRange,
+ datum::DatumKind, record_batch::FetchedRecordBatch, request_id::RequestId,
schema::Schema,
+ time::TimeRange,
};
use datafusion::parquet::basic::Compression;
use futures::StreamExt;
@@ -39,14 +40,13 @@ use crate::{
parquet::{
encoding::{encode_sst_meta_data, ColumnEncoding, EncodeOptions,
ParquetEncoder},
meta_data::{
- ColumnValueSet, ParquetFilter, ParquetMetaData, RowGroupFilter,
- RowGroupFilterBuilder,
+ filter::{ParquetFilter, RowGroupFilter, RowGroupFilterBuilder},
+ ColumnValueSet, ParquetMetaData,
},
},
writer::{
- self, BuildParquetFilter, BuildParquetFilterNoCause, EncodePbData,
EncodeRecordBatch,
- ExpectTimestampColumn, Io, MetaData, PollRecordBatch,
RecordBatchStream, Result,
- SstInfo, SstWriter, Storage,
+ self, BuildParquetFilter, EncodePbData, EncodeRecordBatch,
ExpectTimestampColumn, Io,
+ MetaData, PollRecordBatch, RecordBatchStream, Result, SstInfo,
SstWriter, Storage,
},
},
table::sst_util,
@@ -237,15 +237,10 @@ impl<'a> RecordBatchGroupWriter<'a> {
/// Build the parquet filter for the given `row_group`.
fn build_row_group_filter(
&self,
+ schema: &Schema,
row_group_batch: &[FetchedRecordBatch],
) -> Result<RowGroupFilter> {
- let schema_with_key =
- row_group_batch[0]
- .schema_with_key()
- .with_context(|| BuildParquetFilterNoCause {
- msg: "primary key indexes not exist",
- })?;
- let mut builder = RowGroupFilterBuilder::new(&schema_with_key);
+ let mut builder = RowGroupFilterBuilder::new(schema);
for partial_batch in row_group_batch {
for (col_idx, column) in
partial_batch.columns().iter().enumerate() {
@@ -356,7 +351,9 @@ impl<'a> RecordBatchGroupWriter<'a> {
let timestamp_index = self.meta_data.schema.timestamp_index();
while !row_group.is_empty() {
if let Some(filter) = &mut parquet_filter {
-
filter.push_row_group_filter(self.build_row_group_filter(&row_group)?);
+ filter.push_row_group_filter(
+ self.build_row_group_filter(&self.meta_data.schema,
&row_group)?,
+ );
}
let num_batches = row_group.len();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]