Hi all,

I've encountered an interesting issue: my Flink application runs out of
memory (OOM) when I use a DataStream of plain Python objects, but when I
make that Python object a subclass of pyflink.common.types.Row and provide
TypeInformation, there is no issue.

In the OOM scenario, no elements get processed: the application runs
without printing any output and eventually crashes with
java.lang.OutOfMemoryError: Java heap space

Any insights into why this might be happening? I'd appreciate any help or
suggestions. I've included some code below that illustrates the two
situations [0].

Thanks in advance!

[0]:

Code Snippet A: OOM scenario

from pyflink.common.types import Row


class InputWrapper:
    """Helper class, used to make streams of the same type"""

    def __init__(self, key: str, contents: Row = None):
        self.key = key
        self.contents = contents


# x_ds and y_ds are existing DataStreams of Rows (definitions not shown).
# Note: no output_type is passed to map() here.
x_ds = x_ds.map(
    lambda d: InputWrapper(key=d['key'], contents=d))
y_ds = y_ds.map(
    lambda o: InputWrapper(key=o['key'], contents=o))
union = x_ds.union(y_ds)
union.print()
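For context on Snippet A: the PyFlink documentation states that when map() is called without output_type, the result type defaults to Types.PICKLED_BYTE_ARRAY() and each record is serialized with pickle. The stdlib-only sketch below imitates that round trip for the plain InputWrapper. It is an illustration under assumptions, not PyFlink's actual code path: it uses the standard pickle module (PyFlink uses cloudpickle), a plain dict stands in for a PyFlink Row so it runs without PyFlink installed, and it does not reproduce the OOM itself.

```python
import pickle


class InputWrapper:
    """Plain Python wrapper mirroring Snippet A (no TypeInformation)."""

    def __init__(self, key, contents=None):
        self.key = key
        self.contents = contents


# Assumption: a dict stands in for a pyflink Row so the sketch is stdlib-only.
record = {"key": "k1", "value": 42}
wrapped = InputWrapper(key=record["key"], contents=record)

# Without a declared type, the whole object (including its nested contents)
# is turned into an opaque byte string before it crosses to the Java side.
payload = pickle.dumps(wrapped)

# On the receiving side the bytes are unpickled back into a new Python object.
restored = pickle.loads(payload)

print(type(payload).__name__, len(payload))
print(restored.key, restored.contents)
```

With a Row subclass and explicit TypeInformation, as in Snippet B, Flink knows the field layout instead of seeing opaque bytes, which is the main difference between the two setups.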

Code Snippet B: Working scenario

from pyflink.common.types import Row


class InputWrapper(Row):
    """Helper class, used to make streams of the same type"""

    def __init__(self, key: str, contents: Row = None):
        super().__init__(key, contents)


# InputWrapperTypeInfo() returns the TypeInformation for InputWrapper
# (definition not shown).
x_ds = x_ds.map(
    lambda d: InputWrapper(key=d['key'], contents=d),
    output_type=InputWrapperTypeInfo())
y_ds = y_ds.map(
    lambda o: InputWrapper(key=o['key'], contents=o),
    output_type=InputWrapperTypeInfo())
union = x_ds.union(y_ds)
union.print()
