Hi Robert,

Comments in line

> On Feb 28, 2020, at 2:51 AM, Robert Metzger <rmetz...@apache.org> wrote:
> 
> Sorry for the late reply.
> 
> There's not much you can do at the moment, as Flink needs to sync on the 
> checkpoint barriers.
> There's something in the making for addressing the issue soon: 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
>  
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-76:+Unaligned+Checkpoints>
If I understand correctly, we need to make sure when snapshot state called, 
inflight records between barriers from different channels needs to be 
"materialized" (processed and pushed to down stream before snapshot called)

Be more specifically, if we honor watermark progression and operator snapshot 
(barriers aligned), drain out of order processed records before actually 
snapshot. Will it work correctly? Detail here 
https://github.com/apache/flink/pull/11267/files 
<https://github.com/apache/flink/pull/11267/files>

> Did you try out using the FsStateBackend?
It’s skewed key causing rocksdb update states slow as far as we know, Ran 
probably can share more in flinkforward 2020 :)
> If you are going to stick with rocks, I would recommend to understand what 
> exactly causes the poor performance. I see the following areas:
> - serialization costs
> - disk / ssd speed
> - network speed (during checkpoint creation) (as Yu mentioned)
> - if you have asynchronous checkpoints enabled, they will also slow down the 
> processing.
> 
> 
> On Sun, Feb 23, 2020 at 8:27 PM Chen Qin <c...@pinterest.com 
> <mailto:c...@pinterest.com>> wrote:
> Just follow up on this thread, it accurately caused by key skew. Given single 
> subtask is single threaded 5% of slow processing cause entire job back 
> pressures on rocksdbstatebackend.
> 
> Robert,
> 
> What is blocking us enable multi threading in processor? I recall it has 
> something todo with barrier and record in order. Can you share more insights 
> on this?
> 
> Chen
> 
>> On Feb 21, 2020, at 4:56 AM, Robert Metzger <rmetz...@apache.org 
>> <mailto:rmetz...@apache.org>> wrote:
>> 
>> 
>> I would try the FsStateBackend in this scenario, as you have enough memory 
>> available.
>> 
>> On Thu, Jan 30, 2020 at 5:26 PM Ran Zhang <ranzh...@pinterest.com 
>> <mailto:ranzh...@pinterest.com>> wrote:
>> Hi Gordon,
>> 
>> Thanks for your reply! Regarding state size - we are at 200-300gb but we 
>> have 120 parallelism which will make each task handle ~2 - 3 gb state. (when 
>> we submit the job we are setting tm memory to 15g.) In this scenario what 
>> will be the best fit for statebackend? 
>> 
>> Thanks,
>> Ran
>> 
>> On Wed, Jan 29, 2020 at 6:37 PM Tzu-Li (Gordon) Tai <tzuli...@apache.org 
>> <mailto:tzuli...@apache.org>> wrote:
>> Hi Ran,
>> 
>> On Thu, Jan 30, 2020 at 9:39 AM Ran Zhang <ranzh...@pinterest.com 
>> <mailto:ranzh...@pinterest.com>> wrote:
>> Hi all,
>> 
>> We have a Flink app that uses a KeyedProcessFunction, and in the function it 
>> requires a ValueState(of TreeSet) and the processElement method needs to 
>> access and update it. We tried to use RocksDB as our stateBackend but the 
>> performance is not good, and intuitively we think it was because of the 
>> serialization / deserialization on each processElement call.
>> 
>> As you have already pointed out, serialization behaviour is a major 
>> difference between the 2 state backends, and will directly impact 
>> performance due to the extra runtime overhead in RocksDB.
>> If you plan to continue using the RocksDB state backend, make sure to use 
>> MapState instead of ValueState where possible, since every access to the 
>> ValueState in the RocksDB backend requires serializing / deserializing the 
>> whole value.
>> For MapState, de-/serialization happens per K-V access. Whether or not this 
>> makes sense would of course depend on your state access pattern.
>>  
>> Then we tried to switch to use FsStateBackend (which keeps the in-flight 
>> data in the TaskManager’s memory according to doc), and it could resolve the 
>> performance issue. So we want to understand better what are the tradeoffs in 
>> choosing between these 2 stateBackend. Our checkpoint size is 200 - 300 GB 
>> in stable state. For now we know one benefits of RocksDB is it supports 
>> incremental checkpoint, but would love to know what else we are losing in 
>> choosing FsStateBackend.
>> 
>> As of now, feature-wise both backends support asynchronous snapshotting, 
>> state schema evolution, and access via the State Processor API.
>> In the end, the major factor for deciding between the two state backends 
>> would be your expected state size.
>> That being said, it could be possible in the future that savepoint formats 
>> for the backends are changed to be compatible, meaning that you will be able 
>> to switch between different backends upon restore [1].
>>  
>> 
>> Thanks a lot!
>> Ran Zhang
>> 
>> Cheers,
>> Gordon
>> 
>>  [1] 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-41%3A+Unify+Binary+format+for+Keyed+State
>>  
>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-41%3A+Unify+Binary+format+for+Keyed+State>

Reply via email to