kafka git commit: MINOR: Logging improvements in consumer internals
Repository: kafka Updated Branches: refs/heads/0.10.2 61024c9d2 -> a50635219 MINOR: Logging improvements in consumer internals Author: Jason Gustafson Reviewers: Manikumar reddy O, Ewen Cheslack-Postava, Ismael Juma Closes #2469 from hachikuji/improve-consumer-logging (cherry picked from commit 5afe959647fcad9d01365427f4b455e1586b1fd5) Signed-off-by: Jason Gustafson Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a5063521 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a5063521 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a5063521 Branch: refs/heads/0.10.2 Commit: a5063521943f8d0f6940b18d4f0d57045aa395ae Parents: 61024c9 Author: Jason Gustafson Authored: Tue Jan 31 12:27:00 2017 -0800 Committer: Jason Gustafson Committed: Tue Jan 31 12:28:36 2017 -0800 -- .../org/apache/kafka/clients/NetworkClient.java | 9 + .../kafka/clients/consumer/KafkaConsumer.java | 8 ++-- .../consumer/internals/AbstractCoordinator.java | 33 ++-- .../consumer/internals/ConsumerCoordinator.java | 41 .../internals/ConsumerNetworkClient.java| 9 +++-- .../clients/consumer/internals/Fetcher.java | 5 ++- .../apache/kafka/common/utils/KafkaThread.java | 9 + .../clients/consumer/KafkaConsumerTest.java | 2 - .../internals/ConsumerCoordinatorTest.java | 30 +++--- 9 files changed, 90 insertions(+), 56 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/a5063521/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 0eb7670..3a75288 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -427,14 +427,23 @@ public class NetworkClient implements KafkaClient { int currInflight = this.inFlightRequests.inFlightRequestCount(node.idString()); if 
(currInflight == 0 && this.connectionStates.isReady(node.idString())) { // if we find an established connection with no in-flight requests we can stop right away +log.trace("Found least loaded node {} connected with no in-flight requests", node); return node; } else if (!this.connectionStates.isBlackedOut(node.idString(), now) && currInflight < inflight) { // otherwise if this is the best we have found so far, record that inflight = currInflight; found = node; +} else if (log.isTraceEnabled()) { +log.trace("Removing node {} from least loaded node selection: is-blacked-out: {}, in-flight-requests: {}", +node, this.connectionStates.isBlackedOut(node.idString(), now), currInflight); } } +if (found != null) +log.trace("Found least loaded node {}", found); +else +log.trace("Least loaded node selection failed to find an available node"); + return found; } http://git-wip-us.apache.org/repos/asf/kafka/blob/a5063521/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 6064c39..ed3d607 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -78,8 +78,8 @@ import java.util.regex.Pattern; * Cross-Version Compatibility * This client can communicate with brokers that are version 0.10.0 or newer. Older or newer brokers may not support * certain features. For example, 0.10.0 brokers do not support offsetsForTimes, because this feature was added - * in version 0.10.1. You will receive an UnsupportedVersionException when invoking an API that is not available on the - * running broker version. + * in version 0.10.1. You will receive an {@link org.apache.kafka.common.errors.UnsupportedVersionException} + * when invoking an API that is not available on the running broker version. 
* * * Offsets and Consumer Position @@ -685,7 +685,6 @@ public class KafkaConsumer implements Consumer { metricGrpPrefix,
kafka git commit: MINOR: Logging improvements in consumer internals
Repository: kafka Updated Branches: refs/heads/trunk b948f4327 -> 5afe95964 MINOR: Logging improvements in consumer internals Author: Jason Gustafson Reviewers: Manikumar reddy O, Ewen Cheslack-Postava, Ismael Juma Closes #2469 from hachikuji/improve-consumer-logging Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5afe9596 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5afe9596 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5afe9596 Branch: refs/heads/trunk Commit: 5afe959647fcad9d01365427f4b455e1586b1fd5 Parents: b948f43 Author: Jason Gustafson Authored: Tue Jan 31 12:27:00 2017 -0800 Committer: Jason Gustafson Committed: Tue Jan 31 12:27:00 2017 -0800 -- .../org/apache/kafka/clients/NetworkClient.java | 9 + .../kafka/clients/consumer/KafkaConsumer.java | 8 ++-- .../consumer/internals/AbstractCoordinator.java | 33 ++-- .../consumer/internals/ConsumerCoordinator.java | 41 .../internals/ConsumerNetworkClient.java| 9 +++-- .../clients/consumer/internals/Fetcher.java | 5 ++- .../apache/kafka/common/utils/KafkaThread.java | 9 + .../clients/consumer/KafkaConsumerTest.java | 2 - .../internals/ConsumerCoordinatorTest.java | 30 +++--- 9 files changed, 90 insertions(+), 56 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kafka/blob/5afe9596/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 0eb7670..3a75288 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -427,14 +427,23 @@ public class NetworkClient implements KafkaClient { int currInflight = this.inFlightRequests.inFlightRequestCount(node.idString()); if (currInflight == 0 && this.connectionStates.isReady(node.idString())) { // if we find an established 
connection with no in-flight requests we can stop right away +log.trace("Found least loaded node {} connected with no in-flight requests", node); return node; } else if (!this.connectionStates.isBlackedOut(node.idString(), now) && currInflight < inflight) { // otherwise if this is the best we have found so far, record that inflight = currInflight; found = node; +} else if (log.isTraceEnabled()) { +log.trace("Removing node {} from least loaded node selection: is-blacked-out: {}, in-flight-requests: {}", +node, this.connectionStates.isBlackedOut(node.idString(), now), currInflight); } } +if (found != null) +log.trace("Found least loaded node {}", found); +else +log.trace("Least loaded node selection failed to find an available node"); + return found; } http://git-wip-us.apache.org/repos/asf/kafka/blob/5afe9596/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java -- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 23e7ed6..89844f6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -78,8 +78,8 @@ import java.util.regex.Pattern; * Cross-Version Compatibility * This client can communicate with brokers that are version 0.10.0 or newer. Older or newer brokers may not support * certain features. For example, 0.10.0 brokers do not support offsetsForTimes, because this feature was added - * in version 0.10.1. You will receive an UnsupportedVersionException when invoking an API that is not available on the - * running broker version. + * in version 0.10.1. You will receive an {@link org.apache.kafka.common.errors.UnsupportedVersionException} + * when invoking an API that is not available on the running broker version. 
* * * Offsets and Consumer Position @@ -685,7 +685,6 @@ public class KafkaConsumer implements Consumer { metricGrpPrefix, this.time, retryBackoffMs, -new