Hey all

I'm trying to run an Apache Beam pipeline on Flink (hosted on GKE).  I'm
using a custom container image that was built to run trained TensorFlow
models on a series of images, but the base Docker image is
apache/beam_python3.7_sdk:2.30.0, with a few libraries installed on top of
that.

I'm using the Apache Flink operator
<https://github.com/spotify/flink-on-k8s-operator>, which is currently
maintained by Spotify.  Because the pods process a lot of data, we had to
significantly increase the memory allocated to the TaskManagers.  However,
at one point the job fails internally with an out-of-memory error, linked
to a gRPC call that is taking too long to complete:

*Output channel stalled for 1023s, outbound thread CHAIN MapPartition
(MapPartition at [1]PerformInference) -> FlatMap (FlatMap at
ExtractOutput[0]) -> Map (Key Extractor) -> GroupCombine (GroupCombine at
GroupCombine:
PerformInferenceAndCombineResults_dep_049/GroupPredictionsByImage) -> Map
(Key Extractor) (1/1). See: https://issues.apache.org/jira/browse/BEAM-4280
for the history for this issue.*

*Feb 18, 2022 11:51:05 AM org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.NettyServerTransport notifyTerminated
INFO: Transport failed
org.apache.beam.vendor.grpc.v1p36p0.io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 2097152 byte(s) of direct memory (used: 1205862679, max: 1207959552)
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:754)
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:709)
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:645)
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:621)
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:204)
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.buffer.PoolArena.tcacheAllocateNormal(PoolArena.java:188)*

I've tried assigning more memory to either the TaskManager or the
JobManager in Flink, but the error stays the same, and the "used" and "max"
values in the message don't change either.  That tells me this is not an
issue with Flink itself, but with an underlying process.
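
For reference, here is a quick sanity check of the numbers in the error
message, as a minimal Python sketch.  The variable names are my own; the
three values are copied verbatim from the log above.

```python
# Values copied from the OutOfDirectMemoryError message in the log.
requested = 2_097_152      # bytes the allocator tried to reserve
used = 1_205_862_679       # direct memory already in use
limit = 1_207_959_552      # reported maximum direct memory

MIB = 1024 * 1024

remaining = limit - used           # bytes still free under the limit
shortfall = requested - remaining  # bytes missing for this allocation

print(f"limit:     {limit / MIB:.0f} MiB")
print(f"requested: {requested / MIB:.0f} MiB")
print(f"remaining: {remaining} bytes")
print(f"shortfall: {shortfall} bytes")
```

So the limit is exactly 1152 MiB, the failed request is a single 2 MiB
chunk, and the pool was only a few hundred bytes short of fitting it.  I
can't tell from the log alone which setting this 1152 MiB limit comes from.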

I've done some googling, but I couldn't find more information.  Since the
failure is in an underlying library rather than in code we touch, I'm not
sure what we can change to solve the issue.

Any idea what the problem could be?

Thanks.

Kr,

Bjorn
