Re: Error upgrading KafkaStreams
Hi Bruno

Yes, cleaning up before upgrading is probably what I'm gonna do. I was just trying to understand what's going on, as this shouldn't be required.

Thanks for your help
Murilo

On Mon, 15 Mar 2021 at 11:16, Bruno Cadonna wrote:
> A solution would be to clean up before upgrading in your large
> environment. I do not know if this is a viable solution for you.
Re: Error upgrading KafkaStreams
Hi Murilo,

OK, now I see why you do not get an error in the second case in your small environment, where you cleaned up before upgrading. Since the local state was wiped, you would restore from the earliest offset anyway. That offset is defined by the earliest offset at the broker, which always exists. Hence, no out-of-range exception is thrown.

I am wondering why you get an out-of-range exception after upgrading without clean up, though.

A solution would be to clean up before upgrading in your large environment. I do not know if this is a viable solution for you.

Best,
Bruno

On 15.03.21 16:01, Murilo Tavares wrote:
> We have an environment variable that, when set, will call
> KafkaStreams.cleanUp() and sleep.
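As a rough illustration of the reasoning in this message, here is a minimal sketch in plain Java. It is a simplified model, not the actual Kafka Streams restore code: the class and method names are made up for the example, the broker offsets 1000 and 5000 are hypothetical, and only the offset 738 comes from the log in this thread. It assumes a fetch position is valid when it lies between the earliest and latest offsets on the broker.

```java
import java.util.OptionalLong;

// Hypothetical helper, not part of Kafka Streams.
final class RestoreStart {

    // Offset the restore would begin from: the locally checkpointed offset
    // if local state exists, otherwise the earliest offset on the broker.
    static long startOffset(OptionalLong checkpoint, long earliestOnBroker) {
        return checkpoint.orElse(earliestOnBroker);
    }

    // Simplified broker-side check: a fetch position is valid inside
    // [earliestOnBroker, latestOnBroker].
    static boolean inRange(long offset, long earliestOnBroker, long latestOnBroker) {
        return offset >= earliestOnBroker && offset <= latestOnBroker;
    }

    public static void main(String[] args) {
        long earliest = 1_000L; // hypothetical broker log start offset
        long latest = 5_000L;   // hypothetical broker log end offset

        // After a clean-up there is no checkpoint, so restore starts at earliest.
        long afterCleanUp = startOffset(OptionalLong.empty(), earliest);
        // Without a clean-up, the old checkpoint (738 in the log above) is reused.
        long withOldState = startOffset(OptionalLong.of(738L), earliest);

        System.out.println(inRange(afterCleanUp, earliest, latest)); // true
        System.out.println(inRange(withOldState, earliest, latest)); // false
    }
}
```

Starting from the earliest offset can never be out of range because it is, by definition, the lower bound of the range. This sketch does not explain why the checkpointed offset ended up outside the range after the upgrade, which is the open question in this thread.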
Re: Error upgrading KafkaStreams
Hi Bruno

We have an environment variable that, when set, will call KafkaStreams.cleanUp() and sleep.

The changelog topic is an internal KafkaStreams topic, for which I'm not changing any policies. In my understanding, it should have whatever the default policy is for a KTable.

Thanks
Murilo

On Mon, 15 Mar 2021 at 10:20, Bruno Cadonna wrote:
> A couple of questions:
>
> 1. What do you mean exactly with clean up?
> 2. Do you have a cleanup policy specified on the changelog topics?
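The environment-variable switch described in this message could look roughly like the sketch below. This is an assumption about its shape, not Murilo's actual code: the variable name STREAMS_CLEAN_UP and the class are hypothetical, and the clean-up action is passed in as a Runnable so the example runs without the Kafka dependency. In a real application the action would be the instance's cleanUp() method, which has to be called while the instance is not running (before start() or after close()).

```java
import java.util.Map;

// Hypothetical helper, not part of Kafka Streams.
final class CleanUpSwitch {

    // Hypothetical variable name; the thread does not say what it is called.
    static final String CLEANUP_ENV_VAR = "STREAMS_CLEAN_UP";

    // Runs the clean-up action when the variable is set to a non-empty value.
    // Returns true if the clean-up ran, so the caller can sleep instead of
    // starting the topology.
    static boolean cleanUpIfRequested(Map<String, String> env, Runnable cleanUp) {
        String value = env.get(CLEANUP_ENV_VAR);
        if (value == null || value.isEmpty()) {
            return false;
        }
        cleanUp.run();
        return true;
    }

    public static void main(String[] args) {
        // With Kafka Streams on the classpath the action would be
        // streams::cleanUp; here it only prints a message.
        boolean cleaned = cleanUpIfRequested(
                System.getenv(),
                () -> System.out.println("local state directory wiped"));
        System.out.println(cleaned ? "cleaned up, not starting" : "starting normally");
    }
}
```

Note that this only wipes the local state directory of the instance. It is different from resetting the application with the application reset tool, which also touches internal topics and committed offsets.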
Re: Error upgrading KafkaStreams
Hi Murilo,

A couple of questions:

1. What exactly do you mean by clean up?
2. Do you have a cleanup policy specified on the changelog topics?

Best,
Bruno

On 15.03.21 15:03, Murilo Tavares wrote:
> No, I haven't tested resetting the application before upgrading on my
> large environment. But I was able to reproduce it in my dev
> environment, which is way smaller.
Re: Error upgrading KafkaStreams
Hi Bruno

No, I haven't tested resetting the application before upgrading on my large environment. But I was able to reproduce it in my dev environment, which is way smaller.

This is what I did:
- Clean up and downgrade to 2.4.
- Let it catch up.
- Upgrade to 2.7. Same errors, but it caught up after a while.

Then I tried these steps:
- Clean up and downgrade to 2.4.
- Let it catch up.
- Clean up and upgrade to 2.7. No error this time.

Thanks
Murilo

On Mon, 15 Mar 2021 at 09:53, Bruno Cadonna wrote:
> Did you retry to upgrade again after you reset the application? Did it
> work?
Re: Error upgrading KafkaStreams
Hi Murilo,

Did you try upgrading again after you reset the application? Did it work?

Best,
Bruno

On 15.03.21 14:26, Murilo Tavares wrote:
> No, I did not reset the application prior to upgrading. That was simply
> upgrading KafkaStreams from 2.4 to 2.7.
Re: Error upgrading KafkaStreams
Hi Bruno

Thanks for your response.

No, I did not reset the application prior to upgrading. That was simply upgrading KafkaStreams from 2.4 to 2.7.

I was able to reproduce it on a smaller environment, and it does indeed recover. In a large environment, though, it stays like that for hours. In this same large environment, I had to downgrade the application, and when doing that I did reset the application, which took just a few minutes.

Thanks
Murilo

On Mon, 15 Mar 2021 at 06:21, Bruno Cadonna wrote:
> The root cause of this warning is not clear from the information you
> gave. Did you maybe reset the application but not wipe out the local
> state stores?
Re: Error upgrading KafkaStreams
Hi Murilo,

No, you do not need any special procedure to upgrade from 2.4 to 2.7.

What you see in the logs is not an error but a warning. It should not block you on startup forever. The warning says that the local state of task 7_17 is considered corrupted because the offset you want to fetch from the state changelog topic partition my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 is larger or smaller than the offsets that exist on the brokers for that partition. If Streams runs into such an exception, it recreates the state from scratch, which might take a while depending on the size of the state.

The root cause of this warning is not clear from the information you gave. Did you maybe reset the application but not wipe out the local state stores?

Best,
Bruno

On 12.03.21 19:11, Murilo Tavares wrote:
> Hi
> I have Kafka brokers running on version 2.4.1, with a KafkaStreams app on 2.4.0.
> I'm trying to upgrade my KafkaStreams to v2.7.0, but I got my instances stuck on startup.
> In my understanding, I don't need any special procedure to upgrade from KStreams 2.4.0 to 2.7.0, right?
>
> The following error stands out for me:
>
> 2021-03-12 16:23:52.005 [...] WARN org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [...] Detected the states of tasks {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} are corrupted. Will close the task as dirty and re-create and bootstrap from scratch.
> org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} are corrupted and hence needs to be re-initialized
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446) ~[app.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:744) ~[app.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625) ~[app.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) [app.jar:?]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512) [app.jar:?]
> Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position FetchPosition{offset=738, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[...:9092 (id: 3 rack: euw1-az1)], epoch=27}} is out of range for partition my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17
>     at org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1366) ~[app.jar:?]
>     at org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1318) ~[app.jar:?]
>     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:614) ~[app.jar:?]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1272) ~[app.jar:?]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233) ~[app.jar:?]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) ~[app.jar:?]
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433) ~[app.jar:?]
>     ... 4 more
>
> Any suggestions on how to upgrade?
> Thanks
> Murilo
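To make the explanation of the warning a bit more concrete, the sketch below models the decision in plain Java. It is a simplification under stated assumptions, not the Streams or broker implementation: the class, enum and method names are invented for the example, and the broker offsets are hypothetical (only 738 appears in the log). It also shows why a rebuild can take long on a large environment: the whole retained changelog has to be replayed instead of only the tail after the checkpoint.

```java
// Hypothetical model, not part of Kafka Streams.
final class ChangelogRangeCheck {

    enum Action { RESUME_FROM_CHECKPOINT, REBUILD_FROM_SCRATCH }

    // A fetch position is out of range when it is smaller than the first
    // offset or larger than the end offset that exists on the broker.
    static boolean isOutOfRange(long fetchOffset, long logStart, long logEnd) {
        return fetchOffset < logStart || fetchOffset > logEnd;
    }

    // Out of range means the local state cannot be trusted and is rebuilt.
    static Action decide(long fetchOffset, long logStart, long logEnd) {
        return isOutOfRange(fetchOffset, logStart, logEnd)
                ? Action.REBUILD_FROM_SCRATCH
                : Action.RESUME_FROM_CHECKPOINT;
    }

    // Upper bound on the offsets to read back before the task is usable.
    // On a compacted topic the number of records can be lower than this.
    static long offsetsToReplay(long fetchOffset, long logStart, long logEnd) {
        return isOutOfRange(fetchOffset, logStart, logEnd)
                ? logEnd - logStart
                : logEnd - fetchOffset;
    }

    public static void main(String[] args) {
        // Hypothetical broker range [1000, 5000] with the checkpoint at 738.
        System.out.println(decide(738L, 1_000L, 5_000L));          // REBUILD_FROM_SCRATCH
        System.out.println(offsetsToReplay(738L, 1_000L, 5_000L)); // 4000
        // Hypothetical broker range [0, 800]: the checkpoint is still valid.
        System.out.println(decide(738L, 0L, 800L));                // RESUME_FROM_CHECKPOINT
        System.out.println(offsetsToReplay(738L, 0L, 800L));       // 62
    }
}
```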