Hi Murilo,

A couple of questions:

1. What exactly do you mean by "clean up"?
2. Do you have a cleanup policy specified on the changelog topics?

Best,
Bruno

On 15.03.21 15:03, Murilo Tavares wrote:
Hi Bruno
No, I haven't tested resetting the application before upgrading on my large
environment. But I was able to reproduce it in my dev environment, which is
way smaller.
This is what I did:
- Clean up and downgrade to 2.4.
- Let it catch up.
- Upgrade to 2.7. Same errors, but it caught up after a while.

Then I tried these steps:
- Clean up and downgrade to 2.4.
- Let it catch up;
- Clean up and upgrade to 2.7. No error this time.

Thanks
Murilo

On Mon, 15 Mar 2021 at 09:53, Bruno Cadonna <br...@confluent.io.invalid>
wrote:

Hi Murilo,

Did you try upgrading again after you reset the application? Did it
work?

Best,
Bruno

On 15.03.21 14:26, Murilo Tavares wrote:
Hi Bruno
Thanks for your response.
No, I did not reset the application prior to upgrading. I simply
upgraded KafkaStreams from 2.4 to 2.7.

I was able to reproduce it on a smaller environment, and it does indeed
recover.
In a large environment, though, it stays like that for hours. In this
same large environment, I had to downgrade the application, and when
doing that I did reset the application, which took just a few minutes.

Thanks
Murilo

On Mon, 15 Mar 2021 at 06:21, Bruno Cadonna <br...@confluent.io.invalid>
wrote:

Hi Murilo,

No, you do not need any special procedure to upgrade from 2.4 to 2.7.

What you see in the logs is not an error but a warning. It should not
block you on startup forever. The warning says that the local state of
task 7_17 is corrupted because the offset that Streams wants to fetch
from the state changelog topic partition
my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 is larger
or smaller than the offsets that exist on the brokers for that
partition. If Streams runs into such an exception, it recreates the
state from scratch, which might take a while depending on the size of
the state.
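
To make the range condition concrete, here is a minimal Java sketch of
the comparison involved. It is only an illustration, not the Streams or
broker code: the class and method names are made up, and the
broker-side offset ranges are hypothetical. Only the offset 738 is taken
from the quoted stack trace.

```java
final class ChangelogOffsetCheck {

    /**
     * Returns true when a fetch position lies outside the offsets the
     * broker still holds for a partition, i.e. before the log start
     * offset or past the log end offset.
     */
    public static boolean isOutOfRange(long fetchPosition,
                                       long logStartOffset,
                                       long logEndOffset) {
        return fetchPosition < logStartOffset || fetchPosition > logEndOffset;
    }

    public static void main(String[] args) {
        long checkpointed = 738L; // offset reported in the stack trace

        // Hypothetical broker-side ranges, chosen only for illustration.
        // Case 1: the position is still covered by the changelog.
        System.out.println(isOutOfRange(checkpointed, 0L, 1_000L));
        // Case 2: older records are gone, the log now starts later.
        System.out.println(isOutOfRange(checkpointed, 900L, 1_500L));
        // Case 3: the topic is shorter than the local checkpoint expects.
        System.out.println(isOutOfRange(checkpointed, 0L, 120L));
    }
}
```

In cases 2 and 3 the locally checkpointed position no longer matches
what the brokers hold, which is the situation the warning describes.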

The root cause of this warning is not clear from the information you
gave. Did you maybe reset the application but not wipe out the local
state stores?
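
For reference, a rough sketch of what wiping the local state means on
disk. In the application itself this is normally done by calling
KafkaStreams#cleanUp() while the instance is not running; the plain-Java
helper below is only an assumption-laden stand-in that deletes the
directory <state.dir>/<application.id>. The class name, method name and
argument handling are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class LocalStateWipe {

    /**
     * Recursively deletes the given directory if it exists and returns
     * the number of deleted files and directories.
     */
    public static int wipe(Path appStateDir) throws IOException {
        if (!Files.exists(appStateDir)) {
            return 0;
        }
        List<Path> entries;
        try (Stream<Path> walk = Files.walk(appStateDir)) {
            // Reverse order so that children are deleted before parents.
            entries = walk.sorted(Comparator.reverseOrder())
                          .collect(Collectors.toList());
        }
        for (Path entry : entries) {
            Files.delete(entry);
        }
        return entries.size();
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: LocalStateWipe <state.dir> <application.id>");
            return;
        }
        // Assumes the usual layout <state.dir>/<application.id>/<task id>/...
        Path appStateDir = Paths.get(args[0]).resolve(args[1]);
        System.out.println("deleted entries: " + wipe(appStateDir));
    }
}
```

Run it only while the Streams instance is stopped. After a wipe, the
instance restores its state from the changelog topics on the next start.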

Best,
Bruno

On 12.03.21 19:11, Murilo Tavares wrote:
Hi
I have Kafka brokers running on version 2.4.1, with a KafkaStreams app
on 2.4.0.
I'm trying to upgrade my KafkaStreams to v2.7.0, but my instances got
stuck on startup.
In my understanding, I don't need any special procedure to upgrade
from KStreams 2.4.0 to 2.7.0, right?

The following error stands out to me:

2021-03-12 16:23:52.005 [...] WARN org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [...] Detected the states of tasks {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} are corrupted. Will close the task as dirty and re-create and bootstrap from scratch.
org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs {7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} are corrupted and hence needs to be re-initialized
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:744) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) [app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512) [app.jar:?]
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position FetchPosition{offset=738, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[...:9092 (id: 3 rack: euw1-az1)], epoch=27}} is out of range for partition my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17
    at org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1366) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1318) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:614) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1272) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433) ~[app.jar:?]
    ... 4 more

Any suggestions on how to upgrade?
Thanks
Murilo




