[ https://issues.apache.org/jira/browse/ARROW-16029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17623330#comment-17623330 ]

Weston Pace commented on ARROW-16029:
-------------------------------------

{quote}
Weston Pace that might be related to the fact that interrupting doesn't work 
here?
{quote}

Yes, that seems very likely.  We don't have cancellation support in Acero today 
(though it shouldn't be too hard to add).  Just to be clear though, there are 
still two distinct problems, right?

1. You cannot cancel the write dataset operation (a possible stopgap is 
sketched after this list)
2. The generator should wrap up and finish, but the write dataset operation 
hangs indefinitely
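
Until real cancellation lands, a possible user-side stopgap for (1) is to make 
the generator itself responsive to SIGINT instead of relying on 
KeyboardInterrupt.  This is only a sketch: it assumes Python-level signal 
handlers still fire while the write is pulling batches (which this very bug may 
prevent), and it only helps if exhausting the generator actually lets the write 
finish (i.e. problem 2 is fixed):

{code:python}
import signal
from time import sleep

import pyarrow as pa
import pyarrow.dataset as ds

stop_requested = False

def handle_sigint(signum, frame):
    # Record the request instead of raising KeyboardInterrupt, so the
    # generator can return cleanly and let write_dataset finish.
    global stop_requested
    stop_requested = True

signal.signal(signal.SIGINT, handle_sigint)

def mygenerator():
    # Yields batches until a SIGINT has been observed.
    while not stop_requested:
        sleep(0.1)
        yield pa.RecordBatch.from_pylist([{"mycol": "myval"}] * 10)

schema = pa.schema([("mycol", pa.string())])
ds.write_dataset(data=mygenerator(),
                 schema=schema,
                 base_dir="mydir",
                 format="parquet",
                 existing_data_behavior="overwrite_or_ignore")
{code}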

Also, it seems this was filed some time ago.  Very sorry for missing the first 
ping.

> [Python] Runaway process with generator in "write_dataset()"
> ------------------------------------------------------------
>
>                 Key: ARROW-16029
>                 URL: https://issues.apache.org/jira/browse/ARROW-16029
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Python
>            Reporter: Martin Thøgersen
>            Priority: Critical
>             Fix For: 11.0.0
>
>
> We have a complex containerized data pipeline that keeps running, even if the 
> main process fails, so we have to stop the containers manually. The issue 
> boils down to the following:
> The method {{pyarrow.dataset.write_dataset()}} accepts an [iterable of 
> RecordBatch|https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html].
> This means that generators should also work, since [a generator is an 
> iterator, which is an 
> iterable.|https://stackoverflow.com/questions/2776829/difference-between-pythons-generators-and-iterators]
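> As a quick sanity check of that claim (plain Python, nothing 
> pyarrow-specific):
> {code:python}
> from collections.abc import Iterable, Iterator
> 
> def gen():
>     yield 1
> 
> g = gen()
> assert isinstance(g, Iterator)   # a generator is an iterator...
> assert isinstance(g, Iterable)   # ...and every iterator is an iterable
> {code}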
> The following minimal example can't be stopped with Ctrl-C/KeyboardInterrupt 
> (SIGINT, signal 2). We need to run at least {{killall -3 python}} (SIGQUIT) 
> to stop the process.
> {code:python}
> from time import sleep
> import pyarrow as pa
> import pyarrow.dataset as ds
> 
> def mygenerator():
>     # Infinite stream of small record batches, one every 100 ms.
>     i = 0
>     while True:
>         sleep(0.1)
>         i = i + 1
>         print(i)
>         yield pa.RecordBatch.from_pylist([{"mycol": "myval"}] * 10)
> 
> schema = pa.schema([("mycol", pa.string())])
> 
> # Does NOT respect KeyboardInterrupt:
> ds.write_dataset(data=mygenerator(),
>                  schema=schema,
>                  base_dir="mydir",
>                  format="parquet",
>                  existing_data_behavior="overwrite_or_ignore",
>                  )
> {code}
> In practice the generator is not infinite, but represents a series of API 
> calls that can't be held in memory.
> The following examples show that generators work well with e.g. 
> {{pa.Table.from_batches()}}. So the issue could be limited to the Dataset 
> API?
> {code:python}
> # Respects KeyboardInterrupt:
> for i in mygenerator():
>     pass
> {code}
> {code:python}
> # Respects KeyboardInterrupt:
> table = pa.Table.from_batches(mygenerator(), schema)
> {code}
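>
> As an interim workaround, here is a sketch only (the chunk size and the 
> per-chunk {{basename_template}} naming are assumptions, not something this 
> report verified): drain the generator in plain Python, where Ctrl-C works, 
> and write one bounded chunk per {{write_dataset()}} call.
> {code:python}
> import itertools
> 
> import pyarrow as pa
> import pyarrow.dataset as ds
> 
> def write_in_chunks(batches, schema, base_dir, chunk_size=100):
>     # Materialize one bounded chunk at a time; Ctrl-C lands between
>     # the (finite) write_dataset calls instead of inside one.
>     for n in itertools.count():
>         chunk = list(itertools.islice(batches, chunk_size))
>         if not chunk:
>             break
>         ds.write_dataset(
>             data=chunk,
>             schema=schema,
>             base_dir=base_dir,
>             format="parquet",
>             # Distinct template per chunk so successive calls do not
>             # overwrite each other's part files.
>             basename_template=f"chunk-{n}-part-{{i}}.parquet",
>             existing_data_behavior="overwrite_or_ignore",
>         )
> {code}
>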
> OS: Ubuntu 20.04.3 LTS
> python: 3.8
> pyarrow: 7.0.0


