[
https://issues.apache.org/jira/browse/SPARK-2629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15184990#comment-15184990
]
Iain Cundy commented on SPARK-2629:
-----------------------------------
Hi Rod
I have a similar requirement – the supported notion of a timeout from last
seen is useful for some cases, but is too simple for our needs. From reading
blogs I thought that setting a timeout might cause the mapping function to be
invoked for every key in every micro-batch, but it seems not – my empirical
conclusion is that with a timeout set the function is invoked for keys with
new data and for keys that are timing out, but not for any other keys.
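In case it helps, here is a minimal Scala sketch of the behaviour I'm
describing (our real job uses the Java API; the master, source, durations and
checkpoint path below are just placeholders):
{code}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, State, StateSpec, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("StateSketch")
val ssc = new StreamingContext(conf, Durations.seconds(30))
ssc.checkpoint("/tmp/state-sketch")  // placeholder checkpoint dir

// Placeholder source; our real job uses a receiver.
val pairs = ssc.socketTextStream("localhost", 9999).map(line => (line, 1L))

// Observed behaviour: this function runs only for keys with new data in the
// current batch, or for keys that are timing out -- never for idle keys.
val mappingFunc = (key: String, value: Option[Long], state: State[Long]) => {
  val sum = value.getOrElse(0L) + state.getOption.getOrElse(0L)
  // While a key is timing out, value is None and update()/remove() would throw.
  if (!state.isTimingOut()) state.update(sum)
  (key, sum)
}

val stateStream = pairs.mapWithState(
  StateSpec.function(mappingFunc).timeout(Durations.minutes(60)))
{code}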
I also have a problem and a possible bug: timed-out state is being removed
from the set of current keys, but it is never actually being freed by
compaction – the size of the state, and the time taken to iterate over it in
each micro-batch, keep increasing over time, long after the number of
'current' keys settles down. We start removing keys after just over an hour,
but the size of the state keeps growing in runs of over 6 hours.
Essentially we start by just adding keys for our input tuples, reaching a peak
of about 7 million keys. Then we start to output data and remove keys – the
number of keys drops to about 5 million. We continue processing tuples, which
adds keys, while removing the keys we no longer need – the number of keys
fluctuates up and down between 5 million and 8 million.
We know this, and are reasonably confident that the timeout-based removal of
keys is correct, because we obtain the state with
JavaMapWithStateDStream.stateSnapshots and count the keys.
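Concretely, we do the equivalent of this every micro-batch:
{code}
// Count the 'current' keys from the state snapshot of mapWithState.
stateStream.stateSnapshots().foreachRDD { (rdd, time) =>
  println(s"batch $time: ${rdd.count()} keys in state")
}
{code}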
From my reading (I don't know Scala!) of the code in
org.apache.spark.streaming.util.StateMap.scala, it seems clear that removed
keys are only marked as deleted and are really destroyed later by compaction,
which is triggered by the length of the chain of delta maps. We'd expect the
size of the state RDDs, and the time taken to iterate over all the state, to
stabilize once compaction runs after we remove keys, but it just doesn't
happen.
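To check I've understood, here is a toy model of how I read the delta map
chain – this is my own sketch to illustrate my reading, not Spark's actual
code:
{code}
import scala.collection.mutable

// Toy model (NOT Spark's code): each copy stacks a new delta map on a
// parent; remove() only writes a tombstone, so removed keys keep taking
// space and iteration time until the chain is compacted.
class DeltaStateMap[K, V](val parent: Option[DeltaStateMap[K, V]] = None) {
  private val delta = mutable.Map.empty[K, Option[V]]

  def put(k: K, v: V): Unit = delta(k) = Some(v)
  def remove(k: K): Unit = delta(k) = None           // tombstone only

  def get(k: K): Option[V] = delta.get(k) match {
    case Some(vOpt) => vOpt                          // Some(None) = removed
    case None       => parent.flatMap(_.get(k))
  }

  def chainLength: Int = 1 + parent.map(_.chainLength).getOrElse(0)

  // Analogue of spark.streaming.sessionByKey.deltaChainThreshold.
  def shouldCompact(threshold: Int): Boolean = chainLength >= threshold

  // Compaction folds the chain into one map; only here do tombstoned keys
  // actually disappear and stop costing memory.
  def compacted: DeltaStateMap[K, V] = {
    val flat = new DeltaStateMap[K, V]()
    def fold(m: DeltaStateMap[K, V]): Unit = {
      m.parent.foreach(fold)                         // oldest first
      m.delta.foreach {
        case (k, Some(v)) => flat.delta(k) = Some(v)
        case (k, None)    => flat.delta.remove(k)
      }
    }
    fold(this)
    flat
  }
}
{code}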
Is there some possible reason why compaction never gets run?
I tried to use the (undocumented?) config setting
spark.streaming.sessionByKey.deltaChainThreshold to control how often
compaction runs:
--conf spark.streaming.sessionByKey.deltaChainThreshold=2
I can see it in the Spark application UI Environment page, but it doesn't
seem to make any difference.
I have noticed that the timeout mechanism only gets invoked on every 10th
micro-batch. I'm almost sure it isn't a coincidence that the checkpoint
interval is also 10 micro-batches; I assume that is an intentional
performance optimization. However, because I have a lot of keys I use a large
micro-batch duration, so it would make sense for me to reduce that factor of
10. Since I don't call checkpoint on the state stream, I can't see how to
change it. Can I change the checkpoint interval somewhere? [I tried calling
JavaMapWithStateDStream.checkpoint myself, but that evidently isn't the same
thing!]
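For the record, this is all I could find to try – it runs, but apparently
only checkpoints the outer stream rather than changing the internal state
stream's checkpoint interval (the duration here is a placeholder):
{code}
// Attempt to set the checkpoint interval on the state stream.
stateStream.checkpoint(Durations.minutes(5))
{code}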
My initial assumption was that there is a new deltaMap for each micro-batch,
but having noticed the timeout behavior I wonder if there is only a new
deltaMap for each checkpoint? Or maybe there are other criteria?
Perhaps compaction just hasn’t run before my application falls over? Can anyone
clarify exactly when it should run?
Or maybe compaction doesn’t delete old removed keys for some reason?
Any clarification of the implementation semantics would be much appreciated.
Cheers
Iain
> Improved state management for Spark Streaming
> ---------------------------------------------
>
> Key: SPARK-2629
> URL: https://issues.apache.org/jira/browse/SPARK-2629
> Project: Spark
> Issue Type: Epic
> Components: Streaming
> Affects Versions: 0.9.2, 1.0.2, 1.2.2, 1.3.1, 1.4.1, 1.5.1
> Reporter: Tathagata Das
> Assignee: Tathagata Das
>
> Current updateStateByKey provides stateful processing in Spark Streaming. It
> allows the user to maintain per-key state and manage that state using an
> updateFunction. The updateFunction is called for each key, and it uses the
> new data and the existing state of the key to generate an updated state.
> However, based on community feedback, we have learnt the following lessons.
> - Need for more optimized state management that does not scan every key
> - Need to make it easier to implement common use cases - (a) timeout of idle
> data, (b) returning items other than state
> The high-level idea that I am proposing is
> - Introduce a new API -trackStateByKey- *mapWithState* that allows the user
> to update per-key state and emit arbitrary records. The new API is necessary
> as it will have significantly different semantics from the existing
> updateStateByKey API. This API will have direct support for timeouts.
> - Internally, the system will keep the state data as a map/list within the
> partitions of the state RDDs. The new data RDDs will be partitioned
> appropriately, and for all the key-value data, it will look up the map/list
> in the state RDD partition and create a new list/map of updated state data.
> The new state RDD partition will be created based on the updated data and,
> if necessary, the old data.
> Here is the detailed design doc (*outdated, to be updated*). Please take a
> look and provide feedback as comments.
> https://docs.google.com/document/d/1NoALLyd83zGs1hNGMm0Pc5YOVgiPpMHugGMk6COqxxE/edit#heading=h.ph3w0clkd4em