kafka git commit: MINOR: Logging improvements in consumer internals

2017-01-31 Thread jgus
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 61024c9d2 -> a50635219


MINOR: Logging improvements in consumer internals

Author: Jason Gustafson 

Reviewers: Manikumar reddy O , Ewen Cheslack-Postava 
, Ismael Juma 

Closes #2469 from hachikuji/improve-consumer-logging

(cherry picked from commit 5afe959647fcad9d01365427f4b455e1586b1fd5)
Signed-off-by: Jason Gustafson 


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a5063521
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a5063521
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a5063521

Branch: refs/heads/0.10.2
Commit: a5063521943f8d0f6940b18d4f0d57045aa395ae
Parents: 61024c9
Author: Jason Gustafson 
Authored: Tue Jan 31 12:27:00 2017 -0800
Committer: Jason Gustafson 
Committed: Tue Jan 31 12:28:36 2017 -0800

--
 .../org/apache/kafka/clients/NetworkClient.java |  9 +
 .../kafka/clients/consumer/KafkaConsumer.java   |  8 ++--
 .../consumer/internals/AbstractCoordinator.java | 33 ++--
 .../consumer/internals/ConsumerCoordinator.java | 41 
 .../internals/ConsumerNetworkClient.java|  9 +++--
 .../clients/consumer/internals/Fetcher.java |  5 ++-
 .../apache/kafka/common/utils/KafkaThread.java  |  9 +
 .../clients/consumer/KafkaConsumerTest.java |  2 -
 .../internals/ConsumerCoordinatorTest.java  | 30 +++---
 9 files changed, 90 insertions(+), 56 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/a5063521/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
--
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 0eb7670..3a75288 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -427,14 +427,23 @@ public class NetworkClient implements KafkaClient {
 int currInflight = 
this.inFlightRequests.inFlightRequestCount(node.idString());
 if (currInflight == 0 && 
this.connectionStates.isReady(node.idString())) {
 // if we find an established connection with no in-flight 
requests we can stop right away
+log.trace("Found least loaded node {} connected with no 
in-flight requests", node);
 return node;
 } else if (!this.connectionStates.isBlackedOut(node.idString(), 
now) && currInflight < inflight) {
 // otherwise if this is the best we have found so far, record 
that
 inflight = currInflight;
 found = node;
+} else if (log.isTraceEnabled()) {
+log.trace("Removing node {} from least loaded node selection: 
is-blacked-out: {}, in-flight-requests: {}",
+node, 
this.connectionStates.isBlackedOut(node.idString(), now), currInflight);
 }
 }
 
+if (found != null)
+log.trace("Found least loaded node {}", found);
+else
+log.trace("Least loaded node selection failed to find an available 
node");
+
 return found;
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5063521/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 6064c39..ed3d607 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -78,8 +78,8 @@ import java.util.regex.Pattern;
  * Cross-Version Compatibility
  * This client can communicate with brokers that are version 0.10.0 or newer. 
Older or newer brokers may not support
  * certain features. For example, 0.10.0 brokers do not support 
offsetsForTimes, because this feature was added
- * in version 0.10.1. You will receive an UnsupportedVersionException when 
invoking an API that is not available on the
- * running broker version.
+ * in version 0.10.1. You will receive an {@link 
org.apache.kafka.common.errors.UnsupportedVersionException}
+ * when invoking an API that is not available on the running broker version.
  * 
  *
  * Offsets and Consumer Position
@@ -685,7 +685,6 @@ public class KafkaConsumer implements Consumer {
 metricGrpPrefix,
  

kafka git commit: MINOR: Logging improvements in consumer internals

2017-01-31 Thread jgus
Repository: kafka
Updated Branches:
  refs/heads/trunk b948f4327 -> 5afe95964


MINOR: Logging improvements in consumer internals

Author: Jason Gustafson 

Reviewers: Manikumar reddy O , Ewen Cheslack-Postava 
, Ismael Juma 

Closes #2469 from hachikuji/improve-consumer-logging


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5afe9596
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5afe9596
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5afe9596

Branch: refs/heads/trunk
Commit: 5afe959647fcad9d01365427f4b455e1586b1fd5
Parents: b948f43
Author: Jason Gustafson 
Authored: Tue Jan 31 12:27:00 2017 -0800
Committer: Jason Gustafson 
Committed: Tue Jan 31 12:27:00 2017 -0800

--
 .../org/apache/kafka/clients/NetworkClient.java |  9 +
 .../kafka/clients/consumer/KafkaConsumer.java   |  8 ++--
 .../consumer/internals/AbstractCoordinator.java | 33 ++--
 .../consumer/internals/ConsumerCoordinator.java | 41 
 .../internals/ConsumerNetworkClient.java|  9 +++--
 .../clients/consumer/internals/Fetcher.java |  5 ++-
 .../apache/kafka/common/utils/KafkaThread.java  |  9 +
 .../clients/consumer/KafkaConsumerTest.java |  2 -
 .../internals/ConsumerCoordinatorTest.java  | 30 +++---
 9 files changed, 90 insertions(+), 56 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kafka/blob/5afe9596/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
--
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 0eb7670..3a75288 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -427,14 +427,23 @@ public class NetworkClient implements KafkaClient {
 int currInflight = 
this.inFlightRequests.inFlightRequestCount(node.idString());
 if (currInflight == 0 && 
this.connectionStates.isReady(node.idString())) {
 // if we find an established connection with no in-flight 
requests we can stop right away
+log.trace("Found least loaded node {} connected with no 
in-flight requests", node);
 return node;
 } else if (!this.connectionStates.isBlackedOut(node.idString(), 
now) && currInflight < inflight) {
 // otherwise if this is the best we have found so far, record 
that
 inflight = currInflight;
 found = node;
+} else if (log.isTraceEnabled()) {
+log.trace("Removing node {} from least loaded node selection: 
is-blacked-out: {}, in-flight-requests: {}",
+node, 
this.connectionStates.isBlackedOut(node.idString(), now), currInflight);
 }
 }
 
+if (found != null)
+log.trace("Found least loaded node {}", found);
+else
+log.trace("Least loaded node selection failed to find an available 
node");
+
 return found;
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5afe9596/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
--
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 23e7ed6..89844f6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -78,8 +78,8 @@ import java.util.regex.Pattern;
  * Cross-Version Compatibility
  * This client can communicate with brokers that are version 0.10.0 or newer. 
Older or newer brokers may not support
  * certain features. For example, 0.10.0 brokers do not support 
offsetsForTimes, because this feature was added
- * in version 0.10.1. You will receive an UnsupportedVersionException when 
invoking an API that is not available on the
- * running broker version.
+ * in version 0.10.1. You will receive an {@link 
org.apache.kafka.common.errors.UnsupportedVersionException}
+ * when invoking an API that is not available on the running broker version.
  * 
  *
  * Offsets and Consumer Position
@@ -685,7 +685,6 @@ public class KafkaConsumer implements Consumer {
 metricGrpPrefix,
 this.time,
 retryBackoffMs,
-new