Hello Darren,

What Uwe suggests is usually the way to go: your active process writes to a
new file every time. A parallel process or thread then compacts the smaller
files in the background so that you don't end up with too many files.

On Wed, Dec 19, 2018 at 7:59 AM Uwe L. Korn <uw...@xhochy.com> wrote:

> Hello Darren,
>
> you're out of luck here. Parquet files are immutable and meant for batch
> writes. Once they're written you cannot modify them anymore. To load them,
> you need to know their metadata which is in the footer. The footer is
> always at the end of the file and written once you call close.
>
> Your use case is normally fulfilled by continuously starting new files and
> reading them back in using the ParquetDataset class.
>
> Cheers
> Uwe
>
> Am 18.12.2018 um 21:03 schrieb Darren Gallagher <daz...@gmail.com>:
>
> >> [Cross posted from https://github.com/apache/arrow/issues/3203]
> >>
> >> I'm adding new data to a parquet file every 60 seconds using this code:
> >>
> >> import os
> >> import json
> >> import time
> >> import requests
> >> import pandas as pd
> >> import numpy as np
> >> import pyarrow as pa
> >> import pyarrow.parquet as pq
> >>
> >> api_url = 'https://opensky-network.org/api/states/all'
> >>
> >> cols = ['icao24', 'callsign', 'origin', 'time_position',
> >>        'last_contact', 'longitude', 'latitude',
> >>        'baro_altitude', 'on_ground', 'velocity', 'true_track',
> >>        'vertical_rate', 'sensors', 'geo_altitude', 'squawk',
> >>        'spi', 'position_source']
> >>
> >> def get_new_flight_info(writer):
> >>    print("Requesting new data")
> >>    req = requests.get(api_url)
> >>    content = req.json()
> >>
> >>    states = content['states']
> >>    df = pd.DataFrame(states, columns = cols)
> >>    df['timestamp'] = content['time']
> >>    print("Found {} new items".format(len(df)))
> >>
> >>    table = pa.Table.from_pandas(df)
> >>    if writer is None:
> >>        writer = pq.ParquetWriter('openskyflights.parquet', table.schema)
> >>    writer.write_table(table=table)
> >>    return writer
> >>
> >> if __name__ == '__main__':
> >>    writer = None
> >>    while (not os.path.exists('opensky.STOP')):
> >>        writer = get_new_flight_info(writer)
> >>        time.sleep(60)
> >>
> >>    if writer:
> >>        writer.close()
> >>
> >> This is working fine and the file grows every 60 seconds.
> >> However unless I force the loop to exit I am unable to use the parquet
> >> file. In a separate terminal I try to access the parquet file using this
> >> code:
> >>
> >> import pandas as pd
> >> import pyarrow.parquet as pq
> >>
> >> table = pq.read_table("openskyflights.parquet")
> >> df = table.to_pandas()
> >> print(len(df))
> >>
> >> which results in this error:
> >>
> >> Traceback (most recent call last):
> >>  File "checkdownloadsize.py", line 7, in <module>
> >>    table = pq.read_table("openskyflights.parquet")
> >>  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 1074, in read_table
> >>    use_pandas_metadata=use_pandas_metadata)
> >>  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/filesystem.py", line 182, in read_parquet
> >>    filesystem=self)
> >>  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 882, in __init__
> >>    self.validate_schemas()
> >>  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 895, in validate_schemas
> >>    self.schema = self.pieces[0].get_metadata(open_file).schema
> >>  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 453, in get_metadata
> >>    return self._open(open_file_func).metadata
> >>  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 459, in _open
> >>    reader = open_file_func(self.path)
> >>  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 984, in open_file
> >>    common_metadata=self.common_metadata)
> >>  File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 102, in __init__
> >>    self.reader.open(source, metadata=metadata)
> >>  File "pyarrow/_parquet.pyx", line 639, in pyarrow._parquet.ParquetReader.open
> >>  File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
> >> pyarrow.lib.ArrowIOError: Invalid parquet file. Corrupt footer.
> >>
> >> Is there a way to achieve this?
> >> I'm assuming that if I call writer.close() in the while loop then it will
> >> prevent any further data being written to the file? Is there some kind of
> >> "flush" operation that can be used to ensure all data is written to disk
> >> and available to other processes or threads that want to read the data?
> >>
> >> Thanks
> >>
>
>

-- 
Sent from my jetpack.
