[ https://issues.apache.org/jira/browse/SPARK-2629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15184990#comment-15184990 ]

Iain Cundy commented on SPARK-2629:
-----------------------------------

Hi Rod

I have a similar requirement - the supported notion of a timeout from last 
seen is useful for some cases, but is too simple for our needs. From reading 
blogs I thought that setting the timeout might get the call function invoked 
for every key, but it seems not: my empirical conclusion is that with a 
timeout set, the call function is invoked for keys with new state and for 
keys that are timing out, but not for any other keys that have no new state. 
A minimal sketch of the behaviour I mean is below.
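
For reference, here is a minimal Java sketch of the kind of mapping function 
and timeout I mean. It is illustrative, not our actual code, and is written 
against the 1.6 Java API (where the new value arrives as a Guava Optional):

    import com.google.common.base.Optional;
    import org.apache.spark.api.java.function.Function3;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.State;
    import org.apache.spark.streaming.StateSpec;
    import scala.Tuple2;

    // Illustrative mapping function. With a timeout configured, this gets
    // called for keys that have new data in the batch and for keys that are
    // timing out, but, empirically, not for other keys that merely hold state.
    Function3<String, Optional<Long>, State<Long>, Tuple2<String, Long>> mappingFunc =
        (key, value, state) -> {
          if (state.isTimingOut()) {
            // Key is being timed out; state.update() is not allowed here.
            return new Tuple2<>(key, state.get());
          }
          long sum = value.or(0L) + (state.exists() ? state.get() : 0L);
          state.update(sum);
          return new Tuple2<>(key, sum);
        };

    // Attach a timeout so idle keys eventually get the isTimingOut() call.
    StateSpec<String, Long, Long, Tuple2<String, Long>> spec =
        StateSpec.function(mappingFunc).timeout(Durations.minutes(30));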

I also have a problem, and a possible bug: the timed-out state is being 
removed from the list of current keys, but it is never actually destroyed by 
compaction. The size of the state, and the time taken to iterate over it in 
each micro-batch, keeps increasing over time, long after the number of 
'current' keys settles down. We start removing keys after just over an hour, 
but the size of the state keeps increasing in runs of over 6 hours.

Essentially we start by just adding keys for our input tuples, reaching a 
peak of about 7 million keys. Then we start to output data and remove keys, 
and the number of keys drops to about 5 million. We continue processing 
tuples, which adds keys, while removing the keys we no longer need; the 
number of keys fluctuates up and down between 5 million and 8 million.

We know this, and are reasonably confident that the timeout removal of keys 
is correct, because we obtain the state with 
JavaMapWithStateDStream.stateSnapshots and count the keys, roughly as in the 
snippet below.
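
Roughly like this, where pairs stands in for the JavaPairDStream<String, 
Long> we are tracking (illustrative again):

    import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;

    // Build the stateful stream, then count the current (non-removed) keys
    // from the state snapshot on every batch.
    JavaMapWithStateDStream<String, Long, Long, Tuple2<String, Long>> stateStream =
        pairs.mapWithState(spec);
    stateStream.stateSnapshots().foreachRDD(rdd ->
        System.out.println("current keys: " + rdd.count()));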

From my reading (I don't know Scala!) of the code in 
org.apache.spark.streaming.util.StateMap.scala, it seems clear that removed 
keys are only marked as deleted, and are really destroyed later by 
compaction, which is triggered by the length of the chain of delta maps. 
We'd expect the size of the state RDDs, and the time taken to iterate over 
all the state, to stabilize once compaction runs after we remove keys, but 
it just doesn't happen. A toy sketch of my mental model is below.
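
To make the question concrete, here is that mental model translated into a 
toy Java sketch. This is emphatically not Spark's actual code, just the 
scheme I believe StateMap.scala implements: remove() writes a tombstone into 
the newest delta, and the memory only comes back when the chain is compacted 
into a single flat map.

    import java.util.HashMap;
    import java.util.Map;

    // Toy model of a delta-chained state map (NOT Spark's code). Each batch
    // layers a new delta over the previous map; remove() only records a
    // tombstone, so the dead entry still occupies memory until compaction
    // folds the whole chain into a single flat map.
    class ToyDeltaMap<K, V> {
      private static final Object TOMBSTONE = new Object();
      private final Map<K, Object> delta = new HashMap<>();
      private final ToyDeltaMap<K, V> parent; // older generations, maybe a long chain
      private final int chainLength;

      ToyDeltaMap(ToyDeltaMap<K, V> parent) {
        this.parent = parent;
        this.chainLength = (parent == null) ? 1 : parent.chainLength + 1;
      }

      void put(K k, V v) { delta.put(k, v); }

      void remove(K k) { delta.put(k, TOMBSTONE); } // mark deleted, keep the memory

      @SuppressWarnings("unchecked")
      V get(K k) {
        Object o = delta.get(k);
        if (o == TOMBSTONE) return null;
        if (o != null) return (V) o;
        return (parent == null) ? null : parent.get(k);
      }

      // On each new batch: usually just chain a new delta on top, but once the
      // chain exceeds the threshold, compact everything into one map. That is
      // the only point where tombstoned keys are really freed.
      ToyDeltaMap<K, V> copyForNewBatch(int deltaChainThreshold) {
        if (chainLength < deltaChainThreshold) {
          return new ToyDeltaMap<>(this);
        }
        ToyDeltaMap<K, V> flat = new ToyDeltaMap<>(null);
        mergeInto(flat.delta); // oldest first; tombstones drop keys for good
        return new ToyDeltaMap<>(flat);
      }

      private void mergeInto(Map<K, Object> merged) {
        if (parent != null) parent.mergeInto(merged);
        for (Map.Entry<K, Object> e : delta.entrySet()) {
          if (e.getValue() == TOMBSTONE) merged.remove(e.getKey());
          else merged.put(e.getKey(), e.getValue());
        }
      }
    }

If the real equivalent of copyForNewBatch never takes the compaction branch, 
that would explain exactly the growth we see.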

Is there some possible reason why compaction never gets run?

I tried the (undocumented?) config setting 
spark.streaming.sessionByKey.deltaChainThreshold to control how often 
compaction runs:
--conf spark.streaming.sessionByKey.deltaChainThreshold=2

I can see it in the Spark application UI Environment page, but it doesn’t seem 
to make any difference.  
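
For completeness, setting it programmatically rather than on the command 
line behaves the same for me:

    import org.apache.spark.SparkConf;

    // Same setting as the --conf flag above. It shows up on the Environment
    // page either way, but compaction behaviour does not appear to change.
    SparkConf conf = new SparkConf()
        .setAppName("StateTest") // app name is illustrative
        .set("spark.streaming.sessionByKey.deltaChainThreshold", "2");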

I have noticed that the timeout mechanism only gets invoked on every 10th 
micro-batch. I'm almost sure it isn't a coincidence that the checkpoint 
interval is also 10 micro-batches; I assume that is an intentional 
performance optimization. However, because I have a lot of keys I use a 
large micro-batch duration, so it would make sense for me to reduce that 
factor of 10. Since I don't call checkpoint on the state stream, I can't see 
how to change it.
 

Can I change the checkpoint interval somewhere? [I tried calling 
JavaMapWithStateDStream.checkpoint myself, as below, but that evidently 
isn't the same thing!]
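
For the record, this is the call I tried (the interval is illustrative):

    // Sets the DStream checkpoint interval on the state stream. The call is
    // accepted, but it evidently does not control the internal every-10th-batch
    // state checkpointing that the timeout sweep appears to ride on.
    stateStream.checkpoint(Durations.seconds(120));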

My initial assumption was that there is a new deltaMap for each micro-batch, 
but having noticed the timeout behavior I wonder if there is only a new 
deltaMap for each checkpoint, or whether there are other criteria.

Perhaps compaction just hasn’t run before my application falls over? Can anyone 
clarify exactly when it should run?

Or maybe compaction doesn’t delete old removed keys for some reason?

Any clarification of the implementation semantics would be much appreciated.

Cheers
Iain

> Improved state management for Spark Streaming
> ---------------------------------------------
>
>                 Key: SPARK-2629
>                 URL: https://issues.apache.org/jira/browse/SPARK-2629
>             Project: Spark
>          Issue Type: Epic
>          Components: Streaming
>    Affects Versions: 0.9.2, 1.0.2, 1.2.2, 1.3.1, 1.4.1, 1.5.1
>            Reporter: Tathagata Das
>            Assignee: Tathagata Das
>
>  Current updateStateByKey provides stateful processing in Spark Streaming. It 
> allows the user to maintain per-key state and manage that state using an 
> updateFunction. The updateFunction is called for each key, and it uses the new 
> data and the existing state of the key to generate an updated state. However, 
> based on community feedback, we have learnt the following lessons:
> - Need for more optimized state management that does not scan every key
> - Need to make it easier to implement common use cases - (a) timeout of idle 
> data, (b) returning items other than state
> The high level idea that I am proposing is 
> - Introduce a new API -trackStateByKey- *mapWithState* that allows the user 
> to update per-key state and emit arbitrary records. The new API is necessary 
> as it will have significantly different semantics from the existing 
> updateStateByKey API. This API will have direct support for timeouts.
> - Internally, the system will keep the state data as a map/list within the 
> partitions of the state RDDs. The new data RDDs will be partitioned 
> appropriately, and for all the key-value data, it will look up the map/list 
> in the state RDD partition and create a new list/map of updated state data. 
> The new state RDD partition will be created based on the updated data and, 
> if necessary, the old data.
> Here is the detailed design doc (*outdated, to be updated*). Please take a 
> look and provide feedback as comments.
> https://docs.google.com/document/d/1NoALLyd83zGs1hNGMm0Pc5YOVgiPpMHugGMk6COqxxE/edit#heading=h.ph3w0clkd4em


