Hi Srikanth,

As you said, the idle-state retention time configuration only works for "group aggregate" and "over aggregate" queries.
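For reference, the configuration the linked docs page describes looks roughly like this (a sketch against the Flink 1.9 Java Table API; the environment setup and table names are placeholders):

```java
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.api.StreamQueryConfig;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Keep state for idle keys at least 12 hours, clean it up after at most 24 hours.
// This only takes effect for group aggregates and over aggregates.
StreamQueryConfig qConfig = tEnv.queryConfig();
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));
```

Since the join below is a time-windowed join rather than an aggregate, this setting does not apply to it; the window bounds themselves drive the cleanup.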
Regarding your question:

> If it happens to be that the 'ip' acts as a key in my query for the eviction
> to work, how does Flink justify the Heap size grew to 80GB and crash?

Is your question "how does the eviction work"? If it is: a timer is registered for each key, and when the state corresponding to a key has not been accessed for the configured time, the state is cleared in the timer's callback. The state cleanup is triggered by the timer, not by the heap size.

> Is it that for every query with a time-windowed join, Flink SQL will
> automatically clear older records that have become irrelevant?

That's right. For a time-windowed join, the outdated records are cleared automatically.

Regards,
Dian

> On Sep 18, 2019, at 8:50 AM, srikanth flink <flink.d...@gmail.com> wrote:
>
> Hi there,
>
> I'm using FlinkSQL to do the job for me. Based on this
> <https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/query_configuration.html>,
> I configured the idle-state retention in milliseconds.
>
> Context: FlinkSQL reads a Kafka stream with no key and puts it into a
> dynamic table (sourceKafka). There's another static table (badips) loaded
> from a file, and the join is performed from the dynamic table on the static
> one, like:
>
> SELECT sourceKafka.*
> FROM sourceKafka
> INNER JOIN badips ON sourceKafka.source.ip = badips.ip
> WHERE sourceKafka.timestamp_received
>   BETWEEN CURRENT_TIMESTAMP - INTERVAL '15' MINUTE AND CURRENT_TIMESTAMP;
>
> As the docs say, my query doesn't have a GROUP BY for the idle-state
> retention to act upon and evict the untouched state. So how do I manage to
> evict the older ones?
>
> Questions:
> If it happens to be that the 'ip' acts as a key in my query for the eviction
> to work, how does Flink justify the heap size growing to 80GB and crashing?
> Is it that for every query with a time-windowed join, Flink SQL will
> automatically clear older records that have become irrelevant?
>
> Thanks
> Srikanth
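P.S. The per-key timer mechanism described above can be sketched with a toy model in plain Python. This is only an illustration of the idea (access re-registers a cleanup deadline; a fired timer drops the key's state), not Flink's actual implementation:

```python
class KeyedStateWithTtl:
    """Toy model of timer-based idle-state cleanup: each key gets a
    cleanup deadline; accessing the key's state pushes the deadline out;
    when a deadline fires, that key's state is dropped. Cleanup is driven
    by the timers, not by how much memory the state occupies."""

    def __init__(self, retention_seconds):
        self.retention = retention_seconds
        self.state = {}      # key -> value
        self.deadline = {}   # key -> time after which state is cleared

    def access(self, key, value, now):
        # Reading/updating state re-registers the cleanup timer for the key.
        self.state[key] = value
        self.deadline[key] = now + self.retention

    def fire_timers(self, now):
        # Timer callback: clear state for keys that have been idle too long.
        for key in [k for k, d in self.deadline.items() if d <= now]:
            del self.state[key]
            del self.deadline[key]

store = KeyedStateWithTtl(retention_seconds=10)
store.access("1.2.3.4", "seen", now=0)
store.access("5.6.7.8", "seen", now=5)
store.fire_timers(now=12)   # "1.2.3.4" idle since t=0, so it is cleared
print(sorted(store.state))  # ['5.6.7.8']
```

If keys arrive faster than their timers fire (e.g. a high-cardinality column like an IP address with a long retention), the state can still grow large in the meantime, which is one way a heap can blow up before cleanup catches up.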