Anakin100100 commented on issue #49474:
URL: https://github.com/apache/arrow/issues/49474#issuecomment-4085405993

   I've investigated this more deeply and it's become clear that there is no memory 
leak here. I was initially able to reproduce the reported behavior. I then looked 
at what the RSS was actually composed of, and most of it is 
[mimalloc](https://github.com/microsoft/mimalloc) pages that the allocator caches 
rather than returning to the OS. If you want to run this in an HPC environment, 
you can force mimalloc to return the memory as soon as Arrow releases it by setting
   ```bash
    export MIMALLOC_PURGE_DELAY=0
   ```
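   As a quick check, you can confirm which allocator backend your PyArrow build is actually using (mimalloc is the default in most official wheels); this is a small sketch using the public memory-pool API, assuming a reasonably recent pyarrow:
   ```python
    import pyarrow as pa

    # The default pool reports its backing allocator; on official wheels
    # this is usually "mimalloc", which is what caches the freed pages.
    pool = pa.default_memory_pool()
    print(pool.backend_name)
    print(pa.supported_memory_backends())

    # The backend can also be selected up front via the environment
    # variable ARROW_DEFAULT_MEMORY_POOL=system|mimalloc|jemalloc.
   ```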
   I modified your example slightly to demonstrate this. 
   ```python
   import gc
   import os
   import tempfile
   import numpy as np
   import psutil
   import pyarrow as pa
   import pyarrow.parquet as pq
   import pyarrow.dataset as pad
   import pyarrow.compute as pc
   
   
   BATCH_SIZE = 131_072
   N_BATCHES  = 200
   
   
   def get_rss_mb():
       return psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024
   
   def get_arrow_mb():
       return pa.total_allocated_bytes() / 1024 / 1024
   
   
   def generate_dataset(base_path: str, n_rows: int = 30_000_000):
       rng = np.random.default_rng(42)
       for a in [1, 2]:
           for b in [1, 2]:
               part_dir = os.path.join(base_path, f"A={a}", f"B={b}")
               os.makedirs(part_dir, exist_ok=True)
               pq.write_table(
                    pa.table({"C": rng.standard_normal(n_rows).astype(np.float32)}),
                   os.path.join(part_dir, "data.parquet"),
                   row_group_size=BATCH_SIZE,
               )
   
   
   def run(dataset_path: str, img_label: str):
        dataset = pad.dataset(dataset_path, format="parquet", partitioning="hive")
       scanner = dataset.scanner(
            filter=(pc.field("A") == pc.scalar(1)) & (pc.field("B") == pc.scalar(1)),
           columns=["C"],
           batch_size=BATCH_SIZE,
           fragment_readahead=0,
       )
   
       gc.collect()
       rss_log, arrow_log = [], []
   
       for i, batch in enumerate(scanner.to_batches()):
           if i >= N_BATCHES:
               break
           _ = batch.column("C")
           pa.default_memory_pool().release_unused() 
           gc.collect()
           rss_log.append(get_rss_mb())
           arrow_log.append(get_arrow_mb())
   
   if __name__ == "__main__":
       pa.set_memory_pool(pa.system_memory_pool())
       for i in range(3):
           pa.default_memory_pool().release_unused() 
           gc.collect()
           print(f"iter {i}")
           print(f"before rss: {get_rss_mb()} mb, arrow: {get_arrow_mb()} mb")
           with tempfile.TemporaryDirectory() as tmp:
               generate_dataset(tmp)
               run(tmp, "memleak_minimal")
               pa.default_memory_pool().release_unused() 
               gc.collect()
                print(f"after rss: {get_rss_mb()} mb, arrow: {get_arrow_mb()} mb")
   ``` 
   
   And here is the result
   ```bash
    (pyarrow-dev) pawel-biegun@pawel-biegun-AORUS-15-9KF:~/pyarrow-dev$ python minimal_working_example.py
   iter 0
   before rss: 130.4765625 mb, arrow: 0.0 mb
   after rss: 159.55859375 mb, arrow: 0.0 mb
   iter 1
   before rss: 159.55859375 mb, arrow: 0.0 mb
   after rss: 158.0625 mb, arrow: 0.0 mb
   iter 2
   before rss: 158.0625 mb, arrow: 0.0 mb
   after rss: 159.26171875 mb, arrow: 0.0 mb
   ```
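   The `arrow: 0.0 mb` column is the key signal here: `pa.total_allocated_bytes()` tracks live Arrow allocations, so when it returns to zero while RSS stays elevated, the difference is the allocator's page cache, not leaked Arrow memory. A minimal sketch of that distinction, using only the public pyarrow memory-pool API:
   ```python
    import pyarrow as pa

    pool = pa.default_memory_pool()
    before = pool.bytes_allocated()

    # Allocate 1 MiB through the Arrow pool; bytes_allocated() tracks it
    # (possibly padded slightly for alignment).
    buf = pa.allocate_buffer(1 << 20, memory_pool=pool)
    assert pool.bytes_allocated() >= before + (1 << 20)

    # Releasing the buffer returns the accounting to its prior level,
    # even though the allocator may keep the pages cached (RSS unchanged).
    del buf
    assert pool.bytes_allocated() == before
    assert pool.max_memory() >= 1 << 20  # high-water mark persists
   ```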
   
   This shows that there is no memory leak: RSS stabilizes across iterations while 
Arrow's own allocations return to zero, so the remaining resident memory is just 
the allocator's page cache.
   
   @pitrou what would be a good place in the docs to document this? 

