Dandandan commented on a change in pull request #9029:
URL: https://github.com/apache/arrow/pull/9029#discussion_r549561500
##########
File path: rust/datafusion/src/physical_plan/parquet.rs
##########
@@ -104,35 +105,41 @@ impl ParquetExec {
filenames: &[&str],
projection: Option<Vec<usize>>,
batch_size: usize,
+ max_concurrency: usize,
) -> Result<Self> {
// build a list of Parquet partitions with statistics and gather all
unique schemas
// used in this data set
let mut schemas: Vec<Schema> = vec![];
- let mut partitions = Vec::with_capacity(filenames.len());
- for filename in filenames {
- let file = File::open(filename)?;
- let file_reader = Arc::new(SerializedFileReader::new(file)?);
- let mut arrow_reader = ParquetFileArrowReader::new(file_reader);
- let meta_data = arrow_reader.get_metadata();
- // collect all the unique schemas in this data set
- let schema = arrow_reader.get_schema()?;
- if schemas.is_empty() || schema != schemas[0] {
- schemas.push(schema);
- }
- let mut num_rows = 0;
- let mut total_byte_size = 0;
- for i in 0..meta_data.num_row_groups() {
- let row_group_meta = meta_data.row_group(i);
- num_rows += row_group_meta.num_rows();
- total_byte_size += row_group_meta.total_byte_size();
+ let mut partitions = Vec::with_capacity(max_concurrency);
+ let filenames: Vec<String> = filenames.iter().map(|s|
s.to_string()).collect();
+ let chunks = split_files(&filenames, max_concurrency);
Review comment:
Makes sense — the change to limit the number of threads is useful. I also
agree with @alamb's idea of sending the results over a channel.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]