[ 
https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16939878#comment-16939878
 ] 

Vasiliy commented on KAFKA-6054:
--------------------------------

Hi all,

we got error *Kafka Streams error “TaskAssignmentException: unable to decode 
subscription data: version=4”*

During deployment with only changed Kafka-Streams version from {{1.1.1}} to 
{{2.x.x}} (without changing _{{application.id}}_), we got exceptions on app 
node with older Kafka-Streams version and, as a result, Kafka streams changed 
state to error and closed, meanwhile app node with new Kafka-Streams version 
consumes messages fine.

If we upgrade from {{1.1.1}} to {{2.0.0}}, got error _{{unable to decode 
subscription data: version=3}}_; if from {{1.1.1}} to {{2.3.0}}: _{{unable to 
decode subscription data: version=4}}_. It might be really painful during 
canary deployment, e.g. we have 3 app nodes with previous Kafka-Streams 
version, and when we add one more node with a new version, all existing 3 nodes 
will be in error state. Issue is reproducible in 100% cases and not depend on 
the number of app instances with previous Kafka Streams version (an error 
occurred for both cases with either one or three app nodes having Kafka Streams 
1.1.1, during deployment time the first app node with new Kafka Streams 
version).

Error stack trace:

 
{code:java}
TaskAssignmentException: unable to decode subscription data: version=4
    at 
org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:128)
    at 
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:297)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:358)
    at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:520)
    at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:93)
    at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:472)
    at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:455)
    at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:822)
    at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:802)
    at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
    at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
    at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364)
    at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
    at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
    at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1149)
    at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:831)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719)
{code}
Issue is reproducible on both Kafka broker versions {{1.1.0}} and {{2.1.1}}, 
even with the simple Kafka-Streams DSL example:

 
 Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("default.key.serde", 
"org.apache.kafka.common.serialization.Serdes$StringSerde");
props.put("default.value.serde", 
"org.apache.kafka.common.serialization.Serdes$StringSerde");
props.put("application.id", "xxx");

StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.<String, String>stream("source")
        .mapValues(value -> value + value)
        .to("destination");
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
Seems it's a bug of Kafka Streams.

> ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when 
> upgrading from 0.10.0.0 to 0.10.2.1
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6054
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6054
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.1
>            Reporter: James Cheng
>            Assignee: Matthias J. Sax
>            Priority: Major
>              Labels: kip
>             Fix For: 0.10.1.2, 0.10.2.2, 0.11.0.3, 1.0.2, 1.1.1, 2.0.0
>
>
> KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade]
> We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling 
> upgrade of the app, so that one point, there were both 0.10.0.0-based 
> instances and 0.10.2.1-based instances running.
> We observed the following stack trace:
> {code:java}
> 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo 
> -
> unable to decode subscription data: version=2
> org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode
> subscription data: version=2
>         at 
> org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113)
>         at 
> org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>         at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>         
> {code}
> I spoke with [~mjsax] and he said this is a known issue that happens when you 
> have both 0.10.0.0 instances and 0.10.2.1 instances running at the same time, 
> because the internal version number of the protocol changed when adding 
> Interactive Queries. Matthias asked me to file this JIRA>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to