This is an automated email from the ASF dual-hosted git repository.
baojinri pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git
The following commit(s) were added to refs/heads/main by this push:
new 9931950a refactor: make writing properties independently (#1588)
9931950a is described below
commit 9931950a14845a4b84b6623d512763c64f46ac46
Author: 鲍金日 <[email protected]>
AuthorDate: Wed Nov 6 19:08:26 2024 +0800
refactor: make writing properties independently (#1588)
## Rationale
We should customize how the parquet writer handles its metadata.
## Detailed Changes
## Test Plan
---
horaedb/metric_engine/src/storage.rs | 81 ++++++++++++++++++++++++++++++------
horaedb/metric_engine/src/types.rs | 37 ++++++++++++++++
2 files changed, 106 insertions(+), 12 deletions(-)
diff --git a/horaedb/metric_engine/src/storage.rs
b/horaedb/metric_engine/src/storage.rs
index b64e7a8d..1419503a 100644
--- a/horaedb/metric_engine/src/storage.rs
+++ b/horaedb/metric_engine/src/storage.rs
@@ -42,19 +42,20 @@ use object_store::path::Path;
use parquet::{
arrow::{async_writer::ParquetObjectWriter, AsyncArrowWriter},
file::properties::WriterProperties,
+ format::SortingColumn,
+ schema::types::ColumnPath,
};
use crate::{
manifest::Manifest,
read::DefaultParquetFileReaderFactory,
sst::{allocate_id, FileId, FileMeta},
- types::{ObjectStoreRef, TimeRange, Timestamp, WriteResult},
+ types::{ObjectStoreRef, TimeRange, Timestamp, WriteOptions, WriteResult},
Result,
};
pub struct WriteRequest {
batch: RecordBatch,
- props: Option<WriterProperties>,
}
pub struct ScanRequest {
@@ -90,6 +91,7 @@ pub struct CloudObjectStorage {
manifest: Manifest,
df_schema: DFSchema,
+ write_props: WriterProperties,
}
/// It will organize the data in the following way:
@@ -109,11 +111,13 @@ impl CloudObjectStorage {
arrow_schema: SchemaRef,
num_primary_key: usize,
timestamp_index: usize,
+ write_options: WriteOptions,
) -> Result<Self> {
let manifest_prefix = crate::manifest::PREFIX_PATH;
let manifest =
Manifest::try_new(format!("{root_path}/{manifest_prefix}"),
store.clone()).await?;
let df_schema =
DFSchema::try_from(arrow_schema.clone()).context("build DFSchema")?;
+ let write_props = Self::build_write_props(write_options,
num_primary_key);
Ok(Self {
path: root_path,
num_primary_key,
@@ -122,6 +126,7 @@ impl CloudObjectStorage {
arrow_schema,
manifest,
df_schema,
+ write_props,
})
}
@@ -136,9 +141,12 @@ impl CloudObjectStorage {
let file_path = self.build_file_path(file_id);
let file_path = Path::from(file_path);
let object_store_writer = ParquetObjectWriter::new(self.store.clone(),
file_path.clone());
- let mut writer =
- AsyncArrowWriter::try_new(object_store_writer,
self.schema().clone(), req.props)
- .context("create arrow writer")?;
+ let mut writer = AsyncArrowWriter::try_new(
+ object_store_writer,
+ self.schema().clone(),
+ Some(self.write_props.clone()),
+ )
+ .context("create arrow writer")?;
// sort record batch
let mut batches = self.sort_batch(req.batch).await?;
@@ -161,9 +169,10 @@ impl CloudObjectStorage {
fn build_sort_exprs(&self) -> Result<LexOrdering> {
let sort_exprs = (0..self.num_primary_key)
- .collect::<Vec<_>>()
- .iter()
- .map(|i| ident(self.schema().field(*i).name()).sort(true, true))
+ .map(|i| {
+ ident(self.schema().field(i).name())
+ .sort(true /* asc */, true /* nulls_first */)
+ })
.collect::<Vec<_>>();
let sort_exprs =
create_physical_sort_exprs(&sort_exprs, &self.df_schema,
&ExecutionProps::default())
@@ -184,6 +193,48 @@ impl CloudObjectStorage {
execute_stream(physical_plan, ctx.task_ctx()).context("execute
sort physical plan")?;
Ok(res)
}
+
+ fn build_write_props(write_options: WriteOptions, num_primary_key: usize)
-> WriterProperties {
+ let sorting_columns = write_options.enable_sorting_columns.then(|| {
+ (0..num_primary_key)
+ .map(|i| {
+ SortingColumn::new(i as i32, false /* desc */, true /*
nulls_first */)
+ })
+ .collect::<Vec<_>>()
+ });
+
+ let mut builder = WriterProperties::builder()
+ .set_max_row_group_size(write_options.max_row_group_size)
+ .set_write_batch_size(write_options.write_bacth_size)
+ .set_sorting_columns(sorting_columns)
+ .set_dictionary_enabled(write_options.enable_dict)
+ .set_bloom_filter_enabled(write_options.enable_bloom_filter)
+ .set_encoding(write_options.encoding)
+ .set_compression(write_options.compression);
+
+ if write_options.column_options.is_none() {
+ return builder.build();
+ }
+
+ for (col_name, col_opt) in write_options.column_options.unwrap() {
+ let col_path = ColumnPath::new(vec![col_name.to_string()]);
+ if let Some(enable_dict) = col_opt.enable_dict {
+ builder =
builder.set_column_dictionary_enabled(col_path.clone(), enable_dict);
+ }
+ if let Some(enable_bloom_filter) = col_opt.enable_bloom_filter {
+ builder =
+ builder.set_column_bloom_filter_enabled(col_path.clone(),
enable_bloom_filter);
+ }
+ if let Some(encoding) = col_opt.encoding {
+ builder = builder.set_column_encoding(col_path.clone(),
encoding);
+ }
+ if let Some(compression) = col_opt.compression {
+ builder = builder.set_column_compression(col_path,
compression);
+ }
+ }
+
+ builder.build()
+ }
}
#[async_trait]
@@ -286,10 +337,16 @@ mod tests {
]));
let store = Arc::new(LocalFileSystem::new());
- let storage =
- CloudObjectStorage::try_new("/tmp/storage".to_string(), store,
schema.clone(), 1, 1)
- .await
- .unwrap();
+ let storage = CloudObjectStorage::try_new(
+ "/tmp/storage".to_string(),
+ store,
+ schema.clone(),
+ 1,
+ 1,
+ WriteOptions::default(),
+ )
+ .await
+ .unwrap();
let batch = RecordBatch::try_new(
schema.clone(),
diff --git a/horaedb/metric_engine/src/types.rs
b/horaedb/metric_engine/src/types.rs
index e6b518a5..7bc9da18 100644
--- a/horaedb/metric_engine/src/types.rs
+++ b/horaedb/metric_engine/src/types.rs
@@ -16,11 +16,13 @@
// under the License.
use std::{
+ collections::HashMap,
ops::{Add, Deref, Range},
sync::Arc,
};
use object_store::ObjectStore;
+use parquet::basic::{Compression, Encoding, ZstdLevel};
use crate::sst::FileId;
@@ -95,3 +97,38 @@ pub struct WriteResult {
pub id: FileId,
pub size: usize,
}
+
+pub struct ColumnOptions {
+ pub enable_dict: Option<bool>,
+ pub enable_bloom_filter: Option<bool>,
+ pub encoding: Option<Encoding>,
+ pub compression: Option<Compression>,
+}
+
+pub struct WriteOptions {
+ pub max_row_group_size: usize,
+ pub write_bacth_size: usize,
+ pub enable_sorting_columns: bool,
+ // use to set column props with default value
+ pub enable_dict: bool,
+ pub enable_bloom_filter: bool,
+ pub encoding: Encoding,
+ pub compression: Compression,
+ // use to set column props with column name
+ pub column_options: Option<HashMap<String, ColumnOptions>>,
+}
+
+impl Default for WriteOptions {
+ fn default() -> Self {
+ Self {
+ max_row_group_size: 8192,
+ write_bacth_size: 1024,
+ enable_sorting_columns: true,
+ enable_dict: false,
+ enable_bloom_filter: false,
+ encoding: Encoding::PLAIN,
+ compression: Compression::ZSTD(ZstdLevel::default()),
+ column_options: None,
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]