Hello Darren, what Uwe suggests is usually the way to go: your active process writes to a new file every time. Then you have a parallel process/thread that compacts the smaller files in the background so that you don't end up with too many files.
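To make that concrete, here is a minimal sketch of the pattern (the directory name, the part-file naming scheme, and the dummy rows are my own assumptions for illustration, not anything your script or pyarrow prescribes): each batch becomes its own fully closed Parquet file, and a reader in another process can treat the directory as a single dataset at any time.

import os

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

out_dir = 'openskyflights'  # hypothetical directory holding one file per batch
os.makedirs(out_dir, exist_ok=True)

def write_batch(df, batch_id):
    # Each call writes a self-contained Parquet file; pq.write_table
    # writes the footer and closes the file before returning, so the
    # part file is immediately readable by other processes.
    table = pa.Table.from_pandas(df)
    pq.write_table(table, os.path.join(out_dir, 'part-{:06d}.parquet'.format(batch_id)))

# Writer side: call write_batch once per polling interval instead of keeping
# a single ParquetWriter open (dummy rows stand in for the OpenSky API call).
for batch_id in range(3):
    write_batch(pd.DataFrame({'icao24': ['abc123'], 'timestamp': [batch_id]}), batch_id)

# Reader side, which can run in a separate terminal while the writer keeps going:
# ParquetDataset reads all part files in the directory as one logical table.
dataset = pq.ParquetDataset(out_dir)
print(len(dataset.read().to_pandas()))

The background compaction step can then be a similar loop that reads a group of small part files with ParquetDataset, rewrites them as one larger file, and removes the originals once the new file is in place.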
On Wed, Dec 19, 2018 at 7:59 AM Uwe L. Korn <[email protected]> wrote:

> Hello Darren,
>
> you're out of luck here. Parquet files are immutable and meant for batch
> writes. Once they're written you cannot modify them anymore. To load them,
> you need to know their metadata, which is in the footer. The footer is
> always at the end of the file and written once you call close.
>
> Your use case is normally fulfilled by continuously starting new files and
> reading them back in using the ParquetDataset class.
>
> Cheers
> Uwe
>
> On 18.12.2018 at 21:03, Darren Gallagher <[email protected]> wrote:
>
> >> [Cross posted from https://github.com/apache/arrow/issues/3203]
> >>
> >> I'm adding new data to a parquet file every 60 seconds using this code:
> >>
> >> import os
> >> import json
> >> import time
> >> import requests
> >> import pandas as pd
> >> import numpy as np
> >> import pyarrow as pa
> >> import pyarrow.parquet as pq
> >>
> >> api_url = 'https://opensky-network.org/api/states/all'
> >>
> >> cols = ['icao24', 'callsign', 'origin', 'time_position',
> >>         'last_contact', 'longitude', 'latitude',
> >>         'baro_altitude', 'on_ground', 'velocity', 'true_track',
> >>         'vertical_rate', 'sensors', 'geo_altitude', 'squawk',
> >>         'spi', 'position_source']
> >>
> >> def get_new_flight_info(writer):
> >>     print("Requesting new data")
> >>     req = requests.get(api_url)
> >>     content = req.json()
> >>
> >>     states = content['states']
> >>     df = pd.DataFrame(states, columns = cols)
> >>     df['timestamp'] = content['time']
> >>     print("Found {} new items".format(len(df)))
> >>
> >>     table = pa.Table.from_pandas(df)
> >>     if writer is None:
> >>         writer = pq.ParquetWriter('openskyflights.parquet', table.schema)
> >>     writer.write_table(table=table)
> >>     return writer
> >>
> >> if __name__ == '__main__':
> >>     writer = None
> >>     while (not os.path.exists('opensky.STOP')):
> >>         writer = get_new_flight_info(writer)
> >>         time.sleep(60)
> >>
> >>     if writer:
> >>         writer.close()
> >>
> >> This is working fine and the file grows every 60 seconds.
> >> However, unless I force the loop to exit, I am unable to use the parquet
> >> file.
> >> In a separate terminal I try to access the parquet file using this
> >> code:
> >>
> >> import pandas as pd
> >> import pyarrow.parquet as pq
> >>
> >> table = pq.read_table("openskyflights.parquet")
> >> df = table.to_pandas()
> >> print(len(df))
> >>
> >> which results in this error:
> >>
> >> Traceback (most recent call last):
> >>   File "checkdownloadsize.py", line 7, in <module>
> >>     table = pq.read_table("openskyflights.parquet")
> >>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 1074, in read_table
> >>     use_pandas_metadata=use_pandas_metadata)
> >>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/filesystem.py", line 182, in read_parquet
> >>     filesystem=self)
> >>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 882, in __init__
> >>     self.validate_schemas()
> >>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 895, in validate_schemas
> >>     self.schema = self.pieces[0].get_metadata(open_file).schema
> >>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 453, in get_metadata
> >>     return self._open(open_file_func).metadata
> >>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 459, in _open
> >>     reader = open_file_func(self.path)
> >>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 984, in open_file
> >>     common_metadata=self.common_metadata)
> >>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 102, in __init__
> >>     self.reader.open(source, metadata=metadata)
> >>   File "pyarrow/_parquet.pyx", line 639, in pyarrow._parquet.ParquetReader.open
> >>   File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
> >> pyarrow.lib.ArrowIOError: Invalid parquet file. Corrupt footer.
> >>
> >> Is there a way to achieve this?
> >> I'm assuming that if I call writer.close() in the while loop then it will
> >> prevent any further data being written to the file? Is there some kind of
> >> "flush" operation that can be used to ensure all data is written to disk
> >> and available to other processes or threads that want to read the data?
> >>
> >> Thanks
> >>

-- Sent from my jetpack.
