This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new ce3d446be5 Disable parallel reading for gzipped ndjson file (#9799)
ce3d446be5 is described below
commit ce3d446be5f6a11664e100fc47940e6ecb5418d3
Author: Lordworms <[email protected]>
AuthorDate: Thu Mar 28 09:47:37 2024 -0500
Disable parallel reading for gzipped ndjson file (#9799)
* for debug
* disable parallel reading for gzipped ndjson file
* directly return None
* delete .gz
* fix clippy
---
.../core/src/datasource/physical_plan/json.rs | 52 +++++++++++++++++++++-
1 file changed, 51 insertions(+), 1 deletion(-)
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index 194a4a91c3..c876b3d078 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -150,6 +150,9 @@ impl ExecutionPlan for NdJsonExec {
target_partitions: usize,
config: &datafusion_common::config::ConfigOptions,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+ if self.file_compression_type == FileCompressionType::GZIP {
+ return Ok(None);
+ }
let repartition_file_min_size =
config.optimizer.repartition_file_min_size;
let preserve_order_within_groups =
self.properties().output_ordering().is_some();
let file_groups = &self.base_config.file_groups;
@@ -392,11 +395,14 @@ mod tests {
use arrow::datatypes::{Field, SchemaBuilder};
use datafusion_common::cast::{as_int32_array, as_int64_array,
as_string_array};
use datafusion_common::FileType;
-
+ use flate2::write::GzEncoder;
+ use flate2::Compression;
use futures::StreamExt;
use object_store::chunked::ChunkedStore;
use object_store::local::LocalFileSystem;
use rstest::*;
+ use std::fs::File;
+ use std::io;
use tempfile::TempDir;
use url::Url;
@@ -884,4 +890,48 @@ mod tests {
Ok(())
}
+ /// Gzip-compresses the file at `path` into a newly created file at `output_path`.
+ /// Returns any I/O error raised while reading, compressing, or flushing.
+ fn compress_file(path: &str, output_path: &str) -> io::Result<()> {
+     let mut reader = BufReader::new(File::open(path)?);
+     let writer = io::BufWriter::new(File::create(output_path)?);
+     let mut encoder = GzEncoder::new(writer, Compression::default());
+     io::copy(&mut reader, &mut encoder)?;
+     // `finish` completes the gzip stream and hands back the `BufWriter`; flush
+     // it explicitly, since `BufWriter`'s `Drop` silently discards write errors.
+     let mut writer = encoder.finish()?;
+     io::Write::flush(&mut writer)?;
+     Ok(())
+ }
+ #[tokio::test]
+ async fn test_disable_parallel_for_json_gz() -> Result<()> {
+     // Force aggressive file-scan repartitioning: without the GZIP check in
+     // `repartitioned`, the compressed file would be split into byte ranges,
+     // but a gzip stream cannot be decoded starting from an arbitrary offset.
+     let config = SessionConfig::new()
+         .with_repartition_file_scans(true)
+         .with_repartition_file_min_size(0)
+         .with_target_partitions(4);
+     let ctx = SessionContext::new_with_config(config);
+     let path = format!("{TEST_DATA_BASE}/1.json");
+     // Write the compressed fixture into a temporary directory so the shared
+     // test-data directory is never modified; the file is removed when
+     // `tmp_dir` is dropped, even if a `?` or an assertion exits early.
+     let tmp_dir = TempDir::new()?;
+     let compressed_path = tmp_dir.path().join("1.json.gz");
+     let compressed_path = compressed_path.to_string_lossy().into_owned();
+     compress_file(&path, &compressed_path)?;
+     let read_option = NdJsonReadOptions::default()
+         .file_compression_type(FileCompressionType::GZIP)
+         .file_extension("gz");
+     let df = ctx.read_json(compressed_path, read_option).await?;
+     let res = df.collect().await?;
+     assert_batches_eq!(
+         &[
+             "+-----+------------------+---------------+------+",
+             "| a   | b                | c             | d    |",
+             "+-----+------------------+---------------+------+",
+             "| 1   | [2.0, 1.3, -6.1] | [false, true] | 4    |",
+             "| -10 | [2.0, 1.3, -6.1] | [true, true]  | 4    |",
+             "| 2   | [2.0, , -6.1]    | [false, ]     | text |",
+             "|     |                  |               |      |",
+             "+-----+------------------+---------------+------+",
+         ],
+         &res
+     );
+     Ok(())
+ }
}