[ 
https://issues.apache.org/jira/browse/KAFKA-12775?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17343578#comment-17343578
 ] 

Nico Habermann edited comment on KAFKA-12775 at 5/12/21, 8:52 PM:
------------------------------------------------------------------

[~ableegoldman] Thanks for the extra help! Our case is a bit different; we're
not removing a subtopology/task, we're adding a new one. The end result is
much the same, though: an internal defensive check throws an exception
because it sees something that should not exist. However, right now I'm
avoiding that entire code path by deleting all local state, so your tip of
surgically deleting only certain task directories (rather than the entire
state directory) may still apply here. I'll investigate, thanks!
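Here is a sketch of the "surgical" variant of the workaround. Instead of wiping the whole state directory, it deletes only the task directories of selected subtopologies. It assumes the Kafka Streams 2.x on-disk layout, where each task's state lives in `<state.dir>/<application.id>/<subtopologyId>_<partition>` (e.g. `1_0`). The class and method names are hypothetical. The caller has to decide which subtopology ids are affected, for instance by comparing `Topology#describe()` output before and after the change. Run it only while no instance of the application is running.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, assuming the Kafka Streams 2.x layout
// <state.dir>/<application.id>/<subtopologyId>_<partition> for task directories.
class TaskDirCleanup {

    // Lists the directories directly under appStateDir whose subtopology id
    // (the part before the first '_') is one of the given ids, sorted by path.
    static List<Path> findTaskDirs(Path appStateDir, Set<Integer> subtopologyIds) throws IOException {
        try (Stream<Path> children = Files.list(appStateDir)) {
            return children
                    .filter(Files::isDirectory)
                    .filter(dir -> {
                        Integer id = subtopologyIdOf(dir);
                        return id != null && subtopologyIds.contains(id);
                    })
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    // Parses "1_0" -> 1; returns null for names that don't look like task directories.
    private static Integer subtopologyIdOf(Path dir) {
        String name = dir.getFileName().toString();
        int sep = name.indexOf('_');
        if (sep <= 0) {
            return null;
        }
        try {
            return Integer.parseInt(name.substring(0, sep));
        } catch (NumberFormatException e) {
            return null;
        }
    }

    // Deletes a directory tree, deepest entries first so each directory is empty when removed.
    static void deleteRecursively(Path root) throws IOException {
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(root)) {
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }
}
```

Listing first and deleting second makes it easy to print and double-check the candidate directories before anything is removed.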

Overall, it has become quite apparent to me that we're going against Kafka
Streams' interface and guarantees here. It's probably only a matter of time
until we either apply such a workaround the wrong way, or a new Kafka Streams
version changes some internal behaviour that we overlook and we silently
corrupt our state. I'll be keeping an eye on those KIPs for topology changes,
though :)



> StreamsPartitionAssignor / ClientState throws an exception when a new Task 
> gets added to a KStreams Application in a Backwards-Compatible Topology Change
> ---------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12775
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12775
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Nico Habermann
>            Priority: Major
>
> KAFKA-6145 and KAFKA-10079 added an [exception if the Partition Assignor 
> tries to look up the lag for a TaskId that seemingly does not 
> exist|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java#L325].
>  
> I believe this is a functional regression.
> Before, it was possible for Streams users to make backwards-compatible 
> topology changes and roll those out, without having to do a complete restore 
> or reload.
> For example:
> Existing sample topology:
>  
> {code:java}
> stream1 = stream(topic)
> stream1
>   .map(...)
>   .to(output){code}
> And doing this backwards-compatible change:
> {code:java}
> stream1 = stream(topic)
> ++ table = stream(topic2).through(repartition-topic)/repartition().toTable()
> stream1
>   .map(...)
> ++  .join(table)
>   .to(output){code}
>  
> This effectively creates a new subtopology with a new task for the table 
> repartition.
> In older KStreams versions, it would have been possible to simply roll this 
> change out.
> Since 2.6, rolling this out crashes the Streams application, because the 
> linked exception is thrown when StreamsPartitionAssignor#getPreviousTasksByLag 
> tries to look up the lag for the new table-repartition task.
>  
> At this time, the only possible way to avoid this exception seems to be 
> deleting all local state and doing a complete restore with the new topology 
> change included.
>  
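The following is a hypothetical, much-simplified Java model of the lag lookup described in the issue above. It makes the failure mode concrete. It assumes only that the assignor keeps a map from task id to lag for a client and then looks up every task of the new topology in that map. `LagModel`, its methods and the string task ids are made up for illustration and are not Kafka's `ClientState` API. After the topology change, a task of the new subtopology (called `"1_0"` here) has no entry, so the strict lookup fails the way the linked defensive check does.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of per-client lag bookkeeping; not Kafka's ClientState.
class LagModel {
    // Sentinel returned by the lenient variant for tasks with no recorded lag.
    static final long UNKNOWN_LAG = Long.MAX_VALUE;

    private final Map<String, Long> taskLagTotals = new HashMap<>();

    void recordLag(String taskId, long lag) {
        taskLagTotals.put(taskId, lag);
    }

    // Strict lookup: like the defensive check, fail for a task this client has no lag for.
    long lagForStrict(String taskId) {
        Long lag = taskLagTotals.get(taskId);
        if (lag == null) {
            throw new IllegalStateException("Tried to look up lag for unknown task " + taskId);
        }
        return lag;
    }

    // Lenient lookup: treat an unseen task as having no usable local state.
    long lagForLenient(String taskId) {
        return taskLagTotals.getOrDefault(taskId, UNKNOWN_LAG);
    }
}
```

The lenient method only makes the contrast concrete. Whether an unseen task should be handled this way, or some other way, is exactly what the issue is asking.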



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
