alamb commented on code in PR #2985:
URL: https://github.com/apache/arrow-datafusion/pull/2985#discussion_r933781120
##########
datafusion/core/tests/sql/parquet.rs:
##########
@@ -173,58 +173,6 @@ async fn parquet_list_columns() {
assert_eq!(result.value(3), "xyz");
}
-#[tokio::test]
-async fn schema_merge_ignores_metadata() {
- // Create two parquet files in same table with same schema but different
metadata
- let tmp_dir = TempDir::new().unwrap();
- let table_dir = tmp_dir.path().join("parquet_test");
- let table_path = Path::new(&table_dir);
-
- let mut non_empty_metadata: HashMap<String, String> = HashMap::new();
- non_empty_metadata.insert("testing".to_string(), "metadata".to_string());
-
- let fields = vec![
- Field::new("id", DataType::Int32, true),
- Field::new("name", DataType::Utf8, true),
- ];
- let schemas = vec![
- Arc::new(Schema::new_with_metadata(
- fields.clone(),
- non_empty_metadata.clone(),
- )),
- Arc::new(Schema::new(fields.clone())),
- ];
-
- if let Ok(()) = fs::create_dir(table_path) {
- for (i, schema) in schemas.iter().enumerate().take(2) {
- let filename = format!("part-{}.parquet", i);
- let path = table_path.join(&filename);
- let file = fs::File::create(path).unwrap();
- let mut writer = ArrowWriter::try_new(file, schema.clone(),
None).unwrap();
-
- // create mock record batch
- let ids = Arc::new(Int32Array::from_slice(&[i as i32]));
- let names = Arc::new(StringArray::from_slice(&["test"]));
- let rec_batch =
- RecordBatch::try_new(schema.clone(), vec![ids,
names]).unwrap();
-
- writer.write(&rec_batch).unwrap();
- writer.close().unwrap();
- }
- }
-
- // Read the parquet files into a dataframe to confirm results
- // (no errors)
- let ctx = SessionContext::new();
- let df = ctx
- .read_parquet(table_dir.to_str().unwrap(),
ParquetReadOptions::default())
- .await
- .unwrap();
- let result = df.collect().await.unwrap();
-
- assert_eq!(result[0].schema().metadata(), result[1].schema().metadata());
Review Comment:
Note that this assertion does not validate the contents of the metadata, only
that both batches carry the same metadata. It turns out the metadata is
actually empty.
##########
datafusion/core/src/physical_plan/file_format/mod.rs:
##########
@@ -130,7 +130,9 @@ impl FileScanConfig {
column_statistics: Some(table_cols_stats),
};
- let table_schema = Arc::new(Schema::new(table_fields));
+ let table_schema = Arc::new(
+
Schema::new(table_fields).with_metadata(self.file_schema.metadata().clone()),
Review Comment:
This is the change required to preserve the metadata in the merged
schema.
##########
datafusion/core/tests/sql/parquet.rs:
##########
@@ -173,58 +173,6 @@ async fn parquet_list_columns() {
assert_eq!(result.value(3), "xyz");
}
-#[tokio::test]
-async fn schema_merge_ignores_metadata() {
- // Create two parquet files in same table with same schema but different
metadata
- let tmp_dir = TempDir::new().unwrap();
- let table_dir = tmp_dir.path().join("parquet_test");
- let table_path = Path::new(&table_dir);
-
- let mut non_empty_metadata: HashMap<String, String> = HashMap::new();
- non_empty_metadata.insert("testing".to_string(), "metadata".to_string());
-
- let fields = vec![
- Field::new("id", DataType::Int32, true),
- Field::new("name", DataType::Utf8, true),
- ];
- let schemas = vec![
- Arc::new(Schema::new_with_metadata(
- fields.clone(),
- non_empty_metadata.clone(),
Review Comment:
The schemas in this test have different (but compatible) metadata so merging
works.
##########
datafusion/core/src/execution/options.rs:
##########
@@ -142,17 +142,23 @@ pub struct ParquetReadOptions<'a> {
pub file_extension: &'a str,
/// Partition Columns
pub table_partition_cols: Vec<String>,
- /// Should DataFusion parquet reader using the predicate to prune data,
+ /// Should DataFusion parquet reader use the predicate to prune data,
/// overridden by value on execution::context::SessionConfig
pub parquet_pruning: bool,
+ /// Tell the parquet reader to ignore any Metadata that may be in
Review Comment:
The duplication of options on `ParquetReadOptions` and `ParquetFormat` is
not ideal, and I hope to fix it. For this PR, though, I just followed the
existing pattern.
##########
datafusion/core/tests/sql/parquet_schema.rs:
##########
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tests for parquet schema handling
+use std::{
+ collections::{BTreeMap, HashMap},
+ fs,
+ path::Path,
+};
+
+use ::parquet::arrow::ArrowWriter;
+use tempfile::TempDir;
+
+use super::*;
+
+#[tokio::test]
+async fn schema_merge_ignores_metadata_by_default() {
+ // Create several parquet files in same directoty / table with
+ // same schema but different metadata
+ let tmp_dir = TempDir::new().unwrap();
+ let table_dir = tmp_dir.path().join("parquet_test");
+
+ let options = ParquetReadOptions::default();
+
+ let f1 = Field::new("id", DataType::Int32, true);
+ let f2 = Field::new("name", DataType::Utf8, true);
+
+ let schemas = vec![
+ // schema level metadata
+ Schema::new(vec![f1.clone(),
f2.clone()]).with_metadata(make_meta("foo", "bar")),
Review Comment:
Lots of different metadata
##########
datafusion/core/tests/sql/parquet_schema.rs:
##########
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tests for parquet schema handling
+use std::{
+ collections::{BTreeMap, HashMap},
+ fs,
+ path::Path,
+};
+
+use ::parquet::arrow::ArrowWriter;
+use tempfile::TempDir;
+
+use super::*;
+
+#[tokio::test]
+async fn schema_merge_ignores_metadata_by_default() {
+ // Create several parquet files in same directoty / table with
+ // same schema but different metadata
+ let tmp_dir = TempDir::new().unwrap();
+ let table_dir = tmp_dir.path().join("parquet_test");
+
+ let options = ParquetReadOptions::default();
+
+ let f1 = Field::new("id", DataType::Int32, true);
+ let f2 = Field::new("name", DataType::Utf8, true);
+
+ let schemas = vec![
+ // schema level metadata
+ Schema::new(vec![f1.clone(),
f2.clone()]).with_metadata(make_meta("foo", "bar")),
+ // schema different (incompatible) metadata
+ Schema::new(vec![f1.clone(),
f2.clone()]).with_metadata(make_meta("foo", "baz")),
+ // schema with no meta
+ Schema::new(vec![f1.clone(), f2.clone()]),
+ // field level metadata
+ Schema::new(vec![
+ f1.clone().with_metadata(make_b_meta("blarg", "bar")),
+ f2.clone(),
+ ]),
+ // incompatible field level metadata
+ Schema::new(vec![
+ f1.clone().with_metadata(make_b_meta("blarg", "baz")),
+ f2.clone(),
+ ]),
+ // schema with no meta
+ Schema::new(vec![f1, f2]),
+ ];
+ write_files(table_dir.as_path(), schemas);
+
+ // can be any order
+ let expected = vec![
+ "+----+------+",
+ "| id | name |",
+ "+----+------+",
+ "| 1 | test |",
+ "| 2 | test |",
+ "| 3 | test |",
+ "| 0 | test |",
+ "| 5 | test |",
+ "| 4 | test |",
+ "+----+------+",
+ ];
+
+ // Read the parquet files into a dataframe to confirm results
+ // (no errors)
+ let table_path = table_dir.to_str().unwrap().to_string();
+
+ let ctx = SessionContext::new();
+ let df = ctx
+ .read_parquet(&table_path, options.clone())
+ .await
+ .unwrap();
+ let actual = df.collect().await.unwrap();
+
+ assert_batches_sorted_eq!(expected, &actual);
+ assert_no_metadata(&actual);
+
+ // also validate it works via SQL interface as well
+ ctx.register_parquet("t", &table_path, options)
+ .await
+ .unwrap();
+
+ let actual = execute_to_batches(&ctx, "SELECT * from t").await;
+ assert_batches_sorted_eq!(expected, &actual);
+ assert_no_metadata(&actual);
+}
+
+#[tokio::test]
+async fn schema_merge_can_preserve_metadata() {
+ // Create several parquet files in same directoty / table with
+ // same schema but different metadata
+ let tmp_dir = TempDir::new().unwrap();
+ let table_dir = tmp_dir.path().join("parquet_test");
+
+ // explicitly disable schema clearing
+ let options = ParquetReadOptions::default().skip_metadata(false);
+
+ let f1 = Field::new("id", DataType::Int32, true);
+ let f2 = Field::new("name", DataType::Utf8, true);
+
+ let schemas = vec![
+ // schema level metadata
+ Schema::new(vec![f1.clone(),
f2.clone()]).with_metadata(make_meta("foo", "bar")),
+ // schema different (compatible) metadata
+ Schema::new(vec![f1.clone(),
f2.clone()]).with_metadata(make_meta("foo2", "baz")),
+ // schema with no meta
+ Schema::new(vec![f1.clone(), f2.clone()]),
+ ];
+ write_files(table_dir.as_path(), schemas);
+
+ // can be any order
+ let expected = vec![
+ "+----+------+",
+ "| id | name |",
+ "+----+------+",
+ "| 1 | test |",
+ "| 2 | test |",
+ "| 0 | test |",
+ "+----+------+",
+ ];
+
+ let mut expected_metadata = make_meta("foo", "bar");
+ expected_metadata.insert("foo2".into(), "baz".into());
+
+ // Read the parquet files into a dataframe to confirm results
+ // (no errors)
+ let table_path = table_dir.to_str().unwrap().to_string();
+
+ let ctx = SessionContext::new();
+ let df = ctx
+ .read_parquet(&table_path, options.clone())
+ .await
+ .unwrap();
+ let actual = df.collect().await.unwrap();
+
+ assert_batches_sorted_eq!(expected, &actual);
+ assert_metadata(&actual, &expected_metadata);
Review Comment:
This test now validates that the merged metadata is as expected.
##########
datafusion/core/tests/sql/parquet.rs:
##########
@@ -173,58 +173,6 @@ async fn parquet_list_columns() {
assert_eq!(result.value(3), "xyz");
}
-#[tokio::test]
Review Comment:
I moved this test to its own module, `parquet_schema`, as it was getting
somewhat complicated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]