Hi Ashish,

It would be helpful to share the code of your custom trigger for the first case.
Without that, we cannot tell what state you create and how/when you 
update/clear it.

Cheers,
Kostas

> On May 14, 2018, at 1:04 AM, ashish pok <ashish...@yahoo.com> wrote:
> 
> Hi Till,
> 
> Thanks for getting back. I am sure that will fix the issue but I feel like 
> that would potentially mask an issue. I have been going back and forth with 
> Fabian on a use case where for some of our highly transient datasets, it 
> might make sense to just use memory based state (except of course data loss 
> becomes an issue when apps occasionally hit a problem and whole job restarts 
> or app has to be taken down etc - ie. handling graceful shutdowns / restarts 
> better essentially). I was on the hook to create a business case and post it 
> back to this forum (which I am hoping I can get around to at some point 
> soon). Long story short, this is one of those datasets. 
> 
> States in this case are either fired and cleared normally or on processing 
> timeout. So technically, unless there is a memory leak in app code, memory 
> usage should plateau out at a high-point. What I was noticing was memory 
> would start to creep up ever so slowly. 
> 
> I couldn't tell exactly why heap utilization kept on growing (ever so slowly 
> but it had upward trend for sure) because the states should technically be 
> cleared if not as part of a reducing function then on timeout. App after 
> running for couple of days would then run into Java Heap issues. So changing 
> to RocksDB probably will fix the issue but not necessarily leak of states 
> that should be cleared IMO. Interestingly, I switched my app from using 
> something like this:
> 
> WindowedStream<BasicFactTuple, String, GlobalWindow> windowedStats = 
> statsStream
>                               .keyBy(BasicFactTuple::getKey)
>                               .window(GlobalWindows.create())
>                               .trigger(BitwiseOrTrigger.of(60, 
> AppConfigs.getWindowSize(5*60*1000)))
>                       ;
> 
> To 
> 
>  DataStream<PlatformEvent> processStats = pipStatsStream
>                               .keyBy(BasicFactTuple::getKey)
>                               .process(new 
> IfStatsReduceProcessFn(AppConfigs.getWindowSize(5*60*1000), 60))
> 
> I basically moved logic of trigger to process function over the weekend. Once 
> I did that, heap is completely stabilized. In trigger implementation, I was 
> using FIRE_AND_PURGE on trigger condition or onProcessingTime and in process 
> implementation I am using .clear() method for same. 
> 
> I seem to have solved the problem by using process but I'd be interested to 
> understand the cause of why heap would creep up in trigger scenario. 
> 
> Hope this makes sense,
> 
> Ashish
> 
> On Sunday, May 13, 2018, 4:06:59 PM EDT, Till Rohrmann 
> <till.rohrm...@gmail.com> wrote:
> 
> 
> Hi Ashish,
> 
> have you tried using Flink's RocksDBStateBackend? If your job accumulates 
> state exceeding the available main memory, then you have to use a state 
> backend which can spill to disk. The RocksDBStateBackend offers you exactly 
> this functionality.
> 
> Cheers,
> Till
> 
> On Mon, Apr 30, 2018 at 3:54 PM, ashish pok <ashish...@yahoo.com 
> <mailto:ashish...@yahoo.com>> wrote:
> All,
> 
> I am using noticing heap utilization creeping up slowly in couple of apps 
> which eventually lead to OOM issue. Apps only have 1 process function that 
> cache state. I did make sure I have a clear method invoked when events are 
> collected normally, on exception and on timeout.
> 
> Are any other best practices others follow for memory backed states?
> 
> Thanks,
> 
> -- Ashish
> 

Reply via email to