If a store is backed by a changelog topic, the changelog topic is
responsible for holding the latest state of the store. Thus, the topic
must store the latest value per key. For this, we use a compacted topic.

In case of a restore, the local RocksDB store is cleared so it is empty,
and we read the complete changelog topic and apply those updates to the
store.

This allows a fast recovery, because no source-topic rewind and no
reprocessing are required. Furthermore, because the changelog topic is
compacted, its size is roughly proportional to the number of distinct
keys in the store -- this also reduces recovery time, as you don't need
to replay every update to the store.
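The restore pass described above can be sketched as a small, self-contained
Java program. This is purely illustrative, not the actual Kafka Streams
restore code; the Record type and restore method are hypothetical stand-ins.
A null value models a tombstone, mirroring compacted-topic semantics where
only the latest value per key survives:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ChangelogRestore {

    static final class Record {
        final String key;
        final String value; // null = tombstone (delete)
        Record(String key, String value) { this.key = key; this.value = value; }
    }

    // Clear the local store, then replay the changelog from the beginning;
    // later records overwrite earlier ones, so the store ends up holding
    // the latest value per key.
    static Map<String, String> restore(List<Record> changelog) {
        Map<String, String> store = new LinkedHashMap<>(); // empty local store
        for (Record r : changelog) {
            if (r.value == null) store.remove(r.key);      // tombstone
            else store.put(r.key, r.value);
        }
        return store;
    }

    public static void main(String[] args) {
        List<Record> changelog = List.of(
            new Record("a", "1"),
            new Record("b", "2"),
            new Record("a", "3"),   // overwrites a=1
            new Record("b", null)); // deletes b
        System.out.println(restore(changelog)); // prints {a=3}
    }
}
```

Note that replay cost is bounded by the compacted topic size (roughly the
number of distinct keys), not by the full update history.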

We are currently working on an optimization that allows us to replay
only the tail of the changelog topic in certain cases to get the store
back into a valid state: see
https://issues.apache.org/jira/browse/KAFKA-4317

Furthermore, changelog topics allow us to maintain StandbyTasks -- those
tasks only apply the updates to the changelog topic (that are written by
the main task maintaining the store) to a local copy of the store. Thus,
in case of a fail-over, a StandbyTask can replace a failed task, and
because it already has a copy of the state, it can take over much more
quickly than a newly created task that needs to replay the changelog to
rebuild the state first.
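The StandbyTask advantage can be sketched the same way (again a toy model,
not the real StandbyTask implementation; all names are made up). Because a
standby continuously applies changelog updates to its own copy, it has
(almost) nothing left to replay at fail-over time, while a fresh task would
replay the entire changelog first:

```java
import java.util.HashMap;
import java.util.Map;

public class StandbySketch {

    final Map<String, String> copy = new HashMap<>();
    int applied = 0;

    // Called for every record the main task writes to the changelog topic.
    void onChangelogRecord(String key, String value) {
        copy.put(key, value);
        applied++;
    }

    // On fail-over: records still to replay before this task can take over.
    int recordsToReplayOnFailover(int changelogSize) {
        return changelogSize - applied;
    }

    public static void main(String[] args) {
        StandbySketch standby = new StandbySketch();
        for (int i = 0; i < 1000; i++) standby.onChangelogRecord("k" + (i % 10), "v" + i);
        // The standby is caught up, so nothing is left to replay;
        // a newly created task would have to replay all 1000 records.
        System.out.println(standby.recordsToReplayOnFailover(1000)); // prints 0
    }
}
```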



-Matthias

On 2/28/17 8:17 AM, Steven Schlansker wrote:
> 
>> On Feb 28, 2017, at 12:17 AM, Michael Noll <mich...@confluent.io> wrote:
>>
>> Sachin,
>>
>> disabling (change)logging for state stores disables the fault-tolerance of
>> the state store -- i.e. changes to the state store will not be backed up to
>> Kafka, regardless of whether the store uses a RocksDB store, an in-memory
>> store, or something else
> 
> One thing I've wanted is a more concrete description of this failure mode.
> What exactly is the process to recover from such a "failed" state store?
> 
> Does Kafka Streams rewind the source topic and replay?  (Including any 
> Processors you may have wired up?)
> Does the state store remain faulted?  Can an administrator fix it by 
> resetting some offsets?
> 
> I looked around both in the project and Confluent documentation and didn't 
> really find
> an answer to how non-logged state stores fail or recover.
> 
> Thanks for any insight!
> 
>>
>>
>>> When disabling this in 0.10.2, what does this exactly mean?
>>
>> See above.
>>
>>
>>> Does this mean that no RocksDB state store would get created at all?
>>
>> No, local state stores will still be created.  By default, the storage
>> engine is RocksDB, so if you disable changelogging then you will still have
>> local RocksDB stores (as usual) but those stores will not be backed up to
>> Kafka behind the scenes.  If, in this situation, you lose a machine that
>> has local RocksDB stores, then this state data is lost, too.
>>
>> So there are two different things at play here:
>>
>> 1. Whether you want to enable or disable (change)logging of state store,
>> and thus to enable/disable fault-tolerant state stores.
>>
>> 2. Which storage engine you want to use for the state stores.  The default
>> is RocksDB.
>>
>> If, for (2), you do not want to have RocksDB state stores, you can switch
>> the storage engine to e.g. the in-memory store.  However, when you do
>> switch from RocksDB to in-memory then all your state store's data must fit
>> into memory (obviously), otherwise you'll run OOM.
>>
>> In summary, you can have either of the following:
>>
>> a. RocksDB state stores with changelogging enabled (= fault-tolerant
>> stores).
>>
>> b. RocksDB state stores with changelogging disabled (= stores are not
>> fault-tolerant, you may suffer from data loss during e.g. machine failures).
>>
>> c. In-memory state stores with changelogging enabled (= fault-tolerant
>> stores). But careful: you may run OOM if the state data does not fit into
>> the available memory.
>>
>> d. In-memory state stores with changelogging disabled (= stores are not
>> fault-tolerant, you may suffer from data loss during e.g. machine
>> failures). But careful: you may run OOM if the state data does not fit into
>> the available memory.
>>
>>
>> Hope this helps,
>> Michael
>>
>>
>>
>>
>> On Tue, Feb 28, 2017 at 8:01 AM, Sachin Mittal <sjmit...@gmail.com> wrote:
>>
>>> I had a question regarding
>>> http://docs.confluent.io/3.1.2/streams/developer-guide.html#enable-disable-state-store-changelogs
>>>
>>> When disabling this in 0.10.2, what does this exactly mean?
>>> Does this mean that no RocksDB state store would get created at all?
>>>
>>> On this subject, we had started with Spark Streaming, but we ran into
>>> memory issues, and the hardware we have is not fantastic enough to support
>>> Spark Streaming.
>>>
>>> So we switched to the high-level Kafka Streams DSL.
>>>
>>> I think if your source is Kafka topics, Kafka Streams is good and simple
>>> to use. However, you need to plan ahead and anticipate the (max) load and
>>> create adequate partitions based on some key on which aggregations can be
>>> performed independently.
>>>
>>> Then you can run a cluster of stream threads (on one or multiple
>>> machines), each processing a partition.
>>>
>>> Having said this, we have however run into a lot of issues with frequent
>>> stream re-balances, especially when we have multiple instances of RocksDB
>>> running on a single machine.
>>> We don't know yet whether this is some bad VM configuration issue or some
>>> problem with the Kafka Streams / RocksDB integration; we are still working
>>> on that.
>>>
>>> So I would suggest that if you partition your data well enough, have a
>>> single streams thread consuming only one partition, and avoid many
>>> instances of RocksDB on a single machine, the overall application runs
>>> fine. Also make sure not to create big time windows, and set a not-so-long
>>> retention time, so that the state store size stays limited.
>>>
>>> We use a window of size 10 minutes sliding every 5 minutes with a
>>> retention of 30 minutes, and see much better overall performance than
>>> with, say, a window of size 1 hour sliding every 30 minutes with a
>>> retention of 3 hours.
>>>
>>> So to conclude: if you can manage RocksDB, then Kafka Streams is good to
>>> start with; it's simple and very intuitive to use.
>>>
>>> Again, on the RocksDB side, is there a way to eliminate it -- is
>>>
>>> disableLogging
>>>
>>> for that?
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>>
>>> On Mon, Feb 27, 2017 at 7:47 PM, Michael Noll <mich...@confluent.io>
>>> wrote:
>>>
>>>>> Also, is it possible to stop the syncing between state stores and
>>> brokers,
>>>> if I am fine with failures?
>>>>
>>>> Yes, you can disable the syncing (or the "changelog" feature) of state
>>>> stores:
>>>> http://docs.confluent.io/current/streams/developer-guide.html#enable-disable-state-store-changelogs
>>>>
>>>>> I do have a Spark Cluster, but I am not convinced how Spark Streaming
>>> can
>>>> do this differently.
>>>>> Guozhang, could you comment anything regarding Kafka Streams vs Spark
>>>> Streaming, especially
>>>>> in terms of aggregations/groupbys/joins implementation logic?
>>>>
>>>> As you are hinting at yourself, if you want fault-tolerant state, then
>>> this
>>>> fault tolerance comes at a price (in Kafka Streams, this is achieved by
>>>> changelog-ing state stores).  Other tools such as Flink or Spark work in
>>>> a similar fashion; there's no free lunch.
>>>>
>>>> One option, which you brought up above, is to disable the fault tolerance
>>>> functionality for state by disabling the changelogs of state stores (see
>>>> above).  Another option is to leverage Kafka's record caching for Kafka
>>>> Streams, which does lower the amount of data that is sent across the
>>>> network (from your app's state store changelogs to the Kafka cluster and
>>>> vice versa), though you may need to tune some parameters in your
>>> situation
>>>> because your key space has high cardinality and message volume per key is
>>>> relatively low (= you don't benefit as much from record caching as most
>>>> other users/use cases).
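The record-caching effect Michael describes can be illustrated with a toy
model (not the actual Streams cache; recordsFlushed is a hypothetical
stand-in): the cache keeps only the latest value per dirty key and forwards
one record per key on flush, so many updates to few keys compress well,
while a high-cardinality key space with few updates per key barely benefits:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RecordCacheSketch {

    static int recordsFlushed(int updates, int distinctKeys) {
        Map<Integer, Integer> cache = new LinkedHashMap<>();
        for (int i = 0; i < updates; i++) {
            cache.put(i % distinctKeys, i); // later updates overwrite earlier ones
        }
        return cache.size(); // one record per dirty key reaches the changelog
    }

    public static void main(String[] args) {
        // 10,000 updates to 10 keys: only 10 records are sent on flush.
        System.out.println(recordsFlushed(10_000, 10));     // prints 10
        // 10,000 updates to 10,000 distinct keys: no savings at all.
        System.out.println(recordsFlushed(10_000, 10_000)); // prints 10000
    }
}
```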
>>>>
>>>>
>>>> -Michael
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Feb 27, 2017 at 2:42 PM, Tianji Li <skyah...@gmail.com> wrote:
>>>>
>>>>> Hi Guozhang and Kohki,
>>>>>
>>>>> Thanks for your replies.
>>>>>
>>>>> I think I know how to deal with partitioning now, but I am still not
>>> sure
>>>>> how to deal with the traffic between the hidden state stores and
>>>> Kafka
>>>>> Brokers (same as Kohki).
>>>>>
>>>>> I feel like the easiest thing to do is to set a larger commit window,
>>> so
>>>>> that the state stores are synced to the brokers less often than by default.
>>>>>
>>>>> I do have a Spark Cluster, but I am not convinced how Spark Streaming
>>> can
>>>>> do this differently. Guozhang, could you comment anything regarding
>>> Kafka
>>>>> Streams vs Spark Streaming, especially in terms of
>>>>> aggregations/groupbys/joins implementation logic?
>>>>>
>>>>> Also, is it possible to stop the syncing between state stores to
>>> brokers,
>>>>> if I am fine with failures?
>>>>>
>>>>> Thanks
>>>>> Tianji
>>>>>
>>>>>
>>>>> On 2017-02-26 23:52 (-0500), Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>> Hello Tianji,
>>>>>>
>>>>>> As Kohki mentioned, in Streams joins and aggregations are always done
>>>>>> pre-partitioned, and hence locally. So there won't be any inter-node
>>>>>> communications needed to execute the join / aggregations. Also they
>>> can
>>>>> be
>>>>>> hosted as persistent local state stores so you don't need to keep
>>> them
>>>> in
>>>>>> memory. So for example if you partition your data with K1 / K2, then
>>>> data
>>>>>> with the same values in combo (K1, K2) will always go to the same
>>>>>> partition, and hence good for aggregations / joins on either K1, K2,
>>> or
>>>>>> combo(K1, K2), but not sufficient for combo(K1, K2, K3, K4), as data
>>>> with
>>>>>> the same values of K3 / K4 might still go to different partitions
>>>>>> processed by different Streams instances.
>>>>>>
>>>>>> So what you want is really to partition based on the "maximum
>>> superset"
>>>>> of
>>>>>> all the involved keys. Note that with the superset of all the keys
>>> one
>>>>>> thing to watch out for is the even distribution of the partitions. If it
>>> is
>>>>> not
>>>>>> evenly distributed, then some instances might become hot spots. This
>>>> can
>>>>> be
>>>>>> tackled by customizing the "PartitionGrouper" interface in Streams,
>>>> which
>>>>>> indicates which set of partitions will be assigned to each of the
>>> tasks
>>>>> (by
>>>>>> default each one partition from the source topics will form a task,
>>> and
>>>>>> task is the unit of parallelism in Streams).
>>>>>>
>>>>>> Hope this helps.
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>>
>>>>>> On Sun, Feb 26, 2017 at 10:57 AM, Kohki Nishio <tarop...@gmail.com>
>>>>> wrote:
>>>>>>
>>>>>>> Tianji,
>>>>>>> KStream is indeed Append mode as long as I do stateless processing,
>>>>>>> but when you do an aggregation, that is a stateful operation, and it
>>>>>>> turns into a KTable, which does Update mode.
>>>>>>>
>>>>>>> In regard to your aggregation, I believe Kafka's aggregation works
>>>>>>> on a single partition, not over multiple partitions. Are you doing 100
>>>>>>> different aggregations against the record key? Then you should have a
>>>>>>> single data object for those 100 values; anyway, it sounds like we
>>>>>>> have a similar problem ..
>>>>>>>
>>>>>>> -Kohki
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Feb 25, 2017 at 1:11 PM, Tianji Li <skyah...@gmail.com>
>>>> wrote:
>>>>>>>
>>>>>>>> Hi Kohki,
>>>>>>>>
>>>>>>>> Thanks very much for providing your investigation results.
>>>> Regarding
>>>>>>>> 'append' mode with Kafka Streams, isn't KStream the thing you
>>> want?
>>>>>>>>
>>>>>>>> Hi Guozhang,
>>>>>>>>
>>>>>>>> Thanks for the pointers to the two blogs. I read one of them
>>> before
>>>>> and
>>>>>>>> just had a look at the other one.
>>>>>>>>
>>>>>>>> What I am hoping to do is below, can you help me decide if Kafka
>>>>> Stream
>>>>>>> is
>>>>>>>> a good fit?
>>>>>>>>
>>>>>>>> We have a few data sources, and we are hoping to correlate these
>>>>> sources,
>>>>>>>> and then do aggregations, as *a stream in real-time*.
>>>>>>>>
>>>>>>>> The number of aggregations is around 100, which means, if using Kafka
>>>>>>>> Streams, we need to maintain around 100 state stores with 100
>>>>>>>> change-log topics behind the scenes when joining and aggregating.
>>>>>>>>
>>>>>>>> The number of unique entries in each of these state stores is
>>>>>>>> expected to be at the level of < 100M. The size of each record is
>>>>>>>> around 1K bytes, and so each store is expected to be ~100G bytes in
>>>>>>>> size. The total number of bytes in all these state stores is thus
>>>>>>>> around 10T bytes.
>>>>>>>>
>>>>>>>> If keeping all these stores in memory, this translates into around
>>>>>>>> 50 machines with 256G bytes each for this purpose alone.
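A quick back-of-the-envelope check of these numbers (using the figures from
the email; the machine count ignores all overhead, which is presumably why
the email rounds up to ~50):

```java
public class SizingSketch {
    public static void main(String[] args) {
        long stores = 100;
        long entriesPerStore = 100_000_000L;          // ~100M entries per store
        long bytesPerEntry = 1_000L;                  // ~1 KB per record
        long totalBytes = stores * entriesPerStore * bytesPerEntry;
        long machineRam = 256L * 1_000_000_000L;      // 256 GB, decimal units
        long machines = (totalBytes + machineRam - 1) / machineRam; // ceiling

        System.out.println(totalBytes); // prints 10000000000000 (= 10 TB)
        System.out.println(machines);   // prints 40 (raw data only, no overhead)
    }
}
```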
>>>>>>>>
>>>>>>>> Plus, the incoming raw data rate could reach 10M records per
>>> second
>>>>> in
>>>>>>>> peak hours. So, during aggregation, data movement between Kafka
>>>>> Streams
>>>>>>>> instances
>>>>>>>> will be heavy, i.e., 10M records per second in the cluster for
>>>>> joining
>>>>>>> and
>>>>>>>> aggregations.
>>>>>>>>
>>>>>>>> Is Kafka Streams good for this? My gut feeling is Kafka Streams
>>> is
>>>>> fine.
>>>>>>>> But I'd like to run this by you.
>>>>>>>>
>>>>>>>> And, I am hoping to minimize data movement (to save bandwidth)
>>>>> during
>>>>>>>> joins/groupBys. If I partition the raw data with the minimum
>>> subset
>>>>> of
>>>>>>>> aggregation keys (say K1 and K2),  then I wonder if the following
>>>>>>>> joins/groupBys (say on keys K1, K2, K3, K4) happen on local data,
>>>> if
>>>>>>> using
>>>>>>>> DSL?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Tianji
>>>>>>>>
>>>>>>>>
>>>>>>>> On 2017-02-25 13:49 (-0500), Guozhang Wang <w...@gmail.com>
>>> wrote:
>>>>>>>>> Hello Kohki,
>>>>>>>>>
>>>>>>>>> Thanks for the email. I'd like to learn what your concern about the
>>>>>>>>> size of the state store is. From your description it's a bit hard to
>>>>>>>>> figure out, but I'd guess you have lots of state stores while each
>>>>>>>>> of them is relatively small?
>>>>>>>>>
>>>>>>>>> Hello Tianji,
>>>>>>>>>
>>>>>>>>> Regarding your question about maturity and users of Streams, you can
>>>>>>>>> take a look at a bunch of the blog posts written about their Streams
>>>>>>>>> usage in production, for example:
>>>>>>>>>
>>>>>>>>> http://engineering.skybettingandgaming.com/2017/01/23/streaming-architectures/
>>>>>>>>>
>>>>>>>>> http://developers.linecorp.com/blog/?p=3960
>>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sat, Feb 25, 2017 at 7:52 AM, Kohki Nishio <ta...@gmail.com
>>>>
>>>>>>> wrote:>
>>>>>>>>>
>>>>>>>>>> I did a bit of research on that matter recently; the comparison is
>>>>>>>>>> between Spark Structured Streaming (SSS) and Kafka Streams.
>>>>>>>>>>
>>>>>>>>>> Both are relatively new (~1y) and trying to solve similar problems.
>>>>>>>>>> However, if you go with Spark, you have to go with a cluster; if
>>>>>>>>>> your environment already has a cluster, then it's good. But our team
>>>>>>>>>> doesn't do any Spark, so the initial cost would be very high. On the
>>>>>>>>>> other hand, Kafka Streams is a Java library; since we have a service
>>>>>>>>>> framework, doing streaming inside a service is super easy.
>>>>>>>>>>>
>>>>>>>>>> However, for some reason people see SSS as more mature and Kafka
>>>>>>>>>> Streams as not so mature (like beta). But the old-fashioned stream
>>>>>>>>>> APIs are both mature enough (in my opinion); I didn't see any
>>>>>>>>>> difference between DStream (Spark) and KStream (Kafka).
>>>>>>>>>>>
>>>>>>>>>> DataFrame (Structured Streaming) and KTable I found quite different.
>>>>>>>>>> Kafka's model is more like a change log; that means you need to see
>>>>>>>>>> the latest entry to make a final decision. I would call this the
>>>>>>>>>> 'Update' model, whereas Spark does the 'Append' model and doesn't
>>>>>>>>>> support the 'Update' model yet (it's coming in 2.2).
>>>>>>>>>>>
>>>>>>>>>> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes
>>>>>>>>>>>
>>>>>>>>>> I wanted to have the 'Append' model with Kafka, but it seems it's
>>>>>>>>>> not an easy thing to do. Also, Kafka Streams uses an internal topic
>>>>>>>>>> to keep state changes for the fail-over scenario, but I'm dealing
>>>>>>>>>> with lots of tiny pieces of information and I have a big concern
>>>>>>>>>> about the size of the state store / topic, so my decision is that
>>>>>>>>>> I'm going with my own handling of the Kafka API ..
>>>>>>>>>>>
>>>>>>>>>> If you do stateless operations and don't have a Spark cluster,
>>>>>>>>>> yeah, Kafka Streams is perfect.
>>>>>>>>>> If you do stateful, complicated operations and happen to have a
>>>>>>>>>> Spark cluster, give Spark a try;
>>>>>>>>>> else you have to write code which is optimized for your use case.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>> thanks
>>>>>>>>>> -Kohki
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>> On Fri, Feb 24, 2017 at 6:22 PM, Tianji Li <sk...@gmail.com>
>>>>> wrote:>
>>>>>>>>>>>
>>>>>>>>>>> Hi there,
>>>>>>>>>>>>
>>>>>>>>>>> Can anyone give a good explanation of in what cases Kafka Streams
>>>>>>>>>>> is preferred, and in what cases Spark Streaming is better?
>>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Tianji
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>> -->
>>>>>>>>>> Kohki Nishio>
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -- >
>>>>>>>>> -- Guozhang>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Kohki Nishio
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>>>>
>>>>>
>>>>
>>>
> 
