Hi All

I'm aggregating data using mapWithState with a timeout set in 1.6.0. It broadly 
works well, and because the callback provides access to the key and the batch 
time, it allows a much more elegant solution for time-based aggregation than the 
old updateStateByKey function.
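
For concreteness, our setup looks roughly like the sketch below - the types, the 
70 minute timeout and the aggregationComplete check are just placeholders 
standing in for our real code:

  import com.google.common.base.Optional;   // Spark 1.6's Java API uses Guava's Optional
  import org.apache.spark.api.java.function.Function4;
  import org.apache.spark.streaming.Durations;
  import org.apache.spark.streaming.State;
  import org.apache.spark.streaming.StateSpec;
  import org.apache.spark.streaming.Time;
  import scala.Tuple2;

  public class AggregationSpec {

    // Mapping function: receives the batch time, the key, the new value (if any)
    // and the state, and optionally emits an output record.
    static final Function4<Time, String, Optional<Long>, State<Long>, Optional<Tuple2<String, Long>>>
        MAPPING_FUNC = (time, key, value, state) -> {
          if (state.isTimingOut()) {
            // Key has been idle past the timeout - emit what we have; Spark drops the key.
            return Optional.of(new Tuple2<>(key, state.get()));
          }
          long sum = value.or(0L) + (state.exists() ? state.get() : 0L);
          if (aggregationComplete(time, key)) {   // placeholder "are we done with this key?" check
            state.remove();                       // explicitly drop the key from the state
            return Optional.of(new Tuple2<>(key, sum));
          }
          state.update(sum);                      // keep aggregating
          return Optional.absent();
        };

    static final StateSpec<String, Long, Long, Tuple2<String, Long>> SPEC =
        StateSpec.function(MAPPING_FUNC).timeout(Durations.minutes(70));

    private static boolean aggregationComplete(Time time, String key) {
      return false;   // stands in for our real time-based completion logic
    }
  }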

However, there seems to be a problem - the size of the state and the time taken 
to iterate over it for each micro-batch keep increasing over time, long after 
the number of 'current' keys settles down. We start removing keys after just 
over an hour, but the size of the state keeps increasing in runs of over 6 
hours.

Essentially we start by just adding keys for our input tuples, reaching a peak 
of about 7 million keys. Then we start to output data and remove keys - the 
number of keys drops to about 5 million. We continue processing tuples, which 
adds keys, while removing the keys we no longer need - the number of keys 
fluctuates up and down between 5 million and 8 million.

We know this, and are reasonably confident our removal of keys is correct, 
because we obtain the state with JavaMapWithStateDStream.stateSnapshots and 
count the keys.
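
Concretely, the check is nothing more elaborate than this (stateStream being the 
JavaMapWithStateDStream returned by mapWithState):

  // Log the number of keys currently held in the state after each micro-batch.
  stateStream.stateSnapshots().foreachRDD(rdd ->
      System.out.println("state keys: " + rdd.count()));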

From my reading (I don't know Scala!) of the code in 
org.apache.spark.streaming.util.StateMap.scala it seems clear that the removed 
keys are only marked as deleted and are really destroyed subsequently by 
compaction, based upon the length of the chain of delta maps. We'd expect the 
size of the state RDDs and the time taken to iterate over all the state to 
stabilize once compaction is run after we remove keys, but it just doesn't 
happen.
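
To show the mental model I'm working from, here is my own illustrative Java 
sketch of a delta map chain - not the real OpenHashMapBasedStateMap code, just 
how I understand the mark-deleted-then-compact idea:

  import java.util.HashMap;
  import java.util.Map;

  // Illustrative only: each generation layers a new delta on top of a parent map;
  // removes are recorded as tombstones and are only physically dropped when the
  // chain is compacted into a single flat map.
  class DeltaMapSketch<K, V> {
    private static final Object TOMBSTONE = new Object();

    private final DeltaMapSketch<K, V> parent;          // older generations of state
    private final Map<K, Object> delta = new HashMap<>();

    DeltaMapSketch(DeltaMapSketch<K, V> parent) { this.parent = parent; }

    void put(K key, V value) { delta.put(key, value); }

    void remove(K key) { delta.put(key, TOMBSTONE); }   // mark deleted, memory not freed yet

    @SuppressWarnings("unchecked")
    V get(K key) {
      if (delta.containsKey(key)) {
        Object v = delta.get(key);
        return v == TOMBSTONE ? null : (V) v;
      }
      return parent == null ? null : parent.get(key);
    }

    int chainLength() { return 1 + (parent == null ? 0 : parent.chainLength()); }

    // Compaction: copy live entries into one flat map, dropping tombstones, so removed
    // keys finally stop costing memory and iteration time.
    DeltaMapSketch<K, V> compactIfNeeded(int deltaChainThreshold) {
      if (chainLength() <= deltaChainThreshold) {
        return this;
      }
      DeltaMapSketch<K, V> flat = new DeltaMapSketch<>(null);
      copyLiveEntriesInto(flat, new HashMap<>());
      return flat;
    }

    private void copyLiveEntriesInto(DeltaMapSketch<K, V> target, Map<K, Boolean> seen) {
      for (Map.Entry<K, Object> e : delta.entrySet()) {
        if (seen.putIfAbsent(e.getKey(), Boolean.TRUE) == null && e.getValue() != TOMBSTONE) {
          target.delta.put(e.getKey(), e.getValue());
        }
      }
      if (parent != null) {
        parent.copyLiveEntriesInto(target, seen);
      }
    }
  }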

Is there some possible reason why compaction never gets run?

I tried to use the (undocumented?) config setting 
spark.streaming.sessionByKey.deltaChainThreshold to control how often 
compaction is run, with:
--conf spark.streaming.sessionByKey.deltaChainThreshold=2
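
If I've understood the config machinery correctly, setting it programmatically 
should be equivalent, e.g.

  import org.apache.spark.SparkConf;

  SparkConf conf = new SparkConf()
      .setAppName("mapWithState-aggregation")   // app name is just a placeholder
      .set("spark.streaming.sessionByKey.deltaChainThreshold", "2");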

I can see it in the Spark application UI Environment page, but it doesn't seem 
to make any difference.

I have noticed that the timeout mechanism only gets invoked on every 10th 
micro-batch. I'm almost sure it isn't a coincidence that the checkpoint 
interval is also 10 micro-batches, and I assume that is an intentional 
performance optimization. Because I have a lot of keys I have a large 
micro-batch duration, so it would make sense for me to reduce that factor of 
10. However, since I don't call checkpoint on the state stream, I can't see 
how to change it.

Can I change the checkpoint interval somewhere? [I tried calling 
JavaMapWithStateDStream.checkpoint myself, but that evidently isn't the same 
thing!]
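
For reference, what I tried was along these lines (the 5 minute interval is 
just an example):

  // stateStream is the JavaMapWithStateDStream returned by pairs.mapWithState(SPEC).
  // This runs, but evidently isn't the knob that controls the factor of 10 above.
  stateStream.checkpoint(Durations.minutes(5));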

My initial assumption was that there is a new deltaMap for each micro-batch, 
but having noticed the timeout behavior I wonder if there is only a new 
deltaMap for each checkpoint? Or maybe there are other criteria?

Perhaps compaction just hasn't run before my application falls over? Can anyone 
clarify exactly when it should run?

Or maybe compaction doesn't delete old removed keys for some reason?

Thank you for your attention.

Cheers
Iain Cundy


