Hi Ofir
I've discovered compaction works in 1.6.0 if I switch off Kryo.
I was using a workaround to get around mapWithState not supporting Kryo. See
https://issues.apache.org/jira/browse/SPARK-12591
My custom KryoRegistrator Java class has
// workaround until bug fixes in Spark 1.6.1
kryo.register(OpenHashMapBasedStateMap.class);
kryo.register(EmptyStateMap.class);
kryo.register(MapWithStateRDDRecord.class);
which certainly made the NullPointerException errors when checkpointing go
away, but (inexplicably to me) doesn't allow compaction to work.
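For reference, here is a minimal sketch of how such a registrator can be wired
up. The class and package names are just examples; only the three register
calls are from my actual code.

package com.example;

import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.serializer.KryoRegistrator;
import org.apache.spark.streaming.rdd.MapWithStateRDDRecord;
import org.apache.spark.streaming.util.EmptyStateMap;
import org.apache.spark.streaming.util.OpenHashMapBasedStateMap;

// example registrator (name and package are illustrative)
public class StateKryoRegistrator implements KryoRegistrator {
  @Override
  public void registerClasses(Kryo kryo) {
    // workaround until bug fixes in Spark 1.6.1 (SPARK-12591)
    kryo.register(OpenHashMapBasedStateMap.class);
    kryo.register(EmptyStateMap.class);
    kryo.register(MapWithStateRDDRecord.class);
  }
}

with Spark pointed at it via
--conf spark.kryo.registrator=com.example.StateKryoRegistrator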
I wonder whether the "proper" Kryo support fix in 1.6.1 enables compaction? Has
anybody seen compaction working with the patch?
If you are relying on compaction you do need to be aware of the implementation
semantics, which may explain why your state is growing for longer than you
expect.
I sent the details to the list recently, but I can repeat them to you if you
can’t find them. I think
--conf spark.streaming.sessionByKey.deltaChainThreshold=2
is critical if you want compaction more often than once every 190 batches.
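For example, on the spark-submit command line it sits alongside the other conf
settings (class name and jar path here are placeholders):

spark-submit \
  --master yarn \
  --conf spark.streaming.sessionByKey.deltaChainThreshold=2 \
  --class com.example.StreamingApp \
  streaming-app.jar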
Your application will slow down dramatically if your state grows too large for
your container memory, which you can monitor in the task data in the
ApplicationMaster web UI.
Cheers
Iain
From: Ofir Kerker [mailto:ofir.ker...@gmail.com]
Sent: 07 April 2016 17:06
To: Iain Cundy; user@spark.apache.org
Subject: Re: mapWithState not compacting removed state
Hi Iain,
Did you manage to solve this issue?
It looks like we have a similar issue with processing time increasing every
micro-batch but only after 30 batches.
Thanks.
On Thu, Mar 3, 2016 at 4:45 PM Iain Cundy <iain.cu...@amdocs.com> wrote:
Hi All
I’m aggregating data in 1.6.0 using mapWithState with a timeout set. It broadly
works well, and by providing access to the key and the batch time in the
callback it allows a much more elegant solution for time-based aggregation than
the old updateStateByKey function.
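For context, the shape of the code is roughly as follows. This is a simplified
sketch with made-up key/value types and an illustrative timeout; the real
mapping function is more involved.

import com.google.common.base.Optional;   // the Spark 1.6 Java API uses Guava's Optional
import org.apache.spark.api.java.function.Function4;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

public class AggregationSketch {
  // the Function4 variant of StateSpec.function passes the batch Time as well as the key
  static final Function4<Time, String, Optional<Long>, State<Long>, Optional<Tuple2<String, Long>>>
      mappingFunc = (batchTime, key, value, state) -> {
        long sum = (state.exists() ? state.get() : 0L) + value.or(0L);
        if (state.isTimingOut()) {
          // key has been idle past the timeout; emit a final record as the state is dropped
          return Optional.of(new Tuple2<>(key, sum));
        }
        // state.remove() can be called here instead once a key is finished with
        state.update(sum);
        return Optional.absent();
      };

  static JavaMapWithStateDStream<String, Long, Long, Tuple2<String, Long>> aggregate(
      JavaPairDStream<String, Long> pairs) {
    return pairs.mapWithState(
        StateSpec.<String, Long, Long, Tuple2<String, Long>>function(mappingFunc)
                 .timeout(Durations.minutes(75)));   // idle keys time out after 75 minutes
  }
}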
However there seems to be a problem – the size of the state and the time taken
to iterate over it for each micro-batch keeps increasing over time, long after
the number of ‘current’ keys settles down. We start removing keys after just
over an hour, but the size of the state keeps increasing in runs of over 6
hours.
Essentially we start by just adding keys for our input tuples, reaching a peak
of about 7 million keys. Then we start to output data and remove keys – the
number of keys drops to about 5 million. We continue processing tuples, which
adds keys, while removing the keys we no longer need – the number of keys
fluctuates up and down between 5 million and 8 million.
We know this, and are reasonably confident our removal of keys is correct,
because we obtain the state with JavaMapWithStateDStream.stateSnapshots and
count the keys.
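That check is essentially just the following, with stateStream being the
JavaMapWithStateDStream:

// count of live keys per micro-batch, taken from the state snapshot
stateStream.stateSnapshots().count().print();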
From my reading (I don’t know Scala!) of the code in
org.apache.spark.streaming.util.StateMap.scala it seems clear that the removed
keys are only marked as deleted and are really destroyed subsequently by
compaction, based upon the length of the chain of delta maps. We’d expect the
size of the state RDDs and the time taken to iterate over all the state to
stabilize once compaction is run after we remove keys, but it just doesn’t
happen.
Is there some possible reason why compaction never gets run?
I tried to use the (undocumented?) config setting
spark.streaming.sessionByKey.deltaChainThreshold to control how often
compaction is run with:
--conf spark.streaming.sessionByKey.deltaChainThreshold=2
I can see it in the Spark application UI Environment page, but it doesn’t seem
to make any difference.
I have noticed that the timeout mechanism only gets invoked on every 10th
micro-batch. I’m almost sure it isn’t a coincidence that the checkpoint
interval is also 10 micro-batches. I assume that is an intentional performance
optimization. However, because I have a lot of keys I have a large micro-batch
duration, so it would make sense for me to reduce that factor of 10. But since
I don’t call checkpoint on the state stream myself, I can’t see how to change it.
Can I change the checkpoint interval somewhere? [I tried calling
JavaMapWithStateDStream.checkpoint myself, but that evidently isn’t the same
thing!]
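What I tried was along these lines (the interval is just an example):

// this checkpoints the mapWithState output stream itself, which evidently is
// not the same as changing the internal state checkpoint interval
stateStream.checkpoint(Durations.minutes(10));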
My initial assumption was that there is a new deltaMap for each micro-batch,
but having noticed the timeout behavior I wonder if there is only a new
deltaMap for each checkpoint? Or maybe there are other criteria?
Perhaps compaction just hasn’t run before my application falls over? Can anyone
clarify exactly when it should run?
Or maybe compaction doesn’t delete old removed keys for some reason?
Thank you for your attention.
Cheers
Iain Cundy