[ https://issues.apache.org/jira/browse/ARROW-10417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shouheng Yi updated ARROW-10417:
--------------------------------
    Description: 
There might be a memory leak in {{RecordBatchStreamWriter}}: memory is never 
released back to the operating system, so the process eventually hits its 
memory limit and starts swapping to virtual memory. See the picture below:

!Screen Shot 2020-10-28 at 9.43.32 PM.png!

Here is the code that reproduces it:
{code:python}
import tempfile
import os
import sys

import pyarrow as pa

B = 1
KB = 1024 * B
MB = 1024 * KB

schema = pa.schema(
    [
        pa.field("a_string", pa.string()),
        pa.field("an_int", pa.int32()),
        pa.field("a_float", pa.float32()),
        pa.field("a_list_of_floats", pa.list_(pa.float32())),
    ]
)

nrows_in_a_batch = 1000
nbatches_in_a_table = 1000

column_arrays = [
    ["string"] * nrows_in_a_batch,
    [123] * nrows_in_a_batch,
    [456.789] * nrows_in_a_batch,
    [range(1000)] * nrows_in_a_batch,
]

def main(sys_args) -> None:
    batch = pa.RecordBatch.from_arrays(column_arrays, schema=schema)
    table = pa.Table.from_batches([batch] * nbatches_in_a_table, schema=schema)

    with tempfile.TemporaryDirectory() as tmpdir:
        filename_template = "file-{n}.arrow"
        i = 0

        # Loop forever, writing the same table to a fresh file each time,
        # to make the unbounded memory growth observable.
        while True:
            path = os.path.join(tmpdir, filename_template.format(n=i))
            i += 1

            with pa.OSFile(path, "w") as sink:
                with pa.RecordBatchStreamWriter(sink, schema) as writer:
                    writer.write_table(table)
                    print(f"pa.total_allocated_bytes(): 
{pa.total_allocated_bytes() / MB} mb")

if __name__ == "__main__":
    main(sys.argv[1:])
{code}
Strangely enough, {{pa.total_allocated_bytes()}} reported a flat, normal-looking value the whole time:
{code:python}
pa.total_allocated_bytes(): 3.95556640625 mb
pa.total_allocated_bytes(): 3.95556640625 mb
pa.total_allocated_bytes(): 3.95556640625 mb
pa.total_allocated_bytes(): 3.95556640625 mb
pa.total_allocated_bytes(): 3.95556640625 mb
{code}
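
To make the comparison concrete, here is a minimal sketch (standard library 
only, not part of the original repro) that prints the process peak RSS next to 
Arrow's counter inside the loop. If the RSS keeps climbing while 
{{pa.total_allocated_bytes()}} stays flat, the retained memory is being held 
outside Arrow's default pool accounting. Note that {{ru_maxrss}} is reported 
in kilobytes on Linux and in bytes on macOS:
{code:python}
import resource

import pyarrow as pa

def report_memory() -> None:
    # Peak resident set size of this process. Units are platform-dependent:
    # kilobytes on Linux, bytes on macOS.
    rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    print(
        f"peak rss: {rss} (platform units), "
        f"arrow pool: {pa.total_allocated_bytes() / (1024 * 1024)} mb"
    )
{code}
Calling {{report_memory()}} in place of the plain print above should show the 
two numbers diverging if memory really is being retained.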

Am I using {{RecordBatchStreamWriter}} incorrectly? If not, how can I release 
the resources?
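
One possibility I have not ruled out (this is an assumption on my part, not 
something I verified on 0.15.1): if the default memory pool is jemalloc, the 
allocator can hold freed pages in its arenas instead of returning them to the 
OS right away, which looks exactly like a leak at the process level. A sketch 
of how one might distinguish the two cases, assuming a pyarrow build that 
exposes these helpers:
{code:python}
import pyarrow as pa

# Route Arrow allocations through the system allocator instead of the
# default (often jemalloc) pool, then rerun the repro and watch the RSS.
pa.set_memory_pool(pa.system_memory_pool())

# Some builds also expose a jemalloc knob that makes freed pages return
# to the OS more eagerly; it may not exist in every version:
# pa.jemalloc_set_decay_ms(0)
{code}
If the RSS stabilizes with the system pool, the growth is allocator caching 
rather than a true leak in the writer.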

[Update 10/29/2020]

I changed {{RecordBatchStreamWriter}} to {{RecordBatchFileWriter}} in my code, 
i.e.:

{code:python}
...
            with pa.OSFile(path, "w") as sink:
                with pa.RecordBatchFileWriter(sink, schema) as writer:
                    writer.write_table(table)
                    print(f"pa.total_allocated_bytes(): 
{pa.total_allocated_bytes() / MB} mb")
...
{code}

I observed the same memory profile, so I'm wondering whether the cause is 
[WriteRecordBatch|https://github.com/apache/arrow/blob/maint-0.15.x/cpp/src/arrow/ipc/writer.cc#L594] 
not releasing memory.
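
A further diagnostic that might help narrow it down (a sketch reusing 
{{path}}, {{table}}, and {{schema}} from the repro above, not a fix): write 
the record batches one at a time instead of going through {{write_table}}, to 
see whether the table-level write path is what retains the memory.
{code:python}
# Write each batch individually rather than writer.write_table(table), to
# check whether the per-batch path shows the same memory profile.
with pa.OSFile(path, "w") as sink:
    with pa.RecordBatchStreamWriter(sink, schema) as writer:
        for batch in table.to_batches():
            writer.write_batch(batch)
{code}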




> [Python][C++] Possible Memory Leak in RecordBatchStreamWriter
> -------------------------------------------------------------
>
>                 Key: ARROW-10417
>                 URL: https://issues.apache.org/jira/browse/ARROW-10417
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: C++, Python
>    Affects Versions: 0.15.1
>         Environment: This is the config for my worker node:
> resources:
>     - cpus: 1
>     - maxMemoryMb: 4096
>     - reservedMemoryMb: 2048
>            Reporter: Shouheng Yi
>            Priority: Major
>             Fix For: 3.0.0, 2.0.1
>
>         Attachments: Screen Shot 2020-10-28 at 9.43.32 PM.png, Screen Shot 2020-10-28 at 9.43.40 PM.png
>
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
