albertlockett opened a new issue, #8004:
URL: https://github.com/apache/arrow-rs/issues/8004

   **Is your feature request related to a problem or challenge? Please describe 
what you are trying to do.**
   I'm reading from a stream of `RecordBatches` and writing to a parquet file. 
In my record batches, the encoding of certain columns can change between 
dictionary encoding and the native array type. 
   
   **Describe the solution you'd like**
   I'd like to be able to write these batches using the `ArrowWriter` and have 
it handle it this without an error. Internally, the column writer will take the 
values out of the dictionary anyway, so I think that a given 
`ArrowColumnWriter` should be able to handle both the dictionary encoded and 
the native type.
   
   ```rs
           let schema = Arc::new(Schema::new(vec![Field::new(
               "a",
               DataType::Dictionary(Box::new(DataType::UInt8), 
Box::new(DataType::Utf8)),
               false,
           )]));
   
           let rb1 = RecordBatch::try_new(
               schema.clone(),
               vec![Arc::new(DictionaryArray::new(
                   UInt8Array::from_iter_values(vec![0, 1, 0]),
                   Arc::new(StringArray::from_iter_values(vec!["parquet", 
"barquet"])),
               ))],
           )
           .unwrap();
   
           let file = tempfile().unwrap();
           let mut writer =
               ArrowWriter::try_new(file.try_clone().unwrap(), rb1.schema(), 
None).unwrap();
           writer.write(&rb1).unwrap();
   
           // check can append another record batch where the field has the 
same type
           // as the dictionary values from the first batch
           let schema2 = Arc::new(Schema::new(vec![Field::new("a", 
DataType::Utf8, false)]));
           let rb2 = RecordBatch::try_new(
               schema2,
               vec![Arc::new(StringArray::from_iter_values(vec![
                   "barquet", "curious",
               ]))],
           )
           .unwrap();
           writer.write(&rb2).unwrap();
   
           writer.close().unwrap();
   
           let mut record_batch_reader =
               ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 
1024).unwrap();
           let actual_batch = record_batch_reader.next().unwrap().unwrap();
   
           let expected_batch = RecordBatch::try_new(
               schema,
               vec![Arc::new(DictionaryArray::new(
                   UInt8Array::from_iter_values(vec![0, 1, 0, 1, 2]),
                   Arc::new(StringArray::from_iter_values(vec![
                       "parquet", "barquet", "curious",
                   ])),
               ))],
           )
           .unwrap();
   
           assert_eq!(actual_batch, expected_batch)
   ```
   
   Currently this returns an error like:
   ```
   Incompatible type. Field 'a' has type "Dictionary(UInt8, Utf8), array has 
type Utf8"
   ```
   
   **Describe alternatives you've considered**
   I could convert the array before passing it to the writer, but I'd like to 
avoid allocating this array before writing.
   
   I also considered creating the levels directly and managing the instances of 
`ArrowColumnWriter` myself, but I'd like to not have to duplicate the logic in 
`ArrowWriter`:
   ```rs
   use std::{fs::File, sync::Arc};
   
   use arrow_array::{ArrayRef, DictionaryArray, Int32Array, RecordBatch, 
StringArray, UInt8Array};
   use arrow_schema::{DataType, Field, Schema};
   use parquet::{
       arrow::{
           arrow_reader::ParquetRecordBatchReader,
           arrow_writer::{compute_leaves, get_column_writers},
           ArrowSchemaConverter,
       },
       file::{
           properties::WriterProperties,
           reader::SerializedFileReader,
           writer::{SerializedFileWriter, SerializedRowGroupWriter},
       },
   };
   
   fn main() {
       let schema1 = Arc::new(Schema::new(vec![Field::new(
           "a",
           DataType::Dictionary(Box::new(DataType::UInt8), 
Box::new(DataType::Utf8)),
           false,
       )]));
   
       let writer_props = Arc::new(WriterProperties::default());
       let parquet_schema = ArrowSchemaConverter::new()
           .with_coerce_types(writer_props.coerce_types())
           .convert(&schema1)
           .unwrap();
   
       let mut col_writers = get_column_writers(&parquet_schema, &writer_props, 
&schema1).unwrap();
   
       let root_schema = parquet_schema.root_schema_ptr();
   
       let mut file = File::create_new("/tmp/test.parquet").unwrap();
       let mut writer =
           SerializedFileWriter::new(&mut file, root_schema, 
writer_props.clone()).unwrap();
   
       // write dict
       let col1_arr = Arc::new(DictionaryArray::new(
           UInt8Array::from_iter_values(vec![0, 1, 1]),
           Arc::new(StringArray::from_iter_values(vec!["barquet", "curious"])),
       )) as ArrayRef;
       let leaves = compute_leaves(schema1.field(0), &col1_arr).unwrap();
       for leaf in leaves {
           let col_writer = col_writers.get_mut(0).unwrap();
           col_writer.write(&leaf).unwrap();
       }
   
       // write native
       // let col1_arr = Arc::new(StringArray::from_iter_values(vec!["barquet", 
"very curious"])) as ArrayRef;
       // let col1_field = Field::new("a", DataType::Utf8, false);
   
       let col1_arr = Arc::new(Int32Array::from_iter_values(vec![1, 2, 3])) as 
ArrayRef;
       let col1_field = Field::new("a", DataType::Int32, false);
   
       let leaves = compute_leaves(&col1_field, &col1_arr).unwrap();
       for leaf in leaves {
           let col_writer = col_writers.get_mut(0).unwrap();
           col_writer.write(&leaf).unwrap();
       }
   
       let mut row_group_writer: SerializedRowGroupWriter<'_, _> = 
writer.next_row_group().unwrap();
       for col_writer in col_writers {
           let chunk = col_writer.close().unwrap();
           chunk.append_to_row_group(&mut row_group_writer).unwrap();
       }
   
       row_group_writer.close().unwrap();
       writer.close().unwrap();
   
       // read back
       let reader = 
ParquetRecordBatchReader::try_new(file.try_clone().unwrap(), 1024).unwrap();
       for batch in reader {
           println!("got batch: {:?}", batch.unwrap());
       }
   }
   
   ```
   
   **Additional context**
   This is related to otel-arrow's parquet exporter
   
https://github.com/open-telemetry/otel-arrow/blob/main/rust/otap-dataflow/crates/otap/src/parquet_exporter.rs
   which is receiving a stream of adaptive arrays similar to what is described 
here:
   
https://arrow.apache.org/blog/2023/06/26/our-journey-at-f5-with-apache-arrow-part-2/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to