Hi Caizhi,

Thanks so much for responding.
Our autowatermarkInterval is set to 2000 ms (2 seconds). When I upgrade my project to Flink 1.13.2 the setting works, but it does not work in Flink 1.12.0 or 1.12.1. I don't see anything in the release notes that might have fixed this, but we were able to verify that the behavior differs between these versions.

Any thoughts?

On Wed, Oct 27, 2021, 7:39 PM Caizhi Weng <[email protected]> wrote:

> Hi!
>
> What's the value of your config.autowatermarkInterval()? It must be larger
> than 0 for table.exec.source.idle-timeout to work. More specifically, auto
> watermark avoids sending a watermark for each record (which would reduce
> performance) and instead sends a watermark once per auto watermark
> interval. When an interval elapses, Flink checks whether any record has
> arrived during that interval, and if there is none the idle source logic
> kicks in.
>
> Makhanchan Pandey <[email protected]> wrote on Thu, Oct 28, 2021 at 4:01 AM:
>
>> Hi all,
>>
>> I have a local Flink SQL app with a Kafka source running with 3 partitions
>> (0, 1, 2).
>> I am running the following code:
>>
>> final StreamExecutionEnvironment env =
>>     StreamExecutionEnvironment.getExecutionEnvironment();
>> final EnvironmentSettings settings =
>>     EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
>>
>> env.setMaxParallelism(env.getParallelism() * 8);
>> env.getConfig().setAutoWatermarkInterval(config.autowatermarkInterval());
>>
>> final TableConfig tConfig = tEnv.getConfig();
>> tConfig.setIdleStateRetention(Duration.ofMinutes(60));
>>
>> tConfig.getConfiguration().setString("table.exec.source.idle-timeout",
>>     "180000 ms");
>>
>> To test locally:
>> 1. I sent some data to Partition 0 with timestamp 150.
>> 2. I sent some data to Partition 1 with timestamp 155.
>> 3. I waited for 3 minutes (in fact, a bit longer).
>> 4. This should have marked P2 as idle, and hence watermarks should advance
>> and data from P0 and P1 should be processed.
>> 5. However, I did not see any watermarks advance.
>> 6. Now when I sent data to Partition 2 with timestamp 160, I see Flink
>> advanced its watermark to 150 (i.e. from the first data sent to and
>> processed from P0).
>>
>> Ideally, I would expect to see Flink advance its watermark 3 minutes after
>> sending data to P0 and P1, if my understanding of
>> "table.exec.source.idle-timeout" is correct.
>>
>> I also tried populating all three partitions and then sending data to only
>> some of the partitions. This too had no effect.
>>
>> On one occasion, I ran into an exception in the Flink UI, but I never saw
>> this exception again (see bottom).
>>
>> Posted on SO:
>> https://stackoverflow.com/questions/69729366/flink-sql-does-not-honor-table-exec-source-idle-timeout-setting
>>
>> But I did not get any responses. Could someone confirm whether the way I am
>> setting things up and the behavior I expect are correct?
>>
>> 2021-10-26 16:38:14
>> java.lang.NoClassDefFoundError: org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest$PartitionData
>>     at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.lambda$null$0(OffsetsForLeaderEpochClient.java:52)
>>     at java.base/java.util.Optional.ifPresent(Unknown Source)
>>     at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.lambda$prepareRequest$1(OffsetsForLeaderEpochClient.java:51)
>>     at java.base/java.util.HashMap.forEach(Unknown Source)
>>     at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.prepareRequest(OffsetsForLeaderEpochClient.java:51)
>>     at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.prepareRequest(OffsetsForLeaderEpochClient.java:37)
>>     at org.apache.kafka.clients.consumer.internals.AsyncClient.sendAsyncRequest(AsyncClient.java:37)
>>     at org.apache.kafka.clients.consumer.internals.Fetcher.lambda$validateOffsetsAsync$5(Fetcher.java:798)
>>     at java.base/java.util.HashMap.forEach(Unknown Source)
>>     at org.apache.kafka.clients.consumer.internals.Fetcher.validateOffsetsAsync(Fetcher.java:774)
>>     at org.apache.kafka.clients.consumer.internals.Fetcher.validateOffsetsIfNeeded(Fetcher.java:498)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2328)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1271)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
>>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:249)
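
For reference, the idle-source behavior described in the quoted reply can be sketched as a toy model in plain Java. This is not Flink code and does not use any Flink API: the class name IdleAwareWatermarkModel and its methods are hypothetical, it assumes zero out-of-orderness delay, and it assumes the combined watermark is the minimum over the non-idle partitions. The idle check lives only in the periodic callback, which mirrors the point that an auto watermark interval of 0 would mean the check never runs.

```java
import java.util.Arrays;

/** Toy model of per-partition watermarks with idle detection (hypothetical, not Flink code). */
final class IdleAwareWatermarkModel {
    static final long NO_WATERMARK = Long.MIN_VALUE;

    private final long idleTimeoutMs;
    private final long[] maxTimestamp;   // highest event timestamp seen per partition
    private final long[] lastActivityMs; // wall-clock time of the last record per partition
    private long watermark = NO_WATERMARK;

    IdleAwareWatermarkModel(int partitions, long idleTimeoutMs, long startMs) {
        this.idleTimeoutMs = idleTimeoutMs;
        this.maxTimestamp = new long[partitions];
        this.lastActivityMs = new long[partitions];
        Arrays.fill(maxTimestamp, NO_WATERMARK);
        Arrays.fill(lastActivityMs, startMs); // idle timers start when the job starts
    }

    /** Called for every record: only updates per-partition state, emits nothing. */
    void onRecord(int partition, long eventTimestamp, long nowMs) {
        maxTimestamp[partition] = Math.max(maxTimestamp[partition], eventTimestamp);
        lastActivityMs[partition] = nowMs;
    }

    /** Called once per auto watermark interval; returns the watermark after this tick. */
    long onPeriodicEmit(long nowMs) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int p = 0; p < maxTimestamp.length; p++) {
            boolean idle = nowMs - lastActivityMs[p] >= idleTimeoutMs;
            if (idle) {
                continue; // an idle partition no longer holds back the watermark
            }
            anyActive = true;
            min = Math.min(min, maxTimestamp[p]);
        }
        if (anyActive && min > watermark) {
            watermark = min; // watermarks only move forward
        }
        return watermark;
    }

    public static void main(String[] args) {
        // Scenario from the thread: 3 partitions, 180000 ms idle timeout, P2 never gets data.
        IdleAwareWatermarkModel model = new IdleAwareWatermarkModel(3, 180_000L, 0L);
        model.onRecord(0, 150L, 60_000L);
        model.onRecord(1, 155L, 61_000L);
        System.out.println(model.onPeriodicEmit(62_000L) == NO_WATERMARK); // P2 still active
        System.out.println(model.onPeriodicEmit(180_000L));                // P2 now idle
    }
}
```

In this model the watermark stays unset while P2 is still counted as active, and moves to 150 on the first periodic tick after P2 has been silent for the full timeout, which is the behavior expected in the test steps above. Note that the model only gives that result when P0 and P1 received data more recently than the timeout; if every partition has been silent for 3 minutes, all of them are idle and nothing advances.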
