ethan-tyler commented on code in PR #19262:
URL: https://github.com/apache/datafusion/pull/19262#discussion_r2611158540
##########
datafusion/datasource-csv/src/source.rs:
##########
@@ -453,16 +453,17 @@ pub async fn plan_to_csv(
let parsed = ListingTableUrl::parse(path)?;
let object_store_url = parsed.object_store();
let store = task_ctx.runtime_env().object_store(&object_store_url)?;
- let writer_buffer_size = task_ctx
- .session_config()
- .options()
- .execution
- .objectstore_writer_buffer_size;
+ let exec_options = &task_ctx.session_config().options().execution;
+ let writer_buffer_size = exec_options.objectstore_writer_buffer_size;
let mut join_set = JoinSet::new();
for i in 0..plan.output_partitioning().partition_count() {
let storeref = Arc::clone(&store);
let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
- let filename = format!("{}/part-{i}.csv", parsed.prefix());
+ let filename = format!(
+     "{}/{}part-{i}.csv",
+     exec_options.partitioned_file_prefix_name,
+     parsed.prefix()
+ );
Review Comment:
The format arguments look swapped here. With an empty prefix (the default),
this produces `/<base>part-0.csv`: the leading slash comes from the empty
first argument, and there is no separator before `part-`.
Should be:
```rust
let filename = format!(
    "{}/{}part-{i}.csv",
    parsed.prefix(),
    exec_options.partitioned_file_prefix_name
);
```
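To make the swap concrete (`data/out` is just an illustrative base path):
```rust
// With the default empty prefix, argument order is the whole bug:
let (prefix, base) = ("", "data/out");
// Swapped order (current code): leading slash, base glued onto "part-".
assert_eq!(format!("{}/{}part-0.csv", prefix, base), "/data/outpart-0.csv");
// Corrected order: base path first, then the (possibly empty) prefix.
assert_eq!(format!("{}/{}part-0.csv", base, prefix), "data/out/part-0.csv");
```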
##########
datafusion/datasource-json/src/source.rs:
##########
@@ -275,16 +275,17 @@ pub async fn plan_to_json(
let parsed = ListingTableUrl::parse(path)?;
let object_store_url = parsed.object_store();
let store = task_ctx.runtime_env().object_store(&object_store_url)?;
- let writer_buffer_size = task_ctx
- .session_config()
- .options()
- .execution
- .objectstore_writer_buffer_size;
+ let exec_options = &task_ctx.session_config().options().execution;
+ let writer_buffer_size = exec_options.objectstore_writer_buffer_size;
let mut join_set = JoinSet::new();
for i in 0..plan.output_partitioning().partition_count() {
let storeref = Arc::clone(&store);
let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
- let filename = format!("{}/part-{i}.json", parsed.prefix());
+ let filename = format!(
+     "{}/{}part-{i}.json",
+     exec_options.partitioned_file_prefix_name,
+     parsed.prefix()
+ );
Review Comment:
Same fix needed here: swap `parsed.prefix()` and
`exec_options.partitioned_file_prefix_name`.
##########
datafusion/common/src/config.rs:
##########
@@ -649,6 +649,12 @@ config_namespace! {
/// # Default
/// `false` — ANSI SQL mode is disabled by default.
pub enable_ansi_mode: bool, default = false
+
+ /// Prefix to use when generating file names in multi-file output
+ ///
+ /// When the prefix is a non-empty string, it is used to generate file names as
+ /// `{partitioned_file_prefix_name}{datafusion generated suffix}`
Review Comment:
Might be worth adding this to docs/source/user-guide/configs.md so users can
discover it. The auto-generated config docs don't always surface these well.
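For reference, the generated row would look roughly like this (column layout
assumed from the existing table in configs.md):
```markdown
| datafusion.execution.partitioned_file_prefix_name | "" | Prefix to use when generating file names in multi-file output |
```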
##########
datafusion/core/tests/dataframe/mod.rs:
##########
@@ -6735,3 +6735,70 @@ async fn test_duplicate_state_fields_for_dfschema_construct() -> Result<()> {
Ok(())
}
+
+#[tokio::test]
+async fn write_partitioned_parquet_results_with_prefix() -> Result<()> {
+ // create partitioned input file and context
+ let tmp_dir = TempDir::new()?;
+
+ let mut config = SessionConfig::new();
+ config.options_mut().execution.partitioned_file_prefix_name = "prefix".to_owned();
+ let ctx = SessionContext::new_with_config(config);
+
+ // Create an in memory table with schema C1 and C2, both strings
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("c1", DataType::Utf8, false),
+ Field::new("c2", DataType::Utf8, false),
+ ]));
+
+ let record_batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![
+ Arc::new(StringArray::from(vec!["abc", "def"])),
+ Arc::new(StringArray::from(vec!["123", "456"])),
+ ],
+ )?;
+
+ let mem_table = Arc::new(MemTable::try_new(schema, vec![vec![record_batch]])?);
+
+ // Register the table in the context
+ ctx.register_table("test", mem_table)?;
+
+ let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
+ let local_url = Url::parse("file://local").unwrap();
+ ctx.register_object_store(&local_url, local);
+
+ // execute a simple query and write the results to parquet
+ let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";
+ let out_dir_url = format!("file://{out_dir}");
+
+ // Write the results to parquet with partitioning
+ let df = ctx.sql("SELECT c1, c2 FROM test").await?;
+ let df_write_options =
+     DataFrameWriteOptions::new().with_partition_by(vec![String::from("c2")]);
Review Comment:
Heads up: `with_partition_by` routes through `demux.rs`, which has the correct
prefix handling. But the `plan_to_parquet`/`plan_to_csv`/`plan_to_json`
functions (used when writing without partitioning) take a different path and
aren't covered by this test.
A second test using `DataFrame::write_parquet` with default
`DataFrameWriteOptions` (no `partition_by`) would catch the bug in those
writers. A rough sketch of what that test might look like is below.
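(Untested, for illustration only: it reuses the setup from the test above and
assumes `write_parquet`'s current three-argument signature.)
```rust
#[tokio::test]
async fn write_parquet_results_with_prefix_no_partitioning() -> Result<()> {
    let tmp_dir = TempDir::new()?;

    let mut config = SessionConfig::new();
    config.options_mut().execution.partitioned_file_prefix_name = "prefix".to_owned();
    let ctx = SessionContext::new_with_config(config);

    // Same two-column in-memory table as the partitioned test above.
    let schema = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Utf8, false),
        Field::new("c2", DataType::Utf8, false),
    ]));
    let record_batch = RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(StringArray::from(vec!["abc", "def"])),
            Arc::new(StringArray::from(vec!["123", "456"])),
        ],
    )?;
    let mem_table = Arc::new(MemTable::try_new(schema, vec![vec![record_batch]])?);
    ctx.register_table("test", mem_table)?;

    let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out/";

    // Default options: no partition_by, which (per the note above) should
    // take the non-demux write path.
    let df = ctx.sql("SELECT c1, c2 FROM test").await?;
    df.write_parquet(&out_dir, DataFrameWriteOptions::new(), None)
        .await?;

    // Every file written should carry the configured prefix.
    for entry in std::fs::read_dir(&out_dir)? {
        let name = entry?.file_name().into_string().unwrap();
        assert!(
            name.starts_with("prefix"),
            "expected file name starting with `prefix`, got {name}"
        );
    }
    Ok(())
}
```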
##########
datafusion/datasource-parquet/src/writer.rs:
##########
@@ -39,21 +39,22 @@ pub async fn plan_to_parquet(
let object_store_url = parsed.object_store();
let store = task_ctx.runtime_env().object_store(&object_store_url)?;
let mut join_set = JoinSet::new();
+ let exec_options = &task_ctx.session_config().options().execution;
for i in 0..plan.output_partitioning().partition_count() {
let plan: Arc<dyn ExecutionPlan> = Arc::clone(&plan);
- let filename = format!("{}/part-{i}.parquet", parsed.prefix());
+ let filename = format!(
+     "{}/{}part-{i}.parquet",
+     exec_options.partitioned_file_prefix_name,
+     parsed.prefix()
+ );
Review Comment:
Same fix needed here: swap `parsed.prefix()` and
`exec_options.partitioned_file_prefix_name`.
##########
datafusion/datasource/src/write/demux.rs:
##########
@@ -156,7 +156,8 @@ async fn row_count_demuxer(
let max_buffered_batches =
exec_options.max_buffered_batches_per_output_file;
let minimum_parallel_files = exec_options.minimum_parallel_output_files;
let mut part_idx = 0;
- let write_id = rand::distr::Alphanumeric.sample_string(&mut rand::rng(), 16);
+ let mut write_id = exec_options.partitioned_file_prefix_name.clone();
Review Comment:
This looks right: prepending the prefix to the random `write_id` before
appending more random chars.
The CSV/JSON/Parquet writers in the other datasource crates have the same
intent but got the format args backwards. Might be worth adding a shared helper
like `fn make_output_filename(base: &str, prefix: &str, suffix: &str) -> String`
to avoid this class of bug, but that could be a follow-up. A minimal version
is sketched below.
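(Illustrative only; the name and signature are just the ones floated above.)
```rust
/// Joins an output file name as `{base}/{prefix}{suffix}` so each call
/// site can't get the argument order backwards.
fn make_output_filename(base: &str, prefix: &str, suffix: &str) -> String {
    format!("{base}/{prefix}{suffix}")
}

// With the default empty prefix:
// make_output_filename("data/out", "", "part-0.parquet") == "data/out/part-0.parquet"
```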