Sorry, I just saw that your question was actually mainly about checkpointing, but 
it can still be related to my previous answer. I assume the checkpointing time 
is the time reported in the web interface? That is the end-to-end runtime of the 
checkpoint, which does not really tell you how much time is spent on writing the 
state itself; you can find that exact detail in the logging by grepping for 
lines that start with "Asynchronous RocksDB snapshot". The background is that 
the end-to-end time also includes the time the checkpoint barrier needs to 
travel to the operator. If there is a lot of backpressure and many full network 
buffers, this can take a while. Still, the reason for the backpressure could be 
the way you access RocksDB, as it seems you are de/serializing an ever-growing 
value under a single key on every update. I can see how accesses under these 
conditions could eventually become very slow, yet remain fast on the 
FsStateBackend for the reason from my first answer.
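
For illustration, here is a rough sketch of the two access patterns I mean (the 
"Event" type and the class names are placeholders, since I am only guessing at 
your job's structure):

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ListStateDescriptor, ValueStateDescriptor}
import org.apache.flink.util.Collector

// Placeholder event type, not your actual schema.
case class Event(id: String, payload: String)

// Pattern 1: one ever-growing value under a single key. On the RocksDB backend,
// every value()/update() de/serializes the whole list.
class GrowingValueFunction extends RichFlatMapFunction[Event, Event] {
  private lazy val events = getRuntimeContext.getState(
    new ValueStateDescriptor[List[Event]]("events", classOf[List[Event]]))

  override def flatMap(in: Event, out: Collector[Event]): Unit = {
    val current = Option(events.value()).getOrElse(Nil) // deserializes the full list
    events.update(in :: current)                        // serializes the full list again
  }
}

// Pattern 2: ListState lets the RocksDB backend append only the new element's
// bytes instead of rewriting the whole collection.
class AppendingFunction extends RichFlatMapFunction[Event, Event] {
  private lazy val events = getRuntimeContext.getListState(
    new ListStateDescriptor[Event]("events", classOf[Event]))

  override def flatMap(in: Event, out: Collector[Event]): Unit =
    events.add(in) // serializes only the new element
}

With the ValueState variant, the complete list goes through the serializer on 
every incoming event when running on RocksDB; with ListState, only the new 
element does. On the FsStateBackend both variants just mutate heap objects, 
which is why the difference does not show up there.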

> On 03.05.2017 at 17:54, Stefan Richter <s.rich...@data-artisans.com> wrote:
> 
> Hi,
> 
> Typically, I would expect that the bottleneck with the RocksDB backend is not 
> RocksDB itself, but your TypeSerializers. I suggest first attaching a 
> profiler/sampler to the process and checking whether the problematic methods 
> are in serialization or in the actual accesses to RocksDB. The RocksDB backend 
> has to go through a de/serialization roundtrip on every single state access, 
> while the FsStateBackend works on heap objects directly. For checkpoints, the 
> RocksDB backend can write bytes directly, whereas the FsStateBackend has to 
> use the serializers to get from objects to bytes, so the roles of the 
> serializers are essentially inverted between normal operation and 
> checkpointing. For Flink 1.3 we will also introduce incremental checkpoints 
> on RocksDB that piggyback on the SST files. Flink 1.2 writes checkpoints and 
> savepoints fully and in a custom format.
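> 
> As a rough sketch (the final 1.3 API may still change a bit), enabling the 
> incremental mode will look roughly like this:
> 
> val backend = new RocksDBStateBackend(
>   "file:///path/to/checkpoints", // checkpoint URI (placeholder path)
>   true)                          // second argument enables incremental checkpoints
> env.setStateBackend(backend)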
> 
> Best,
> Stefan
> 
>> On 03.05.2017 at 16:46, Jason Brelloch <jb.bc....@gmail.com> wrote:
>> 
>> Hey all,
>> 
>> I am looking for some advice on tuning RocksDB for better performance in 
>> Flink 1.2. I created a pretty simple job with a single Kafka source and one 
>> flatmap function that just stores 50000 events in a single key of managed 
>> keyed state and then drops everything else, to test checkpoint performance. 
>> Using a basic FsStateBackend configured as:
>> 
>> val backend = new FsStateBackend("file:///home/jason/flink/checkpoint")
>> env.setStateBackend(backend)
>> 
>> With about 30MB of state we see the checkpoints completing in 151ms.  Using 
>> a RocksDBStateBackend configured as:
>> 
>> val backend = new RocksDBStateBackend("file:///home/jason/flink/checkpoint")
>> backend.setDbStoragePath("file:///home/jason/flink/rocksdb")
>> backend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED)
>> env.setStateBackend(backend)
>> 
>> Running the same test, the checkpoint takes 3 minutes 42 seconds.
>> 
>> I expect it to be slower, but that seems excessive. I am also a little 
>> confused as to when RocksDB and Flink decide to write to disk, because 
>> watching the database, the .sst file wasn't created until significantly 
>> after the checkpoint was completed, and the state had not changed. Is there 
>> anything I can do to increase the speed of the checkpoints, or anywhere I 
>> can look to debug the issue? (Nothing seems out of the ordinary in the 
>> Flink logs or RocksDB logs.)
>> 
>> Thanks!
>> 
>> -- 
>> Jason Brelloch | Product Developer
>> 3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305
> 
