Hi,

I'm trying to figure out whether each task node has been given enough
resources (i.e. CPU and memory) to perform a deduplication job. Currently,
the job is not running very stably. What I have been observing is that
after a couple of days of running, backpressure suddenly appears on one
arbitrary EC2 instance in the cluster, and when that happens we have to
give up the current state and restart the job with an empty state. We can
no longer take a savepoint at that point, as it times out after 10 minutes,
which is understandable.

Additional Observations

When the backpressure happens, we see our state read time (measured with a
custom metric) increase from about 0.1 milliseconds to 40-60 milliseconds
on that specific problematic EC2 instance. We tried rebooting that EC2
instance, so that the corresponding tasks would be reassigned to a
different EC2 instance, but the problem persists.
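
For illustration, a state read could be timed roughly like this inside a
deduplicating KeyedProcessFunction. This is a simplified sketch, not our
exact code; the event type is reduced to String, it assumes
flink-metrics-dropwizard is on the classpath, and the state and metric
names are placeholders.

// Simplified sketch of timing a keyed-state read with a custom histogram metric.
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class TimedDedupFunction extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<Boolean> seen;   // "have we seen this session key?"
    private transient Histogram stateReadMicros;  // state read latency in microseconds

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("seen", Boolean.class));
        stateReadMicros = getRuntimeContext().getMetricGroup().histogram(
                "stateReadMicros",
                new DropwizardHistogramWrapper(
                        new com.codahale.metrics.Histogram(
                                new com.codahale.metrics.SlidingWindowReservoir(500))));
    }

    @Override
    public void processElement(String event, Context ctx, Collector<String> out)
            throws Exception {
        long start = System.nanoTime();
        Boolean alreadySeen = seen.value();                      // the RocksDB read being timed
        stateReadMicros.update((System.nanoTime() - start) / 1_000);

        if (alreadySeen == null) {                               // first time this key is seen
            seen.update(true);
            out.collect(event);
        }                                                        // duplicates are dropped
    }
}

A sliding-window reservoir keeps the histogram cheap enough to update on
every record.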

However, I’m not sure if this read time increase is a symptom or the cause
of the problem.

Background about this deduplication job:

We are performing sessionization with deduplication on an event stream,
keyed by a session key that is embedded in each event. The throughput of
the input stream is around 50k records per second, and the post-aggregation
output is around 8k records per second.

We are currently using the RocksDB state backend on SSD-backed storage, and
in the state we are storing session keys with a TTL of 1 week. Based on the
current throughput, this state could become really huge. I assume RocksDB
will flush to disk as needed, but please correct me if I am wrong.
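
To make that setup concrete, here is a simplified sketch of how a 1-week
TTL and SSD-oriented RocksDB options can be configured. This is not
necessarily our exact configuration; the checkpoint path and state name
below are placeholders.

// Simplified sketch of the TTL and RocksDB backend configuration.
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;

public class StateSetup {

    // TTL so that session keys expire after one week and can be cleaned up
    // during RocksDB compaction instead of growing without bound.
    static ValueStateDescriptor<Boolean> seenDescriptor() {
        StateTtlConfig ttl = StateTtlConfig
                .newBuilder(Time.days(7))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .cleanupInRocksdbCompactFilter(1000)   // purge expired keys during compaction
                .build();

        ValueStateDescriptor<Boolean> descriptor =
                new ValueStateDescriptor<>("seen", Boolean.class);
        descriptor.enableTimeToLive(ttl);
        return descriptor;
    }

    // RocksDB backend with incremental checkpoints and SSD-oriented options.
    static RocksDBStateBackend rocksDbBackend() throws Exception {
        RocksDBStateBackend backend =
                new RocksDBStateBackend("s3://my-bucket/flink/checkpoints", true);
        backend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
        return backend;
    }
}

The compaction-filter cleanup lets RocksDB drop expired session keys during
compaction, which is what should keep the on-disk state bounded to roughly
one week of keys.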

Information about the cluster:

I'm running on an AWS EMR cluster with 12 EC2 instances (r5d.2xlarge). I'm
using Flink 1.11.2 in Yarn session mode. Currently, there is only one job
running in the Yarn session.

Questions:

1. Currently, I'm starting the Yarn session with 7g of memory for both the
Task Manager and the Job Manager, so that each Yarn container can get 1
CPU. Is this setting reasonable based on your experience?

Here is the command I used to start the Yarn cluster:

export HADOOP_CLASSPATH=`hadoop classpath` &&
/usr/lib/flink/bin/yarn-session.sh -jm 7g -tm 7g --detached

2. Is there a scientific way to determine the right amount of resources to
give to an arbitrary job, or is this more of a trial-and-error process?

3. Right now, I suspect insufficient resources are causing the job to run
unstably, but I'm not quite sure. Are there any other potential causes
here? How should I debug from here if resources are not the issue? Is there
a way to detect memory leaks?

Thanks in advance!

Thomas
