[ https://issues.apache.org/jira/browse/ARROW-16029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17623050#comment-17623050 ]
Joris Van den Bossche commented on ARROW-16029:
-----------------------------------------------

Yes, I can reproduce this, this is certainly a valid bug report (and thanks for reporting it!)

One note about {{pa.Table.from_batches}}: this actually first iterates through all batches (so getting them all in memory) before combining them into a Table (which is OK here, since a Table is an in-memory structure anyway). So this works a bit differently. The Dataset API is using {{RecordBatchReader.from_batches}} under the hood instead. But when using this directly, interrupting it with Ctrl-C also seems to work:

{code}
reader = pa.ipc.RecordBatchReader.from_batches(schema, mygenerator())
reader.read_all()
{code}

But when this is used in the Datasets API for writing, this RecordBatchReader is wrapped in a {{OneShotFragment}}, which is consumed by an async BackgroundGenerator that iterates on a background thread. [~westonpace], could that be related to the fact that interrupting doesn't work here?

> [Python] Runaway process with generator in "write_dataset()"
> ------------------------------------------------------------
>
>             Key: ARROW-16029
>             URL: https://issues.apache.org/jira/browse/ARROW-16029
>         Project: Apache Arrow
>      Issue Type: Bug
>      Components: Python
>        Reporter: Martin Thøgersen
>        Priority: Major
>
> We have a complex containerized data pipeline that keeps running even if the main process fails, so we have to stop the containers manually. The issue boils down to the following:
> The method {{pyarrow.dataset.write_dataset()}} accepts an [iterable of RecordBatch|https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html]. This means that generators should also work, since [a generator is an iterator, which is an iterable|https://stackoverflow.com/questions/2776829/difference-between-pythons-generators-and-iterators].
> The following minimal example can't be stopped with Ctrl-C/KeyboardInterrupt (SIGINT, signal 2). We need to run at minimum {{killall -3 python}} (SIGQUIT) to close the process.
> {code:python}
> from time import sleep
> import pyarrow as pa
> import pyarrow.dataset as ds
>
> def mygenerator():
>     i = 0
>     while True:
>         sleep(0.1)
>         i = i + 1
>         print(i)
>         yield pa.RecordBatch.from_pylist([{"mycol": "myval"}] * 10)
>
> schema = pa.schema([("mycol", pa.string())])
>
> # Does NOT respect KeyboardInterrupt:
> ds.write_dataset(data=mygenerator(),
>                  schema=schema,
>                  base_dir="mydir",
>                  format="parquet",
>                  existing_data_behavior="overwrite_or_ignore",
>                  )
> {code}
> In practice the generator is not infinite, but represents a series of API calls that can't be held in memory.
> The following examples show that generators work well with e.g. {{pa.Table.from_batches()}}, so the issue could be limited to the Dataset API:
> {code:python}
> # Respects KeyboardInterrupt:
> for i in mygenerator():
>     pass
> {code}
> {code:python}
> # Respects KeyboardInterrupt:
> table = pa.Table.from_batches(mygenerator(), schema)
> {code}
> OS: Ubuntu 20.04.3 LTS
> python: 3.8
> pyarrow: 7.0.0
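
For anyone narrowing this down, here is a minimal sketch (not taken from the comment or the issue above; it only reuses the reporter's generator and schema) that builds the intermediate {{RecordBatchReader}} explicitly before calling {{write_dataset}}, so the code path Joris describes can be exercised directly and compared against passing the raw generator:

{code:python}
from time import sleep

import pyarrow as pa
import pyarrow.dataset as ds


def mygenerator():
    # Same endless batch generator as in the issue description.
    while True:
        sleep(0.1)
        yield pa.RecordBatch.from_pylist([{"mycol": "myval"}] * 10)


schema = pa.schema([("mycol", pa.string())])

# Build the RecordBatchReader ourselves instead of letting write_dataset()
# wrap the iterable internally (per the comment above, the Dataset API does
# this via RecordBatchReader.from_batches under the hood).
reader = pa.ipc.RecordBatchReader.from_batches(schema, mygenerator())

# Question to check: does Ctrl-C behave any differently when the reader is
# passed in directly rather than the raw generator?
ds.write_dataset(
    data=reader,
    schema=schema,
    base_dir="mydir",
    format="parquet",
    existing_data_behavior="overwrite_or_ignore",
)
{code}

If the hang reproduces with the explicit reader as well, that would point toward the {{OneShotFragment}} / background-thread consumption rather than the iterable handling in {{write_dataset}} itself.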