kafka git commit: KAFKA-5704: Corrected Connect distributed startup behavior to allow older brokers to auto-create topics

2017-08-08 Thread ewencp
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 d0be27b4e -> 3bc5ee7dd


KAFKA-5704: Corrected Connect distributed startup behavior to allow older 
brokers to auto-create topics

When a Connect distributed worker starts up talking with broker versions 
0.10.1.0 and later, it will use the AdminClient to look for the internal topics 
and attempt to create them if they are missing. Although the AdminClient was 
added in 0.11.0.0, the AdminClient uses APIs to create topics that existed in 
0.10.1.0 and later. This feature works as expected when Connect uses a broker 
version 0.10.1.0 or later.

However, when a Connect distributed worker starts up using a broker older than 
0.10.1.0, the AdminClient is not able to find the required APIs and thus will 
throw an UnsupportedVersionException. Unfortunately, this exception is not 
caught and instead causes the Connect worker to fail even when the topics 
already exist.

This change handles the UnsupportedVersionException by logging a debug message 
and doing nothing. The existing producer logic will request metadata for the 
topics, which causes the broker to create them if they don’t exist and 
broker auto-creation of topics is enabled. This is the same behavior that 
existed prior to 0.11.0.0, and so this change restores that behavior for 
brokers older than 0.10.1.0.
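The fallback described above can be sketched as follows (hypothetical class and
method names, not the actual TopicAdmin code): a creation failure caused by a
missing broker API is swallowed and reported as "nothing created", leaving
auto-creation to the producer's later metadata request.

```java
import java.util.Collections;
import java.util.Set;

public class CreateTopicsFallbackSketch {
    // Stand-in for org.apache.kafka.common.errors.UnsupportedVersionException.
    static class UnsupportedVersionException extends RuntimeException {}

    // Stand-in for the AdminClient call that fails against brokers older
    // than 0.10.1.0, which lack the CreateTopics API.
    static Set<String> createViaAdminClient(boolean brokerSupportsCreateTopics) {
        if (!brokerSupportsCreateTopics)
            throw new UnsupportedVersionException();
        return Collections.singleton("connect-offsets");
    }

    static Set<String> createTopics(boolean brokerSupportsCreateTopics) {
        try {
            return createViaAdminClient(brokerSupportsCreateTopics);
        } catch (UnsupportedVersionException e) {
            // Old broker: log at debug (omitted here) and return an empty
            // set; the producer's metadata request then lets the broker
            // auto-create the topics if auto-creation is enabled.
            return Collections.emptySet();
        }
    }
}
```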

This change also adds a system test that verifies Connect works with a variety 
of brokers and is able to run source and sink connectors. The test verifies 
that Connect can read from the internal topics when the connectors are 
restarted.

Author: Randall Hauch 

Reviewers: Ewen Cheslack-Postava 

Closes #3641 from rhauch/kafka-5704

(cherry picked from commit 1a653c813c842c0b67f26fb119d7727e272cf834)
Signed-off-by: Ewen Cheslack-Postava 


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3bc5ee7d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3bc5ee7d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3bc5ee7d

Branch: refs/heads/0.11.0
Commit: 3bc5ee7dd119864e0bbfc6bc095b2e0bc57c32b6
Parents: d0be27b
Author: Randall Hauch 
Authored: Tue Aug 8 20:20:41 2017 -0700
Committer: Ewen Cheslack-Postava 
Committed: Tue Aug 8 20:20:54 2017 -0700

--
 .../apache/kafka/connect/util/TopicAdmin.java   | 10 +++---
 .../kafka/connect/util/TopicAdminTest.java  | 15 +++-
 .../tests/connect/connect_distributed_test.py   | 36 ++--
 3 files changed, 44 insertions(+), 17 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/3bc5ee7d/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
--
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index adc3378..5da4f2d 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -195,13 +195,14 @@ public class TopicAdmin implements AutoCloseable {
  * are excluded from the result.
 * <p>
  * If multiple topic definitions have the same topic name, the last one 
with that name will be used.
- * 
+ * <p>
+ * Apache Kafka added support for creating topics in 0.10.1.0, so this 
method works as expected with that and later versions.
+ * With brokers older than 0.10.1.0, this method is unable to create 
topics and always returns an empty set.
  *
  * @param topics the specifications of the topics
  * @return the names of the topics that were created by this operation; 
never null but possibly empty
 * @throws ConnectException if an error occurs, the operation 
takes too long, or the thread is interrupted while
  * attempting to perform this operation
- * @throws UnsupportedVersionException if the broker does not support the 
necessary APIs to perform this request
  */
 public Set<String> createTopics(NewTopic... topics) {
 Map<String, NewTopic> topicsByName = new HashMap<>();
@@ -233,8 +234,9 @@ public class TopicAdmin implements AutoCloseable {
 continue;
 }
 if (cause instanceof UnsupportedVersionException) {
-log.error("Unable to use Kafka admin client to create 
topic descriptions for '{}' using the brokers at {}", topicNameList, 
bootstrapServers);
-throw (UnsupportedVersionException) cause;
+log.debug("Unable to use Kafka admin client to create 
topic descriptions for '{}' using the brokers at 

kafka git commit: KAFKA-5704: Corrected Connect distributed startup behavior to allow older brokers to auto-create topics

2017-08-08 Thread ewencp
Repository: kafka
Updated Branches:
  refs/heads/trunk b8cf97686 -> 1a653c813


KAFKA-5704: Corrected Connect distributed startup behavior to allow older 
brokers to auto-create topics

When a Connect distributed worker starts up talking with broker versions 
0.10.1.0 and later, it will use the AdminClient to look for the internal topics 
and attempt to create them if they are missing. Although the AdminClient was 
added in 0.11.0.0, the AdminClient uses APIs to create topics that existed in 
0.10.1.0 and later. This feature works as expected when Connect uses a broker 
version 0.10.1.0 or later.

However, when a Connect distributed worker starts up using a broker older than 
0.10.1.0, the AdminClient is not able to find the required APIs and thus will 
throw an UnsupportedVersionException. Unfortunately, this exception is not 
caught and instead causes the Connect worker to fail even when the topics 
already exist.

This change handles the UnsupportedVersionException by logging a debug message 
and doing nothing. The existing producer logic will request metadata for the 
topics, which causes the broker to create them if they don’t exist and 
broker auto-creation of topics is enabled. This is the same behavior that 
existed prior to 0.11.0.0, and so this change restores that behavior for 
brokers older than 0.10.1.0.

This change also adds a system test that verifies Connect works with a variety 
of brokers and is able to run source and sink connectors. The test verifies 
that Connect can read from the internal topics when the connectors are 
restarted.

Author: Randall Hauch 

Reviewers: Ewen Cheslack-Postava 

Closes #3641 from rhauch/kafka-5704


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1a653c81
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1a653c81
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1a653c81

Branch: refs/heads/trunk
Commit: 1a653c813c842c0b67f26fb119d7727e272cf834
Parents: b8cf976
Author: Randall Hauch 
Authored: Tue Aug 8 20:20:41 2017 -0700
Committer: Ewen Cheslack-Postava 
Committed: Tue Aug 8 20:20:41 2017 -0700

--
 .../apache/kafka/connect/util/TopicAdmin.java   | 10 +++---
 .../kafka/connect/util/TopicAdminTest.java  | 15 +++-
 .../tests/connect/connect_distributed_test.py   | 36 ++--
 3 files changed, 44 insertions(+), 17 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/1a653c81/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
--
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index adc3378..5da4f2d 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -195,13 +195,14 @@ public class TopicAdmin implements AutoCloseable {
  * are excluded from the result.
 * <p>
  * If multiple topic definitions have the same topic name, the last one 
with that name will be used.
- * 
+ * <p>
+ * Apache Kafka added support for creating topics in 0.10.1.0, so this 
method works as expected with that and later versions.
+ * With brokers older than 0.10.1.0, this method is unable to create 
topics and always returns an empty set.
  *
  * @param topics the specifications of the topics
  * @return the names of the topics that were created by this operation; 
never null but possibly empty
 * @throws ConnectException if an error occurs, the operation 
takes too long, or the thread is interrupted while
  * attempting to perform this operation
- * @throws UnsupportedVersionException if the broker does not support the 
necessary APIs to perform this request
  */
 public Set<String> createTopics(NewTopic... topics) {
 Map<String, NewTopic> topicsByName = new HashMap<>();
@@ -233,8 +234,9 @@ public class TopicAdmin implements AutoCloseable {
 continue;
 }
 if (cause instanceof UnsupportedVersionException) {
-log.error("Unable to use Kafka admin client to create 
topic descriptions for '{}' using the brokers at {}", topicNameList, 
bootstrapServers);
-throw (UnsupportedVersionException) cause;
+log.debug("Unable to use Kafka admin client to create 
topic descriptions for '{}' using the brokers at {}," +
+  "falling back to assume topic(s) exist 
or will be auto-created by the broker", 

kafka git commit: MINOR: support retrieving cluster_id in system tests

2017-08-08 Thread ewencp
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 0bf34e715 -> d0be27b4e


MINOR: support retrieving cluster_id in system tests

ewencp would be great to cherry-pick this back into 0.11.x if possible

Author: Xavier Léauté 

Reviewers: Ewen Cheslack-Postava 

Closes #3645 from xvrl/system-test-cluster-id

(cherry picked from commit b8cf976865f3559b4e5c45eeb261444cd1d7383f)
Signed-off-by: Ewen Cheslack-Postava 


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d0be27b4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d0be27b4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d0be27b4

Branch: refs/heads/0.11.0
Commit: d0be27b4e075ec604ed4636c5e7799513a52ef74
Parents: 0bf34e7
Author: Xavier Léauté 
Authored: Tue Aug 8 19:58:47 2017 -0700
Committer: Ewen Cheslack-Postava 
Committed: Tue Aug 8 19:59:12 2017 -0700

--
 tests/kafkatest/services/kafka/kafka.py | 10 ++
 1 file changed, 10 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/d0be27b4/tests/kafkatest/services/kafka/kafka.py
--
diff --git a/tests/kafkatest/services/kafka/kafka.py 
b/tests/kafkatest/services/kafka/kafka.py
index b22b518..cd71e5e 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -488,6 +488,16 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
 self.logger.info("Leader for topic %s and partition %d is now: %d" % 
(topic, partition, leader_idx))
 return self.get_node(leader_idx)
 
+def cluster_id(self):
+""" Get the current cluster id
+"""
+self.logger.debug("Querying ZooKeeper to retrieve cluster id")
+cluster = json.loads(self.zk.query("/cluster/id"))
+if cluster is None:
+raise Exception("Error querying ZK for cluster id.")
+
+return cluster['id']
+
 def list_consumer_groups(self, node=None, new_consumer=True, 
command_config=None):
 """ Get list of consumer groups.
 """



kafka git commit: MINOR: support retrieving cluster_id in system tests

2017-08-08 Thread ewencp
Repository: kafka
Updated Branches:
  refs/heads/trunk fb8edbdc6 -> b8cf97686


MINOR: support retrieving cluster_id in system tests

ewencp would be great to cherry-pick this back into 0.11.x if possible

Author: Xavier Léauté 

Reviewers: Ewen Cheslack-Postava 

Closes #3645 from xvrl/system-test-cluster-id


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b8cf9768
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b8cf9768
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b8cf9768

Branch: refs/heads/trunk
Commit: b8cf976865f3559b4e5c45eeb261444cd1d7383f
Parents: fb8edbd
Author: Xavier Léauté 
Authored: Tue Aug 8 19:58:47 2017 -0700
Committer: Ewen Cheslack-Postava 
Committed: Tue Aug 8 19:58:47 2017 -0700

--
 tests/kafkatest/services/kafka/kafka.py | 10 ++
 1 file changed, 10 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/b8cf9768/tests/kafkatest/services/kafka/kafka.py
--
diff --git a/tests/kafkatest/services/kafka/kafka.py 
b/tests/kafkatest/services/kafka/kafka.py
index e941a3d..ee60bab 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -531,6 +531,16 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
 self.logger.info("Leader for topic %s and partition %d is now: %d" % 
(topic, partition, leader_idx))
 return self.get_node(leader_idx)
 
+def cluster_id(self):
+""" Get the current cluster id
+"""
+self.logger.debug("Querying ZooKeeper to retrieve cluster id")
+cluster = json.loads(self.zk.query("/cluster/id"))
+if cluster is None:
+raise Exception("Error querying ZK for cluster id.")
+
+return cluster['id']
+
 def list_consumer_groups(self, node=None, new_consumer=True, 
command_config=None):
 """ Get list of consumer groups.
 """



kafka git commit: MINOR: Update dependencies for 1.0.0 release

2017-08-08 Thread ijuma
Repository: kafka
Updated Branches:
  refs/heads/trunk 649276abb -> fb8edbdc6


MINOR: Update dependencies for 1.0.0 release

Notable updates:

1. Gradle 4.1 includes a number of performance and
CLI improvements as well as initial Java 9 support.

2. Scala 2.12.3 has substantial compilation time
improvements.

3. lz4-java 1.4 allows us to remove a workaround in
KafkaLZ4BlockInputStream (not done in this PR).

4. snappy-java 1.1.4 improved performance of compression (5%)
and decompression (20%). There was a slight increase in the
compressed size in one of our tests.

Not updated:

1. PowerMock due to a couple of regressions. I investigated one of them
and filed https://github.com/powermock/powermock/issues/828.

2. Jackson, which will be done via #3631.

3. Rocksdb, which will be done via #3519.

Author: Ismael Juma 

Reviewers: Jason Gustafson 

Closes #3619 from ijuma/update-deps-for-1.0.0


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fb8edbdc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fb8edbdc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fb8edbdc

Branch: refs/heads/trunk
Commit: fb8edbdc6314411e0ed006a76e9658460a99208a
Parents: 649276a
Author: Ismael Juma 
Authored: Wed Aug 9 01:11:39 2017 +0100
Committer: Ismael Juma 
Committed: Wed Aug 9 01:11:39 2017 +0100

--
 build.gradle| 20 +++---
 .../scala/kafka/security/minikdc/MiniKdc.scala  | 13 ++---
 .../kafka/message/MessageCompressionTest.scala  |  5 ++--
 gradle/dependencies.gradle  | 28 
 4 files changed, 38 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/fb8edbdc/build.gradle
--
diff --git a/build.gradle b/build.gradle
index 53c0e8d..0e36a85 100644
--- a/build.gradle
+++ b/build.gradle
@@ -24,10 +24,10 @@ buildscript {
 
   dependencies {
 // For Apache Rat plugin to ignore non-Git files
-classpath "org.ajoberstar:grgit:1.9.2"
-classpath 'com.github.ben-manes:gradle-versions-plugin:0.14.0'
+classpath "org.ajoberstar:grgit:1.9.3"
+classpath 'com.github.ben-manes:gradle-versions-plugin:0.15.0'
 classpath 'org.scoverage:gradle-scoverage:2.1.0'
-classpath 'com.github.jengelman.gradle.plugins:shadow:1.2.4'
+classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.1'
   }
 }
 
@@ -77,7 +77,7 @@ allprojects {
 }
 
 ext {
-  gradleVersion = "3.5"
+  gradleVersion = "4.1"
   buildVersionFileName = "kafka-version.properties"
 
   maxPermSizeArgs = []
@@ -548,7 +548,7 @@ project(':core') {
 testCompile libs.apachedsMavibotPartition
 testCompile libs.apachedsJdbmPartition
 testCompile libs.junit
-testCompile libs.scalaTest
+testCompile libs.scalatest
 testCompile libs.jfreechart
 
 scoverage libs.scoveragePlugin
@@ -935,11 +935,11 @@ project(':jmh-benchmarks') {
   }
 
   dependencies {
-  compile project(':clients')
-  compile project(':streams')
-  compile 'org.openjdk.jmh:jmh-core:1.18'
-  compile 'org.openjdk.jmh:jmh-generator-annprocess:1.18'
-  compile 'org.openjdk.jmh:jmh-core-benchmarks:1.18'
+compile project(':clients')
+compile project(':streams')
+compile libs.jmhCore
+compile libs.jmhGeneratorAnnProcess
+compile libs.jmhCoreBenchmarks
   }
 
   jar {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fb8edbdc/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
--
diff --git a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala 
b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
index c7b8973..2ff281b 100644
--- a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
+++ b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
@@ -28,7 +28,6 @@ import java.util.{Locale, Properties, UUID}
 import kafka.utils.{CoreUtils, Exit, Logging}
 
 import scala.collection.JavaConverters._
-import org.apache.commons.io.IOUtils
 import org.apache.commons.lang.text.StrSubstitutor
 import org.apache.directory.api.ldap.model.entry.{DefaultEntry, Entry}
 import org.apache.directory.api.ldap.model.ldif.LdifReader
@@ -198,9 +197,15 @@ class MiniKdc(config: Properties, workDir: File) extends 
Logging {
 "3" -> orgDomain.toUpperCase(Locale.ENGLISH),
 "4" -> bindAddress
   )
-  val inputStream = MiniKdc.getResourceAsStream("minikdc.ldiff")
-  try 
addEntriesToDirectoryService(StrSubstitutor.replace(IOUtils.toString(inputStream),
 map.asJava))
-  finally CoreUtils.swallow(inputStream.close())
+  val reader = new BufferedReader(new 

kafka git commit: KAFKA-2360; Extract producer-specific configs out of the common PerfConfig

2017-08-08 Thread ijuma
Repository: kafka
Updated Branches:
  refs/heads/trunk 98d4a4833 -> 649276abb


KAFKA-2360; Extract producer-specific configs out of the common PerfConfig

Separate `batch-size`, `message-size`, and `compression-codec` from PerfConfig into 
a newly created ProducerPerfConfig in order to hide them from the ConsumerPerf tool.
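As a rough sketch of the refactor (hypothetical class names; the real configs
register options via joptsimple's OptionParser), shared options stay on the
base config while producer-only options move to the subclass, so the consumer
tool never registers them:

```java
import java.util.HashSet;
import java.util.Set;

class PerfConfigSketch {
    // Options shared by all perf tools.
    final Set<String> options = new HashSet<>();
    PerfConfigSketch() {
        options.add("date-format");
        options.add("hide-header");
        options.add("help");
    }
}

class ProducerPerfConfigSketch extends PerfConfigSketch {
    ProducerPerfConfigSketch() {
        // Producer-only options, no longer visible to the consumer tool.
        options.add("message-size");
        options.add("batch-size");
        options.add("compression-codec");
    }
}
```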

Author: huxihx 

Reviewers: Ismael Juma 

Closes #3613 from huxihx/KAFKA-2360


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/649276ab
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/649276ab
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/649276ab

Branch: refs/heads/trunk
Commit: 649276abb284c4f71504e9f028c5ab1450849fae
Parents: 98d4a48
Author: huxihx 
Authored: Tue Aug 8 23:22:57 2017 +0100
Committer: Ismael Juma 
Committed: Tue Aug 8 23:22:57 2017 +0100

--
 .../scala/kafka/tools/ConsumerPerformance.scala|  2 ++
 core/src/main/scala/kafka/tools/PerfConfig.scala   | 17 -
 .../scala/kafka/tools/ProducerPerformance.scala| 15 +++
 .../kafka/tools/SimpleConsumerPerformance.scala|  2 ++
 4 files changed, 19 insertions(+), 17 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/649276ab/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
--
diff --git a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala 
b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
index 2880c94..ed1b440 100644
--- a/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerPerformance.scala
@@ -240,6 +240,8 @@ object ConsumerPerformance {
   .describedAs("config file")
   .ofType(classOf[String])
 val printMetricsOpt = parser.accepts("print-metrics", "Print out the 
metrics. This only applies to new consumer.")
+val showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, 
stats are reported for each reporting " +
+  "interval as configured by reporting-interval")
 
 val options = parser.parse(args: _*)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/649276ab/core/src/main/scala/kafka/tools/PerfConfig.scala
--
diff --git a/core/src/main/scala/kafka/tools/PerfConfig.scala 
b/core/src/main/scala/kafka/tools/PerfConfig.scala
index a285a1c..264ae6a 100644
--- a/core/src/main/scala/kafka/tools/PerfConfig.scala
+++ b/core/src/main/scala/kafka/tools/PerfConfig.scala
@@ -37,23 +37,6 @@ class PerfConfig(args: Array[String]) {
 .describedAs("date format")
 .ofType(classOf[String])
 .defaultsTo("-MM-dd HH:mm:ss:SSS")
-  val showDetailedStatsOpt = parser.accepts("show-detailed-stats", "If set, 
stats are reported for each reporting " +
-"interval as configured by reporting-interval")
   val hideHeaderOpt = parser.accepts("hide-header", "If set, skips printing 
the header for the stats ")
-  val messageSizeOpt = parser.accepts("message-size", "The size of each 
message.")
-.withRequiredArg
-.describedAs("size")
-.ofType(classOf[java.lang.Integer])
-.defaultsTo(100)
-  val batchSizeOpt = parser.accepts("batch-size", "Number of messages to write 
in a single batch.")
-.withRequiredArg
-.describedAs("size")
-.ofType(classOf[java.lang.Integer])
-.defaultsTo(200)
-  val compressionCodecOpt = parser.accepts("compression-codec", "If set, 
messages are sent compressed")
-.withRequiredArg
-.describedAs("supported codec: NoCompressionCodec as 0, 
GZIPCompressionCodec as 1, SnappyCompressionCodec as 2, LZ4CompressionCodec as 
3")
-.ofType(classOf[java.lang.Integer])
-.defaultsTo(0)
   val helpOpt = parser.accepts("help", "Print usage.")
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/649276ab/core/src/main/scala/kafka/tools/ProducerPerformance.scala
--
diff --git a/core/src/main/scala/kafka/tools/ProducerPerformance.scala 
b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
index f14253b..0f21831 100644
--- a/core/src/main/scala/kafka/tools/ProducerPerformance.scala
+++ b/core/src/main/scala/kafka/tools/ProducerPerformance.scala
@@ -125,6 +125,21 @@ object ProducerPerformance extends Logging {
   .describedAs("metrics directory")
   .ofType(classOf[java.lang.String])
 val useNewProducerOpt = parser.accepts("new-producer", "Use the new 
producer implementation.")
+val messageSizeOpt = parser.accepts("message-size", "The size of each 
message.")
+  .withRequiredArg
+  .describedAs("size")
+  .ofType(classOf[java.lang.Integer])
+  .defaultsTo(100)
+val batchSizeOpt = 

kafka git commit: MINOR: Remove unused GroupState.state field

2017-08-08 Thread ijuma
Repository: kafka
Updated Branches:
  refs/heads/trunk c9ffab162 -> 98d4a4833


MINOR: Remove unused GroupState.state field

This field doesn't seem to be used and the value for
`AwaitingSync` seems to be wrong (it seems like it
should have been `2` instead of `5`).
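The collision the message alludes to is visible in the diff below: `AwaitingSync`
and `Empty` both carried the value `5`. A hypothetical Java rendering of the
removed values (the real code is a Scala sealed trait):

```java
// Byte values as they appeared before this commit removed them.
enum GroupStateSketch {
    PREPARING_REBALANCE((byte) 1),
    AWAITING_SYNC((byte) 5),  // per the commit message, likely meant to be 2
    STABLE((byte) 3),
    DEAD((byte) 4),
    EMPTY((byte) 5);          // same value as AWAITING_SYNC

    final byte state;
    GroupStateSketch(byte state) { this.state = state; }
}
```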

Author: Ismael Juma 

Reviewers: Jason Gustafson , Guozhang Wang 


Closes #3572 from ijuma/remove-unused-group-state-field


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/98d4a483
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/98d4a483
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/98d4a483

Branch: refs/heads/trunk
Commit: 98d4a483382dacad280e0cd25f10bccca51328a5
Parents: c9ffab1
Author: Ismael Juma 
Authored: Tue Aug 8 23:15:57 2017 +0100
Committer: Ismael Juma 
Committed: Tue Aug 8 23:15:57 2017 +0100

--
 .../scala/kafka/coordinator/group/GroupMetadata.scala   | 12 ++--
 1 file changed, 6 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/98d4a483/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
--
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 35a1fc7..18096bb 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -24,7 +24,7 @@ import org.apache.kafka.common.TopicPartition
 
 import scala.collection.{Seq, immutable, mutable}
 
-private[group] sealed trait GroupState { def state: Byte }
+private[group] sealed trait GroupState
 
 /**
  * Group is preparing to rebalance
@@ -39,7 +39,7 @@ private[group] sealed trait GroupState { def state: Byte }
  * all members have left the group => Empty
  * group is removed by partition emigration => Dead
  */
-private[group] case object PreparingRebalance extends GroupState { val state: 
Byte = 1 }
+private[group] case object PreparingRebalance extends GroupState
 
 /**
  * Group is awaiting state assignment from the leader
@@ -54,7 +54,7 @@ private[group] case object PreparingRebalance extends 
GroupState { val state: By
  * member failure detected => PreparingRebalance
  * group is removed by partition emigration => Dead
  */
-private[group] case object AwaitingSync extends GroupState { val state: Byte = 
5}
+private[group] case object AwaitingSync extends GroupState
 
 /**
  * Group is stable
@@ -70,7 +70,7 @@ private[group] case object AwaitingSync extends GroupState { 
val state: Byte = 5
  * follower join-group with new metadata => PreparingRebalance
  * group is removed by partition emigration => Dead
  */
-private[group] case object Stable extends GroupState { val state: Byte = 3 }
+private[group] case object Stable extends GroupState
 
 /**
  * Group has no more members and its metadata is being removed
@@ -83,7 +83,7 @@ private[group] case object Stable extends GroupState { val 
state: Byte = 3 }
  * allow offset fetch requests
  * transition: Dead is a final state before group metadata is cleaned up, so 
there are no transitions
  */
-private[group] case object Dead extends GroupState { val state: Byte = 4 }
+private[group] case object Dead extends GroupState
 
 /**
   * Group has no more members, but lingers until all offsets have expired. 
This state
@@ -100,7 +100,7 @@ private[group] case object Dead extends GroupState { val 
state: Byte = 4 }
   * group is removed by partition emigration => Dead
   * group is removed by expiration => Dead
   */
-private[group] case object Empty extends GroupState { val state: Byte = 5 }
+private[group] case object Empty extends GroupState
 
 
 private object GroupMetadata {



kafka git commit: KAFKA-5658; Fix AdminClient request timeout handling bug resulting in continual BrokerNotAvailableExceptions

2017-08-08 Thread ijuma
Repository: kafka
Updated Branches:
  refs/heads/trunk 57770dd23 -> c9ffab162


KAFKA-5658; Fix AdminClient request timeout handling bug resulting in continual 
BrokerNotAvailableExceptions

The AdminClient does not properly clear calls from the callsInFlight structure.
Later, in an effort to clear the lingering call objects, it closes the 
connection they are associated with. This disrupts new incoming calls, 
which then get BrokerNotAvailableException.

This patch fixes this bug by properly removing completed calls from the
callsInFlight structure. It also adds the Call#aborted flag, which
ensures that we throw the right exception (TimeoutException instead of
DisconnectException) and only abort a connection once -- even if there
is a similar bug in the future which causes old Call objects to linger.
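A minimal sketch of the aborted-flag idea (hypothetical class, not the real
KafkaAdminClient.Call): once a call is marked aborted, a subsequent failure
surfaces as a TimeoutException rather than the underlying disconnect error,
and the abort itself can only happen once.

```java
import java.util.concurrent.TimeoutException;

public class AbortableCallSketch {
    private boolean aborted = false;
    private Throwable outcome;

    // Returns true only the first time, mirroring "only abort a
    // connection once" even if stale Call objects linger.
    boolean abort() {
        if (aborted)
            return false;
        aborted = true;
        return true;
    }

    void fail(Throwable cause) {
        // An aborted call reports a timeout, not the raw disconnect error.
        outcome = aborted ? new TimeoutException("Aborted due to timeout.") : cause;
    }

    Throwable outcome() { return outcome; }
}
```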

Author: Colin P. Mccabe 

Reviewers: Ismael Juma 

Closes #3584 from cmccabe/KAFKA-5658


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c9ffab16
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c9ffab16
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c9ffab16

Branch: refs/heads/trunk
Commit: c9ffab16228ecd5d931b58d93dfa3f49287d2909
Parents: 57770dd
Author: Colin P. Mccabe 
Authored: Tue Aug 8 09:38:15 2017 +0100
Committer: Ismael Juma 
Committed: Tue Aug 8 09:38:22 2017 +0100

--
 checkstyle/suppressions.xml |  2 +-
 .../kafka/clients/admin/KafkaAdminClient.java   | 36 +
 .../clients/admin/KafkaAdminClientTest.java | 57 
 .../clients/admin/MockKafkaAdminClientEnv.java  | 18 +--
 4 files changed, 99 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/c9ffab16/checkstyle/suppressions.xml
--
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 6d2b559..cca1429 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -61,7 +61,7 @@
   
files="(Sender|Fetcher|KafkaConsumer|Metrics|ConsumerCoordinator|RequestResponse|TransactionManager|KafkaAdminClient)Test.java"/>
 
 
+  
files="(ConsumerCoordinator|KafkaConsumer|RequestResponse|Fetcher|KafkaAdminClient)Test.java"/>
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9ffab16/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 2129059..ed32bff 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -322,9 +322,8 @@ public class KafkaAdminClient extends AdminClient {
 }
 }
 
-static KafkaAdminClient createInternal(AdminClientConfig config, 
KafkaClient client, Metadata metadata) {
+static KafkaAdminClient createInternal(AdminClientConfig config, 
KafkaClient client, Metadata metadata, Time time) {
 Metrics metrics = null;
-Time time = Time.SYSTEM;
 String clientId = generateClientId(config);
 
 try {
@@ -441,6 +440,7 @@ public class KafkaAdminClient extends AdminClient {
 private final long deadlineMs;
 private final NodeProvider nodeProvider;
 private int tries = 0;
+private boolean aborted = false;
 
 Call(String callName, long deadlineMs, NodeProvider nodeProvider) {
 this.callName = callName;
@@ -459,6 +459,14 @@ public class KafkaAdminClient extends AdminClient {
  * @param throwable The failure exception.
  */
 final void fail(long now, Throwable throwable) {
+if (aborted) {
+if (log.isDebugEnabled()) {
+log.debug("{} aborted at {} after {} attempt(s)", this, 
now, tries,
+new Exception(prettyPrintException(throwable)));
+}
+handleFailure(new TimeoutException("Aborted due to timeout."));
+return;
+}
 // If this is an UnsupportedVersionException that we can retry, do 
so.
 if ((throwable instanceof UnsupportedVersionException) &&
  
handleUnsupportedVersionException((UnsupportedVersionException) throwable)) {
@@ -792,12 +800,17 @@ public class KafkaAdminClient extends AdminClient {
 // only one we need to check the timeout for.
 Call call = contexts.get(0);
 if (processor.callHasExpired(call)) {

kafka git commit: KAFKA-5658; Fix AdminClient request timeout handling bug resulting in continual BrokerNotAvailableExceptions

2017-08-08 Thread ijuma
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 557001f9f -> 0bf34e715


KAFKA-5658; Fix AdminClient request timeout handling bug resulting in continual 
BrokerNotAvailableExceptions

The AdminClient does not properly clear calls from the callsInFlight structure.
Later, in an effort to clear the lingering call objects, it closes the 
connection they are associated with. This disrupts new incoming calls, 
which then get BrokerNotAvailableException.

This patch fixes this bug by properly removing completed calls from the
callsInFlight structure. It also adds the Call#aborted flag, which
ensures that we throw the right exception (TimeoutException instead of
DisconnectException) and only abort a connection once -- even if there
is a similar bug in the future which causes old Call objects to linger.

Author: Colin P. Mccabe 

Reviewers: Ismael Juma 

Closes #3584 from cmccabe/KAFKA-5658

(cherry picked from commit c9ffab16228ecd5d931b58d93dfa3f49287d2909)
Signed-off-by: Ismael Juma 


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0bf34e71
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0bf34e71
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0bf34e71

Branch: refs/heads/0.11.0
Commit: 0bf34e7157319bdd12ae907788bc40c70f9c7ad0
Parents: 557001f
Author: Colin P. Mccabe 
Authored: Tue Aug 8 09:38:15 2017 +0100
Committer: Ismael Juma 
Committed: Tue Aug 8 09:40:17 2017 +0100

--
 checkstyle/suppressions.xml |  2 +-
 .../kafka/clients/admin/KafkaAdminClient.java   | 36 +
 .../clients/admin/KafkaAdminClientTest.java | 57 
 .../clients/admin/MockKafkaAdminClientEnv.java  | 18 +--
 4 files changed, 99 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/0bf34e71/checkstyle/suppressions.xml
--
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 7f98820..0a1616a 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -61,7 +61,7 @@
   
files="(Sender|Fetcher|KafkaConsumer|Metrics|ConsumerCoordinator|RequestResponse|TransactionManager|KafkaAdminClient)Test.java"/>
 
 
+  
files="(ConsumerCoordinator|KafkaConsumer|RequestResponse|Fetcher|KafkaAdminClient)Test.java"/>
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0bf34e71/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 2129059..ed32bff 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -322,9 +322,8 @@ public class KafkaAdminClient extends AdminClient {
 }
 }
 
-static KafkaAdminClient createInternal(AdminClientConfig config, 
KafkaClient client, Metadata metadata) {
+static KafkaAdminClient createInternal(AdminClientConfig config, 
KafkaClient client, Metadata metadata, Time time) {
 Metrics metrics = null;
-Time time = Time.SYSTEM;
 String clientId = generateClientId(config);
 
 try {
@@ -441,6 +440,7 @@ public class KafkaAdminClient extends AdminClient {
 private final long deadlineMs;
 private final NodeProvider nodeProvider;
 private int tries = 0;
+private boolean aborted = false;
 
 Call(String callName, long deadlineMs, NodeProvider nodeProvider) {
 this.callName = callName;
@@ -459,6 +459,14 @@ public class KafkaAdminClient extends AdminClient {
  * @param throwable The failure exception.
  */
 final void fail(long now, Throwable throwable) {
+if (aborted) {
+if (log.isDebugEnabled()) {
+log.debug("{} aborted at {} after {} attempt(s)", this, 
now, tries,
+new Exception(prettyPrintException(throwable)));
+}
+handleFailure(new TimeoutException("Aborted due to timeout."));
+return;
+}
 // If this is an UnsupportedVersionException that we can retry, do 
so.
 if ((throwable instanceof UnsupportedVersionException) &&
  
handleUnsupportedVersionException((UnsupportedVersionException) throwable)) {
@@ -792,12 +800,17 @@ public class KafkaAdminClient extends AdminClient {
 // only one we need to check