Hey all, I'm trying to run an Apache Beam pipeline on Flink (hosted on GKE). I'm using a custom container image built to run trained TensorFlow models on a series of images; its base Docker image is apache/beam_python3.7_sdk:2.30.0, with a few libraries installed on top.
I'm using the Apache Flink operator <https://github.com/spotify/flink-on-k8s-operator>, which is currently maintained by Spotify. Because the pods process a lot of data, we had to significantly increase the memory of the TaskManagers. However, at one point the job internally throws an out-of-memory error (an OutOfDirectMemoryError), linked to a gRPC call that is taking too long to complete:

    Output channel stalled for 1023s, outbound thread CHAIN MapPartition (MapPartition at [1]PerformInference) -> FlatMap (FlatMap at ExtractOutput[0]) -> Map (Key Extractor) -> GroupCombine (GroupCombine at GroupCombine: PerformInferenceAndCombineResults_dep_049/GroupPredictionsByImage) -> Map (Key Extractor) (1/1). See: https://issues.apache.org/jira/browse/BEAM-4280 for the history for this issue.
    Feb 18, 2022 11:51:05 AM org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.NettyServerTransport notifyTerminated
    INFO: Transport failed
    org.apache.beam.vendor.grpc.v1p36p0.io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 2097152 byte(s) of direct memory (used: 1205862679, max: 1207959552)
        at org.apache.beam.vendor.grpc.v1p36p0.io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:754)
        at org.apache.beam.vendor.grpc.v1p36p0.io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:709)
        at org.apache.beam.vendor.grpc.v1p36p0.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:645)
        at org.apache.beam.vendor.grpc.v1p36p0.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:621)
        at org.apache.beam.vendor.grpc.v1p36p0.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:204)
        at org.apache.beam.vendor.grpc.v1p36p0.io.netty.buffer.PoolArena.tcacheAllocateNormal(PoolArena.java:188)

I've tried assigning more memory to either the TaskManager or the JobManager in Flink, but the error stays the same and the numbers in it (used and max direct memory) don't change either, which tells me that this is not an issue with Flink itself but with an underlying process. I've done some googling, but I couldn't find more information. Since this isn't code we touch but an underlying library, I'm not sure what we can change to solve the issue. Any idea what the problem could be?

Thanks.
Kr, Bjorn
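For reference, the numbers in the OutOfDirectMemoryError can be checked with a few lines of Python. The sketch only does arithmetic on values copied from the log: the failed request is 2 MiB, the limit is exactly 1152 MiB, and the allocation misses by just 279 bytes. The final check rests on assumptions I haven't confirmed: that the vendored Netty takes its limit from the JVM's -XX:MaxDirectMemorySize, and that the TaskManager runs with Flink's default memory options, where the direct memory limit is framework off-heap (128 MiB by default) + task off-heap (0 by default) + network memory (capped at 1 GiB by default). If those hold, 128 MiB + 1 GiB = 1152 MiB would explain why the limit doesn't move when the TaskManager gets more total memory.

```python
# Values copied verbatim from the OutOfDirectMemoryError message.
requested = 2_097_152       # "failed to allocate 2097152 byte(s)"
used = 1_205_862_679        # "used: 1205862679"
max_direct = 1_207_959_552  # "max: 1207959552"

MiB = 1024 * 1024

# How much direct memory was still free, and by how much the request overshoots it.
headroom = max_direct - used
shortfall = requested - headroom

print(f"request:   {requested / MiB:.0f} MiB")   # 2 MiB
print(f"limit:     {max_direct / MiB:.0f} MiB")  # 1152 MiB
print(f"headroom:  {headroom} bytes")            # 2096873 bytes
print(f"shortfall: {shortfall} bytes")           # 279 bytes

# ASSUMPTION: Flink's default TaskManager memory options, i.e.
# framework off-heap 128 MiB + task off-heap 0 + network memory at its 1 GiB cap.
assumed_limit = 128 * MiB + 0 + 1024 * MiB
print(assumed_limit == max_direct)  # True
```

If the assumption is right, raising the overall TaskManager memory alone would not raise this limit; the direct-memory components themselves would have to change.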