[ https://issues.apache.org/jira/browse/KAFKA-12775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17343578#comment-17343578 ]

Nico Habermann edited comment on KAFKA-12775 at 5/12/21, 8:52 PM:
------------------------------------------------------------------

[~ableegoldman] Thanks for the extra help!

Our case is a bit different - we're not removing a subtopology / task, we're adding a new one. The end result is still much the same: an internal defensive check throws an exception because it sees something that should not exist. However, right now I'm avoiding that entire code path by deleting all local state, so your tip of surgically deleting only certain task dirs (and not the entire thing) may still apply to this situation. I'll investigate - thanks!

Overall though, it has become quite apparent to me that we're going against Kafka Streams' interface and guarantees here. It's probably only a matter of time until we apply such a workaround in the wrong way, or a new Kafka Streams version changes internal behaviour that we overlook, and we corrupt our state without noticing.
I'll be keeping an eye on those KIPs for topology changes though :)


was (Author: nhab):
[~ableegoldman] Thanks for the extra help!

My case is a bit different - I'm not removing a subtopology / task, I'm adding a new one. The end result is still much the same: an internal defensive check throws an exception because it sees something that should not exist. However, right now I'm avoiding that entire code path by deleting all local state, so your tip of surgically deleting only certain task dirs (and not the entire thing) may still apply to this situation. I'll investigate - thanks!

Overall though, it has become quite apparent to me that we're going against Kafka Streams' interface and guarantees here. It's probably only a matter of time until we apply such a workaround in the wrong way, or a new Kafka Streams version changes internal behaviour that we overlook, and we corrupt our state without noticing.
I'll be keeping an eye on those KIPs for topology changes though :)


> StreamsPartitionAssignor / ClientState throws an exception when a new Task
> gets added to a KStreams Application in a Backwards-Compatible Topology Change
> ---------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12775
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12775
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Nico Habermann
>            Priority: Major
>
> KAFKA-6145 and KAFKA-10079 added an [exception if the Partition Assignor
> tries to look up the lag for a TaskId that seemingly does not
> exist|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java#L325].
>
> I believe this is a functional regression.
> Before, it was possible for Streams users to make backwards-compatible
> topology changes and roll those out, without having to do a complete restore
> or reload.
> For example:
> Existing sample topology:
>
> {code:java}
> stream1 = stream(topic)
> stream1
>   .map(...)
>   .to(output){code}
> And doing this backwards-compatible change:
> {code:java}
> stream1 = stream(topic)
> ++ table = stream(topic2).through(repartition-topic)/repartition().toTable()
> stream1
>   .map(...)
> ++  .join(table)
>   .to(output){code}
>
> This effectively creates a new subtopology with a new task for the table
> repartition.
> In older KStreams versions, it would have been possible to simply roll this
> change out.
> Since 2.6, rolling this out will crash the stream because the linked
> exception gets thrown when StreamsPartitionAssignor#getPreviousTasksByLag
> tries to look up the lag for the new table-repartition-task.
>
> At this time, the only possible way to avoid this exception seems to be
> deleting all local state and doing a complete restore with the new topology
> change included.
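
For illustration only, the workaround mentioned in the comment above (deleting only certain task dirs instead of all local state) might look roughly like the sketch below. It is not taken from the issue and is not an official Kafka tool. It assumes that task directories sit directly under the application's state directory and are named {{<subtopologyId>_<partition>}} (for example {{1_0}}), and that the Streams application is stopped while it runs. The class name {{TaskDirCleaner}} and the method {{deleteTaskDirs}} are hypothetical.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper, not part of Kafka Streams. */
public class TaskDirCleaner {

    /**
     * Deletes the task directories of one subtopology and returns their names, sorted.
     * ASSUMPTION: task dirs are direct children of appStateDir named "<subtopologyId>_<partition>".
     * ASSUMPTION: the Streams application is not running while this is called.
     */
    public static List<String> deleteTaskDirs(Path appStateDir, int subtopologyId) throws IOException {
        // Strict match so that subtopology 1 does not also select "11_0".
        Pattern taskDirName = Pattern.compile(subtopologyId + "_\\d+");

        // Collect the matching directories first, then delete them.
        List<Path> matches = new ArrayList<>();
        try (DirectoryStream<Path> children = Files.newDirectoryStream(appStateDir)) {
            for (Path child : children) {
                String name = child.getFileName().toString();
                if (Files.isDirectory(child) && taskDirName.matcher(name).matches()) {
                    matches.add(child);
                }
            }
        }

        List<String> deleted = new ArrayList<>();
        for (Path dir : matches) {
            deleteRecursively(dir);
            deleted.add(dir.getFileName().toString());
        }
        Collections.sort(deleted);
        return deleted;
    }

    private static void deleteRecursively(Path dir) throws IOException {
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse order so that files are removed before their parent directories.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: TaskDirCleaner <appStateDir> <subtopologyId>");
            return;
        }
        List<String> deleted = deleteTaskDirs(Paths.get(args[0]), Integer.parseInt(args[1]));
        System.out.println("Deleted task dirs: " + deleted);
    }
}
```

Whether removing a given subset of task dirs is actually enough to avoid the exception in this issue is not confirmed here. As noted in the comment, any such manual change to local state goes against what Kafka Streams guarantees and should be tried on a copy of the state directory first.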


--
This message was sent by Atlassian Jira
(v8.3.4#803005)