[ 
https://issues.apache.org/jira/browse/KAFKA-12775?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Habermann updated KAFKA-12775:
-----------------------------------
    Description: 
KAFKA-6145 added an [exception if the Partition Assignor tries to look up the 
lag for a TaskId that seemingly does not 
exist|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java#L325].

 

I believe this is a functional regression.

Before that change, Streams users could make backwards-compatible topology 
changes and roll them out without having to do a complete restore or reload.

For example:

Existing sample topology:
 
{code:java}
stream1 = stream(topic)

stream1
  .map(...)
  .to(output){code}

Backwards-compatible change to that topology (added lines are marked with ++):

{code:java}
stream1 = stream(topic)
++ table = stream(topic2).through(repartition-topic)/repartition().toTable()

stream1
  .map(...)
++  .join(table)
  .to(output){code}
 

This effectively creates a new subtopology with a new task for the table 
repartition.

In older KStreams versions, it would have been possible to simply roll this 
change out.
Since 2.6, rolling this out crashes the application, because the linked 
exception is thrown when StreamsPartitionAssignor#getPreviousTasksByLag tries 
to look up the lag for the new table-repartition-task.
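
To illustrate the failure mode, here is a minimal sketch of a strict lag lookup. It is a simplified model and not the Kafka source: the class name LagLookupSketch, the use of plain strings as task ids, and the id 1_0 for the task of the new subtopology are all assumptions made for this example.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified, hypothetical model of per-client task lag bookkeeping (not Kafka code). */
final class LagLookupSketch {

    // Lags are only known for tasks that existed in the topology the client last ran.
    private final Map<String, Long> taskLagTotals = new HashMap<>();

    LagLookupSketch(Map<String, Long> knownLags) {
        taskLagTotals.putAll(knownLags);
    }

    /** Strict lookup: a task id without a recorded lag is treated as an illegal state. */
    long lagFor(String taskId) {
        Long lag = taskLagTotals.get(taskId);
        if (lag == null) {
            throw new IllegalStateException("Tried to look up lag for unknown task " + taskId);
        }
        return lag;
    }

    public static void main(String[] args) {
        // Lags recorded under the old topology: only subtopology 0 exists.
        LagLookupSketch client = new LagLookupSketch(Map.of("0_0", 0L, "0_1", 42L));
        System.out.println("lag for 0_1: " + client.lagFor("0_1"));

        try {
            // Assumed id of the task that the new table-repartition subtopology introduces.
            client.lagFor("1_0");
        } catch (IllegalStateException e) {
            System.out.println("lookup failed: " + e.getMessage());
        }
    }
}
```

In this model, any task that only exists in the new topology has no recorded lag, so the first lookup for it throws instead of being treated as a task without previous state.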

 

At this time, the only possible way to avoid this exception seems to be 
deleting all local state and doing a complete restore with the new topology 
change included.

 

  was:
KAFKA-6145 added an [exception if the new High Availability Assignor tries to 
look up the lag for a TaskId that seemingly does not 
exist|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java#L325].

 

I believe this is a functional regression.

Before, it was possible for Streams users to make backwards-compatible topology 
changes and roll those out, without having to do a complete restore or reload.

For example:

Existing sample topology:
 
{code:java}
stream1 = stream(topic)

stream1
  .map(...)
  .to(output){code}

And doing this backwards-compatible change:

{code:java}
stream1 = stream(topic)
++ table = stream(topic2).through(repartition-topic)/repartition().toTable()

stream1
  .map(...)
++  .join(table)
  .to(output){code}
 

This effectively creates a new subtopology with a new task for the table 
repartition.

In older KStreams versions, it would have been possible to simply roll this 
change out.
Since 2.6, rolling this out will crash the stream because the linked exception 
gets thrown when StreamsPartitionAssignor#getPreviousTasksByLag  tries to look 
up the lag for the new table-repartition-task

 

At this time, the only possible way to avoid this exception seems to be 
deleting all local state and doing a complete restore with the new topology 
change included.

 


> StreamsPartitionAssignor / ClientState throws an exception when a new Task 
> gets added to a KStreams Application in a Backwards-Compatible Topology Change
> ---------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12775
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12775
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Nico Habermann
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
