I do generally have more data than can fit into the OS cache, and the data increases mostly monotonically. When I previously benchmarked parquet vs arrow, parquet did generally compress better. Read speeds, however, seemed to be much faster with feather than with parquet, though I was just blindly going from file to pd.DataFrame rather than slicing into it. I'll take another look.
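Roughly the comparison I have in mind for the re-run, in case it's useful context (an untested sketch; the path, column name, and time bounds are placeholders for my actual data):

```
import time

import pandas as pd
import pyarrow as pa
import pyarrow.dataset as pads
import pyarrow.feather as feather

path = '/tmp/day.feather'  # placeholder: one day of data written as feather v2
start = pd.Timestamp('2021-07-14 09:30')
end = pd.Timestamp('2021-07-14 16:00')

# (a) what I was doing before: read the whole file, then slice in pandas
t0 = time.perf_counter()
df = feather.read_feather(path)
slice_a = df[(df['times'] >= start) & (df['times'] <= end)]
t1 = time.perf_counter()

# (b) push the filter down through the dataset API, convert only the result
dataset = pads.dataset(path, format='feather')  # feather v2 is the Arrow IPC file format
expr = ((pads.field('times') >= pa.scalar(start, type=pa.timestamp('ns'))) &
        (pads.field('times') <= pa.scalar(end, type=pa.timestamp('ns'))))
slice_b = dataset.to_table(filter=expr).to_pandas()
t2 = time.perf_counter()

print(f'full read + pandas slice: {t1 - t0:.3f}s, dataset filter: {t2 - t1:.3f}s')
```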
Thanks,
Arun

On Wed, Jul 14, 2021 at 5:01 PM Weston Pace <[email protected]> wrote:

> > Is there any type of performance difference between RecordBatchFileReader and RecordBatchStreamReader? Or in other terms, when should either one be used?
>
> The performance should usually be similar if you are scanning the entire file. The file reader allows for random access but, when creating a file, it is not valid until you have closed the writer. When you close the writer it writes a footer which contains indices to all of the individual batches (this is what gives you the random access). So if your workflow is to create a file and then later read that file, the file reader is typically what you want.
>
> The stream reader is useful in situations where you are never terminating the file (and maybe never even creating file(s)). For example, if you are processing a continuously updating stream of batches from a socket (e.g. Arrow Flight) you will often want to use the stream writer/reader because there is no "end". Random access doesn't even make sense in such a scenario.
>
> > Also, is there a way to construct a pyarrow.TimestampScalar from a pd.Timestamp?
>
> `pa.scalar(ts)` is the way to do this, but you are right that it is probably not what you want. There is indeed an overhead, and if you can avoid marshalling an entire column of data to Python you probably should. At that point you would be better served by simply converting the entire table to a dataframe and doing your math on the dataframe. However, pyarrow does support some basic compute & filtering operations (see below).
>
> > I'm trying to create a pd.DataFrame from a subsection of a chunked feather file/pyarrow table, e.g. the feather file represents a 24-hour time series and I want to return a pd.DataFrame of some slice of it.
>
> If I understand correctly then one way you can achieve this is with the compute functions[1]:
>
> ```
> import pyarrow as pa
> import pyarrow.compute as pc
> import pandas as pd
>
> times = pd.date_range(start='1/1/2018', end='1/1/2019')
> times_arr = pa.array(times)
> table = pa.Table.from_arrays([times_arr], names=["times"])
>
> start = pa.scalar(pd.Timestamp('2018-03-01'), type=pa.timestamp('ns'))
> end = pa.scalar(pd.Timestamp('2018-04-01'), type=pa.timestamp('ns'))
>
> after_start = pc.greater_equal(times_arr, start)
> before_end = pc.less_equal(times_arr, end)
> in_range = pc.and_(after_start, before_end)
>
> filtered_table = pc.filter(table, in_range)
> print(filtered_table.num_rows)  # 32
> ```
>
> This avoids any marshalling of the times array into Python when you are doing the filtering.
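> To tie this back to the file reader discussion above: because compression is applied per batch, you could also open the file with the RecordBatchFileReader and apply the same filter one batch at a time, only converting the matches to pandas. A minimal sketch (untested; the path is a placeholder and it assumes the file was written as multiple chunks with the timestamps in the first column):
>
> ```
> import pandas as pd
> import pyarrow as pa
> import pyarrow.ipc
> import pyarrow.compute as pc
>
> start = pa.scalar(pd.Timestamp('2018-03-01'), type=pa.timestamp('ns'))
> end = pa.scalar(pd.Timestamp('2018-04-01'), type=pa.timestamp('ns'))
>
> # Feather v2 files are Arrow IPC files, so the IPC file reader can open them.
> reader = pa.ipc.open_file('/tmp/some_table.arrow')  # RecordBatchFileReader
> frames = []
> for i in range(reader.num_record_batches):
>     batch = reader.get_batch(i)  # random access; only this batch is decompressed
>     times = batch.column(0)      # column 0 holds the timestamps
>     mask = pc.and_(pc.greater_equal(times, start), pc.less_equal(times, end))
>     filtered = pc.filter(batch, mask)
>     if filtered.num_rows > 0:
>         frames.append(filtered.to_pandas())
>
> df = pd.concat(frames) if frames else pd.DataFrame()
> ```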
> Of course, at this point, you may be interested in the dataset API and filtering:
>
> ```
> import pyarrow as pa
> import pyarrow.dataset as pads
> import pyarrow.parquet as pq
> import pandas as pd
>
> times = pd.date_range(start='1/1/2018', end='1/1/2019')
> times_arr = pa.array(times)
> table = pa.Table.from_arrays([times_arr], names=["times"])
>
> pq.write_table(table, '/tmp/some_table.parquet')
>
> dataset = pads.dataset(['/tmp/some_table.parquet'])
> start = pa.scalar(pd.Timestamp('2018-03-01'), type=pa.timestamp('ns'))
> end = pa.scalar(pd.Timestamp('2018-04-01'), type=pa.timestamp('ns'))
>
> after_start = pads.field('times') >= start
> before_end = pads.field('times') <= end
> in_range = after_start & before_end
>
> filtered_table = dataset.to_table(filter=in_range)
> print(filtered_table.num_rows)  # 32
> ```
>
> If your dates are randomly ordered then you won't see much difference in performance between the two approaches. On the other hand, if your dates are monotonically increasing (or even mostly monotonically increasing) I would expect the dataset API approach to perform better, as it will be able to skip entire batches.
>
> Also, if you have a lot of data (more than fits into memory, or enough that you can't reliably expect the data to be in the OS cache) AND your data is monotonically increasing (or mostly so), then you will want to use parquet instead of the compressed IPC/feather format. Parquet supports row group statistics, so it will store the min/max value for each batch. The dataset API can take advantage of these statistics and avoid even loading a row group from disk if it can be reliably excluded by your filter.
>
> [1] https://arrow.apache.org/docs/python/compute.html
>
> On Wed, Jul 14, 2021 at 4:49 AM Arun Joseph <[email protected]> wrote:
>
>> Thanks for the clarification re: allow_64bit. I was able to get the RecordBatchFileReader approach working, but I will probably need to toy around with different chunk sizes. Is there any type of performance difference between RecordBatchFileReader and RecordBatchStreamReader? Or, in other terms, when should either one be used?
>>
>> Also, is there a way to construct a pyarrow.TimestampScalar from a pd.Timestamp? The docs [1] don't seem to expose a way to construct it. I tried initializing the class directly, but I get the following error: `TypeError: Do not call TimestampScalar's constructor directly, use pa.scalar() instead`, which I don't think is the right approach for me. I'm trying to create a pd.DataFrame from a subsection of a chunked feather file/pyarrow table, e.g. the feather file represents a 24-hour time series and I want to return a pd.DataFrame of some slice of it. I'm able to construct the pd.DataFrame as expected, but I think there might be (correct me if I'm wrong) an overhead from converting the pyarrow.TimestampScalar to the pandas equivalent (.as_py()) for every row I iterate through. I essentially check to see if a chunk is part of the range I want, iterate to a start point, slice the chunk, do the same for the end, and then concat the results. I want to see if there is a tangible difference if I just convert my desired start/end times once and then use them to do the comparisons to create the slices (roughly along the lines of the sketch below).
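>> For concreteness, the kind of thing I have in mind (an untested sketch; the path and the column position are placeholders, and it assumes the timestamps within each chunk are sorted):
>>
>> ```
>> import pandas as pd
>> import pyarrow as pa
>> import pyarrow.compute as pc
>> import pyarrow.feather as feather
>>
>> table = feather.read_table('/tmp/day.feather')  # placeholder: one 24-hour file
>>
>> # Convert the desired bounds once, instead of calling .as_py() on every row.
>> start = pd.Timestamp('2021-07-14 09:30')
>> end = pd.Timestamp('2021-07-14 16:00')
>> start_s = pa.scalar(start, type=pa.timestamp('ns'))
>> end_s = pa.scalar(end, type=pa.timestamp('ns'))
>>
>> frames = []
>> for batch in table.to_batches():
>>     if batch.num_rows == 0:
>>         continue
>>     times = batch.column(0)  # assuming column 0 is the timestamp column
>>     first = times[0].as_py()
>>     last = times[len(times) - 1].as_py()
>>     if last < start or first > end:  # chunk lies entirely outside the window
>>         continue
>>     mask = pc.and_(pc.greater_equal(times, start_s), pc.less_equal(times, end_s))
>>     frames.append(pc.filter(batch, mask).to_pandas())
>>
>> result = pd.concat(frames) if frames else pd.DataFrame()
>> ```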
>> Thanks again for the help so far, it's been very useful and much appreciated.
>>
>> [1] https://arrow.apache.org/docs/python/generated/pyarrow.TimestampScalar.html#pyarrow.TimestampScalar.value
>>
>> On Wed, Jul 14, 2021 at 12:13 AM Weston Pace <[email protected]> wrote:
>>
>>> > Additionally, should I be enabling the allow_64bit bool? I have nanosecond timestamps which would be truncated if this option acts the way I think it does.
>>>
>>> Sorry, I missed this question. I don't think you would need to worry about this for nanosecond timestamps. This controls whether arrays are allowed to contain more than 2^31-1 elements. Implementations are allowed to represent lengths as 32-bit signed integers, so even though the C++/Python implementation supports 64-bit lengths, such data may not be readable by other implementations, which is why this defaults to False.
>>>
>>> On Tue, Jul 13, 2021 at 2:11 PM Weston Pace <[email protected]> wrote:
>>>
>>>> Yes, you can reduce your memory footprint. Both the RecordBatchStreamReader and the RecordBatchFileReader support reading a table a batch at a time. Compression is applied on a per-batch basis, so there is no need to read the entire file just to decompress it.
>>>>
>>>> For this to work, the file will need to have been written as multiple batches in the first place. You can use the RecordBatchFileWriter/RecordBatchStreamWriter to do this, or you can set `chunksize` when using pyarrow.feather.write_feather. The default chunk size for write_feather is 64k, and most tools that create arrow files will create reasonably sized chunks by default, so this shouldn't be a problem.
>>>>
>>>> On Tue, Jul 13, 2021 at 12:06 PM Arun Joseph <[email protected]> wrote:
>>>>
>>>>> Cool, that's good to know. I guess for now I'll just use the older method until support is exposed for compression_level. I do have an unrelated question:
>>>>>
>>>>> Is there a way to reduce the memory overhead when loading a compressed feather file? I believe right now I decompress the file and then load the entire thing into memory. Not sure if chunking is something that is applicable here. I've read this article[1] from a couple of years back. Would the right approach be to use pyarrow.RecordBatchStreamReader to read a file that was written with chunks and skip chunks that contain series I don't care about? However, would that even reduce the memory footprint if the file was compressed in the first place? Or is the compression applied on a per-chunk basis?
>>>>>
>>>>> [1] https://wesmckinney.com/blog/arrow-streaming-columnar/
>>>>>
>>>>> On Tue, Jul 13, 2021 at 5:26 PM Weston Pace <[email protected]> wrote:
>>>>>
>>>>>> Ah, good catch. Looks like this is missing[1]. The default compression level for zstd is 1.
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/ARROW-13091
>>>>>>
>>>>>> On Tue, Jul 13, 2021 at 10:39 AM Arun Joseph <[email protected]> wrote:
>>>>>>
>>>>>>> The IPC API seems to work for the most part; however, is there a way to specify the compression level with IpcWriteOptions? It doesn't seem to be exposed. I'm currently using zstd, so I'm not sure what level it defaults to otherwise (the IpcWriteOptions docstring is pasted below).
>>>>>>> Additionally, should I be enabling the allow_64bit bool? I have nanosecond timestamps which would be truncated if this option acts the way I think it does.
>>>>>>>
>>>>>>> ```
>>>>>>> """
>>>>>>> Serialization options for the IPC format.
>>>>>>> Parameters
>>>>>>> ----------
>>>>>>> metadata_version : MetadataVersion, default MetadataVersion.V5
>>>>>>>     The metadata version to write. V5 is the current and latest,
>>>>>>>     V4 is the pre-1.0 metadata version (with incompatible Union layout).
>>>>>>> allow_64bit : bool, default False
>>>>>>>     If true, allow field lengths that don't fit in a signed 32-bit int.
>>>>>>> use_legacy_format : bool, default False
>>>>>>>     Whether to use the pre-Arrow 0.15 IPC format.
>>>>>>> compression : str or None
>>>>>>>     If not None, compression codec to use for record batch buffers.
>>>>>>>     May only be "lz4", "zstd" or None.
>>>>>>> use_threads : bool
>>>>>>>     Whether to use the global CPU thread pool to parallelize any
>>>>>>>     computational tasks like compression.
>>>>>>> emit_dictionary_deltas : bool
>>>>>>>     Whether to emit dictionary deltas. Default is false for maximum
>>>>>>>     stream compatibility.
>>>>>>> """
>>>>>>> ```
>>>>>>>
>>>>>>> On Tue, Jul 13, 2021 at 2:41 PM Weston Pace <[email protected]> wrote:
>>>>>>>
>>>>>>>> I can't speak to the intent. Adding a feather.write_table version (equivalent to feather.read_table) seems like it would be reasonable.
>>>>>>>>
>>>>>>>> > Is the best way around this to do the following?
>>>>>>>>
>>>>>>>> What you have written does not work for me. This slightly different version does:
>>>>>>>>
>>>>>>>> ```python3
>>>>>>>> import pyarrow as pa
>>>>>>>> import pyarrow._feather as _feather
>>>>>>>>
>>>>>>>> table = pa.Table.from_pandas(df)
>>>>>>>> _feather.write_feather(table, '/tmp/foo.feather',
>>>>>>>>                        compression=compression,
>>>>>>>>                        compression_level=compression_level,
>>>>>>>>                        chunksize=chunksize, version=version)
>>>>>>>> ```
>>>>>>>>
>>>>>>>> I'm not sure it's a great practice to be relying on pyarrow._feather though, as it is meant to be internal and subject to change without much consideration.
>>>>>>>>
>>>>>>>> You might want to consider using the newer IPC API, which should be equivalent (write_feather is indirectly using a RecordBatchFileWriter under the hood, although it is buried in the C++[1]). A complete example:
>>>>>>>>
>>>>>>>> ```python3
>>>>>>>> import pandas as pd
>>>>>>>> import pyarrow as pa
>>>>>>>> import pyarrow.ipc
>>>>>>>>
>>>>>>>> df = pd.DataFrame({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
>>>>>>>> table = pa.Table.from_pandas(df)
>>>>>>>> compression = None
>>>>>>>>
>>>>>>>> options = pyarrow.ipc.IpcWriteOptions()
>>>>>>>> options.compression = compression
>>>>>>>> writer = pyarrow.ipc.RecordBatchFileWriter('/tmp/foo2.feather',
>>>>>>>>                                            schema=table.schema, options=options)
>>>>>>>> writer.write_table(table)
>>>>>>>> writer.close()
>>>>>>>> ```
>>>>>>>>
>>>>>>>> If you need chunks it is slightly more work:
>>>>>>>>
>>>>>>>> ```python3
>>>>>>>> options = pyarrow.ipc.IpcWriteOptions()
>>>>>>>> options.compression = compression
>>>>>>>> writer = pyarrow.ipc.RecordBatchFileWriter('/tmp/foo3.feather',
>>>>>>>>                                            schema=table.schema, options=options)
>>>>>>>> batches = table.to_batches(chunksize)
>>>>>>>> for batch in batches:
>>>>>>>>     writer.write_batch(batch)
>>>>>>>> writer.close()
>>>>>>>> ```
>>>>>>>>
>>>>>>>> All three versions should be readable by pyarrow.feather.read_feather and should yield the exact same dataframe.
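>>>>>>>> For example, a quick way to sanity-check that for the two IPC-API files written above (an untested sketch):
>>>>>>>>
>>>>>>>> ```python3
>>>>>>>> import pandas as pd
>>>>>>>> import pyarrow.feather as feather
>>>>>>>>
>>>>>>>> # read_feather returns a pandas DataFrame directly
>>>>>>>> df2 = feather.read_feather('/tmp/foo2.feather')
>>>>>>>> df3 = feather.read_feather('/tmp/foo3.feather')
>>>>>>>> pd.testing.assert_frame_equal(df2, df3)  # should not raise
>>>>>>>> ```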
>>>>>>>> [1] https://github.com/apache/arrow/blob/81ff679c47754692224f655dab32cc0936bb5f55/cpp/src/arrow/ipc/feather.cc#L796
>>>>>>>>
>>>>>>>> On Tue, Jul 13, 2021 at 7:06 AM Arun Joseph <[email protected]> wrote:
>>>>>>>> >
>>>>>>>> > Hi,
>>>>>>>> >
>>>>>>>> > I've noticed that if I pass a pandas dataframe to write_feather (hyperlink to relevant part of code), it will automatically drop the index. Was this behavior intentionally chosen to only drop the index and not to allow the user to specify? I assumed the behavior would match the default behavior of converting from a pandas dataframe to an arrow table as mentioned in the docs.
>>>>>>>> >
>>>>>>>> > Is the best way around this to do the following?
>>>>>>>> >
>>>>>>>> > ```python3
>>>>>>>> > import pyarrow.lib as ext
>>>>>>>> > from pyarrow.lib import Table
>>>>>>>> >
>>>>>>>> > table = Table.from_pandas(df)
>>>>>>>> > ext.write_feather(table, dest,
>>>>>>>> >                   compression=compression, compression_level=compression_level,
>>>>>>>> >                   chunksize=chunksize, version=version)
>>>>>>>> > ```
>>>>>>>> > Thank You,
>>>>>>>> > --
>>>>>>>> > Arun Joseph
>>>>>>>
>>>>>>> --
>>>>>>> Arun Joseph
>>>>>
>>>>> --
>>>>> Arun Joseph
>>
>> --
>> Arun Joseph

--
Arun Joseph
