Hi Bruno
Yes, cleaning up before upgrading is probably what I'm gonna do.
I was just trying to understand what's going on, as this shouldn't be
required.
Thanks for your help
Murilo


On Mon, 15 Mar 2021 at 11:16, Bruno Cadonna <br...@confluent.io.invalid>
wrote:

> Hi Murilo,
>
> OK, now I see why you do not get an error in the second case, in your
> small environment where you cleaned up before upgrading. You would
> restore from the earliest offset anyway, and that offset is defined by
> the broker and always exists. Hence, no out-of-range exception is
> thrown.
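>
> To illustrate the point with a plain consumer (just a sketch, not the
> actual Streams restore code; the broker address is a placeholder and
> the class name is made up):
>
> import java.time.Duration;
> import java.util.List;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
>
> public class EarliestRestoreSketch {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092"); // placeholder
>         props.put("key.deserializer",
>             "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>         props.put("value.deserializer",
>             "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>
>         TopicPartition changelog = new TopicPartition(
>             "my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog", 17);
>
>         try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
>             consumer.assign(List.of(changelog));
>             // After a clean-up there is no checkpoint to resume from, so
>             // restoration starts at the broker's earliest offset. That
>             // offset exists by definition, hence no out-of-range exception.
>             consumer.seekToBeginning(List.of(changelog));
>             consumer.poll(Duration.ofMillis(100));
>         }
>     }
> }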
>
> I am wondering why you get an out-of-range exception after upgrading
> without cleaning up, though.
>
> A solution would be to clean up before upgrading in your large
> environment. I do not know if this is a viable solution for you.
>
> Best,
> Bruno
>
> On 15.03.21 16:01, Murilo Tavares wrote:
> > Hi Bruno
> > We have an environment variable that, when set, will call
> > KafkaStreams.cleanUp() and sleep.
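> > Roughly like this (a trimmed-down sketch: the flag name, topic, and
> > configs are illustrative; cleanUp() is the real Streams call):
> >
> > import java.util.Properties;
> > import org.apache.kafka.streams.KafkaStreams;
> > import org.apache.kafka.streams.StreamsBuilder;
> > import org.apache.kafka.streams.StreamsConfig;
> >
> > public class Startup {
> >     public static void main(String[] args) throws InterruptedException {
> >         Properties props = new Properties();
> >         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-assembler-4");
> >         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> >
> >         StreamsBuilder builder = new StreamsBuilder();
> >         builder.stream("input-topic"); // hypothetical source topic
> >         KafkaStreams streams = new KafkaStreams(builder.build(), props);
> >
> >         if (System.getenv("CLEANUP_AND_SLEEP") != null) {
> >             // Wipes this instance's local state directory; only valid
> >             // while the instance is not running.
> >             streams.cleanUp();
> >             Thread.sleep(Long.MAX_VALUE); // idle until redeployed without the flag
> >         } else {
> >             streams.start();
> >         }
> >     }
> > }
> >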
> > The changelog topic is an internal KafkaStreams topic, for which I'm not
> > changing any policies.
> > It should use the default policy for a KTable changelog, in my
> > understanding.
> > Thanks
> > Murilo
> >
> >
> >
> > On Mon, 15 Mar 2021 at 10:20, Bruno Cadonna <br...@confluent.io.invalid>
> > wrote:
> >
> >> Hi Murilo,
> >>
> >> A couple of questions:
> >>
> >> 1. What exactly do you mean by "clean up"?
> >> 2. Do you have a cleanup policy specified on the changelog topics?
> >>
> >> Best,
> >> Bruno
> >>
> >> On 15.03.21 15:03, Murilo Tavares wrote:
> >>> Hi Bruno
> >>> No, I haven't tested resetting the application before upgrading in my
> >>> large environment. But I was able to reproduce it in my dev
> >>> environment, which is way smaller.
> >>> This is what I did:
> >>> - Clean up and downgrade to 2.4;
> >>> - Let it catch up;
> >>> - Upgrade to 2.7. Same errors, but it caught up after a while.
> >>>
> >>> Then I tried these steps:
> >>> - Clean up and downgrade to 2.4;
> >>> - Let it catch up;
> >>> - Clean up and upgrade to 2.7. No error this time.
> >>>
> >>> Thanks
> >>> Murilo
> >>>
> >>> On Mon, 15 Mar 2021 at 09:53, Bruno Cadonna <br...@confluent.io.invalid>
> >>> wrote:
> >>>
> >>>> Hi Murilo,
> >>>>
> >>>> Did you retry the upgrade after you reset the application? Did it
> >>>> work?
> >>>>
> >>>> Best,
> >>>> Bruno
> >>>>
> >>>> On 15.03.21 14:26, Murilo Tavares wrote:
> >>>>> Hi Bruno
> >>>>> Thanks for your response.
> >>>>> No, I did not reset the application prior to upgrading. That was
> >>>>> simply upgrading KafkaStreams from 2.4 to 2.7.
> >>>>>
> >>>>> I was able to reproduce it in a smaller environment, and it does
> >>>>> indeed recover.
> >>>>> In a large environment, though, it stays like that for hours. In this
> >>>>> same large environment I had to downgrade the application, and when
> >>>>> doing that I did reset the application, which just took a few minutes.
> >>>>>
> >>>>> Thanks
> >>>>> Murilo
> >>>>>
> >>>>> On Mon, 15 Mar 2021 at 06:21, Bruno Cadonna <br...@confluent.io.invalid>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Murilo,
> >>>>>>
> >>>>>> No, you do not need any special procedure to upgrade from 2.4 to
> >>>>>> 2.7.
> >>>>>>
> >>>>>> What you see in the logs is not an error but a warning. It should
> >>>>>> not block your startup forever. The warning says that the local
> >>>>>> state of task 7_17 is corrupted because the offset you want to fetch
> >>>>>> from the state changelog topic partition
> >>>>>> my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 is
> >>>>>> larger or smaller than the offsets that exist on the brokers for
> >>>>>> that partition. If Streams runs into such an exception, it will
> >>>>>> recreate the state from scratch, which might take a while depending
> >>>>>> on the size of the state.
> >>>>>>
> >>>>>> The root cause of this warning is not clear from the information you
> >>>>>> gave. Did you maybe reset the application but not wipe out the local
> >>>>>> state stores?
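> >>>>>>
> >>>>>> To narrow it down, you could compare the brokers' earliest and
> >>>>>> latest offsets for that changelog partition with the fetch position
> >>>>>> from your warning (738). A sketch with the plain admin client; the
> >>>>>> bootstrap address is a placeholder and the class name is made up:
> >>>>>>
> >>>>>> import java.util.Map;
> >>>>>> import java.util.Properties;
> >>>>>> import org.apache.kafka.clients.admin.Admin;
> >>>>>> import org.apache.kafka.clients.admin.OffsetSpec;
> >>>>>> import org.apache.kafka.common.TopicPartition;
> >>>>>>
> >>>>>> public class OffsetCheck {
> >>>>>>     public static void main(String[] args) throws Exception {
> >>>>>>         Properties props = new Properties();
> >>>>>>         props.put("bootstrap.servers", "localhost:9092"); // placeholder
> >>>>>>
> >>>>>>         TopicPartition tp = new TopicPartition(
> >>>>>>             "my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog", 17);
> >>>>>>
> >>>>>>         try (Admin admin = Admin.create(props)) {
> >>>>>>             long earliest = admin.listOffsets(Map.of(tp, OffsetSpec.earliest()))
> >>>>>>                 .partitionResult(tp).get().offset();
> >>>>>>             long latest = admin.listOffsets(Map.of(tp, OffsetSpec.latest()))
> >>>>>>                 .partitionResult(tp).get().offset();
> >>>>>>             // A fetch position outside [earliest, latest] is exactly
> >>>>>>             // what triggers the OffsetOutOfRangeException during
> >>>>>>             // restoration.
> >>>>>>             System.out.printf("earliest=%d latest=%d%n", earliest, latest);
> >>>>>>         }
> >>>>>>     }
> >>>>>> }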
> >>>>>>
> >>>>>> Best,
> >>>>>> Bruno
> >>>>>>
> >>>>>> On 12.03.21 19:11, Murilo Tavares wrote:
> >>>>>>> Hi
> >>>>>>> I have Kafka brokers running on version 2.4.1, with a KafkaStreams
> >>>>>>> app on 2.4.0.
> >>>>>>> I'm trying to upgrade my KafkaStreams to v2.7.0, but my instances
> >>>>>>> get stuck on startup.
> >>>>>>> In my understanding, I don't need any special procedure to upgrade
> >>>>>>> from KStreams 2.4.0 to 2.7.0, right?
> >>>>>>>
> >>>>>>> The following error stands out for me:
> >>>>>>>
> >>>>>>> 2021-03-12 16:23:52.005 [...] WARN
> >>>>>>> org.apache.kafka.streams.processor.internals.StreamThread -
> >>>>>>> stream-thread [...] Detected the states of tasks
> >>>>>>> {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]}
> >>>>>>> are corrupted. Will close the task as dirty and re-create and
> >>>>>>> bootstrap from scratch.
> >>>>>>> org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with
> >>>>>>> changelogs
> >>>>>>> {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]}
> >>>>>>> are corrupted and hence needs to be re-initialized
> >>>>>>>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446) ~[app.jar:?]
> >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:744) ~[app.jar:?]
> >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625) ~[app.jar:?]
> >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) [app.jar:?]
> >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512) [app.jar:?]
> >>>>>>> Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
> >>>>>>> Fetch position FetchPosition{offset=738, offsetEpoch=Optional.empty,
> >>>>>>> currentLeader=LeaderAndEpoch{leader=Optional[...:9092 (id: 3 rack:
> >>>>>>> euw1-az1)], epoch=27}} is out of range for partition
> >>>>>>> my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17
> >>>>>>>     at org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1366) ~[app.jar:?]
> >>>>>>>     at org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1318) ~[app.jar:?]
> >>>>>>>     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:614) ~[app.jar:?]
> >>>>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1272) ~[app.jar:?]
> >>>>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233) ~[app.jar:?]
> >>>>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) ~[app.jar:?]
> >>>>>>>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433) ~[app.jar:?]
> >>>>>>> ... 4 more
> >>>>>>>
> >>>>>>> Any suggestions on how to upgrade?
> >>>>>>> Thanks
> >>>>>>> Murilo
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>
