Thanks for the reply.
I want to discuss points (1) and (2) further.
If we take care of them, the rest will be fine.
Coming to (1):
Please set a reasonable checkpoint interval for every job.
The minimum checkpoint interval recommended by the Flink community is 3 minutes.
I think you should use a checkpoint interval of at least 3 minutes for all jobs.
Coming to (2):
What's your input data rate?
For example, suppose you are seeing data at 100 msg/sec. If every message
changes the state and you write each change to RocksDB, that creates 100
rows per second on the RocksDB side. Even if only 50 records change each
second on average, using RocksDB with differentialstate = true does not
help, because every second 50% of the rows being added are new. So the
best bet is to update records in RocksDB only once per checkpoint interval.
Suppose your checkpoint interval is 5 minutes. If you update the RocksDB
state once every 5 minutes, new records are added to RocksDB at a rate of
1 record per 5 minutes, whereas in your original scenario 30000 records
are added to RocksDB in the same 5 minutes. That is a 1:30000 reduction
in records added, which avoids a huge amount of redundant growth in
RocksDB. Ultimately your state is driven by your checkpoint interval. On
recovery the input source goes back up to 5 minutes and the state is read
from that point, so on the RocksDB side a state update once every 5
minutes should work as well. Adding state more often than that is of no use.
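
To make the arithmetic concrete, here is a minimal sketch in plain Java (not Flink code). CountingStore is a hypothetical stand-in for RocksDB that only counts writes, and CoalescingBuffer is a hypothetical in-memory buffer that keeps the latest value per key and writes it out once per interval. It assumes a single key, 100 msg/sec and a 5 minute interval, as in the example above. How to hook the flush into a real job is left open.

```java
import java.util.HashMap;
import java.util.Map;

final class WriteCoalescingSketch {
    static final int MSGS_PER_SEC = 100;     // input rate from the example
    static final int INTERVAL_SEC = 5 * 60;  // 5 minute checkpoint interval

    /** Every message is written straight to the store. */
    static long directWrites() {
        CountingStore store = new CountingStore();
        for (int i = 0; i < MSGS_PER_SEC * INTERVAL_SEC; i++) {
            store.put("key", i);
        }
        return store.writes();
    }

    /** Messages are buffered in memory and flushed once per interval. */
    static long coalescedWrites() {
        CountingStore store = new CountingStore();
        CoalescingBuffer buffer = new CoalescingBuffer(store);
        for (int i = 0; i < MSGS_PER_SEC * INTERVAL_SEC; i++) {
            buffer.update("key", i);
        }
        buffer.flush(); // one flush at the end of the interval
        return store.writes();
    }

    public static void main(String[] args) {
        System.out.println("direct writes:    " + directWrites());    // 30000
        System.out.println("coalesced writes: " + coalescedWrites()); // 1
    }
}

/** Hypothetical stand-in for a RocksDB-backed state; it only counts writes. */
final class CountingStore {
    private final Map<String, Long> rows = new HashMap<>();
    private long writes = 0;

    void put(String key, long value) {
        rows.put(key, value);
        writes++;
    }

    long writes() {
        return writes;
    }
}

/** Keeps only the latest value per key in memory until flush() is called. */
final class CoalescingBuffer {
    private final Map<String, Long> pending = new HashMap<>();
    private final CountingStore store;

    CoalescingBuffer(CountingStore store) {
        this.store = store;
    }

    void update(String key, long value) {
        pending.put(key, value); // overwrites any earlier pending value
    }

    void flush() {
        for (Map.Entry<String, Long> e : pending.entrySet()) {
            store.put(e.getKey(), e.getValue());
        }
        pending.clear();
    }
}
```

One caveat with this sketch: values that are still in the buffer are not in the store, so a failure before the flush loses them unless the input is replayed from the last checkpoint.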
Regards
Bhaskar
Try to update your RocksDB state at an interval equal to the
checkpoint interval. Otherwise, as I have observed many times, the
state size grows unnecessarily.
On Fri, Jun 19, 2020 at 12:42 AM Jeff Henrikson <jehenri...@gmail.com> wrote:
Vijay,
Thanks for your thoughts. Below are answers to your questions.
> 1. What's your checkpoint interval?
I have used many different checkpoint intervals, ranging from 5 minutes
to never. I usually setMinPauseBetweenCheckpoints to the same value as
the checkpoint interval.
> 2. How frequently are you updating the state into RocksDB?
My understanding is that for .cogroup:
- Triggers control communication outside the operator
- Evictors control cleanup of internal state
- Configurations like write buffer size control the frequency of
  state change at the storage layer
- There is no control for how frequently the window state updates
  at the RocksDB API layer.
Thus, the state updates whenever data is ingested.
> 3. How many task managers are you using?
Usually I have been running with one slot per taskmanager. 28GB of
usable ram on each node.
> 4. How much data each task manager handles while taking the checkpoint?
Funny you should ask. I would be okay with zero.
The application I am replacing has a latency of 36-48 hours, so if I had
to fully stop processing to take every snapshot synchronously, it might
be seen as totally acceptable, especially for initial bootstrap. Also,
the velocity of running this backfill is approximately 115x real time on
8 nodes, so the steady-state run may not exhibit the failure mode in
question at all.
It has come as some frustration to me that, in the case of
RocksDBStateBackend, the configuration key state.backend.async
effectively has no meaningful way to be false.
The only way I have found in the existing code to get a behavior like a
synchronous snapshot is to POST to /jobs/<jobID>/stop with drain=false
and a URL. This method of failing fast is the way that I discovered
that I needed to increase transfer threads from the default.
The reason I don't just run the whole backfill and then take one
snapshot is that even in the absence of checkpoints, a very similar
congestion seems to take the cluster down when I am say 20-30% of the
way through my backfill.
Reloading from my largest feasible snapshot makes it possible to make
another snapshot a bit larger before crash, but not by much.
At first glance, the code change to allow a synchronous snapshot mode
in RocksDBStateBackend looks pretty easy. Nevertheless, I was
hoping to do the initial launch of my application without needing to
modify the framework.
Regards,
Jeff Henrikson
On 6/18/20 7:28 AM, Vijay Bhaskar wrote:
> For me this seems to be an IO bottleneck at your task manager.
> I have a couple of queries:
> 1. What's your checkpoint interval?
> 2. How frequently are you updating the state into RocksDB?
> 3. How many task managers are you using?
> 4. How much data each task manager handles while taking the checkpoint?
>
> For points (3) and (4), you should be very careful. I feel you are
> stuck at this.
> Try to scale vertically by giving each task manager more CPU and
> memory.
> If not, try to scale horizontally so that the IO on each task manager
> is reduced.
> Apart from that, check whether there is any bottleneck in the file
> system.
>
> Regards
> Bhaskar
>
> On Thu, Jun 18, 2020 at 5:12 PM Timothy Victor <vict...@gmail.com> wrote:
>
> I had a similar problem. I ended up solving it by not relying on
> checkpoints for recovery and instead re-reading my input sources (in my
> case a kafka topic) from the earliest offset and rebuilding only the
> state I need. I only need to care about the past 1 to 2 days of
> state so can afford to drop anything older. My recovery time went
> from over an hour for just the first checkpoint to under 10 minutes.
>
> Tim
>
> On Wed, Jun 17, 2020, 11:52 PM Yun Tang <myas...@live.com> wrote:
>
> Hi Jeff
>
> 1. "after around 50GB of state, I stop being able to reliably
> take checkpoints or savepoints. "
> What is the exact reason that the job cannot complete a
> checkpoint? Did it expire before completing, or was it declined by
> some tasks? The former is mainly caused by high back-pressure
> and the latter is mainly due to some internal error.
> 2. Have you checked why the remote task manager was lost?
> If the remote task manager has not crashed, it might be due
> to GC impact. I think you might need to check the task-manager
> logs and GC logs.
>
> Best
> Yun Tang
>
> ------------------------------------------------------------------------
> *From:* Jeff Henrikson <jehenri...@gmail.com>
> *Sent:* Thursday, June 18, 2020 1:46
> *To:* user <user@flink.apache.org>
> *Subject:* Trouble with large state
> Hello Flink users,
>
> I have an application of around 10 enrichment joins. All events are
> read from kafka and have event timestamps. The joins are built using
> .cogroup, with a global window, triggering on every 1 event, plus a
> custom evictor that drops records once a newer record for the same ID
> has been processed. Deletes are represented by empty events with
> timestamp and ID (tombstones). That way, we can drop records when
> business logic dictates, as opposed to when a maximum retention has been
> attained. The application runs RocksDBStateBackend, on Kubernetes on
> AWS with local SSDs.
>
> Unit tests show that the joins produce expected results. On an 8 node
> cluster, watermark output progress seems to indicate I should be able to
> bootstrap my state of around 500GB in around 1 day. I am able to save
> and restore savepoints for the first half an hour of run time.
>
> My current trouble is that after around 50GB of state, I stop being able
> to reliably take checkpoints or savepoints. Some time after that, I
> start getting a variety of failures where the first suspicious log event
> is a generic cluster connectivity error, such as:
>
> 1) java.io.IOException: Connecting the channel failed: Connecting
> to remote task manager + '/10.67.7.101:38955' has failed. This
> might indicate that the remote task manager has been lost.
>
> 2) org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Connection unexpectedly closed by remote task manager 'null'. This
> might indicate that the remote task manager was lost.
>
> 3) Association with remote system
> [akka.tcp://flink@10.67.6.66:34987] has failed, address is now
> gated for [50] ms. Reason: [Association failed with
> [akka.tcp://flink@10.67.6.66:34987]] Caused by:
> [java.net.NoRouteToHostException: No route to host]
>
> I don't see any obvious out of memory errors on the TaskManager UI.
>
> Adding nodes to the cluster does not seem to increase the maximum
> savable state size.
>
> I could enable HA, but for the time being I have been leaving it out to
> avoid the possibility of masking deterministic faults.
>
> Below are my configurations.
>
> Thanks in advance for any advice.
>
> Regards,
>
>
> Jeff Henrikson
>
>
>
> Flink version: 1.10
>
> Configuration set via code:
> parallelism=8
> maxParallelism=64
> setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
> setTolerableCheckpointFailureNumber(1000)
> setMaxConcurrentCheckpoints(1)
> enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
> RocksDBStateBackend
> setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
> setNumberOfTransferThreads(25)
> setDbStoragePath points to a local nvme SSD
>
> Configuration in flink-conf.yaml:
>
> jobmanager.rpc.address: localhost
> jobmanager.rpc.port: 6123
> jobmanager.heap.size: 28000m
> taskmanager.memory.process.size: 28000m
> taskmanager.memory.jvm-metaspace.size: 512m
> taskmanager.numberOfTaskSlots: 1
> parallelism.default: 1
> jobmanager.execution.failover-strategy: full
>
> cluster.evenly-spread-out-slots: false
>
> taskmanager.memory.network.fraction: 0.2 # default 0.1
> taskmanager.memory.framework.off-heap.size: 2GB
> taskmanager.memory.task.off-heap.size: 2GB
> taskmanager.network.memory.buffers-per-channel: 32 # default 2
> taskmanager.memory.managed.fraction: 0.4 # docs say default 0.1, but something seems to set 0.4
> taskmanager.memory.task.off-heap.size: 2048MB # default 128M
>
> state.backend.fs.memory-threshold: 1048576
> state.backend.fs.write-buffer-size: 10240000
> state.backend.local-recovery: true
> state.backend.rocksdb.writebuffer.size: 64MB
> state.backend.rocksdb.writebuffer.count: 8
> state.backend.rocksdb.writebuffer.number-to-merge: 4
> state.backend.rocksdb.timer-service.factory: heap
> state.backend.rocksdb.block.cache-size: 64000000 # default 8MB
> state.backend.rocksdb.write-batch-size: 16000000 # default 2MB
>
> web.checkpoints.history: 250
>