Hi Bruno,

Yes, cleaning up before upgrading is probably what I'll do. I was just
trying to understand what's going on, as this shouldn't be required.

Thanks for your help,
Murilo
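For reference, the env-var cleanup hook I mentioned below looks roughly like
this. This is only an illustrative sketch, not our actual code: the variable
name `CLEANUP_ON_START`, the application id, and the broker address are all
placeholders.

```java
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class App {
    public static void main(final String[] args) throws InterruptedException {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-assembler");   // placeholder id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder address

        final KafkaStreams streams =
            new KafkaStreams(new StreamsBuilder().build(), props);

        // When the (illustrative) CLEANUP_ON_START variable is set, wipe this
        // instance's local state directory and park the process, so the
        // operator can unset the variable and redeploy with a clean slate.
        if (System.getenv("CLEANUP_ON_START") != null) {
            streams.cleanUp();            // must be called before start(); deletes local state for this app id
            Thread.sleep(Long.MAX_VALUE); // park instead of starting the topology
        }

        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```

Note that `KafkaStreams#cleanUp()` only removes the local state directory of
that one instance; every instance has to run it for a full wipe.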
On Mon, 15 Mar 2021 at 11:16, Bruno Cadonna <br...@confluent.io.invalid> wrote:

> Hi Murilo,
>
> OK, now I see why you do not get an error in the second case in your
> small environment where you cleaned up before upgrading. You would
> restore from the earliest offset anyway, and that is defined by the
> earliest offset at the broker, which always exists. Hence, no
> out-of-range exception is thrown.
>
> I am wondering why you get an out-of-range exception after upgrading
> without clean-up, though.
>
> A solution would be to clean up before upgrading in your large
> environment. I do not know if this is a viable solution for you.
>
> Best,
> Bruno
>
> On 15.03.21 16:01, Murilo Tavares wrote:
> > Hi Bruno,
> > We have an environment variable that, when set, will call
> > KafkaStreams.cleanUp() and sleep.
> > The changelog topic is an internal Kafka Streams topic, for which I'm
> > not changing any policies. It should be the default policy for a
> > KTable, in my understanding.
> > Thanks,
> > Murilo
> >
> > On Mon, 15 Mar 2021 at 10:20, Bruno Cadonna <br...@confluent.io.invalid>
> > wrote:
> >
> >> Hi Murilo,
> >>
> >> A couple of questions:
> >>
> >> 1. What exactly do you mean by clean up?
> >> 2. Do you have a cleanup policy specified on the changelog topics?
> >>
> >> Best,
> >> Bruno
> >>
> >> On 15.03.21 15:03, Murilo Tavares wrote:
> >>> Hi Bruno,
> >>> No, I haven't tested resetting the application before upgrading in my
> >>> large environment. But I was able to reproduce it in my dev
> >>> environment, which is way smaller.
> >>> This is what I did:
> >>> - Clean up and downgrade to 2.4;
> >>> - Let it catch up;
> >>> - Upgrade to 2.7. Same errors, but it caught up after a while.
> >>>
> >>> Then I tried these steps:
> >>> - Clean up and downgrade to 2.4;
> >>> - Let it catch up;
> >>> - Clean up and upgrade to 2.7. No error this time.
> >>>
> >>> Thanks,
> >>> Murilo
> >>>
> >>> On Mon, 15 Mar 2021 at 09:53, Bruno Cadonna <br...@confluent.io.invalid>
> >>> wrote:
> >>>
> >>>> Hi Murilo,
> >>>>
> >>>> Did you retry the upgrade after you reset the application? Did it
> >>>> work?
> >>>>
> >>>> Best,
> >>>> Bruno
> >>>>
> >>>> On 15.03.21 14:26, Murilo Tavares wrote:
> >>>>> Hi Bruno,
> >>>>> Thanks for your response.
> >>>>> No, I did not reset the application prior to upgrading. That was
> >>>>> simply upgrading Kafka Streams from 2.4 to 2.7.
> >>>>>
> >>>>> I was able to reproduce it in a smaller environment, and it does
> >>>>> indeed recover.
> >>>>> In a large environment, though, it stays like that for hours. In
> >>>>> this same large environment, I had to downgrade the application,
> >>>>> and when doing that I did reset the application, which took just a
> >>>>> few minutes.
> >>>>>
> >>>>> Thanks,
> >>>>> Murilo
> >>>>>
> >>>>> On Mon, 15 Mar 2021 at 06:21, Bruno Cadonna <br...@confluent.io.invalid>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Murilo,
> >>>>>>
> >>>>>> No, you do not need any special procedure to upgrade from 2.4 to
> >>>>>> 2.7.
> >>>>>>
> >>>>>> What you see in the logs is not an error but a warning. It should
> >>>>>> not block you on startup forever. The warning says that the local
> >>>>>> state of task 7_17 is corrupted because the offset you want to
> >>>>>> fetch from the state changelog topic partition
> >>>>>> my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 is
> >>>>>> larger or smaller than the offsets that exist on the brokers for
> >>>>>> that partition. If Streams runs into such an exception, it will
> >>>>>> recreate the state from scratch, which might take a while
> >>>>>> depending on the size of the state.
> >>>>>>
> >>>>>> The root cause of this warning is not clear from the information
> >>>>>> you gave. Did you maybe reset the application but not wipe out the
> >>>>>> local state stores?
> >>>>>>
> >>>>>> Best,
> >>>>>> Bruno
> >>>>>>
> >>>>>> On 12.03.21 19:11, Murilo Tavares wrote:
> >>>>>>> Hi,
> >>>>>>> I have Kafka brokers running on version 2.4.1, with a Kafka
> >>>>>>> Streams app on 2.4.0.
> >>>>>>> I'm trying to upgrade my Kafka Streams app to v2.7.0, but my
> >>>>>>> instances get stuck on startup.
> >>>>>>> In my understanding, I don't need any special procedure to
> >>>>>>> upgrade from Kafka Streams 2.4.0 to 2.7.0, right?
> >>>>>>>
> >>>>>>> The following error stands out for me:
> >>>>>>>
> >>>>>>> 2021-03-12 16:23:52.005 [...] WARN
> >>>>>>> org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
> >>>>>>> [...] Detected the states of tasks
> >>>>>>> {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]}
> >>>>>>> are corrupted. Will close the task as dirty and re-create and
> >>>>>>> bootstrap from scratch.
> >>>>>>> org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs
> >>>>>>> {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]}
> >>>>>>> are corrupted and hence needs to be re-initialized
> >>>>>>>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446) ~[app.jar:?]
> >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:744) ~[app.jar:?]
> >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625) ~[app.jar:?]
> >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) [app.jar:?]
> >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512) [app.jar:?]
> >>>>>>> Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
> >>>>>>> Fetch position FetchPosition{offset=738, offsetEpoch=Optional.empty,
> >>>>>>> currentLeader=LeaderAndEpoch{leader=Optional[...:9092 (id: 3 rack:
> >>>>>>> euw1-az1)], epoch=27}} is out of range for partition
> >>>>>>> my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17
> >>>>>>>     at org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1366) ~[app.jar:?]
> >>>>>>>     at org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1318) ~[app.jar:?]
> >>>>>>>     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:614) ~[app.jar:?]
> >>>>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1272) ~[app.jar:?]
> >>>>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233) ~[app.jar:?]
> >>>>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) ~[app.jar:?]
> >>>>>>>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433) ~[app.jar:?]
> >>>>>>> ... 4 more
> >>>>>>>
> >>>>>>> Any suggestions on how to upgrade?
> >>>>>>> Thanks,
> >>>>>>> Murilo
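
For anyone landing on this thread: the "reset the application" step discussed
above means running the Streams application reset tool plus wiping local state
on every instance. A rough sketch follows; the application id, broker address,
topic name, and state directory are placeholders, not values from this thread.

```shell
# 1. Stop all instances of the Streams application.

# 2. Reset committed offsets and delete internal (repartition/changelog)
#    topics for the application id. All names below are placeholders.
bin/kafka-streams-application-reset.sh \
  --application-id my-assembler \
  --bootstrap-servers broker:9092 \
  --input-topics my-input-topic

# 3. Wipe local state on EVERY instance, either by deleting the state
#    directory (state.dir, /tmp/kafka-streams by default) or by calling
#    KafkaStreams#cleanUp() on startup.
rm -rf /tmp/kafka-streams/my-assembler

# 4. Restart the instances; state stores are rebuilt from scratch.
```

Skipping step 3 is exactly the mismatch Bruno asks about above: local RocksDB
state left behind with checkpointed offsets that no longer exist on the
brokers, which surfaces as an OffsetOutOfRangeException during restore.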