Hi,
But GC is not predictable (even though Python's reference counting reclaims
objects more eagerly than the JVM's collector does), and the following code
fails randomly if `gc.collect()` is not called:
```
# ENV
# Python 3.8.16 (default, Jun 12 2023, 12:55:15)
# [Clang 14.0.6 ] :: Anaconda, Inc. on darwin
import jpype
import jpype.imports
import pyarrow as pa
import pyarrow.dataset
from pyarrow.cffi import ffi as arrow_c  # C Data interface FFI handle

# start_jvm, child_allocator and get_java_root_allocator are project-local
# helpers that start the JVM and manage Java-side Arrow allocators.

def wrap_from_java_stream_to_generator(java_arrow_stream,
                                       allocator=None, yield_schema=False):
    if allocator is None:
        allocator = get_java_root_allocator().allocator
    c_stream = arrow_c.new("struct ArrowArrayStream*")
    c_stream_ptr = int(arrow_c.cast("uintptr_t", c_stream))
    org = jpype.JPackage("org")
    java_wrapped_stream = org.apache.arrow.c.ArrowArrayStream.wrap(c_stream_ptr)
    org.apache.arrow.c.Data.exportArrayStream(allocator, java_arrow_stream,
                                              java_wrapped_stream)
    with pa.RecordBatchReader._import_from_c(c_stream_ptr) as reader:  # type: pa.RecordBatchReader
        if yield_schema:
            yield reader.schema
        for record_batch in reader:
            yield record_batch
            del record_batch  # drop the reference, so nothing stays alive in Python

def wrap_from_java_stream(java_arrow_stream, base_allocator=None):
    generator = wrap_from_java_stream_to_generator(java_arrow_stream,
                                                   base_allocator,
                                                   yield_schema=True)
    schema = next(generator)
    return pa.RecordBatchReader.from_batches(schema, generator)

def test_wrap_from_java_stream(tmp_path):
    start_jvm()
    org = jpype.JPackage("org")
    with child_allocator("test-allocator") as allocator:
        r = org.alipay.fair.panama.server_wrapper.utils.InMemoryArrowReader.create(allocator)
        with wrap_from_java_stream(r, allocator) as stream:
            pa.dataset.write_dataset(stream, "/tmp/aaaaa", format="parquet",
                                     existing_data_behavior="overwrite_or_ignore")
        # gc.collect()
```
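To make the timing visible, one can attach finalizers to the yielded batches; here is a minimal debugging sketch (`traced` is a hypothetical helper, not part of the code above), building on the weakref idea from earlier in this thread:

```python
import weakref

def traced(generator):
    # Attach a finalizer to each yielded batch so we can see when the
    # Python wrapper (and hence its Java-side memory) is actually reclaimed.
    for i, batch in enumerate(generator):
        weakref.finalize(batch, print, f"batch {i} collected")
        yield batch
```

Wrapping the generator with `traced(...)` before handing it to `pa.RecordBatchReader.from_batches` should reveal whether some wrappers are only collected after the allocator has already closed.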
Is there any way to ensure that all references have been collected by the
time the allocator is closed? Should I run a GC pass every time before the
allocator is closed, or just use one single root allocator for all purposes?
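For the first option, what I have in mind is roughly the following sketch (`collected_child_allocator` is a hypothetical wrapper around the `child_allocator` helper used above):

```python
import contextlib
import gc

@contextlib.contextmanager
def collected_child_allocator(name):
    # Force a full GC pass before the Java child allocator closes, so any
    # lingering reader/batch wrappers run their release callbacks first.
    with child_allocator(name) as allocator:
        try:
            yield allocator
        finally:
            gc.collect()
```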
Antoine Pitrou <[email protected]> wrote on Thu, Jun 29, 2023, at 21:50:
>
>
> Hi,
>
> To answer precisely:
>
> 1) The exported record batch will live as long as the Python RecordBatch
> object is kept alive. If your script keeps the Python RecordBatch object
> alive until the end, then the exported record batch is kept alive until
> the end.
>
> 2) The rest is standard Python semantics. When an object is not
> referenced by the program anymore, reference counting destroys it. For
> an exported record batch, destroying the Python RecordBatch calls the
> record batch's release callback.
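(For reference, point 2 can be observed with plain pyarrow and no JVM involved; a minimal sketch using the same pyarrow-internal `_export_to_c`/`_import_from_c` APIs as elsewhere in this thread:)

```python
import pyarrow as pa
from pyarrow.cffi import ffi

batch = pa.RecordBatch.from_pydict({"x": [1, 2, 3]})

# Export the batch through the C Data interface, then import it back.
c_array = ffi.new("struct ArrowArray*")
c_schema = ffi.new("struct ArrowSchema*")
batch._export_to_c(int(ffi.cast("uintptr_t", c_array)),
                   int(ffi.cast("uintptr_t", c_schema)))
imported = pa.RecordBatch._import_from_c(int(ffi.cast("uintptr_t", c_array)),
                                         int(ffi.cast("uintptr_t", c_schema)))

# On CPython this drops the last reference, so the imported batch is
# destroyed immediately and the exporter's release callback runs here,
# not at some later GC pass.
del imported
```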
>
> Regards
>
> Antoine.
>
> On 29/06/2023 15:05, Wenbo Hu wrote:
> > Thanks for your explanation, Antoine.
> >
> > I figured out why I'm facing the memory leak and need to call delete
> > explicitly.
> > My example code may have misrepresented the situation. The key problem
> > is that when I wrap the conversion of a Java stream into a
> > RecordBatchReader, I create a child allocator from the current context
> > (which lives only as long as the RecordBatchReader) to call
> > exportArrayStream in a generator, so the consumer/callback always
> > outlives the RecordBatchReader and its underlying allocator (not the
> > allocator of the Java stream, but the one passed to exportArrayStream).
> >
> > When I give the conversion a longer-lived allocator (one that lives as
> > long as the consumer/callback), the code works as expected.
> >
> > Antoine Pitrou <[email protected]> wrote on Thu, Jun 29, 2023, at 17:55:
> >>
> >>
> >> On 29/06/2023 09:50, Wenbo Hu wrote:
> >>> Hi,
> >>>
> >>> I'm using JPype to pass streams back and forth between Java and Python.
> >>>
> >>> The following code works fine with its release callback:
> >>> ```python
> >>> with child_allocator("test-allocator") as allocator:
> >>>     r = some_package.InMemoryArrowReader.create(allocator)
> >>>     c_stream = arrow_c.new("struct ArrowArrayStream*")
> >>>     c_stream_ptr = int(arrow_c.cast("uintptr_t", c_stream))
> >>>
> >>>     s = org.apache.arrow.c.ArrowArrayStream.wrap(c_stream_ptr)
> >>>     org.apache.arrow.c.Data.exportArrayStream(allocator, r, s)
> >>>
> >>>     with pa.RecordBatchReader._import_from_c(c_stream_ptr) as stream:
> >>>         for rb in stream:  # type: pa.RecordBatch
> >>>             logger.info(rb.num_rows)  # yield weakref.proxy(rb)
> >>>             del rb  # release callback directly called in current thread?
> >>> ```
> >>>
> >>> But if the del statement is not called, the allocator on the Java
> >>> side raises an exception reporting a memory leak.
> >>
> >> That's not surprising. The `rb` variable keeps the record batch and its
> >> backing memory alive. These are the expected semantics. Otherwise,
> >> accessing `rb`'s contents would crash.
> >>
> >>> Also, a warning message is output to stderr:
> >>> ```
> >>> WARNING: Failed to release Java C Data resource: Failed to attach the current thread to a Java VM
> >>> ```
> >>
> >> That's probably because the release callback is called at process exit,
> >> after the JVM is shut down?
> >>
> >> > Is yielding a weakref-ed `rb` a good idea? Will the weakref-ed
> >> > RecordBatchReader work with other pyarrow APIs (dataset)?
> >>
> >> That would probably not solve the problem. Users can trivially get a
> >> strong reference from the weakref, and keep it alive too long.
> >>
> >> Regards
> >>
> >> Antoine.
> >
--
---------------------
Best Regards,
Wenbo Hu,