[ https://issues.apache.org/jira/browse/FLINK-17800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17117314#comment-17117314 ]
Yun Tang commented on FLINK-17800:
----------------------------------

[~YordanPavlov] I have tried your program with some changes; however, the job fails because {{assert(input.size == 1)}} cannot be guaranteed. First of all, I could not run your program as-is, since it uses the internal class {{StreamingScalaJob}}; I changed it to use the general Flink interface so that it could run. Secondly, the program then fails with:
{code:java}
Caused by: TimerException{java.lang.AssertionError: assertion failed}
 ... 11 more
Caused by: java.lang.AssertionError: assertion failed
 at scala.Predef$.assert(Predef.scala:156)
 at org.apache.flink.streaming.scala.examples.RateChecker.apply(MissingWindows.scala:31)
 at org.apache.flink.streaming.scala.examples.RateChecker.apply(MissingWindows.scala:28)
 at org.apache.flink.streaming.api.scala.function.util.ScalaWindowFunctionWrapper.apply(ScalaWindowFunctionWrapper.scala:44)
 at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
 at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
 at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
 at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
 at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1501)
 ... 10 more
{code}
I have attached my code as [^MyMissingWindows.scala]. Please give us a reproducible program without internal dependencies, or correct my usage if I am wrong.
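For readers without the attachments: the failing assertion checks the invariant the test job relies on, namely that keying every element to a single key and windowing by 1 ms must produce exactly one element per window. The following is only a plain-Java illustration of that invariant (it is not the attached MyMissingWindows.scala, and the class and method names are made up for the sketch), decoupled from Flink so it can run standalone:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the invariant behind assert(input.size == 1):
// with event timestamps 1, 2, 3, ... keyed to a single key and 1-ms
// windows, every window should open and hold exactly one element.
public class RateCheck {

    // Bucket event timestamps into 1-ms windows,
    // mirroring timeWindow(Time.milliseconds(1)).
    static Map<Long, Integer> bucketByMillisecond(List<Long> timestamps) {
        Map<Long, Integer> windows = new TreeMap<>();
        for (long ts : timestamps) {
            windows.merge(ts, 1, Integer::sum); // window start == ts for 1-ms windows
        }
        return windows;
    }

    // Returns the first window start in [from, to] that is missing or
    // over-full, or -1 if every window holds exactly one element.
    static long firstViolation(Map<Long, Integer> windows, long from, long to) {
        for (long w = from; w <= to; w++) {
            Integer count = windows.get(w);
            if (count == null || count != 1) {
                return w; // the case where the job's assertion fires
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Three interleaved "partitions" that together cover 1..9 exactly once.
        List<Long> complete = List.of(1L, 2L, 4L, 5L, 3L, 6L, 7L, 9L, 8L);
        System.out.println(firstViolation(bucketByMillisecond(complete), 1, 9)); // -1

        // Same stream with timestamp 6 lost: window 6 never opens.
        List<Long> withGap = List.of(1L, 2L, 4L, 5L, 3L, 7L, 9L, 8L);
        System.out.println(firstViolation(bucketByMillisecond(withGap), 1, 9)); // 6
    }
}
```

A "missing window" as described in the report would show up here as a `firstViolation` result other than -1.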
> RocksDB optimizeForPointLookup results in missing time windows
> --------------------------------------------------------------
>
>                 Key: FLINK-17800
>                 URL: https://issues.apache.org/jira/browse/FLINK-17800
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.10.0, 1.10.1
>            Reporter: Yordan Pavlov
>            Assignee: Yun Tang
>            Priority: Critical
>             Fix For: 1.11.0
>
>         Attachments: MissingWindows.scala, MyMissingWindows.scala
>
>
> +My Setup:+
> We have been using the _RocksDB_ option _optimizeForPointLookup_ and running version 1.7 for years. Upon upgrading to Flink 1.10 we started seeing strange behavior: missing time windows in a streaming Flink job. For testing purposes I experimented with previous Flink versions (1.8, 1.9, 1.9.3) and none of them showed the problem.
>
> A sample of the code demonstrating the problem:
> {code:java}
> val datastream = env
>   .addSource(KafkaSource.keyedElements(config.kafkaElements,
>     List(config.kafkaBootstrapServer)))
>
> val result = datastream
>   .keyBy( _ => 1)
>   .timeWindow(Time.milliseconds(1))
>   .print()
> {code}
>
> The source consists of 3 streams (being either 3 Kafka partitions or 3 Kafka topics); the elements in each stream are separately increasing. The elements carry increasing event-time timestamps, starting from 1 and increasing by 1. The first partition might consist of timestamps 1, 2, 10, 15...; the second of 4, 5, 6, 11...; the third of 3, 7, 8, 9...
>
> +What I observe:+
> The time windows open as I expect for the first 127 timestamps. Then there is a huge gap with no opened windows; if the source has many elements, the next open window has a timestamp in the thousands. A gap of hundreds of elements is created, with what appear to be 'lost' elements. Those elements are not reported as late (when tested with the _sideOutputLateData_ operator).
> The way we have been using the option is by setting it inside the config like so:
> ??etherbi.rocksDB.columnOptions.optimizeForPointLookup=268435456??
> We have been using it for performance reasons, as we have a huge RocksDB state backend.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
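(The ??etherbi.*?? key above is the reporter's internal configuration, not a standard Flink option. For anyone trying to reproduce the setup through Flink's public API, a sketch of how the same RocksDB option could be wired in via Flink 1.10's {{RocksDBOptionsFactory}} follows; this is an assumed reconstruction of the reporter's setup, not their actual code, and it requires {{flink-statebackend-rocksdb}} on the classpath, so it is not runnable standalone.)

{code:java}
import java.util.Collection;
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

// Sketch: applying optimizeForPointLookup to every column family,
// mirroring the value 268435456 (256 MiB block cache) from the report.
public class PointLookupOptionsFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(
            DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions; // DB-level options left unchanged
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions.optimizeForPointLookup(268435456);
    }
}

// Usage: rocksDBStateBackend.setRocksDBOptions(new PointLookupOptionsFactory());
{code}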