Hi Caizhi,

Thanks so much for responding.

Our autowatermarkInterval is set to 2000 ms, i.e. 2 seconds.

When I upgrade my project to Flink 1.13.2, the setting works, but it does not
work in Flink 1.12.1 or 1.12.0.

I don't see anything in the release notes that might have fixed this, but we
were able to verify this version difference.

Any thoughts?
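For anyone following along, the periodic-watermark plus idle-source behavior
Caizhi describes below can be sketched roughly like this. This is only an
illustrative simulation under my own assumptions, not Flink's actual internals;
the class and method names here (IdleAwareWatermarkTracker, onRecord,
onWatermarkTick) are made up for the sketch:

```java
import java.util.Arrays;
import java.util.Optional;

/**
 * Illustrative sketch (NOT Flink code) of the mechanism: once per auto
 * watermark interval, a tick runs; any partition that has not received a
 * record for longer than the idle timeout is excluded from the min-watermark
 * calculation, so the remaining partitions can advance the overall watermark.
 */
class IdleAwareWatermarkTracker {
    private final long idleTimeoutMs;
    private final long[] lastEventTime;       // latest event timestamp seen per partition
    private final long[] lastRecordWallClock; // wall-clock time of the last record per partition

    IdleAwareWatermarkTracker(int partitions, long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
        this.lastEventTime = new long[partitions];
        this.lastRecordWallClock = new long[partitions];
        Arrays.fill(lastEventTime, Long.MIN_VALUE);
        Arrays.fill(lastRecordWallClock, Long.MIN_VALUE);
    }

    /** A record with the given event timestamp arrived on this partition. */
    void onRecord(int partition, long eventTime, long nowMs) {
        lastEventTime[partition] = Math.max(lastEventTime[partition], eventTime);
        lastRecordWallClock[partition] = nowMs;
    }

    /** Called once per auto watermark interval; empty means the watermark cannot advance. */
    Optional<Long> onWatermarkTick(long nowMs) {
        long min = Long.MAX_VALUE;
        for (int p = 0; p < lastEventTime.length; p++) {
            long sinceLast = lastRecordWallClock[p] == Long.MIN_VALUE
                    ? nowMs                        // never received a record: idle once the timeout passes
                    : nowMs - lastRecordWallClock[p];
            if (sinceLast > idleTimeoutMs) continue; // idle partition: excluded from the min
            min = Math.min(min, lastEventTime[p]);
        }
        // MIN_VALUE: an active partition has produced no events yet; MAX_VALUE: all idle.
        return (min == Long.MIN_VALUE || min == Long.MAX_VALUE)
                ? Optional.empty() : Optional.of(min);
    }
}
```

With a 180000 ms timeout and data only on partitions 0 and 1 (event times 150
and 155), a tick before P2's timeout expires yields no watermark, while a tick
after it excludes P2 and advances the watermark to min(150, 155) = 150, which
is the behavior the test below was expecting.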



On Wed, Oct 27, 2021, 7:39 PM Caizhi Weng <[email protected]> wrote:

> Hi!
>
> What's the value of your config.autowatermarkInterval()? It must be larger
> than 0 for table.exec.source.idle-timeout to work. More specifically, the
> auto watermark interval exists to avoid emitting a watermark for every
> record (which hurts performance); instead, a watermark is emitted once per
> interval. When an interval elapses, Flink checks whether any record arrived
> during that interval, and if none did, the idle-source logic kicks in.
>
> Makhanchan Pandey <[email protected]> wrote on Thu, Oct 28, 2021 at 4:01 AM:
>
>> Hi all,
>>
>> I have a local Flink SQL app with Kafka source running with 3 partitions
>> (0,1,2).
>> I am running the following code:
>>
>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> final EnvironmentSettings settings =
>>         EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
>>
>> env.setMaxParallelism(env.getParallelism() * 8);
>> env.getConfig().setAutoWatermarkInterval(config.autowatermarkInterval());
>>
>> final TableConfig tConfig = tEnv.getConfig();
>> tConfig.setIdleStateRetention(Duration.ofMinutes(60));
>>
>> tConfig.getConfiguration().setString("table.exec.source.idle-timeout", "180000 ms");
>>
>> To test locally:
>> 1. I sent some data to Partition 0 with timestamp 150.
>> 2. Sent some data to Partition 1 with timestamp 155.
>> 3. I waited for 3 minutes (in fact, a bit longer).
>> 4. This should have marked P2 as idle, so watermarks should advance and
>> data from P0 and P1 should be processed.
>> 5. However, I did not see any watermarks advance.
>> 6. Only when I sent data to Partition 2 with timestamp 160 did Flink
>> advance its watermark to 150 (i.e. from the first data sent and processed
>> in P0).
>>
>> Ideally, I would expect Flink to advance its watermark 3 minutes after
>> sending data to P0 and P1, if my understanding of
>> "table.exec.source.idle-timeout" is correct.
>>
>> I also tried populating all three partitions first and then sending data
>> to only some of them. This too had no effect.
>>
>> On one occasion, I ran into an exception in the Flink UI, but I never saw
>> this exception again (see the stack trace at the bottom).
>> Posted in SO:
>> https://stackoverflow.com/questions/69729366/flink-sql-does-not-honor-table-exec-source-idle-timeout-setting
>>
>> But I did not get any responses there. Could someone confirm whether my
>> setup is correct and whether the behavior I expect is right?
>>
>> 2021-10-26 16:38:14
>> java.lang.NoClassDefFoundError: org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest$PartitionData
>>     at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.lambda$null$0(OffsetsForLeaderEpochClient.java:52)
>>     at java.base/java.util.Optional.ifPresent(Unknown Source)
>>     at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.lambda$prepareRequest$1(OffsetsForLeaderEpochClient.java:51)
>>     at java.base/java.util.HashMap.forEach(Unknown Source)
>>     at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.prepareRequest(OffsetsForLeaderEpochClient.java:51)
>>     at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.prepareRequest(OffsetsForLeaderEpochClient.java:37)
>>     at org.apache.kafka.clients.consumer.internals.AsyncClient.sendAsyncRequest(AsyncClient.java:37)
>>     at org.apache.kafka.clients.consumer.internals.Fetcher.lambda$validateOffsetsAsync$5(Fetcher.java:798)
>>     at java.base/java.util.HashMap.forEach(Unknown Source)
>>     at org.apache.kafka.clients.consumer.internals.Fetcher.validateOffsetsAsync(Fetcher.java:774)
>>     at org.apache.kafka.clients.consumer.internals.Fetcher.validateOffsetsIfNeeded(Fetcher.java:498)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2328)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1271)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
>>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:249)
>>
