[ 
https://issues.apache.org/jira/browse/FLINK-17800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17119803#comment-17119803
 ] 

Yu Li commented on FLINK-17800:
-------------------------------

Thanks for debugging and locating the root cause [~yunta].

From the analysis, this should be a RocksDB bug that requires a fix in FRocksDB 
and a new release build, which makes it not an easy fix. It is also a 
long-existing issue rather than a regression caused by changes in the 1.11 
release cycle, though its impact became more severe once we made RocksDB the 
default timer store for the RocksDB backend in the 1.10.0 release.

I agree we should try to fix it before 1.11.0, but for the above reasons I tend 
not to consider it a release blocker, and suggest downgrading the severity to 
Critical.

Please also estimate the time needed and see whether we can make it into 
1.11.0. Thanks.

> RocksDB optimizeForPointLookup results in missing time windows
> --------------------------------------------------------------
>
>                 Key: FLINK-17800
>                 URL: https://issues.apache.org/jira/browse/FLINK-17800
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.10.0, 1.10.1
>            Reporter: Yordan Pavlov
>            Assignee: Yun Tang
>            Priority: Blocker
>             Fix For: 1.11.0, 1.10.2
>
>         Attachments: MissingWindows.scala, MyMissingWindows.scala, 
> MyMissingWindows.scala
>
>
> +My Setup:+
> We have been using the _RocksDB_ option _optimizeForPointLookup_ while 
> running version 1.7 for years. Upon upgrading to Flink 1.10 we started 
> observing strange behavior: missing time windows in a streaming Flink job. 
> For testing purposes I experimented with previous Flink versions (1.8, 1.9, 
> 1.9.3) and none of them showed the problem.
>  
> A sample of the code demonstrating the problem is here:
> {code:java}
> val datastream = env
>   .addSource(KafkaSource.keyedElements(config.kafkaElements,
>     List(config.kafkaBootstrapServer)))
> datastream
>   .keyBy(_ => 1)
>   .timeWindow(Time.milliseconds(1))
>   .reduce((_, b) => b) // minimal aggregation so the windowed stream can be printed
>   .print()
> {code}
>  
> The source consists of 3 streams (either 3 Kafka partitions or 3 Kafka 
> topics); the elements in each stream have separately increasing event-time 
> timestamps, starting from 1 and increasing by 1 across the whole source. The 
> first partition might consist of timestamps 1, 2, 10, 15..., the second of 
> 4, 5, 6, 11..., the third of 3, 7, 8, 9...
>  
> +What I observe:+
> The time windows open as expected for the first 127 timestamps. Then there 
> is a huge gap with no opened windows; if the source has many elements, the 
> next open window may have a timestamp in the thousands. A gap of hundreds of 
> elements is created, with what appear to be 'lost' elements. Those elements 
> are not reported as late (when tested with the _sideOutputLateData_ 
> operator). The way we have been using the option is by setting it inside the 
> config like so:
> ??etherbi.rocksDB.columnOptions.optimizeForPointLookup=268435456??
> We have been using it for performance reasons, as we have a huge RocksDB 
> state backend.
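For reference, the {{etherbi.*}} key above comes from the reporter's own configuration wrapper; in plain Flink the same option can be applied through a {{RocksDBOptionsFactory}}. The sketch below is a hedged example against the Flink 1.10 API, not the reporter's actual code. Note that RocksDB's {{optimizeForPointLookup}} takes the block cache size in megabytes, so whether the 268435456 above is interpreted as bytes or MB depends on the wrapper:

{code:java}
import java.util.Collection;
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

// Sketch: apply optimizeForPointLookup to all column families of the
// RocksDB state backend (assumed equivalent of the wrapper config above).
public class PointLookupOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions,
                                     Collection<AutoCloseable> handlesToClose) {
        return currentOptions; // leave DB-level options untouched
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions,
                                                   Collection<AutoCloseable> handlesToClose) {
        // 256 MB block cache for point lookups (assumes the 268435456 above is bytes)
        return currentOptions.optimizeForPointLookup(256);
    }
}
{code}

The factory would then be registered on the backend with {{backend.setRocksDBOptions(new PointLookupOptionsFactory())}}.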



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
