kafka git commit: KAFKA-5704: Corrected Connect distributed startup behavior to allow older brokers to auto-create topics
Repository: kafka Updated Branches: refs/heads/0.11.0 d0be27b4e -> 3bc5ee7dd KAFKA-5704: Corrected Connect distributed startup behavior to allow older brokers to auto-create topics When a Connect distributed worker starts up talking with broker versions 0.10.1.0 and later, it will use the AdminClient to look for the internal topics and attempt to create them if they are missing. Although the AdminClient was added in 0.11.0.0, the AdminClient uses APIs to create topics that existed in 0.10.1.0 and later. This feature works as expected when Connect uses a broker version 0.10.1.0 or later. However, when a Connect distributed worker starts up using a broker older than 0.10.1.0, the AdminClient is not able to find the required APIs and thus will throw an UnsupportedVersionException. Unfortunately, this exception is not caught and instead causes the Connect worker to fail even when the topics already exist. This change handles the UnsupportedVersionException by logging a debug message and doing nothing. The existing producer logic will get information about the topics, which will cause the broker to create them if they don't exist and broker auto-creation of topics is enabled. This is the same behavior that existed prior to 0.11.0.0, and so this change restores that behavior for brokers older than 0.10.1.0. This change also adds a system test that verifies Connect works with a variety of brokers and is able to run source and sink connectors. The test verifies that Connect can read from the internal topics when the connectors are restarted. 
Author: Randall HauchReviewers: Ewen Cheslack-Postava Closes #3641 from rhauch/kafka-5704 (cherry picked from commit 1a653c813c842c0b67f26fb119d7727e272cf834) Signed-off-by: Ewen Cheslack-Postava Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3bc5ee7d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3bc5ee7d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3bc5ee7d Branch: refs/heads/0.11.0 Commit: 3bc5ee7dd119864e0bbfc6bc095b2e0bc57c32b6 Parents: d0be27b Author: Randall Hauch Authored: Tue Aug 8 20:20:41 2017 -0700 Committer: Ewen Cheslack-Postava Committed: Tue Aug 8 20:20:54 2017 -0700 -- .../apache/kafka/connect/util/TopicAdmin.java | 10 +++--- .../kafka/connect/util/TopicAdminTest.java | 15 +++- .../tests/connect/connect_distributed_test.py | 36 ++-- 3 files changed, 44 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/3bc5ee7d/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java -- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java index adc3378..5da4f2d 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java @@ -195,13 +195,14 @@ public class TopicAdmin implements AutoCloseable { * are excluded from the result. * * If multiple topic definitions have the same topic name, the last one with that name will be used. - * + * + * Apache Kafka added support for creating topics in 0.10.1.0, so this method works as expected with that and later versions. + * With brokers older than 0.10.1.0, this method is unable to create topics and always returns an empty set. 
* * @param topics the specifications of the topics * @return the names of the topics that were created by this operation; never null but possibly empty * @throws ConnectExceptionif an error occurs, the operation takes too long, or the thread is interrupted while * attempting to perform this operation - * @throws UnsupportedVersionException if the broker does not support the necessary APIs to perform this request */ public Set createTopics(NewTopic... topics) { Map topicsByName = new HashMap<>(); @@ -233,8 +234,9 @@ public class TopicAdmin implements AutoCloseable { continue; } if (cause instanceof UnsupportedVersionException) { -log.error("Unable to use Kafka admin client to create topic descriptions for '{}' using the brokers at {}", topicNameList, bootstrapServers); -throw (UnsupportedVersionException) cause; +log.debug("Unable to use Kafka admin client to create topic descriptions for '{}' using the brokers at
kafka git commit: KAFKA-5704: Corrected Connect distributed startup behavior to allow older brokers to auto-create topics
Repository: kafka Updated Branches: refs/heads/trunk b8cf97686 -> 1a653c813 KAFKA-5704: Corrected Connect distributed startup behavior to allow older brokers to auto-create topics When a Connect distributed worker starts up talking with broker versions 0.10.1.0 and later, it will use the AdminClient to look for the internal topics and attempt to create them if they are missing. Although the AdminClient was added in 0.11.0.0, the AdminClient uses APIs to create topics that existed in 0.10.1.0 and later. This feature works as expected when Connect uses a broker version 0.10.1.0 or later. However, when a Connect distributed worker starts up using a broker older than 0.10.1.0, the AdminClient is not able to find the required APIs and thus will throw an UnsupportedVersionException. Unfortunately, this exception is not caught and instead causes the Connect worker to fail even when the topics already exist. This change handles the UnsupportedVersionException by logging a debug message and doing nothing. The existing producer logic will get information about the topics, which will cause the broker to create them if they don't exist and broker auto-creation of topics is enabled. This is the same behavior that existed prior to 0.11.0.0, and so this change restores that behavior for brokers older than 0.10.1.0. This change also adds a system test that verifies Connect works with a variety of brokers and is able to run source and sink connectors. The test verifies that Connect can read from the internal topics when the connectors are restarted. 
Author: Randall HauchReviewers: Ewen Cheslack-Postava Closes #3641 from rhauch/kafka-5704 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1a653c81 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1a653c81 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1a653c81 Branch: refs/heads/trunk Commit: 1a653c813c842c0b67f26fb119d7727e272cf834 Parents: b8cf976 Author: Randall Hauch Authored: Tue Aug 8 20:20:41 2017 -0700 Committer: Ewen Cheslack-Postava Committed: Tue Aug 8 20:20:41 2017 -0700 -- .../apache/kafka/connect/util/TopicAdmin.java | 10 +++--- .../kafka/connect/util/TopicAdminTest.java | 15 +++- .../tests/connect/connect_distributed_test.py | 36 ++-- 3 files changed, 44 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/1a653c81/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java -- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java index adc3378..5da4f2d 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java @@ -195,13 +195,14 @@ public class TopicAdmin implements AutoCloseable { * are excluded from the result. * * If multiple topic definitions have the same topic name, the last one with that name will be used. - * + * + * Apache Kafka added support for creating topics in 0.10.1.0, so this method works as expected with that and later versions. + * With brokers older than 0.10.1.0, this method is unable to create topics and always returns an empty set. 
* * @param topics the specifications of the topics * @return the names of the topics that were created by this operation; never null but possibly empty * @throws ConnectExceptionif an error occurs, the operation takes too long, or the thread is interrupted while * attempting to perform this operation - * @throws UnsupportedVersionException if the broker does not support the necessary APIs to perform this request */ public Set createTopics(NewTopic... topics) { Map topicsByName = new HashMap<>(); @@ -233,8 +234,9 @@ public class TopicAdmin implements AutoCloseable { continue; } if (cause instanceof UnsupportedVersionException) { -log.error("Unable to use Kafka admin client to create topic descriptions for '{}' using the brokers at {}", topicNameList, bootstrapServers); -throw (UnsupportedVersionException) cause; +log.debug("Unable to use Kafka admin client to create topic descriptions for '{}' using the brokers at {}," + + "falling back to assume topic(s) exist or will be auto-created by the broker",
kafka git commit: MINOR: support retrieving cluster_id in system tests
Repository: kafka Updated Branches: refs/heads/0.11.0 0bf34e715 -> d0be27b4e MINOR: support retrieving cluster_id in system tests ewencp would be great to cherry-pick this back into 0.11.x if possible Author: Xavier LéautéReviewers: Ewen Cheslack-Postava Closes #3645 from xvrl/system-test-cluster-id (cherry picked from commit b8cf976865f3559b4e5c45eeb261444cd1d7383f) Signed-off-by: Ewen Cheslack-Postava Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d0be27b4 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d0be27b4 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d0be27b4 Branch: refs/heads/0.11.0 Commit: d0be27b4e075ec604ed4636c5e7799513a52ef74 Parents: 0bf34e7 Author: Xavier Léauté Authored: Tue Aug 8 19:58:47 2017 -0700 Committer: Ewen Cheslack-Postava Committed: Tue Aug 8 19:59:12 2017 -0700 -- tests/kafkatest/services/kafka/kafka.py | 10 ++ 1 file changed, 10 insertions(+) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/d0be27b4/tests/kafkatest/services/kafka/kafka.py -- diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index b22b518..cd71e5e 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -488,6 +488,16 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): self.logger.info("Leader for topic %s and partition %d is now: %d" % (topic, partition, leader_idx)) return self.get_node(leader_idx) +def cluster_id(self): +""" Get the current cluster id +""" +self.logger.debug("Querying ZooKeeper to retrieve cluster id") +cluster = json.loads(self.zk.query("/cluster/id")) +if cluster is None: +raise Exception("Error querying ZK for cluster id.") + +return cluster['id'] + def list_consumer_groups(self, node=None, new_consumer=True, command_config=None): """ Get list of consumer groups. """
kafka git commit: MINOR: support retrieving cluster_id in system tests
Repository: kafka Updated Branches: refs/heads/trunk fb8edbdc6 -> b8cf97686 MINOR: support retrieving cluster_id in system tests ewencp would be great to cherry-pick this back into 0.11.x if possible Author: Xavier LéautéReviewers: Ewen Cheslack-Postava Closes #3645 from xvrl/system-test-cluster-id Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b8cf9768 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b8cf9768 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b8cf9768 Branch: refs/heads/trunk Commit: b8cf976865f3559b4e5c45eeb261444cd1d7383f Parents: fb8edbd Author: Xavier Léauté Authored: Tue Aug 8 19:58:47 2017 -0700 Committer: Ewen Cheslack-Postava Committed: Tue Aug 8 19:58:47 2017 -0700 -- tests/kafkatest/services/kafka/kafka.py | 10 ++ 1 file changed, 10 insertions(+) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/b8cf9768/tests/kafkatest/services/kafka/kafka.py -- diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index e941a3d..ee60bab 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -531,6 +531,16 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): self.logger.info("Leader for topic %s and partition %d is now: %d" % (topic, partition, leader_idx)) return self.get_node(leader_idx) +def cluster_id(self): +""" Get the current cluster id +""" +self.logger.debug("Querying ZooKeeper to retrieve cluster id") +cluster = json.loads(self.zk.query("/cluster/id")) +if cluster is None: +raise Exception("Error querying ZK for cluster id.") + +return cluster['id'] + def list_consumer_groups(self, node=None, new_consumer=True, command_config=None): """ Get list of consumer groups. """
kafka git commit: MINOR: Update dependencies for 1.0.0 release
Repository: kafka Updated Branches: refs/heads/trunk 649276abb -> fb8edbdc6 MINOR: Update dependencies for 1.0.0 release Notable updates: 1. Gradle 4.1 includes a number of performance and CLI improvements as well as initial Java 9 support. 2. Scala 2.12.3 has substantial compilation time improvements. 3. lz4-java 1.4 allows us to remove a workaround in KafkaLZ4BlockInputStream (not done in this PR). 4. snappy-java 1.1.4 improved performance of compression (5%) and decompression (20%). There was a slight increase in the compressed size in one of our tests. Not updated: 1. PowerMock due to a couple of regressions. I investigated one of them and filed https://github.com/powermock/powermock/issues/828. 2. Jackson, which will be done via #3631. 3. Rocksdb, which will be done via #3519. Author: Ismael JumaReviewers: Jason Gustafson Closes #3619 from ijuma/update-deps-for-1.0.0 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fb8edbdc Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fb8edbdc Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fb8edbdc Branch: refs/heads/trunk Commit: fb8edbdc6314411e0ed006a76e9658460a99208a Parents: 649276a Author: Ismael Juma Authored: Wed Aug 9 01:11:39 2017 +0100 Committer: Ismael Juma Committed: Wed Aug 9 01:11:39 2017 +0100 -- build.gradle| 20 +++--- .../scala/kafka/security/minikdc/MiniKdc.scala | 13 ++--- .../kafka/message/MessageCompressionTest.scala | 5 ++-- gradle/dependencies.gradle | 28 4 files changed, 38 insertions(+), 28 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/fb8edbdc/build.gradle -- diff --git a/build.gradle b/build.gradle index 53c0e8d..0e36a85 100644 --- a/build.gradle +++ b/build.gradle @@ -24,10 +24,10 @@ buildscript { dependencies { // For Apache Rat plugin to ignore non-Git files -classpath "org.ajoberstar:grgit:1.9.2" -classpath 'com.github.ben-manes:gradle-versions-plugin:0.14.0' +classpath 
"org.ajoberstar:grgit:1.9.3" +classpath 'com.github.ben-manes:gradle-versions-plugin:0.15.0' classpath 'org.scoverage:gradle-scoverage:2.1.0' -classpath 'com.github.jengelman.gradle.plugins:shadow:1.2.4' +classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.1' } } @@ -77,7 +77,7 @@ allprojects { } ext { - gradleVersion = "3.5" + gradleVersion = "4.1" buildVersionFileName = "kafka-version.properties" maxPermSizeArgs = [] @@ -548,7 +548,7 @@ project(':core') { testCompile libs.apachedsMavibotPartition testCompile libs.apachedsJdbmPartition testCompile libs.junit -testCompile libs.scalaTest +testCompile libs.scalatest testCompile libs.jfreechart scoverage libs.scoveragePlugin @@ -935,11 +935,11 @@ project(':jmh-benchmarks') { } dependencies { - compile project(':clients') - compile project(':streams') - compile 'org.openjdk.jmh:jmh-core:1.18' - compile 'org.openjdk.jmh:jmh-generator-annprocess:1.18' - compile 'org.openjdk.jmh:jmh-core-benchmarks:1.18' +compile project(':clients') +compile project(':streams') +compile libs.jmhCore +compile libs.jmhGeneratorAnnProcess +compile libs.jmhCoreBenchmarks } jar { http://git-wip-us.apache.org/repos/asf/kafka/blob/fb8edbdc/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala -- diff --git a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala index c7b8973..2ff281b 100644 --- a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala +++ b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala @@ -28,7 +28,6 @@ import java.util.{Locale, Properties, UUID} import kafka.utils.{CoreUtils, Exit, Logging} import scala.collection.JavaConverters._ -import org.apache.commons.io.IOUtils import org.apache.commons.lang.text.StrSubstitutor import org.apache.directory.api.ldap.model.entry.{DefaultEntry, Entry} import org.apache.directory.api.ldap.model.ldif.LdifReader @@ -198,9 +197,15 @@ class MiniKdc(config: Properties, workDir: File) extends Logging { "3" -> 
orgDomain.toUpperCase(Locale.ENGLISH), "4" -> bindAddress ) - val inputStream = MiniKdc.getResourceAsStream("minikdc.ldiff") - try addEntriesToDirectoryService(StrSubstitutor.replace(IOUtils.toString(inputStream), map.asJava)) - finally CoreUtils.swallow(inputStream.close()) + val reader = new BufferedReader(new
kafka git commit: KAFKA-2360; Extract producer-specific configs out of the common PerfConfig
Repository: kafka Updated Branches: refs/heads/trunk 98d4a4833 -> 649276abb KAFKA-2360; Extract producer-specific configs out of the common PerfConfig Separate `batch.size`, `message-size` and `compression-code` from PerfConfig to a newly-created ProducerPerfConfig in order to hide them in ConsumerPerf tool. Author: huxihxReviewers: Ismael Juma Closes #3613 from huxihx/KAFKA-2360 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/649276ab Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/649276ab Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/649276ab Branch: refs/heads/trunk Commit: 649276abb284c4f71504e9f028c5ab1450849fae Parents: 98d4a48 Author: huxihx Authored: Tue Aug 8 23:22:57 2017 +0100 Committer: Ismael Juma Committed: Tue Aug 8 23:22:57 2017 +0100 -- .../scala/kafka/tools/ConsumerPerformance.scala| 2 ++ core/src/main/scala/kafka/tools/PerfConfig.scala | 17 - .../scala/kafka/tools/ProducerPerformance.scala| 15 +++ .../kafka/tools/SimpleConsumerPerformance.scala| 2 ++ 4 files changed, 19 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/649276ab/core/src/main/scala/kafka/tools/ConsumerPerformance.scala -- diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala index 2880c94..ed1b440 100644 --- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala @@ -240,6 +240,8 @@ object ConsumerPerformance { .describedAs("config file") .ofType(classOf[String]) val printMetricsOpt = parser.accepts("print-metrics", "Print out the metrics. 
This only applies to new consumer.") +val showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " + + "interval as configured by reporting-interval") val options = parser.parse(args: _*) http://git-wip-us.apache.org/repos/asf/kafka/blob/649276ab/core/src/main/scala/kafka/tools/PerfConfig.scala -- diff --git a/core/src/main/scala/kafka/tools/PerfConfig.scala b/core/src/main/scala/kafka/tools/PerfConfig.scala index a285a1c..264ae6a 100644 --- a/core/src/main/scala/kafka/tools/PerfConfig.scala +++ b/core/src/main/scala/kafka/tools/PerfConfig.scala @@ -37,23 +37,6 @@ class PerfConfig(args: Array[String]) { .describedAs("date format") .ofType(classOf[String]) .defaultsTo("-MM-dd HH:mm:ss:SSS") - val showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, stats are reported for each reporting " + -"interval as configured by reporting-interval") val hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing the header for the stats ") - val messageSizeOpt = parser.accepts("message-size", "The size of each message.") -.withRequiredArg -.describedAs("size") -.ofType(classOf[java.lang.Integer]) -.defaultsTo(100) - val batchSizeOpt = parser.accepts("batch-size", "Number of messages to write in a single batch.") -.withRequiredArg -.describedAs("size") -.ofType(classOf[java.lang.Integer]) -.defaultsTo(200) - val compressionCodecOpt = parser.accepts("compression-codec", "If set, messages are sent compressed") -.withRequiredArg -.describedAs("supported codec: NoCompressionCodec as 0, GZIPCompressionCodec as 1, SnappyCompressionCodec as 2, LZ4CompressionCodec as 3") -.ofType(classOf[java.lang.Integer]) -.defaultsTo(0) val helpOpt = parser.accepts("help", "Print usage.") } http://git-wip-us.apache.org/repos/asf/kafka/blob/649276ab/core/src/main/scala/kafka/tools/ProducerPerformance.scala -- diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala 
b/core/src/main/scala/kafka/tools/ProducerPerformance.scala index f14253b..0f21831 100644 --- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala +++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala @@ -125,6 +125,21 @@ object ProducerPerformance extends Logging { .describedAs("metrics directory") .ofType(classOf[java.lang.String]) val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.") +val messageSizeOpt = parser.accepts("message-size", "The size of each message.") + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(100) +val batchSizeOpt =
kafka git commit: MINOR: Remove unused GroupState.state field
Repository: kafka Updated Branches: refs/heads/trunk c9ffab162 -> 98d4a4833 MINOR: Remove unused GroupState.state field This field doesn't seem to be used and the value for `AwaitingSync` seems to be wrong (it seems like it should have been `2` instead of `5`). Author: Ismael JumaReviewers: Jason Gustafson , Guozhang Wang Closes #3572 from ijuma/remove-unused-group-state-field Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/98d4a483 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/98d4a483 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/98d4a483 Branch: refs/heads/trunk Commit: 98d4a483382dacad280e0cd25f10bccca51328a5 Parents: c9ffab1 Author: Ismael Juma Authored: Tue Aug 8 23:15:57 2017 +0100 Committer: Ismael Juma Committed: Tue Aug 8 23:15:57 2017 +0100 -- .../scala/kafka/coordinator/group/GroupMetadata.scala | 12 ++-- 1 file changed, 6 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/98d4a483/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala -- diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala index 35a1fc7..18096bb 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala @@ -24,7 +24,7 @@ import org.apache.kafka.common.TopicPartition import scala.collection.{Seq, immutable, mutable} -private[group] sealed trait GroupState { def state: Byte } +private[group] sealed trait GroupState /** * Group is preparing to rebalance @@ -39,7 +39,7 @@ private[group] sealed trait GroupState { def state: Byte } * all members have left the group => Empty * group is removed by partition emigration => Dead */ -private[group] case object PreparingRebalance extends GroupState { val state: Byte = 1 } +private[group] case object PreparingRebalance extends GroupState /** * 
Group is awaiting state assignment from the leader @@ -54,7 +54,7 @@ private[group] case object PreparingRebalance extends GroupState { val state: By * member failure detected => PreparingRebalance * group is removed by partition emigration => Dead */ -private[group] case object AwaitingSync extends GroupState { val state: Byte = 5} +private[group] case object AwaitingSync extends GroupState /** * Group is stable @@ -70,7 +70,7 @@ private[group] case object AwaitingSync extends GroupState { val state: Byte = 5 * follower join-group with new metadata => PreparingRebalance * group is removed by partition emigration => Dead */ -private[group] case object Stable extends GroupState { val state: Byte = 3 } +private[group] case object Stable extends GroupState /** * Group has no more members and its metadata is being removed @@ -83,7 +83,7 @@ private[group] case object Stable extends GroupState { val state: Byte = 3 } * allow offset fetch requests * transition: Dead is a final state before group metadata is cleaned up, so there are no transitions */ -private[group] case object Dead extends GroupState { val state: Byte = 4 } +private[group] case object Dead extends GroupState /** * Group has no more members, but lingers until all offsets have expired. This state @@ -100,7 +100,7 @@ private[group] case object Dead extends GroupState { val state: Byte = 4 } * group is removed by partition emigration => Dead * group is removed by expiration => Dead */ -private[group] case object Empty extends GroupState { val state: Byte = 5 } +private[group] case object Empty extends GroupState private object GroupMetadata {
kafka git commit: KAFKA-5658; Fix AdminClient request timeout handling bug resulting in continual BrokerNotAvailableExceptions
Repository: kafka Updated Branches: refs/heads/trunk 57770dd23 -> c9ffab162 KAFKA-5658; Fix AdminClient request timeout handling bug resulting in continual BrokerNotAvailableExceptions The AdminClient does not properly clear calls from the callsInFlight structure. Later, in an effort to clear the lingering call objects, it closes the connection they are associated with. This disrupts new incoming calls, which then get BrokerNotAvailableException. This patch fixes this bug by properly removing completed calls from the callsInFlight structure. It also adds the Call#aborted flag, which ensures that we throw the right exception (TimeoutException instead of DisconnectException) and only abort a connection once -- even if there is a similar bug in the future which causes old Call objects to linger. Author: Colin P. Mccabe Reviewers: Ismael Juma Closes #3584 from cmccabe/KAFKA-5658 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c9ffab16 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c9ffab16 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c9ffab16 Branch: refs/heads/trunk Commit: c9ffab16228ecd5d931b58d93dfa3f49287d2909 Parents: 57770dd Author: Colin P. 
Mccabe Authored: Tue Aug 8 09:38:15 2017 +0100 Committer: Ismael Juma Committed: Tue Aug 8 09:38:22 2017 +0100 -- checkstyle/suppressions.xml | 2 +- .../kafka/clients/admin/KafkaAdminClient.java | 36 + .../clients/admin/KafkaAdminClientTest.java | 57 .../clients/admin/MockKafkaAdminClientEnv.java | 18 +-- 4 files changed, 99 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/c9ffab16/checkstyle/suppressions.xml -- diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 6d2b559..cca1429 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -61,7 +61,7 @@ files="(Sender|Fetcher|KafkaConsumer|Metrics|ConsumerCoordinator|RequestResponse|TransactionManager|KafkaAdminClient)Test.java"/> + files="(ConsumerCoordinator|KafkaConsumer|RequestResponse|Fetcher|KafkaAdminClient)Test.java"/> http://git-wip-us.apache.org/repos/asf/kafka/blob/c9ffab16/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 2129059..ed32bff 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -322,9 +322,8 @@ public class KafkaAdminClient extends AdminClient { } } -static KafkaAdminClient createInternal(AdminClientConfig config, KafkaClient client, Metadata metadata) { +static KafkaAdminClient createInternal(AdminClientConfig config, KafkaClient client, Metadata metadata, Time time) { Metrics metrics = null; -Time time = Time.SYSTEM; String clientId = generateClientId(config); try { @@ -441,6 +440,7 @@ public class KafkaAdminClient extends AdminClient { private final long deadlineMs; private final NodeProvider nodeProvider; private int tries = 0; +private boolean aborted = false; Call(String callName, long 
deadlineMs, NodeProvider nodeProvider) { this.callName = callName; @@ -459,6 +459,14 @@ public class KafkaAdminClient extends AdminClient { * @param throwable The failure exception. */ final void fail(long now, Throwable throwable) { +if (aborted) { +if (log.isDebugEnabled()) { +log.debug("{} aborted at {} after {} attempt(s)", this, now, tries, +new Exception(prettyPrintException(throwable))); +} +handleFailure(new TimeoutException("Aborted due to timeout.")); +return; +} // If this is an UnsupportedVersionException that we can retry, do so. if ((throwable instanceof UnsupportedVersionException) && handleUnsupportedVersionException((UnsupportedVersionException) throwable)) { @@ -792,12 +800,17 @@ public class KafkaAdminClient extends AdminClient { // only one we need to check the timeout for. Call call = contexts.get(0); if (processor.callHasExpired(call)) {
kafka git commit: KAFKA-5658; Fix AdminClient request timeout handling bug resulting in continual BrokerNotAvailableExceptions
Repository: kafka Updated Branches: refs/heads/0.11.0 557001f9f -> 0bf34e715 KAFKA-5658; Fix AdminClient request timeout handling bug resulting in continual BrokerNotAvailableExceptions The AdminClient does not properly clear calls from the callsInFlight structure. Later, in an effort to clear the lingering call objects, it closes the connection they are associated with. This disrupts new incoming calls, which then get BrokerNotAvailableException. This patch fixes this bug by properly removing completed calls from the callsInFlight structure. It also adds the Call#aborted flag, which ensures that we throw the right exception (TimeoutException instead of DisconnectException) and only abort a connection once -- even if there is a similar bug in the future which causes old Call objects to linger. Author: Colin P. Mccabe Reviewers: Ismael Juma Closes #3584 from cmccabe/KAFKA-5658 (cherry picked from commit c9ffab16228ecd5d931b58d93dfa3f49287d2909) Signed-off-by: Ismael Juma Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0bf34e71 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0bf34e71 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0bf34e71 Branch: refs/heads/0.11.0 Commit: 0bf34e7157319bdd12ae907788bc40c70f9c7ad0 Parents: 557001f Author: Colin P. 
Mccabe Authored: Tue Aug 8 09:38:15 2017 +0100 Committer: Ismael Juma Committed: Tue Aug 8 09:40:17 2017 +0100 -- checkstyle/suppressions.xml | 2 +- .../kafka/clients/admin/KafkaAdminClient.java | 36 + .../clients/admin/KafkaAdminClientTest.java | 57 .../clients/admin/MockKafkaAdminClientEnv.java | 18 +-- 4 files changed, 99 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/0bf34e71/checkstyle/suppressions.xml -- diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 7f98820..0a1616a 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -61,7 +61,7 @@ files="(Sender|Fetcher|KafkaConsumer|Metrics|ConsumerCoordinator|RequestResponse|TransactionManager|KafkaAdminClient)Test.java"/> + files="(ConsumerCoordinator|KafkaConsumer|RequestResponse|Fetcher|KafkaAdminClient)Test.java"/> http://git-wip-us.apache.org/repos/asf/kafka/blob/0bf34e71/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 2129059..ed32bff 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -322,9 +322,8 @@ public class KafkaAdminClient extends AdminClient { } } -static KafkaAdminClient createInternal(AdminClientConfig config, KafkaClient client, Metadata metadata) { +static KafkaAdminClient createInternal(AdminClientConfig config, KafkaClient client, Metadata metadata, Time time) { Metrics metrics = null; -Time time = Time.SYSTEM; String clientId = generateClientId(config); try { @@ -441,6 +440,7 @@ public class KafkaAdminClient extends AdminClient { private final long deadlineMs; private final NodeProvider nodeProvider; private int tries = 0; +private boolean aborted = false; Call(String callName, long 
deadlineMs, NodeProvider nodeProvider) { this.callName = callName; @@ -459,6 +459,14 @@ public class KafkaAdminClient extends AdminClient { * @param throwable The failure exception. */ final void fail(long now, Throwable throwable) { +if (aborted) { +if (log.isDebugEnabled()) { +log.debug("{} aborted at {} after {} attempt(s)", this, now, tries, +new Exception(prettyPrintException(throwable))); +} +handleFailure(new TimeoutException("Aborted due to timeout.")); +return; +} // If this is an UnsupportedVersionException that we can retry, do so. if ((throwable instanceof UnsupportedVersionException) && handleUnsupportedVersionException((UnsupportedVersionException) throwable)) { @@ -792,12 +800,17 @@ public class KafkaAdminClient extends AdminClient { // only one we need to check