This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 3d917a0efd Extend insert into support to include Json backed tables 
(#7212)
3d917a0efd is described below

commit 3d917a0efd2688233d435d0235d9e6360cf79a63
Author: devinjdangelo <devinjdang...@gmail.com>
AuthorDate: Tue Aug 8 09:18:28 2023 -0400

    Extend insert into support to include Json backed tables (#7212)
    
    * jsonsink and test simplemented
    
    * fix tests and clean up
    
    * clippy
    
    * minor refactor
    
    * comments + append existing file test check no new files added
    
    * format comments
    
    Co-authored-by: Metehan Yıldırım 
<100111937+metesynn...@users.noreply.github.com>
    
    ---------
    
    Co-authored-by: Metehan Yıldırım 
<100111937+metesynn...@users.noreply.github.com>
---
 datafusion/core/src/datasource/file_format/csv.rs  |  78 ++-----
 datafusion/core/src/datasource/file_format/json.rs | 236 +++++++++++++++++++-
 datafusion/core/src/datasource/file_format/mod.rs  |  75 ++++++-
 .../core/src/datasource/file_format/options.rs     |  20 ++
 datafusion/core/src/datasource/listing/table.rs    | 247 +++++++++++++++------
 5 files changed, 518 insertions(+), 138 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/csv.rs 
b/datafusion/core/src/datasource/file_format/csv.rs
index cbcdc2f112..0d8641a464 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -36,9 +36,9 @@ use bytes::{Buf, Bytes};
 use futures::stream::BoxStream;
 use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
 use object_store::{delimited::newline_delimited_stream, ObjectMeta, 
ObjectStore};
-use tokio::io::{AsyncWrite, AsyncWriteExt};
+use tokio::io::AsyncWrite;
 
-use super::FileFormat;
+use super::{stateless_serialize_and_write_files, FileFormat};
 use crate::datasource::file_format::file_type::FileCompressionType;
 use crate::datasource::file_format::FileWriterMode;
 use crate::datasource::file_format::{
@@ -274,6 +274,12 @@ impl FileFormat for CsvFormat {
                 "Overwrites are not implemented yet for CSV".into(),
             ));
         }
+
+        if self.file_compression_type != FileCompressionType::UNCOMPRESSED {
+            return Err(DataFusionError::NotImplemented(
+                "Inserting compressed CSV is not implemented yet.".into(),
+            ));
+        }
         let sink_schema = conf.output_schema().clone();
         let sink = Arc::new(CsvSink::new(
             conf,
@@ -439,28 +445,6 @@ impl BatchSerializer for CsvSerializer {
     }
 }
 
-async fn check_for_errors<T, W: AsyncWrite + Unpin + Send>(
-    result: Result<T>,
-    writers: &mut [AbortableWrite<W>],
-) -> Result<T> {
-    match result {
-        Ok(value) => Ok(value),
-        Err(e) => {
-            // Abort all writers before returning the error:
-            for writer in writers {
-                let mut abort_future = writer.abort_writer();
-                if let Ok(abort_future) = &mut abort_future {
-                    let _ = abort_future.await;
-                }
-                // Ignore errors that occur during abortion,
-                // We do try to abort all writers before returning error.
-            }
-            // After aborting writers return original error.
-            Err(e)
-        }
-    }
-}
-
 /// Implements [`DataSink`] for writing to a CSV file.
 struct CsvSink {
     /// Config options for writing data
@@ -566,7 +550,7 @@ impl CsvSink {
 impl DataSink for CsvSink {
     async fn write_all(
         &self,
-        mut data: Vec<SendableRecordBatchStream>,
+        data: Vec<SendableRecordBatchStream>,
         context: &Arc<TaskContext>,
     ) -> Result<u64> {
         let num_partitions = data.len();
@@ -576,7 +560,7 @@ impl DataSink for CsvSink {
             .object_store(&self.config.object_store_url)?;
 
         // Construct serializer and writer for each file group
-        let mut serializers = vec![];
+        let mut serializers: Vec<Box<dyn BatchSerializer>> = vec![];
         let mut writers = vec![];
         match self.config.writer_mode {
             FileWriterMode::Append => {
@@ -590,7 +574,7 @@ impl DataSink for CsvSink {
                     let serializer = CsvSerializer::new()
                         .with_builder(builder)
                         .with_header(header);
-                    serializers.push(serializer);
+                    serializers.push(Box::new(serializer));
 
                     let file = file_group.clone();
                     let writer = self
@@ -608,9 +592,9 @@ impl DataSink for CsvSink {
                 ))
             }
             FileWriterMode::PutMultipart => {
-                //currently assuming only 1 partition path (i.e. not hive 
style partitioning on a column)
+                // Currently assuming only 1 partition path (i.e. not 
hive-style partitioning on a column)
                 let base_path = &self.config.table_paths[0];
-                //uniquely identify this batch of files with a random string, 
to prevent collisions overwriting files
+                // Uniquely identify this batch of files with a random string, 
to prevent collisions overwriting files
                 let write_id = Alphanumeric.sample_string(&mut 
rand::thread_rng(), 16);
                 for part_idx in 0..num_partitions {
                     let header = true;
@@ -618,7 +602,7 @@ impl DataSink for CsvSink {
                     let serializer = CsvSerializer::new()
                         .with_builder(builder)
                         .with_header(header);
-                    serializers.push(serializer);
+                    serializers.push(Box::new(serializer));
                     let file_path = base_path
                         .prefix()
                         .child(format!("/{}_{}.csv", write_id, part_idx));
@@ -636,39 +620,7 @@ impl DataSink for CsvSink {
             }
         }
 
-        let mut row_count = 0;
-        // Map errors to DatafusionError.
-        let err_converter =
-            |_| DataFusionError::Internal("Unexpected FileSink 
Error".to_string());
-        // TODO parallelize serialization accross partitions and batches 
within partitions
-        // see: https://github.com/apache/arrow-datafusion/issues/7079
-        for idx in 0..num_partitions {
-            while let Some(maybe_batch) = data[idx].next().await {
-                // Write data to files in a round robin fashion:
-                let serializer = &mut serializers[idx];
-                let batch = check_for_errors(maybe_batch, &mut writers).await?;
-                row_count += batch.num_rows();
-                let bytes =
-                    check_for_errors(serializer.serialize(batch).await, &mut 
writers)
-                        .await?;
-                let writer = &mut writers[idx];
-                check_for_errors(
-                    writer.write_all(&bytes).await.map_err(err_converter),
-                    &mut writers,
-                )
-                .await?;
-            }
-        }
-        // Perform cleanup:
-        let n_writers = writers.len();
-        for idx in 0..n_writers {
-            check_for_errors(
-                writers[idx].shutdown().await.map_err(err_converter),
-                &mut writers,
-            )
-            .await?;
-        }
-        Ok(row_count as u64)
+        stateless_serialize_and_write_files(data, serializers, writers).await
     }
 }
 
diff --git a/datafusion/core/src/datasource/file_format/json.rs 
b/datafusion/core/src/datasource/file_format/json.rs
index 6247e85ba8..dae3a18f96 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -19,28 +19,52 @@
 
 use std::any::Any;
 
+use bytes::Bytes;
+use datafusion_common::DataFusionError;
+use datafusion_execution::TaskContext;
+use rand::distributions::Alphanumeric;
+use rand::distributions::DistString;
+use std::fmt;
+use std::fmt::Debug;
 use std::io::BufReader;
 use std::sync::Arc;
+use tokio::io::AsyncWrite;
 
 use arrow::datatypes::Schema;
 use arrow::datatypes::SchemaRef;
+use arrow::json;
 use arrow::json::reader::infer_json_schema_from_iterator;
 use arrow::json::reader::ValueIter;
+use arrow_array::RecordBatch;
 use async_trait::async_trait;
 use bytes::Buf;
 
 use datafusion_physical_expr::PhysicalExpr;
 use object_store::{GetResult, ObjectMeta, ObjectStore};
 
+use crate::datasource::physical_plan::FileGroupDisplay;
+use crate::datasource::physical_plan::FileMeta;
+use crate::physical_plan::insert::DataSink;
+use crate::physical_plan::insert::InsertExec;
+use crate::physical_plan::SendableRecordBatchStream;
+use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
+
+use super::stateless_serialize_and_write_files;
+use super::AbortMode;
+use super::AbortableWrite;
+use super::AsyncPutWriter;
+use super::BatchSerializer;
 use super::FileFormat;
 use super::FileScanConfig;
+use super::FileWriterMode;
+use super::MultiPart;
 use crate::datasource::file_format::file_type::FileCompressionType;
 use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
+use crate::datasource::physical_plan::FileSinkConfig;
 use crate::datasource::physical_plan::NdJsonExec;
 use crate::error::Result;
 use crate::execution::context::SessionState;
 use crate::physical_plan::ExecutionPlan;
-use crate::physical_plan::Statistics;
 
 /// The default file extension of json files
 pub const DEFAULT_JSON_EXTENSION: &str = ".json";
@@ -148,6 +172,216 @@ impl FileFormat for JsonFormat {
         let exec = NdJsonExec::new(conf, 
self.file_compression_type.to_owned());
         Ok(Arc::new(exec))
     }
+
+    async fn create_writer_physical_plan(
+        &self,
+        input: Arc<dyn ExecutionPlan>,
+        _state: &SessionState,
+        conf: FileSinkConfig,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if conf.overwrite {
+            return Err(DataFusionError::NotImplemented(
+                "Overwrites are not implemented yet for Json".into(),
+            ));
+        }
+
+        if self.file_compression_type != FileCompressionType::UNCOMPRESSED {
+            return Err(DataFusionError::NotImplemented(
+                "Inserting compressed JSON is not implemented yet.".into(),
+            ));
+        }
+        let sink_schema = conf.output_schema().clone();
+        let sink = Arc::new(JsonSink::new(conf, 
self.file_compression_type.clone()));
+
+        Ok(Arc::new(InsertExec::new(input, sink, sink_schema)) as _)
+    }
+}
+
+impl Default for JsonSerializer {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+/// Define a struct for serializing Json records to a stream
+pub struct JsonSerializer {
+    // Inner buffer for avoiding reallocation
+    buffer: Vec<u8>,
+}
+
+impl JsonSerializer {
+    /// Constructor for the JsonSerializer object
+    pub fn new() -> Self {
+        Self {
+            buffer: Vec::with_capacity(4096),
+        }
+    }
+}
+
+#[async_trait]
+impl BatchSerializer for JsonSerializer {
+    async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes> {
+        let mut writer = json::LineDelimitedWriter::new(&mut self.buffer);
+        writer.write(&batch)?;
+        //drop(writer);
+        Ok(Bytes::from(self.buffer.drain(..).collect::<Vec<u8>>()))
+    }
+}
+
+/// Implements [`DataSink`] for writing to a Json file.
+struct JsonSink {
+    /// Config options for writing data
+    config: FileSinkConfig,
+    file_compression_type: FileCompressionType,
+}
+
+impl Debug for JsonSink {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("JsonSink")
+            .field("file_compression_type", &self.file_compression_type)
+            .finish()
+    }
+}
+
+impl DisplayAs for JsonSink {
+    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> 
fmt::Result {
+        match t {
+            DisplayFormatType::Default | DisplayFormatType::Verbose => {
+                write!(
+                    f,
+                    "JsonSink(writer_mode={:?}, file_groups=",
+                    self.config.writer_mode
+                )?;
+                FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
+                write!(f, ")")
+            }
+        }
+    }
+}
+
+impl JsonSink {
+    fn new(config: FileSinkConfig, file_compression_type: FileCompressionType) 
-> Self {
+        Self {
+            config,
+            file_compression_type,
+        }
+    }
+
+    // Create a writer for Json files
+    async fn create_writer(
+        &self,
+        file_meta: FileMeta,
+        object_store: Arc<dyn ObjectStore>,
+    ) -> Result<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>> {
+        let object = &file_meta.object_meta;
+        match self.config.writer_mode {
+            // If the mode is append, call the store's append method and 
return wrapped in
+            // a boxed trait object.
+            FileWriterMode::Append => {
+                let writer = object_store
+                    .append(&object.location)
+                    .await
+                    .map_err(DataFusionError::ObjectStore)?;
+                let writer = AbortableWrite::new(
+                    self.file_compression_type.convert_async_writer(writer)?,
+                    AbortMode::Append,
+                );
+                Ok(writer)
+            }
+            // If the mode is put, create a new AsyncPut writer and return it 
wrapped in
+            // a boxed trait object
+            FileWriterMode::Put => {
+                let writer = Box::new(AsyncPutWriter::new(object.clone(), 
object_store));
+                let writer = AbortableWrite::new(
+                    self.file_compression_type.convert_async_writer(writer)?,
+                    AbortMode::Put,
+                );
+                Ok(writer)
+            }
+            // If the mode is put multipart, call the store's put_multipart 
method and
+            // return the writer wrapped in a boxed trait object.
+            FileWriterMode::PutMultipart => {
+                let (multipart_id, writer) = object_store
+                    .put_multipart(&object.location)
+                    .await
+                    .map_err(DataFusionError::ObjectStore)?;
+                Ok(AbortableWrite::new(
+                    self.file_compression_type.convert_async_writer(writer)?,
+                    AbortMode::MultiPart(MultiPart::new(
+                        object_store,
+                        multipart_id,
+                        object.location.clone(),
+                    )),
+                ))
+            }
+        }
+    }
+}
+
+#[async_trait]
+impl DataSink for JsonSink {
+    async fn write_all(
+        &self,
+        data: Vec<SendableRecordBatchStream>,
+        context: &Arc<TaskContext>,
+    ) -> Result<u64> {
+        let num_partitions = data.len();
+
+        let object_store = context
+            .runtime_env()
+            .object_store(&self.config.object_store_url)?;
+
+        // Construct serializer and writer for each file group
+        let mut serializers: Vec<Box<dyn BatchSerializer>> = vec![];
+        let mut writers = vec![];
+        match self.config.writer_mode {
+            FileWriterMode::Append => {
+                for file_group in &self.config.file_groups {
+                    let serializer = JsonSerializer::new();
+                    serializers.push(Box::new(serializer));
+
+                    let file = file_group.clone();
+                    let writer = self
+                        .create_writer(
+                            file.object_meta.clone().into(),
+                            object_store.clone(),
+                        )
+                        .await?;
+                    writers.push(writer);
+                }
+            }
+            FileWriterMode::Put => {
+                return Err(DataFusionError::NotImplemented(
+                    "Put Mode is not implemented for Json Sink yet".into(),
+                ))
+            }
+            FileWriterMode::PutMultipart => {
+                // Currently assuming only 1 partition path (i.e. not 
hive-style partitioning on a column)
+                let base_path = &self.config.table_paths[0];
+                // Uniquely identify this batch of files with a random string, 
to prevent collisions overwriting files
+                let write_id = Alphanumeric.sample_string(&mut 
rand::thread_rng(), 16);
+                for part_idx in 0..num_partitions {
+                    let serializer = JsonSerializer::new();
+                    serializers.push(Box::new(serializer));
+                    let file_path = base_path
+                        .prefix()
+                        .child(format!("/{}_{}.json", write_id, part_idx));
+                    let object_meta = ObjectMeta {
+                        location: file_path,
+                        last_modified: chrono::offset::Utc::now(),
+                        size: 0,
+                        e_tag: None,
+                    };
+                    let writer = self
+                        .create_writer(object_meta.into(), 
object_store.clone())
+                        .await?;
+                    writers.push(writer);
+                }
+            }
+        }
+
+        stateless_serialize_and_write_files(data, serializers, writers).await
+    }
 }
 
 #[cfg(test)]
diff --git a/datafusion/core/src/datasource/file_format/mod.rs 
b/datafusion/core/src/datasource/file_format/mod.rs
index 4cc6e8706a..97492276a2 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -39,7 +39,7 @@ use crate::arrow::datatypes::SchemaRef;
 use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
 use crate::error::Result;
 use crate::execution::context::SessionState;
-use crate::physical_plan::{ExecutionPlan, Statistics};
+use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream, 
Statistics};
 
 use arrow_array::RecordBatch;
 use datafusion_common::DataFusionError;
@@ -48,11 +48,11 @@ use datafusion_physical_expr::PhysicalExpr;
 use async_trait::async_trait;
 use bytes::Bytes;
 use futures::future::BoxFuture;
-use futures::ready;
 use futures::FutureExt;
+use futures::{ready, StreamExt};
 use object_store::path::Path;
 use object_store::{MultipartId, ObjectMeta, ObjectStore};
-use tokio::io::AsyncWrite;
+use tokio::io::{AsyncWrite, AsyncWriteExt};
 /// This trait abstracts all the file format specific implementations
 /// from the [`TableProvider`]. This helps code re-utilization across
 /// providers that support the the same file formats.
@@ -313,6 +313,75 @@ pub trait BatchSerializer: Unpin + Send {
     async fn serialize(&mut self, batch: RecordBatch) -> Result<Bytes>;
 }
 
+/// Passes an `Ok` result through unchanged; if `result` is an `Err`,
+/// all writers are aborted before the original error is returned.
+async fn check_for_errors<T, W: AsyncWrite + Unpin + Send>(
+    result: Result<T>,
+    writers: &mut [AbortableWrite<W>],
+) -> Result<T> {
+    match result {
+        Ok(value) => Ok(value),
+        Err(e) => {
+            // Abort all writers before returning the error:
+            for writer in writers {
+                let mut abort_future = writer.abort_writer();
+                if let Ok(abort_future) = &mut abort_future {
+                    let _ = abort_future.await;
+                }
+                // Ignore errors that occur during abortion,
+                // We do try to abort all writers before returning error.
+            }
+            // After aborting writers return original error.
+            Err(e)
+        }
+    }
+}
+
+/// Contains the common logic for serializing RecordBatches and
+/// writing the resulting bytes to an ObjectStore.
+/// Serialization is assumed to be stateless, i.e.
+/// each RecordBatch can be serialized without any
+/// dependency on the RecordBatches before or after.
+async fn stateless_serialize_and_write_files(
+    mut data: Vec<SendableRecordBatchStream>,
+    mut serializers: Vec<Box<dyn BatchSerializer>>,
+    mut writers: Vec<AbortableWrite<Box<dyn AsyncWrite + Send + Unpin>>>,
+) -> Result<u64> {
+    let num_partitions = data.len();
+    let mut row_count = 0;
+    // Map errors to DatafusionError.
+    let err_converter =
+        |_| DataFusionError::Internal("Unexpected FileSink Error".to_string());
+    // TODO parallelize serialization across partitions and batches within 
partitions
+    // see: https://github.com/apache/arrow-datafusion/issues/7079
+    for idx in 0..num_partitions {
+        while let Some(maybe_batch) = data[idx].next().await {
+            // Partition `idx` is written with serializer and writer `idx`:
+            let serializer = &mut serializers[idx];
+            let batch = check_for_errors(maybe_batch, &mut writers).await?;
+            row_count += batch.num_rows();
+            let bytes =
+                check_for_errors(serializer.serialize(batch).await, &mut 
writers).await?;
+            let writer = &mut writers[idx];
+            check_for_errors(
+                writer.write_all(&bytes).await.map_err(err_converter),
+                &mut writers,
+            )
+            .await?;
+        }
+    }
+    // Perform cleanup:
+    let n_writers = writers.len();
+    for idx in 0..n_writers {
+        check_for_errors(
+            writers[idx].shutdown().await.map_err(err_converter),
+            &mut writers,
+        )
+        .await?;
+    }
+    Ok(row_count as u64)
+}
+
 #[cfg(test)]
 pub(crate) mod test_util {
     use std::ops::Range;
diff --git a/datafusion/core/src/datasource/file_format/options.rs 
b/datafusion/core/src/datasource/file_format/options.rs
index b8499065bd..73c20d3b0c 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -373,6 +373,10 @@ pub struct NdJsonReadOptions<'a> {
     pub file_compression_type: FileCompressionType,
     /// Flag indicating whether this file may be unbounded (as in a FIFO file).
     pub infinite: bool,
+    /// Indicates how the file is sorted
+    pub file_sort_order: Vec<Vec<Expr>>,
+    /// Setting controls how inserts to this file should be handled
+    pub insert_mode: ListingTableInsertMode,
 }
 
 impl<'a> Default for NdJsonReadOptions<'a> {
@@ -384,6 +388,8 @@ impl<'a> Default for NdJsonReadOptions<'a> {
             table_partition_cols: vec![],
             file_compression_type: FileCompressionType::UNCOMPRESSED,
             infinite: false,
+            file_sort_order: vec![],
+            insert_mode: ListingTableInsertMode::AppendToFile,
         }
     }
 }
@@ -424,6 +430,18 @@ impl<'a> NdJsonReadOptions<'a> {
         self.schema = Some(schema);
         self
     }
+
+    /// Configure if file has known sort order
+    pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<Expr>>) -> Self {
+        self.file_sort_order = file_sort_order;
+        self
+    }
+
+    /// Configure how insertions to this table should be handled
+    pub fn insert_mode(mut self, insert_mode: ListingTableInsertMode) -> Self {
+        self.insert_mode = insert_mode;
+        self
+    }
 }
 
 #[async_trait]
@@ -535,6 +553,8 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> {
             .with_target_partitions(config.target_partitions())
             .with_table_partition_cols(self.table_partition_cols.clone())
             .with_infinite_source(self.infinite)
+            .with_file_sort_order(self.file_sort_order.clone())
+            .with_insert_mode(self.insert_mode.clone())
     }
 
     async fn get_resolved_schema(
diff --git a/datafusion/core/src/datasource/listing/table.rs 
b/datafusion/core/src/datasource/listing/table.rs
index 4085dac484..b47d25d1f9 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -805,6 +805,19 @@ impl TableProvider for ListingTable {
             );
         }
 
+        // TODO support inserts to sorted tables which preserve sort_order
+        // Inserts currently make no effort to preserve sort_order. This could 
lead to
+        // incorrect query results on the table after inserting incorrectly 
sorted data.
+        let unsorted: Vec<Vec<Expr>> = vec![];
+        if self.options.file_sort_order != unsorted {
+            return Err(
+                DataFusionError::NotImplemented(
+                    "Writing to a sorted listing table via insert into is not 
supported yet. \
+                    To write to this table in the meantime, register an 
equivalent table with \
+                    file_sort_order = vec![]".into())
+            );
+        }
+
         let table_path = &self.table_paths()[0];
         // Get the object store for the table path.
         let store = state.runtime_env().object_store(table_path)?;
@@ -838,10 +851,9 @@ impl TableProvider for ListingTable {
                 writer_mode = 
crate::datasource::file_format::FileWriterMode::PutMultipart
             }
             ListingTableInsertMode::Error => {
-                return Err(DataFusionError::Plan(
+                return plan_err!(
                     "Invalid plan attempting write to table with 
TableWriteMode::Error!"
-                        .into(),
-                ))
+                )
             }
         }
 
@@ -935,6 +947,7 @@ mod tests {
     use super::*;
     use crate::datasource::file_format::file_type::GetExt;
     use crate::datasource::{provider_as_source, MemTable};
+    use crate::execution::options::ArrowReadOptions;
     use crate::physical_plan::collect;
     use crate::prelude::*;
     use crate::{
@@ -944,9 +957,7 @@ mod tests {
         logical_expr::{col, lit},
         test::{columns, object_store::register_test_store},
     };
-    use arrow::csv;
     use arrow::datatypes::{DataType, Schema};
-    use arrow::error::Result as ArrowResult;
     use arrow::record_batch::RecordBatch;
     use chrono::DateTime;
     use datafusion_common::assert_contains;
@@ -1434,26 +1445,6 @@ mod tests {
         Ok(Arc::new(table))
     }
 
-    fn load_empty_schema_csv_table(
-        schema: SchemaRef,
-        temp_path: &str,
-        insert_mode: ListingTableInsertMode,
-    ) -> Result<Arc<dyn TableProvider>> {
-        File::create(temp_path)?;
-        let table_path = ListingTableUrl::parse(temp_path).unwrap();
-
-        let file_format = CsvFormat::default();
-        let listing_options =
-            
ListingOptions::new(Arc::new(file_format)).with_insert_mode(insert_mode);
-
-        let config = ListingTableConfig::new(table_path)
-            .with_listing_options(listing_options)
-            .with_schema(schema);
-
-        let table = ListingTable::try_new(config)?;
-        Ok(Arc::new(table))
-    }
-
     /// Check that the files listed by the table match the specified 
`output_partitioning`
     /// when the object store contains `files`.
     async fn assert_list_files_for_scan_grouping(
@@ -1559,10 +1550,72 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn test_append_plan_to_external_table_stored_as_csv() -> Result<()> {
-        let file_type = FileType::CSV;
-        let file_compression_type = FileCompressionType::UNCOMPRESSED;
+    async fn test_insert_into_append_to_json_file() -> Result<()> {
+        helper_test_insert_into_append_to_existing_files(
+            FileType::JSON,
+            FileCompressionType::UNCOMPRESSED,
+        )
+        .await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_insert_into_append_new_json_files() -> Result<()> {
+        helper_test_append_new_files_to_table(
+            FileType::JSON,
+            FileCompressionType::UNCOMPRESSED,
+        )
+        .await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_insert_into_append_to_csv_file() -> Result<()> {
+        helper_test_insert_into_append_to_existing_files(
+            FileType::CSV,
+            FileCompressionType::UNCOMPRESSED,
+        )
+        .await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_insert_into_append_new_csv_files() -> Result<()> {
+        helper_test_append_new_files_to_table(
+            FileType::CSV,
+            FileCompressionType::UNCOMPRESSED,
+        )
+        .await?;
+        Ok(())
+    }
+
+    fn load_empty_schema_table(
+        schema: SchemaRef,
+        temp_path: &str,
+        insert_mode: ListingTableInsertMode,
+        file_format: Arc<dyn FileFormat>,
+    ) -> Result<Arc<dyn TableProvider>> {
+        File::create(temp_path)?;
+        let table_path = ListingTableUrl::parse(temp_path).unwrap();
+
+        let listing_options =
+            
ListingOptions::new(file_format.clone()).with_insert_mode(insert_mode);
+
+        let config = ListingTableConfig::new(table_path)
+            .with_listing_options(listing_options)
+            .with_schema(schema);
+
+        let table = ListingTable::try_new(config)?;
+        Ok(Arc::new(table))
+    }
 
+    /// Logic of testing inserting into listing table by Appending to existing 
files
+    /// is the same for all formats/options which support this. This helper 
allows
+    /// passing different options to execute the same test with different 
settings.
+    async fn helper_test_insert_into_append_to_existing_files(
+        file_type: FileType,
+        file_compression_type: FileCompressionType,
+    ) -> Result<()> {
         // Create the initial context, schema, and batch.
         let session_ctx = SessionContext::new();
         // Create a new schema with one field called "a" of type Int32
@@ -1587,17 +1640,27 @@ mod tests {
                 .unwrap()
         );
 
-        // Define batch size for file reader
-        let batch_size = batch.num_rows();
-
         // Create a temporary directory and a CSV file within it.
         let tmp_dir = TempDir::new()?;
         let path = tmp_dir.path().join(filename);
 
-        let initial_table = load_empty_schema_csv_table(
+        let file_format: Arc<dyn FileFormat> = match file_type {
+            FileType::CSV => Arc::new(
+                
CsvFormat::default().with_file_compression_type(file_compression_type),
+            ),
+            FileType::JSON => Arc::new(
+                
JsonFormat::default().with_file_compression_type(file_compression_type),
+            ),
+            FileType::PARQUET => Arc::new(ParquetFormat::default()),
+            FileType::AVRO => Arc::new(AvroFormat {}),
+            FileType::ARROW => Arc::new(ArrowFormat {}),
+        };
+
+        let initial_table = load_empty_schema_table(
             schema.clone(),
             path.to_str().unwrap(),
             ListingTableInsertMode::AppendToFile,
+            file_format,
         )?;
         session_ctx.register_table("t", initial_table)?;
         // Create and register the source table with the provided schema and 
inserted data
@@ -1632,19 +1695,9 @@ mod tests {
 
         // Assert that the batches read from the file match the expected 
result.
         assert_batches_eq!(expected, &res);
-        // Open the CSV file, read its contents as a record batch, and collect 
the batches into a vector.
-        let file = File::open(path.clone())?;
-        let reader = csv::ReaderBuilder::new(schema.clone())
-            .has_header(true)
-            .with_batch_size(batch_size)
-            .build(file)
-            .map_err(|e| DataFusionError::Internal(e.to_string()))?;
-
-        let batches = reader
-            .collect::<Vec<ArrowResult<RecordBatch>>>()
-            .into_iter()
-            .collect::<ArrowResult<Vec<RecordBatch>>>()
-            .map_err(|e| DataFusionError::Internal(e.to_string()))?;
+
+        // Read the records in the table
+        let batches = session_ctx.sql("select * from 
t").await?.collect().await?;
 
         // Define the expected result as a vector of strings.
         let expected = vec![
@@ -1663,6 +1716,10 @@ mod tests {
         // Assert that the batches read from the file match the expected 
result.
         assert_batches_eq!(expected, &batches);
 
+        // Assert that only 1 file was added to the table
+        let num_files = tmp_dir.path().read_dir()?.count();
+        assert_eq!(num_files, 1);
+
         // Create a physical plan from the insert plan
         let plan = session_ctx
             .state()
@@ -1684,18 +1741,7 @@ mod tests {
         assert_batches_eq!(expected, &res);
 
         // Read the records in the table again after the second append.
-        let file = File::open(path.clone())?;
-        let reader = csv::ReaderBuilder::new(schema.clone())
-            .has_header(true)
-            .with_batch_size(batch_size)
-            .build(file)
-            .map_err(|e| DataFusionError::Internal(e.to_string()))?;
-
-        let batches = reader
-            .collect::<Vec<ArrowResult<RecordBatch>>>()
-            .into_iter()
-            .collect::<ArrowResult<Vec<RecordBatch>>>()
-            .map_err(|e| DataFusionError::Internal(e.to_string()));
+        let batches = session_ctx.sql("select * from 
t").await?.collect().await?;
 
         // Define the expected result after the second append.
         let expected = vec![
@@ -1718,14 +1764,20 @@ mod tests {
         ];
 
         // Assert that the batches read from the file after the second append 
match the expected result.
-        assert_batches_eq!(expected, &batches?);
+        assert_batches_eq!(expected, &batches);
+
+        // Assert that no additional files were added to the table
+        let num_files = tmp_dir.path().read_dir()?.count();
+        assert_eq!(num_files, 1);
 
         // Return Ok if the function completed without errors
         Ok(())
     }
 
-    #[tokio::test]
-    async fn test_append_new_files_to_csv_table() -> Result<()> {
+    async fn helper_test_append_new_files_to_table(
+        file_type: FileType,
+        file_compression_type: FileCompressionType,
+    ) -> Result<()> {
         // Create the initial context, schema, and batch.
         let session_ctx = SessionContext::new();
         // Create a new schema with one field called "a" of type Int32
@@ -1741,17 +1793,70 @@ mod tests {
             vec![Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3]))],
         )?;
 
-        // Create a temporary directory and a CSV file within it.
+        // Register appropriate table depending on file_type we want to test
         let tmp_dir = TempDir::new()?;
-        session_ctx
-            .register_csv(
-                "t",
-                tmp_dir.path().to_str().unwrap(),
-                CsvReadOptions::new()
-                    .insert_mode(ListingTableInsertMode::AppendNewFiles)
-                    .schema(schema.as_ref()),
-            )
-            .await?;
+        match file_type {
+            FileType::CSV => {
+                session_ctx
+                    .register_csv(
+                        "t",
+                        tmp_dir.path().to_str().unwrap(),
+                        CsvReadOptions::new()
+                            
.insert_mode(ListingTableInsertMode::AppendNewFiles)
+                            .schema(schema.as_ref())
+                            .file_compression_type(file_compression_type),
+                    )
+                    .await?;
+            }
+            FileType::JSON => {
+                session_ctx
+                    .register_json(
+                        "t",
+                        tmp_dir.path().to_str().unwrap(),
+                        NdJsonReadOptions::default()
+                            
.insert_mode(ListingTableInsertMode::AppendNewFiles)
+                            .schema(schema.as_ref())
+                            .file_compression_type(file_compression_type),
+                    )
+                    .await?;
+            }
+            FileType::PARQUET => {
+                session_ctx
+                    .register_parquet(
+                        "t",
+                        tmp_dir.path().to_str().unwrap(),
+                        ParquetReadOptions::default(), // TODO implement 
insert_mode for parquet
+                                                       
//.insert_mode(ListingTableInsertMode::AppendNewFiles)
+                                                       
//.schema(schema.as_ref()),
+                    )
+                    .await?;
+            }
+            FileType::AVRO => {
+                session_ctx
+                    .register_avro(
+                        "t",
+                        tmp_dir.path().to_str().unwrap(),
+                        AvroReadOptions::default()
+                            // TODO implement insert_mode for avro
+                            
//.insert_mode(ListingTableInsertMode::AppendNewFiles)
+                            .schema(schema.as_ref()),
+                    )
+                    .await?;
+            }
+            FileType::ARROW => {
+                session_ctx
+                    .register_arrow(
+                        "t",
+                        tmp_dir.path().to_str().unwrap(),
+                        ArrowReadOptions::default()
+                            // TODO implement insert_mode for arrow
+                            
//.insert_mode(ListingTableInsertMode::AppendNewFiles)
+                            .schema(schema.as_ref()),
+                    )
+                    .await?;
+            }
+        }
+
         // Create and register the source table with the provided schema and 
inserted data
         let source_table = Arc::new(MemTable::try_new(
             schema.clone(),
@@ -1804,7 +1909,7 @@ mod tests {
         // Assert that the batches read from the file match the expected 
result.
         assert_batches_eq!(expected, &batches);
 
-        //asert that 6 files were added to the table
+        // Assert that 6 files were added to the table
         let num_files = tmp_dir.path().read_dir()?.count();
         assert_eq!(num_files, 6);
 


Reply via email to