I generally do have more data than can fit into the OS cache, and the data
increases mostly monotonically. When I previously benchmarked parquet vs
arrow, parquet did generally compress better. However, read speeds seemed
to be much faster with feather than with parquet, though I was just
blindly going from file to pd.DataFrame rather than slicing into it. I'll take
another look.

Thanks,
Arun

On Wed, Jul 14, 2021 at 5:01 PM Weston Pace <[email protected]> wrote:

> > Is there any type of performance difference between
> > RecordBatchFileReader and RecordBatchStreamReader? Or in other terms, when
> > should either one be used?
>
> The performance should usually be similar if you are scanning the entire
> file.  The file reader allows for random access but, when creating a file,
> the file is not valid until you have closed the writer.  Closing the writer
> writes a footer which contains indices to all of the individual batches
> (this is what gives you the random access).  So if your workflow is to
> create a file and then later read that file, the file reader is typically
> what you want.
>
> The stream reader is useful in situations where you are never terminating
> the file (and maybe never even creating file(s)).  For example, if you are
> processing a continuously updating stream of batches from a socket (e.g.
> Arrow Flight) you will often want to use the stream writer/reader because
> there is no "end".  Random access doesn't even make sense in such a
> scenario.
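>
> In case it helps, here is a rough, untested sketch of the two APIs side by
> side (the paths and chunk size are just placeholders):
>
> ```
> import pyarrow as pa
>
> table = pa.table({"x": list(range(10))})
>
> # File format: the footer written by close() is what enables random access.
> with pa.ipc.new_file("/tmp/example.arrow", table.schema) as writer:
>     for batch in table.to_batches(max_chunksize=5):
>         writer.write_batch(batch)
>
> reader = pa.ipc.open_file("/tmp/example.arrow")
> print(reader.num_record_batches)  # 2
> print(reader.get_batch(1))        # jump straight to the second batch
>
> # Stream format: no footer, so you just consume batches in order.
> with pa.ipc.new_stream("/tmp/example.arrows", table.schema) as writer:
>     for batch in table.to_batches(max_chunksize=5):
>         writer.write_batch(batch)
>
> reader = pa.ipc.open_stream("/tmp/example.arrows")
> for batch in reader:
>     print(batch.num_rows)  # 5, 5
> ```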
>
> > Also, is there a way to construct a pyarrow.TimestampScalar from a
> > pd.Timestamp?
>
> `pa.scalar(ts)` is the way to do this but you are right that it is
> probably not what you want.  There is indeed an overhead, and if you can
> avoid marshalling an entire column of data to python element by element,
> you probably should.  If you do end up needing everything in python, you
> would be better served by simply converting the entire table to a dataframe
> and doing your math on the dataframe.  However, pyarrow does support some
> basic compute & filtering operations (see below).
>
> > I'm trying to create a pd.DataFrame from a subsection of a chunked
> > feather file/pyarrow table, e.g. the feather file represents a 24-hour time
> > series and I want to return a pd.DataFrame of some slice of it.
>
> If I understand correctly then one way you can achieve this is with the
> compute functions[1]:
>
> ```
> import pyarrow as pa
> import pyarrow.compute as pc
> import pandas as pd
>
> times = pd.date_range(start='1/1/2018', end='1/1/2019')
> times_arr = pa.array(times)
> table = pa.Table.from_arrays([times_arr], names=["times"])
>
> start = pa.scalar(pd.Timestamp('2018-03-01'), type=pa.timestamp('ns'))
> end = pa.scalar(pd.Timestamp('2018-04-01'), type=pa.timestamp('ns'))
>
> after_start = pc.greater_equal(times_arr, start)
> before_end = pc.less_equal(times_arr, end)
> in_range = pc.and_(after_start, before_end)
>
> filtered_table = pc.filter(table, in_range)
> print(filtered_table.num_rows) # 32
>
>
> ```
>
> This avoids any marshalling of the times array into python when you are
> doing the filtering.  Of course, at this point, you may be interested in
> the dataset API and filtering:
>
> ```
> import pyarrow as pa
> import pyarrow.dataset as pads
> import pyarrow.parquet as pq
> import pandas as pd
>
> times = pd.date_range(start='1/1/2018', end='1/1/2019')
> times_arr = pa.array(times)
> table = pa.Table.from_arrays([times_arr], names=["times"])
>
> pq.write_table(table, '/tmp/some_table.parquet')
>
> dataset = pads.dataset(['/tmp/some_table.parquet'])
> start = pa.scalar(pd.Timestamp('2018-03-01'), type=pa.timestamp('ns'))
> end = pa.scalar(pd.Timestamp('2018-04-01'), type=pa.timestamp('ns'))
>
> after_start = pads.field('times') >= start
> before_end = pads.field('times') <= end
> in_range = after_start & before_end
>
> filtered_table = dataset.to_table(filter=in_range)
> print(filtered_table.num_rows) # 32
>
>
> ```
>
> If your dates are randomly ordered then you won't see much difference in
> performance between the two approaches.  On the other hand, if your dates
> are monotonically increasing (or even mostly monotonically increasing) I
> would expect to see the dataset API approach perform better as it will be
> able to skip entire batches.
>
> Also, if you have a lot of data (more than fits into memory or enough that
> you can't reliably expect the data to be in the OS cache) AND your data is
> monotonically increasing (or mostly so) then you will want to use parquet
> instead of compressed IPC/feather format.  Parquet supports row group
> statistics, so it stores the min/max value for each row group.  The dataset
> API can take advantage of these statistics to avoid loading a row group
> from disk at all if the filter can reliably exclude it.
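>
> If you want to see the statistics that make that skipping possible, a quick
> (untested) sketch, reusing /tmp/some_table.parquet from the example above,
> would be something like the following.  Note that with the defaults the
> whole table likely lands in a single row group; you can pass
> `row_group_size` to `pq.write_table` to get more of them.
>
> ```
> import pyarrow.parquet as pq
>
> # Each row group stores per-column min/max statistics.
> md = pq.ParquetFile('/tmp/some_table.parquet').metadata
> for i in range(md.num_row_groups):
>     stats = md.row_group(i).column(0).statistics
>     print(i, stats.min, stats.max)
> ```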
>
> [1] https://arrow.apache.org/docs/python/compute.html
>
> On Wed, Jul 14, 2021 at 4:49 AM Arun Joseph <[email protected]> wrote:
>
>> Thanks for the clarification re: allow_64bit. I was able to get the
>> RecordBatchFileReader approach working, but I will probably need to toy
>> around with different chunk sizes. Is there any type of performance
>> difference between RecordBatchFileReader and RecordBatchStreamReader? Or in
>> other terms, when should either one be used?
>>
>> Also, is there a way to construct a pyarrow.TimestampScalar from a
>> pd.Timestamp? The docs [1] don't seem to expose a way to construct one. I
>> tried initializing the class directly, but I get the following error:
>> `TypeError: Do not call TimestampScalar's constructor directly, use
>> pa.scalar() instead`, and I don't think pa.scalar() is the right approach
>> for me. I'm trying to create a pd.DataFrame from a subsection of a chunked
>> feather file/pyarrow table, e.g. the feather file represents a 24-hour time
>> series and I want to return a pd.DataFrame of some slice of it. I'm able to
>> construct
>> the pd.DataFrame as expected, but I think there might be (correct me if I'm
>> wrong) an overhead with converting the pyarrow.TimestampScalar to the
>> pandas equivalent (.as_py()), for every row I iterate through. I
>> essentially check to see if a chunk is part of the range I want, iterate to
>> a start point, and slice the chunk, and do the same for the end, and then
>> concat the results. I want to see if there is a tangible difference if I
>> just convert my desired start/end times once and then use them to do the
>> comparisons to create the slices.
>>
>> Thanks again for the help so far, it's been very useful and much
>> appreciated.
>>
>> [1]
>> https://arrow.apache.org/docs/python/generated/pyarrow.TimestampScalar.html#pyarrow.TimestampScalar.value
>>
>> On Wed, Jul 14, 2021 at 12:13 AM Weston Pace <[email protected]>
>> wrote:
>>
>>> > Additionally, should I be enabling the allow_64bit bool? I have
>>> > nanosecond timestamps which would be truncated if this option acts the
>>> > way I think it does.
>>>
>>> Sorry, I missed this question.  I don't think you would need to worry
>>> about this for nanosecond timestamps.  This controls whether arrays are
>>> allowed to contain more than 2^31-1 elements.  Implementations are allowed
>>> to represent lengths as 32 bit signed integers and so even though the
>>> C++/python implementation supports 64 bit lengths it may be incompatible
>>> which is why this defaults to False.
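>>>
>>> For reference, this is roughly where that flag lives (you would only need
>>> it if a single column could exceed 2**31 - 1 elements):
>>>
>>> ```
>>> import pyarrow as pa
>>>
>>> # allow_64bit defaults to False for maximum cross-implementation compatibility.
>>> options = pa.ipc.IpcWriteOptions(allow_64bit=True)
>>> ```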
>>>
>>> On Tue, Jul 13, 2021 at 2:11 PM Weston Pace <[email protected]>
>>> wrote:
>>>
>>>> Yes, you can reduce your memory footprint.  Both the
>>>> RecordBatchStreamReader and the RecordBatchFileReader support reading a
>>>> table a batch at a time.  Compression is applied on a per-batch basis so
>>>> there is no need to read the entire file just to decompress it.
>>>>
>>>> For this to work, the file will need to have been written as multiple
>>>> batches in the first place.  You can use the
>>>> RecordBatchFileWriter/RecordBatchStreamWriter to do this or you can set
>>>> `chunksize` when using pyarrow.feather.write_feather.  The default chunk
>>>> size for write_feather is 64k and most tools that create arrow files will
>>>> create reasonably sized chunks by default, so this shouldn't be a problem.
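>>>>
>>>> A rough sketch of the reading side (the path is a placeholder for a
>>>> feather/IPC file that was written with multiple batches):
>>>>
>>>> ```
>>>> import pyarrow as pa
>>>>
>>>> # Each batch is decompressed individually, so you never need the whole
>>>> # table in memory at once.
>>>> reader = pa.ipc.open_file('/tmp/some_table.feather')
>>>> for i in range(reader.num_record_batches):
>>>>     batch = reader.get_batch(i)
>>>>     # ...inspect `batch` and skip it if it isn't in the range you care about...
>>>>     print(i, batch.num_rows)
>>>> ```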
>>>>
>>>>
>>>> On Tue, Jul 13, 2021 at 12:06 PM Arun Joseph <[email protected]> wrote:
>>>>
>>>>> Cool, that's good to know. I guess for now I'll just use the older
>>>>> method until support is exposed for compression_level. I do have an
>>>>> unrelated question:
>>>>>
>>>>> Is there a way to reduce the memory overhead when loading a compressed
>>>>> feather file? I believe right now I decompress the file and then load the
>>>>> entire thing into memory. Not sure if chunking is something that is
>>>>> applicable here. I've read this article[1] from a couple of years back.
>>>>> Would the right approach be to use pyarrow.RecordBatchStreamReader to
>>>>> read a file that was written with chunks and skip chunks that contain
>>>>> series I don't care about? However, would that even reduce the memory
>>>>> footprint if the file was compressed in the first place? Or is the
>>>>> compression applied on a per-chunk basis?
>>>>>
>>>>> [1] https://wesmckinney.com/blog/arrow-streaming-columnar/
>>>>>
>>>>> On Tue, Jul 13, 2021 at 5:26 PM Weston Pace <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Ah, good catch.  Looks like this is missing[1].  The default
>>>>>> compression level for zstd is 1.
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/ARROW-13091
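>>>>>>
>>>>>> Until that's exposed on IpcWriteOptions, the feather writer does take a
>>>>>> compression_level directly, e.g. (sketch; path and level are arbitrary):
>>>>>>
>>>>>> ```
>>>>>> import pyarrow as pa
>>>>>> import pyarrow.feather as feather
>>>>>>
>>>>>> table = pa.table({"x": [1, 2, 3]})
>>>>>> feather.write_feather(table, '/tmp/foo_zstd.feather',
>>>>>>                       compression='zstd', compression_level=9)
>>>>>> ```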
>>>>>>
>>>>>> On Tue, Jul 13, 2021 at 10:39 AM Arun Joseph <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> The IPC API seems to work for the most part; however, is there a way
>>>>>>> to specify the compression level with IpcWriteOptions? It doesn't seem
>>>>>>> to be exposed. I'm currently using zstd, so I'm not sure what level it
>>>>>>> defaults to otherwise.
>>>>>>> Additionally, should I be enabling the allow_64bit bool? I have
>>>>>>> nanosecond timestamps which would be truncated if this option acts the
>>>>>>> way I think it does. The IpcWriteOptions docstring:
>>>>>>>
>>>>>>> ```
>>>>>>> """
>>>>>>> Serialization options for the IPC format.
>>>>>>>
>>>>>>> Parameters
>>>>>>> ----------
>>>>>>> metadata_version : MetadataVersion, default MetadataVersion.V5
>>>>>>>     The metadata version to write. V5 is the current and latest,
>>>>>>>     V4 is the pre-1.0 metadata version (with incompatible Union layout).
>>>>>>> allow_64bit : bool, default False
>>>>>>>     If true, allow field lengths that don't fit in a signed 32-bit int.
>>>>>>> use_legacy_format : bool, default False
>>>>>>>     Whether to use the pre-Arrow 0.15 IPC format.
>>>>>>> compression : str or None
>>>>>>>     If not None, compression codec to use for record batch buffers.
>>>>>>>     May only be "lz4", "zstd" or None.
>>>>>>> use_threads : bool
>>>>>>>     Whether to use the global CPU thread pool to parallelize any
>>>>>>>     computational tasks like compression.
>>>>>>> emit_dictionary_deltas : bool
>>>>>>>     Whether to emit dictionary deltas. Default is false for maximum
>>>>>>>     stream compatibility.
>>>>>>> """
>>>>>>> ```
>>>>>>>
>>>>>>> On Tue, Jul 13, 2021 at 2:41 PM Weston Pace <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I can't speak to the intent.  Adding a feather.write_table version
>>>>>>>> (equivalent to feather.read_table) seems like it would be
>>>>>>>> reasonable.
>>>>>>>>
>>>>>>>> > Is the best way around this to do the following?
>>>>>>>>
>>>>>>>> What you have written does not work for me.  This slightly different
>>>>>>>> version does:
>>>>>>>>
>>>>>>>> ```python3
>>>>>>>> import pyarrow as pa
>>>>>>>> import pyarrow._feather as _feather
>>>>>>>>
>>>>>>>> table = pa.Table.from_pandas(df)
>>>>>>>> _feather.write_feather(table, '/tmp/foo.feather',
>>>>>>>>                        compression=compression,
>>>>>>>>                        compression_level=compression_level,
>>>>>>>>                        chunksize=chunksize, version=version)
>>>>>>>> ```
>>>>>>>>
>>>>>>>> I'm not sure it's a great practice to be relying on pyarrow._feather
>>>>>>>> though as it is meant to be internal and subject to change without
>>>>>>>> much consideration.
>>>>>>>>
>>>>>>>> You might want to consider using the newer IPC API which should be
>>>>>>>> equivalent (write_feather is indirectly using a
>>>>>>>> RecordBatchFileWriter
>>>>>>>> under the hood although it is buried in the C++[1]).  A complete
>>>>>>>> example:
>>>>>>>>
>>>>>>>> ```python3
>>>>>>>> import pandas as pd
>>>>>>>> import pyarrow as pa
>>>>>>>> import pyarrow.ipc
>>>>>>>>
>>>>>>>> df = pd.DataFrame({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
>>>>>>>> table = pa.Table.from_pandas(df)
>>>>>>>> compression = None
>>>>>>>>
>>>>>>>> options = pyarrow.ipc.IpcWriteOptions()
>>>>>>>> options.compression = compression
>>>>>>>> writer = pyarrow.ipc.RecordBatchFileWriter('/tmp/foo2.feather',
>>>>>>>>     schema=table.schema, options=options)
>>>>>>>> writer.write_table(table)
>>>>>>>> writer.close()
>>>>>>>> ```
>>>>>>>>
>>>>>>>> If you need chunks it is slightly more work:
>>>>>>>>
>>>>>>>> ```python3
>>>>>>>> options = pyarrow.ipc.IpcWriteOptions()
>>>>>>>> options.compression = compression
>>>>>>>> writer = pyarrow.ipc.RecordBatchFileWriter('/tmp/foo3.feather',
>>>>>>>>     schema=table.schema, options=options)
>>>>>>>> batches = table.to_batches(chunksize)
>>>>>>>> for batch in batches:
>>>>>>>>     writer.write_batch(batch)
>>>>>>>> writer.close()
>>>>>>>> ```
>>>>>>>>
>>>>>>>> All three versions should be readable by
>>>>>>>> pyarrow.feather.read_feather
>>>>>>>> and should yield the exact same dataframe.
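>>>>>>>>
>>>>>>>> For example, a quick sanity check might look like this (untested):
>>>>>>>>
>>>>>>>> ```python3
>>>>>>>> import pyarrow.feather as feather
>>>>>>>>
>>>>>>>> # All three files should round-trip to the same dataframe.
>>>>>>>> df1 = feather.read_feather('/tmp/foo.feather')
>>>>>>>> df2 = feather.read_feather('/tmp/foo2.feather')
>>>>>>>> df3 = feather.read_feather('/tmp/foo3.feather')
>>>>>>>> assert df1.equals(df2) and df2.equals(df3)
>>>>>>>> ```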
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://github.com/apache/arrow/blob/81ff679c47754692224f655dab32cc0936bb5f55/cpp/src/arrow/ipc/feather.cc#L796
>>>>>>>>
>>>>>>>> On Tue, Jul 13, 2021 at 7:06 AM Arun Joseph <[email protected]>
>>>>>>>> wrote:
>>>>>>>> >
>>>>>>>> > Hi,
>>>>>>>> >
>>>>>>>> > I've noticed that if I pass a pandas dataframe to write_feather
>>>>>>>> > (hyperlink to relevant part of code), it will automatically drop the
>>>>>>>> > index. Was this behavior intentionally chosen to only drop the index
>>>>>>>> > and not allow the user to specify? I assumed the behavior would match
>>>>>>>> > the default behavior of converting from a pandas dataframe to an
>>>>>>>> > arrow table as mentioned in the docs.
>>>>>>>> >
>>>>>>>> > Is the best way around this to do the following?
>>>>>>>> >
>>>>>>>> > ```python3
>>>>>>>> > import pyarrow.lib as ext
>>>>>>>> > from pyarrow.lib import Table
>>>>>>>> >
>>>>>>>> > table = Table.from_pandas(df)
>>>>>>>> > ext.write_feather(table, dest,
>>>>>>>> >                          compression=compression,
>>>>>>>> >                          compression_level=compression_level,
>>>>>>>> >                          chunksize=chunksize, version=version)
>>>>>>>> > ```
>>>>>>>> > Thank You,
>>>>>>>> > --
>>>>>>>> > Arun Joseph
>>>>>>>> >
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Arun Joseph
>>>>>>>
>>>>>>>
>>>>>
>>>>> --
>>>>> Arun Joseph
>>>>>
>>>>>
>>
>> --
>> Arun Joseph
>>
>>

-- 
Arun Joseph
