Hi Kevin,

Is it possible to provide a simple example to reproduce this issue?

PS: If you don't specify the type info, PyFlink uses pickle to perform the
serialization/deserialization.
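To make the PS concrete, here is a minimal plain-Python sketch (no PyFlink involved) of a pickle round trip for a wrapper shaped like the one in Snippet A, which passes no output_type. `InputWrapper` here is a hypothetical stand-in, and the sketch only shows what pickling does in general, not PyFlink's actual serializer code path.

```python
import pickle


class InputWrapper:
    """Hypothetical stand-in for the plain wrapper class in Snippet A."""

    def __init__(self, key, contents=None):
        self.key = key
        self.contents = contents


# Serialize one record to bytes, as a pickle-based fallback would.
record = InputWrapper(key="a", contents={"key": "a", "value": 1})
payload = pickle.dumps(record)
print(type(payload).__name__)  # bytes

# pickle stores the class by module and qualified name, so the class
# must be importable wherever the bytes are deserialized.
restored = pickle.loads(payload)
print(restored.key, restored.contents)  # a {'key': 'a', 'value': 1}
```

Whatever the wrapper contains travels as one opaque byte string, so anything nested inside `contents` is pickled along with it.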

Regards,
Dian


On Mon, Mar 22, 2021 at 10:55 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Kevin,
>
> Yes, I understood that, but then your Python class contains a Row field,
> for which no mapping exists. I'm assuming PyFlink tries to do a deep
> conversion and fails, ending up in some infinite loop.
>
> On Mon, Mar 22, 2021 at 3:48 PM Kevin Lam <kevin....@shopify.com> wrote:
>
>> Thanks for the response, Arvid! Point of clarification: *things do NOT
>> OOM when I use the Row subclass*. Instead, the code that doesn't use the
>> Row subclass is the code that OOMs (i.e., the simple Python class).
>>
>> On Mon, Mar 22, 2021 at 10:24 AM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi Kevin,
>>>
>>> I suspect that this is because Row is not supported as a Python field
>>> [1]; it's supposed to be a dict that is mapped to a Row by Flink.
>>> Maybe it runs into some infinite loop while trying to serialize, hence
>>> the OOM.
>>>
>>> Subclassing Row might be an undocumented feature.
>>>
>>> I'm also pulling in Dian who knows more about PyFlink.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/datastream-api-users-guide/data_types.html
>>>
>>> On Fri, Mar 19, 2021 at 3:55 PM Kevin Lam <kevin....@shopify.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I've encountered an interesting issue: I observe an OOM error in my
>>>> Flink application when I use a DataStream of Python objects, but when I
>>>> make that Python object a subclass of pyflink.common.types.Row and provide
>>>> TypeInformation, there is no issue.
>>>>
>>>> In the OOM scenario, no elements get processed: the application runs
>>>> without printing any output and eventually crashes with
>>>> java.lang.OutOfMemoryError: Java heap space
>>>>
>>>> Any insights into why this might be happening? Appreciate any
>>>> help/suggestions.
>>>> I've included some code that illustrates the two situations below [0].
>>>>
>>>> Thanks in advance!
>>>>
>>>> [0]:
>>>>
>>>> Code Snippet A: OOM scenario
>>>>
>>>> from pyflink.common.types import Row
>>>>
>>>> class InputWrapper:
>>>>     """Helper class, used to make streams of the same type"""
>>>>
>>>>     def __init__(self, key: str, contents: Row = None):
>>>>         self.key = key
>>>>         self.contents = contents
>>>>
>>>> # x_ds and y_ds are existing DataStreams (definitions not shown).
>>>> # No output_type is given, so the wrappers are pickled.
>>>> x_ds = x_ds.map(
>>>>     lambda d: InputWrapper(key=d['key'], contents=d))
>>>> y_ds = y_ds.map(
>>>>     lambda o: InputWrapper(key=o['key'], contents=o))
>>>> union = x_ds.union(y_ds)
>>>> union.print()
>>>>
>>>> Code Snippet B: working scenario
>>>>
>>>> from pyflink.common.types import Row
>>>>
>>>> class InputWrapper(Row):
>>>>     """Helper class, used to make streams of the same type"""
>>>>
>>>>     def __init__(self, key: str, contents: Row = None):
>>>>         super().__init__(key, contents)
>>>>
>>>> # InputWrapperTypeInfo() is defined elsewhere in the application
>>>> # (not shown); x_ds and y_ds are the same streams as in Snippet A.
>>>> x_ds = x_ds.map(
>>>>     lambda d: InputWrapper(key=d['key'], contents=d),
>>>>     output_type=InputWrapperTypeInfo())
>>>> y_ds = y_ds.map(
>>>>     lambda o: InputWrapper(key=o['key'], contents=o),
>>>>     output_type=InputWrapperTypeInfo())
>>>> union = x_ds.union(y_ds)
>>>> union.print()
>>>>
>>>>
>>>>

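Following Arvid's suggestion that nested records are expected to be plain dicts rather than Row fields, one untested workaround for Snippet A is to flatten each wrapper into built-in types inside the map function. The sketch below is plain Python with no PyFlink imports; `to_builtin` and `to_record` are hypothetical helpers, and it assumes `contents` arrives as a mapping or a tuple/list of values.

```python
from collections.abc import Mapping
from typing import Any, Dict, Optional


class InputWrapper:
    """Hypothetical stand-in for the plain wrapper class in Snippet A."""

    def __init__(self, key: str, contents: Optional[Any] = None):
        self.key = key
        self.contents = contents


def to_builtin(value: Any) -> Any:
    """Recursively turn mappings into dicts and tuples/lists into lists.

    Any other value (str, int, float, None, ...) is returned unchanged.
    """
    if isinstance(value, Mapping):
        return {str(k): to_builtin(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_builtin(v) for v in value]
    return value


def to_record(wrapper: InputWrapper) -> Dict[str, Any]:
    """Flatten an InputWrapper into a dict holding only built-in types."""
    return {"key": wrapper.key, "contents": to_builtin(wrapper.contents)}


record = to_record(InputWrapper(key="a", contents={"key": "a", "tags": ("x", "y")}))
print(record)  # {'key': 'a', 'contents': {'key': 'a', 'tags': ['x', 'y']}}
```

In Snippet A the map calls would then emit `to_record(InputWrapper(...))` instead of the wrapper object itself. Whether this avoids the OOM has not been verified, and a PyFlink Row is not necessarily a Mapping, so `contents` may need converting before it reaches `to_builtin`.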