Hi Abhi

The concept is what you want – if you set the StateSpec timeout to a Duration of 10 
minutes then any keys not seen for more than 10 minutes will be deleted.
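
As a rough illustration of that setup, here is a minimal sketch in Scala. The object name, the trackTotal function and the key/value types are made up for the example; only StateSpec, State, Minutes and mapWithState come from Spark Streaming. The isTimingOut() guard is there because the mapping function is also invoked for a key that is timing out, and updating or removing the state at that point throws.

```scala
import org.apache.spark.streaming.{Minutes, State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

object TimeoutSketch {
  // Hypothetical mapping function: keeps a running total per key.
  def trackTotal(key: String, value: Option[Int], state: State[Long]): (String, Long) = {
    if (state.isTimingOut()) {
      // Called for an idle key that is timing out; update() or remove() would throw here.
      (key, state.getOption().getOrElse(0L))
    } else {
      val total = state.getOption().getOrElse(0L) + value.getOrElse(0)
      state.update(total)
      (key, total)
    }
  }

  // Keys with no new data for 10 minutes become eligible for timeout.
  val spec = StateSpec.function(trackTotal _).timeout(Minutes(10))

  // Attach the stateful mapping to an existing pair stream.
  def attach(pairs: DStream[(String, Int)]): DStream[(String, Long)] =
    pairs.mapWithState(spec)
}
```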

However you did say “exactly” and specifically mention “removed from memory” in 
which case you may be interested in the much more complicated actual semantics. 
This is based on my empirical experience and on attempting to read the code in Scala, 
which is not a language I know, so e&oe!


1.       The timeout check is only performed when the state is checkpointed. 
This seems to occur once every 10 micro-batches (it is possible that it changes 
based upon something, but I don’t see a good way to configure it since the 
checkpoint call is internal to mapWithState)

2.       When the timeout check is performed two things happen

a.       The mapping function (call) gets invoked for the key, with 
State.isTimingOut() returning true – a little care is required because 
trying to update or remove the state at that point causes an exception

b.      the key is marked as deleted, which means it will no longer appear in 
snapshots

3.       Note that I didn’t say the key is removed from memory! That only 
happens on some checkpoints when the code decides to “compact” the state data. 
This happens when the chain of delta maps is at least 
“spark.streaming.sessionByKey.deltaChainThreshold”.

a.       What adds a new delta map? – I think it is every checkpoint, but not 
absolutely certain it is that simple

b.      The default seems to be 20, which means deleted keys only get deleted 
from memory once every 190 or 200 micro-batches. The setting above isn’t 
documented anywhere I can find, however you can set it using spark-submit 
--conf. Setting it to 2 does seem to get keys removed from memory when the 
checkpoint deletes them.
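
For anyone who prefers to set this in code rather than on the command line, a minimal sketch in Scala follows. It assumes that setting the key on the SparkConf before the streaming context is created has the same effect as spark-submit --conf, which I have not verified separately. The object and helper names are made up for illustration, and the batch estimate assumes one new delta map per checkpoint and a checkpoint every 10 micro-batches, as described above.

```scala
import org.apache.spark.SparkConf

object CompactionSketch {
  // Undocumented key quoted above; treat it as internal and subject to change.
  val DeltaChainKey = "spark.streaming.sessionByKey.deltaChainThreshold"

  // Intended to mirror:
  //   spark-submit --conf spark.streaming.sessionByKey.deltaChainThreshold=2 ...
  def confWithThreshold(threshold: Int): SparkConf =
    new SparkConf()
      .setAppName("mapWithState-timeout-sketch") // hypothetical app name
      .set(DeltaChainKey, threshold.toString)

  // Rough estimate of micro-batches between compactions,
  // assuming one delta map is added per checkpoint.
  def batchesBetweenCompactions(checkpointEveryNBatches: Int, threshold: Int): Int =
    checkpointEveryNBatches * threshold
}
```

With the observed defaults, batchesBetweenCompactions(10, 20) gives 200, which matches the "190 or 200" figure; with a threshold of 2 it gives 20.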

Any clarifications or corrections are welcome!

Cheers
Iain

From: Abhishek Anand [mailto:abhis.anan...@gmail.com]
Sent: 05 April 2016 06:40
To: user
Subject: [MARKETING] Timeout in mapWithState

What exactly is timeout in mapWithState ?

I want the keys to get removed from the memory if there is no data received on 
that key for 10 minutes.

How can I achieve this in mapWithState ?

Regards,
Abhi

