Hi All,

I'm aggregating data using mapWithState, with a timeout set, in Spark 1.6.0. It broadly works well, and because it provides access to the key and the time in the callback, it allows a much more elegant solution for time-based aggregation than the old updateStateByKey function.
However, there seems to be a problem: the size of the state, and the time taken to iterate over it for each micro-batch, keep increasing over time, long after the number of 'current' keys settles down. We start removing keys after just over an hour, but the size of the state keeps increasing in runs of over 6 hours.

Essentially, we start by just adding keys for our input tuples, reaching a peak of about 7 million keys. Then we start to output data and remove keys, and the number of keys drops to about 5 million. We continue processing tuples, which adds keys, while removing the keys we no longer need, so the number of keys fluctuates between 5 million and 8 million. We know this, and are reasonably confident that our removal of keys is correct, because we obtain the state with JavaMapWithStateDStream.stateSnapshots and count the keys.

From my reading (I don't know Scala!) of the code in org.apache.spark.streaming.util.StateMap.scala, it seems clear that removed keys are only marked as deleted, and are actually destroyed later by compaction, based on the length of the chain of delta maps. We'd expect the size of the state RDDs, and the time taken to iterate over all the state, to stabilize once compaction runs after we remove keys, but that just doesn't happen. Is there some possible reason why compaction never gets run?

I tried to use the (undocumented?) config setting spark.streaming.sessionByKey.deltaChainThreshold to control how often compaction is run, with:

--conf spark.streaming.sessionByKey.deltaChainThreshold=2

I can see it on the Environment page of the Spark application UI, but it doesn't seem to make any difference.

I have noticed that the timeout mechanism only gets invoked on every 10th micro-batch. I'm almost sure it isn't a coincidence that the checkpoint interval is also 10 micro-batches. I assume that is an intentional performance optimization.
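To make the reading of StateMap.scala described above concrete, here is a toy model in plain Java. It is NOT Spark's StateMap code: the class DeltaChainStateMap, its methods, the tombstone sentinel and the simulate helper are all hypothetical, and it assumes one new delta map per batch. Removals are written as tombstones into the newest delta, and deleted keys are only physically dropped when the chain reaches a length threshold and is compacted. Each simulated batch adds 100 new keys and removes the 100 added in the previous batch, so the live key count (what counting a snapshot would show) stays flat, while the number of stored entries is what grows if compaction never runs.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/** Hypothetical toy model (NOT Spark's StateMap): a chain of delta maps, newest first. */
public class DeltaChainStateMap {
    // Sentinel value meaning "key removed in this delta".
    private static final Object TOMBSTONE = new Object();

    private final Deque<Map<String, Object>> chain = new ArrayDeque<>();
    private final int deltaChainThreshold;

    public DeltaChainStateMap(int deltaChainThreshold) {
        this.deltaChainThreshold = deltaChainThreshold;
        chain.push(new HashMap<>());
    }

    public void put(String key, Object value) {
        chain.peek().put(key, value);
    }

    // Marks the key as deleted; older deltas still hold their entries.
    public void remove(String key) {
        chain.peek().put(key, TOMBSTONE);
    }

    // Called once per simulated batch: compact if the chain is long enough,
    // then start a fresh delta on top.
    public void newDelta() {
        if (chain.size() >= deltaChainThreshold) {
            compact();
        }
        chain.push(new HashMap<>());
    }

    // Merges all deltas into one map, oldest to newest, dropping tombstoned keys.
    private void compact() {
        Map<String, Object> merged = new HashMap<>();
        Iterator<Map<String, Object>> oldestFirst = chain.descendingIterator();
        while (oldestFirst.hasNext()) {
            for (Map.Entry<String, Object> e : oldestFirst.next().entrySet()) {
                if (e.getValue() == TOMBSTONE) {
                    merged.remove(e.getKey());
                } else {
                    merged.put(e.getKey(), e.getValue());
                }
            }
        }
        chain.clear();
        chain.push(merged);
    }

    // Keys whose newest entry is not a tombstone (what a snapshot count would see).
    public int liveKeys() {
        Set<String> seen = new HashSet<>();
        int live = 0;
        for (Map<String, Object> delta : chain) { // iterates newest first
            for (Map.Entry<String, Object> e : delta.entrySet()) {
                if (seen.add(e.getKey()) && e.getValue() != TOMBSTONE) {
                    live++;
                }
            }
        }
        return live;
    }

    // All entries held in memory, including tombstones and shadowed values.
    public int storedEntries() {
        return chain.stream().mapToInt(Map::size).sum();
    }

    // Each batch adds 100 new keys and removes the 100 added in the previous batch.
    public static DeltaChainStateMap simulate(int threshold, int batches) {
        DeltaChainStateMap state = new DeltaChainStateMap(threshold);
        for (int b = 0; b < batches; b++) {
            for (int i = 0; i < 100; i++) {
                state.put("k" + (b * 100 + i), b);
                if (b > 0) {
                    state.remove("k" + ((b - 1) * 100 + i));
                }
            }
            state.newDelta();
        }
        return state;
    }

    public static void main(String[] args) {
        for (int threshold : new int[] {Integer.MAX_VALUE, 4}) {
            DeltaChainStateMap s = simulate(threshold, 50);
            System.out.println("threshold=" + threshold + " live=" + s.liveKeys()
                    + " stored=" + s.storedEntries());
        }
    }
}
```

After 50 simulated batches both runs report 100 live keys, but with compaction effectively disabled (threshold Integer.MAX_VALUE) the chain holds 9900 entries, while with a threshold of 4 it holds 300. Whether Spark actually creates a new delta per micro-batch or per checkpoint, and when it compacts, is not something this toy can answer.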
Because I have a lot of keys, I use a large micro-batch duration, so it would make sense for me to reduce that factor of 10. However, since I don't call checkpoint on the state stream, I can't see how to change it. Can I change the checkpoint interval somewhere? [I tried calling JavaMapWithStateDStream.checkpoint myself, but that evidently isn't the same thing!]

My initial assumption was that there is a new deltaMap for each micro-batch, but having noticed the timeout behavior, I wonder whether there is only a new deltaMap for each checkpoint. Or maybe there are other criteria? Perhaps compaction just hasn't run before my application falls over? Can anyone clarify exactly when it should run? Or maybe compaction doesn't delete old removed keys for some reason?

Thank you for your attention.

Cheers
Iain Cundy

This message and the information contained herein is proprietary and confidential and subject to the Amdocs policy statement, you may review at http://www.amdocs.com/email_disclaimer.asp