Hi Ofir

I've discovered compaction works in 1.6.0 if I switch off Kryo.

I was using a workaround to get around mapWithState not supporting Kryo. See 
https://issues.apache.org/jira/browse/SPARK-12591

My custom KryoRegistrator Java class has

// workaround until the bug is fixed in Spark 1.6.1
kryo.register(OpenHashMapBasedStateMap.class);
kryo.register(EmptyStateMap.class);
kryo.register(MapWithStateRDDRecord.class);

which certainly made the NullPointerException errors when checkpointing go 
away, but (inexplicably to me) doesn't allow compaction to work.

I wonder whether the "proper" Kryo support fix in 1.6.1 enables compaction? Has 
anybody seen compaction working with the patch?

If you are relying on compaction you do need to be aware of the implementation 
semantics, which may explain why your state is growing for longer than you 
expect.

I sent those to the list recently, but I can repeat them to you if you can’t 
find them. I think
--conf spark.streaming.sessionByKey.deltaChainThreshold=2
is critical if you want compaction more often than once every 190 batches.

Your application will slow down dramatically if your state grows too large for 
your container memory, which you can monitor in the task data in the 
ApplicationMaster web UI.

Cheers
Iain

From: Ofir Kerker [mailto:ofir.ker...@gmail.com]
Sent: 07 April 2016 17:06
To: Iain Cundy; user@spark.apache.org
Subject: Re: mapWithState not compacting removed state

Hi Iain,
Did you manage to solve this issue?
It looks like we have a similar issue: processing time increases with every 
micro-batch, but only after the first 30 batches.

Thanks.

On Thu, Mar 3, 2016 at 4:45 PM Iain Cundy 
<iain.cu...@amdocs.com> wrote:
Hi All

I’m aggregating data using mapWithState with a timeout set in 1.6.0. It broadly 
works well and, by providing access to the key and the time in the callback, 
allows a much more elegant solution for time-based aggregation than the old 
updateStateByKey function.

However, there seems to be a problem – the size of the state and the time taken 
to iterate over it for each micro-batch keep increasing over time, long after 
the number of ‘current’ keys settles down. We start removing keys after just 
over an hour, but the size of the state keeps increasing in runs of over 6 
hours.

Essentially we start by just adding keys for our input tuples, reaching a peak 
of about 7 million keys. Then we start to output data and remove keys – the 
number of keys drops to about 5 million. We continue processing tuples, which 
adds keys, while removing the keys we no longer need – the number of keys 
fluctuates up and down between 5 million and 8 million.

We know this, and are reasonably confident our removal of keys is correct, 
because we obtain the state with JavaMapWithStateDStream.stateSnapshots and 
count the keys.

From my reading (I don’t know Scala!) of the code in 
org.apache.spark.streaming.util.StateMap.scala it seems clear that the removed 
keys are only marked as deleted and are really destroyed subsequently by 
compaction, based upon the length of the chain of delta maps. We’d expect the 
size of the state RDDs and the time taken to iterate over all the state to 
stabilize once compaction is run after we remove keys, but it just doesn’t 
happen.
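
As a rough illustration of the mechanism described above, here is a toy Java model of a state map kept as a chain of delta maps, where a removal only records a tombstone and the entry is physically dropped when the chain is flattened. This is a sketch for reasoning about the behaviour, not Spark's actual StateMap code: the class DeltaChainSketch, its methods, and the threshold value used in main are all hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model of a state map stored as a chain of deltas; NOT Spark's implementation. */
public class DeltaChainSketch {

    /** One link in the chain: its own changes plus a pointer to the older link. */
    static final class DeltaMap {
        final DeltaMap parent;                           // null for the oldest link
        final Map<String, Integer> puts = new HashMap<>();
        final Set<String> tombstones = new HashSet<>();  // keys only marked as deleted

        DeltaMap(DeltaMap parent) { this.parent = parent; }

        void put(String key, int value) { tombstones.remove(key); puts.put(key, value); }

        /** Removal does not touch older links; it just records a tombstone here. */
        void remove(String key) { puts.remove(key); tombstones.add(key); }

        Integer get(String key) {
            if (tombstones.contains(key)) return null;
            Integer value = puts.get(key);
            if (value != null) return value;
            return parent == null ? null : parent.get(key);
        }

        int chainLength() { return parent == null ? 1 : 1 + parent.chainLength(); }

        /** Entries physically held across the whole chain, tombstones included. */
        int storedEntries() {
            int own = puts.size() + tombstones.size();
            return own + (parent == null ? 0 : parent.storedEntries());
        }

        /** The live view: apply the oldest link first, then each newer delta. */
        Map<String, Integer> liveView() {
            Map<String, Integer> view = parent == null ? new HashMap<>() : parent.liveView();
            view.keySet().removeAll(tombstones);
            view.putAll(puts);
            return view;
        }

        /** Flatten the chain into one link once it reaches the (hypothetical) threshold. */
        DeltaMap compactIfNeeded(int threshold) {
            if (chainLength() < threshold) return this;
            DeltaMap flat = new DeltaMap(null);
            flat.puts.putAll(liveView());
            return flat;
        }
    }

    public static void main(String[] args) {
        DeltaMap state = new DeltaMap(null);
        state.put("a", 1);
        state.put("b", 2);
        state = new DeltaMap(state);   // a new delta layered on top of the old one
        state.remove("a");             // "a" is hidden, but still stored in the older link
        state = new DeltaMap(state);
        state.put("c", 3);

        System.out.println("before: live=" + state.liveView().size()
                + " stored=" + state.storedEntries() + " chain=" + state.chainLength());
        state = state.compactIfNeeded(3);
        System.out.println("after:  live=" + state.liveView().size()
                + " stored=" + state.storedEntries() + " chain=" + state.chainLength());
    }
}
```

In this model the number of live keys stays the same across compaction while the number of stored entries drops, which is the stabilisation one would hope to see in the real state RDDs. If compaction never triggers, stored entries keep growing even though the live key count is flat.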

Is there some possible reason why compaction never gets run?

I tried the (undocumented?) config setting 
spark.streaming.sessionByKey.deltaChainThreshold to control how often 
compaction is run, with:
--conf spark.streaming.sessionByKey.deltaChainThreshold=2

I can see it in the Spark application UI Environment page, but it doesn’t seem 
to make any difference.

I have noticed that the timeout mechanism only gets invoked on every 10th 
micro-batch. I’m almost sure it isn’t a coincidence that the checkpoint 
interval is also 10 micro-batches. I assume that is an intentional performance 
optimization. However, because I have a lot of keys, I have a large micro-batch 
duration, so it would make sense for me to reduce that factor of 10. But 
since I don’t call checkpoint on the state stream, I can’t see how to change it.
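
To make the cost of that factor concrete, here is a small Java sketch of the arithmetic, assuming (as observed above, not confirmed from the Spark source) that timeout scans happen only on batches that fall on a checkpoint boundary. The class CheckpointScheduleSketch, its methods, and the 60-second batch duration are hypothetical values chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Arithmetic sketch only; the "scan on checkpoint batches" rule is an assumption. */
public class CheckpointScheduleSketch {

    /** Returns the 1-based batch numbers that fall on a checkpoint boundary. */
    static List<Integer> checkpointBatches(int totalBatches, int multiplier) {
        List<Integer> result = new ArrayList<>();
        for (int batch = 1; batch <= totalBatches; batch++) {
            if (batch % multiplier == 0) {
                result.add(batch);
            }
        }
        return result;
    }

    /** Upper bound on how long an expired key can wait for the next timeout scan. */
    static long worstCaseTimeoutLagMs(long batchDurationMs, int multiplier) {
        return batchDurationMs * multiplier;
    }

    public static void main(String[] args) {
        long batchDurationMs = 60_000L; // hypothetical 60 s micro-batch
        int multiplier = 10;            // the factor of 10 observed above

        System.out.println("scan batches: " + checkpointBatches(30, multiplier));
        System.out.println("worst-case lag (ms): "
                + worstCaseTimeoutLagMs(batchDurationMs, multiplier));
    }
}
```

With a long micro-batch the lag scales linearly with the factor, which is why reducing it matters more for a job with many keys and a large batch duration.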

Can I change the checkpoint interval somewhere? [I tried calling 
JavaMapWithStateDStream.checkpoint myself, but that evidently isn’t the same 
thing!]

My initial assumption was that there is a new deltaMap for each micro-batch, 
but having noticed the timeout behavior I wonder if there is only a new 
deltaMap for each checkpoint? Or maybe there are other criteria?

Perhaps compaction just hasn’t run before my application falls over? Can anyone 
clarify exactly when it should run?

Or maybe compaction doesn’t delete old removed keys for some reason?

Thank you for your attention.

Cheers
Iain Cundy

