Hi,

The reset tool looks like a great feature. Following this link
https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
what I understand is that this tool resets the offsets for the input and
intermediate topics, deletes all the internal topics, and also cleans up
the local state. Please confirm this. I was actually doing all this
manually.

Thanks
Sachin

On Thu, Nov 10, 2016 at 12:05 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> Hey,
>
> Changelog topics are compacted topics and no retention time is applied
> (one exception are window-changelog topics, though, which have both
> compaction and retention policy enabled).
>
> If an input message is purged via retention time (and this is your
> latest committed offset), and you start your Streams application, it
> will resume according to the "auto.offset.reset" policy that you can
> specify in StreamsConfig. So Streams will just run fine, but the data
> is of course lost.
>
> The same argument applies for repartitioning topics.
>
> > I am asking this because I am planning to keep the retention time
> > for internal changelog topics also small so no message gets big
> > enough to start getting exceptions.
>
> I don't understand this part though...
>
> To set an arbitrary start offset, there is no API or tooling available
> at the moment. However, we plan to add some of this in future releases.
>
> For now, you could set start offsets "manually" by writing a small
> consumer application that does not process data, but only seek()s to
> (and commit()s) the start offsets you want to use. This is a similar
> idea to the one the Streams application reset tool is built on. See
> this blog post for details:
>
> https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
>
> However, you should be careful not to mess with internally kept state
> (i.e., make sure it is still semantically meaningful and compatible if
> you start to modify offsets...).
>
> Hope this helps.
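The "small consumer that only seeks and commits" idea Matthias describes could look roughly like this minimal sketch. The broker address, group id, topic, partition, and target offset are all placeholders; the group id must match the Streams application's `application.id` so that Streams picks up the committed offsets on restart:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class OffsetSetter {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // Must equal the Streams application.id, so the committed
        // offsets belong to the same consumer group.
        props.put("group.id", "my-streams-app");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("source-topic", 0);
            consumer.assign(Collections.singletonList(tp));

            long startOffset = 1234L;       // the offset you want to resume from
            consumer.seek(tp, startOffset); // position only -- no poll(), no processing
            consumer.commitSync(Collections.singletonMap(
                    tp, new OffsetAndMetadata(startOffset)));
        }
    }
}
```

Note that the Streams application must be stopped while this runs, otherwise the group is still active and the commit will be rejected. And as Matthias warns, any internally kept state (changelogs, repartition topics) must remain semantically consistent with the offsets you set.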
>
> -Matthias
>
> On 11/9/16 7:29 AM, Sachin Mittal wrote:
> > Hi,
> > What happens when the message itself is purged by Kafka via the
> > retention time setting or something else, and it was later than the
> > last offset stored by the stream consumer?
> >
> > I am asking this because I am planning to keep the retention time
> > for internal changelog topics also small so no message gets big
> > enough to start getting exceptions.
> >
> > So if messages from the last offset are deleted, will there be any
> > issues?
> >
> > Also, is there any way to control or set the offset manually when we
> > restart the streaming application, so that certain old messages are
> > not consumed at all, as logic-wise they are not useful to the
> > streaming application any more? Like, say, past user sessions
> > created while the streaming application was stopped.
> >
> > Thanks
> > Sachin
> >
> > On Wed, Nov 9, 2016 at 7:46 PM, Eno Thereska <eno.there...@gmail.com> wrote:
> >
> >> Hi Sachin,
> >>
> >> Kafka Streams is built on top of standard Kafka consumers. For
> >> every topic it consumes from (whether changelog topic or source
> >> topic, it doesn't matter), the consumer stores the offset it last
> >> consumed from. Upon restart, by default it starts consuming from
> >> where it left off in each of the topics. So you can think of it
> >> this way: a restart should be no different than if you had left
> >> the application running (i.e., no restart).
> >>
> >> Thanks
> >> Eno
> >>
> >>> On 9 Nov 2016, at 13:59, Sachin Mittal <sjmit...@gmail.com> wrote:
> >>>
> >>> Hi,
> >>> I had some basic questions on the sequence of tasks for a
> >>> streaming application restart, in case of failure or otherwise.
> >>>
> >>> Say my stream is structured this way:
> >>>
> >>> source-topic, branched into 2 KStreams:
> >>>   source-topic-1
> >>>   source-topic-2
> >>> each mapped to 2 new KStreams (new key/value pairs), backed by 2
> >>> Kafka topics:
> >>>   source-topic-1-new
> >>>   source-topic-2-new
> >>> each aggregated to a new KTable, backed by internal changelog
> >>> topics:
> >>>   source-topic-1-new-table (source-topic-1-new-changelog)
> >>>   source-topic-2-new-table (source-topic-2-new-changelog)
> >>> table1 left join table2 -> final stream
> >>>
> >>> Results of the final stream are then persisted into another data
> >>> store.
> >>>
> >>> So, as you see, I have the following physical topics or state
> >>> stores:
> >>>   source-topic
> >>>   source-topic-1-new
> >>>   source-topic-2-new
> >>>   source-topic-1-new-changelog
> >>>   source-topic-2-new-changelog
> >>>
> >>> Now, at a given point, if the streaming application is stopped,
> >>> there is some data in all these topics. Barring the source-topic,
> >>> all other topics have data inserted by the streaming application.
> >>>
> >>> Also, I suppose the streaming application stores the offset for
> >>> each of the topics as to where it last was.
> >>>
> >>> So when I restart the application, how does the processing start
> >>> again? Will it pick the data from the last-left changelog topics
> >>> and process that first, and then process the source topic data
> >>> from the offset last left?
> >>>
> >>> Or will it start from the source topic? I really don't want it to
> >>> maintain offsets to the changelog tables, because any old key's
> >>> value can be modified as part of aggregation again.
> >>>
> >>> A bit confused here; any light would help a lot.
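For reference, the topology described above could be sketched with the Streams DSL roughly like this. The predicates, re-keying logic, aggregation, and all names are placeholders, and the sketch uses the DSL calls of this thread's era (`branch()`, `through()`), which newer Kafka versions rename to `split()` and `repartition()`:

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class TopologySketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("source-topic");

        // source-topic branched into 2 KStreams (predicates are placeholders)
        KStream<String, String>[] branches = source.branch(
                (k, v) -> v.startsWith("a"),
                (k, v) -> true);

        // each branch re-keyed and written through an explicit topic, then
        // aggregated into a KTable backed by an internal changelog topic
        KTable<String, Long> table1 = branches[0]
                .map((k, v) -> KeyValue.pair(v, v))    // new key/value pairs
                .through("source-topic-1-new")
                .groupByKey()
                .count();
        KTable<String, Long> table2 = branches[1]
                .map((k, v) -> KeyValue.pair(v, v))
                .through("source-topic-2-new")
                .groupByKey()
                .count();

        // table1 left join table2 -> final stream, persisted externally
        table1.leftJoin(table2, (c1, c2) -> c1 + (c2 == null ? 0 : c2))
              .toStream()
              .foreach((k, v) -> { /* write to the external data store */ });

        new KafkaStreams(builder.build(), props).start();
    }
}
```

This makes the physical-topic inventory in the question concrete: `source-topic` and the two `*-new` topics are user topics, while the two changelog topics are created and managed internally by the aggregation state stores.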
> >>>
> >>> Thanks
> >>> Sachin