Thank you for the new example.

# Why is it 2x?

This is essentially the "peak RAM" usage of the operation.  Given that
split_blocks combined with self_destruct helped, I think we can
attribute this doubling to the pandas conversion.
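
As a rough sanity check on the numbers in this thread (plain arithmetic, not
anything Arrow reports): the test frame has 7196546 rows and 57 columns of
8-byte values, so one copy is about 3.28GB and two live copies come to about
6.56GB, close to the peak RSS observed.  Treating the small leftover as the
index is my assumption.

```python
# Back-of-the-envelope check of the sizes reported in this thread.
ROWS, COLS, BYTES_PER_VALUE = 7196546, 57, 8

one_copy = ROWS * COLS * BYTES_PER_VALUE   # a single copy of the column data
two_copies = 2 * one_copy                  # Arrow table + pandas copy alive at once

reported_df_size = 3281625104              # df.memory_usage().sum() from the output
leftover = reported_df_size - one_copy     # assumed (not verified) to be the index

print(f"one copy:   {one_copy / 1e9:.3f} GB")
print(f"two copies: {two_copies / 1e9:.3f} GB")
print(f"leftover:   {leftover} bytes")
```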

# Why doesn't the memory get returned?

It does; it just doesn't do so immediately.  If I put a 5 second sleep
before I print the memory I see that the RSS shrinks down.  This is
how jemalloc is configured in Arrow: it releases RSS a short time
after peak consumption (I think the delay is actually 1 second).

BEFORE mem_size: 0.082276352gb
AFTER: mem_size: 6.68639232gb df_size: 3.281625104gb
AFTER-ARROW: 3.281625024gb
---five second sleep---
AFTER-SLEEP: mem_size: 3.3795072gb df_size: 3.281625104gb
AFTER-SLEEP-ARROW: 3.281625024gb

# Why didn't switching to the system allocator help?

The problem isn't "the dynamic allocator is allocating more than it
needs".  There is a point in this process where ~6GB are actually
needed.  The system allocator either also holds on to that RSS for a
little bit or the RSS numbers themselves take a little bit of time to
update.  I'm not entirely sure.

# Why isn't this a zero-copy conversion to pandas?

That's a good question.  I don't know the details.  If I try manually
doing the conversion with zero_copy_only=True I get the error "Cannot
do zero copy conversion into multi-column DataFrame block".

# What is up with the numpy.ndarray objects in the heap?

I'm pretty sure guppy3 is double-counting.  Note that the total size
is ~20GB.  I've been able to reproduce this in cases where the heap is
3GB and guppy still shows the dataframe taking up 6GB.  In fact, I
once even managed to generate this:

AFTER-SLEEP: mem_size: 3.435835392gb df_size: 3.339197344gb
AFTER-SLEEP-ARROW: 0.0gb
Partition of a set of 212560 objects. Total size = 13328742559 bytes.
 Index  Count   %     Size   % Cumulative  % Kind (class / dict of class)
     0     57   0 6563250864  49 6563250864  49 pandas.core.series.Series
     1    133   0 3339213718  25 9902464582  74 numpy.ndarray
     2      1   0 3339197360  25 13241661942  99 pandas.core.frame.DataFrame

The RSS is 3.44GB but guppy reports a total heap size of ~13GB.

I did see some strange behavior when working with the
RecordBatchFileReader and I opened ARROW-15017 to resolve it.  In the
meantime you can work around it by deleting the reader.

# Can I return the data immediately / I don't want to use 2x memory consumption

I think split_blocks and self_destruct are the best answer at the
moment.  self_destruct has remained in the code since at least 1.0.0
so perhaps it is time we remove the "experimental" flag and maybe
replace it with a "caution" or "danger" flag (as it causes the table
to become unusable afterwards).

Jemalloc has some manual facilities to purge dirty memory and we
expose some of them with
pyarrow.default_memory_pool().release_unused() but that doesn't seem
to be helping in this situation.  Either the excess memory is in the
non-jemalloc pool or the jemalloc command can't quite release this
memory, or the RSS stats are just stale.  I'm not entirely sure.

On Tue, Dec 7, 2021 at 11:54 AM Arun Joseph <[email protected]> wrote:
>
> Slightly related, I have some other code that opens up an arrow file using a 
> `pyarrow.ipc.RecordBatchFileReader` and then converts RecordBatch to a pandas 
> dataframe. After this conversion is done, and I inspect the heap, I always 
> see the following:
>
> hpy().heap()
> Partition of a set of 351136 objects. Total size = 20112096840 bytes.
>  Index  Count   %     Size   % Cumulative  % Kind (class / dict of class)
>      0    121   0 9939601034  49 9939601034  49 numpy.ndarray
>      1      1   0 9939585700  49 19879186734  99 pandas.core.frame.DataFrame
>      2      1   0 185786680   1 20064973414 100 pandas.core.indexes.datetimes.DatetimeIndex
>
> Specifically the numpy.ndarray. It only shows up after the conversion and it 
> does not seem to go away. It also seems to be roughly the same size as the 
> dataframe itself.
>
> - Arun
>
> On Tue, Dec 7, 2021 at 10:21 AM Arun Joseph <[email protected]> wrote:
>>
>> Just to follow up on this, is there a way to manually force the arrow pool 
>> to de-allocate? My usecase is essentially having multiple processes in a 
>> Pool or via Slurm read from an arrow file, do some work, and then exit. 
>> Issue is that the 2x memory consumption reduces the bandwidth on the machine 
>> to effectively half.
>>
>> Thank You,
>> Arun
>>
>> On Mon, Dec 6, 2021 at 10:38 AM Arun Joseph <[email protected]> wrote:
>>>
>>> Additionally, I tested with my actual data, and did not see memory savings.
>>>
>>> On Mon, Dec 6, 2021 at 10:35 AM Arun Joseph <[email protected]> wrote:
>>>>
>>>> Hi Joris,
>>>>
>>>> Thank you for the explanation. The 2x memory consumption on conversion 
>>>> makes sense if there is a copy, but it does seem like it persists longer 
>>>> than it should. Might that be because of Python's GC policies?
>>>> I tried out your recommendations but they did not seem to work. However, I 
>>>> did notice an experimental option on `to_pandas`, `self_destruct`, which 
>>>> seems to address the issue I'm facing. Sadly, that itself did not work 
>>>> either... but, combined with `split_blocks=True`, I am seeing memory 
>>>> savings:
>>>>
>>>> import pandas as pd
>>>> import numpy as np
>>>> import pyarrow as pa
>>>> from pyarrow import feather
>>>> import os
>>>> import psutil
>>>> pa.set_memory_pool(pa.system_memory_pool())
>>>> DATA_FILE = 'test.arrow'
>>>>
>>>> def setup():
>>>>   np.random.seed(0)
>>>>   df = pd.DataFrame(np.random.randint(0,100,size=(7196546, 57)), 
>>>> columns=list([f'{i}' for i in range(57)]))
>>>>   df.to_feather(DATA_FILE)
>>>>   print(f'wrote {DATA_FILE}')
>>>>   import sys
>>>>   sys.exit()
>>>>
>>>> if __name__ == "__main__":
>>>>   # setup()
>>>>   process = psutil.Process(os.getpid())
>>>>   path = DATA_FILE
>>>>
>>>>   mem_size = process.memory_info().rss / 1e9
>>>>   print(f'BEFORE mem_size: {mem_size}gb')
>>>>
>>>>   feather_table = feather.read_table(path)
>>>>   # df = feather_table.to_pandas(split_blocks=True)
>>>>   # df = feather_table.to_pandas()
>>>>   df = feather_table.to_pandas(self_destruct=True, split_blocks=True)
>>>>
>>>>   mem_size = process.memory_info().rss / 1e9
>>>>   df_size = df.memory_usage().sum() / 1e9
>>>>   print(f'AFTER mem_size: {mem_size}gb df_size: {df_size}gb')
>>>>   print(f'ARROW: {pa.default_memory_pool().bytes_allocated() / 1e9}gb')
>>>>
>>>>
>>>> OUTPUT(to_pandas()):
>>>> BEFORE mem_size: 0.091795456gb
>>>> AFTER mem_size: 6.737887232gb df_size: 3.281625104gb
>>>> ARROW: 3.281625024gb
>>>>
>>>> OUTPUT (to_pandas(split_blocks=True)):
>>>> BEFORE mem_size: 0.091795456gb
>>>> AFTER mem_size: 6.752907264gb df_size: 3.281625104gb
>>>> ARROW: 3.281627712gb
>>>>
>>>> OUTPUT (to_pandas(self_destruct=True, split_blocks=True)):
>>>> BEFORE mem_size: 0.091795456gb
>>>> AFTER mem_size: 4.039512064gb df_size: 3.281625104gb
>>>> ARROW: 3.281627712gb
>>>>
>>>> I'm guessing since this feature is experimental, it might either go away, 
>>>> or might have strange behaviors. Is there anything I should look out for, 
>>>> or is there some alternative to reproduce these results?
>>>>
>>>> Thank You,
>>>> Arun
>>>>
>>>> On Mon, Dec 6, 2021 at 10:07 AM Joris Van den Bossche 
>>>> <[email protected]> wrote:
>>>>>
>>>>> Hi Arun, Weston,
>>>>>
>>>>> I didn't try running the script locally, but a quick note: the
>>>>> `feather.read_feather` function reads the Feather file into an Arrow
>>>>> table and directly converts it to a pandas DataFrame. A memory
>>>>> consumption of 2x the size of the dataframe does not sound that
>>>>> unexpected to me: most of the time, when converting an Arrow table to
>>>>> a pandas DataFrame, the data will be copied to accommodate pandas'
>>>>> specific internal memory layout (at least numeric columns will be
>>>>> combined together in 2D arrays).
>>>>>
>>>>> To verify if this is the cause, you might want to do either of:
>>>>> - use `feather.read_table` instead of `feather.read_feather`, which
>>>>> will read the file as an Arrow table instead (and doesn't do any
>>>>> conversion to pandas)
>>>>> - if you want to include the conversion to pandas, also use
>>>>> `read_table` and do the conversion to pandas explicitly with a
>>>>> `to_pandas()` call on the result. In that case, you can specify
>>>>> `split_blocks=True` to use more zero-copy conversion in the
>>>>> arrow->pandas conversion
>>>>>
>>>>> Joris
>>>>>
>>>>> On Mon, 6 Dec 2021 at 15:05, Arun Joseph <[email protected]> wrote:
>>>>> >
>>>>> > Hi Wes,
>>>>> >
>>>>> > Sorry for the late reply on this, but I think I got a reproducible test 
>>>>> > case:
>>>>> >
>>>>> > import pandas as pd
>>>>> > import numpy as np
>>>>> > import pyarrow as pa
>>>>> > from pyarrow import feather
>>>>> > import os
>>>>> > import psutil
>>>>> > pa.set_memory_pool(pa.system_memory_pool())
>>>>> > DATA_FILE = 'test.arrow'
>>>>> >
>>>>> > def setup():
>>>>> >   np.random.seed(0)
>>>>> >   df = pd.DataFrame(np.random.uniform(0,100,size=(7196546, 57)), 
>>>>> > columns=list([f'i_{i}' for i in range(57)]))
>>>>> >   df.to_feather(DATA_FILE)
>>>>> >   print(f'wrote {DATA_FILE}')
>>>>> >   import sys
>>>>> >   sys.exit()
>>>>> >
>>>>> > if __name__ == "__main__":
>>>>> >   # setup()
>>>>> >   process = psutil.Process(os.getpid())
>>>>> >   path = DATA_FILE
>>>>> >
>>>>> >   mem_size = process.memory_info().rss / 1e9
>>>>> >   print(f'BEFORE mem_size: {mem_size}gb')
>>>>> >
>>>>> >   df = feather.read_feather(path)
>>>>> >
>>>>> >   mem_size = process.memory_info().rss / 1e9
>>>>> >   df_size = df.memory_usage().sum() / 1e9
>>>>> >   print(f'AFTER mem_size: {mem_size}gb df_size: {df_size}gb')
>>>>> >   print(f'ARROW: {pa.default_memory_pool().bytes_allocated() / 1e9}gb')
>>>>> >
>>>>> > OUTPUT:
>>>>> > BEFORE mem_size: 0.091795456gb
>>>>> > AFTER mem_size: 6.762156032gb df_size: 3.281625104gb
>>>>> > ARROW: 3.281625024gb
>>>>> >
>>>>> > Let me know if you're able to see similar results.
>>>>> >
>>>>> > Thanks,
>>>>> > Arun
>>>>> >
>>>>> > On Fri, Dec 3, 2021 at 6:03 PM Weston Pace <[email protected]> 
>>>>> > wrote:
>>>>> >>
>>>>> >> I get more or less the same results as you for the provided setup data
>>>>> >> (exact same #'s for arrow & df_size and slightly different for RSS
>>>>> >> which is to be expected).  The fact that the arrow size is much lower
>>>>> >> than the dataframe size is not too surprising to me.  If a column
>>>>> >> can't be zero copied then its memory will disappear from the arrow
>>>>> >> pool (I think).  Plus, object columns will have overhead in pandas
>>>>> >> that they do not have in Arrow.
>>>>> >>
>>>>> >> The df_size issue for me seems to be tied to string columns.  I think
>>>>> >> pandas is overestimating how much size is needed there (many of my
>>>>> >> strings are similar and I wonder if some kind of object sharing is
>>>>> >> happening).  But we can table this for another time.
>>>>> >>
>>>>> >> I tried writing my feather file with your parameters and it didn't
>>>>> >> have much impact on any of the numbers.
>>>>> >>
>>>>> >> Since the arrow size for you is expected (nearly the same as the
>>>>> >> df_size) I'm not sure what to investigate next.  The memory does not
>>>>> >> seem to be retained by Arrow.  Is there any chance you could create a
>>>>> >> reproducible test case using randomly generated numpy data (then you
>>>>> >> could share that setup function)?
>>>>> >>
>>>>> >> On Fri, Dec 3, 2021 at 12:13 PM Arun Joseph <[email protected]> wrote:
>>>>> >> >
>>>>> >> > Hi Wes,
>>>>> >> >
>>>>> >> > I'm not including the setup() call when I encounter the issue. I 
>>>>> >> > just kept it in there for ease of reproducibility. Memory usage is 
>>>>> >> > indeed higher when it is included, but that isn't surprising.
>>>>> >> >
>>>>> >> > I tried switching over to the system allocator but there is no 
>>>>> >> > change.
>>>>> >> >
>>>>> >> > I've updated to Arrow 6.0.1 as well and there is no change.
>>>>> >> >
>>>>> >> > I updated my script to also include the Arrow bytes allocated and it 
>>>>> >> > gave me the following:
>>>>> >> >
>>>>> >> > MVE:
>>>>> >> > import pandas as pd
>>>>> >> > import pyarrow as pa
>>>>> >> > from pyarrow import feather
>>>>> >> > import os
>>>>> >> > import psutil
>>>>> >> > pa.set_memory_pool(pa.system_memory_pool())
>>>>> >> >
>>>>> >> >
>>>>> >> > def setup():
>>>>> >> >   df = pd.read_csv('https://www.stats.govt.nz/assets/Uploads/Annual-enterprise-survey/Annual-enterprise-survey-2020-financial-year-provisional/Download-data/annual-enterprise-survey-2020-financial-year-provisional-csv.csv')
>>>>> >> >   df.to_feather('test.csv')
>>>>> >> >
>>>>> >> > if __name__ == "__main__":
>>>>> >> >   # setup()
>>>>> >> >   process = psutil.Process(os.getpid())
>>>>> >> >   path = 'test.csv'
>>>>> >> >
>>>>> >> >   mem_size = process.memory_info().rss / 1e9
>>>>> >> >   print(f'BEFORE mem_size: {mem_size}gb')
>>>>> >> >
>>>>> >> >   df = feather.read_feather(path)
>>>>> >> >
>>>>> >> >   df_size = df.memory_usage(deep=True).sum() / 1e9
>>>>> >> >   mem_size = process.memory_info().rss / 1e9
>>>>> >> >   print(f'AFTER mem_size: {mem_size}gb df_size: {df_size}gb')
>>>>> >> >   print(f'ARROW: {pa.default_memory_pool().bytes_allocated() / 1e9}gb')
>>>>> >> >
>>>>> >> > Output with my data:
>>>>> >> > BEFORE mem_size: 0.08761344gb
>>>>> >> > AFTER mem_size: 6.297198592gb df_size: 3.080121688gb
>>>>> >> > ARROW: 3.080121792gb
>>>>> >> >
>>>>> >> > Output with Provided Setup Data:
>>>>> >> > BEFORE mem_size: 0.09179136gb
>>>>> >> > AFTER mem_size: 0.011487232gb df_size: 0.024564664gb
>>>>> >> > ARROW: 0.00029664gb
>>>>> >> >
>>>>> >> > I'm assuming that the df and the arrow bytes allocated/sizes are 
>>>>> >> > distinct and non-overlapping, but it seems strange that the output 
>>>>> >> > with the provided data has the Arrow bytes allocated at ~0GB whereas 
>>>>> >> > the one with my data has the allocated data approximately equal to 
>>>>> >> > the dataframe size. I'm not sure if it affects anything but my file 
>>>>> >> > was written with the following:
>>>>> >> >
>>>>> >> > import pyarrow.lib as ext
>>>>> >> > import pyarrow
>>>>> >> > COMPRESSION_LEVEL = 19
>>>>> >> > COMPRESSION_ALGO = 'zstd'
>>>>> >> > KILOBYTE = 1 << 10
>>>>> >> > MEGABYTE = KILOBYTE * KILOBYTE
>>>>> >> > CHUNK_SIZE = MEGABYTE
>>>>> >> >
>>>>> >> > table = pyarrow.Table.from_pandas(df, preserve_index=preserve_index)
>>>>> >> > ext.write_feather(table, dest, compression=COMPRESSION_ALGO, 
>>>>> >> > compression_level=COMPRESSION_LEVEL, chunksize=CHUNK_SIZE, version=2)
>>>>> >> >
>>>>> >> > As to the discrepancy around calculating dataframe size. I'm not 
>>>>> >> > sure why that would be so off for you. Going off the docs, it seems 
>>>>> >> > like it should be accurate. My Dataframe in question is [7196546 
>>>>> >> > rows x 56 columns] where each column is mostly a float or integer 
>>>>> >> > and datetime index. 7196546 * 56 * 8 = 3224052608 ~= 3.2GB which 
>>>>> >> > roughly aligns.
>>>>> >> >
>>>>> >> > Thank You,
>>>>> >> > Arun
>>>>> >> >
>>>>> >> > On Fri, Dec 3, 2021 at 4:36 PM Weston Pace <[email protected]> 
>>>>> >> > wrote:
>>>>> >> >>
>>>>> >> >> 2x overshoot of memory does seem a little high.  Are you including
>>>>> >> >> the "setup" part when you encounter that?  Arrow's file-based CSV
>>>>> >> >> reader will require 2-3x memory usage because it buffers the bytes
>>>>> >> >> in memory in case it needs to re-convert them later (because it
>>>>> >> >> realizes the data type for the column is different).  I'm not sure
>>>>> >> >> if pandas' CSV reader is similar.
>>>>> >> >>
>>>>> >> >> Dynamic memory allocators (e.g. jemalloc) can cause Arrow to hold on
>>>>> >> >> to a bit more memory than it needs (for a little while at least)
>>>>> >> >> even after it is no longer used.  Even malloc will hold onto memory
>>>>> >> >> sometimes due to fragmentation or other concerns.  You could try
>>>>> >> >> changing to the system allocator
>>>>> >> >> (pa.set_memory_pool(pa.system_memory_pool()) at the top of your
>>>>> >> >> file) to see if that makes a difference.
>>>>> >> >>
>>>>> >> >> I'm not sure your method of calculating the dataframe size is
>>>>> >> >> reliable.  I don't actually know enough about pandas but when I 
>>>>> >> >> tried
>>>>> >> >> your experiment with my own 1.9G CSV file it ended up reporting:
>>>>> >> >>
>>>>> >> >> AFTER mem_size: 2.348068864gb df_size: 4.519898461gb
>>>>> >> >>
>>>>> >> >> which seems suspicious.
>>>>> >> >>
>>>>> >> >> Anyways, my tests with my own CSV file (on Arrow 6.0.1) didn't seem
>>>>> >> >> all that unexpected.  There was 2.348GB of usage.  Arrow itself was
>>>>> >> >> only using ~1.9GB and I will naively assume the difference between 
>>>>> >> >> the
>>>>> >> >> two is bloat caused by object wrappers when converting to pandas.
>>>>> >> >>
>>>>> >> >> Another thing you might try and measure is
>>>>> >> >> `pa.default_memory_pool().bytes_allocated()`.  This will tell you 
>>>>> >> >> how
>>>>> >> >> much memory Arrow itself is hanging onto.  If that is not 6GB then 
>>>>> >> >> it
>>>>> >> >> is a pretty good guess that memory is being held somewhere else.
>>>>> >> >>
>>>>> >> >> On Fri, Dec 3, 2021 at 10:54 AM Arun Joseph <[email protected]> 
>>>>> >> >> wrote:
>>>>> >> >> >
>>>>> >> >> > Hi Apache Arrow Members,
>>>>> >> >> >
>>>>> >> >> > My question is below but I've compiled a minimum reproducible 
>>>>> >> >> > example with a public dataset:
>>>>> >> >> >
>>>>> >> >> > import pandas as pd
>>>>> >> >> > from pyarrow import feather
>>>>> >> >> > import os
>>>>> >> >> > import psutil
>>>>> >> >> >
>>>>> >> >> >
>>>>> >> >> > def setup():
>>>>> >> >> >   df = pd.read_csv('https://www.stats.govt.nz/assets/Uploads/Annual-enterprise-survey/Annual-enterprise-survey-2020-financial-year-provisional/Download-data/annual-enterprise-survey-2020-financial-year-provisional-csv.csv')
>>>>> >> >> >   df.to_feather('test.csv')
>>>>> >> >> >
>>>>> >> >> > if __name__ == "__main__":
>>>>> >> >> >   # setup()
>>>>> >> >> >   process = psutil.Process(os.getpid())
>>>>> >> >> >   path = 'test.csv'
>>>>> >> >> >
>>>>> >> >> >   mem_size = process.memory_info().rss / 1e9
>>>>> >> >> >   print(f'BEFORE mem_size: {mem_size}gb')
>>>>> >> >> >
>>>>> >> >> >   df = feather.read_feather(path)
>>>>> >> >> >
>>>>> >> >> >   df_size = df.memory_usage(deep=True).sum() / 1e9
>>>>> >> >> >   mem_size = process.memory_info().rss / 1e9
>>>>> >> >> >   print(f'AFTER mem_size: {mem_size}gb df_size: {df_size}gb')
>>>>> >> >> >
>>>>> >> >> > I substituted my df with a sample CSV. I had trouble finding a 
>>>>> >> >> > sample CSV of adequate size; however, my dataset is ~3GB, and I 
>>>>> >> >> > see memory usage of close to 6GB.
>>>>> >> >> >
>>>>> >> >> > Output with My Data:
>>>>> >> >> > BEFORE mem_size: 0.088891392gb
>>>>> >> >> > AFTER mem_size: 6.324678656gb df_size: 3.080121688gb
>>>>> >> >> >
>>>>> >> >> > It seems strange that the overall memory usage of the process is 
>>>>> >> >> > approx double of the size of the dataframe itself. Is there a 
>>>>> >> >> > reason for this, and is there a way to mitigate this?
>>>>> >> >> >
>>>>> >> >> > $ conda list pyarrow
>>>>> >> >> > #
>>>>> >> >> > # Name                    Version                   Build  Channel
>>>>> >> >> > pyarrow                   4.0.1           py37h0f64622_13_cpu    
>>>>> >> >> > conda-forge
>>>>> >> >> >
>>>>> >> >> > Thank You,
>>>>> >> >> > Arun Joseph
>>>>> >> >> >
>>>>> >> >
>>>>> >> >
>>>>> >> >
>>>>> >> > --
>>>>> >> > Arun Joseph
>>>>> >> >
>>>>> >
>>>>> >
>>>>> >
>>>>> > --
>>>>> > Arun Joseph
>>>>> >
>>>>
>>>>
>>>>
>>>> --
>>>> Arun Joseph
>>>>
>>>
>>>
>>> --
>>> Arun Joseph
>>>
>>
>>
>> --
>> Arun Joseph
>>
>
>
> --
> Arun Joseph
>
