alamb commented on code in PR #7244:
URL: https://github.com/apache/arrow-datafusion/pull/7244#discussion_r1288935932
##########
datafusion/common/src/config.rs:
##########
@@ -270,7 +270,48 @@ config_namespace! {
/// will be reordered heuristically to minimize the cost of
evaluation. If false,
/// the filters are applied in the same order as written in the query
pub reorder_filters: bool, default = false
+
+ // The following map to parquet::file::properties::WriterProperties
+
+ /// Sets best effort maximum size of data page in bytes
+ pub data_pagesize_limit: usize, default = 1024 * 1024
Review Comment:
It is an interesting question if we want these settings to be session level
(as this change proposes) or if they should be per-sql level 🤔
I suppose having a session level default would make sense and if we want to
add per statement overrides (like `COPY TO <file> AS PARQUET
(DATA_PAGE_ROW_COUNT_LIMIT 100000)`) we can always do that afterwards as well.
##########
datafusion/common/src/config.rs:
##########
@@ -270,7 +270,48 @@ config_namespace! {
/// will be reordered heuristically to minimize the cost of
evaluation. If false,
/// the filters are applied in the same order as written in the query
pub reorder_filters: bool, default = false
+
+ // The following map to parquet::file::properties::WriterProperties
+
+ /// Sets best effort maximum size of data page in bytes
+ pub data_pagesize_limit: usize, default = 1024 * 1024
+
+ /// Sets best effort maximum number of rows in data page
+ pub data_page_row_count_limit: usize, default = usize::MAX
+
+ /// Sets best effort maximum dictionary page size, in bytes
+ pub dictionary_page_size_limit: usize, default = 1024 * 1024
+
+ /// Sets maximum number of rows in a row group
+ pub max_row_group_size: usize, default = 1024 * 1024
+
+ /// Sets "created by" property
+ pub created_by: String, default = concat!("parquet-rs version ",
env!("CARGO_PKG_VERSION")).into()
+
+ pub compression: Option<String>, default = None
+
+ /// Sets default encoding for any column
+ pub encoding: Option<String>, default = None
+
+ /// Sets if dictionary encoding is enabled
+ pub dictionary_enabled: Option<bool>, default = None
+
+ /// Sets if statistics are enabled for any column
+ pub statistics_enabled: Option<String>, default = None
+
+ /// Sets max statistics size for any column
+ pub max_statistics_size: Option<usize>, default = None
+
+ /// Sets if bloom filter is enabled for any column
+ pub bloom_filter_enabled: Option<bool>, default = None
}
+ // TODO macro not working with Option<f64> or Option<u64>
Review Comment:
👍 that would be cool to clean up if possible prior to a real PR -- let me
know if you need some help
##########
datafusion/core/src/datasource/file_format/options.rs:
##########
@@ -250,6 +266,18 @@ impl<'a> ParquetReadOptions<'a> {
self.table_partition_cols = table_partition_cols;
self
}
+
+ /// Configure if file has known sort order
+ pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<Expr>>) -> Self {
Review Comment:
I believe this feature was requested by @bmmeijers in
https://github.com/apache/arrow-datafusion/issues/7036
##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -543,6 +574,172 @@ async fn fetch_statistics(
Ok(statistics)
}
+/// Implements [`DataSink`] for writing to a parquet file.
+struct ParquetSink {
+ /// Config options for writing data
+ config: FileSinkConfig,
+}
+
+impl Debug for ParquetSink {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("ParquetSink").finish()
+ }
+}
+
+impl DisplayAs for ParquetSink {
+ fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) ->
fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(
+ f,
+ "ParquetSink(writer_mode={:?}, file_groups=",
+ self.config.writer_mode
+ )?;
+ FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
+ write!(f, ")")
+ }
+ }
+ }
+}
+
+impl ParquetSink {
+ fn new(config: FileSinkConfig) -> Self {
+ Self { config }
+ }
+
+ /// Builds a parquet WriterProperties struct, setting options as
appropriate from TaskContext options
+ fn parquet_writer_props_from_context(
+ &self,
+ context: &Arc<TaskContext>,
+ ) -> WriterProperties {
+ let parquet_context =
&context.session_config().options().execution.parquet;
+ let mut builder = WriterProperties::builder()
+ .set_created_by(parquet_context.created_by.clone())
+
.set_data_page_row_count_limit(parquet_context.data_page_row_count_limit)
+ .set_data_page_size_limit(parquet_context.data_pagesize_limit);
Review Comment:
it feels like there are some other properties to copy across to (like
created by, etc)
##########
datafusion/common/src/config.rs:
##########
@@ -270,7 +270,48 @@ config_namespace! {
/// will be reordered heuristically to minimize the cost of
evaluation. If false,
/// the filters are applied in the same order as written in the query
pub reorder_filters: bool, default = false
+
+ // The following map to parquet::file::properties::WriterProperties
+
+ /// Sets best effort maximum size of data page in bytes
+ pub data_pagesize_limit: usize, default = 1024 * 1024
Review Comment:
It is an interesting question if we want these settings to be session level
(as this change proposes) or if they should be per-sql level 🤔
I suppose having a session level default would make sense and if we want to
add per statement overrides (like `COPY TO <file> AS PARQUET
(DATA_PAGE_ROW_COUNT_LIMIT 100000)`) we can always do that afterwards as well.
##########
datafusion/common/src/config.rs:
##########
@@ -270,7 +270,48 @@ config_namespace! {
/// will be reordered heuristically to minimize the cost of
evaluation. If false,
/// the filters are applied in the same order as written in the query
pub reorder_filters: bool, default = false
+
+ // The following map to parquet::file::properties::WriterProperties
+
+ /// Sets best effort maximum size of data page in bytes
+ pub data_pagesize_limit: usize, default = 1024 * 1024
+
+ /// Sets best effort maximum number of rows in data page
+ pub data_page_row_count_limit: usize, default = usize::MAX
+
+ /// Sets best effort maximum dictionary page size, in bytes
+ pub dictionary_page_size_limit: usize, default = 1024 * 1024
+
+ /// Sets maximum number of rows in a row group
+ pub max_row_group_size: usize, default = 1024 * 1024
+
+ /// Sets "created by" property
+ pub created_by: String, default = concat!("parquet-rs version ",
env!("CARGO_PKG_VERSION")).into()
+
+ pub compression: Option<String>, default = None
+
+ /// Sets default encoding for any column
+ pub encoding: Option<String>, default = None
+
+ /// Sets if dictionary encoding is enabled
+ pub dictionary_enabled: Option<bool>, default = None
+
+ /// Sets if statistics are enabled for any column
+ pub statistics_enabled: Option<String>, default = None
+
+ /// Sets max statistics size for any column
+ pub max_statistics_size: Option<usize>, default = None
+
+ /// Sets if bloom filter is enabled for any column
+ pub bloom_filter_enabled: Option<bool>, default = None
}
+ // TODO macro not working with Option<f64> or Option<u64>
Review Comment:
👍 that would be cool to clean up if possible prior to a real PR -- let me
know if you need some help
##########
datafusion/core/src/datasource/file_format/options.rs:
##########
@@ -214,6 +214,13 @@ pub struct ParquetReadOptions<'a> {
///
/// If None specified, uses value in SessionConfig
pub skip_metadata: Option<bool>,
+ /// An optional schema representing the parquet files. If None, parquet
reader will try to infer it
+ /// based on data in file.
+ pub schema: Option<&'a Schema>,
+ /// Indicates how the file is sorted
+ pub file_sort_order: Vec<Vec<Expr>>,
Review Comment:
👍 ths is very nice
##########
datafusion/core/src/datasource/file_format/options.rs:
##########
@@ -214,6 +214,13 @@ pub struct ParquetReadOptions<'a> {
///
/// If None specified, uses value in SessionConfig
pub skip_metadata: Option<bool>,
+ /// An optional schema representing the parquet files. If None, parquet
reader will try to infer it
+ /// based on data in file.
+ pub schema: Option<&'a Schema>,
+ /// Indicates how the file is sorted
+ pub file_sort_order: Vec<Vec<Expr>>,
Review Comment:
👍 ths is very nice
##########
datafusion/core/src/datasource/file_format/write.rs:
##########
@@ -0,0 +1,310 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Module containing helper methods/traits related to enabling
Review Comment:
😍
##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -543,6 +574,172 @@ async fn fetch_statistics(
Ok(statistics)
}
+/// Implements [`DataSink`] for writing to a parquet file.
+struct ParquetSink {
+ /// Config options for writing data
+ config: FileSinkConfig,
+}
+
+impl Debug for ParquetSink {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("ParquetSink").finish()
+ }
+}
+
+impl DisplayAs for ParquetSink {
+ fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) ->
fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(
+ f,
+ "ParquetSink(writer_mode={:?}, file_groups=",
+ self.config.writer_mode
+ )?;
+ FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
+ write!(f, ")")
+ }
+ }
+ }
+}
+
+impl ParquetSink {
+ fn new(config: FileSinkConfig) -> Self {
+ Self { config }
+ }
+
+ /// Builds a parquet WriterProperties struct, setting options as
appropriate from TaskContext options
+ fn parquet_writer_props_from_context(
+ &self,
+ context: &Arc<TaskContext>,
+ ) -> WriterProperties {
+ let parquet_context =
&context.session_config().options().execution.parquet;
+ let mut builder = WriterProperties::builder()
+ .set_created_by(parquet_context.created_by.clone())
+
.set_data_page_row_count_limit(parquet_context.data_page_row_count_limit)
+ .set_data_page_size_limit(parquet_context.data_pagesize_limit);
+
+ if parquet_context.bloom_filter_enabled.is_some() {
+ builder = builder
+
.set_bloom_filter_enabled(parquet_context.bloom_filter_enabled.unwrap())
+ }
+
+ // TODO
+ //.set_bloom_filter_fpp(parquet_context.bloom_filter_fpp)
+ // TODO
+ //.set_bloom_filter_ndv(parquet_context.bloom_filter_ndv)
+
//.set_compression(parquet::basic::Compression::try_from(parquet_context.compression))
+ builder.build()
+ }
+
+ // Create a write for parquet files
+ async fn create_writer(
+ &self,
+ file_meta: FileMeta,
+ object_store: Arc<dyn ObjectStore>,
+ parquet_props: WriterProperties,
+ ) -> Result<
+ AsyncArrowWriter<Box<dyn tokio::io::AsyncWrite + std::marker::Send +
Unpin>>,
+ > {
+ let object = &file_meta.object_meta;
+ match self.config.writer_mode {
+ FileWriterMode::Append => {
+ plan_err!(
+ "Appending to Parquet files is not supported by the file
format!"
+ )
+ }
+ FileWriterMode::Put => Err(DataFusionError::NotImplemented(
+ "FileWriterMode::Put is not implemented for
ParquetSink".into(),
+ )),
+ FileWriterMode::PutMultipart => {
+ let (_, multipart_writer) = object_store
+ .put_multipart(&object.location)
+ .await
+ .map_err(DataFusionError::ObjectStore)?;
+ let writer = AsyncArrowWriter::try_new(
+ multipart_writer,
+ self.config.output_schema.clone(),
+ 10485760,
+ Some(parquet_props),
+ )?;
+ Ok(writer)
+ }
+ }
+ }
+}
+
+#[async_trait]
+impl DataSink for ParquetSink {
+ async fn write_all(
+ &self,
+ mut data: Vec<SendableRecordBatchStream>,
+ context: &Arc<TaskContext>,
+ ) -> Result<u64> {
+ let num_partitions = data.len();
+ let parquet_props = self.parquet_writer_props_from_context(context);
+
+ let object_store = context
+ .runtime_env()
+ .object_store(&self.config.object_store_url)?;
+
+ // Construct writer for each file group
+ let mut writers = vec![];
+ match self.config.writer_mode {
+ FileWriterMode::Append => {
+ return plan_err!(
+ "Parquet format does not support appending to existing
file!"
+ )
+ }
+ FileWriterMode::Put => {
+ return Err(DataFusionError::NotImplemented(
+ "Put Mode is not implemented for ParquetSink yet".into(),
+ ))
+ }
+ FileWriterMode::PutMultipart => {
+ // Currently assuming only 1 partition path (i.e. not
hive-style partitioning on a column)
+ let base_path = &self.config.table_paths[0];
+ // Uniquely identify this batch of files with a random string,
to prevent collisions overwriting files
+ let write_id = Alphanumeric.sample_string(&mut
rand::thread_rng(), 16);
+ for part_idx in 0..num_partitions {
+ let file_path = base_path
+ .prefix()
+ .child(format!("/{}_{}.parquet", write_id, part_idx));
+ let object_meta = ObjectMeta {
+ location: file_path,
+ last_modified: chrono::offset::Utc::now(),
+ size: 0,
+ e_tag: None,
+ };
+ let writer = self
+ .create_writer(
+ object_meta.into(),
+ object_store.clone(),
+ parquet_props.clone(),
+ )
+ .await?;
+ writers.push(writer);
+ }
+ }
+ }
+
+ let mut row_count = 0;
+ // TODO parallelize serialization accross partitions and batches
within partitions
Review Comment:
I am digging the fact all the writers look quite similar -- we'll be able to
sort out parallelizing the writes in one place and use it for all the formats
❤️
##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -543,6 +574,172 @@ async fn fetch_statistics(
Ok(statistics)
}
+/// Implements [`DataSink`] for writing to a parquet file.
+struct ParquetSink {
+ /// Config options for writing data
+ config: FileSinkConfig,
+}
+
+impl Debug for ParquetSink {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("ParquetSink").finish()
+ }
+}
+
+impl DisplayAs for ParquetSink {
+ fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) ->
fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ write!(
+ f,
+ "ParquetSink(writer_mode={:?}, file_groups=",
+ self.config.writer_mode
+ )?;
+ FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
+ write!(f, ")")
+ }
+ }
+ }
+}
+
+impl ParquetSink {
+ fn new(config: FileSinkConfig) -> Self {
+ Self { config }
+ }
+
+ /// Builds a parquet WriterProperties struct, setting options as
appropriate from TaskContext options
+ fn parquet_writer_props_from_context(
+ &self,
+ context: &Arc<TaskContext>,
+ ) -> WriterProperties {
+ let parquet_context =
&context.session_config().options().execution.parquet;
+ let mut builder = WriterProperties::builder()
+ .set_created_by(parquet_context.created_by.clone())
+
.set_data_page_row_count_limit(parquet_context.data_page_row_count_limit)
+ .set_data_page_size_limit(parquet_context.data_pagesize_limit);
+
+ if parquet_context.bloom_filter_enabled.is_some() {
+ builder = builder
+
.set_bloom_filter_enabled(parquet_context.bloom_filter_enabled.unwrap())
+ }
+
+ // TODO
+ //.set_bloom_filter_fpp(parquet_context.bloom_filter_fpp)
+ // TODO
+ //.set_bloom_filter_ndv(parquet_context.bloom_filter_ndv)
+
//.set_compression(parquet::basic::Compression::try_from(parquet_context.compression))
+ builder.build()
+ }
+
+ // Create a write for parquet files
+ async fn create_writer(
+ &self,
+ file_meta: FileMeta,
+ object_store: Arc<dyn ObjectStore>,
+ parquet_props: WriterProperties,
+ ) -> Result<
+ AsyncArrowWriter<Box<dyn tokio::io::AsyncWrite + std::marker::Send +
Unpin>>,
+ > {
+ let object = &file_meta.object_meta;
+ match self.config.writer_mode {
+ FileWriterMode::Append => {
+ plan_err!(
+ "Appending to Parquet files is not supported by the file
format!"
+ )
+ }
+ FileWriterMode::Put => Err(DataFusionError::NotImplemented(
+ "FileWriterMode::Put is not implemented for
ParquetSink".into(),
+ )),
+ FileWriterMode::PutMultipart => {
+ let (_, multipart_writer) = object_store
+ .put_multipart(&object.location)
+ .await
+ .map_err(DataFusionError::ObjectStore)?;
+ let writer = AsyncArrowWriter::try_new(
+ multipart_writer,
+ self.config.output_schema.clone(),
+ 10485760,
+ Some(parquet_props),
+ )?;
+ Ok(writer)
+ }
+ }
+ }
+}
+
+#[async_trait]
+impl DataSink for ParquetSink {
+ async fn write_all(
+ &self,
+ mut data: Vec<SendableRecordBatchStream>,
+ context: &Arc<TaskContext>,
+ ) -> Result<u64> {
+ let num_partitions = data.len();
+ let parquet_props = self.parquet_writer_props_from_context(context);
+
+ let object_store = context
+ .runtime_env()
+ .object_store(&self.config.object_store_url)?;
+
+ // Construct writer for each file group
+ let mut writers = vec![];
+ match self.config.writer_mode {
+ FileWriterMode::Append => {
+ return plan_err!(
+ "Parquet format does not support appending to existing
file!"
+ )
+ }
+ FileWriterMode::Put => {
+ return Err(DataFusionError::NotImplemented(
+ "Put Mode is not implemented for ParquetSink yet".into(),
+ ))
+ }
+ FileWriterMode::PutMultipart => {
+ // Currently assuming only 1 partition path (i.e. not
hive-style partitioning on a column)
+ let base_path = &self.config.table_paths[0];
+ // Uniquely identify this batch of files with a random string,
to prevent collisions overwriting files
+ let write_id = Alphanumeric.sample_string(&mut
rand::thread_rng(), 16);
+ for part_idx in 0..num_partitions {
+ let file_path = base_path
+ .prefix()
+ .child(format!("/{}_{}.parquet", write_id, part_idx));
+ let object_meta = ObjectMeta {
+ location: file_path,
+ last_modified: chrono::offset::Utc::now(),
+ size: 0,
+ e_tag: None,
+ };
+ let writer = self
+ .create_writer(
+ object_meta.into(),
+ object_store.clone(),
+ parquet_props.clone(),
+ )
+ .await?;
+ writers.push(writer);
+ }
+ }
+ }
+
+ let mut row_count = 0;
+ // TODO parallelize serialization accross partitions and batches
within partitions
Review Comment:
I am digging the fact all the writers look quite similar -- we'll be able to
sort out parallelizing the writes in one place and use it for all the formats
❤️
##########
datafusion/core/src/datasource/file_format/write.rs:
##########
@@ -0,0 +1,310 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Module containing helper methods/traits related to enabling
Review Comment:
😍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]