Re: How exactly does Idle-state retention policy work?

2019-09-17 Thread Dian Fu
Hi Srikanth,

As you side, the idle-state retention time configuration only works for "group 
aggregate" and "over aggregate".

Regarding to your question:
> If it happens to be that the 'ip' acts as a key in my query for the eviction 
> to work, how does Flink justify the Heap size grew to 80GB and crash?
Is you question "how does the eviction work"? If it is, it will register a 
timer for each key and when the state corresponding to some key is not accessed 
for the configured time, the state will be cleared as a callback of the timer. 
The state clear is not triggered by the heap size, but by the timer.

> Is is that every query with a time windowed join, Flink SQL will 
> automatically clear older records that have become irrelevant?
That's right. For time window join, the outdated records will be cleared 


> 在 2019年9月18日,上午8:50,srikanth flink  写道:
> Hi there,
> I'm using FlinkSQL to solve to do the job for me. Based on this 
> ,
>  configured the idle-state milliseconds. 
> context: FlinkSQL reads Kafka stream with no key and put to dynamic table( 
> sourceKafka). There's another static table( badips) loaded from file and the 
> join is performed from dynamic table on static like: SELECT sourceKafka.* 
> FROM sourceKafka INNER JOIN badips ON sourceKafka.source.ip=badips.ip WHERE 
> sourceKafka.timestamp_received BETWEEN CURRENT_TIMESTAMP - INTERVAL '15' 
> As it said in the docs, my query doesn't have a 'groupby' for the idle-state 
> to act upon and evict the untouched. So how do I manage to evict the older 
> once?
> Questions:
> If it happens to be that the 'ip' acts as a key in my query for the eviction 
> to work, how does Flink justify the Heap size grew to 80GB and crash?
> Is is that every query with a time windowed join, Flink SQL will 
> automatically clear older records that have become irrelevant?
> Thanks
> Srikanth

How exactly does Idle-state retention policy work?

2019-09-17 Thread srikanth flink
Hi there,

I'm using FlinkSQL to solve to do the job for me. Based on this
configured the idle-state milliseconds.

*context*: FlinkSQL reads Kafka stream with no key and put to dynamic
table( sourceKafka). There's another static table( badips) loaded from file
and the join is performed from dynamic table on static like: SELECT
sourceKafka.* FROM sourceKafka INNER JOIN badips ON

As it said in the docs, my query doesn't have a 'groupby' for the
idle-state to act upon and evict the untouched. So how do I manage to evict
the older once?


   1. If it happens to be that the 'ip' acts as a key in my query for the
   eviction to work, how does Flink justify the Heap size grew to 80GB and
   2. Is is that every query with a time windowed join, Flink SQL will
   automatically clear older records that have become irrelevant?
