[ https://issues.apache.org/jira/browse/ARROW-16029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17623050#comment-17623050 ]

Joris Van den Bossche commented on ARROW-16029:
-----------------------------------------------

Yes, I can reproduce this; it is certainly a valid bug report (and thanks for 
reporting it!)

One note about {{pa.Table.from_batches}}: this actually first iterates through 
all batches (so getting them all in memory) before combining them into a Table 
(which is OK here, since a Table is an in-memory structure anyway). So this 
works a bit differently.
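
Roughly, that is equivalent to the following sketch (an illustration of the 
consumption pattern, not the actual implementation):

{code:python}
import pyarrow as pa

def table_from_batches_sketch(batches, schema):
    # Drain the iterable eagerly, on the calling thread: Ctrl-C can
    # raise KeyboardInterrupt between batches, so this is stoppable
    # (though with an infinite generator it would block forever).
    materialized = list(batches)
    # ...then combine the already in-memory batches into a Table.
    return pa.Table.from_batches(materialized, schema)
{code}
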
The Dataset API instead uses {{RecordBatchReader.from_batches}} under the hood. 
But when using this directly, interrupting it with Ctrl-C also seems to work:

{code:python}
# Consuming the generator through a RecordBatchReader directly:
# this can still be interrupted with Ctrl-C.
reader = pa.ipc.RecordBatchReader.from_batches(schema, mygenerator())
reader.read_all()
{code}


But when this is used in the Datasets API for writing, this RecordBatchReader 
is wrapped in a {{OneShotFragment}}, which is consumed by an async 
BackgroundGenerator that iterates it on a background thread. [~westonpace], 
might that be related to the fact that interrupting doesn't work here?
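
For illustration, here is a minimal sketch of that kind of setup in plain 
Python threading (not Arrow's actual C++ machinery): CPython raises 
KeyboardInterrupt only in the main thread, so a generator iterated on a 
non-daemon background thread keeps running after Ctrl-C, which matches the 
runaway-process symptom:

{code:python}
import threading
import time

def mygenerator():
    i = 0
    while True:
        time.sleep(0.1)
        i += 1
        print(i)
        yield i

def consume():
    # Runs on a background thread; Ctrl-C never raises in here.
    for _ in mygenerator():
        pass

t = threading.Thread(target=consume)  # non-daemon worker
t.start()
# Ctrl-C raises KeyboardInterrupt in the main thread only; the
# non-daemon worker keeps iterating, so the process keeps printing.
t.join()
{code}
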

> [Python] Runaway process with generator in "write_dataset()"
> ------------------------------------------------------------
>
>                 Key: ARROW-16029
>                 URL: https://issues.apache.org/jira/browse/ARROW-16029
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Python
>            Reporter: Martin Thøgersen
>            Priority: Major
>
> We have a complex containerized data pipeline that keeps running, even if the 
> main process fails, so we have to stop the containers manually. The issue 
> boils down to the following:
> The method {{pyarrow.dataset.write_dataset()}} accepts an [iterable of 
> RecordBatch|https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html].
> This means that generators should also work, since [a generator is an 
> iterator, which is an 
> iterable.|https://stackoverflow.com/questions/2776829/difference-between-pythons-generators-and-iterators]
> The following minimal example can't be stopped with Ctrl-C/KeyboardInterrupt 
> (SIGINT, signal 2). We need to run at minimum {{killall -3 python}} (SIGQUIT) 
> to close the process.
> {code:python}
> from time import sleep
> import pyarrow as pa
> import pyarrow.dataset as ds
> def mygenerator():
>     i = 0
>     while True:
>         sleep(0.1)
>         i = i + 1
>         print(i)
>         yield pa.RecordBatch.from_pylist([{"mycol": "myval"}] * 10)
> schema = pa.schema([("mycol", pa.string())])
> # Does NOT respect KeyboardInterrupt:
> ds.write_dataset(data=mygenerator(),
>                  schema=schema,
>                  base_dir="mydir",
>                  format="parquet",
>                  existing_data_behavior="overwrite_or_ignore",
>                  )
> {code}
> In practice the generator is not infinite, but represents a series of API 
> calls that can't be held in memory.
> The following examples show that generators work well with e.g. 
> {{pa.Table.from_batches()}}. So the issue could be limited to the Dataset 
> API?
> {code:python}
> # Respects KeyboardInterrupt:
> for i in mygenerator():
>     pass
> {code}
> {code:python}
> # Respects KeyboardInterrupt:
> table = pa.Table.from_batches(mygenerator(), schema)
> {code}
> OS: Ubuntu 20.04.3 LTS
> python: 3.8
> pyarrow: 7.0.0


