This is an automated email from the ASF dual-hosted git repository.

baojinri pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/horaedb.git


The following commit(s) were added to refs/heads/main by this push:
     new 9931950a refactor: make writing properties independently (#1588)
9931950a is described below

commit 9931950a14845a4b84b6623d512763c64f46ac46
Author: 鲍金日 <[email protected]>
AuthorDate: Wed Nov 6 19:08:26 2024 +0800

    refactor: make writing properties independently (#1588)
    
    ## Rationale
    We should customize how the Parquet writer handles its metadata.
    
    ## Detailed Changes
    
    
    ## Test Plan
---
 horaedb/metric_engine/src/storage.rs | 81 ++++++++++++++++++++++++++++++------
 horaedb/metric_engine/src/types.rs   | 37 ++++++++++++++++
 2 files changed, 106 insertions(+), 12 deletions(-)

diff --git a/horaedb/metric_engine/src/storage.rs 
b/horaedb/metric_engine/src/storage.rs
index b64e7a8d..1419503a 100644
--- a/horaedb/metric_engine/src/storage.rs
+++ b/horaedb/metric_engine/src/storage.rs
@@ -42,19 +42,20 @@ use object_store::path::Path;
 use parquet::{
     arrow::{async_writer::ParquetObjectWriter, AsyncArrowWriter},
     file::properties::WriterProperties,
+    format::SortingColumn,
+    schema::types::ColumnPath,
 };
 
 use crate::{
     manifest::Manifest,
     read::DefaultParquetFileReaderFactory,
     sst::{allocate_id, FileId, FileMeta},
-    types::{ObjectStoreRef, TimeRange, Timestamp, WriteResult},
+    types::{ObjectStoreRef, TimeRange, Timestamp, WriteOptions, WriteResult},
     Result,
 };
 
 pub struct WriteRequest {
     batch: RecordBatch,
-    props: Option<WriterProperties>,
 }
 
 pub struct ScanRequest {
@@ -90,6 +91,7 @@ pub struct CloudObjectStorage {
     manifest: Manifest,
 
     df_schema: DFSchema,
+    write_props: WriterProperties,
 }
 
 /// It will organize the data in the following way:
@@ -109,11 +111,13 @@ impl CloudObjectStorage {
         arrow_schema: SchemaRef,
         num_primary_key: usize,
         timestamp_index: usize,
+        write_options: WriteOptions,
     ) -> Result<Self> {
         let manifest_prefix = crate::manifest::PREFIX_PATH;
         let manifest =
             Manifest::try_new(format!("{root_path}/{manifest_prefix}"), 
store.clone()).await?;
         let df_schema = 
DFSchema::try_from(arrow_schema.clone()).context("build DFSchema")?;
+        let write_props = Self::build_write_props(write_options, 
num_primary_key);
         Ok(Self {
             path: root_path,
             num_primary_key,
@@ -122,6 +126,7 @@ impl CloudObjectStorage {
             arrow_schema,
             manifest,
             df_schema,
+            write_props,
         })
     }
 
@@ -136,9 +141,12 @@ impl CloudObjectStorage {
         let file_path = self.build_file_path(file_id);
         let file_path = Path::from(file_path);
         let object_store_writer = ParquetObjectWriter::new(self.store.clone(), 
file_path.clone());
-        let mut writer =
-            AsyncArrowWriter::try_new(object_store_writer, 
self.schema().clone(), req.props)
-                .context("create arrow writer")?;
+        let mut writer = AsyncArrowWriter::try_new(
+            object_store_writer,
+            self.schema().clone(),
+            Some(self.write_props.clone()),
+        )
+        .context("create arrow writer")?;
 
         // sort record batch
         let mut batches = self.sort_batch(req.batch).await?;
@@ -161,9 +169,10 @@ impl CloudObjectStorage {
 
     fn build_sort_exprs(&self) -> Result<LexOrdering> {
         let sort_exprs = (0..self.num_primary_key)
-            .collect::<Vec<_>>()
-            .iter()
-            .map(|i| ident(self.schema().field(*i).name()).sort(true, true))
+            .map(|i| {
+                ident(self.schema().field(i).name())
+                    .sort(true /* asc */, true /* nulls_first */)
+            })
             .collect::<Vec<_>>();
         let sort_exprs =
             create_physical_sort_exprs(&sort_exprs, &self.df_schema, 
&ExecutionProps::default())
@@ -184,6 +193,48 @@ impl CloudObjectStorage {
             execute_stream(physical_plan, ctx.task_ctx()).context("execute 
sort physical plan")?;
         Ok(res)
     }
+
+    fn build_write_props(write_options: WriteOptions, num_primary_key: usize) 
-> WriterProperties {
+        let sorting_columns = write_options.enable_sorting_columns.then(|| {
+            (0..num_primary_key)
+                .map(|i| {
+                    SortingColumn::new(i as i32, false /* desc */, true /* 
nulls_first */)
+                })
+                .collect::<Vec<_>>()
+        });
+
+        let mut builder = WriterProperties::builder()
+            .set_max_row_group_size(write_options.max_row_group_size)
+            .set_write_batch_size(write_options.write_bacth_size)
+            .set_sorting_columns(sorting_columns)
+            .set_dictionary_enabled(write_options.enable_dict)
+            .set_bloom_filter_enabled(write_options.enable_bloom_filter)
+            .set_encoding(write_options.encoding)
+            .set_compression(write_options.compression);
+
+        if write_options.column_options.is_none() {
+            return builder.build();
+        }
+
+        for (col_name, col_opt) in write_options.column_options.unwrap() {
+            let col_path = ColumnPath::new(vec![col_name.to_string()]);
+            if let Some(enable_dict) = col_opt.enable_dict {
+                builder = 
builder.set_column_dictionary_enabled(col_path.clone(), enable_dict);
+            }
+            if let Some(enable_bloom_filter) = col_opt.enable_bloom_filter {
+                builder =
+                    builder.set_column_bloom_filter_enabled(col_path.clone(), 
enable_bloom_filter);
+            }
+            if let Some(encoding) = col_opt.encoding {
+                builder = builder.set_column_encoding(col_path.clone(), 
encoding);
+            }
+            if let Some(compression) = col_opt.compression {
+                builder = builder.set_column_compression(col_path, 
compression);
+            }
+        }
+
+        builder.build()
+    }
 }
 
 #[async_trait]
@@ -286,10 +337,16 @@ mod tests {
         ]));
 
         let store = Arc::new(LocalFileSystem::new());
-        let storage =
-            CloudObjectStorage::try_new("/tmp/storage".to_string(), store, 
schema.clone(), 1, 1)
-                .await
-                .unwrap();
+        let storage = CloudObjectStorage::try_new(
+            "/tmp/storage".to_string(),
+            store,
+            schema.clone(),
+            1,
+            1,
+            WriteOptions::default(),
+        )
+        .await
+        .unwrap();
 
         let batch = RecordBatch::try_new(
             schema.clone(),
diff --git a/horaedb/metric_engine/src/types.rs 
b/horaedb/metric_engine/src/types.rs
index e6b518a5..7bc9da18 100644
--- a/horaedb/metric_engine/src/types.rs
+++ b/horaedb/metric_engine/src/types.rs
@@ -16,11 +16,13 @@
 // under the License.
 
 use std::{
+    collections::HashMap,
     ops::{Add, Deref, Range},
     sync::Arc,
 };
 
 use object_store::ObjectStore;
+use parquet::basic::{Compression, Encoding, ZstdLevel};
 
 use crate::sst::FileId;
 
@@ -95,3 +97,38 @@ pub struct WriteResult {
     pub id: FileId,
     pub size: usize,
 }
+
+pub struct ColumnOptions {
+    pub enable_dict: Option<bool>,
+    pub enable_bloom_filter: Option<bool>,
+    pub encoding: Option<Encoding>,
+    pub compression: Option<Compression>,
+}
+
+pub struct WriteOptions {
+    pub max_row_group_size: usize,
+    pub write_bacth_size: usize,
+    pub enable_sorting_columns: bool,
+    // use to set column props with default value
+    pub enable_dict: bool,
+    pub enable_bloom_filter: bool,
+    pub encoding: Encoding,
+    pub compression: Compression,
+    // use to set column props with column name
+    pub column_options: Option<HashMap<String, ColumnOptions>>,
+}
+
+impl Default for WriteOptions {
+    fn default() -> Self {
+        Self {
+            max_row_group_size: 8192,
+            write_bacth_size: 1024,
+            enable_sorting_columns: true,
+            enable_dict: false,
+            enable_bloom_filter: false,
+            encoding: Encoding::PLAIN,
+            compression: Compression::ZSTD(ZstdLevel::default()),
+            column_options: None,
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to