HagaiHargil opened a new issue #646:
URL: https://github.com/apache/arrow-rs/issues/646


   **Describe the bug**
   When using `ipc::StreamReader`, if the stream runs out of data and is
   later written to again, the `StreamReader` does not detect the newly
   added data.
   
   **To Reproduce**
   ```rust
   use std::fs::File;
   use std::sync::Arc;

   use arrow::array::Int32Array;
   use arrow::datatypes::{DataType, Field, Schema};
   use arrow::ipc::reader::StreamReader;
   use arrow::ipc::writer::StreamWriter;
   use arrow::record_batch::RecordBatch;

   fn test_intermittent_writing() {
       // Generate mock data
       let schema = Schema::new(vec![
           Field::new("id", DataType::Int32, false)
       ]);
       let second_schema = schema.clone();
   
       std::thread::spawn(|| start_writer_and_write(String::from("test_test.d"), second_schema));
       // Let the writer write something
       std::thread::sleep(std::time::Duration::from_secs(1));
       let mut stream = StreamReader::try_new(File::open("test_test.d").unwrap()).unwrap();
       let mut idx = 0;
       // Start looping and reading the stream. The first iteration detects
       // and prints the array. No later iteration detects anything, even
       // though new data appears in the stream after several seconds.
       while idx < 12 {
           match stream.next() {
               Some(x) => println!("{:?}", x),
               None => println!("None"),
           }
           idx += 1;
           std::thread::sleep(std::time::Duration::from_secs(1));
       }
       println!("Stopped loop, checking whether the data is there");
       let mut stream = StreamReader::try_new(File::open("test_test.d").unwrap()).unwrap();
       println!("{:?}", stream.next().unwrap());  // prints the Batch
       println!("{:?}", stream.next().unwrap());  // also prints the second Batch
   }
   
   fn start_writer_and_write(stream_name: String, schema: Schema) {
       let stream = File::create(stream_name).unwrap();
       let mut stream = StreamWriter::try_new(stream, &schema).unwrap();
       let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
       let batch = RecordBatch::try_new(
           Arc::new(schema.clone()),
           vec![Arc::new(id_array)]
       ).unwrap();
       stream.write(&batch).unwrap();
       std::thread::sleep(std::time::Duration::from_secs(10));
       stream.write(&batch).unwrap();
       println!("I wrote it, goodbye");
   }
   ```
   
   **Expected behavior**
   From my understanding, as long as the `StreamReader` is open, new data
   should be detected even if the stream was empty for some time.
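
The expectation above can be illustrated without Arrow at all. The following is a minimal sketch using only the Rust standard library; `LatchingReader`, `append`, and `demo` are hypothetical names introduced for illustration and are not part of arrow-rs. It suggests that a plain `File` handle returns new bytes once a writer appends to the file, whereas a reader that remembers having reached end-of-file never does. Whether `StreamReader` behaves like the latter internally is an assumption here, not something this report confirms.

```rust
use std::fs::{File, OpenOptions};
use std::io::{Read, Write};
use std::path::Path;

/// Hypothetical wrapper (not arrow-rs code): once the inner reader reports
/// end-of-file, it keeps reporting end-of-file forever.
struct LatchingReader<R: Read> {
    inner: R,
    finished: bool,
}

impl<R: Read> Read for LatchingReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        if self.finished {
            return Ok(0);
        }
        let n = self.inner.read(buf)?;
        if n == 0 && !buf.is_empty() {
            // Remember that end-of-file was seen; later reads are skipped.
            self.finished = true;
        }
        Ok(n)
    }
}

/// Appends `bytes` to the file at `path`, creating it if needed.
fn append(path: &Path, bytes: &[u8]) -> std::io::Result<()> {
    let mut f = OpenOptions::new().create(true).append(true).open(path)?;
    f.write_all(bytes)?;
    f.flush()
}

/// Returns the number of bytes each reader sees after a second write:
/// (plain `File`, latching wrapper).
fn demo() -> std::io::Result<(usize, usize)> {
    let path = std::env::temp_dir().join(format!("latch_demo_{}.bin", std::process::id()));
    let _ = std::fs::remove_file(&path);
    append(&path, b"first")?;

    let mut plain = File::open(&path)?;
    let mut latched = LatchingReader { inner: File::open(&path)?, finished: false };
    let mut buf = [0u8; 64];

    // Drain both readers until they report end-of-file.
    while plain.read(&mut buf)? > 0 {}
    while latched.read(&mut buf)? > 0 {}

    // The writer adds more data after both readers have hit end-of-file.
    append(&path, b"second")?;

    let plain_n = plain.read(&mut buf)?;
    let latched_n = latched.read(&mut buf)?;

    // Close the handles before cleaning up the temporary file.
    drop(plain);
    drop(latched);
    let _ = std::fs::remove_file(&path);
    Ok((plain_n, latched_n))
}
```

Calling `demo()` from a `main` function and printing the returned pair shows the difference between the two readers: the plain handle picks up the appended bytes, while the latching wrapper stays at end-of-file. If `StreamReader` does latch in this way, re-opening the file (as the last part of the reproduction does) would be one way to see the later batches.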
   
   **Additional context**
   I noticed this behavior when writing to a stream from `pyarrow`, but the 
code above shows that a Rust-to-Rust version of this issue also exists.
   



