[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16415012#comment-16415012 ] ASF GitHub Bot commented on KAFKA-6054: --- mjsax closed pull request #4746: KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0 URL: https://github.com/apache/kafka/pull/4746 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index 1f5140b10c8..77123ff8093 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -73,28 +73,48 @@ do fi done -for file in "$base_dir"/clients/build/libs/kafka-clients*.jar; -do - if should_include_file "$file"; then -CLASSPATH="$CLASSPATH":"$file" - fi -done +if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then + clients_lib_dir=$(dirname $0)/../clients/build/libs + streams_lib_dir=$(dirname $0)/../streams/build/libs + rocksdb_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION} +else + clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs + streams_lib_dir=$clients_lib_dir + rocksdb_lib_dir=$streams_lib_dir +fi + -for file in "$base_dir"/streams/build/libs/kafka-streams*.jar; +for file in "$clients_lib_dir"/kafka-clients*.jar; do if should_include_file "$file"; then CLASSPATH="$CLASSPATH":"$file" fi done -for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar; +for file in "$streams_lib_dir"/kafka-streams*.jar; do if should_include_file "$file"; then CLASSPATH="$CLASSPATH":"$file" fi done -for file in "$base_dir"/streams/build/dependant-libs-${SCALA_VERSION}/rocksdb*.jar; +if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then + for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar; + do +if 
should_include_file "$file"; then + CLASSPATH="$CLASSPATH":"$file" +fi + done +else + for file in "$base_dir"/streams/upgrade-system-tests-0100/build/libs/kafka-streams-upgrade-system-tests*.jar; + do +if should_include_file "$file"; then + CLASSPATH="$CLASSPATH":"$file" +fi + done +fi + +for file in "$rocksdb_lib_dir"/rocksdb*.jar; do CLASSPATH="$CLASSPATH":"$file" done diff --git a/build.gradle b/build.gradle index d221d965c5e..2a540211018 100644 --- a/build.gradle +++ b/build.gradle @@ -776,6 +776,19 @@ project(':streams:examples') { } } +project(':streams:upgrade-system-tests-0100') { + archivesBaseName = "kafka-streams-upgrade-system-tests-0100" + + dependencies { +testCompile libs.kafkaStreams_0100 + } + + systemTestLibs { +dependsOn testJar + } +} + + project(':log4j-appender') { archivesBaseName = "kafka-log4j-appender" diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 93b92bb52a3..dbbb9127199 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -918,7 +918,7 @@ public void onFailure(RuntimeException e) { log.error("Unexpected interrupt received in heartbeat thread for group {}", groupId, e); this.failed.set(new RuntimeException(e)); } catch (RuntimeException e) { -log.error("Heartbeat thread for group {} failed due to unexpected error" , groupId, e); +log.error("Heartbeat thread for group {} failed due to unexpected error", groupId, e); this.failed.set(e); } } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java index 212d701..74887483354 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java +++ 
b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java @@ -316,7 +316,7 @@ public int hashCode() { Field f = this.schema.get(i); if (f.type() instanceof ArrayOf) { if (this.get(f) != null) { -Object[] arrayObject = (Object []) this.get(f); +Object[] arrayObject = (Object[]) this.get(f); for (Object arrayItem: arrayObject) result = prime * result + arrayItem.hashCode(); } diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslC
[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414976#comment-16414976 ] ASF GitHub Bot commented on KAFKA-6054: --- mjsax opened a new pull request #4779: KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0 URL: https://github.com/apache/kafka/pull/4779 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when > upgrading from 0.10.0.0 to 0.10.2.1 > - > > Key: KAFKA-6054 > URL: https://issues.apache.org/jira/browse/KAFKA-6054 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1 >Reporter: James Cheng >Assignee: Matthias J. Sax >Priority: Major > Labels: kip > Fix For: 1.2.0 > > > KIP: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade] > We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling > upgrade of the app, so that at one point, there were both 0.10.0.0-based > instances and 0.10.2.1-based instances running. 
> We observed the following stack trace: > {code:java} > 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo > - > unable to decode subscription data: version=2 > org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode > subscription data: version=2 > at > org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113) > at > org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278) > at > 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218) > > {code} > I spoke with [~mjsax] and he said this is a known issue that happens when you > have both 0.10.0.0 in
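The crash in the stack trace above boils down to the old assignor reading the leading version field of the subscription metadata and rejecting any version it does not know. A minimal, hypothetical sketch of that decode behavior (illustrative only, not the actual Kafka Streams source; the class and constant names are invented):

```java
import java.nio.ByteBuffer;

public class SubscriptionDecodeSketch {
    // Hypothetical: what a 0.10.0.x instance knows about; version 2 came later.
    static final int LATEST_SUPPORTED_VERSION = 1;

    // Reads the leading version field and rejects anything newer. This is the
    // behavior that breaks a mixed-version rolling upgrade when an old instance
    // is group leader and must decode metadata from a newer instance.
    static int decodeVersion(ByteBuffer data) {
        int version = data.getInt();
        if (version > LATEST_SUPPORTED_VERSION) {
            throw new IllegalStateException(
                "unable to decode subscription data: version=" + version);
        }
        return version;
    }

    public static void main(String[] args) {
        ByteBuffer v2 = (ByteBuffer) ByteBuffer.allocate(4).putInt(2).flip();
        try {
            decodeVersion(v2);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // unable to decode subscription data: version=2
        }
    }
}
```

The fix tracked in KIP-268 works around exactly this: the new instances must not send a newer subscription version until every member of the group can decode it.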
[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414965#comment-16414965 ] ASF GitHub Bot commented on KAFKA-6054: --- mjsax closed pull request #4761: KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0 URL: https://github.com/apache/kafka/pull/4761 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index fe6aefd7321..8e2ba91bf2b 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -73,28 +73,50 @@ do fi done -for file in "$base_dir"/clients/build/libs/kafka-clients*.jar; -do - if should_include_file "$file"; then -CLASSPATH="$CLASSPATH":"$file" - fi -done +if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then + clients_lib_dir=$(dirname $0)/../clients/build/libs + streams_lib_dir=$(dirname $0)/../streams/build/libs + rocksdb_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION} +else + clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs + streams_lib_dir=$clients_lib_dir + rocksdb_lib_dir=$streams_lib_dir +fi + -for file in "$base_dir"/streams/build/libs/kafka-streams*.jar; +for file in "$clients_lib_dir"/kafka-clients*.jar; do if should_include_file "$file"; then CLASSPATH="$CLASSPATH":"$file" fi done -for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar; +for file in "$streams_lib_dir"/kafka-streams*.jar; do if should_include_file "$file"; then CLASSPATH="$CLASSPATH":"$file" fi done -for file in "$base_dir"/streams/build/dependant-libs-${SCALA_VERSION}/rocksdb*.jar; +if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then + for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar; + do +if 
should_include_file "$file"; then + CLASSPATH="$CLASSPATH":"$file" +fi + done +else + VERSION_NO_DOTS=`echo $UPGRADE_KAFKA_STREAMS_TEST_VERSION | sed 's/\.//g'` + SHORT_VERSION_NO_DOTS=${VERSION_NO_DOTS:0:((${#VERSION_NO_DOTS} - 1))} # remove last char, ie, bug-fix number + for file in "$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar; + do +if should_include_file "$file"; then + CLASSPATH="$CLASSPATH":"$file" +fi + done +fi + +for file in "$rocksdb_lib_dir"/rocksdb*.jar; do CLASSPATH="$CLASSPATH":"$file" done diff --git a/build.gradle b/build.gradle index ce4b4e44cb2..17f3e00358d 100644 --- a/build.gradle +++ b/build.gradle @@ -909,6 +909,42 @@ project(':streams:examples') { } } +project(':streams:upgrade-system-tests-0100') { + archivesBaseName = "kafka-streams-upgrade-system-tests-0100" + + dependencies { +testCompile libs.kafkaStreams_0100 + } + + systemTestLibs { +dependsOn testJar + } +} + +project(':streams:upgrade-system-tests-0101') { + archivesBaseName = "kafka-streams-upgrade-system-tests-0101" + + dependencies { +testCompile libs.kafkaStreams_0101 + } + + systemTestLibs { +dependsOn testJar + } +} + +project(':streams:upgrade-system-tests-0102') { + archivesBaseName = "kafka-streams-upgrade-system-tests-0102" + + dependencies { +testCompile libs.kafkaStreams_0102 + } + + systemTestLibs { +dependsOn testJar + } +} + project(':jmh-benchmarks') { apply plugin: 'com.github.johnrengelman.shadow' diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java index 7111bad6054..7102414628a 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java @@ -16,7 +16,9 @@ */ package 
org.apache.kafka.common.security.authenticator; -import java.util.Map; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.network.Mode; +import org.apache.kafka.common.security.auth.AuthCallbackHandler; import javax.security.auth.Subject; import javax.security.auth.callback.Callback; @@ -25,10 +27,7 @@ import javax.security.auth.callback.UnsupportedCallbackException; import javax.security.sasl.AuthorizeCallback; import javax.security.sasl.RealmCallback; - -import org.apache.kafka.common.config.SaslConfigs; -import org.apache.kafka.common.network.Mode; -import org.apache.kafka.common.security.auth.AuthCallbackHandler; +import java.util.Map; /** * Callback handler for Sasl clients. The callbacks required for the SASL mechanism diff --git a/docs/streams/upgrad
[jira] [Commented] (KAFKA-6666) OffsetOutOfRangeException: Replica Thread Stopped Resulting in Underreplicated Partitions
[ https://issues.apache.org/jira/browse/KAFKA-?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414951#comment-16414951 ] Srinivas Dhruvakumar commented on KAFKA-: - [~huxi_2b] any update on the bug? Did we reach a conclusion on the issue? > OffsetOutOfRangeException: Replica Thread Stopped Resulting in > Underreplicated Partitions > - > > Key: KAFKA- > URL: https://issues.apache.org/jira/browse/KAFKA- > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.11.0.1 >Reporter: Srinivas Dhruvakumar >Priority: Critical > Attachments: Screen Shot 2018-03-15 at 3.52.13 PM.png > > > Hello All, > Currently we were seeing a few underreplicated partitions on our test cluster > which is used for Integration testing. On debugging more we found the replica > thread was stopped due to an error > Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot > increment the log start offset to 50 of partition since it is larger > than the high watermark -1 > Kindly find the attached screenshot. > !Screen Shot 2018-03-15 at 3.52.13 PM.png! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
[ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414777#comment-16414777 ] ASF GitHub Bot commented on KAFKA-6054: --- mjsax closed pull request #4758: KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0 URL: https://github.com/apache/kafka/pull/4758 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index af10f61b5c4..a25868125e9 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -73,28 +73,50 @@ do fi done -for file in "$base_dir"/clients/build/libs/kafka-clients*.jar; -do - if should_include_file "$file"; then -CLASSPATH="$CLASSPATH":"$file" - fi -done +if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then + clients_lib_dir=$(dirname $0)/../clients/build/libs + streams_lib_dir=$(dirname $0)/../streams/build/libs + rocksdb_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION} +else + clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs + streams_lib_dir=$clients_lib_dir + rocksdb_lib_dir=$streams_lib_dir +fi + -for file in "$base_dir"/streams/build/libs/kafka-streams*.jar; +for file in "$clients_lib_dir"/kafka-clients*.jar; do if should_include_file "$file"; then CLASSPATH="$CLASSPATH":"$file" fi done -for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar; +for file in "$streams_lib_dir"/kafka-streams*.jar; do if should_include_file "$file"; then CLASSPATH="$CLASSPATH":"$file" fi done -for file in "$base_dir"/streams/build/dependant-libs-${SCALA_VERSION}/rocksdb*.jar; +if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then + for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar; + do +if 
should_include_file "$file"; then + CLASSPATH="$CLASSPATH":"$file" +fi + done +else + VERSION_NO_DOTS=`echo $UPGRADE_KAFKA_STREAMS_TEST_VERSION | sed 's/\.//g'` + SHORT_VERSION_NO_DOTS=${VERSION_NO_DOTS:0:((${#VERSION_NO_DOTS} - 1))} # remove last char, ie, bug-fix number + for file in "$base_dir"/streams/upgrade-system-tests-$SHORT_VERSION_NO_DOTS/build/libs/kafka-streams-upgrade-system-tests*.jar; + do +if should_include_file "$file"; then + CLASSPATH="$CLASSPATH":"$file" +fi + done +fi + +for file in "$rocksdb_lib_dir"/rocksdb*.jar; do CLASSPATH="$CLASSPATH":"$file" done diff --git a/build.gradle b/build.gradle index 20a184c437c..5e97f901cb6 100644 --- a/build.gradle +++ b/build.gradle @@ -770,6 +770,30 @@ project(':streams:examples') { } } +project(':streams:upgrade-system-tests-0100') { + archivesBaseName = "kafka-streams-upgrade-system-tests-0100" + + dependencies { +testCompile libs.kafkaStreams_0100 + } + + systemTestLibs { +dependsOn testJar + } +} + +project(':streams:upgrade-system-tests-0101') { + archivesBaseName = "kafka-streams-upgrade-system-tests-0101" + + dependencies { +testCompile libs.kafkaStreams_0101 + } + + systemTestLibs { +dependsOn testJar + } +} + project(':log4j-appender') { archivesBaseName = "kafka-log4j-appender" diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java index 6094b547bb7..b80dfccf3d9 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java @@ -17,7 +17,9 @@ */ package org.apache.kafka.common.security.authenticator; -import java.util.Map; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.network.Mode; +import org.apache.kafka.common.security.auth.AuthCallbackHandler; 
import javax.security.auth.Subject; import javax.security.auth.callback.Callback; @@ -26,10 +28,7 @@ import javax.security.auth.callback.UnsupportedCallbackException; import javax.security.sasl.AuthorizeCallback; import javax.security.sasl.RealmCallback; - -import org.apache.kafka.common.config.SaslConfigs; -import org.apache.kafka.common.network.Mode; -import org.apache.kafka.common.security.auth.AuthCallbackHandler; +import java.util.Map; /** * Callback handler for Sasl clients. The callbacks required for the SASL mechanism diff --git a/docs/streams.html b/docs/streams.html index fe0e84ee3b7..d691e63a432 100644 --- a/docs/streams.html +++ b/docs/streams.html @@ -807,21 +807,50 @@ Upgrade Guid See below a complete list of 0.10.2 API and semantical changes that allow you to advance y
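The shell fragment in the patch above derives the test-module suffix by stripping the dots from `$UPGRADE_KAFKA_STREAMS_TEST_VERSION` and then dropping the trailing bug-fix digit (e.g. `0.10.0.1` → `01001` → `0100`, matching the `upgrade-system-tests-0100` module). The same transformation, re-sketched in Java purely for clarity (this is an illustrative re-implementation, not code from the PR):

```java
public class VersionSuffix {
    // Mirrors the shell logic: strip the dots, then drop the last (bug-fix) digit.
    static String shortVersion(String version) {
        String noDots = version.replace(".", "");        // "0.10.0.1" -> "01001"
        return noDots.substring(0, noDots.length() - 1); // -> "0100"
    }

    public static void main(String[] args) {
        // Maps a full version string to the upgrade-system-tests module suffix.
        System.out.println(shortVersion("0.10.0.1")); // 0100
        System.out.println(shortVersion("0.10.2.1")); // 0102
    }
}
```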
[jira] [Commented] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file
[ https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414673#comment-16414673 ] Cemalettin Koç commented on KAFKA-6711: --- [~guozhang] Would you please check the implementation: [https://github.com/cemo/kafka/commit/0cb2482259fec897f396e8b84ffb1921c4f3f63e] I have done something preliminary. I will add the necessary tests as well after your guidance. > GlobalStateManagerImpl should not write offsets of in-memory stores in > checkpoint file > -- > > Key: KAFKA-6711 > URL: https://issues.apache.org/jira/browse/KAFKA-6711 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.1 >Reporter: Cemalettin Koç >Assignee: Cemalettin Koç >Priority: Major > Labels: newbie > > We are using an InMemoryStore along with GlobalKTable and I noticed that > after each restart I am losing all my data. When I debug it, > `/tmp/kafka-streams/category-client-1/global/.checkpoint` file contains > offset for my GlobalKTable topic. I had checked GlobalStateManagerImpl > implementation and noticed that it is not guarded for cases similar to mine. > I am fairly new to Kafka land and probably there might be another way to fix > issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
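The guard the issue asks for amounts to skipping non-persistent stores when writing the checkpoint file: an in-memory store loses its contents on restart, so a checkpointed offset would wrongly suppress re-reading the topic to rebuild it. A self-contained sketch of that idea (the `Store` interface here is a hypothetical stand-in; the real code would consult `StateStore#persistent()`):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CheckpointFilterSketch {
    // Minimal stand-in for the store abstraction (illustrative names only).
    interface Store {
        String name();
        boolean persistent();
    }

    // Keep only offsets of persistent stores; in-memory stores must be
    // restored from the topic on every restart, so they get no checkpoint entry.
    static Map<String, Long> checkpointableOffsets(Map<Store, Long> offsets) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (Map.Entry<Store, Long> e : offsets.entrySet()) {
            if (e.getKey().persistent()) {
                result.put(e.getKey().name(), e.getValue());
            }
        }
        return result;
    }

    static Store store(String name, boolean persistent) {
        return new Store() {
            public String name() { return name; }
            public boolean persistent() { return persistent; }
        };
    }

    public static void main(String[] args) {
        Map<Store, Long> offsets = new LinkedHashMap<>();
        offsets.put(store("rocksdb-store", true), 42L);
        offsets.put(store("in-memory-store", false), 17L);
        System.out.println(checkpointableOffsets(offsets)); // {rocksdb-store=42}
    }
}
```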
[jira] [Commented] (KAFKA-6709) broker failed to handle request due to OOM
[ https://issues.apache.org/jira/browse/KAFKA-6709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414613#comment-16414613 ] Dhruvil Shah commented on KAFKA-6709: - It's hard to estimate how much memory would be consumed by down-conversion on brokers - it is a factor of the number of fetch requests being processed at a given point in time, the requested fetch size, the amount of time it takes to create and send out the fetch response, and subsequently the amount of time it takes for the JVM to garbage collect the memory associated with responses that were sent out. In general, memory usage would be much higher when brokers need to perform down-conversion. See here for more details - https://kafka.apache.org/0110/documentation.html#upgrade_11_message_format Let me know if you'd be able to provide the heap dump, which should help with analyzing inefficiencies in the existing scheme. > broker failed to handle request due to OOM > -- > > Key: KAFKA-6709 > URL: https://issues.apache.org/jira/browse/KAFKA-6709 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.0.1 >Reporter: Zou Tao >Priority: Critical > Attachments: kafkaServer-gc.log.0.current.zip, kafkaServer.out.tgz, > normal-kafkaServer-gc.log.0.current.zip, server.properties > > > I have updated to release 1.0.1. > I set up a cluster which has four brokers. > you could find the server.properties in the attachment. > There are about 150 topics, and about 4000 partitions in total; > ReplicationFactor is 2. > connectors are used to write/read data to/from brokers. > connector version is 0.10.1. > The average message size is 500B, and around 6 messages per second. 
> one of the brokers keeps reporting OOM and can't handle requests like: > [2018-03-24 12:37:17,449] ERROR [KafkaApi-1001] Error when handling request > {replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=voltetraffica.data,partitions=[ > {partition=16,fetch_offset=51198,max_bytes=60728640} > ,{partition=12,fetch_offset=50984,max_bytes=60728640}]}]} > (kafka.server.KafkaApis) > java.lang.OutOfMemoryError: Java heap space > at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) > at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) > at > org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:101) > at > org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:253) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:525) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$4.apply(KafkaApis.scala:523) > at scala.Option.map(Option.scala:146) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:523) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:513) > at scala.Option.flatMap(Option.scala:171) > at > kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:513) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:561) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:560) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse$2(KafkaApis.scala:560) > at > 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:574) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback$1$1.apply(KafkaApis.scala:574) > at > kafka.server.KafkaApis$$anonfun$sendResponseMaybeThrottle$1.apply$mcVI$sp(KafkaApis.scala:2041) > at > kafka.server.ClientRequestQuotaManager.maybeRecordAndThrottle(ClientRequestQuotaManager.scala:54) > at > kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2040) > at > kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback$1(KafkaApis.scala:574) > at > kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback$1$1.apply$mcVI$sp(KafkaApis.scala:593) > at > kafka.server.ClientQuotaManager.maybeRecordAndThrottle(ClientQuotaManager.scala:176) > at > kafka.server.KafkaApis.
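As a rough back-of-the-envelope for the failing request in the log above, assuming (a simplification, not Kafka's actual memory accounting) that each down-converted partition can materialize up to its requested `max_bytes` on heap at once:

```java
public class DownConvertHeapEstimate {
    public static void main(String[] args) {
        long maxBytesPerPartition = 60_728_640L; // max_bytes from the fetch request in the log
        int partitionsInRequest = 2;             // partitions 16 and 12
        long perRequest = maxBytesPerPartition * partitionsInRequest;
        // ~116 MB of transient heap for a single fetch response; several
        // concurrent down-converting fetches can easily exhaust a small heap.
        System.out.println(perRequest); // 121457280
    }
}
```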
[jira] [Commented] (KAFKA-6713) Provide an easy way replace store with a custom one on High-Level Streams DSL
[ https://issues.apache.org/jira/browse/KAFKA-6713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414606#comment-16414606 ] Guozhang Wang commented on KAFKA-6713: -- Hi [~cemo], I've read your code, and here are some follow-up questions / clarifications: 1. StreamsBuilder.globalTable is expecting a {{Materialized<K, V, KeyValueStore<Bytes, byte[]>>}} parameter and hence with `Materialized.as` one should only pass in a {{KeyValueBytesStoreSupplier}} that generates a {{KeyValueStore<Bytes, byte[]>}}. So you do not need to template your {{DelegatingByteStore}} but just let it use a {{KeyValueStore<Bytes, byte[]> delegated}} internally. Using a converter after the serde will unnecessarily call serdes three times instead of once: when a pair is passed in, you would first use the key/value serde to serialize it into bytes, then deserialize it in your {{DelegatingByteStore}} implementation with the converter into objects again, and then the in-memory store will once again serialize it into bytes before putting it into the cache. 2. In your code you are re-using the same {{DelegatingByteStore}} in your {{KeyValueBytesStoreSupplier}}, i.e. whenever `get()` is called it will always return the same store object. Is it intentional? Note that although it is fine for now, since we will only call `get()` once across all threads for a global store, this is an internal implementation detail that may be changed. To be safer I'd suggest you generate a new object in your supplier on each `get()` call. 3. About your invalidation use cases, I'm not sure I can follow completely... could you elaborate a bit more? 
> Provide an easy way replace store with a custom one on High-Level Streams DSL > - > > Key: KAFKA-6713 > URL: https://issues.apache.org/jira/browse/KAFKA-6713 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.1 >Reporter: Cemalettin Koç >Priority: Major > Labels: streaming-api > Attachments: BytesTypeConverter.java, DelegatingByteStore.java, > TypeConverter.java > > > I am trying to use GlobalKTable with a custom store implementation. In my > stores, I would like to store my `Category` entities and I would like to query > them by their name as well. My custom store has some capabilities beyond > `get` such as get by `name`. I also want to get all entries in a hierarchical > way in a lazy fashion. I have other use cases as well. > > In order to accomplish my task I had to implement a custom > `KeyValueBytesStoreSupplier`, `BytesTypeConverter` and >
> {code:java}
> public class DelegatingByteStore<K, V> implements KeyValueStore<Bytes, byte[]> {
>   private BytesTypeConverter<K, V> converter;
>   private KeyValueStore<K, V> delegated;
>   public DelegatingByteStore(KeyValueStore<K, V> delegated,
>                              BytesTypeConverter<K, V> converter) {
>     this.converter = converter;
>     this.delegated = delegated;
>   }
>   @Override
>   public void put(Bytes key, byte[] value) {
>     delegated.put(converter.outerKey(key), converter.outerValue(value));
>   }
>   @Override
>   public byte[] putIfAbsent(Bytes key, byte[] value) {
>     V v = delegated.putIfAbsent(converter.outerKey(key), converter.outerValue(value));
>     return v == null ? null : value;
>   }
>   ..
> {code}
>
> Type Converter:
> {code:java}
> public interface TypeConverter<IK, IV, K, V> {
>   IK innerKey(final K key);
>   IV innerValue(final V value);
>   List<KeyValue<IK, IV>> innerEntries(final List<KeyValue<K, V>> from);
>   List<KeyValue<K, V>> outerEntries(final List<KeyValue<IK, IV>> from);
>   V outerValue(final IV value);
>   KeyValue<K, V> outerKeyValue(final KeyValue<IK, IV> from);
>   KeyValue<IK, IV> innerKeyValue(final KeyValue<K, V> entry);
>   K outerKey(final IK ik);
> }
> {code}
>
> This is unfortunately too cumbersome and hard to maintain. 
> -- This message was sent by Atlassian JIRA (v7.6.3#76005)
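Guozhang's second point above — a supplier should hand out a fresh store instance on each `get()` rather than caching one — can be sketched with a minimal, self-contained example (the interfaces here are hypothetical stand-ins, not the actual Kafka Streams `StoreSupplier` API):

```java
import java.util.HashMap;
import java.util.Map;

public class FreshStoreSupplierSketch {
    // Minimal stand-ins for the store/supplier abstractions (illustrative only).
    interface KVStore {
        void put(String k, String v);
        String get(String k);
    }

    interface StoreSupplier {
        KVStore get();
    }

    static class InMemoryStore implements KVStore {
        private final Map<String, String> map = new HashMap<>();
        public void put(String k, String v) { map.put(k, v); }
        public String get(String k) { return map.get(k); }
    }

    // Correct: a new instance per get() call, so callers never share state by
    // accident even if the runtime starts calling get() more than once.
    static StoreSupplier freshSupplier() {
        return InMemoryStore::new;
    }

    public static void main(String[] args) {
        StoreSupplier supplier = freshSupplier();
        KVStore a = supplier.get();
        KVStore b = supplier.get();
        a.put("k", "v");
        System.out.println(a == b);     // false: distinct instances
        System.out.println(b.get("k")); // null: no shared state
    }
}
```

Returning a cached instance instead (`return cachedStore;`) happens to work today because `get()` is called once per global store, but it couples the code to an internal implementation detail, which is exactly the risk the comment describes.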
[jira] [Assigned] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file
[ https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang reassigned KAFKA-6711: Assignee: Cemalettin Koç > GlobalStateManagerImpl should not write offsets of in-memory stores in > checkpoint file > -- > > Key: KAFKA-6711 > URL: https://issues.apache.org/jira/browse/KAFKA-6711 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.1 >Reporter: Cemalettin Koç >Assignee: Cemalettin Koç >Priority: Major > Labels: newbie > > We are using an InMemoryStore along with GlobalKTable and I noticed that > after each restart I am losing all my data. When I debug it, > `/tmp/kafka-streams/category-client-1/global/.checkpoint` file contains > offset for my GlobalKTable topic. I had checked GlobalStateManagerImpl > implementation and noticed that it is not guarded for cases similar to mine. > I am fairly new to Kafka land and probably there might be another way to fix > issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file
[ https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414569#comment-16414569 ] Guozhang Wang commented on KAFKA-6711: -- Great! I've assigned the ticket to you. > GlobalStateManagerImpl should not write offsets of in-memory stores in > checkpoint file > -- > > Key: KAFKA-6711 > URL: https://issues.apache.org/jira/browse/KAFKA-6711 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.1 >Reporter: Cemalettin Koç >Assignee: Cemalettin Koç >Priority: Major > Labels: newbie > > We are using an InMemoryStore along with GlobalKTable and I noticed that > after each restart I am losing all my data. When I debug it, > `/tmp/kafka-streams/category-client-1/global/.checkpoint` file contains > offset for my GlobalKTable topic. I had checked GlobalStateManagerImpl > implementation and noticed that it is not guarded for cases similar to mine. > I am fairly new to Kafka land and probably there might be another way to fix > issue. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file
[ https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414555#comment-16414555 ] Cemalettin Koç commented on KAFKA-6711: --- I will fix it, [~guozhang].
[jira] [Commented] (KAFKA-6713) Provide an easy way replace store with a custom one on High-Level Streams DSL
[ https://issues.apache.org/jira/browse/KAFKA-6713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414477#comment-16414477 ] Cemalettin Koç commented on KAFKA-6713: --- Today I came across another scenario which is still not very clear to me. :) For my use case I have chosen our "Category" entity, since it is updated rarely and its data cardinality is suitable for a newbie Kafka user. :) GlobalKTable is very nice since it magically fills our in-memory stores. However, there are some cases where I need to invalidate computed data that is based on the whole contents of the store. Please forgive my ignorance since I have just started to use Kafka, but I thought it would be nice to have something like a KStream counterpart for global state, a GlobalKStream. :) Then I could process and trigger invalidation whenever a category is updated.
> Provide an easy way replace store with a custom one on High-Level Streams DSL
> -
> > Key: KAFKA-6713
> URL: https://issues.apache.org/jira/browse/KAFKA-6713
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 1.0.1
> Reporter: Cemalettin Koç
> Priority: Major
> Labels: streaming-api
> Attachments: BytesTypeConverter.java, DelegatingByteStore.java, TypeConverter.java
> >
> I am trying to use GlobalKTable with a custom store implementation. In my stores, I would like to store my `Category` entities, and I would like to query them by their name as well. My custom store has some capabilities beyond `get`, such as get by `name`. I also want to get all entries in a hierarchical way in a lazy fashion. I have other use cases as well.
> In order to accomplish my task I had to implement a custom `KeyValueBytesStoreSupplier`, a `BytesTypeConverter`, and:
> {code:java}
> public class DelegatingByteStore<K, V> implements KeyValueStore<Bytes, byte[]> {
>   private BytesTypeConverter<K, V> converter;
>   private KeyValueStore<K, V> delegated;
>
>   public DelegatingByteStore(KeyValueStore<K, V> delegated, BytesTypeConverter<K, V> converter) {
>     this.converter = converter;
>     this.delegated = delegated;
>   }
>
>   @Override
>   public void put(Bytes key, byte[] value) {
>     delegated.put(converter.outerKey(key), converter.outerValue(value));
>   }
>
>   @Override
>   public byte[] putIfAbsent(Bytes key, byte[] value) {
>     V v = delegated.putIfAbsent(converter.outerKey(key), converter.outerValue(value));
>     return v == null ? null : value;
>   }
>   ..
> {code}
> Type Converter:
> {code:java}
> public interface TypeConverter<IK, K, IV, V> {
>   IK innerKey(final K key);
>   IV innerValue(final V value);
>   List<KeyValue<IK, IV>> innerEntries(final List<KeyValue<K, V>> from);
>   List<KeyValue<K, V>> outerEntries(final List<KeyValue<IK, IV>> from);
>   V outerValue(final IV value);
>   KeyValue<K, V> outerKeyValue(final KeyValue<IK, IV> from);
>   KeyValue<IK, IV> innerKeyValue(final KeyValue<K, V> entry);
>   K outerKey(final IK ik);
> }
> {code}
> This is unfortunately too cumbersome and hard to maintain.
[jira] [Updated] (KAFKA-5863) Potential null dereference in DistributedHerder#reconfigureConnector()
[ https://issues.apache.org/jira/browse/KAFKA-5863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-5863: -- Description: Here is the call chain:
{code}
RestServer.httpRequest(reconfigUrl, "POST", taskProps, null);
{code}
In httpRequest():
{code}
} else if (responseCode >= 200 && responseCode < 300) {
    InputStream is = connection.getInputStream();
    T result = JSON_SERDE.readValue(is, responseFormat);
{code}
For readValue():
{code}
public <T> T readValue(InputStream src, TypeReference<T> valueTypeRef)
    throws IOException, JsonParseException, JsonMappingException
{
    return (T) _readMapAndClose(_jsonFactory.createParser(src),
        _typeFactory.constructType(valueTypeRef));
{code}
Then there would be an NPE in constructType():
{code}
public JavaType constructType(TypeReference<?> typeRef)
{
    // 19-Oct-2015, tatu: Simpler variant like so should work
    return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS);
{code}
> Potential null dereference in DistributedHerder#reconfigureConnector()
> --
> > Key: KAFKA-5863
> URL: https://issues.apache.org/jira/browse/KAFKA-5863
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Reporter: Ted Yu
> Priority: Minor
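The chain above can be reproduced without Jackson or Kafka Connect at all. The sketch below is illustrative only: `TypeRef` and `constructType` are hypothetical stand-ins mirroring the roles of Jackson's `TypeReference` and `TypeFactory.constructType`, showing why forwarding a null reference into an unguarded dereference ends in an NPE.

```java
import java.lang.reflect.Type;

public class NpeChainDemo {
    // Stand-in for Jackson's TypeReference<T>.
    abstract static class TypeRef<T> {
        Type getType() { return Object.class; }
    }

    // Mirrors the unguarded dereference: typeRef.getType() throws NPE when typeRef is null.
    static Type constructType(TypeRef<?> typeRef) {
        return typeRef.getType();
    }

    // Simulates the call path RestServer.httpRequest(..., null) -> readValue -> constructType.
    static boolean triggersNpe() {
        try {
            constructType(null);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("NPE triggered: " + triggersNpe());
    }
}
```

A null check (or an overload that accepts a plain `Class<T>`) at the call site would avoid the dereference.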
[jira] [Comment Edited] (KAFKA-6713) Provide an easy way replace store with a custom one on High-Level Streams DSL
[ https://issues.apache.org/jira/browse/KAFKA-6713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414469#comment-16414469 ] Cemalettin Koç edited comment on KAFKA-6713 at 3/26/18 8:20 PM: Hi [~guozhang], I have copied the internal TypeConverter interface and added some other methods for my needs. What I actually want is to use a custom in-memory store, and then to pass this `category in memory store` to my `category service`.
{code:java}
class CategoryInMemoryStore extends InMemoryKeyValueStore<Long, Category> {
  // implementation
}
{code}
I have created an instance of `CategoryInMemoryStore` and passed it into my StreamsBuilder like this:
{code:java}
public GlobalKTable<Long, Category> categoryKGlobalTable(StreamsBuilder streamsBuilder) {
  KeyValueBytesStoreSupplier supplier =
      new DelegatingByteStore<>(categoryInMemoryStore, createConvertor()).asSupplier();
  return streamsBuilder.globalTable(categoryTopic,
      Materialized.as(supplier)
          .withCachingDisabled()
          .withKeySerde(Serdes.Long())
          .withValueSerde(CATEGORY_JSON_SERDE));
}
{code}
The whole point of the attached files is to create a customized version of the in-memory key-value store implementation. I have also attached the implementations used above.
[jira] [Updated] (KAFKA-6413) ReassignPartitionsCommand#parsePartitionReassignmentData() should give better error message when JSON is malformed
[ https://issues.apache.org/jira/browse/KAFKA-6413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-6413: -- Description: In this thread: http://search-hadoop.com/m/Kafka/uyzND1J9Hizcxo0X?subj=Partition+reassignment+data+file+is+empty , Allen gave an example JSON string with an extra comma for which the partitionsToBeReassigned returned by ReassignPartitionsCommand#parsePartitionReassignmentData() was empty. I tried the following example where a right bracket is removed:
{code}
val (partitionsToBeReassigned, replicaAssignment) = ReassignPartitionsCommand.parsePartitionReassignmentData(
  "{\"version\":1,\"partitions\":[{\"topic\":\"metrics\",\"partition\":0,\"replicas\":[1,2]},{\"topic\":\"metrics\",\"partition\":1,\"replicas\":[2,3]},}");
{code}
The returned partitionsToBeReassigned is empty (and no exception was thrown). The parser should give a better error message for a malformed JSON string.
> ReassignPartitionsCommand#parsePartitionReassignmentData() should give better error message when JSON is malformed
> --
> > Key: KAFKA-6413
> URL: https://issues.apache.org/jira/browse/KAFKA-6413
> Project: Kafka
> Issue Type: Improvement
> Reporter: Ted Yu
> Priority: Minor
> Labels: json
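A parser that validates before extracting would surface the problem instead of silently returning an empty result. The sketch below is illustrative only: the trailing-comma check is a hypothetical stand-in for real JSON validation, and `parsePartitions` is not the actual ReassignPartitionsCommand API.

```java
import java.util.Collections;
import java.util.List;

public class StrictReassignmentParseDemo {
    // Hypothetical strict entry point: reject obviously malformed JSON loudly,
    // so an empty result unambiguously means "no partitions" rather than "parse failed".
    static List<String> parsePartitions(String json) {
        if (json.contains(",}") || json.contains(",]")) {
            throw new IllegalArgumentException(
                "malformed reassignment JSON (trailing comma): " + json);
        }
        // Real parsing elided in this sketch.
        return Collections.emptyList();
    }

    static boolean rejects(String json) {
        try {
            parsePartitions(json);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // The example from the ticket, with the stray comma before the closing brace.
        String bad = "{\"version\":1,\"partitions\":[{\"topic\":\"metrics\",\"partition\":1,\"replicas\":[2,3]},}";
        System.out.println("rejected: " + rejects(bad)); // prints "rejected: true"
    }
}
```

Failing fast here is the design point: a silent empty list propagates downstream as "nothing to reassign", which is exactly the confusing behavior the thread describes.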
[jira] [Commented] (KAFKA-6713) Provide an easy way replace store with a custom one on High-Level Streams DSL
[ https://issues.apache.org/jira/browse/KAFKA-6713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414469#comment-16414469 ] Cemalettin Koç commented on KAFKA-6713: --- Hi [~guozhang], I have copied the internal TypeConverter interface and added some other methods for my needs. What I actually want is to use a custom in-memory store, and then to pass this `category in memory store` to my `category service`.
{code:java}
class CategoryInMemoryStore extends InMemoryKeyValueStore<Long, Category> {
  // implementation
}
{code}
I have created an instance of `CategoryInMemoryStore` and passed it into my StreamsBuilder like this:
{code:java}
public GlobalKTable<Long, Category> categoryKGlobalTable(StreamsBuilder streamsBuilder) {
  KeyValueBytesStoreSupplier supplier =
      new DelegatingByteStore<>(categoryInMemoryStore, createConvertor()).asSupplier();
  return streamsBuilder.globalTable(categoryTopic,
      Materialized.as(supplier)
          .withCachingDisabled()
          .withKeySerde(Serdes.Long())
          .withValueSerde(CATEGORY_JSON_SERDE));
}
{code}
The whole point of the attached files is to create my own in-memory key-value store implementation. I have also attached the implementations used above.
[jira] [Updated] (KAFKA-6713) Provide an easy way replace store with a custom one on High-Level Streams DSL
[ https://issues.apache.org/jira/browse/KAFKA-6713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Cemalettin Koç updated KAFKA-6713: -- Attachment: TypeConverter.java DelegatingByteStore.java BytesTypeConverter.java
[jira] [Commented] (KAFKA-6713) Provide an easy way replace store with a custom one on High-Level Streams DSL
[ https://issues.apache.org/jira/browse/KAFKA-6713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414323#comment-16414323 ] Guozhang Wang commented on KAFKA-6713: -- Hi [~cemo], Thanks for reporting this; it is very valuable to us. What puzzles me is why you'd need to implement the {{TypeConverter}} interface, as it is an internal interface and is not supposed to be exposed to users. Could you share more of your code snippet to help me understand the cumbersomeness better? Note that if you are using the DSL and trying to implement a customized store, you should only need to implement the {{KeyValueBytesStoreSupplier}} and the {{KeyValueStore}} it generates. The serdes will be auto-handled by the Streams library.
[jira] [Updated] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file
[ https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-6711: - Summary: GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file (was: Checkpoint of InMemoryStore along with GlobalKTable )
[jira] [Updated] (KAFKA-6711) Checkpoint of InMemoryStore along with GlobalKTable
[ https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-6711: - Labels: newbie (was: )
[jira] [Commented] (KAFKA-6711) Checkpoint of InMemoryStore along with GlobalKTable
[ https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16414308#comment-16414308 ] Guozhang Wang commented on KAFKA-6711: -- Hello [~cemo], it is by design that in-memory state stores do not persist any data to the local storage engine upon closing; hence, when restarting, they lose all their data and need to re-bootstrap from the topic again. For normal state stores we have the check {{if (store.persistent() && storeToChangelogTopic.containsKey(storeName)) // write checkpoint values}}, but I agree with you that the global store implementation, i.e. {{GlobalStateManagerImpl}}, does not have this check, which I think is a bug. That is, we should do the same check and hence not write offsets for in-memory stores in the checkpoint file. Would you like to fix this issue? As for the issue you observed, we do have a JIRA open for adding a checkpoint feature for in-memory stores so that data will not be lost upon closing: https://issues.apache.org/jira/browse/KAFKA-3184.
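The guard described in the comment above can be sketched in isolation. This is a minimal, hedged sketch under stated assumptions: `StateStore` here is a tiny stand-in for the Kafka Streams interface, and `offsetsToCheckpoint` is a hypothetical helper, not the real GlobalStateManagerImpl code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CheckpointGuardDemo {
    // Minimal stand-in for the relevant slice of Kafka Streams' StateStore interface.
    interface StateStore { String name(); boolean persistent(); }

    // Only persistent stores with a changelog topic get their offsets checkpointed.
    // In-memory stores are skipped, so on restart they re-bootstrap from the topic
    // instead of trusting a checkpoint that no longer matches any local data.
    static Map<String, Long> offsetsToCheckpoint(List<StateStore> stores,
                                                 Map<String, String> storeToChangelogTopic,
                                                 Map<String, Long> offsets) {
        Map<String, Long> checkpoint = new HashMap<>();
        for (StateStore store : stores) {
            if (store.persistent() && storeToChangelogTopic.containsKey(store.name())) {
                checkpoint.put(storeToChangelogTopic.get(store.name()), offsets.get(store.name()));
            }
        }
        return checkpoint;
    }

    static StateStore store(String name, boolean persistent) {
        return new StateStore() {
            public String name() { return name; }
            public boolean persistent() { return persistent; }
        };
    }

    public static void main(String[] args) {
        Map<String, String> changelogs = new HashMap<>();
        changelogs.put("rocksdb-store", "app-rocksdb-store-changelog");
        changelogs.put("in-memory-store", "category-topic");
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("rocksdb-store", 42L);
        offsets.put("in-memory-store", 7L);
        Map<String, Long> cp = offsetsToCheckpoint(
                List.of(store("rocksdb-store", true), store("in-memory-store", false)),
                changelogs, offsets);
        System.out.println(cp); // only the persistent store's offset is written
    }
}
```

Without the `persistent()` half of the guard, the in-memory store's offset would land in the checkpoint file and suppress re-bootstrap, which is exactly the data loss described in the ticket.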
[jira] [Created] (KAFKA-6716) discardChannel should be released in MockSelector#completeSend
Ted Yu created KAFKA-6716: - Summary: discardChannel should be released in MockSelector#completeSend Key: KAFKA-6716 URL: https://issues.apache.org/jira/browse/KAFKA-6716 Project: Kafka Issue Type: Test Reporter: Ted Yu
{code}
private void completeSend(Send send) throws IOException {
    // Consume the send so that we will be able to send more requests to the destination
    ByteBufferChannel discardChannel = new ByteBufferChannel(send.size());
    while (!send.completed()) {
        send.writeTo(discardChannel);
    }
    completedSends.add(send);
}
{code}
The {{discardChannel}} should be closed before returning from the method.
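A hedged sketch of the fix: drain into the scratch channel inside try-with-resources so it is always released. Kafka's `Send` and `ByteBufferChannel` are replaced here by `java.nio` stand-ins, so this shows the resource-handling pattern rather than the actual MockSelector code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

public class CompleteSendDemo {
    // Drains 'send' the way MockSelector#completeSend drains a Send,
    // but releases the scratch channel via try-with-resources instead of leaking it.
    static int completeSend(ByteBuffer send) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (WritableByteChannel discardChannel = Channels.newChannel(sink)) {
            while (send.hasRemaining()) {
                discardChannel.write(send);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } // channel closed here on every path, which is the release the ticket asks for
        return sink.size();
    }

    public static void main(String[] args) {
        int drained = completeSend(ByteBuffer.wrap(new byte[]{1, 2, 3}));
        System.out.println("drained " + drained + " bytes");
    }
}
```

In the real code the equivalent change is a `discardChannel.close()` in a finally block (or try-with-resources) around the drain loop.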
[jira] [Issue Comment Deleted] (KAFKA-6530) Use actual first offset of messages when rolling log segment for magic v2
[ https://issues.apache.org/jira/browse/KAFKA-6530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dhruvil Shah updated KAFKA-6530: Comment: was deleted (was: Thanks! On Sat, Mar 17, 2018 at 10:31 AM Jason Gustafson (JIRA) ) > Use actual first offset of messages when rolling log segment for magic v2 > - > > Key: KAFKA-6530 > URL: https://issues.apache.org/jira/browse/KAFKA-6530 > Project: Kafka > Issue Type: Bug > Reporter: Jason Gustafson > Assignee: Dhruvil Shah > Priority: Major > Fix For: 1.2.0 > > > To avoid overflow when rolling a log segment, we've implemented a heuristic that determines the base offset of the next segment without decompressing the message set to find the actual first offset. With the v2 message format, we can find the first offset without needing decompression, so we can set the correct base offset exactly.
[jira] [Resolved] (KAFKA-6712) Throw a specific exception with wrong topic name for interactive queries
[ https://issues.apache.org/jira/browse/KAFKA-6712?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-6712. Resolution: Duplicate > Throw a specific exception with wrong topic name for interactive queries > - > > Key: KAFKA-6712 > URL: https://issues.apache.org/jira/browse/KAFKA-6712 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Francesco Guardiani > Priority: Major > > When you use the interactive-query state stores with a wrong topic name and call the store() method, the client should throw an exception explaining that you have specified a wrong topic name. Instead, it currently throws an InvalidStateStoreException ("the state store may have migrated to another instance.") that is too generic and is also used when the stream thread is not ready.
[jira] [Comment Edited] (KAFKA-6712) Throw a specific exception with wrong topic name for interactive queries
[ https://issues.apache.org/jira/browse/KAFKA-6712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16413129#comment-16413129 ] Matthias J. Sax edited comment on KAFKA-6712 at 3/26/18 5:30 PM: - Thanks for opening a ticket for this [~slinkydeveloper]. I am wondering if it is a duplicate of KAFKA-5876 though? was (Author: mjsax): Thanks for opening a ticket for this [~slinkydeveloper]. I am wondering if is is a duplicate of KAFKA-5876 though?
[jira] [Updated] (KAFKA-6678) Upgrade dependencies with later release versions
[ https://issues.apache.org/jira/browse/KAFKA-6678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-6678:
--
Description:
{code}
The following dependencies have later release versions:
- net.sourceforge.argparse4j:argparse4j [0.7.0 -> 0.8.1]
- org.bouncycastle:bcpkix-jdk15on [1.58 -> 1.59]
- com.puppycrawl.tools:checkstyle [6.19 -> 8.8]
- org.owasp:dependency-check-gradle [3.0.2 -> 3.1.1]
- org.ajoberstar:grgit [1.9.3 -> 2.1.1]
- org.glassfish.jersey.containers:jersey-container-servlet [2.25.1 -> 2.26]
- org.eclipse.jetty:jetty-client [9.2.24.v20180105 -> 9.4.8.v20171121]
- org.eclipse.jetty:jetty-server [9.2.24.v20180105 -> 9.4.8.v20171121]
- org.eclipse.jetty:jetty-servlet [9.2.24.v20180105 -> 9.4.8.v20171121]
- org.eclipse.jetty:jetty-servlets [9.2.24.v20180105 -> 9.4.8.v20171121]
- org.openjdk.jmh:jmh-core [1.19 -> 1.20]
- org.openjdk.jmh:jmh-core-benchmarks [1.19 -> 1.20]
- org.openjdk.jmh:jmh-generator-annprocess [1.19 -> 1.20]
- org.lz4:lz4-java [1.4 -> 1.4.1]
- org.apache.maven:maven-artifact [3.5.2 -> 3.5.3]
- org.jacoco:org.jacoco.agent [0.7.9 -> 0.8.0]
- org.jacoco:org.jacoco.ant [0.7.9 -> 0.8.0]
- org.rocksdb:rocksdbjni [5.7.3 -> 5.11.3]
- org.scala-lang:scala-library [2.11.12 -> 2.12.4]
- com.typesafe.scala-logging:scala-logging_2.11 [3.7.2 -> 3.8.0]
- org.scala-lang:scala-reflect [2.11.12 -> 2.12.4]
- org.scalatest:scalatest_2.11 [3.0.4 -> 3.0.5]
{code}
Looks like we can consider upgrading scalatest, jmh-core and checkstyle

was:
{code}
The following dependencies have later release versions:
- net.sourceforge.argparse4j:argparse4j [0.7.0 -> 0.8.1]
- org.bouncycastle:bcpkix-jdk15on [1.58 -> 1.59]
- com.puppycrawl.tools:checkstyle [6.19 -> 8.8]
- org.owasp:dependency-check-gradle [3.0.2 -> 3.1.1]
- org.ajoberstar:grgit [1.9.3 -> 2.1.1]
- org.glassfish.jersey.containers:jersey-container-servlet [2.25.1 -> 2.26]
- org.eclipse.jetty:jetty-client [9.2.24.v20180105 -> 9.4.8.v20171121]
- org.eclipse.jetty:jetty-server [9.2.24.v20180105 -> 9.4.8.v20171121]
- org.eclipse.jetty:jetty-servlet [9.2.24.v20180105 -> 9.4.8.v20171121]
- org.eclipse.jetty:jetty-servlets [9.2.24.v20180105 -> 9.4.8.v20171121]
- org.openjdk.jmh:jmh-core [1.19 -> 1.20]
- org.openjdk.jmh:jmh-core-benchmarks [1.19 -> 1.20]
- org.openjdk.jmh:jmh-generator-annprocess [1.19 -> 1.20]
- org.lz4:lz4-java [1.4 -> 1.4.1]
- org.apache.maven:maven-artifact [3.5.2 -> 3.5.3]
- org.jacoco:org.jacoco.agent [0.7.9 -> 0.8.0]
- org.jacoco:org.jacoco.ant [0.7.9 -> 0.8.0]
- org.rocksdb:rocksdbjni [5.7.3 -> 5.11.3]
- org.scala-lang:scala-library [2.11.12 -> 2.12.4]
- com.typesafe.scala-logging:scala-logging_2.11 [3.7.2 -> 3.8.0]
- org.scala-lang:scala-reflect [2.11.12 -> 2.12.4]
- org.scalatest:scalatest_2.11 [3.0.4 -> 3.0.5]
{code}
Looks like we can consider upgrading scalatest, jmh-core and checkstyle

> Upgrade dependencies with later release versions
> ------------------------------------------------
>
>                 Key: KAFKA-6678
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6678
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Ted Yu
>            Priority: Major
>         Attachments: k-update.txt
>
>
> {code}
> The following dependencies have later release versions:
> - net.sourceforge.argparse4j:argparse4j [0.7.0 -> 0.8.1]
> - org.bouncycastle:bcpkix-jdk15on [1.58 -> 1.59]
> - com.puppycrawl.tools:checkstyle [6.19 -> 8.8]
> - org.owasp:dependency-check-gradle [3.0.2 -> 3.1.1]
> - org.ajoberstar:grgit [1.9.3 -> 2.1.1]
> - org.glassfish.jersey.containers:jersey-container-servlet [2.25.1 -> 2.26]
> - org.eclipse.jetty:jetty-client [9.2.24.v20180105 -> 9.4.8.v20171121]
> - org.eclipse.jetty:jetty-server [9.2.24.v20180105 -> 9.4.8.v20171121]
> - org.eclipse.jetty:jetty-servlet [9.2.24.v20180105 -> 9.4.8.v20171121]
> - org.eclipse.jetty:jetty-servlets [9.2.24.v20180105 -> 9.4.8.v20171121]
> - org.openjdk.jmh:jmh-core [1.19 -> 1.20]
> - org.openjdk.jmh:jmh-core-benchmarks [1.19 -> 1.20]
> - org.openjdk.jmh:jmh-generator-annprocess [1.19 -> 1.20]
> - org.lz4:lz4-java [1.4 -> 1.4.1]
> - org.apache.maven:maven-artifact [3.5.2 -> 3.5.3]
> - org.jacoco:org.jacoco.agent [0.7.9 -> 0.8.0]
> - org.jacoco:org.jacoco.ant [0.7.9 -> 0.8.0]
> - org.rocksdb:rocksdbjni [5.7.3 -> 5.11.3]
> - org.scala-lang:scala-library [2.11.12 -> 2.12.4]
> - com.typesafe.scala-logging:scala-logging_2.11 [3.7.2 -> 3.8.0]
> - org.scala-lang:scala-reflect [2.11.12 -> 2.12.4]
> - org.scalatest:scalatest_2.11 [3.0.4 -> 3.0.5]
> {code}
> Looks like we can consider upgrading scalatest, jmh-core and checkstyle

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6531) SocketServerTest#closingChannelException fails sometimes
[ https://issues.apache.org/jira/browse/KAFKA-6531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-6531:
--
Component/s: core

> SocketServerTest#closingChannelException fails sometimes
> --------------------------------------------------------
>
>                 Key: KAFKA-6531
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6531
>             Project: Kafka
>          Issue Type: Test
>          Components: core
>            Reporter: Ted Yu
>            Priority: Minor
>
> From https://builds.apache.org/job/kafka-trunk-jdk9/361/testReport/junit/kafka.network/SocketServerTest/closingChannelException/ :
> {code}
> java.lang.AssertionError: Channels not removed
>     at kafka.utils.TestUtils$.fail(TestUtils.scala:355)
>     at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:865)
>     at kafka.network.SocketServerTest.assertProcessorHealthy(SocketServerTest.scala:914)
>     at kafka.network.SocketServerTest.$anonfun$closingChannelException$1(SocketServerTest.scala:763)
>     at kafka.network.SocketServerTest.$anonfun$closingChannelException$1$adapted(SocketServerTest.scala:747)
> {code}
> Among the test output, I saw:
> {code}
> [2018-02-04 18:51:15,995] ERROR Processor 0 closed connection from /127.0.0.1:48261 (kafka.network.SocketServerTest$$anon$5$$anon$1:73)
> java.lang.IllegalStateException: There is already a connection for id 127.0.0.1:1-127.0.0.1:2-0
>     at org.apache.kafka.common.network.Selector.ensureNotRegistered(Selector.java:260)
>     at org.apache.kafka.common.network.Selector.register(Selector.java:254)
>     at kafka.network.SocketServerTest$TestableSelector.super$register(SocketServerTest.scala:1043)
>     at kafka.network.SocketServerTest$TestableSelector.$anonfun$register$2(SocketServerTest.scala:1043)
>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>     at kafka.network.SocketServerTest$TestableSelector.runOp(SocketServerTest.scala:1037)
>     at kafka.network.SocketServerTest$TestableSelector.register(SocketServerTest.scala:1043)
>     at kafka.network.Processor.configureNewConnections(SocketServer.scala:723)
>     at kafka.network.Processor.run(SocketServer.scala:532)
> {code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-5943) Reduce dependency on mock in connector tests
[ https://issues.apache.org/jira/browse/KAFKA-5943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-5943:
--
Description: Currently connector tests make heavy use of mocks (EasyMock, PowerMock). This can hide the real logic behind operations and makes finding bugs difficult. We should reduce the use of mocks so that developers can debug connector code using unit tests. This would shorten the development cycle for connectors.

was: Currently connector tests make heavy use of mock (easymock, power mock). This may hide the real logic behind operations and makes finding bugs difficult. We should reduce the use of mocks so that developers can debug connector code using unit tests. This would shorten the development cycle for connector.

> Reduce dependency on mock in connector tests
> --------------------------------------------
>
>                 Key: KAFKA-5943
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5943
>             Project: Kafka
>          Issue Type: Test
>            Reporter: Ted Yu
>            Priority: Minor
>              Labels: connector
>
> Currently connector tests make heavy use of mocks (EasyMock, PowerMock).
> This can hide the real logic behind operations and makes finding bugs difficult.
> We should reduce the use of mocks so that developers can debug connector code using unit tests.
> This would shorten the development cycle for connectors.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-5802) ScramServerCallbackHandler#handle should check username not being null before calling credentialCache.get()
[ https://issues.apache.org/jira/browse/KAFKA-5802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16302643#comment-16302643 ] Ted Yu edited comment on KAFKA-5802 at 3/26/18 4:43 PM:
+1 from me.

was (Author: yuzhih...@gmail.com): +1 from me .

> ScramServerCallbackHandler#handle should check username not being null before calling credentialCache.get()
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5802
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5802
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Ted Yu
>            Priority: Minor
>
> {code}
> String username = null;
> for (Callback callback : callbacks) {
>     if (callback instanceof NameCallback)
>         username = ((NameCallback) callback).getDefaultName();
>     else if (callback instanceof ScramCredentialCallback)
>         ((ScramCredentialCallback) callback).scramCredential(credentialCache.get(username));
> {code}
> Since ConcurrentHashMap, used by CredentialCache, doesn't allow null keys, we should check that username is not null before calling credentialCache.get()

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
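The guard suggested in KAFKA-5802 above can be sketched as follows. Note this is a minimal illustration, not Kafka's actual `ScramServerCallbackHandler`: the plain `String`-valued map and the `lookup` helper are hypothetical stand-ins for `CredentialCache`, kept only to show why the check matters (`ConcurrentHashMap.get(null)` throws `NullPointerException`).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal sketch of the null-check proposed in KAFKA-5802. The cache and
// lookup() below are hypothetical simplifications of Kafka's CredentialCache;
// ConcurrentHashMap rejects null keys with a NullPointerException, so the
// username must be validated before the lookup.
public class ScramCallbackSketch {
    static final Map<String, String> credentialCache = new ConcurrentHashMap<>();

    static String lookup(String username) {
        // Guard: ConcurrentHashMap.get(null) would throw NPE.
        if (username == null)
            return null; // treat a missing username as "no credential found"
        return credentialCache.get(username);
    }

    public static void main(String[] args) {
        credentialCache.put("alice", "scram-credential");
        System.out.println(lookup("alice")); // scram-credential
        System.out.println(lookup(null));    // null instead of a NullPointerException
    }
}
```

Returning `null` for an unknown (or absent) username lets the SASL layer fail authentication cleanly rather than crash the callback handler.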
[jira] [Comment Edited] (KAFKA-5946) Give connector method parameter better name
[ https://issues.apache.org/jira/browse/KAFKA-5946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392411#comment-16392411 ] Ted Yu edited comment on KAFKA-5946 at 3/26/18 4:42 PM: Thanks for taking it. was (Author: yuzhih...@gmail.com): Thanks for taking it . > Give connector method parameter better name > --- > > Key: KAFKA-5946 > URL: https://issues.apache.org/jira/browse/KAFKA-5946 > Project: Kafka > Issue Type: Improvement >Reporter: Ted Yu >Assignee: Tanvi Jaywant >Priority: Major > Labels: connector, newbie > > During the development of KAFKA-5657, there were several iterations where > method call didn't match what the connector parameter actually represents. > [~ewencp] had used connType as equivalent to connClass because Type wasn't > used to differentiate source vs sink. > [~ewencp] proposed the following: > {code} > It would help to convert all the uses of connType to connClass first, then > standardize on class == java class, type == source/sink, name == > user-specified name. > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-5846) Use singleton NoOpConsumerRebalanceListener in subscribe() call where listener is not specified
[ https://issues.apache.org/jira/browse/KAFKA-5846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16191405#comment-16191405 ] Ted Yu edited comment on KAFKA-5846 at 3/26/18 4:42 PM: Patch looks good to me. was (Author: yuzhih...@gmail.com): Patch looks good to me . > Use singleton NoOpConsumerRebalanceListener in subscribe() call where > listener is not specified > --- > > Key: KAFKA-5846 > URL: https://issues.apache.org/jira/browse/KAFKA-5846 > Project: Kafka > Issue Type: Task > Components: clients >Reporter: Ted Yu >Assignee: Kamal Chandraprakash >Priority: Minor > > Currently KafkaConsumer creates instance of NoOpConsumerRebalanceListener for > each subscribe() call where ConsumerRebalanceListener is not specified: > {code} > public void subscribe(Pattern pattern) { > subscribe(pattern, new NoOpConsumerRebalanceListener()); > {code} > We can create a singleton NoOpConsumerRebalanceListener to be used in such > scenarios. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
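The change proposed in KAFKA-5846 above amounts to reusing one stateless instance instead of allocating a listener per `subscribe()` call. A minimal sketch of the pattern; the `RebalanceListener` interface here is an illustrative stand-in, not Kafka's actual `ConsumerRebalanceListener`:

```java
import java.util.Collection;

// Sketch of the singleton idea from KAFKA-5846: a no-op rebalance listener
// holds no state, so one shared instance can back every subscribe() call.
// The interface is an illustrative stand-in for ConsumerRebalanceListener.
public class NoOpListenerSketch {
    interface RebalanceListener {
        void onPartitionsRevoked(Collection<Integer> partitions);
        void onPartitionsAssigned(Collection<Integer> partitions);
    }

    // Stateless and immutable, hence safe to share across threads and consumers.
    static final RebalanceListener NO_OP = new RebalanceListener() {
        @Override public void onPartitionsRevoked(Collection<Integer> partitions) { }
        @Override public void onPartitionsAssigned(Collection<Integer> partitions) { }
    };

    // What subscribe(Pattern) would use when no listener is supplied.
    static RebalanceListener defaultListener() {
        return NO_OP; // same instance every time, no per-call allocation
    }

    public static void main(String[] args) {
        System.out.println(defaultListener() == defaultListener()); // true
    }
}
```

The saving per call is tiny, but since the object carries no state there is no downside to sharing it.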
[jira] [Comment Edited] (KAFKA-6303) Potential lack of synchronization in NioEchoServer#AcceptorThread
[ https://issues.apache.org/jira/browse/KAFKA-6303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16333251#comment-16333251 ] Ted Yu edited comment on KAFKA-6303 at 3/26/18 4:41 PM: lgtm was (Author: yuzhih...@gmail.com): +1 > Potential lack of synchronization in NioEchoServer#AcceptorThread > - > > Key: KAFKA-6303 > URL: https://issues.apache.org/jira/browse/KAFKA-6303 > Project: Kafka > Issue Type: Bug > Components: network >Reporter: Ted Yu >Assignee: siva santhalingam >Priority: Minor > > In the run() method: > {code} > SocketChannel socketChannel = > ((ServerSocketChannel) key.channel()).accept(); > socketChannel.configureBlocking(false); > newChannels.add(socketChannel); > {code} > Modification to newChannels should be protected by synchronized block. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
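The fix suggested in KAFKA-6303 above is to guard the shared list with the same lock on both the producing and consuming side. A minimal sketch, with illustrative field and method names rather than `NioEchoServer`'s actual code:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the synchronization fix from KAFKA-6303: the acceptor thread
// adds newly accepted channels to a plain ArrayList that another thread
// drains, so both accesses must hold the same lock. Names are illustrative,
// not NioEchoServer's actual fields.
public class AcceptorSketch {
    private final List<String> newChannels = new ArrayList<>();

    // Called from the acceptor thread for each accepted connection.
    void channelAccepted(String channelId) {
        synchronized (newChannels) { // ArrayList itself is not thread-safe
            newChannels.add(channelId);
        }
    }

    // Called from the server thread to take ownership of pending channels.
    List<String> drainNewChannels() {
        synchronized (newChannels) {
            List<String> pending = new ArrayList<>(newChannels);
            newChannels.clear();
            return pending;
        }
    }
}
```

Without the `synchronized` block on the add path, a concurrent drain can observe a partially updated list or lose an element entirely.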
[jira] [Commented] (KAFKA-5413) Log cleaner fails due to large offset in segment file
[ https://issues.apache.org/jira/browse/KAFKA-5413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16413904#comment-16413904 ] Andrew Olson commented on KAFKA-5413:
--
{quote}Is this the same issue as was reported here?{quote}
It sounds like a related issue, one that may be rarer than what the correction for this ticket addressed, or a variant that eluded the fix here. The stack trace looks the same.

> Log cleaner fails due to large offset in segment file
> -----------------------------------------------------
>
>                 Key: KAFKA-5413
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5413
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.10.2.0, 0.10.2.1
>         Environment: Ubuntu 14.04 LTS, Oracle Java 8u92, kafka_2.11-0.10.2.0
>            Reporter: Nicholas Ngorok
>            Assignee: Kelvin Rutt
>            Priority: Critical
>              Labels: reliability
>             Fix For: 0.10.2.2, 0.11.0.0
>
>         Attachments: .index.cleaned, .log, .log.cleaned, .timeindex.cleaned, 002147422683.log, kafka-5413.patch
>
>
> The log cleaner thread in our brokers is failing with the trace below
> {noformat}
> [2017-06-08 15:49:54,822] INFO {kafka-log-cleaner-thread-0} Cleaner 0: Cleaning segment 0 in log __consumer_offsets-12 (largest timestamp Thu Jun 08 15:48:59 PDT 2017) into 0, retaining deletes. (kafka.log.LogCleaner)
> [2017-06-08 15:49:54,822] INFO {kafka-log-cleaner-thread-0} Cleaner 0: Cleaning segment 2147343575 in log __consumer_offsets-12 (largest timestamp Thu Jun 08 15:49:06 PDT 2017) into 0, retaining deletes. (kafka.log.LogCleaner)
> [2017-06-08 15:49:54,834] ERROR {kafka-log-cleaner-thread-0} [kafka-log-cleaner-thread-0], Error due to (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: largest offset in message set can not be safely converted to relative offset.
>     at scala.Predef$.require(Predef.scala:224)
>     at kafka.log.LogSegment.append(LogSegment.scala:109)
>     at kafka.log.Cleaner.cleanInto(LogCleaner.scala:478)
>     at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:405)
>     at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:401)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:401)
>     at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:363)
>     at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:362)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at kafka.log.Cleaner.clean(LogCleaner.scala:362)
>     at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:241)
>     at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:220)
>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2017-06-08 15:49:54,835] INFO {kafka-log-cleaner-thread-0} [kafka-log-cleaner-thread-0], Stopped (kafka.log.LogCleaner)
> {noformat}
> This seems to point at the specific line [here|https://github.com/apache/kafka/blob/0.11.0/core/src/main/scala/kafka/log/LogSegment.scala#L92] in the Kafka source, where the difference is actually larger than MAXINT, as both baseOffset and offset are of type long. It was introduced in this [pr|https://github.com/apache/kafka/pull/2210/files/56d1f8196b77a47b176b7bbd1e4220a3be827631]
> These were the outputs of dumping the first two log segments
> {noformat}
> :~$ /usr/bin/kafka-run-class kafka.tools.DumpLogSegments --deep-iteration --files /kafka-logs/__consumer_offsets-12/0000.log
> Dumping /kafka-logs/__consumer_offsets-12/.log
> Starting offset: 0
> offset: 1810054758 position: 0 NoTimestampType: -1 isvalid: true payloadsize: -1 magic: 0 compresscodec: NONE crc: 3127861909 keysize: 34
> :~$ /usr/bin/kafka-run-class kafka.tools.DumpLogSegments --deep-iteration --files /kafka-logs/__consumer_offsets-12/0000002147343575.log
> Dumping /kafka-logs/__consumer_offsets-12/002147343575.log
> Starting offset: 2147343575
> offset: 2147539884 position: 0 NoTimestampType: -1 isvalid: true payloadsize: -1 magic: 0 compresscodec: NONE crc: 2282192097 keysize: 34
> {noformat}
> My guess is that since 2147539884 is larger than MAXINT, we are hitting this exception. Was there a specific reason this check was added in 0.10.2?
> E.g. if the first offset is a key = "key 0" and then we have MAXINT + 1 of "key 1" following, wouldn't we run into this situation whenever the log cleaner runs?

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
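The invariant behind the KAFKA-5413 failure above can be sketched numerically. Kafka's segment index stores each offset relative to the segment's base offset as a 32-bit value, so an append is only safe when the delta fits in an `int`; `canAppend` below is an illustrative helper, not Kafka's actual API, using the offsets from the segment dumps in the report:

```java
// Sketch of the relative-offset invariant from KAFKA-5413: segment indexes
// store (offset - baseOffset) as a 32-bit int, so appending is only safe
// when that delta fits in Integer.MAX_VALUE. canAppend() is an illustrative
// helper, not Kafka's actual LogSegment API.
public class RelativeOffsetSketch {
    static boolean canAppend(long baseOffset, long largestOffset) {
        long delta = largestOffset - baseOffset;
        // Negative deltas and deltas above Integer.MAX_VALUE cannot be
        // represented as a relative offset in the index.
        return delta >= 0 && delta <= Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        // Offsets taken from the DumpLogSegments output above: cleaning into a
        // segment with base offset 0 fails, because 2147539884 > Integer.MAX_VALUE.
        System.out.println(canAppend(0L, 2147539884L));          // false -> triggers the require() failure
        // Within its own segment (base 2147343575) the same offset fits easily.
        System.out.println(canAppend(2147343575L, 2147539884L)); // true
    }
}
```

This is exactly why the cleaner blew up when it tried to compact the high-offset segment "into 0": the destination's base offset of 0 made the relative offset overflow.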
[jira] [Updated] (KAFKA-6715) Leader transition for all partitions lead by two brokers without visible reason
[ https://issues.apache.org/jira/browse/KAFKA-6715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Uwe Eisele updated KAFKA-6715:
--
Description:
In our cluster we experienced a situation in which the leadership of all partitions led by two brokers was moved, mainly to one other broker (broker 1). We don't know why this happened. At this time there was no broker outage, nor had a broker shutdown been initiated. The Zookeeper nodes of the affected brokers (/brokers/ids/3, /brokers/ids/4) were not modified during this time. In addition there are no logs that would indicate a leader transition for the affected brokers. We would expect to see a "{{sending become-leader LeaderAndIsr request}}" in the controller log for each partition, as well as a "{{completed LeaderAndIsr request}}" in the state change log of the Kafka brokers that become the new leaders and followers. Our log level for kafka.controller and the state change log is set to TRACE.

Though all brokers are running, the situation does not recover. It sticks in a highly imbalanced leader distribution, in which two brokers are not the leader of any partition, and one broker is the leader of almost all partitions.

{code:java}
kafka-controller Log (Level TRACE):
[2018-03-19 17:03:54,042] TRACE [Controller 3]: Leader imbalance ratio for broker 5 is 0.0 (kafka.controller.KafkaController)
[2018-03-19 17:03:54,042] TRACE [Controller 3]: Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController)
[2018-03-19 17:03:54,042] TRACE [Controller 3]: Leader imbalance ratio for broker 2 is 0.0 (kafka.controller.KafkaController)
[2018-03-19 17:03:54,043] TRACE [Controller 3]: Leader imbalance ratio for broker 3 is 0.0 (kafka.controller.KafkaController)
[2018-03-19 17:03:54,043] TRACE [Controller 3]: Leader imbalance ratio for broker 4 is 0.0 (kafka.controller.KafkaController)
...
[2018-03-19 17:08:54,049] TRACE [Controller 3]: Leader imbalance ratio for broker 5 is 0.8054794520547945 (kafka.controller.KafkaController)
[2018-03-19 17:08:54,050] TRACE [Controller 3]: Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController)
[2018-03-19 17:08:54,050] TRACE [Controller 3]: Leader imbalance ratio for broker 2 is 0.4807692307692308 (kafka.controller.KafkaController)
[2018-03-19 17:08:54,051] TRACE [Controller 3]: Leader imbalance ratio for broker 3 is 1.0 (kafka.controller.KafkaController)
[2018-03-19 17:08:54,053] TRACE [Controller 3]: Leader imbalance ratio for broker 4 is 1.0 (kafka.controller.KafkaController)
...
[2018-03-19 17:23:54,080] TRACE [Controller 3]: Leader imbalance ratio for broker 5 is 0.8054794520547945 (kafka.controller.KafkaController)
[2018-03-19 17:23:54,081] TRACE [Controller 3]: Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController)
[2018-03-19 17:23:54,081] TRACE [Controller 3]: Leader imbalance ratio for broker 2 is 0.4807692307692308 (kafka.controller.KafkaController)
[2018-03-19 17:23:54,082] TRACE [Controller 3]: Leader imbalance ratio for broker 3 is 1.0 (kafka.controller.KafkaController)
[2018-03-19 17:23:54,084] TRACE [Controller 3]: Leader imbalance ratio for broker 4 is 1.0 (kafka.controller.KafkaController)
{code}

The imbalance was recognized by the controller, but nothing happened. The imbalance ratio reported for broker 1 seems to be wrong, because this broker has taken over the leader role for most of the partitions.

In addition, it seems that the ReplicaFetcherThreads die without any log message (see attached stack trace), though we think this should not be possible... However, we would expect log messages stating that fetchers for partitions have been removed, as well as that the ReplicaFetcherThreads are shutting down. The log level for _kafka_ is set to INFO. In other situations, when a broker is shut down, we do see such entries in the log files.

Besides that, this caused under-replicated partitions. It seems that no broker fetches from the partitions with the newly assigned leaders. Like the situation with the highly imbalanced leader distribution, the cluster sticks in this state and does not recover.

This is a recurring problem; however, we cannot reproduce it.

was: In our cluster we experienced a situation, in which the leader of all partitions lead by two brokers has been moved mainly to one other broker. We don't know why this happend. At this time there was not broker outage, nor a broker shutdown has been initiated. The Zookeeper nodes of the affected brokers (/brokers/ids/3, /brokers/ids/4) has not been modified during this time. In addition there are no logs that would indicate a leader transition for the affected brokers. We would expect to see a "{{sending become-leader LeaderAndIsr request}}" in the controller log for each partition, as well a "{{completed LeaderAndIsr request}}" in the state change log of the Kafka brokers that becomes the new leader and follower. Our log lev
[jira] [Updated] (KAFKA-6714) KafkaController marks all Brokers as "Shutting down", though only one broker has been shut down
[ https://issues.apache.org/jira/browse/KAFKA-6714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Uwe Eisele updated KAFKA-6714:
--
Description:
In our Kafka cluster we experienced a situation in which the Kafka controller had all brokers marked as "Shutting down", though in fact only one broker had been shut down. The last log entry about the broker state, before the entry claiming that all brokers are shutting down, states that no brokers are shutting down. The consequence of this weird state is that the Kafka controller is not able to elect any partition leader.

{code:java}
kafka.controller Log (Level TRACE):
[2018-03-15 16:28:24,288] INFO [Controller 5]: Shutting down broker 5 (kafka.controller.KafkaController)
[2018-03-15 16:28:24,288] DEBUG [Controller 5]: All shutting down brokers: 5 (kafka.controller.KafkaController)
[2018-03-15 16:28:24,288] DEBUG [Controller 5]: Live brokers: 1,2,3,4 (kafka.controller.KafkaController)
...
[2018-03-15 16:28:36,846] INFO [Controller 3]: Currently active brokers in the cluster: Set(1, 2, 3, 4) (kafka.controller.KafkaController)
[2018-03-15 16:28:36,846] INFO [Controller 3]: Currently shutting brokers in the cluster: Set() (kafka.controller.KafkaController)
...
[2018-03-19 17:57:22,273] INFO [Controller 3]: Shutting down broker 1 (kafka.controller.KafkaController)
[2018-03-19 17:57:22,273] DEBUG [Controller 3]: All shutting down brokers: 1,5,2,3,4 (kafka.controller.KafkaController)
[2018-03-19 17:57:22,273] DEBUG [Controller 3]: Live brokers: (kafka.controller.KafkaController)
{code}

{code:java}
state.change.logger Log (Level TRACE):
[2018-03-19 17:57:22,275] ERROR Controller 3 epoch 83 encountered error while electing leader for partition [zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] due to: No other replicas in ISR 1,3,5 for [zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] besides shutting down brokers 1,5,2,3,4. (state.change.logger)
{code}

The question is why the Kafka controller assumes that all brokers are shutting down. The only place in the Kafka code (0.11.0.2) we found in which the set of shutting-down brokers is changed is in the class _kafka.controller.KafkaController_, in line 1407 in the method _doControlledShutdown_:

{code:java}
info("Shutting down broker " + id)
if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
  throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id))
controllerContext.shuttingDownBrokerIds.add(id)
{code}

However, in that case we should see the log entry "Shutting down broker n" for all brokers in the log file, but it is not there.

This is a recurring problem; however, we cannot reproduce it.

was: In our Kafka cluster we experienced a situation in wich the Kafka controller has all Brokers marked as "Shutting down", though indeed only one Broker has been shut down. The last log entry about the broker state before the entry that states that all brokers are shutting down states that no brokers are shutting down. The consequence of this weird state is, that the Kafka controller is not able to elect any partition leader. {code:java} kafka.controller Log (Level TRACE): [2018-03-15 16:28:24,288] INFO [Controller 5]: Shutting down broker 5 (kafka.controller.KafkaController) [2018-03-15 16:28:24,288] DEBUG [Controller 5]: All shutting down brokers: 5 (kafka.controller.KafkaController) [2018-03-15 16:28:24,288] DEBUG [Controller 5]: Live brokers: 1,2,3,4 (kafka.controller.KafkaController) ... [2018-03-15 16:28:36,846] INFO [Controller 3]: Currently active brokers in the cluster: Set(1, 2, 3, 4) (kafka.controller.KafkaController) [2018-03-15 16:28:36,846] INFO [Controller 3]: Currently shutting brokers in the cluster: Set() (kafka.controller.KafkaController) ... [2018-03-19 17:57:22,273] INFO [Controller 3]: Shutting down broker 1 (kafka.controller.KafkaController) [2018-03-19 17:57:22,273] DEBUG [Controller 3]: All shutting down brokers: 1,5,2,3,4 (kafka.controller.KafkaController) [2018-03-19 17:57:22,273] DEBUG [Controller 3]: Live brokers: (kafka.controller.KafkaController) {code} {code:java} state.change.logger Log (Level TRACE): [2018-03-19 17:57:22,275] ERROR Controller 3 epoch 83 encountered error while electing leader for partition [zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] due to: No other replicas in ISR 1,3,5 for [zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] besides shutting down brokers 1,5,2,3,4. (state.change.logger) {code} The question is why the Kafka controller assumes that all brokers are shutting down? The only place in the Kafka code (0.11.0.2) we found in which the shutting down broker set is changed is in the class _kafka.controller.KafkaControler_ in line 1407 in the method _doControlledShutdown_. {code:java} info("Shutting down broker " + id) if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id)) throw new BrokerNotAvail
[jira] [Updated] (KAFKA-6715) Leader transition for all partitions lead by two brokers without visible reason
[ https://issues.apache.org/jira/browse/KAFKA-6715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Uwe Eisele updated KAFKA-6715: -- Description: In our cluster we experienced a situation, in which the leader of all partitions lead by two brokers has been moved mainly to one other broker. We don't know why this happend. At this time there was not broker outage, nor a broker shutdown has been initiated. The Zookeeper nodes of the affected brokers (/brokers/ids/3, /brokers/ids/4) has not been modified during this time. In addition there are no logs that would indicate a leader transition for the affected brokers. We would expect to see a "{{sending become-leader LeaderAndIsr request}}" in the controller log for each partition, as well a "{{completed LeaderAndIsr request}}" in the state change log of the Kafka brokers that becomes the new leader and follower. Our log level for the kafka.controller and the state change log is set to TRACE. Though all Brokers are running, the situation does not recover. It sticks in a highly imbalanced leader distribution, in which two brokers are no leader for any partition, and one broker is the leader for almost all partitions. {code:java} kafka-controller Log (Level TRACE): [2018-03-19 17:03:54,042] TRACE [Controller 3]: Leader imbalance ratio for broker 5 is 0.0 (kafka.controller.KafkaController) [2018-03-19 17:03:54,042] TRACE [Controller 3]: Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController) [2018-03-19 17:03:54,042] TRACE [Controller 3]: Leader imbalance ratio for broker 2 is 0.0 (kafka.controller.KafkaController) [2018-03-19 17:03:54,043] TRACE [Controller 3]: Leader imbalance ratio for broker 3 is 0.0 (kafka.controller.KafkaController) [2018-03-19 17:03:54,043] TRACE [Controller 3]: Leader imbalance ratio for broker 4 is 0.0 (kafka.controller.KafkaController) ... 
[2018-03-19 17:08:54,049] TRACE [Controller 3]: Leader imbalance ratio for broker 5 is 0.8054794520547945 (kafka.controller.KafkaController) [2018-03-19 17:08:54,050] TRACE [Controller 3]: Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController) [2018-03-19 17:08:54,050] TRACE [Controller 3]: Leader imbalance ratio for broker 2 is 0.4807692307692308 (kafka.controller.KafkaController) [2018-03-19 17:08:54,051] TRACE [Controller 3]: Leader imbalance ratio for broker 3 is 1.0 (kafka.controller.KafkaController) [2018-03-19 17:08:54,053] TRACE [Controller 3]: Leader imbalance ratio for broker 4 is 1.0 (kafka.controller.KafkaController) ... [2018-03-19 17:23:54,080] TRACE [Controller 3]: Leader imbalance ratio for broker 5 is 0.8054794520547945 (kafka.controller.KafkaController) [2018-03-19 17:23:54,081] TRACE [Controller 3]: Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController) [2018-03-19 17:23:54,081] TRACE [Controller 3]: Leader imbalance ratio for broker 2 is 0.4807692307692308 (kafka.controller.KafkaController) [2018-03-19 17:23:54,082] TRACE [Controller 3]: Leader imbalance ratio for broker 3 is 1.0 (kafka.controller.KafkaController) [2018-03-19 17:23:54,084] TRACE [Controller 3]: Leader imbalance ratio for broker 4 is 1.0 (kafka.controller.KafkaController) {code} The imbalance was recognized by the controller, but nothing happened. In addition, it seems that the ReplicaFetcherThreads died without any log message (see attached stack trace), though we think this should not be possible. We would expect log messages stating that fetchers for partitions have been removed and that the ReplicaFetcherThreads are shutting down. The log level for _kafka_ is set to INFO, and in other situations, when a broker is shut down, we do see such entries in the log files. Besides that, this caused underreplicated partitions: it seems that no broker fetches from the partitions with the newly assigned leaders. 
Like the situation with the highly imbalanced leader distribution the cluster sticks in this state and does not recover. was: In our cluster we experienced a situation, in which the leader of all partitions lead by two brokers has been moved mainly to one other broker. We don't know why this happend. At this time there was not broker outage, nor a broker shutdown has been initiated. The Zookeeper nodes of the affected brokers (/brokers/ids/3, /brokers/ids/4) has not been modified during this time. In addition there are no logs that would indicate a leader transition for the affected brokers. We would expect to see a "{{sending become-leader LeaderAndIsr request}}" in the controller log for each partition, as well a "{{completed LeaderAndIsr request}}" in the state change log of the Kafka brokers that becomes the new leader and follower. Our log level for the kafka.controller and the state change log is set to TRACE. Though all Brokers are running, the situation does not recover. It sticks in a highly imbalanced leader distribution, in wh
[jira] [Updated] (KAFKA-6715) Leader transition for all partitions led by two brokers without visible reason
[ https://issues.apache.org/jira/browse/KAFKA-6715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Uwe Eisele updated KAFKA-6715: -- Attachment: 20180319-1756_kafka01-jvm-stack.dump > Leader transition for all partitions lead by two brokers without visible > reason > --- > > Key: KAFKA-6715 > URL: https://issues.apache.org/jira/browse/KAFKA-6715 > Project: Kafka > Issue Type: Bug > Components: core, replication >Affects Versions: 0.11.0.2 > Environment: Kafka cluster on Amazon AWS EC2 r4.2xlarge instances > with 5 nodes and a Zookeeper cluster on r4.2xlarge instances with 3 nodes. > The cluster is distributed across 2 availability zones. >Reporter: Uwe Eisele >Priority: Critical > Attachments: 20180319-1756_kafka01-jvm-stack.dump > > > In our cluster we experienced a situation, in which the leader of all > partitions lead by two brokers has been moved mainly to one other broker. > We don't know why this happend. At this time there was not broker outage, nor > a broker shutdown has been initiated. The Zookeeper nodes of the affected > brokers (/brokers/ids/3, /brokers/ids/4) has not been modified during this > time. > In addition there are no logs that would indicate a leader transition for the > affected brokers. We would expect to see a "{{sending become-leader > LeaderAndIsr request}}" in the controller log for each partition, as well a > "{{completed LeaderAndIsr request}}" in the state change log of the Kafka > brokers that becomes the new leader and follower. Our log level for the > kafka.controller and the state change log is set to TRACE. > Though all Brokers are running, the situation does not recover. It sticks in > a highly imbalanced leader distribution, in which two brokers are no leader > for any partition, and one broker is the leader for almost all partitions. 
> {code:java} > kafka-controller Log (Level TRACE): > [2018-03-19 17:03:54,042] TRACE [Controller 3]: Leader imbalance ratio for > broker 5 is 0.0 (kafka.controller.KafkaController) > [2018-03-19 17:03:54,042] TRACE [Controller 3]: Leader imbalance ratio for > broker 1 is 0.0 (kafka.controller.KafkaController) > [2018-03-19 17:03:54,042] TRACE [Controller 3]: Leader imbalance ratio for > broker 2 is 0.0 (kafka.controller.KafkaController) > [2018-03-19 17:03:54,043] TRACE [Controller 3]: Leader imbalance ratio for > broker 3 is 0.0 (kafka.controller.KafkaController) > [2018-03-19 17:03:54,043] TRACE [Controller 3]: Leader imbalance ratio for > broker 4 is 0.0 (kafka.controller.KafkaController) > ... > [2018-03-19 17:08:54,049] TRACE [Controller 3]: Leader imbalance ratio for > broker 5 is 0.8054794520547945 (kafka.controller.KafkaController) > [2018-03-19 17:08:54,050] TRACE [Controller 3]: Leader imbalance ratio for > broker 1 is 0.0 (kafka.controller.KafkaController) > [2018-03-19 17:08:54,050] TRACE [Controller 3]: Leader imbalance ratio for > broker 2 is 0.4807692307692308 (kafka.controller.KafkaController) > [2018-03-19 17:08:54,051] TRACE [Controller 3]: Leader imbalance ratio for > broker 3 is 1.0 (kafka.controller.KafkaController) > [2018-03-19 17:08:54,053] TRACE [Controller 3]: Leader imbalance ratio for > broker 4 is 1.0 (kafka.controller.KafkaController) > ... 
> [2018-03-19 17:23:54,080] TRACE [Controller 3]: Leader imbalance ratio for > broker 5 is 0.8054794520547945 (kafka.controller.KafkaController) > [2018-03-19 17:23:54,081] TRACE [Controller 3]: Leader imbalance ratio for > broker 1 is 0.0 (kafka.controller.KafkaController) > [2018-03-19 17:23:54,081] TRACE [Controller 3]: Leader imbalance ratio for > broker 2 is 0.4807692307692308 (kafka.controller.KafkaController) > [2018-03-19 17:23:54,082] TRACE [Controller 3]: Leader imbalance ratio for > broker 3 is 1.0 (kafka.controller.KafkaController) > [2018-03-19 17:23:54,084] TRACE [Controller 3]: Leader imbalance ratio for > broker 4 is 1.0 (kafka.controller.KafkaController) > {code} > The imbalance was recognized by the controller, but nothing happend. > In addition it seems that the ReplicaFetcherThreads die without any log > message, though we think this is not possible... However, we would expect log > messages that state, that fetchers for partitions has been removed, as well > that the ReplicaFetcherThreads are shutting down. The log level for _kafka_ > is set to INFO. In other situations, when a broker is shuttdown we see such > entries in the log files. > Besides that, this caused underreplicated partitions. It seems that no broker > fetches from the partitions with the newly assigned leaders. Like the > situation with the highly imbalanced leader distribution the cluster sticks > in this state and does not recover. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6715) Leader transition for all partitions led by two brokers without visible reason
Uwe Eisele created KAFKA-6715: - Summary: Leader transition for all partitions led by two brokers without visible reason Key: KAFKA-6715 URL: https://issues.apache.org/jira/browse/KAFKA-6715 Project: Kafka Issue Type: Bug Components: core, replication Affects Versions: 0.11.0.2 Environment: Kafka cluster on Amazon AWS EC2 r4.2xlarge instances with 5 nodes and a Zookeeper cluster on r4.2xlarge instances with 3 nodes. The cluster is distributed across 2 availability zones. Reporter: Uwe Eisele In our cluster we experienced a situation in which leadership of all partitions led by two brokers was moved, mainly to one other broker. We don't know why this happened. At that time there was no broker outage, nor had a broker shutdown been initiated. The Zookeeper nodes of the affected brokers (/brokers/ids/3, /brokers/ids/4) were not modified during this time. In addition, there are no logs that would indicate a leader transition for the affected brokers. We would expect to see a "{{sending become-leader LeaderAndIsr request}}" in the controller log for each partition, as well as a "{{completed LeaderAndIsr request}}" in the state change log of the Kafka brokers that become the new leader and followers. Our log level for kafka.controller and the state change log is set to TRACE. Though all brokers are running, the situation does not recover. The cluster is stuck in a highly imbalanced leader distribution, in which two brokers lead no partitions and one broker leads almost all of them. 
{code:java} kafka-controller Log (Level TRACE): [2018-03-19 17:03:54,042] TRACE [Controller 3]: Leader imbalance ratio for broker 5 is 0.0 (kafka.controller.KafkaController) [2018-03-19 17:03:54,042] TRACE [Controller 3]: Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController) [2018-03-19 17:03:54,042] TRACE [Controller 3]: Leader imbalance ratio for broker 2 is 0.0 (kafka.controller.KafkaController) [2018-03-19 17:03:54,043] TRACE [Controller 3]: Leader imbalance ratio for broker 3 is 0.0 (kafka.controller.KafkaController) [2018-03-19 17:03:54,043] TRACE [Controller 3]: Leader imbalance ratio for broker 4 is 0.0 (kafka.controller.KafkaController) ... [2018-03-19 17:08:54,049] TRACE [Controller 3]: Leader imbalance ratio for broker 5 is 0.8054794520547945 (kafka.controller.KafkaController) [2018-03-19 17:08:54,050] TRACE [Controller 3]: Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController) [2018-03-19 17:08:54,050] TRACE [Controller 3]: Leader imbalance ratio for broker 2 is 0.4807692307692308 (kafka.controller.KafkaController) [2018-03-19 17:08:54,051] TRACE [Controller 3]: Leader imbalance ratio for broker 3 is 1.0 (kafka.controller.KafkaController) [2018-03-19 17:08:54,053] TRACE [Controller 3]: Leader imbalance ratio for broker 4 is 1.0 (kafka.controller.KafkaController) ... 
[2018-03-19 17:23:54,080] TRACE [Controller 3]: Leader imbalance ratio for broker 5 is 0.8054794520547945 (kafka.controller.KafkaController) [2018-03-19 17:23:54,081] TRACE [Controller 3]: Leader imbalance ratio for broker 1 is 0.0 (kafka.controller.KafkaController) [2018-03-19 17:23:54,081] TRACE [Controller 3]: Leader imbalance ratio for broker 2 is 0.4807692307692308 (kafka.controller.KafkaController) [2018-03-19 17:23:54,082] TRACE [Controller 3]: Leader imbalance ratio for broker 3 is 1.0 (kafka.controller.KafkaController) [2018-03-19 17:23:54,084] TRACE [Controller 3]: Leader imbalance ratio for broker 4 is 1.0 (kafka.controller.KafkaController) {code} The imbalance was recognized by the controller, but nothing happened. In addition, it seems that the ReplicaFetcherThreads died without any log message, though we think this should not be possible. We would expect log messages stating that fetchers for partitions have been removed and that the ReplicaFetcherThreads are shutting down. The log level for _kafka_ is set to INFO, and in other situations, when a broker is shut down, we do see such entries in the log files. Besides that, this caused underreplicated partitions: it seems that no broker fetches from the partitions with the newly assigned leaders. Like the situation with the highly imbalanced leader distribution, the cluster sticks in this state and does not recover. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
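The "leader imbalance ratio" values in the logs above come from the controller's auto-rebalance check. As a rough sketch of the idea (illustrative code, not Kafka's actual implementation; the class and method names are made up): for each broker, the ratio is the fraction of partitions for which that broker is the preferred (first-listed) replica but is not the current leader, so a ratio of 1.0 for brokers 3 and 4 means they lead none of their preferred partitions.

```java
import java.util.List;
import java.util.Map;

// Illustrative sketch of the per-broker leader imbalance ratio:
// (partitions where the broker is the preferred replica but NOT the leader)
// divided by (partitions where the broker is the preferred replica).
public class ImbalanceRatioSketch {

    static double imbalanceRatio(int brokerId,
                                 Map<String, List<Integer>> assignment, // partition -> replica list
                                 Map<String, Integer> currentLeader) {  // partition -> current leader
        int preferred = 0;
        int notLeading = 0;
        for (Map.Entry<String, List<Integer>> e : assignment.entrySet()) {
            if (e.getValue().get(0) == brokerId) {           // broker is the preferred leader
                preferred++;
                if (currentLeader.get(e.getKey()) != brokerId)
                    notLeading++;                            // ...but someone else leads it
            }
        }
        return preferred == 0 ? 0.0 : (double) notLeading / preferred;
    }

    public static void main(String[] args) {
        // Two partitions prefer broker 3, one prefers broker 1; leadership of
        // broker 3's partitions has moved to broker 5 (as in the reported logs).
        Map<String, List<Integer>> assignment = Map.of(
                "t-0", List.of(3, 1, 2),
                "t-1", List.of(3, 2, 4),
                "t-2", List.of(1, 3, 5));
        Map<String, Integer> leader = Map.of("t-0", 5, "t-1", 5, "t-2", 1);
        System.out.println(imbalanceRatio(3, assignment, leader)); // 1.0
        System.out.println(imbalanceRatio(1, assignment, leader)); // 0.0
    }
}
```

With default broker settings ({{auto.leader.rebalance.enable=true}}, {{leader.imbalance.per.broker.percentage=10}}), the controller is expected to trigger preferred-leader election once this ratio exceeds the threshold, which is why a sustained ratio of 1.0 with no action is surprising.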
[jira] [Commented] (KAFKA-6681) Two instances of kafka consumer reading the same partition within a consumer group
[ https://issues.apache.org/jira/browse/KAFKA-6681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16413789#comment-16413789 ] Narayan Periwal commented on KAFKA-6681: [~yuzhih...@gmail.com], we faced yet another such issue; on the server side we found these logs in this case: {noformat} [2018-03-23 18:59:16,560] INFO [GroupCoordinator 6]: Stabilized group prod-m10n-event-batcher-billablebeaconams1 generation 6 (kafka.coordinator.GroupCoordinator) [2018-03-23 18:59:46,561] INFO [GroupCoordinator 6]: Preparing to restabilize group prod-m10n-event-batcher-billablebeaconams1 with old generation 6 (kafka.coordinator.GroupCoordinator) [2018-03-23 18:59:46,833] INFO [GroupCoordinator 6]: Stabilized group prod-m10n-event-batcher-billablebeaconams1 generation 7 (kafka.coordinator.GroupCoordinator) {noformat} > Two instances of kafka consumer reading the same partition within a consumer > group > -- > > Key: KAFKA-6681 > URL: https://issues.apache.org/jira/browse/KAFKA-6681 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 0.10.2.1 >Reporter: Narayan Periwal >Priority: Critical > Attachments: server-1.log, server-2.log > > > We have seen this issue with the Kafka consumer, the new client library that > was introduced in 0.9. > With this new client, group management is done by the Kafka coordinator, > which is one of the Kafka brokers. > We are using Kafka broker 0.10.2.1, and the consumer client version is also > 0.10.2.1. > The issue we have faced is that, after rebalancing, some of the > partitions get consumed by 2 instances within a consumer group, leading to > duplication of the entire partition data. Both instances continue to read > until the next rebalancing, or the restart of those clients. > It looks like a particular consumer goes on fetching data from a > partition, but the broker is not able to identify this "stale" consumer > instance. 
> During this time, we also see the underreplicated partition metrics spiking. > We have hit this twice in production. Please look into it at the earliest. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6714) KafkaController marks all Brokers as "Shutting down", though only one broker has been shut down
[ https://issues.apache.org/jira/browse/KAFKA-6714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Uwe Eisele updated KAFKA-6714: -- Environment: Kafka cluster on Amazon AWS EC2 r4.2xlarge instances with 5 nodes and a Zookeeper cluster on r4.2xlarge instances with 3 nodes. The cluster is distributed across 2 availability zones. (was: Kafka Cluster on Amazon AWS EC2 r4.2xlarge instances with 5 nodes and a Zookeeper Cluster on r4.2xlarge instances with 3 nodes. The Cluster is distributed across 2 availability zones.) > KafkaController marks all Brokers as "Shutting down", though only one broker > has been shut down > --- > > Key: KAFKA-6714 > URL: https://issues.apache.org/jira/browse/KAFKA-6714 > Project: Kafka > Issue Type: Bug > Components: controller, core >Affects Versions: 0.11.0.2 > Environment: Kafka cluster on Amazon AWS EC2 r4.2xlarge instances > with 5 nodes and a Zookeeper cluster on r4.2xlarge instances with 3 nodes. > The cluster is distributed across 2 availability zones. >Reporter: Uwe Eisele >Priority: Critical > > In our Kafka Cluster we experienced a situation in wich the Kafka controller > has all Brokers marked as "Shutting down", though indeed only one Broker has > been shut down. > The last log entry about the broker state before the entry that states that > all brokers are shutting down states that no brokers are shutting down. > The consequence of this weird state is, that the Kafka controller is not able > to elect any partition leader. > {code:java} > [2018-03-15 16:28:24,288] INFO [Controller 5]: Shutting down broker 5 > (kafka.controller.KafkaController) > [2018-03-15 16:28:24,288] DEBUG [Controller 5]: All shutting down brokers: 5 > (kafka.controller.KafkaController) > [2018-03-15 16:28:24,288] DEBUG [Controller 5]: Live brokers: 1,2,3,4 > (kafka.controller.KafkaController) > ... 
> [2018-03-15 16:28:36,846] INFO [Controller 3]: Currently active brokers in > the cluster: Set(1, 2, 3, 4) (kafka.controller.KafkaController) > [2018-03-15 16:28:36,846] INFO [Controller 3]: Currently shutting brokers in > the cluster: Set() (kafka.controller.KafkaController) > ... > [2018-03-19 17:57:22,273] INFO [Controller 3]: Shutting down broker 1 > (kafka.controller.KafkaController) > [2018-03-19 17:57:22,273] DEBUG [Controller 3]: All shutting down brokers: > 1,5,2,3,4 (kafka.controller.KafkaController) > [2018-03-19 17:57:22,273] DEBUG [Controller 3]: Live brokers: > (kafka.controller.KafkaController) > ... > [2018-03-19 17:57:22,275] ERROR Controller 3 epoch 83 encountered error while > electing leader for partition > [zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] due to: No other > replicas in ISR 1,3,5 for > [zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] besides shutting > down brokers 1,5,2,3,4. (state.change.logger) {code} > The question is why the Kafka controller assumes that all brokers are > shutting down? > The only place in the Kafka code (0.11.0.2) we found in which the shutting > down broker set is changed is in the class _kafka.controller.KafkaControler_ > in line 1407 in the method _doControlledShutdown_. > > {code:java} > info("Shutting down broker " + id) > if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id)) > throw new BrokerNotAvailableException("Broker id %d does not > exist.".format(id)) > controllerContext.shuttingDownBrokerIds.add(id) > {code} > However, we should see the log entry "Shutting down broker n" for all Brokers > in the log file, but it is not there. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6714) KafkaController marks all Brokers as "Shutting down", though only one broker has been shut down
[ https://issues.apache.org/jira/browse/KAFKA-6714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Uwe Eisele updated KAFKA-6714: -- Description: In our Kafka cluster we experienced a situation in which the Kafka controller had all brokers marked as "Shutting down", though in fact only one broker had been shut down. The last broker-state log entry before the one claiming that all brokers are shutting down states that no brokers are shutting down. The consequence of this weird state is that the Kafka controller is not able to elect any partition leader. {code:java} [2018-03-15 16:28:24,288] INFO [Controller 5]: Shutting down broker 5 (kafka.controller.KafkaController) [2018-03-15 16:28:24,288] DEBUG [Controller 5]: All shutting down brokers: 5 (kafka.controller.KafkaController) [2018-03-15 16:28:24,288] DEBUG [Controller 5]: Live brokers: 1,2,3,4 (kafka.controller.KafkaController) ... [2018-03-15 16:28:36,846] INFO [Controller 3]: Currently active brokers in the cluster: Set(1, 2, 3, 4) (kafka.controller.KafkaController) [2018-03-15 16:28:36,846] INFO [Controller 3]: Currently shutting brokers in the cluster: Set() (kafka.controller.KafkaController) ... [2018-03-19 17:57:22,273] INFO [Controller 3]: Shutting down broker 1 (kafka.controller.KafkaController) [2018-03-19 17:57:22,273] DEBUG [Controller 3]: All shutting down brokers: 1,5,2,3,4 (kafka.controller.KafkaController) [2018-03-19 17:57:22,273] DEBUG [Controller 3]: Live brokers: (kafka.controller.KafkaController) ... [2018-03-19 17:57:22,275] ERROR Controller 3 epoch 83 encountered error while electing leader for partition [zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] due to: No other replicas in ISR 1,3,5 for [zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] besides shutting down brokers 1,5,2,3,4. (state.change.logger) {code} The question is: why does the Kafka controller assume that all brokers are shutting down? 
The only place in the Kafka code (0.11.0.2) we found in which the shutting down broker set is changed is in the class _kafka.controller.KafkaControler_ in line 1407 in the method _doControlledShutdown_. {code:java} info("Shutting down broker " + id) if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id)) throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id)) controllerContext.shuttingDownBrokerIds.add(id) {code} However, we should see the log entry "Shutting down broker n" for all Brokers in the log file, but it is not there. was: In our Kafka Cluster we experienced a situation in wich the Kafka controller has all Brokers marked as "Shutting down", though indeed only one Broker has been shut down. The last log entry about the broker state before the entry that states that all brokers are shutting down states that no brokers are shutting down. The consequence of this weird state is, that the Kafka controller is not able to elect any partition leader. {code:java} [2018-03-15 16:28:24,288] INFO [Controller 5]: Shutting down broker 5 (kafka.controller.KafkaController) [2018-03-15 16:28:24,288] DEBUG [Controller 5]: All shutting down brokers: 5 (kafka.controller.KafkaController) [2018-03-15 16:28:24,288] DEBUG [Controller 5]: Live brokers: 1,2,3,4 (kafka.controller.KafkaController) ... [2018-03-15 16:28:36,846] INFO [Controller 3]: Currently active brokers in the cluster: Set(1, 2, 3, 4) (kafka.controller.KafkaController) [2018-03-15 16:28:36,846] INFO [Controller 3]: Currently shutting brokers in the cluster: Set() (kafka.controller.KafkaController) ... [2018-03-19 17:57:22,273] INFO [Controller 3]: Shutting down broker 1 (kafka.controller.KafkaController) [2018-03-19 17:57:22,273] DEBUG [Controller 3]: All shutting down brokers: 1,5,2,3,4 (kafka.controller.KafkaController) [2018-03-19 17:57:22,273] DEBUG [Controller 3]: Live brokers: (kafka.controller.KafkaController) ... 
[2018-03-19 17:57:22,275] ERROR Controller 3 epoch 83 encountered error while electing leader for partition [zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] due to: No other replicas in ISR 1,3,5 for [zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] besides shutting down brokers 1,5,2,3,4. (state.change.logger) {code} The question is why the Kafka controller assumes that all brokers are shutting down? The only place in the Kafka code (0.11.0.2) we found in which the shutting down broker set is changed is in the class _kafka.controller.KafkaControler_ in line 1407 in the method _doControlledShutdown_. {code:java} info("Shutting down broker " + id) if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id)) throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id)) controllerContext.shuttingDownBrokerIds.add(id) {code} However, we should see the log entry "Shutting down broker n" for all Brokers in the log file, but it is not there.
[jira] [Created] (KAFKA-6714) KafkaController marks all Brokers as "Shutting down", though only one broker has been shut down
Uwe Eisele created KAFKA-6714: - Summary: KafkaController marks all Brokers as "Shutting down", though only one broker has been shut down Key: KAFKA-6714 URL: https://issues.apache.org/jira/browse/KAFKA-6714 Project: Kafka Issue Type: Bug Components: controller, core Affects Versions: 0.11.0.2 Environment: Kafka Cluster on Amazon AWS EC2 r4.2xlarge instances with 5 nodes and a Zookeeper Cluster on r4.2xlarge instances with 3 nodes. The Cluster is distributed across 2 availability zones. Reporter: Uwe Eisele In our Kafka cluster we experienced a situation in which the Kafka controller had all brokers marked as "Shutting down", though in fact only one broker had been shut down. The last broker-state log entry before the one claiming that all brokers are shutting down states that no brokers are shutting down. The consequence of this weird state is that the Kafka controller is not able to elect any partition leader. {code:java} [2018-03-15 16:28:24,288] INFO [Controller 5]: Shutting down broker 5 (kafka.controller.KafkaController) [2018-03-15 16:28:24,288] DEBUG [Controller 5]: All shutting down brokers: 5 (kafka.controller.KafkaController) [2018-03-15 16:28:24,288] DEBUG [Controller 5]: Live brokers: 1,2,3,4 (kafka.controller.KafkaController) ... [2018-03-15 16:28:36,846] INFO [Controller 3]: Currently active brokers in the cluster: Set(1, 2, 3, 4) (kafka.controller.KafkaController) [2018-03-15 16:28:36,846] INFO [Controller 3]: Currently shutting brokers in the cluster: Set() (kafka.controller.KafkaController) ... [2018-03-19 17:57:22,273] INFO [Controller 3]: Shutting down broker 1 (kafka.controller.KafkaController) [2018-03-19 17:57:22,273] DEBUG [Controller 3]: All shutting down brokers: 1,5,2,3,4 (kafka.controller.KafkaController) [2018-03-19 17:57:22,273] DEBUG [Controller 3]: Live brokers: (kafka.controller.KafkaController) ... 
[2018-03-19 17:57:22,275] ERROR Controller 3 epoch 83 encountered error while electing leader for partition [zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] due to: No other replicas in ISR 1,3,5 for [zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] besides shutting down brokers 1,5,2,3,4. (state.change.logger) {code} The question is: why does the Kafka controller assume that all brokers are shutting down? The only place in the Kafka code (0.11.0.2) where we found the shutting-down broker set being changed is in the class _kafka.controller.KafkaController_, at line 1407, in the method _doControlledShutdown_. {code:java} info("Shutting down broker " + id) if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id)) throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id)) controllerContext.shuttingDownBrokerIds.add(id) {code} However, we should then see the log entry "Shutting down broker n" for all brokers in the log file, but it is not there. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
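The bookkeeping around the quoted snippet can be sketched as follows (an illustrative reconstruction, not Kafka's actual code; the surrounding class and the {{onBrokerGone}} callback are hypothetical stand-ins). The point is that {{doControlledShutdown}} only ever adds to {{shuttingDownBrokerIds}}; ids are removed elsewhere, when the controller observes a broker leave or re-register, so a missed removal would leave stale ids behind and could accumulate into a state like "All shutting down brokers: 1,5,2,3,4".

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative reconstruction of the controller bookkeeping discussed above.
// Field names follow the quoted 0.11.0.2 snippet; the class itself and
// onBrokerGone() are hypothetical.
public class ControlledShutdownSketch {
    final Set<Integer> liveBrokerIds = new HashSet<>();
    final Set<Integer> shuttingDownBrokerIds = new HashSet<>();

    Set<Integer> liveOrShuttingDownBrokerIds() {
        Set<Integer> all = new HashSet<>(liveBrokerIds);
        all.addAll(shuttingDownBrokerIds);
        return all;
    }

    // Mirrors the quoted snippet: the only place the set is added to.
    void doControlledShutdown(int id) {
        if (!liveOrShuttingDownBrokerIds().contains(id))
            throw new IllegalArgumentException("Broker id " + id + " does not exist.");
        shuttingDownBrokerIds.add(id);
    }

    // Stale ids are only cleared when the broker is observed leaving or
    // re-registering; if this step is ever missed, the set can only grow.
    void onBrokerGone(int id) {
        liveBrokerIds.remove(id);
        shuttingDownBrokerIds.remove(id);
    }

    public static void main(String[] args) {
        ControlledShutdownSketch ctx = new ControlledShutdownSketch();
        for (int i = 1; i <= 5; i++) ctx.liveBrokerIds.add(i);
        ctx.doControlledShutdown(5);
        System.out.println(ctx.shuttingDownBrokerIds); // [5]
        ctx.onBrokerGone(5);
        System.out.println(ctx.shuttingDownBrokerIds); // []
    }
}
```

Under this reading, the reporter's observation is consistent: since "Shutting down broker n" is logged on every addition, five stale entries without five such log lines would point at the removal path, not the addition path.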
[jira] [Updated] (KAFKA-6714) KafkaController marks all Brokers as "Shutting down", though only one broker has been shut down
[ https://issues.apache.org/jira/browse/KAFKA-6714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Uwe Eisele updated KAFKA-6714: -- Priority: Critical (was: Major) > KafkaController marks all Brokers as "Shutting down", though only one broker > has been shut down > --- > > Key: KAFKA-6714 > URL: https://issues.apache.org/jira/browse/KAFKA-6714 > Project: Kafka > Issue Type: Bug > Components: controller, core >Affects Versions: 0.11.0.2 > Environment: Kafka Cluster on Amazon AWS EC2 r4.2xlarge instances > with 5 nodes and a Zookeeper Cluster on r4.2xlarge instances with 3 nodes. > The Cluster is distributed across 2 availability zones. >Reporter: Uwe Eisele >Priority: Critical > > In our Kafka Cluster we experienced a situation in wich the Kafka controller > has all Brokers marked as "Shutting down", though indeed only one Broker has > been shut down. > The last log entry about the broker state before the entry that states that > all brokers are shutting down states that no brokers are shutting down. > The consequence of this weird state is, that the Kafka controller is not able > to elect any partition leader. > {code:java} > [2018-03-15 16:28:24,288] INFO [Controller 5]: Shutting down broker 5 > (kafka.controller.KafkaController) > [2018-03-15 16:28:24,288] DEBUG [Controller 5]: All shutting down brokers: 5 > (kafka.controller.KafkaController) > [2018-03-15 16:28:24,288] DEBUG [Controller 5]: Live brokers: 1,2,3,4 > (kafka.controller.KafkaController) > ... > [2018-03-15 16:28:36,846] INFO [Controller 3]: Currently active brokers in > the cluster: Set(1, 2, 3, 4) (kafka.controller.KafkaController) > [2018-03-15 16:28:36,846] INFO [Controller 3]: Currently shutting brokers in > the cluster: Set() (kafka.controller.KafkaController) > ... 
> [2018-03-19 17:57:22,273] INFO [Controller 3]: Shutting down broker 1 > (kafka.controller.KafkaController) > [2018-03-19 17:57:22,273] DEBUG [Controller 3]: All shutting down brokers: > 1,5,2,3,4 (kafka.controller.KafkaController) > [2018-03-19 17:57:22,273] DEBUG [Controller 3]: Live brokers: > (kafka.controller.KafkaController) > ... > [2018-03-19 17:57:22,275] ERROR Controller 3 epoch 83 encountered error while > electing leader for partition > [zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] due to: No other > replicas in ISR 1,3,5 for > [zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] besides shutting > down brokers 1,5,2,3,4. (state.change.logger) {code} > The question is why the Kafka controller assumes that all brokers are > shutting down? > The only place in the Kafka code (0.11.0.2) we found in which the shutting > down broker set is changed is in the class _kafka.controller.KafkaControler_ > in line 1407 in the method _doControlledShutdown_. > > {code:java} > info("Shutting down broker " + id) > if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id)) > throw new BrokerNotAvailableException("Broker id %d does not > exist.".format(id)) > controllerContext.shuttingDownBrokerIds.add(id) > {code} > However, we should see the log entry "Shutting down broker n" for all Brokers > in the log file, but it is not there. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6714) KafkaController marks all Brokers as "Shutting down", though only one broker has been shut down
[ https://issues.apache.org/jira/browse/KAFKA-6714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Uwe Eisele updated KAFKA-6714:
--
Description: In our Kafka cluster we experienced a situation in which the Kafka controller had all brokers marked as "shutting down", although only one broker had actually been shut down. The last broker-state log entry before the one stating that all brokers are shutting down says that no brokers are shutting down. The consequence of this state is that the Kafka controller cannot elect any partition leader.
{code:java}
kafka.controller Log (Level TRACE):
[2018-03-15 16:28:24,288] INFO [Controller 5]: Shutting down broker 5 (kafka.controller.KafkaController)
[2018-03-15 16:28:24,288] DEBUG [Controller 5]: All shutting down brokers: 5 (kafka.controller.KafkaController)
[2018-03-15 16:28:24,288] DEBUG [Controller 5]: Live brokers: 1,2,3,4 (kafka.controller.KafkaController)
...
[2018-03-15 16:28:36,846] INFO [Controller 3]: Currently active brokers in the cluster: Set(1, 2, 3, 4) (kafka.controller.KafkaController)
[2018-03-15 16:28:36,846] INFO [Controller 3]: Currently shutting brokers in the cluster: Set() (kafka.controller.KafkaController)
...
[2018-03-19 17:57:22,273] INFO [Controller 3]: Shutting down broker 1 (kafka.controller.KafkaController)
[2018-03-19 17:57:22,273] DEBUG [Controller 3]: All shutting down brokers: 1,5,2,3,4 (kafka.controller.KafkaController)
[2018-03-19 17:57:22,273] DEBUG [Controller 3]: Live brokers: (kafka.controller.KafkaController)
{code}
{code:java}
state.change.logger Log (Level TRACE):
[2018-03-19 17:57:22,275] ERROR Controller 3 epoch 83 encountered error while electing leader for partition [zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] due to: No other replicas in ISR 1,3,5 for [zughaltphase_v3_intern_intern_partitioned_by_evanummer,6] besides shutting down brokers 1,5,2,3,4. (state.change.logger)
{code}
The question is why the Kafka controller assumes that all brokers are shutting down. The only place in the Kafka code (0.11.0.2) we found where the set of shutting-down brokers is changed is in the class _kafka.controller.KafkaController_, line 1407, in the method _doControlledShutdown_:
{code:java}
info("Shutting down broker " + id)
if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
  throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id))
controllerContext.shuttingDownBrokerIds.add(id)
{code}
If this path were the cause, however, we would expect a "Shutting down broker n" log entry for every broker in the set, but those entries are not in the log file.
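Note that the quoted _doControlledShutdown_ snippet only ever adds to the shutting-down set; nothing on that code path removes an id again. A minimal runnable sketch (hypothetical class and field names, not the actual KafkaController) of how such an add-only set accumulates ids across successive controlled shutdowns:

```java
import java.util.Set;
import java.util.TreeSet;

// Model of the quoted guard: ids are only ever added to the set, never
// removed on this path, so once a broker enters it, it stays there until
// something else (e.g. a broker re-registration) clears it.
public class ShutdownSetSketch {
    static final Set<Integer> liveOrShuttingDownBrokerIds = new TreeSet<>(Set.of(1, 2, 3, 4, 5));
    static final Set<Integer> shuttingDownBrokerIds = new TreeSet<>();

    static void doControlledShutdown(int id) {
        System.out.println("Shutting down broker " + id);
        if (!liveOrShuttingDownBrokerIds.contains(id))
            throw new IllegalArgumentException("Broker id " + id + " does not exist.");
        shuttingDownBrokerIds.add(id);
    }

    public static void main(String[] args) {
        doControlledShutdown(5);
        doControlledShutdown(1);
        // Both ids remain marked as shutting down:
        System.out.println(shuttingDownBrokerIds); // [1, 5]
    }
}
```

This is consistent with the report's observation: for the set to contain 1,5,2,3,4 via this path alone, a "Shutting down broker n" line would have to appear for each of those ids.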
[jira] [Commented] (KAFKA-6712) Throw a specific exception with wrong topic name for interactive queries
[ https://issues.apache.org/jira/browse/KAFKA-6712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16413518 ] Francesco Guardiani commented on KAFKA-6712: Yep
> Throw a specific exception with wrong topic name for interactive queries
>
> Key: KAFKA-6712
> URL: https://issues.apache.org/jira/browse/KAFKA-6712
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Francesco Guardiani
> Priority: Major
>
> When you use the interactive query state stores with a wrong topic name and you call the store() method, the client should throw an exception explaining that you have specified a wrong topic name. Currently it throws an InvalidStateStoreException ("the state store may have migrated to another instance.") that is too generic, because it is also thrown when the stream thread is not yet ready.
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
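The pain point described above is that one exception type covers two very different failure modes, so callers can only retry blindly. A minimal sketch of that situation (hypothetical stand-in classes, not the Kafka Streams API):

```java
import java.util.Optional;
import java.util.Set;

// Stand-ins illustrating the issue: a wrong store name and a store that is
// merely not ready yet surface as the same generic exception.
public class StoreLookupSketch {
    static class InvalidStateStoreException extends RuntimeException {
        InvalidStateStoreException(String msg) { super(msg); }
    }

    static boolean ready = false;
    static final Set<String> knownStores = Set.of("word-counts");

    // Both failure modes throw the same exception, as the report describes.
    static String store(String name) {
        if (!ready || !knownStores.contains(name))
            throw new InvalidStateStoreException("the state store may have migrated to another instance.");
        return "handle:" + name;
    }

    // A blanket retry loop is the only workable caller strategy while the
    // exception stays generic: a wrong store name burns all attempts exactly
    // like a store that is not ready yet.
    static Optional<String> storeWithRetry(String name, int attempts) {
        for (int i = 0; i < attempts; i++) {
            try {
                return Optional.of(store(name));
            } catch (InvalidStateStoreException e) {
                // cannot distinguish the causes, so just try again
            }
        }
        return Optional.empty();
    }
}
```

A more specific exception for the wrong-name case would let the retry loop fail fast instead of exhausting its attempts.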
[jira] [Commented] (KAFKA-6681) Two instances of kafka consumer reading the same partition within a consumer group
[ https://issues.apache.org/jira/browse/KAFKA-6681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16413500 ] Narayan Periwal commented on KAFKA-6681: [~yuzhih...@gmail.com], any update on this?
> Two instances of kafka consumer reading the same partition within a consumer group
> --
>
> Key: KAFKA-6681
> URL: https://issues.apache.org/jira/browse/KAFKA-6681
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.10.2.1
> Reporter: Narayan Periwal
> Priority: Critical
> Attachments: server-1.log, server-2.log
>
> We have seen this issue with the Kafka consumer, the new client library introduced in 0.9. With this client, group management is done by the Kafka coordinator, which is one of the Kafka brokers. We are using Kafka broker 0.10.2.1, and the consumer client version is also 0.10.2.1.
> The issue we have faced is that, after a rebalance, some partitions get consumed by two instances within the same consumer group, leading to duplication of the entire partition's data. Both instances continue to read until the next rebalance or a restart of those clients.
> It looks like a particular consumer keeps fetching data from a partition, while the broker is unable to identify this "stale" consumer instance. During this time, we also see the under-replicated partition metrics spiking.
> We have hit this twice in production. Please look into it at the earliest.
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
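The invariant the report says was violated is that, within one consumer group, each partition is assigned to at most one member at a time. A small sketch of a check for that invariant (hypothetical data shapes, not the Kafka client API):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Checks the consumer-group invariant from the report: no partition may
// appear in more than one member's assignment within the same group.
public class AssignmentInvariantSketch {
    /** Returns true iff no partition is assigned to two different members. */
    static boolean assignmentIsDisjoint(Map<String, List<Integer>> memberToPartitions) {
        Set<Integer> seen = new HashSet<>();
        for (List<Integer> partitions : memberToPartitions.values())
            for (int p : partitions)
                if (!seen.add(p))
                    return false; // partition read by two members -> duplicated data
        return true;
    }

    public static void main(String[] args) {
        // Healthy post-rebalance state:
        System.out.println(assignmentIsDisjoint(Map.of("c1", List.of(0, 1), "c2", List.of(2, 3)))); // true
        // The reported bug: partition 1 consumed by both instances:
        System.out.println(assignmentIsDisjoint(Map.of("c1", List.of(0, 1), "c2", List.of(1, 2)))); // false
    }
}
```

In a healthy group the coordinator's rebalance guarantees the disjoint case; the report describes a "stale" member that keeps fetching after the rebalance, which is the second case.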