nuno-faria commented on PR #1311:
URL: https://github.com/apache/datafusion-python/pull/1311#issuecomment-3589863904
> Are you sure this isn't during the first call to `wait_for_future`? We
> should get one call that returns very fast in `execute_stream_partitioned` and
> then another call during `PartitionedDataFrameStreamReader::next`
I added a small `println!("result: {:?}", result);` to
`PartitionedDataFrameStreamReader::next` to confirm, just after receiving the
result:
```diff
 impl Iterator for PartitionedDataFrameStreamReader {
     type Item = Result<RecordBatch, ArrowError>;
     fn next(&mut self) -> Option<Self::Item> {
         while self.current < self.streams.len() {
             let stream = &mut self.streams[self.current];
             let fut = poll_next_batch(stream);
             let result = Python::with_gil(|py| wait_for_future(py, fut));
+            println!("result: {:?}", result);
             match result {
```
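To make the control flow of the loop above easier to follow, here is a simplified, std-only Rust model of a reader that drains its partitions one after another. `Batch`, `PartitionedReader`, and the use of plain iterators instead of async record-batch streams are assumptions made for illustration; errors are dropped, and the real reader polls each stream through `wait_for_future`, which is not modelled here. Under this model, a `println!` placed before `match result` would fire once per batch plus once per partition that reports end-of-stream.

```rust
// Simplified model only: these types are hypothetical stand-ins, not the
// datafusion-python `PartitionedDataFrameStreamReader`.
type Batch = Vec<i64>;

struct PartitionedReader {
    // One stream per partition; each yields batches until exhausted.
    streams: Vec<std::vec::IntoIter<Batch>>,
    // Index of the partition currently being drained.
    current: usize,
}

impl Iterator for PartitionedReader {
    type Item = Batch;

    fn next(&mut self) -> Option<Self::Item> {
        while self.current < self.streams.len() {
            match self.streams[self.current].next() {
                // A batch is available: hand it to the caller.
                Some(batch) => return Some(batch),
                // This partition is exhausted (the analogue of `Ok(None)`):
                // advance to the next partition and keep looping.
                None => self.current += 1,
            }
        }
        // Every partition is exhausted, so the reader itself ends.
        None
    }
}

fn main() {
    // Three partitions; the middle one produces no batches at all.
    let reader = PartitionedReader {
        streams: vec![
            vec![vec![1, 2], vec![3]].into_iter(),
            Vec::<Batch>::new().into_iter(),
            vec![vec![4, 5, 6]].into_iter(),
        ],
        current: 0,
    };
    let batches: Vec<Batch> = reader.collect();
    // Batches come back in partition order; the empty partition is skipped.
    assert_eq!(batches, vec![vec![1, 2], vec![3], vec![4, 5, 6]]);
    println!("{:?}", batches);
}
```

Keeping `current` as explicit state means each call to `next` resumes exactly where the previous one stopped, so partitions are never revisited.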
This is the result now in DF51 (with the earlier debug output in
`wait_for_future` still enabled); note that no `result: ` line is ever printed:
```
python/tests/test_dataframe.py::test_arrow_c_stream_interrupted thread_id: 35716
x: Ok(())
sent signal to 35716
thread_id: 35716
x: Ok(())
thread_id: 35716
x: Ok(())
thread_id: 35716
x: Ok(())
thread_id: 35716
x: Ok(())
thread_id: 35716
x: Ok(())
thread_id: 35716
x: Ok(())
thread_id: 35716
x: Ok(())
thread_id: 35716
x: Ok(())
thread_id: 35716
...
```
This is the result in DF50 (it prints 2992 `result: ` lines in total; the last
three are `result: Ok(Ok(None))`, while all the others contain
`Some(RecordBatch(...))`):
```
python/tests/test_dataframe.py::test_arrow_c_stream_interrupted result:
Ok(Ok(Some(RecordBatch { schema: Schema { fields: [Field { name: "a",
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }, Field { name: "b", data_type: Utf8, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }, Field { name: "c", data_type: Float64,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field {
name: "d", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }, Field { name: "a2", data_type: Int64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "b2",
data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }, Field { name: "e", data_type: Float64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }, Field { name: "f", data_type: Float64,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata:
{} }, columns: [PrimitiveArray<Int64>
[
1001,
1101,
1201,
1301,
1401,
1501,
1601,
1701,
1801,
1901,
...8172 elements...,
1517,
1617,
1717,
1817,
1917,
6017,
6117,
6217,
6317,
6417,
], StringArray
[
"value_1001",
"value_1101",
"value_1201",
"value_1301",
"value_1401",
"value_1501",
"value_1601",
"value_1701",
"value_1801",
"value_1901",
...8172 elements...,
"value_1517",
"value_1617",
"value_1717",
"value_1817",
"value_1917",
"value_6017",
"value_6117",
"value_6217",
"value_6317",
"value_6417",
], PrimitiveArray<Float64>
...
row_count: 8192 })))
result: Ok(Ok(Some(RecordBatch { schema: Schema { fields: [Field { name:
"a", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false,
metadata: {} }, Field { name: "b", data_type: Utf8, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }, Field { name: "c", data_type: Float64,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field {
name: "d", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered:
false, metadata: {} }, Field { name: "a2", data_type: Int64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "b2",
data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata:
{} }, Field { name: "e", data_type: Float64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }, Field { name: "f", data_type: Float64,
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata:
{} }, columns: [PrimitiveArray<Int64>
...
... (5M+ more lines of output before stopping)
```
I also removed the interrupt signal from `test_arrow_c_stream_interrupted` in
the previous version, and it returns the same number of batches (2992), so the
interrupt signal is not actually stopping the execution. The signal is,
however, triggered after the stream ends, since the `except KeyboardInterrupt`
branch is still reached.
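One mechanism that could produce what the DF50 run shows (every batch returned, with the interrupt surfacing only afterwards) is a wait loop that checks for an interrupt only while it is blocked with nothing to return. The sketch below is a hypothetical, std-only Rust model of such a loop; `wait_for_batch`, `Wait`, the `AtomicBool` interrupt flag, and the channel standing in for the record-batch stream are all illustrative assumptions, not the actual `wait_for_future` implementation.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical result of waiting for the next batch.
#[derive(Debug, PartialEq)]
enum Wait {
    Batch(u64),
    Finished,
    Interrupted,
}

/// Hypothetical helper: block on `rx`, checking `interrupted` once per `tick`
/// while nothing is ready. A batch that is already queued is returned
/// immediately, without the flag ever being consulted.
fn wait_for_batch(rx: &Receiver<u64>, interrupted: &AtomicBool, tick: Duration) -> Wait {
    loop {
        match rx.recv_timeout(tick) {
            Ok(batch) => return Wait::Batch(batch),
            // The producer finished and every queued batch has been received.
            Err(RecvTimeoutError::Disconnected) => return Wait::Finished,
            // Nothing arrived within `tick`: only now is the interrupt checked.
            Err(RecvTimeoutError::Timeout) => {
                if interrupted.load(Ordering::SeqCst) {
                    return Wait::Interrupted;
                }
            }
        }
    }
}

fn main() {
    // The interrupt is requested before any batch is consumed.
    let interrupted = AtomicBool::new(true);
    let tick = Duration::from_millis(10);

    // Case 1: the producer has already queued every batch and finished.
    let (tx, rx) = mpsc::channel();
    for i in 0..5 {
        tx.send(i).unwrap();
    }
    drop(tx);
    let mut received = 0;
    let outcome = loop {
        match wait_for_batch(&rx, &interrupted, tick) {
            Wait::Batch(_) => received += 1,
            other => break other,
        }
    };
    // All batches are drained; the loop never observes the interrupt.
    assert_eq!((received, outcome), (5, Wait::Finished));

    // Case 2: the producer is still alive but has nothing ready yet.
    let (_tx, rx) = mpsc::channel::<u64>();
    assert_eq!(wait_for_batch(&rx, &interrupted, tick), Wait::Interrupted);
    println!("received {received} batches; interrupt not observed while draining");
}
```

In this model, when every batch is ready, `recv_timeout` returns immediately, the timeout branch never runs, and the whole stream is drained even though an interrupt was requested; the flag only matters once the loop actually has to wait.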