Hi,
The reset tool looks like a great feature.

So following this link
https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/

What I understand is that this tool resets the offsets of the input and
intermediate topics and also deletes the application's internal topics
and internal local state.
Please confirm this.

I was actually doing all this manually.
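For reference, invoking the reset tool looks roughly like this (a sketch only: the application id and topic names are placeholders, and flag names can differ slightly between Kafka versions):

```shell
# Run this only while the Streams application is stopped.
bin/kafka-streams-application-reset.sh \
  --application-id my-streams-app \
  --bootstrap-servers localhost:9092 \
  --input-topics source-topic \
  --intermediate-topics source-topic-1-new,source-topic-2-new
```

Per the blog post, the tool does not wipe the local state directories; for that you additionally call KafkaStreams#cleanUp() in the application (or delete the state directory manually).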

Thanks
Sachin


On Thu, Nov 10, 2016 at 12:05 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

>
> Hey,
>
> changelog topics are compacted topics and no retention time is applied
> (one exception are window-changelog topics though, which have both --
> compaction and retention policy enabled)
>
> If an input message is purged via retention time (and it is your
> latest committed offset), and you then start your Streams application,
> it will resume according to the "auto.offset.reset" policy, which you
> can specify in StreamsConfig. So Streams will run just fine, but the
> data is of course lost.
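Setting that policy might look like this (a sketch; the application id and server address are placeholders, and only the reset-related line is the point here):

```java
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// If a committed offset no longer exists (e.g. the message was purged
// by retention), resume from the earliest available offset:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
```

The alternative value "latest" would instead skip to the end of the topic in that situation.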
>
> The same argument applies to repartitioning topics.
>
> > I am asking this because I am planning to keep the retention time
> > for internal changelog topics also small so no message gets big
> > enough to start getting exceptions.
>
> I don't understand this part though...
>
> To set an arbitrary start offset there is no API or tooling available
> at the moment. However, we plan to add some of this in future releases.
>
> As for now, you could set start offsets "manually" by writing a small
> consumer application that does not process any data, but only seek()s
> to (and commit()s) the start offsets you want to use. This is a
> similar idea to the one the Streams application reset tool is built
> on. See this blog post for details:
>
> https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
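A minimal sketch of such a consumer (assumptions: the group id must equal the application.id of the Streams application, and the topic, partition, and offset shown are placeholders):

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class SetStartOffsets {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Must match the application.id of the Streams application:
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-streams-app");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("source-topic", 0);
            consumer.assign(Collections.singletonList(tp));
            // Commit the desired start offset without consuming anything:
            consumer.commitSync(
                    Collections.singletonMap(tp, new OffsetAndMetadata(42L)));
        }
    }
}
```

Run this only while the Streams application is stopped, so the committed offsets are not overwritten by the running group.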
>
> However, you should be careful not to mess with internally kept state
> (i.e., make sure it is still semantically meaningful and compatible if
> you start to modify offsets...)
>
>
> Hope this helps.
>
> -Matthias
>
> On 11/9/16 7:29 AM, Sachin Mittal wrote:
> > Hi, What happens when a message that lies beyond the last offset
> > committed by the stream consumer is purged by Kafka, via the
> > retention time setting or otherwise?
> >
> > I am asking this because I am planning to keep the retention time
> > for internal changelog topics also small so no message gets big
> > enough to start getting exceptions.
> >
> > So if messages from the last offset onwards are deleted, will
> > there be any issues?
> >
> > Also, is there any way to control or set the offsets manually when
> > we restart the streaming application, so that certain old messages
> > are not consumed at all, as logic-wise they are no longer useful to
> > the streaming application? For example, past user sessions created
> > while the streaming application was stopped.
> >
> >
> > Thanks Sachin
> >
> >
> > On Wed, Nov 9, 2016 at 7:46 PM, Eno Thereska
> > <eno.there...@gmail.com> wrote:
> >
> >> Hi Sachin,
> >>
> >> Kafka Streams is built on top of standard Kafka consumers. For
> >> every topic it consumes from (whether a changelog topic or a
> >> source topic, it doesn't matter), the consumer stores the offset
> >> it last consumed from. Upon restart, by default it starts
> >> consuming from where it left off in each of the topics. So you
> >> can think of it this way: a restart should be no different than
> >> if you had left the application running (i.e., no restart).
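To see concretely where a Streams application left off, the committed offsets per topic-partition can be inspected with the consumer-groups tool (the group name equals the Streams application.id; the id and server here are placeholders, and flags vary somewhat by Kafka version):

```shell
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group my-streams-app
```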
> >>
> >> Thanks Eno
> >>
> >>
> >>> On 9 Nov 2016, at 13:59, Sachin Mittal <sjmit...@gmail.com>
> >>> wrote:
> >>>
> >>> Hi, I had some basic questions on the sequence of tasks for a
> >>> streaming application restart, in case of failure or otherwise.
> >>>
> >>> Say my stream is structured this way:
> >>>
> >>> - source-topic is branched into 2 kstreams:
> >>>   source-topic-1, source-topic-2
> >>> - each is mapped to a new kstream (new key/value pairs) backed
> >>>   by a kafka topic: source-topic-1-new, source-topic-2-new
> >>> - each is aggregated into a new ktable backed by an internal
> >>>   changelog topic: source-topic-1-new-table
> >>>   (scource-topic-1-new-changelog), source-topic-2-new-table
> >>>   (scource-topic-2-new-changelog)
> >>> - table1 left join table2 -> final stream
> >>> - results of the final stream are then persisted into another
> >>>   data store
> >>>
> >>> So as you can see, I have the following physical topics or state
> >>> stores:
> >>> source-topic
> >>> source-topic-1-new
> >>> source-topic-2-new
> >>> scource-topic-1-new-changelog
> >>> scource-topic-2-new-changelog
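The topology described above could be sketched in the Streams DSL roughly as follows (the predicates, mappers, aggregators, and serdes are all placeholders; this only fixes the shape of the pipeline, it is not a working application):

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> source = builder.stream("source-topic");

// Branch into two streams (predicates are hypothetical):
KStream<String, String>[] branches = source.branch(
        (k, v) -> v.startsWith("a"),
        (k, v) -> true);

// Re-key each branch and write through an explicit repartition topic:
KStream<String, String> s1 = branches[0]
        .map((k, v) -> KeyValue.pair(v, v)).through("source-topic-1-new");
KStream<String, String> s2 = branches[1]
        .map((k, v) -> KeyValue.pair(v, v)).through("source-topic-2-new");

// Aggregate each into a KTable backed by an internal changelog topic
// (counting is just a stand-in aggregation):
KTable<String, Long> table1 = s1.groupByKey()
        .aggregate(() -> 0L, (k, v, agg) -> agg + 1L,
                Materialized.with(Serdes.String(), Serdes.Long()));
KTable<String, Long> table2 = s2.groupByKey()
        .aggregate(() -> 0L, (k, v, agg) -> agg + 1L,
                Materialized.with(Serdes.String(), Serdes.Long()));

// Left join the tables and emit the final stream:
table1.leftJoin(table2, (v1, v2) -> v1 + (v2 == null ? 0L : v2))
        .toStream()
        .to("final-topic", Produced.with(Serdes.String(), Serdes.Long()));
```

Note that through() expects its topic to exist already; such explicitly named topics count as intermediate topics for the reset tool, while the changelog topics are internal ones.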
> >>>
> >>> Now at a given point, if the streaming application is stopped,
> >>> there is some data in all these topics. Barring the source-topic,
> >>> all the other topics have data inserted by the streaming
> >>> application.
> >>>
> >>> Also, I suppose the streaming application stores, for each of
> >>> the topics, the offset where it last left off.
> >>>
> >>> So when I restart the application, how does the processing
> >>> start again? Will it pick up the data from where it left off in
> >>> the changelog topics and process that first, and then process
> >>> the source topic data from the last offset?
> >>>
> >>> Or will it start from the source topic? I really don't want it
> >>> to maintain offsets for the changelog topics, because any old
> >>> key's value can be modified again as part of aggregation.
> >>>
> >>> Bit confused here, any light would help a lot.
> >>>
> >>> Thanks Sachin
> >>
> >>
> >
>
