This turns out to be a very common problem (landing incremental updates,
dealing with compaction and small files). It's part of the reason that
systems like Apache Kudu were developed, e.g.
https://blog.cloudera.com/blog/2015/11/how-to-ingest-and-query-fast-data-with-impala-without-kudu/

If you have to use file storage, then figuring out a scheme to compact
Parquet files (e.g. once per hour or once per day) will definitely be worth
it compared with using a slower file format (like Avro).

- Wes
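A minimal sketch of such a periodic compaction pass with pyarrow, assuming
the small files share one schema and sit in a single directory (the paths
and the row group size here are made up):

import pyarrow.parquet as pq

def compact(small_files_dir, compacted_path):
    # Read the directory of small files as one logical dataset
    # (this requires the files to have compatible schemas).
    table = pq.ParquetDataset(small_files_dir).read()
    # Rewrite everything as a single file with larger row groups; the
    # small input files can be deleted once this file is in place.
    pq.write_table(table, compacted_path, row_group_size=1000000)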
On Wed, Dec 19, 2018 at 7:37 AM Joel Pfaff <joel.pf...@gmail.com> wrote:
>
> Hello,
>
> For my company's use cases, we have found that the number of files was a
> critical part of the time spent building the execution plan, so we found
> the idea of writing small Parquet files very frequently to be rather
> inefficient.
>
> There are some formats that support an `append` semantic (I have tested
> this successfully with Avro, but there are a couple of others that could
> be used similarly). So we had a few cases where we were aggregating data
> in a `current table` backed by a set of Avro files, and rewriting all of
> it into a few Parquet files at the end of the day.
> This allowed us to produce files whose layout is optimized for query
> performance (file size, row group size, sorting per column), maximizing
> the benefit we get from the statistics.
> Our queries then did a UNION between the "optimized for speed" history
> tables and the "optimized for latency" current tables whenever the query
> timeframe crossed the boundary of the current day.
>
> Regards, Joel
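A rough sketch of the end-of-day rewrite Joel describes, assuming the day's
rows have already been read back into a pandas DataFrame (the sort column,
row group size and output path are invented):

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

def rewrite_current_table(df, history_path):
    # Sort on the column most queries filter on, so that the per-row-group
    # min/max statistics become selective.
    df = df.sort_values("timestamp")
    table = pa.Table.from_pandas(df, preserve_index=False)
    # One large, well-sized file for the "optimized for speed" history table.
    pq.write_table(table, history_path, row_group_size=500000)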
> On Wed, Dec 19, 2018 at 2:14 PM Francois Saint-Jacques
> <fsaintjacq...@networkdump.com> wrote:
> >
> > Hello Darren,
> >
> > What Uwe suggests is usually the way to go: your active process writes
> > to a new file every time. You then have a parallel process/thread that
> > compacts the smaller files in the background, so that you don't end up
> > with too many files.
> >
> > On Wed, Dec 19, 2018 at 7:59 AM Uwe L. Korn <uw...@xhochy.com> wrote:
> > >
> > > Hello Darren,
> > >
> > > you're out of luck here. Parquet files are immutable and meant for
> > > batch writes. Once they're written you cannot modify them anymore. To
> > > load them, you need to know their metadata, which is in the footer.
> > > The footer is always at the end of the file and is only written once
> > > you call close().
> > >
> > > Your use case is normally fulfilled by continuously starting new files
> > > and reading them back in using the ParquetDataset class.
> > >
> > > Cheers
> > > Uwe
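The footer Uwe mentions is what pyarrow parses first when opening a file;
on a file whose writer has already called close() it can be inspected
directly, while on a half-written file the same kind of "Corrupt footer"
error as in the traceback below is raised:

import pyarrow.parquet as pq

# read_metadata() parses only the footer of the file.
meta = pq.read_metadata("openskyflights.parquet")
print(meta.num_rows, meta.num_row_groups)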
> > > On 18.12.2018, at 21:03, Darren Gallagher <daz...@gmail.com> wrote:
> > >
> > > >> [Cross posted from https://github.com/apache/arrow/issues/3203]
> > > >>
> > > >> I'm adding new data to a parquet file every 60 seconds using this
> > > >> code:
> > > >>
> > > >> import os
> > > >> import json
> > > >> import time
> > > >> import requests
> > > >> import pandas as pd
> > > >> import numpy as np
> > > >> import pyarrow as pa
> > > >> import pyarrow.parquet as pq
> > > >>
> > > >> api_url = 'https://opensky-network.org/api/states/all'
> > > >>
> > > >> cols = ['icao24', 'callsign', 'origin', 'time_position',
> > > >>         'last_contact', 'longitude', 'latitude',
> > > >>         'baro_altitude', 'on_ground', 'velocity', 'true_track',
> > > >>         'vertical_rate', 'sensors', 'geo_altitude', 'squawk',
> > > >>         'spi', 'position_source']
> > > >>
> > > >> def get_new_flight_info(writer):
> > > >>     print("Requesting new data")
> > > >>     req = requests.get(api_url)
> > > >>     content = req.json()
> > > >>
> > > >>     states = content['states']
> > > >>     df = pd.DataFrame(states, columns = cols)
> > > >>     df['timestamp'] = content['time']
> > > >>     print("Found {} new items".format(len(df)))
> > > >>
> > > >>     table = pa.Table.from_pandas(df)
> > > >>     if writer is None:
> > > >>         writer = pq.ParquetWriter('openskyflights.parquet', table.schema)
> > > >>     writer.write_table(table=table)
> > > >>     return writer
> > > >>
> > > >> if __name__ == '__main__':
> > > >>     writer = None
> > > >>     while (not os.path.exists('opensky.STOP')):
> > > >>         writer = get_new_flight_info(writer)
> > > >>         time.sleep(60)
> > > >>
> > > >>     if writer:
> > > >>         writer.close()
> > > >>
> > > >> This is working fine and the file grows every 60 seconds.
> > > >> However unless I force the loop to exit I am unable to use the
> > > >> parquet file.
> > > >> In a separate terminal I try to access the parquet file using this
> > > >> code:
> > > >>
> > > >> import pandas as pd
> > > >> import pyarrow.parquet as pq
> > > >>
> > > >> table = pq.read_table("openskyflights.parquet")
> > > >> df = table.to_pandas()
> > > >> print(len(df))
> > > >>
> > > >> which results in this error:
> > > >>
> > > >> Traceback (most recent call last):
> > > >>   File "checkdownloadsize.py", line 7, in <module>
> > > >>     table = pq.read_table("openskyflights.parquet")
> > > >>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 1074, in read_table
> > > >>     use_pandas_metadata=use_pandas_metadata)
> > > >>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/filesystem.py", line 182, in read_parquet
> > > >>     filesystem=self)
> > > >>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 882, in __init__
> > > >>     self.validate_schemas()
> > > >>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 895, in validate_schemas
> > > >>     self.schema = self.pieces[0].get_metadata(open_file).schema
> > > >>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 453, in get_metadata
> > > >>     return self._open(open_file_func).metadata
> > > >>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 459, in _open
> > > >>     reader = open_file_func(self.path)
> > > >>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 984, in open_file
> > > >>     common_metadata=self.common_metadata)
> > > >>   File "/home/xxxx/.local/share/virtualenvs/opensky-WcPvsoLj/lib/python3.5/site-packages/pyarrow/parquet.py", line 102, in __init__
> > > >>     self.reader.open(source, metadata=metadata)
> > > >>   File "pyarrow/_parquet.pyx", line 639, in pyarrow._parquet.ParquetReader.open
> > > >>   File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status
> > > >> pyarrow.lib.ArrowIOError: Invalid parquet file. Corrupt footer.
> > > >>
> > > >> Is there a way to achieve this?
> > > >> I'm assuming that if I call writer.close() in the while loop then it
> > > >> will prevent any further data being written to the file? Is there
> > > >> some kind of "flush" operation that can be used to ensure all data
> > > >> is written to disk and available to other processes or threads that
> > > >> want to read the data?
> > > >>
> > > >> Thanks
> >
> > --
> > Sent from my jetpack.
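To make Uwe's and Francois's suggestion concrete for the script above, one
possible sketch (the directory name and file-naming pattern are invented) is
to give every 60-second batch its own file and read the directory back as a
dataset:

import os
import time
import pyarrow.parquet as pq

def write_batch(table, out_dir="openskyflights"):
    # Each batch becomes its own small, immutable file whose footer is
    # written immediately, so readers never see a half-written file.
    os.makedirs(out_dir, exist_ok=True)
    path = os.path.join(out_dir, "part-{}.parquet".format(int(time.time())))
    pq.write_table(table, path)

def read_all(out_dir="openskyflights"):
    # ParquetDataset exposes the directory of small files as one table;
    # a background job can periodically compact them as discussed above.
    return pq.ParquetDataset(out_dir).read().to_pandas()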