kafka git commit: KAFKA-2955; Add a simple ">" prompt to console producer

2017-01-31 Thread guozhang
Repository: kafka
Updated Branches:
  refs/heads/trunk e108a8b4e -> b948f4327


KAFKA-2955; Add a simple ">" prompt to console producer

Author: Manikumar reddy O 

Reviewers: Ismael Juma, Guozhang Wang, Gwen Shapira

Closes #1233 from omkreddy/KAFKA-2955


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b948f432
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b948f432
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b948f432

Branch: refs/heads/trunk
Commit: b948f43278e7641977c07df0069158a61fd3b443
Parents: e108a8b
Author: Manikumar reddy O 
Authored: Tue Jan 31 09:30:29 2017 -0800
Committer: Guozhang Wang 
Committed: Tue Jan 31 09:30:29 2017 -0800

--
 core/src/main/scala/kafka/tools/ConsoleProducer.scala | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/b948f432/core/src/main/scala/kafka/tools/ConsoleProducer.scala
--
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala 
b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index c6dcfce..93454d6 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -306,6 +306,7 @@ object ConsoleProducer {
 
 override def readMessage() = {
   lineNumber += 1
+  print(">")
   (reader.readLine(), parseKey) match {
 case (null, _) => null
 case (line, true) =>



kafka git commit: MINOR: Logging improvements in consumer internals

2017-01-31 Thread jgus
Repository: kafka
Updated Branches:
  refs/heads/trunk b948f4327 -> 5afe95964


MINOR: Logging improvements in consumer internals

Author: Jason Gustafson 

Reviewers: Manikumar reddy O, Ewen Cheslack-Postava, Ismael Juma

Closes #2469 from hachikuji/improve-consumer-logging


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5afe9596
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5afe9596
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5afe9596

Branch: refs/heads/trunk
Commit: 5afe959647fcad9d01365427f4b455e1586b1fd5
Parents: b948f43
Author: Jason Gustafson 
Authored: Tue Jan 31 12:27:00 2017 -0800
Committer: Jason Gustafson 
Committed: Tue Jan 31 12:27:00 2017 -0800

--
 .../org/apache/kafka/clients/NetworkClient.java |  9 +
 .../kafka/clients/consumer/KafkaConsumer.java   |  8 ++--
 .../consumer/internals/AbstractCoordinator.java | 33 ++--
 .../consumer/internals/ConsumerCoordinator.java | 41 
 .../internals/ConsumerNetworkClient.java|  9 +++--
 .../clients/consumer/internals/Fetcher.java |  5 ++-
 .../apache/kafka/common/utils/KafkaThread.java  |  9 +
 .../clients/consumer/KafkaConsumerTest.java |  2 -
 .../internals/ConsumerCoordinatorTest.java  | 30 +++---
 9 files changed, 90 insertions(+), 56 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/5afe9596/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
--
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 0eb7670..3a75288 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -427,14 +427,23 @@ public class NetworkClient implements KafkaClient {
 int currInflight = 
this.inFlightRequests.inFlightRequestCount(node.idString());
 if (currInflight == 0 && 
this.connectionStates.isReady(node.idString())) {
 // if we find an established connection with no in-flight 
requests we can stop right away
+log.trace("Found least loaded node {} connected with no 
in-flight requests", node);
 return node;
 } else if (!this.connectionStates.isBlackedOut(node.idString(), 
now) && currInflight < inflight) {
 // otherwise if this is the best we have found so far, record 
that
 inflight = currInflight;
 found = node;
+} else if (log.isTraceEnabled()) {
+log.trace("Removing node {} from least loaded node selection: 
is-blacked-out: {}, in-flight-requests: {}",
+node, 
this.connectionStates.isBlackedOut(node.idString(), now), currInflight);
 }
 }
 
+if (found != null)
+log.trace("Found least loaded node {}", found);
+else
+log.trace("Least loaded node selection failed to find an available 
node");
+
 return found;
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5afe9596/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 23e7ed6..89844f6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -78,8 +78,8 @@ import java.util.regex.Pattern;
  * Cross-Version Compatibility
  * This client can communicate with brokers that are version 0.10.0 or newer. 
Older or newer brokers may not support
  * certain features. For example, 0.10.0 brokers do not support 
offsetsForTimes, because this feature was added
- * in version 0.10.1. You will receive an UnsupportedVersionException when 
invoking an API that is not available on the
- * running broker version.
+ * in version 0.10.1. You will receive an {@link 
org.apache.kafka.common.errors.UnsupportedVersionException}
+ * when invoking an API that is not available on the running broker version.
  * 
  *
  * Offsets and Consumer Position
@@ -685,7 +685,6 @@ public class KafkaConsumer implements Consumer {
 metricGrpPrefix,
 this.time,
 retryBackoffMs,
-new ConsumerCoordinator.DefaultOffsetCommitCallback(),
 
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
 
config.getInt(ConsumerConfig.

kafka git commit: MINOR: Logging improvements in consumer internals

2017-01-31 Thread jgus
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 61024c9d2 -> a50635219


MINOR: Logging improvements in consumer internals

Author: Jason Gustafson 

Reviewers: Manikumar reddy O, Ewen Cheslack-Postava, Ismael Juma

Closes #2469 from hachikuji/improve-consumer-logging

(cherry picked from commit 5afe959647fcad9d01365427f4b455e1586b1fd5)
Signed-off-by: Jason Gustafson 


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a5063521
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a5063521
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a5063521

Branch: refs/heads/0.10.2
Commit: a5063521943f8d0f6940b18d4f0d57045aa395ae
Parents: 61024c9
Author: Jason Gustafson 
Authored: Tue Jan 31 12:27:00 2017 -0800
Committer: Jason Gustafson 
Committed: Tue Jan 31 12:28:36 2017 -0800

--
 .../org/apache/kafka/clients/NetworkClient.java |  9 +
 .../kafka/clients/consumer/KafkaConsumer.java   |  8 ++--
 .../consumer/internals/AbstractCoordinator.java | 33 ++--
 .../consumer/internals/ConsumerCoordinator.java | 41 
 .../internals/ConsumerNetworkClient.java|  9 +++--
 .../clients/consumer/internals/Fetcher.java |  5 ++-
 .../apache/kafka/common/utils/KafkaThread.java  |  9 +
 .../clients/consumer/KafkaConsumerTest.java |  2 -
 .../internals/ConsumerCoordinatorTest.java  | 30 +++---
 9 files changed, 90 insertions(+), 56 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/a5063521/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
--
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 0eb7670..3a75288 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -427,14 +427,23 @@ public class NetworkClient implements KafkaClient {
 int currInflight = 
this.inFlightRequests.inFlightRequestCount(node.idString());
 if (currInflight == 0 && 
this.connectionStates.isReady(node.idString())) {
 // if we find an established connection with no in-flight 
requests we can stop right away
+log.trace("Found least loaded node {} connected with no 
in-flight requests", node);
 return node;
 } else if (!this.connectionStates.isBlackedOut(node.idString(), 
now) && currInflight < inflight) {
 // otherwise if this is the best we have found so far, record 
that
 inflight = currInflight;
 found = node;
+} else if (log.isTraceEnabled()) {
+log.trace("Removing node {} from least loaded node selection: 
is-blacked-out: {}, in-flight-requests: {}",
+node, 
this.connectionStates.isBlackedOut(node.idString(), now), currInflight);
 }
 }
 
+if (found != null)
+log.trace("Found least loaded node {}", found);
+else
+log.trace("Least loaded node selection failed to find an available 
node");
+
 return found;
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5063521/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 6064c39..ed3d607 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -78,8 +78,8 @@ import java.util.regex.Pattern;
  * Cross-Version Compatibility
  * This client can communicate with brokers that are version 0.10.0 or newer. 
Older or newer brokers may not support
  * certain features. For example, 0.10.0 brokers do not support 
offsetsForTimes, because this feature was added
- * in version 0.10.1. You will receive an UnsupportedVersionException when 
invoking an API that is not available on the
- * running broker version.
+ * in version 0.10.1. You will receive an {@link 
org.apache.kafka.common.errors.UnsupportedVersionException}
+ * when invoking an API that is not available on the running broker version.
  * 
  *
  * Offsets and Consumer Position
@@ -685,7 +685,6 @@ public class KafkaConsumer implements Consumer {
 metricGrpPrefix,
 this.time,
 retryBackoffMs,
-new ConsumerCoordinator.DefaultOffsetCommitCallback(),
 
config.g

kafka git commit: KAFKA-4717: Use absolute paths to files in root directory so all jars include LICENSE and NOTICE files

2017-01-31 Thread ewencp
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 a50635219 -> 54512cbaf


KAFKA-4717: Use absolute paths to files in root directory so all jars include 
LICENSE and NOTICE files

Author: Ewen Cheslack-Postava 

Reviewers: Ismael Juma 

Closes #2473 from ewencp/kafka-4717-connect-license-and-notice-files

(cherry picked from commit 82744414d50d28a7bf7b0084118b68a4411c5781)
Signed-off-by: Ewen Cheslack-Postava 


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/54512cba
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/54512cba
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/54512cba

Branch: refs/heads/0.10.2
Commit: 54512cbaf7b624103cb1ad43bec455bbb0709c55
Parents: a506352
Author: Ewen Cheslack-Postava 
Authored: Tue Jan 31 16:06:10 2017 -0800
Committer: Ewen Cheslack-Postava 
Committed: Tue Jan 31 16:06:30 2017 -0800

--
 build.gradle | 42 +-
 1 file changed, 21 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/54512cba/build.gradle
--
diff --git a/build.gradle b/build.gradle
index 7a34c25..32ce14a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -177,21 +177,21 @@ subprojects {
   }
 
   jar {
-from '../LICENSE'
-from '../NOTICE'
+from "$rootDir/LICENSE"
+from "$rootDir/NOTICE"
   }
 
   task srcJar(type: Jar) {
 classifier = 'sources'
-from '../LICENSE'
-from '../NOTICE'
+from "$rootDir/LICENSE"
+from "$rootDir/NOTICE"
 from sourceSets.main.allSource
   }
 
   task javadocJar(type: Jar, dependsOn: javadoc) {
 classifier 'javadoc'
-from '../LICENSE'
-from '../NOTICE'
+from "$rootDir/LICENSE"
+from "$rootDir/NOTICE"
 from javadoc.destinationDir
   }
 
@@ -213,15 +213,15 @@ subprojects {
   if(!sourceSets.test.allSource.isEmpty()) {
 task testJar(type: Jar) {
   classifier = 'test'
-  from '../LICENSE'
-  from '../NOTICE'
+  from "$rootDir/LICENSE"
+  from "$rootDir/NOTICE"
   from sourceSets.test.output
 }
 
 task testSrcJar(type: Jar, dependsOn: testJar) {
   classifier = 'test-sources'
-  from '../LICENSE'
-  from '../NOTICE'
+  from "$rootDir/LICENSE"
+  from "$rootDir/NOTICE"
   from sourceSets.test.allSource
 }
 
@@ -234,8 +234,8 @@ subprojects {
   plugins.withType(ScalaPlugin) {
 task scaladocJar(type:Jar) {
   classifier = 'scaladoc'
-  from '../LICENSE'
-  from '../NOTICE'
+  from "$rootDir/LICENSE"
+  from "$rootDir/NOTICE"
   from scaladoc.destinationDir
 }
 
@@ -511,7 +511,7 @@ project(':core') {
':streams:genStreamsConfigDocs'], type: Tar) {
 classifier = 'site-docs'
 compression = Compression.GZIP
-from project.file("../docs")
+from project.file("$rootDir/docs")
 into 'site-docs'
 duplicatesStrategy 'exclude'
   }
@@ -519,10 +519,10 @@ project(':core') {
   tasks.create(name: "releaseTarGz", dependsOn: 
configurations.archives.artifacts, type: Tar) {
 into "kafka_${versions.baseScala}-${version}"
 compression = Compression.GZIP
-from(project.file("../bin")) { into "bin/" }
-from(project.file("../config")) { into "config/" }
-from '../LICENSE'
-from '../NOTICE'
+from(project.file("$rootDir/bin")) { into "bin/" }
+from(project.file("$rootDir/config")) { into "config/" }
+from "$rootDir/LICENSE"
+from "$rootDir/NOTICE"
 from(configurations.runtime) { into("libs/") }
 from(configurations.archives.artifacts.files) { into("libs/") }
 from(project.siteDocsTar) { into("site-docs/") }
@@ -606,12 +606,12 @@ project(':clients') {
   task determineCommitId {
 ext.commitId = "unknown"
 def takeFromHash = 16
-if (file("../.git/HEAD").exists()) {
-  def headRef = file("../.git/HEAD").text
+if (file("$rootDir/.git/HEAD").exists()) {
+  def headRef = file("$rootDir/.git/HEAD").text
   if (headRef.contains('ref: ')) {
 headRef = headRef.replaceAll('ref: ', '').trim()
-if (file("../.git/$headRef").exists()) {
-commitId = file("../.git/$headRef").text.trim().take(takeFromHash)
+if (file("$rootDir/.git/$headRef").exists()) {
+commitId = 
file("$rootDir/.git/$headRef").text.trim().take(takeFromHash)
 }
   } else {
 commitId = headRef.trim().take(takeFromHash)



kafka git commit: KAFKA-4717: Use absolute paths to files in root directory so all jars include LICENSE and NOTICE files

2017-01-31 Thread ewencp
Repository: kafka
Updated Branches:
  refs/heads/trunk 5afe95964 -> 82744414d


KAFKA-4717: Use absolute paths to files in root directory so all jars include 
LICENSE and NOTICE files

Author: Ewen Cheslack-Postava 

Reviewers: Ismael Juma 

Closes #2473 from ewencp/kafka-4717-connect-license-and-notice-files


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/82744414
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/82744414
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/82744414

Branch: refs/heads/trunk
Commit: 82744414d50d28a7bf7b0084118b68a4411c5781
Parents: 5afe959
Author: Ewen Cheslack-Postava 
Authored: Tue Jan 31 16:06:10 2017 -0800
Committer: Ewen Cheslack-Postava 
Committed: Tue Jan 31 16:06:10 2017 -0800

--
 build.gradle | 42 +-
 1 file changed, 21 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/82744414/build.gradle
--
diff --git a/build.gradle b/build.gradle
index 7a34c25..32ce14a 100644
--- a/build.gradle
+++ b/build.gradle
@@ -177,21 +177,21 @@ subprojects {
   }
 
   jar {
-from '../LICENSE'
-from '../NOTICE'
+from "$rootDir/LICENSE"
+from "$rootDir/NOTICE"
   }
 
   task srcJar(type: Jar) {
 classifier = 'sources'
-from '../LICENSE'
-from '../NOTICE'
+from "$rootDir/LICENSE"
+from "$rootDir/NOTICE"
 from sourceSets.main.allSource
   }
 
   task javadocJar(type: Jar, dependsOn: javadoc) {
 classifier 'javadoc'
-from '../LICENSE'
-from '../NOTICE'
+from "$rootDir/LICENSE"
+from "$rootDir/NOTICE"
 from javadoc.destinationDir
   }
 
@@ -213,15 +213,15 @@ subprojects {
   if(!sourceSets.test.allSource.isEmpty()) {
 task testJar(type: Jar) {
   classifier = 'test'
-  from '../LICENSE'
-  from '../NOTICE'
+  from "$rootDir/LICENSE"
+  from "$rootDir/NOTICE"
   from sourceSets.test.output
 }
 
 task testSrcJar(type: Jar, dependsOn: testJar) {
   classifier = 'test-sources'
-  from '../LICENSE'
-  from '../NOTICE'
+  from "$rootDir/LICENSE"
+  from "$rootDir/NOTICE"
   from sourceSets.test.allSource
 }
 
@@ -234,8 +234,8 @@ subprojects {
   plugins.withType(ScalaPlugin) {
 task scaladocJar(type:Jar) {
   classifier = 'scaladoc'
-  from '../LICENSE'
-  from '../NOTICE'
+  from "$rootDir/LICENSE"
+  from "$rootDir/NOTICE"
   from scaladoc.destinationDir
 }
 
@@ -511,7 +511,7 @@ project(':core') {
':streams:genStreamsConfigDocs'], type: Tar) {
 classifier = 'site-docs'
 compression = Compression.GZIP
-from project.file("../docs")
+from project.file("$rootDir/docs")
 into 'site-docs'
 duplicatesStrategy 'exclude'
   }
@@ -519,10 +519,10 @@ project(':core') {
   tasks.create(name: "releaseTarGz", dependsOn: 
configurations.archives.artifacts, type: Tar) {
 into "kafka_${versions.baseScala}-${version}"
 compression = Compression.GZIP
-from(project.file("../bin")) { into "bin/" }
-from(project.file("../config")) { into "config/" }
-from '../LICENSE'
-from '../NOTICE'
+from(project.file("$rootDir/bin")) { into "bin/" }
+from(project.file("$rootDir/config")) { into "config/" }
+from "$rootDir/LICENSE"
+from "$rootDir/NOTICE"
 from(configurations.runtime) { into("libs/") }
 from(configurations.archives.artifacts.files) { into("libs/") }
 from(project.siteDocsTar) { into("site-docs/") }
@@ -606,12 +606,12 @@ project(':clients') {
   task determineCommitId {
 ext.commitId = "unknown"
 def takeFromHash = 16
-if (file("../.git/HEAD").exists()) {
-  def headRef = file("../.git/HEAD").text
+if (file("$rootDir/.git/HEAD").exists()) {
+  def headRef = file("$rootDir/.git/HEAD").text
   if (headRef.contains('ref: ')) {
 headRef = headRef.replaceAll('ref: ', '').trim()
-if (file("../.git/$headRef").exists()) {
-commitId = file("../.git/$headRef").text.trim().take(takeFromHash)
+if (file("$rootDir/.git/$headRef").exists()) {
+commitId = 
file("$rootDir/.git/$headRef").text.trim().take(takeFromHash)
 }
   } else {
 commitId = headRef.trim().take(takeFromHash)



kafka git commit: KAFKA-4717: Use absolute paths to files in root directory so all jars include LICENSE and NOTICE files

2017-01-31 Thread ewencp
Repository: kafka
Updated Branches:
  refs/heads/0.10.1 6ba7470ed -> 850ceb74c


KAFKA-4717: Use absolute paths to files in root directory so all jars include 
LICENSE and NOTICE files

Author: Ewen Cheslack-Postava 

Reviewers: Ismael Juma 

Closes #2473 from ewencp/kafka-4717-connect-license-and-notice-files

(cherry picked from commit 82744414d50d28a7bf7b0084118b68a4411c5781)
Signed-off-by: Ewen Cheslack-Postava 


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/850ceb74
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/850ceb74
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/850ceb74

Branch: refs/heads/0.10.1
Commit: 850ceb74ca8bafc2c22735612feb67dc9c46fc36
Parents: 6ba7470
Author: Ewen Cheslack-Postava 
Authored: Tue Jan 31 16:06:10 2017 -0800
Committer: Ewen Cheslack-Postava 
Committed: Tue Jan 31 16:07:20 2017 -0800

--
 build.gradle | 42 +-
 1 file changed, 21 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/850ceb74/build.gradle
--
diff --git a/build.gradle b/build.gradle
index fded48e..d221d96 100644
--- a/build.gradle
+++ b/build.gradle
@@ -175,21 +175,21 @@ subprojects {
   }
 
   jar {
-from '../LICENSE'
-from '../NOTICE'
+from "$rootDir/LICENSE"
+from "$rootDir/NOTICE"
   }
 
   task srcJar(type: Jar) {
 classifier = 'sources'
-from '../LICENSE'
-from '../NOTICE'
+from "$rootDir/LICENSE"
+from "$rootDir/NOTICE"
 from sourceSets.main.allSource
   }
 
   task javadocJar(type: Jar, dependsOn: javadoc) {
 classifier 'javadoc'
-from '../LICENSE'
-from '../NOTICE'
+from "$rootDir/LICENSE"
+from "$rootDir/NOTICE"
 from javadoc.destinationDir
   }
 
@@ -211,15 +211,15 @@ subprojects {
   if(!sourceSets.test.allSource.isEmpty()) {
 task testJar(type: Jar) {
   classifier = 'test'
-  from '../LICENSE'
-  from '../NOTICE'
+  from "$rootDir/LICENSE"
+  from "$rootDir/NOTICE"
   from sourceSets.test.output
 }
 
 task testSrcJar(type: Jar, dependsOn: testJar) {
   classifier = 'test-sources'
-  from '../LICENSE'
-  from '../NOTICE'
+  from "$rootDir/LICENSE"
+  from "$rootDir/NOTICE"
   from sourceSets.test.allSource
 }
 
@@ -232,8 +232,8 @@ subprojects {
   plugins.withType(ScalaPlugin) {
 task scaladocJar(type:Jar) {
   classifier = 'scaladoc'
-  from '../LICENSE'
-  from '../NOTICE'
+  from "$rootDir/LICENSE"
+  from "$rootDir/NOTICE"
   from scaladoc.destinationDir
 }
 
@@ -509,7 +509,7 @@ project(':core') {
':streams:genStreamsConfigDocs'], type: Tar) {
 classifier = 'site-docs'
 compression = Compression.GZIP
-from project.file("../docs")
+from project.file("$rootDir/docs")
 into 'site-docs'
 duplicatesStrategy 'exclude'
   }
@@ -517,10 +517,10 @@ project(':core') {
   tasks.create(name: "releaseTarGz", dependsOn: 
configurations.archives.artifacts, type: Tar) {
 into "kafka_${versions.baseScala}-${version}"
 compression = Compression.GZIP
-from(project.file("../bin")) { into "bin/" }
-from(project.file("../config")) { into "config/" }
-from '../LICENSE'
-from '../NOTICE'
+from(project.file("$rootDir/bin")) { into "bin/" }
+from(project.file("$rootDir/config")) { into "config/" }
+from "$rootDir/LICENSE"
+from "$rootDir/NOTICE"
 from(configurations.runtime) { into("libs/") }
 from(configurations.archives.artifacts.files) { into("libs/") }
 from(project.siteDocsTar) { into("site-docs/") }
@@ -602,12 +602,12 @@ project(':clients') {
   task determineCommitId {
 ext.commitId = "unknown"
 def takeFromHash = 16
-if (file("../.git/HEAD").exists()) {
-  def headRef = file("../.git/HEAD").text
+if (file("$rootDir/.git/HEAD").exists()) {
+  def headRef = file("$rootDir/.git/HEAD").text
   if (headRef.contains('ref: ')) {
 headRef = headRef.replaceAll('ref: ', '').trim()
-if (file("../.git/$headRef").exists()) {
-commitId = file("../.git/$headRef").text.trim().take(takeFromHash)
+if (file("$rootDir/.git/$headRef").exists()) {
+commitId = 
file("$rootDir/.git/$headRef").text.trim().take(takeFromHash)
 }
   } else {
 commitId = headRef.trim().take(takeFromHash)



kafka-site git commit: Add Ewen's GPG key.

2017-01-31 Thread ewencp
Repository: kafka-site
Updated Branches:
  refs/heads/asf-site 60a70f379 -> 9a36603f4


Add Ewen's GPG key.


Project: http://git-wip-us.apache.org/repos/asf/kafka-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka-site/commit/9a36603f
Tree: http://git-wip-us.apache.org/repos/asf/kafka-site/tree/9a36603f
Diff: http://git-wip-us.apache.org/repos/asf/kafka-site/diff/9a36603f

Branch: refs/heads/asf-site
Commit: 9a36603f4855890929659a123ca33a00130a1165
Parents: 60a70f3
Author: Ewen Cheslack-Postava 
Authored: Tue Jan 31 18:26:47 2017 -0800
Committer: Ewen Cheslack-Postava 
Committed: Tue Jan 31 18:26:47 2017 -0800

--
 KEYS | 110 ++
 1 file changed, 110 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/kafka-site/blob/9a36603f/KEYS
--
diff --git a/KEYS b/KEYS
index b07dee0..2efca24 100644
--- a/KEYS
+++ b/KEYS
@@ -601,3 +601,113 @@ 
qoZ5VNSFiHWPXq6WyetJmAubfsJxwIWybfqJ8Dy3yivj3aCfdH4WSD2k6yqoI988
 =cvL8
 -----END PGP PUBLIC KEY BLOCK-----
 
+pub   4096R/BE7EFC73 2015-01-14
+uid   [ultimate] Ewen Cheslack-Postava 
+sig 3        BE7EFC73 2015-01-14  Ewen Cheslack-Postava 
+sub   4096R/08EA97A4 2015-01-14
+sig  BE7EFC73 2015-01-14  Ewen Cheslack-Postava 
+
+pub   4096R/5234D94F 2014-08-22
+uid   [ultimate] Ewen Cheslack-Postava 
+sig 3        5234D94F 2014-08-22  Ewen Cheslack-Postava 
+sub   4096R/964F16A8 2014-08-22
+sig  5234D94F 2014-08-22  Ewen Cheslack-Postava 
+
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Version: GnuPG v2
+
+mQINBFS2qt4BEACksfgjdHfD2io0WZsc8jXHzx6oXE6lWnARo4hLgf1q32adgeRz
+ofGV5AdiP1N5Pwus0o8PEa2UN9/rOIXavw4W0xMlWRiZlavYw6wR7zOGuFINRZJm
+LFvPyIuTtAwdrsuUdOqAcw4hrz0j/V7xg7l79CalVpzO4JKnBYRtNSbg0FeVpk+0
+AEt6e3nlA/5WZLRnNwJTkSpwVkgRgmIjJh889xCtXtxRkjNoTq0H+ks12xdQyruL
+nFRzQmrBCsIS9C+VCmVu8r/OBVVtwulaIYCcIxu213a23XmTmlfmHQXnvwdMPBT3
+me/2ubm6Xqon92Nc08AhxzzWp/rh9SceNWBkFzMdYnXpCIwWGRcvzJ4M+WhL5ho2
+SjaPoKj1LqFhzpeNA5F4SRPOfqvkEkGIl3L3qxXlrax2yOuJMv9sb6op01JIrQY6
+c0oQCzX5vxRdDimZfHnrcZnlbn7Wzs3N8hNwPn0+0ZGLTn//5/esNWtUkf/5cK8R
+afqe0bntiNFLxvR1ysMYJf02FXRJ7rVaS02KZISrc/64bpE5GG5v6iklHrx3SJ4B
+HykbITfN9n0uO2B4iivb86OnV/X5bjv52aJ3llwmsQtTyMrr+JOeccjYv+bw6SK+
+xi/t8BWlwfxa7+//5XrTgNaINE96R9EtDlMMnEU+zk0EFFQxXmTx2384DwARAQAB
+tClFd2VuIENoZXNsYWNrLVBvc3RhdmEgPGV3ZW5AY29uZmx1ZW50LmlvPokCOAQT
+AQIAIgUCVLaq3gIbAwYLCQgHAwIGFQgCCQoLBBYCAwECHgECF4AACgkQk07sfb5+
+/HNJIg/9GW2s+0EpKyIcvyp46Hw4y0ruOKz++JZs/vbHGWCbYQbC80gKDHxCQxvH
+FgK+s4xFe9hsbD/36FBKE9Jbit581F983/KPgoD0bvX2yum6X6T/TaGFSfPkO6hw
+qCbLSswLo5QJV527yrvbKe2dpL7v6E6ZOLZiP8vYJMIS30UyV40gPJp7q6yyT8gH
+xEKW5aGDuCHCzRlRB+jNFUzsB6Q5MeOTVHfTvjPhHLbFhGBmU7buqxtT7EREAze5
+8I8JUFJkHtdf32a463a+2qjll6AEkWt2iBVuRilkgOnxuX/xNdb7H/6iFCT6UNCb
+pMrM3RLzh0iklT5BNeZ5mMXY8FnOqBy/fFGvWedb6T9Im4ERgWrVq14X8b+uUaP9
+URRvBczXvTfSb4Lajt5BvOvgleOF7iQhzFO1byzo2HD5CArv0HJEIP7GYNDnp14g
+GjQehaqmHP7/RBRDw/Hg0gy828KmfwD3K6VSWpwHOPaas84O6waxRsCKW6Ng/PpM
+FWF5NfPm4sVsuK/QiCpc/hpDzpDVxLL9Z7cOG6APZKFv+g9isUTtP2sn7tDoas0Y
+BvgpPz9gpASF2Y3/3wgyitzY7YO1w36jwgSqkTud8Lknc6tGOStGqclqmt4Ep/OB
+43i7nbYTo12Udn37WotuDstOm8NVZWxsjXVBb7QW8Rn/fJnolZq5Ag0EVLaq3gEQ
+AKN94fP55s47wo31KQhRgQH7AsLQcMh2bWcJDchIvMsF6oB582T7uHqU3AYFvJNP
+FSkPc0k4XetyZTBTtbnw0bRM1UQECDvQnB4M+HpqlAM3LJce3oPzyVYZ0vxpQRcc
+Y/bCyEc9j3U2BxtbHfZgCTv+keSNsE6i5dpZ92KRw/PQAyth6fDDM40DR8SiXJR2
+tIaTmXoa0JOoECPwDLQqccXmpL+5Vl4xcjrPBZ9BIiA0WBm2dvSk4Nt+8xz8FjNh
+M+slsTUiI20yfKEXN9mM9pPsS7gnngZR96QJBWOBersf3L/aO1z1ywZORyYFED86
+U+I67IIHHHYVC+CsRTOLHEeSNGnYKAmuY9aQnnFkE5fkGQ46mx2eE05Od9xiciy7
+U+XKJ0Pz/yBaiSoLg5TMHo+33giHCh/ESrc+aiohjXKA6Z6cqkmQIg19sisNmZqJ
+0IBNZrvNJHCxolN8VSvz1lfAyEkbYQ6L4JGdnp5Gu0DWaaJcQ2BoE/i2uD0xjNCv
+o+GuEF66YI2Gm0d1oePsy+jkgrw0sZetaCccX08dJOcNklKz6nQYjFT2C43zSlzP
+jkDEA9sgEabDN0SyYAnbDR+ZW0g+/B/VrWS+j+apNQAckYWAIQCzyMSbkrYJRAdO
+vEXKo+rQxm/LyBIq5pMwqy1nlx87LP9h/AgtGJN6Qc0PABEBAAGJAh8EGAECAAkF
+AlS2qt4CGwwACgkQk07sfb5+/HOqHw/+MhfNz5I8n7xmBvVln7sujPkiA6heWbxe
+Ga+zZnqRTFFsULaMgz4dIdXwXI2Sgym3WKangiJZr3OEpLPlXSM/FS+GAruRH+/y
+2CPfkqxd1jG13Q1/BWBHbtxJizWuZ+UtK+/wqWc099/4oTnkckchtZBJQ5gdGoio
+swqZdj9rguXROAtW2Fmc/5K+vxgGTuE8cu4TxjA0ZEWdMaJYI6dlLg9LjPpGzs4L
+jDn2dItasnsZ6irhXISYhV9WJbus2H9kYMjZxdJhY6HzkWVaUVrdtiGlWjf6fQ9N
+e0X6DN9kd3YMNhiJ8JExxrB//2JPr/3B5nc/Z0zKmwn1znUz04WWKQ1QM9Irszrr
+5fWAFy0LgsiT0922hH0f9puss5+irQUSIXDaW2iaCdNszZiTZxNwJDYu8m0pj7eP
+KgkKutptC/1on/mE9IeOkQ5acj2XfgKvljMP9VBX5PrRoRjJ/PMxinZPbzAHZy20
+7TYMbJH5dsqFIeh66jycn2IDKElvk7A9Wk98XbaxEFq8KhxblG+glV+kz2NWm3x6
+wJ/WFx5uXvKXuXrir92HGgzC6hy+v7m1DPCBlqM3+qrXTcyhbqeC52pjj4AAFnaF
+G9VPAuAnKuZSCkxGuiEgxxjDdikTkYmxbqpel+m0T6DLLgNX58M2yURAIaDajLW/
+gozBq5tLHNiZAg0EU/a4MQEQAMohxV5xUHkknnH4h3xr6tOczInCW3xObdUlsZh9
+mLhiU4+0kzQtNbjKK2qgNgmQzL1p+VYuQ2XQJmhxynXAcc9sFag7kOnzrVvC+HGb
+gZRr79YrtGqEFwU/a3YommEVAFdVvBe0HM+24pbBMoiHV1OaabnSmAYX2Fmpuh+t
+xo7FvPZL6/X

kafka git commit: KAFKA-4677: Avoid unnecessary task movement across threads during rebalance

2017-01-31 Thread guozhang
Repository: kafka
Updated Branches:
  refs/heads/trunk 82744414d -> 0b48ea1c8


KAFKA-4677: Avoid unnecessary task movement across threads during rebalance

Makes task assignment stickier by preferring to assign each task to a client 
that previously had it as an active task. If no client previously had the task 
active, a client that previously had it as a standby task is searched for 
instead, finally falling back to the least loaded client.

Author: Damian Guy 

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2429 from dguy/kafka-4677


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0b48ea1c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0b48ea1c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0b48ea1c

Branch: refs/heads/trunk
Commit: 0b48ea1c81f22465cf32a19c012e0fb3c849afcc
Parents: 8274441
Author: Damian Guy 
Authored: Tue Jan 31 20:16:47 2017 -0800
Committer: Guozhang Wang 
Committed: Tue Jan 31 20:16:47 2017 -0800

--
 .../internals/StreamPartitionAssignor.java  |  34 +-
 .../processor/internals/StreamThread.java   |  12 +-
 .../internals/assignment/ClientState.java   | 116 -
 .../assignment/StickyTaskAssignor.java  | 283 ++
 .../internals/assignment/TaskAssignor.java  | 208 +---
 .../kstream/internals/KTableAggregateTest.java  |   5 +
 .../internals/StreamPartitionAssignorTest.java  |  20 +-
 .../processor/internals/StreamThreadTest.java   |   4 +-
 .../internals/assignment/ClientStateTest.java   | 151 ++
 .../assignment/StickyTaskAssignorTest.java  | 515 +++
 .../internals/assignment/TaskAssignorTest.java  | 312 ---
 11 files changed, 1085 insertions(+), 575 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/0b48ea1c/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
--
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 1ad6dbc..e17d96b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -33,8 +33,8 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.ClientState;
+import 
org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
 import 
org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
-import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
 import org.apache.kafka.streams.state.HostInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -105,13 +105,10 @@ public class StreamPartitionAssignor implements 
PartitionAssignor, Configurable
 }
 
 void addConsumer(final String consumerMemberId, final SubscriptionInfo 
info) {
-
 consumers.add(consumerMemberId);
-
-state.prevActiveTasks.addAll(info.prevTasks);
-state.prevAssignedTasks.addAll(info.prevTasks);
-state.prevAssignedTasks.addAll(info.standbyTasks);
-state.capacity = state.capacity + 1d;
+state.addPreviousActiveTasks(info.prevTasks);
+state.addPreviousStandbyTasks(info.standbyTasks);
+state.incrementCapacity();
 }
 
 @Override
@@ -228,10 +225,10 @@ public class StreamPartitionAssignor implements 
PartitionAssignor, Configurable
 // 2. Task ids of previously running tasks
 // 3. Task ids of valid local states on the client's state directory.
 
-Set prevTasks = streamThread.prevTasks();
+final Set previousActiveTasks = streamThread.prevActiveTasks();
 Set standbyTasks = streamThread.cachedTasks();
-standbyTasks.removeAll(prevTasks);
-SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, 
prevTasks, standbyTasks, this.userEndPoint);
+standbyTasks.removeAll(previousActiveTasks);
+SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, 
previousActiveTasks, standbyTasks, this.userEndPoint);
 
 if (streamThread.builder.sourceTopicPattern() != null) {
 SubscriptionUpdates subscriptionUpdates = new 
SubscriptionUpdates();
@@ -461,7 +458,8 @@ public class StreamPartitionAssignor implements 
PartitionAssignor, Configurable
 log.debug("stream-thread [{}] Assigning tasks {} to clients {} with 
number of replicas {