metesynnada opened a new issue, #3674:
URL: https://github.com/apache/arrow-rs/issues/3674

   **Describe the bug**
   
   The bug in the `arrow-csv` reader of the arrow-rs library affects reading CSV files from a FIFO, where an EOF is not received until the file is closed. It occurs because the loop that reads the buffer always waits for additional bytes, even after enough rows to fill a batch have already been decoded.
   
   For example, if the batch size is set to 64 and exactly 64 rows are provided to the reader, the decoder has enough data to create a **`RecordBatch`**. However, on the second loop iteration the code blocks at **`self.reader.fill_buf()?`** waiting for bytes that may never arrive, causing a deadlock. This breaks streaming use cases that worked before PR https://github.com/apache/arrow-rs/pull/3604.
   
   ```rust
   impl<R: BufRead> BufReader<R> {
       fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
           loop {
               // On a FIFO, this call blocks on the second iteration: the
               // decoder already holds a full batch, but fill_buf waits for
               // more bytes (or an EOF that only arrives when the writer
               // closes the file).
               let buf = self.reader.fill_buf()?;
               let decoded = self.decoder.decode(buf)?;
               if decoded == 0 {
                   break;
               }
               self.reader.consume(decoded);
           }
   
           self.decoder.flush()
       }
   }
   ```
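
   One possible direction, sketched below, is to stop looping as soon as the decoder has buffered a full batch, so that `fill_buf` is never called again while a batch is ready to be flushed. This is only a sketch: it assumes the `Decoder` exposes (or could be extended with) an accessor reporting how many rows it can still accept before a flush, called `capacity()` here.

   ```rust
   impl<R: BufRead> BufReader<R> {
       fn read(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
           loop {
               let buf = self.reader.fill_buf()?;
               let decoded = self.decoder.decode(buf)?;
               self.reader.consume(decoded);
               // Break on EOF (nothing decoded) *or* once the decoder has
               // buffered `batch_size` rows; in the latter case there is no
               // reason to wait for more input before yielding a batch.
               // `capacity()` is an assumed accessor, see the note above.
               if decoded == 0 || self.decoder.capacity() == 0 {
                   break;
               }
           }

           self.decoder.flush()
       }
   }
   ```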
   
   **To Reproduce**
   
   - Add `nix = "0.26.2"` into `dev-dependencies`
   - Copy the code below into `arrow-csv/src/reader/records.rs` (or any other convenient place in arrow-csv) and run it.
   
   ```rust
   #[cfg(test)]
   mod pr {
       use crate::ReaderBuilder;
       use arrow_array::RecordBatch;
       use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaRef};
       use nix::sys::stat;
       use nix::unistd;
       use std::fs::{File, OpenOptions};
       use std::io::BufRead;
       use std::io::BufReader as StdBufReader;
       use std::io::Write;
       use std::path::Path;
       use std::path::PathBuf;
       use std::sync::{Arc, Mutex};
       use std::thread;
       use std::time::{Duration, Instant};
       use tempfile::TempDir;
   
       fn create_fifo_file(
           tmp_dir: &TempDir,
           file_name: &str,
       ) -> Result<PathBuf, ArrowError> {
           let file_path = tmp_dir.path().join(file_name);
           if let Err(e) = unistd::mkfifo(&file_path, stat::Mode::S_IRWXU) {
               Err(ArrowError::CsvError(e.to_string()))
           } else {
               Ok(file_path)
           }
       }
   
       fn write_to_fifo(mut file: &File, line: &str) -> Result<usize, ArrowError> {
           file.write(line.as_bytes()).or_else(|e| {
               // Broken pipe error
               if e.raw_os_error().unwrap() == 32 {
                   thread::sleep(Duration::from_millis(100));
                   return Ok(0);
               }
               Err(ArrowError::CsvError(e.to_string()))
           })
       }
   
       fn read_from_csv<R: BufRead>(
           mut reader: R,
           schema: SchemaRef,
           batch_size: usize,
       ) -> Result<impl Iterator<Item = Result<RecordBatch, ArrowError>>, ArrowError> {
           let mut decoder = ReaderBuilder::new()
               .with_schema(schema)
               .with_batch_size(batch_size)
               .build_decoder();
           let mut next = move || {
               loop {
                   // Deadlock happens here: fill_buf blocks waiting for more
                   // bytes, even though enough rows for a batch were already decoded.
                   let buf = reader.fill_buf()?;
                   let decoded = decoder.decode(buf)?;
                   if decoded == 0 {
                       break;
                   }
                   reader.consume(decoded);
               }
               decoder.flush()
           };
           Ok(std::iter::from_fn(move || next().transpose()))
       }
   
       const TEST_BATCH_SIZE: usize = 50;
   
       #[test]
       fn csv_reader_env() -> Result<(), ArrowError> {
           // We use a lock so the writer keeps the FIFO open until a batch has been produced.
           let waiting = Arc::new(Mutex::new(true));
           let waiting_thread = waiting.clone();
           let tmp_dir = TempDir::new()?;
           let fifo_path = create_fifo_file(&tmp_dir, "fifo_file.csv")?;
           let fifo_path_thread = fifo_path.clone();
           let joinable_iterator = (0..TEST_BATCH_SIZE).map(|_| "a".to_string());
           let fifo_writer = thread::spawn(move || {
               let first_file = OpenOptions::new()
                   .write(true)
                   .open(fifo_path_thread)
                   .unwrap();
               for (cnt, string_col) in joinable_iterator.enumerate() {
                   let line = format!("{string_col},{cnt}\n").to_owned();
                   write_to_fifo(&first_file, &line).unwrap();
               }
               // Keep the write end open so the reader does not see EOF on the FIFO.
               while *waiting_thread.lock().unwrap() {
                   thread::sleep(Duration::from_millis(200));
               }
           });
           let schema = Arc::new(Schema::new(vec![
               Field::new("a1", DataType::Utf8, false),
               Field::new("a2", DataType::UInt32, false),
           ]));
   
           let file = File::open(fifo_path).unwrap();
           let reader = StdBufReader::new(file);
   
           let mut read = read_from_csv(reader, schema.clone(), TEST_BATCH_SIZE)?;
   
           while let Some(Ok(_batch)) = read.next() {
               // Once we get a batch, clear the flag so the writer thread can finish.
               *waiting.lock().unwrap() = false;
               println!("We got a record batch");
           }
           fifo_writer.join().unwrap();
           Ok(())
       }
   }
   ```
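
   Note that `unistd::mkfifo` only exists on Unix targets, so the reproduction is Unix-only. Once pasted into the crate, the test should be runnable with something like `cargo test -p arrow-csv csv_reader_env`.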
   
   **Expected behavior**
   
   - For the reproduction code: produce the `RecordBatch` and finish.
   - For the algorithm: it should support producing a `RecordBatch` immediately after the necessary bytes have been received (see the sketch below).
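
   A minimal sketch of that expected contract, reusing the `ReaderBuilder`/`Decoder` API and imports from the reproduction above (the schema and batch size are just example values, and the snippet assumes a test returning `Result` so `?` works):

   ```rust
   // Sketch: feeding the decoder exactly `batch_size` rows should be enough
   // for flush() to yield a batch, without the input ever reaching EOF.
   let schema = Arc::new(Schema::new(vec![
       Field::new("a1", DataType::Utf8, false),
       Field::new("a2", DataType::UInt32, false),
   ]));
   let mut decoder = ReaderBuilder::new()
       .with_schema(schema)
       .with_batch_size(64)
       .build_decoder();

   // Exactly 64 rows: one full batch, but no EOF in sight.
   let rows: String = (0..64).map(|i| format!("a,{i}\n")).collect();
   let consumed = decoder.decode(rows.as_bytes())?;
   assert_eq!(consumed, rows.len());

   // The decoder should now yield the batch immediately.
   let batch = decoder.flush()?.expect("a full batch without EOF");
   assert_eq!(batch.num_rows(), 64);
   ```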
   
   **Additional context**
   NA

