[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16939878#comment-16939878 ]

Vasiliy commented on KAFKA-6054:

Hi all, we hit the error *Kafka Streams error “TaskAssignmentException: unable to decode subscription data: version=4”*.

During a deployment in which the only change was the Kafka-Streams version, from {{1.1.1}} to {{2.x.x}} (with _{{application.id}}_ unchanged), the app nodes still running the older Kafka-Streams version threw exceptions. As a result, Kafka Streams on those nodes moved to the error state and closed, while the app node with the new Kafka-Streams version kept consuming messages fine.

Upgrading from {{1.1.1}} to {{2.0.0}} gives _{{unable to decode subscription data: version=3}}_; upgrading from {{1.1.1}} to {{2.3.0}} gives _{{unable to decode subscription data: version=4}}_.

This can be really painful during a canary deployment: e.g. if we have 3 app nodes on the previous Kafka-Streams version and add one more node with the new version, all 3 existing nodes end up in the error state.

The issue reproduces in 100% of cases and does not depend on the number of app instances running the previous Kafka Streams version (the error occurred with either one or three app nodes on Kafka Streams 1.1.1, at the moment the first app node with the new Kafka Streams version was deployed).
Error stack trace:
{code:java}
TaskAssignmentException: unable to decode subscription data: version=4
    at org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:128)
    at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:297)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:358)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:520)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:93)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:472)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:455)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:822)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:802)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1149)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:831)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719)
{code}

Issue is reproducible on both Kafka broker versions {{1.1.0}} and {{2.1.1}}, even with the simple Kafka-Streams DSL example:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("default.key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
props.put("default.value.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
props.put("application.id", "xxx");
StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.stream("source"
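
For the mixed-version deployment described in this comment, the {{upgrade.from}} setting tracked by this ticket (KIP-268) is the relevant knob. Below is a minimal sketch, assuming the two-rolling-bounce procedure from KIP-268 applies to the 1.1.x to 2.x path reported here. The class and method names ({{RollingUpgradeProps}}, {{base}}, {{firstBounce}}, {{secondBounce}}) are hypothetical; only the key {{upgrade.from}} and the value {{1.1}} come from the {{StreamsConfig}} constants in this ticket's pull request, and the other properties are copied from the reproduction example above. It uses plain {{java.util.Properties}} so it runs without Kafka on the classpath.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka: builds the Streams properties for each
// phase of a two-bounce rolling upgrade (assumed procedure, see KIP-268).
final class RollingUpgradeProps {

    // Settings copied from the reproduction example in the comment.
    static Properties base() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("application.id", "xxx");
        props.put("default.key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("default.value.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        return props;
    }

    // First bounce: deploy the new jars with upgrade.from set to the old release line,
    // so that old and new instances can coexist in one consumer group (assumption).
    static Properties firstBounce(String oldReleaseLine) {
        Properties props = base();
        props.put("upgrade.from", oldReleaseLine);
        return props;
    }

    // Second bounce: once every instance runs the new jars, upgrade.from is left unset.
    static Properties secondBounce() {
        return base();
    }

    public static void main(String[] args) {
        System.out.println(firstBounce("1.1").getProperty("upgrade.from"));
        System.out.println(secondBounce().getProperty("upgrade.from"));
    }
}
```

In a real application these properties would be passed to {{KafkaStreams}} together with the topology; whether a given target version still needs the first bounce should be checked against the upgrade notes for that release.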
[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16442058#comment-16442058 ]

ASF GitHub Bot commented on KAFKA-6054:
---

mjsax closed pull request #4880: KAFKA-6054: Update Kafka Streams metadata to version 3
URL: https://github.com/apache/kafka/pull/4880

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/build.gradle b/build.gradle
index f8daf2fdddc..5b0e6496c2e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1087,6 +1087,18 @@ project(':streams:upgrade-system-tests-10') {
   }
 }

+project(':streams:upgrade-system-tests-11') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-11"
+
+  dependencies {
+    testCompile libs.kafkaStreams_11
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
 project(':jmh-benchmarks') {

   apply plugin: 'com.github.johnrengelman.shadow'
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index effe763ac45..a6ef5dddeec 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -67,6 +67,7 @@ versions += [
   kafka_0102: "0.10.2.1",
   kafka_0110: "0.11.0.2",
   kafka_10: "1.0.1",
+  kafka_11: "1.1.0",
   lz4: "1.4.1",
   metrics: "2.2.0",
   // PowerMock 1.x doesn't support Java 9, so use PowerMock 2.0.0 beta
@@ -115,6 +116,7 @@ libs += [
   kafkaStreams_0102: "org.apache.kafka:kafka-streams:$versions.kafka_0102",
   kafkaStreams_0110: "org.apache.kafka:kafka-streams:$versions.kafka_0110",
   kafkaStreams_10: "org.apache.kafka:kafka-streams:$versions.kafka_10",
+  kafkaStreams_11: "org.apache.kafka:kafka-streams:$versions.kafka_11",
   log4j: "log4j:log4j:$versions.log4j",
   lz4: "org.lz4:lz4-java:$versions.lz4",
   metrics: "com.yammer.metrics:metrics-core:$versions.metrics",
diff --git a/settings.gradle b/settings.gradle
index 03136849fd5..2a7977cfc93 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -15,5 +15,6 @@
 include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:test-utils', 'streams:examples',
         'streams:upgrade-system-tests-0100', 'streams:upgrade-system-tests-0101', 'streams:upgrade-system-tests-0102',
-        'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'log4j-appender',
-        'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file', 'jmh-benchmarks'
+        'streams:upgrade-system-tests-0110', 'streams:upgrade-system-tests-10', 'streams:upgrade-system-tests-11',
+        'log4j-appender', 'connect:api', 'connect:transforms', 'connect:runtime', 'connect:json', 'connect:file',
+        'jmh-benchmarks'
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 819bebd43b6..65b1da6dede 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -172,6 +172,31 @@
      */
     public static final String UPGRADE_FROM_0100 = "0.10.0";

+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.1.x}.
+     */
+    public static final String UPGRADE_FROM_0101 = "0.10.1";
+
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.2.x}.
+     */
+    public static final String UPGRADE_FROM_0102 = "0.10.2";
+
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.11.0.x}.
+     */
+    public static final String UPGRADE_FROM_0110 = "0.11.0";
+
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.0.x}.
+     */
+    public static final String UPGRADE_FROM_10 = "1.0";
+
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.1.x}.
+     */
+    public static final String UPGRADE_FROM_11 = "1.1";
+
     /**
      * Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for at-least-once processing guarantees.
      */
@@ -347,8 +372,9 @@
     /** {@code upgrade.from} */
     public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
-    public static final String UPGRADE_FROM_DOC = "Allows upgrading from version 0.10.0 to version 0.10.1 (or newer) in a backward compatible way. " +
-        "Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\" (for upgrading from 0.10.0.x).";
+    public static fina
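
The constants in the diff above pair each {{upgrade.from}} value with a release line (for example {{"0.10.2"}} for {{0.10.2.x}}). As a rough illustration of that pairing, here is a small sketch that picks the value for a full version string. The class {{UpgradeFromValues}} and its method {{forVersion}} are hypothetical helpers, not part of Kafka; the list of accepted values is copied from the constants in the diff.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical helper, not part of Kafka.
final class UpgradeFromValues {

    // Values copied from the UPGRADE_FROM_* constants shown in the diff.
    static final List<String> ACCEPTED =
            Arrays.asList("0.10.0", "0.10.1", "0.10.2", "0.11.0", "1.0", "1.1");

    // Returns the accepted value whose release line contains fullVersion,
    // e.g. "0.10.2.1" belongs to "0.10.2"; empty if no listed value matches.
    static Optional<String> forVersion(String fullVersion) {
        return ACCEPTED.stream()
                .filter(v -> fullVersion.equals(v) || fullVersion.startsWith(v + "."))
                .findFirst();
    }

    public static void main(String[] args) {
        System.out.println(forVersion("1.1.1"));
        System.out.println(forVersion("0.10.2.1"));
        System.out.println(forVersion("2.0.0"));
    }
}
```

Matching on the value plus a trailing dot keeps a version such as {{1.10.0}} from being mistaken for the {{1.1}} line.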
[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439502#comment-16439502 ]

ASF GitHub Bot commented on KAFKA-6054:
---

mjsax opened a new pull request #4880: KAFKA-6054: Update Kafka Streams metadata to version 3
URL: https://github.com/apache/kafka/pull/4880

- adds Streams upgrade tests for 1.1 release

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
> -
>
> Key: KAFKA-6054
> URL: https://issues.apache.org/jira/browse/KAFKA-6054
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.1
> Reporter: James Cheng
> Assignee: Matthias J. Sax
> Priority: Major
> Labels: kip
> Fix For: 1.2.0
>
>
> KIP: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade]
> We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling upgrade of the app, so that one point, there were both 0.10.0.0-based instances and 0.10.2.1-based instances running.
> We observed the following stack trace:
> {code:java}
> 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo - unable to decode subscription data: version=2
> org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode subscription data: version=2
>     at org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113)
>     at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>
> {code}
> I spoke with [~mjsax] and he said this is a known issue th
[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16423359#comment-16423359 ]

ASF GitHub Bot commented on KAFKA-6054:
---

mjsax closed pull request #4768: KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0
URL: https://github.com/apache/kafka/pull/4768

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index fc89f25d9d2..81b423afa1e 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -73,28 +73,50 @@ do
   fi
 done

-for file in "$base_dir"/clients/build/libs/kafka-clients*.jar;
-do
-  if should_include_file "$file"; then
-    CLASSPATH="$CLASSPATH":"$file"
-  fi
-done
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+  clients_lib_dir=$(dirname $0)/../clients/build/libs
+  streams_lib_dir=$(dirname $0)/../streams/build/libs
+  rocksdb_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION}
+else
+  clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs
+  streams_lib_dir=$clients_lib_dir
+  rocksdb_lib_dir=$streams_lib_dir
+fi
+

-for file in "$base_dir"/streams/build/libs/kafka-streams*.jar;
+for file in "$clients_lib_dir"/kafka-clients*.jar;
 do
   if should_include_file "$file"; then
     CLASSPATH="$CLASSPATH":"$file"
   fi
 done

-for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+for file in "$streams_lib_dir"/kafka-streams*.jar;
 do
   if should_include_file "$file"; then
     CLASSPATH="$CLASSPATH":"$file"
   fi
 done

-for file in "$base_dir"/streams/build/dependant-libs-${SCALA_VERSION}/rocksdb*.jar;
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+  for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+  do
+    if should_include_file "$file"; then
+      CLASSPATH="$CLASSPATH":"$file"
+    fi
+  done
+else
+  VERSION_NO_DOTS=`echo $UPGRADE_KAFKA_STREAMS_TEST_VERSION | sed 's/\.//g'`
+  SHORT_VERSION_NO_DOTS=${VERSION_NO_DOTS:0:((${#VERSION_NO_DOTS} - 1))} # remove last char, ie, bug-fix number
+  for file in "$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar;
+  do
+    if should_include_file "$file"; then
+      CLASSPATH="$CLASSPATH":"$file"
+    fi
+  done
+fi
+
+for file in "$rocksdb_lib_dir"/rocksdb*.jar;
 do
   CLASSPATH="$CLASSPATH":"$file"
 done
diff --git a/build.gradle b/build.gradle
index 7a3b4119f81..727bdeae8a5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -961,6 +961,54 @@ project(':streams:examples') {
   }
 }

+project(':streams:upgrade-system-tests-0100') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-0100"
+
+  dependencies {
+    testCompile libs.kafkaStreams_0100
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
+project(':streams:upgrade-system-tests-0101') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-0101"
+
+  dependencies {
+    testCompile libs.kafkaStreams_0101
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
+project(':streams:upgrade-system-tests-0102') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-0102"
+
+  dependencies {
+    testCompile libs.kafkaStreams_0102
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
+project(':streams:upgrade-system-tests-0110') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-0110"
+
+  dependencies {
+    testCompile libs.kafkaStreams_0110
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
 project(':jmh-benchmarks') {

   apply plugin: 'com.github.johnrengelman.shadow'
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index acff3f625bc..bc7a2bd5779 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -181,7 +181,7 @@
 files="SmokeTestDriver.java"/>
+ files="KStreamKStreamJoinTest.java|SmokeTestDriver.java"/>
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
index 4756387bba8..4853f92ed6a 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
@@ -16,7 +16,8 @@
  */
 package org.apache.kafka.common.security.authenticator;

-import java.util.Map;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.network.Mode;

 import javax.security.auth.Subject;
 import javax.security.auth.callback.Ca
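
The {{kafka-run-class.sh}} change in this diff derives the upgrade test module from {{UPGRADE_KAFKA_STREAMS_TEST_VERSION}} by removing the dots and then dropping the last character (the bug-fix number). The same derivation is sketched below in Java purely as an illustration; {{UpgradeTestModule}}, {{suffix}} and {{projectName}} are hypothetical names, and like the shell version it assumes a single-digit bug-fix number.

```java
// Hypothetical illustration of the SHORT_VERSION_NO_DOTS logic from kafka-run-class.sh.
final class UpgradeTestModule {

    // "0.10.2.1" -> "01021" (dots removed) -> "0102" (last char, the bug-fix number, dropped).
    static String suffix(String version) {
        String noDots = version.replace(".", "");
        if (noDots.isEmpty()) {
            throw new IllegalArgumentException("version must not be empty");
        }
        return noDots.substring(0, noDots.length() - 1);
    }

    // Gradle project name in the form used by settings.gradle in this pull request.
    static String projectName(String version) {
        return "streams:upgrade-system-tests-" + suffix(version);
    }

    public static void main(String[] args) {
        System.out.println(projectName("0.10.2.1"));
        System.out.println(projectName("0.11.0.2"));
    }
}
```

With the versions listed in {{gradle/dependencies.gradle}} elsewhere in this thread ({{0.10.2.1}}, {{0.11.0.2}}), this yields the {{0102}} and {{0110}} suffixes used by the new Gradle projects.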
[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16415012#comment-16415012 ]

ASF GitHub Bot commented on KAFKA-6054:
---

mjsax closed pull request #4746: KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0
URL: https://github.com/apache/kafka/pull/4746

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index 1f5140b10c8..77123ff8093 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -73,28 +73,48 @@ do
   fi
 done

-for file in "$base_dir"/clients/build/libs/kafka-clients*.jar;
-do
-  if should_include_file "$file"; then
-    CLASSPATH="$CLASSPATH":"$file"
-  fi
-done
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+  clients_lib_dir=$(dirname $0)/../clients/build/libs
+  streams_lib_dir=$(dirname $0)/../streams/build/libs
+  rocksdb_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION}
+else
+  clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs
+  streams_lib_dir=$clients_lib_dir
+  rocksdb_lib_dir=$streams_lib_dir
+fi
+

-for file in "$base_dir"/streams/build/libs/kafka-streams*.jar;
+for file in "$clients_lib_dir"/kafka-clients*.jar;
 do
   if should_include_file "$file"; then
     CLASSPATH="$CLASSPATH":"$file"
   fi
 done

-for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+for file in "$streams_lib_dir"/kafka-streams*.jar;
 do
   if should_include_file "$file"; then
     CLASSPATH="$CLASSPATH":"$file"
   fi
 done

-for file in "$base_dir"/streams/build/dependant-libs-${SCALA_VERSION}/rocksdb*.jar;
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+  for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+  do
+    if should_include_file "$file"; then
+      CLASSPATH="$CLASSPATH":"$file"
+    fi
+  done
+else
+  for file in "$base_dir"/streams/upgrade-system-tests-0100/build/libs/kafka-streams-upgrade-system-tests*.jar;
+  do
+    if should_include_file "$file"; then
+      CLASSPATH="$CLASSPATH":"$file"
+    fi
+  done
+fi
+
+for file in "$rocksdb_lib_dir"/rocksdb*.jar;
 do
   CLASSPATH="$CLASSPATH":"$file"
 done
diff --git a/build.gradle b/build.gradle
index d221d965c5e..2a540211018 100644
--- a/build.gradle
+++ b/build.gradle
@@ -776,6 +776,19 @@ project(':streams:examples') {
   }
 }

+project(':streams:upgrade-system-tests-0100') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-0100"
+
+  dependencies {
+    testCompile libs.kafkaStreams_0100
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
+
 project(':log4j-appender') {
   archivesBaseName = "kafka-log4j-appender"

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 93b92bb52a3..dbbb9127199 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -918,7 +918,7 @@ public void onFailure(RuntimeException e) {
                     log.error("Unexpected interrupt received in heartbeat thread for group {}", groupId, e);
                     this.failed.set(new RuntimeException(e));
                 } catch (RuntimeException e) {
-                    log.error("Heartbeat thread for group {} failed due to unexpected error" , groupId, e);
+                    log.error("Heartbeat thread for group {} failed due to unexpected error", groupId, e);
                     this.failed.set(e);
                 }
             }
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 212d701..74887483354 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -316,7 +316,7 @@ public int hashCode() {
             Field f = this.schema.get(i);
             if (f.type() instanceof ArrayOf) {
                 if (this.get(f) != null) {
-                    Object[] arrayObject = (Object []) this.get(f);
+                    Object[] arrayObject = (Object[]) this.get(f);
                     for (Object arrayItem: arrayObject)
                         result = prime * result + arrayItem.hashCode();
                 }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslC
[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414976#comment-16414976 ]

ASF GitHub Bot commented on KAFKA-6054:
---

mjsax opened a new pull request #4779: KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0
URL: https://github.com/apache/kafka/pull/4779

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
> -
>
> Key: KAFKA-6054
> URL: https://issues.apache.org/jira/browse/KAFKA-6054
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.1
> Reporter: James Cheng
> Assignee: Matthias J. Sax
> Priority: Major
> Labels: kip
> Fix For: 1.2.0
>
>
> KIP: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade]
> We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling upgrade of the app, so that one point, there were both 0.10.0.0-based instances and 0.10.2.1-based instances running.
> We observed the following stack trace:
> {code:java}
> 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo - unable to decode subscription data: version=2
> org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode subscription data: version=2
>     at org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113)
>     at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>
> {code}
> I spoke with [~mjsax] and he said this is a known issue that happens when you have both 0.10.0.0 in
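
The stack trace in the quoted issue shows the failure surfacing in {{SubscriptionInfo.decode}} on an instance running the older library. The following is a simplified sketch of that kind of version check, not the actual Kafka code: it assumes the encoded subscription starts with a 4-byte version number and that an older decoder rejects any version newer than the one it knows. {{VersionedDecodeSketch}}, {{SUPPORTED_VERSION}}, {{encode}} and {{decodeVersion}} are hypothetical names, and {{IllegalStateException}} stands in for {{TaskAssignmentException}} so the example stays self-contained.

```java
import java.nio.ByteBuffer;

// Simplified, hypothetical model of a version-gated decoder; not the Kafka implementation.
final class VersionedDecodeSketch {

    // Assumption: the highest metadata version this (older) instance understands.
    static final int SUPPORTED_VERSION = 1;

    // Writes only the leading version field; a real payload would carry more data after it.
    static ByteBuffer encode(int version) {
        ByteBuffer buf = ByteBuffer.allocate(4);
        buf.putInt(version);
        buf.flip();
        return buf;
    }

    // Reads the leading version and refuses anything newer than SUPPORTED_VERSION.
    static int decodeVersion(ByteBuffer data) {
        int version = data.getInt();
        if (version > SUPPORTED_VERSION) {
            throw new IllegalStateException("unable to decode subscription data: version=" + version);
        }
        return version;
    }

    public static void main(String[] args) {
        System.out.println(decodeVersion(encode(1)));
        try {
            decodeVersion(encode(2));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under these assumptions, a group leader on the old library fails as soon as any member sends newer metadata, which matches the mixed-version situation described in the issue.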
[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414965#comment-16414965 ]

ASF GitHub Bot commented on KAFKA-6054:
---------------------------------------

mjsax closed pull request #4761: KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0
URL: https://github.com/apache/kafka/pull/4761

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index fe6aefd7321..8e2ba91bf2b 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -73,28 +73,50 @@ do
   fi
 done
 
-for file in "$base_dir"/clients/build/libs/kafka-clients*.jar;
-do
-  if should_include_file "$file"; then
-    CLASSPATH="$CLASSPATH":"$file"
-  fi
-done
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+  clients_lib_dir=$(dirname $0)/../clients/build/libs
+  streams_lib_dir=$(dirname $0)/../streams/build/libs
+  rocksdb_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION}
+else
+  clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs
+  streams_lib_dir=$clients_lib_dir
+  rocksdb_lib_dir=$streams_lib_dir
+fi
+
 
-for file in "$base_dir"/streams/build/libs/kafka-streams*.jar;
+for file in "$clients_lib_dir"/kafka-clients*.jar;
 do
   if should_include_file "$file"; then
     CLASSPATH="$CLASSPATH":"$file"
   fi
 done
 
-for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+for file in "$streams_lib_dir"/kafka-streams*.jar;
 do
   if should_include_file "$file"; then
     CLASSPATH="$CLASSPATH":"$file"
   fi
 done
 
-for file in "$base_dir"/streams/build/dependant-libs-${SCALA_VERSION}/rocksdb*.jar;
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+  for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+  do
+    if should_include_file "$file"; then
+      CLASSPATH="$CLASSPATH":"$file"
+    fi
+  done
+else
+  VERSION_NO_DOTS=`echo $UPGRADE_KAFKA_STREAMS_TEST_VERSION | sed 's/\.//g'`
+  SHORT_VERSION_NO_DOTS=${VERSION_NO_DOTS:0:((${#VERSION_NO_DOTS} - 1))} # remove last char, ie, bug-fix number
+  for file in "$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar;
+  do
+    if should_include_file "$file"; then
+      CLASSPATH="$CLASSPATH":"$file"
+    fi
+  done
+fi
+
+for file in "$rocksdb_lib_dir"/rocksdb*.jar;
 do
   CLASSPATH="$CLASSPATH":"$file"
 done
diff --git a/build.gradle b/build.gradle
index ce4b4e44cb2..17f3e00358d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -909,6 +909,42 @@ project(':streams:examples') {
   }
 }
 
+project(':streams:upgrade-system-tests-0100') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-0100"
+
+  dependencies {
+    testCompile libs.kafkaStreams_0100
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
+project(':streams:upgrade-system-tests-0101') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-0101"
+
+  dependencies {
+    testCompile libs.kafkaStreams_0101
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
+project(':streams:upgrade-system-tests-0102') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-0102"
+
+  dependencies {
+    testCompile libs.kafkaStreams_0102
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
 project(':jmh-benchmarks') {
 
   apply plugin: 'com.github.johnrengelman.shadow'
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
index 7111bad6054..7102414628a 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
@@ -16,7 +16,9 @@
  */
 package org.apache.kafka.common.security.authenticator;
 
-import java.util.Map;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.security.auth.AuthCallbackHandler;
 
 import javax.security.auth.Subject;
 import javax.security.auth.callback.Callback;
@@ -25,10 +27,7 @@ import javax.security.auth.callback.UnsupportedCallbackException;
 import javax.security.sasl.AuthorizeCallback;
 import javax.security.sasl.RealmCallback;
-
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.network.Mode;
-import org.apache.kafka.common.security.auth.AuthCallbackHandler;
+import java.util.Map;
 
 /**
  * Callback handler for Sasl clients. The callbacks required for the SASL mechanism
diff --git a/docs/streams/upgrad
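
Editor's illustration: the kafka-run-class.sh change in the diff above derives the name of the upgrade system-test project from UPGRADE_KAFKA_STREAMS_TEST_VERSION by stripping the dots and then dropping the last character (the bug-fix number). The sketch below restates that string transformation in Java so the mapping is easy to check. The class and method names are hypothetical and are not part of Kafka; only the transformation itself is taken from the shell snippet.

```java
// Hypothetical helper mirroring the shell logic in the diff; not part of Kafka.
final class UpgradeTestVersionSketch {

    /** Strips the dots from a version string and drops the final character (the bug-fix number). */
    static String shortVersionNoDots(final String version) {
        final String noDots = version.replace(".", "");
        if (noDots.isEmpty()) {
            throw new IllegalArgumentException("version must contain at least one non-dot character");
        }
        return noDots.substring(0, noDots.length() - 1);
    }

    public static void main(final String[] args) {
        // The results correspond to the suffixes of the Gradle projects added in the diff.
        for (final String version : new String[] {"0.10.0.1", "0.10.1.1", "0.10.2.1"}) {
            System.out.println(version + " -> upgrade-system-tests-" + shortVersionNoDots(version));
        }
    }
}
```

Because only one character is removed, this scheme assumes a single-digit bug-fix number, as the shell comment in the diff implies.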
[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414777#comment-16414777 ]

ASF GitHub Bot commented on KAFKA-6054:
---------------------------------------

mjsax closed pull request #4758: KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0
URL: https://github.com/apache/kafka/pull/4758

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index af10f61b5c4..a25868125e9 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -73,28 +73,50 @@ do
   fi
 done
 
-for file in "$base_dir"/clients/build/libs/kafka-clients*.jar;
-do
-  if should_include_file "$file"; then
-    CLASSPATH="$CLASSPATH":"$file"
-  fi
-done
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+  clients_lib_dir=$(dirname $0)/../clients/build/libs
+  streams_lib_dir=$(dirname $0)/../streams/build/libs
+  rocksdb_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION}
+else
+  clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs
+  streams_lib_dir=$clients_lib_dir
+  rocksdb_lib_dir=$streams_lib_dir
+fi
+
 
-for file in "$base_dir"/streams/build/libs/kafka-streams*.jar;
+for file in "$clients_lib_dir"/kafka-clients*.jar;
 do
   if should_include_file "$file"; then
     CLASSPATH="$CLASSPATH":"$file"
   fi
 done
 
-for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+for file in "$streams_lib_dir"/kafka-streams*.jar;
 do
   if should_include_file "$file"; then
     CLASSPATH="$CLASSPATH":"$file"
   fi
 done
 
-for file in "$base_dir"/streams/build/dependant-libs-${SCALA_VERSION}/rocksdb*.jar;
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+  for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+  do
+    if should_include_file "$file"; then
+      CLASSPATH="$CLASSPATH":"$file"
+    fi
+  done
+else
+  VERSION_NO_DOTS=`echo $UPGRADE_KAFKA_STREAMS_TEST_VERSION | sed 's/\.//g'`
+  SHORT_VERSION_NO_DOTS=${VERSION_NO_DOTS:0:((${#VERSION_NO_DOTS} - 1))} # remove last char, ie, bug-fix number
+  for file in "$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar;
+  do
+    if should_include_file "$file"; then
+      CLASSPATH="$CLASSPATH":"$file"
+    fi
+  done
+fi
+
+for file in "$rocksdb_lib_dir"/rocksdb*.jar;
 do
   CLASSPATH="$CLASSPATH":"$file"
 done
diff --git a/build.gradle b/build.gradle
index 20a184c437c..5e97f901cb6 100644
--- a/build.gradle
+++ b/build.gradle
@@ -770,6 +770,30 @@ project(':streams:examples') {
   }
 }
 
+project(':streams:upgrade-system-tests-0100') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-0100"
+
+  dependencies {
+    testCompile libs.kafkaStreams_0100
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
+project(':streams:upgrade-system-tests-0101') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-0101"
+
+  dependencies {
+    testCompile libs.kafkaStreams_0101
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
 project(':log4j-appender') {
   archivesBaseName = "kafka-log4j-appender"
 
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
index 6094b547bb7..b80dfccf3d9 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
@@ -17,7 +17,9 @@
  */
 package org.apache.kafka.common.security.authenticator;
 
-import java.util.Map;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.security.auth.AuthCallbackHandler;
 
 import javax.security.auth.Subject;
 import javax.security.auth.callback.Callback;
@@ -26,10 +28,7 @@ import javax.security.auth.callback.UnsupportedCallbackException;
 import javax.security.sasl.AuthorizeCallback;
 import javax.security.sasl.RealmCallback;
-
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.network.Mode;
-import org.apache.kafka.common.security.auth.AuthCallbackHandler;
+import java.util.Map;
 
 /**
  * Callback handler for Sasl clients. The callbacks required for the SASL mechanism
diff --git a/docs/streams.html b/docs/streams.html
index fe0e84ee3b7..d691e63a432 100644
--- a/docs/streams.html
+++ b/docs/streams.html
@@ -807,21 +807,50 @@ Upgrade Guid
 See below a complete list of 0.10.2 API and semantical changes that allow you to advance y
[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16413272#comment-16413272 ]

ASF GitHub Bot commented on KAFKA-6054:
---------------------------------------

mjsax opened a new pull request #4773: KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0
URL: https://github.com/apache/kafka/pull/4773

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
> -----------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-6054
> URL: https://issues.apache.org/jira/browse/KAFKA-6054
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.1
> Reporter: James Cheng
> Assignee: Matthias J. Sax
> Priority: Major
> Labels: kip
> Fix For: 1.2.0
>
> KIP: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade]
>
> We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling upgrade of the app, so that at one point, there were both 0.10.0.0-based instances and 0.10.2.1-based instances running.
> We observed the following stack trace:
> {code:java}
> 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo - unable to decode subscription data: version=2
> org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode subscription data: version=2
> at org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113)
> at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
> at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
> {code}
> I spoke with [~mjsax] and he said this is a known issue that happens when you have both 0.10.0.0 i
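
Editor's illustration: the stack trace in the quoted issue shows the group leader failing inside SubscriptionInfo.decode because a member running a newer release sent subscription metadata in a format version the leader does not know. The sketch below is a minimal, self-contained model of that failure mode, not Kafka's actual SubscriptionInfo code. The class name, the two-integer payload layout, and the version numbers are assumptions made for illustration. It also shows that the same old decoder accepts the data when the newer member sends the older format, which is the general idea behind a rolling upgrade done in two phases.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for a versioned subscription payload; not Kafka's SubscriptionInfo.
final class VersionedSubscriptionSketch {

    /** Encodes a payload whose first four bytes carry the format version. */
    static ByteBuffer encode(final int version, final int payload) {
        final ByteBuffer buffer = ByteBuffer.allocate(8);
        buffer.putInt(version);
        buffer.putInt(payload);
        buffer.rewind();
        return buffer;
    }

    /** Decodes the payload and rejects any version newer than this decoder understands. */
    static int decode(final ByteBuffer data, final int latestSupportedVersion) {
        data.rewind();
        final int version = data.getInt();
        if (version > latestSupportedVersion) {
            throw new IllegalStateException("unable to decode subscription data: version=" + version);
        }
        return data.getInt();
    }

    public static void main(final String[] args) {
        final int oldLeaderVersion = 1; // assumed: the newest format an old group leader understands

        // A member that sends the old format is accepted by the old leader.
        System.out.println("decoded payload: " + decode(encode(1, 42), oldLeaderVersion));

        // A member that sends a newer format is rejected, as in the reported error.
        try {
            decode(encode(2, 42), oldLeaderVersion);
        } catch (final IllegalStateException e) {
            System.out.println("leader failed: " + e.getMessage());
        }
    }
}
```

In this model the failure depends only on which member is the leader and which format the other members send, which is consistent with the report that a single upgraded instance is enough to break the old ones.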
[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16412266#comment-16412266 ] ASF GitHub Bot commented on KAFKA-6054: --- mjsax opened a new pull request #4768: KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0 URL: https://github.com/apache/kafka/pull/4768 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410797#comment-16410797 ] ASF GitHub Bot commented on KAFKA-6054: --- mjsax opened a new pull request #4761: KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0 URL: https://github.com/apache/kafka/pull/4761 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16410019#comment-16410019 ] ASF GitHub Bot commented on KAFKA-6054: --- mjsax opened a new pull request #4758: KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0 URL: https://github.com/apache/kafka/pull/4758 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16407503#comment-16407503 ] ASF GitHub Bot commented on KAFKA-6054: --- mjsax opened a new pull request #4746: KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0 URL: https://github.com/apache/kafka/pull/4746 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16386548#comment-16386548 ]

ASF GitHub Bot commented on KAFKA-6054:
---------------------------------------

mjsax closed pull request #4630: KAFKA-6054: Code cleanup to prepare the actual fix for an upgrade path
URL: https://github.com/apache/kafka/pull/4630

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 6b3626101bd..47becfc239b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -226,11 +226,11 @@
     public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = "default.key.serde";
     private static final String DEFAULT_KEY_SERDE_CLASS_DOC = " Default serializer / deserializer class for key that implements the org.apache.kafka.common.serialization.Serde interface.";
 
-    /** {@code default timestamp.extractor} */
+    /** {@code default.timestamp.extractor} */
     public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "default.timestamp.extractor";
     private static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC = "Default timestamp extractor class that implements the org.apache.kafka.streams.processor.TimestampExtractor interface.";
 
-    /** {@code default value.serde} */
+    /** {@code default.value.serde} */
     public static final String DEFAULT_VALUE_SERDE_CLASS_CONFIG = "default.value.serde";
     private static final String DEFAULT_VALUE_SERDE_CLASS_DOC = "Default serializer / deserializer class for value that implements the org.apache.kafka.common.serialization.Serde interface.";
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 9aa0e94c8c1..71a84b2ca73 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -66,7 +66,8 @@
         public final TaskId taskId;
         public final TopicPartition partition;
 
-        AssignedPartition(final TaskId taskId, final TopicPartition partition) {
+        AssignedPartition(final TaskId taskId,
+                          final TopicPartition partition) {
             this.taskId = taskId;
             this.partition = partition;
         }
@@ -77,11 +78,11 @@ public int compareTo(final AssignedPartition that) {
         }
 
         @Override
-        public boolean equals(Object o) {
+        public boolean equals(final Object o) {
             if (!(o instanceof AssignedPartition)) {
                 return false;
             }
-            AssignedPartition other = (AssignedPartition) o;
+            final AssignedPartition other = (AssignedPartition) o;
             return compareTo(other) == 0;
         }
 
@@ -104,8 +105,9 @@ public int hashCode() {
                 final String host = getHost(endPoint);
                 final Integer port = getPort(endPoint);
 
-                if (host == null || port == null)
+                if (host == null || port == null) {
                     throw new ConfigException(String.format("Error parsing host address %s. Expected format host:port.", endPoint));
+                }
 
                 hostInfo = new HostInfo(host, port);
             } else {
@@ -119,10 +121,11 @@ public int hashCode() {
             state = new ClientState();
         }
 
-        void addConsumer(final String consumerMemberId, final SubscriptionInfo info) {
+        void addConsumer(final String consumerMemberId,
+                         final SubscriptionInfo info) {
             consumers.add(consumerMemberId);
-            state.addPreviousActiveTasks(info.prevTasks);
-            state.addPreviousStandbyTasks(info.standbyTasks);
+            state.addPreviousActiveTasks(info.prevTasks());
+            state.addPreviousStandbyTasks(info.standbyTasks());
             state.incrementCapacity();
         }
 
@@ -157,8 +160,9 @@ public String toString() {
     private static final Comparator<TopicPartition> PARTITION_COMPARATOR = new Comparator<TopicPartition>() {
 
         @Override
-        public int compare(TopicPartition p1, TopicPartition p2) {
-            int result = p1.topic().compareTo(p2.topic());
+        public int compare(final TopicPartition p1,
+                           final TopicPartition p2) {
+            final int result = p1.topic().compareTo(p2.to
[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383292#comment-16383292 ]

ASF GitHub Bot commented on KAFKA-6054:
---

mjsax opened a new pull request #4636: [WIP] KAFKA-6054: Fix Kafka Streams upgrade path for v0.10.0
URL: https://github.com/apache/kafka/pull/4636

Fixes the upgrade path from 0.10.0.x to 0.10.1.x+
Contained in KIP-258
Adds system tests for rolling bounce upgrades.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383259#comment-16383259 ]

Matthias J. Sax commented on KAFKA-6054:

We need to change the metadata version again for KAFKA-3522 and plan to piggyback a fix for this into this KIP.

> ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
> ---
>
> Key: KAFKA-6054
> URL: https://issues.apache.org/jira/browse/KAFKA-6054
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.1
> Reporter: James Cheng
> Assignee: Matthias J. Sax
> Priority: Major
> Labels: needs-kip
>
> We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling upgrade of the app, so that one point, there were both 0.10.0.0-based instances and 0.10.2.1-based instances running.
> We observed the following stack trace:
> {code}
> 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo - unable to decode subscription data: version=2
> org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode subscription data: version=2
>     at org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113)
>     at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>
> {code}
> I spoke with [~mjsax] and he said this is a known issue that happens when you have both 0.10.0.0 instances and 0.10.2.1 instances running at the same time, because the internal version number of the protocol changed when adding Interactive Queries. Matthias asked me to file this JIRA

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
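The failure described in the issue can be illustrated with a minimal sketch. This is not the actual SubscriptionInfo code: the class name, the method names, the four-byte version prefix, and the use of IllegalStateException in place of TaskAssignmentException are all assumptions made for illustration. It only shows the general idea that a decoder which knows versions up to N will reject metadata written with version N+1, which, going by the stack trace, is what happens when an older instance is the group leader and decodes a newer instance's subscription.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch; NOT the real Kafka Streams SubscriptionInfo class.
final class SubscriptionVersionSketch {

    // Encode metadata whose first four bytes carry the format version
    // (assumed layout, for illustration only).
    static ByteBuffer encode(final int version) {
        final ByteBuffer buf = ByteBuffer.allocate(4);
        buf.putInt(version);
        buf.flip();
        return buf;
    }

    // Decode as an instance that understands versions 1..latestSupported.
    // IllegalStateException stands in for TaskAssignmentException here.
    static int decode(final ByteBuffer data, final int latestSupported) {
        final int version = data.duplicate().getInt();
        if (version < 1 || version > latestSupported) {
            throw new IllegalStateException(
                "unable to decode subscription data: version=" + version);
        }
        return version;
    }

    public static void main(final String[] args) {
        // A newer instance (supports up to 2) reads version 1 metadata: fine.
        System.out.println(decode(encode(1), 2));

        // An older instance (supports only 1) reads version 2 metadata: fails.
        try {
            decode(encode(2), 1);
        } catch (final IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under these assumptions, the mixed-version group fails only in one direction: old metadata is readable by new code, but new metadata is not readable by old code, which matches the report that the older instances are the ones that log the error.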
[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16381566#comment-16381566 ]

ASF GitHub Bot commented on KAFKA-6054:
---

mjsax opened a new pull request #4630: KAFKA-6054: Code cleanup to prepare the actual fix for an upgrade path
URL: https://github.com/apache/kafka/pull/4630

Small change in decoding version 1 metadata: don't upgrade to version 2 automatically
[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16200571#comment-16200571 ]

James Cheng commented on KAFKA-6054:

Here is my conversation with [~mjsax] from the Confluent Slack channel:
{quote}
James Cheng [9:16 AM]
Does this stack trace mean anything to anyone? It happened when we upgraded a kafka streams app from 0.10.0.0 to 0.10.2.1.
^ @mjsax, if you have any time to look. Thanks.

Matthias J Sax [9:20 AM]
That makes sense. We bumped the internal version number when adding IQ feature -- thus, it seems you cannot mix instances for both version.
[9:21]
Seems, we messed up the upgrade path :disappointed:
[9:21]
If you can, you would need to stop all old instances, before starting with the new version.
[9:21]
Can you also open a JIRA for this?
[9:24]
Thus, rolling bounces to upgrade should actually work -- is this what you are doing?

James Cheng [9:27 AM]
Yes, we're doing a rolling upgrade. We had (at one point, at least) both instances running.
[9:27]
I imagine that if the 0.10.0.0 versions crashed, then restarted running 0.10.2.1, then they would be fine because they are all the same version at that point, right?

Matthias J Sax [9:27 AM]
Yes.

James Cheng [9:27 AM]
Cool, thanks.

Matthias J Sax [9:28 AM]
Anyway. Please file a JIRA -- upgrading should always work without this error.

James Cheng [9:29 AM]
I'll file the JIRA.

Matthias J Sax [9:30 AM]
Thx.
{quote}

> ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
> ---
>
> Key: KAFKA-6054
> URL: https://issues.apache.org/jira/browse/KAFKA-6054
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.1
> Reporter: James Cheng
>
> We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling upgrade of the app, so that one point, there were both 0.10.0.0-based instances and 0.10.2.1-based instances running.
> We observed the following stack trace:
> {code}
> 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo - unable to decode subscription data: version=2
> org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode subscription data: version=2
>     at org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113)
>     at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>     at org.apache.kafka.clients.consumer.internals.Consum