Hello,
I was wondering whether Flink has a size limit on the data it can serialize. I
have an object that stores a big 2D array, and when I try to hand it over to the
next operator, I get the following error:
```
2024-07-10 10:14:51,983 ERROR org.apache.flink.runtime.util.ClusterUncaughtExceptionHandler [] - WARNING: Thread 'grpc-default-executor-1' produced an uncaught exception. If you want to fail on uncaught exceptions, then configure cluster.uncaught-exception-handling accordingly
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3745) ~[?:?]
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:120) ~[?:?]
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:95) ~[?:?]
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:156) ~[?:?]
    at org.apache.beam.sdk.util.StreamUtils.getBytesWithoutClosing(StreamUtils.java:64) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:101) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.sdk.coders.ByteArrayCoder.decode(ByteArrayCoder.java:41) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.sdk.coders.LengthPrefixCoder.decode(LengthPrefixCoder.java:64) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.sdk.util.WindowedValue$ParamWindowedValueCoder.decode(WindowedValue.java:819) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.sdk.util.WindowedValue$ParamWindowedValueCoder.decode(WindowedValue.java:813) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.sdk.util.WindowedValue$ParamWindowedValueCoder.decode(WindowedValue.java:737) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:43) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.sdk.fn.data.DecodingFnDataReceiver.accept(DecodingFnDataReceiver.java:25) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:68) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.accept(BeamFnDataInboundObserver.java:31) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:144) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer$InboundObserver.onNext(BeamFnDataGrpcMultiplexer.java:130) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:262) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:332) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:315) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:834) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at org.apache.beam.vendor.grpc.v1p48p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) ~[blob_p-ce9fc762fcfc43a2cf28c0d69a738da7efd36b10-09d376712372f30a94bb0be4492bb7a5:1.18.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
2024-07-10 10:14:52,006 ERROR /usr/local/lib/python3.10/dist-packages/apache_beam/runners/worker/data_plane.py:659 [] - Failed to read inputs in the data plane.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/apache_beam/runners/worker/data_plane.py", line 652, in _read_inputs
    for elements in elements_iterator:
  File "/usr/local/lib/python3.10/dist-packages/grpc/_channel.py", line 543, in __next__
    return self._next()
  File "/usr/local/lib/python3.10/dist-packages/grpc/_channel.py", line 969, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.UNKNOWN
    details = ""
    debug_error_string = "UNKNOWN:Error received from peer ipv6:%5B::1%5D:35379 {created_time:"2024-07-10T10:14:51.986932366+00:00", grpc_status:2, grpc_message:""}"
>
2024-07-10 10:14:52,007 ERROR /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - Exception in thread read_grpc_client_inputs:
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.10/dist-packages/apache_beam/runners/worker/data_plane.py", line 669, in <lambda>
    target=lambda: self._read_inputs(elements_iterator),
  File "/usr/local/lib/python3.10/dist-packages/apache_beam/runners/worker/data_plane.py", line 652, in _read_inputs
    for elements in elements_iterator:
  File "/usr/local/lib/python3.10/dist-packages/grpc/_channel.py", line 543, in __next__
    return self._next()
  File "/usr/local/lib/python3.10/dist-packages/grpc/_channel.py", line 969, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.UNKNOWN
    details = ""
    debug_error_string = "UNKNOWN:Error received from peer ipv6:%5B::1%5D:35379 {created_time:"2024-07-10T10:14:51.986932366+00:00", grpc_status:2, grpc_message:""}"
>
```
I think this error comes from serialization: I added a log statement at the
start of the downstream operator, and it is never printed, even though the one
at the end of the upstream operator is. Moreover, I tried slicing the array to
check whether its size could be the issue, and the error does not occur when the
array is small. However, the array must be processed as a whole, so I can't
really split it. Is there a way to make Flink serialize bigger objects?
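To put a number on the size threshold that the slicing experiment points to, one quick check is to measure how many bytes the array takes once pickled, for the sizes that work and the size that fails. This is a minimal sketch that assumes the record crosses from the Python worker to the JVM as pickled bytes (the `ByteArrayCoder` frames in the trace suggest a byte-array payload, but the exact PyFlink encoding is an assumption here); `make_grid` is a hypothetical stand-in for the real 2D array.

```python
import pickle


def serialized_size_mib(obj) -> float:
    """Size of obj once pickled, in MiB (assumes a pickle-based payload)."""
    return len(pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)) / (1024 * 1024)


def make_grid(rows: int, cols: int) -> list:
    # Hypothetical stand-in for the real 2D array: a list of lists of floats.
    return [[float(r * cols + c) for c in range(cols)] for r in range(rows)]


if __name__ == "__main__":
    # Print the payload size for a few square grids to see how it scales.
    for n in (100, 500, 1000):
        grid = make_grid(n, n)
        print(f"{n}x{n}: {serialized_size_mib(grid):.1f} MiB")
```

Comparing these sizes with the TaskManager's heap settings would show how close the failing array gets to the limit. Note that `ByteArrayOutputStream.grow` copies into a larger buffer while the old one is still referenced, so the JVM may briefly need noticeably more than one payload's worth of heap.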
Thanks in advance and best regards
Ky Alexandre