alamb commented on code in PR #6987: URL: https://github.com/apache/arrow-datafusion/pull/6987#discussion_r1265914411
########## datafusion-examples/examples/dataframe-to-s3.rs: ########## @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion::datasource::file_format::file_type::{FileType, GetExt}; +use datafusion::datasource::file_format::parquet::ParquetFormat; +use datafusion::datasource::listing::ListingOptions; +use datafusion::error::Result; +use datafusion::prelude::*; + +//use datafusion::prelude::data; +use object_store::aws::AmazonS3Builder; +use std::env; +use std::sync::Arc; +use url::Url; + +/// This example demonstrates executing a simple query against an Arrow data source (a directory +/// with multiple Parquet files) and fetching results +#[tokio::main] +async fn main() -> Result<()> { + // create local execution context + let ctx = SessionContext::new(); + + let region = "us-east-1"; + let bucket_name = "<my_s3_example_bucket>"; + + let s3 = AmazonS3Builder::new() + .with_bucket_name(bucket_name) + .with_region(region) + .with_access_key_id(env::var("AWS_ACCESS_KEY_ID").unwrap()) + .with_secret_access_key(env::var("AWS_SECRET_ACCESS_KEY").unwrap()) + .build()?; + + let path = format!("s3://{bucket_name}"); + let s3_url = Url::parse(&path).unwrap(); + let arc_s3 = Arc::new(s3); + ctx.runtime_env() + .register_object_store(&s3_url, arc_s3.clone()); + + let path = format!("s3://{bucket_name}/test_data/"); + let file_format = ParquetFormat::default().with_enable_pruning(Some(true)); + let listing_options = ListingOptions::new(Arc::new(file_format)) + .with_file_extension(FileType::PARQUET.get_ext()); + ctx.register_listing_table("test", &path, listing_options, None, None) + .await?; + + // execute the query + let df = ctx + .sql( + "SELECT testcode, count(1) \ + FROM test \ + group by testcode \ + ", + ) + .await?; + + let out_path = format!("s3://{bucket_name}/test_write/"); + df.write_parquet(&out_path, None).await?; + + //write as JSON to s3 instead Review Comment: Did you mean to leave this commented out? ########## datafusion/core/src/datasource/physical_plan/csv.rs: ########## @@ -566,30 +564,32 @@ pub async fn plan_to_csv( path: impl AsRef<str>, ) -> Result<()> { let path = path.as_ref(); - // create directory to contain the CSV files (one per partition) - let fs_path = Path::new(path); - if let Err(e) = fs::create_dir(fs_path) { - return Err(DataFusionError::Execution(format!( - "Could not create directory {path}: {e:?}" - ))); - } - + let parsed = ListingTableUrl::parse(path)?; + let object_store_url = parsed.object_store(); + let store = task_ctx.runtime_env().object_store(&object_store_url)?; + let mut buffer; let mut join_set = JoinSet::new(); for i in 0..plan.output_partitioning().partition_count() { - let plan = plan.clone(); - let filename = format!("part-{i}.csv"); - let path = fs_path.join(filename); - let file = fs::File::create(path)?; - let mut writer = csv::Writer::new(file); - let stream = plan.execute(i, task_ctx.clone())?; + let storeref = store.clone(); + let plan: Arc<dyn ExecutionPlan> = plan.clone(); + let filename = format!("{}/part-{i}.csv", parsed.prefix()); + let file = object_store::path::Path::parse(filename)?; + buffer = Vec::new(); Review Comment: Do I understand this code that it effectively buffers the entire output to memory prior to writing to the remote store? What do you think about using the multi-part put API instead? https://docs.rs/object_store/0.6.1/object_store/#multipart-put-object -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
