Repository: kafka
Updated Branches:
  refs/heads/0.10.2 61024c9d2 -> a50635219


MINOR: Logging improvements in consumer internals

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Manikumar reddy O <manikumar.re...@gmail.com>, Ewen Cheslack-Postava 
<e...@confluent.io>, Ismael Juma <ism...@juma.me.uk>

Closes #2469 from hachikuji/improve-consumer-logging

(cherry picked from commit 5afe959647fcad9d01365427f4b455e1586b1fd5)
Signed-off-by: Jason Gustafson <ja...@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a5063521
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a5063521
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a5063521

Branch: refs/heads/0.10.2
Commit: a5063521943f8d0f6940b18d4f0d57045aa395ae
Parents: 61024c9
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Jan 31 12:27:00 2017 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue Jan 31 12:28:36 2017 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/clients/NetworkClient.java |  9 +++++
 .../kafka/clients/consumer/KafkaConsumer.java   |  8 ++--
 .../consumer/internals/AbstractCoordinator.java | 33 ++++++++++------
 .../consumer/internals/ConsumerCoordinator.java | 41 ++++++++++++--------
 .../internals/ConsumerNetworkClient.java        |  9 +++--
 .../clients/consumer/internals/Fetcher.java     |  5 ++-
 .../apache/kafka/common/utils/KafkaThread.java  |  9 +++++
 .../clients/consumer/KafkaConsumerTest.java     |  2 -
 .../internals/ConsumerCoordinatorTest.java      | 30 +++++++-------
 9 files changed, 90 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a5063521/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 0eb7670..3a75288 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -427,14 +427,23 @@ public class NetworkClient implements KafkaClient {
             int currInflight = 
this.inFlightRequests.inFlightRequestCount(node.idString());
             if (currInflight == 0 && 
this.connectionStates.isReady(node.idString())) {
                 // if we find an established connection with no in-flight 
requests we can stop right away
+                log.trace("Found least loaded node {} connected with no 
in-flight requests", node);
                 return node;
             } else if (!this.connectionStates.isBlackedOut(node.idString(), 
now) && currInflight < inflight) {
                 // otherwise if this is the best we have found so far, record 
that
                 inflight = currInflight;
                 found = node;
+            } else if (log.isTraceEnabled()) {
+                log.trace("Removing node {} from least loaded node selection: 
is-blacked-out: {}, in-flight-requests: {}",
+                        node, 
this.connectionStates.isBlackedOut(node.idString(), now), currInflight);
             }
         }
 
+        if (found != null)
+            log.trace("Found least loaded node {}", found);
+        else
+            log.trace("Least loaded node selection failed to find an available 
node");
+
         return found;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5063521/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 6064c39..ed3d607 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -78,8 +78,8 @@ import java.util.regex.Pattern;
  * <h3>Cross-Version Compatibility</h3>
  * This client can communicate with brokers that are version 0.10.0 or newer. 
Older or newer brokers may not support
  * certain features. For example, 0.10.0 brokers do not support 
offsetsForTimes, because this feature was added
- * in version 0.10.1. You will receive an UnsupportedVersionException when 
invoking an API that is not available on the
- * running broker version.
+ * in version 0.10.1. You will receive an {@link 
org.apache.kafka.common.errors.UnsupportedVersionException}
+ * when invoking an API that is not available on the running broker version.
  * <p>
  *
  * <h3>Offsets and Consumer Position</h3>
@@ -685,7 +685,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     metricGrpPrefix,
                     this.time,
                     retryBackoffMs,
-                    new ConsumerCoordinator.DefaultOffsetCommitCallback(),
                     
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
                     
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
                     this.interceptors,
@@ -1443,7 +1442,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
      *         than or equal to the target timestamp. {@code null} will be 
returned for the partition if there is no
      *         such message.
      * @throws IllegalArgumentException if the target timestamp is negative.
-     * @throws UnsupportedVersionException if the broker does not support 
looking up the offsets by timestamp.
+     * @throws org.apache.kafka.common.errors.UnsupportedVersionException if 
the broker does not support looking up
+     *         the offsets by timestamp.
      */
     @Override
     public Map<TopicPartition, OffsetAndTimestamp> 
offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5063521/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 7d77c0b..6eea045 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -43,6 +43,7 @@ import org.apache.kafka.common.requests.LeaveGroupResponse;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.SyncGroupRequest;
 import org.apache.kafka.common.requests.SyncGroupResponse;
+import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -211,6 +212,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
                     if (remainingMs <= 0)
                         break;
 
+                    log.debug("Coordinator discovery failed for group {}, 
refreshing metadata", groupId);
                     client.awaitMetadataUpdate(remainingMs);
                 } else
                     throw future.exception();
@@ -236,6 +238,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
             if (node == null) {
                 // TODO: If there are no brokers left, perhaps we should use 
the bootstrap set
                 // from configuration?
+                log.debug("No broker available to send GroupCoordinator 
request for group {}", groupId);
                 return RequestFuture.noBrokersAvailable();
             } else
                 findCoordinatorFuture = sendGroupCoordinatorRequest(node);
@@ -419,7 +422,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
         public void handle(JoinGroupResponse joinResponse, 
RequestFuture<ByteBuffer> future) {
             Errors error = Errors.forCode(joinResponse.errorCode());
             if (error == Errors.NONE) {
-                log.debug("Received successful join group response for group 
{}: {}", groupId, joinResponse);
+                log.debug("Received successful JoinGroup response for group 
{}: {}", groupId, joinResponse);
                 sensors.joinLatency.record(response.requestLatencyMs());
 
                 synchronized (AbstractCoordinator.this) {
@@ -542,7 +545,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
      */
     private RequestFuture<Void> sendGroupCoordinatorRequest(Node node) {
         // initiate the group metadata request
-        log.debug("Sending coordinator request for group {} to broker {}", 
groupId, node);
+        log.debug("Sending GroupCoordinator request for group {} to broker 
{}", groupId, node);
         GroupCoordinatorRequest.Builder requestBuilder =
                 new GroupCoordinatorRequest.Builder(this.groupId);
         return client.send(node, requestBuilder)
@@ -553,7 +556,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
 
         @Override
         public void onSuccess(ClientResponse resp, RequestFuture<Void> future) 
{
-            log.debug("Received group coordinator response {}", resp);
+            log.debug("Received GroupCoordinator response {} for group {}", 
resp, groupId);
 
             GroupCoordinatorResponse groupCoordinatorResponse = 
(GroupCoordinatorResponse) resp.responseBody();
             // use MAX_VALUE - node.id as the coordinator id to mimic separate 
connections
@@ -671,6 +674,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
         if (!coordinatorUnknown() && state != MemberState.UNJOINED && 
generation != Generation.NO_GENERATION) {
             // this is a minimal effort attempt to leave the group. we do not
             // attempt any resending if the request fails or times out.
+            log.debug("Sending LeaveGroup request to coordinator {} for group 
{}", coordinator, groupId);
             LeaveGroupRequest.Builder request =
                     new LeaveGroupRequest.Builder(groupId, 
generation.memberId);
             client.send(coordinator, request)
@@ -697,6 +701,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
 
     // visible for testing
     synchronized RequestFuture<Void> sendHeartbeatRequest() {
+        log.debug("Sending Heartbeat request for group {} to coordinator {}", 
groupId, coordinator);
         HeartbeatRequest.Builder requestBuilder =
                 new HeartbeatRequest.Builder(this.groupId, 
this.generation.generationId, this.generation.memberId);
         return client.send(coordinator, requestBuilder)
@@ -709,24 +714,24 @@ public abstract class AbstractCoordinator implements 
Closeable {
             sensors.heartbeatLatency.record(response.requestLatencyMs());
             Errors error = Errors.forCode(heartbeatResponse.errorCode());
             if (error == Errors.NONE) {
-                log.debug("Received successful heartbeat response for group 
{}", groupId);
+                log.debug("Received successful Heartbeat response for group 
{}", groupId);
                 future.complete(null);
             } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                     || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
-                log.debug("Attempt to heart beat failed for group {} since 
coordinator {} is either not started or not valid.",
+                log.debug("Attempt to heartbeat failed for group {} since 
coordinator {} is either not started or not valid.",
                         groupId, coordinator());
                 coordinatorDead();
                 future.raise(error);
             } else if (error == Errors.REBALANCE_IN_PROGRESS) {
-                log.debug("Attempt to heart beat failed for group {} since it 
is rebalancing.", groupId);
+                log.debug("Attempt to heartbeat failed for group {} since it 
is rebalancing.", groupId);
                 requestRejoin();
                 future.raise(Errors.REBALANCE_IN_PROGRESS);
             } else if (error == Errors.ILLEGAL_GENERATION) {
-                log.debug("Attempt to heart beat failed for group {} since 
generation id is not legal.", groupId);
+                log.debug("Attempt to heartbeat failed for group {} since 
generation id is not legal.", groupId);
                 resetGeneration();
                 future.raise(Errors.ILLEGAL_GENERATION);
             } else if (error == Errors.UNKNOWN_MEMBER_ID) {
-                log.debug("Attempt to heart beat failed for group {} since 
member id is not valid.", groupId);
+                log.debug("Attempt to heartbeat failed for group {} since 
member id is not valid.", groupId);
                 resetGeneration();
                 future.raise(Errors.UNKNOWN_MEMBER_ID);
             } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
@@ -818,18 +823,18 @@ public abstract class AbstractCoordinator implements 
Closeable {
         }
     }
 
-    private class HeartbeatThread extends Thread {
+    private class HeartbeatThread extends KafkaThread {
         private boolean enabled = false;
         private boolean closed = false;
         private AtomicReference<RuntimeException> failed = new 
AtomicReference<>(null);
 
-        HeartbeatThread() {
-            super("kafka-coordinator-heartbeat-thread" + (groupId.isEmpty() ? 
"" : " | " + groupId));
-            setDaemon(true);
+        private HeartbeatThread() {
+            super("kafka-coordinator-heartbeat-thread" + (groupId.isEmpty() ? 
"" : " | " + groupId), true);
         }
 
         public void enable() {
             synchronized (AbstractCoordinator.this) {
+                log.trace("Enabling heartbeat thread for group {}", groupId);
                 this.enabled = true;
                 heartbeat.resetTimeouts(time.milliseconds());
                 AbstractCoordinator.this.notify();
@@ -838,6 +843,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
 
         public void disable() {
             synchronized (AbstractCoordinator.this) {
+                log.trace("Disabling heartbeat thread for group {}", groupId);
                 this.enabled = false;
             }
         }
@@ -860,6 +866,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
         @Override
         public void run() {
             try {
+                log.debug("Heartbeat thread for group {} started", groupId);
                 while (true) {
                     synchronized (AbstractCoordinator.this) {
                         if (closed)
@@ -936,6 +943,8 @@ public abstract class AbstractCoordinator implements 
Closeable {
             } catch (RuntimeException e) {
                 log.error("Heartbeat thread for group {} failed due to 
unexpected error" , groupId, e);
                 this.failed.set(e);
+            } finally {
+                log.debug("Heartbeat thread for group {} has closed", groupId);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5063521/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 03b767a..8669527 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -100,7 +100,6 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
                                String metricGrpPrefix,
                                Time time,
                                long retryBackoffMs,
-                               OffsetCommitCallback 
defaultOffsetCommitCallback,
                                boolean autoCommitEnabled,
                                int autoCommitIntervalMs,
                                ConsumerInterceptors<?, ?> interceptors,
@@ -117,7 +116,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
         this.metadata = metadata;
         this.metadataSnapshot = new MetadataSnapshot(subscriptions, 
metadata.fetch());
         this.subscriptions = subscriptions;
-        this.defaultOffsetCommitCallback = defaultOffsetCommitCallback;
+        this.defaultOffsetCommitCallback = new DefaultOffsetCommitCallback();
         this.autoCommitEnabled = autoCommitEnabled;
         this.autoCommitIntervalMs = autoCommitIntervalMs;
         this.assignors = assignors;
@@ -355,13 +354,15 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
         if (!assignedTopics.containsAll(allSubscribedTopics)) {
             Set<String> notAssignedTopics = new HashSet<>(allSubscribedTopics);
             notAssignedTopics.removeAll(assignedTopics);
-            log.warn("The following subscribed topics are not assigned to any 
members in the group {} : {} ", groupId, notAssignedTopics);
+            log.warn("The following subscribed topics are not assigned to any 
members in the group {} : {} ", groupId,
+                    notAssignedTopics);
         }
 
         if (!allSubscribedTopics.containsAll(assignedTopics)) {
             Set<String> newlyAddedTopics = new HashSet<>(assignedTopics);
             newlyAddedTopics.removeAll(allSubscribedTopics);
-            log.info("The following not-subscribed topics are assigned to 
group {}, and their metadata will be fetched from the brokers : {}", groupId, 
newlyAddedTopics);
+            log.info("The following not-subscribed topics are assigned to 
group {}, and their metadata will be " +
+                    "fetched from the brokers : {}", groupId, 
newlyAddedTopics);
 
             allSubscribedTopics.addAll(assignedTopics);
             this.subscriptions.groupSubscribe(allSubscribedTopics);
@@ -487,7 +488,6 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
         }
     }
 
-
     public void commitOffsetsAsync(final Map<TopicPartition, 
OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
         invokeCompletedOffsetCommitCallbacks();
 
@@ -612,15 +612,19 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
     }
 
     private void doAutoCommitOffsetsAsync() {
-        commitOffsetsAsync(subscriptions.allConsumed(), new 
OffsetCommitCallback() {
+        Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = 
subscriptions.allConsumed();
+        log.debug("Sending asynchronous auto-commit of offsets {} for group 
{}", allConsumedOffsets, groupId);
+
+        commitOffsetsAsync(allConsumedOffsets, new OffsetCommitCallback() {
             @Override
             public void onComplete(Map<TopicPartition, OffsetAndMetadata> 
offsets, Exception exception) {
                 if (exception != null) {
-                    log.warn("Auto offset commit failed for group {}: {}", 
groupId, exception.getMessage());
+                    log.warn("Auto-commit of offsets {} failed for group {}: 
{}", offsets, groupId,
+                            exception.getMessage());
                     if (exception instanceof RetriableException)
                         nextAutoCommitDeadline = Math.min(time.milliseconds() 
+ retryBackoffMs, nextAutoCommitDeadline);
                 } else {
-                    log.debug("Completed autocommit of offsets {} for group 
{}", offsets, groupId);
+                    log.debug("Completed auto-commit of offsets {} for group 
{}", offsets, groupId);
                 }
             }
         });
@@ -628,25 +632,30 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
 
     private void maybeAutoCommitOffsetsSync(long timeoutMs) {
         if (autoCommitEnabled) {
+            Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = 
subscriptions.allConsumed();
             try {
-                if (!commitOffsetsSync(subscriptions.allConsumed(), timeoutMs))
-                    log.debug("Automatic commit of offsets {} for group {} 
timed out before completion", subscriptions.allConsumed(), groupId);
+                log.debug("Sending synchronous auto-commit of offsets {} for 
group {}", allConsumedOffsets, groupId);
+                if (!commitOffsetsSync(allConsumedOffsets, timeoutMs))
+                    log.debug("Auto-commit of offsets {} for group {} timed 
out before completion",
+                            allConsumedOffsets, groupId);
             } catch (WakeupException | InterruptException e) {
-                log.debug("Automatic commit of offsets {} for group {} was 
interrupted before completion", subscriptions.allConsumed(), groupId);
+                log.debug("Auto-commit of offsets {} for group {} was 
interrupted before completion",
+                        allConsumedOffsets, groupId);
                 // rethrow wakeups since they are triggered by the user
                 throw e;
             } catch (Exception e) {
                 // consistent with async auto-commit failures, we do not 
propagate the exception
-                log.warn("Auto offset commit failed for group {}: {}", 
groupId, e.getMessage());
+                log.warn("Auto-commit of offsets {} failed for group {}: {}", 
allConsumedOffsets, groupId,
+                        e.getMessage());
             }
         }
     }
 
-    public static class DefaultOffsetCommitCallback implements 
OffsetCommitCallback {
+    private class DefaultOffsetCommitCallback implements OffsetCommitCallback {
         @Override
         public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, 
Exception exception) {
             if (exception != null)
-                log.error("Offset commit failed.", exception);
+                log.error("Offset commit with offsets {} failed for group {}", 
offsets, groupId, exception);
         }
     }
 
@@ -694,7 +703,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
                         setMemberId(generation.memberId).
                         
setRetentionTime(OffsetCommitRequest.DEFAULT_RETENTION_TIME);
 
-        log.trace("Sending offset-commit request with {} to coordinator {} for 
group {}", offsets, coordinator, groupId);
+        log.trace("Sending OffsetCommit request with {} to coordinator {} for 
group {}", offsets, coordinator, groupId);
 
         return client.send(coordinator, builder)
                 .compose(new OffsetCommitResponseHandler(offsets));
@@ -704,7 +713,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
 
         private final Map<TopicPartition, OffsetAndMetadata> offsets;
 
-        public OffsetCommitResponseHandler(Map<TopicPartition, 
OffsetAndMetadata> offsets) {
+        private OffsetCommitResponseHandler(Map<TopicPartition, 
OffsetAndMetadata> offsets) {
             this.offsets = offsets;
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5063521/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index ea92ab7..e5c5cf6 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -90,8 +90,7 @@ public class ConsumerNetworkClient implements Closeable {
      * @param requestBuilder A builder for the request payload
      * @return A future which indicates the result of the send.
      */
-    public RequestFuture<ClientResponse> send(Node node,
-                                              AbstractRequest.Builder<?> 
requestBuilder) {
+    public RequestFuture<ClientResponse> send(Node node, 
AbstractRequest.Builder<?> requestBuilder) {
         long now = time.milliseconds();
         RequestFutureCompletionHandler completionHandler = new 
RequestFutureCompletionHandler();
         ClientRequest clientRequest = client.newClientRequest(node.idString(), 
requestBuilder, now, true,
@@ -157,6 +156,7 @@ public class ConsumerNetworkClient implements Closeable {
     public void wakeup() {
         // wakeup should be safe without holding the client lock since it 
simply delegates to
         // Selector's wakeup, which is threadsafe
+        log.trace("Received user wakeup");
         this.wakeup.set(true);
         this.client.wakeup();
     }
@@ -406,6 +406,7 @@ public class ConsumerNetworkClient implements Closeable {
 
     private void maybeTriggerWakeup() {
         if (wakeupDisabledCount == 0 && wakeup.get()) {
+            log.trace("Raising wakeup exception in response to user wakeup");
             wakeup.set(false);
             throw new WakeupException();
         }
@@ -446,7 +447,7 @@ public class ConsumerNetworkClient implements Closeable {
 
     /**
      * Find whether a previous connection has failed. Note that the failure 
state will persist until either
-     * {@link #tryConnect(Node)} or {@link #send(Node, ApiKeys, 
AbstractRequest)} has been called.
+     * {@link #tryConnect(Node)} or {@link #send(Node, 
AbstractRequest.Builder)} has been called.
      * @param node Node to connect to if possible
      */
     public boolean connectionFailed(Node node) {
@@ -457,7 +458,7 @@ public class ConsumerNetworkClient implements Closeable {
 
     /**
      * Initiate a connection if currently possible. This is only really useful 
for resetting the failed
-     * status of a socket. If there is an actual request to send, then {@link 
#send(Node, ApiKeys, AbstractRequest)}
+     * status of a socket. If there is an actual request to send, then {@link 
#send(Node, AbstractRequest.Builder)}
      * should be used.
      * @param node The node to connect to
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5063521/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index d4ecfc6..6a13d46 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -177,6 +177,7 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener {
             final FetchRequest.Builder request = fetchEntry.getValue();
             final Node fetchTarget = fetchEntry.getKey();
 
+            log.debug("Sending fetch for partitions {} to broker {}", 
request.fetchData().keySet(), fetchTarget);
             client.send(fetchTarget, request)
                     .addListener(new RequestFutureListener<ClientResponse>() {
                         @Override
@@ -208,7 +209,7 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener {
 
                         @Override
                         public void onFailure(RuntimeException e) {
-                            log.debug("Fetch request to {} failed", 
fetchTarget, e);
+                            log.debug("Fetch request to {} for partitions {} 
failed", fetchTarget, request.fetchData().keySet(), e);
                         }
                     });
         }
@@ -722,7 +723,7 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener {
 
                 long position = this.subscriptions.position(partition);
                 fetch.put(partition, new FetchRequest.PartitionData(position, 
this.fetchSize));
-                log.trace("Added fetch request for partition {} at offset {}", 
partition, position);
+                log.trace("Added fetch request for partition {} at offset {} 
to node {}", partition, position, node);
             } else {
                 log.trace("Skipping fetch for partition {} because there is an 
in-flight request to {}", partition, node);
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5063521/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java 
b/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java
index 57247c8..faca685 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java
@@ -22,8 +22,17 @@ public class KafkaThread extends Thread {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
+    public KafkaThread(final String name, boolean daemon) {
+        super(name);
+        configureThread(name, daemon);
+    }
+
     public KafkaThread(final String name, Runnable runnable, boolean daemon) {
         super(runnable, name);
+        configureThread(name, daemon);
+    }
+
+    private void configureThread(final String name, boolean daemon) {
         setDaemon(daemon);
         setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
             public void uncaughtException(Thread t, Throwable e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5063521/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 8346e93..4aaa172 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1486,7 +1486,6 @@ public class KafkaConsumerTest {
         Deserializer<String> valueDeserializer = new StringDeserializer();
 
         OffsetResetStrategy autoResetStrategy = OffsetResetStrategy.EARLIEST;
-        OffsetCommitCallback defaultCommitCallback = new 
ConsumerCoordinator.DefaultOffsetCommitCallback();
         List<PartitionAssignor> assignors = Arrays.asList(assignor);
         ConsumerInterceptors<String, String> interceptors = null;
 
@@ -1506,7 +1505,6 @@ public class KafkaConsumerTest {
                 metricGroupPrefix,
                 time,
                 retryBackoffMs,
-                defaultCommitCallback,
                 autoCommitEnabled,
                 autoCommitIntervalMs,
                 interceptors,

http://git-wip-us.apache.org/repos/asf/kafka/blob/a5063521/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index e13d49f..e11bf30 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -112,10 +112,9 @@ public class ConsumerCoordinatorTest {
     private Metrics metrics;
     private ConsumerNetworkClient consumerClient;
     private MockRebalanceListener rebalanceListener;
-    private MockCommitCallback defaultOffsetCommitCallback;
+    private MockCommitCallback mockOffsetCommitCallback;
     private ConsumerCoordinator coordinator;
 
-
     @Before
     public void setup() {
         this.time = new MockTime();
@@ -126,7 +125,7 @@ public class ConsumerCoordinatorTest {
         this.consumerClient = new ConsumerNetworkClient(client, metadata, 
time, 100, 1000);
         this.metrics = new Metrics(time);
         this.rebalanceListener = new MockRebalanceListener();
-        this.defaultOffsetCommitCallback = new MockCommitCallback();
+        this.mockOffsetCommitCallback = new MockCommitCallback();
         this.partitionAssignor.clear();
 
         client.setNode(node);
@@ -1010,14 +1009,14 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCommitOffsetAsyncWithDefaultCallback() {
-        int invokedBeforeTest = defaultOffsetCommitCallback.invoked;
+        int invokedBeforeTest = mockOffsetCommitCallback.invoked;
         client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, 
Errors.NONE.code())));
-        coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new 
OffsetAndMetadata(100L)), null);
+        coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new 
OffsetAndMetadata(100L)), mockOffsetCommitCallback);
         coordinator.invokeCompletedOffsetCommitCallbacks();
-        assertEquals(invokedBeforeTest + 1, 
defaultOffsetCommitCallback.invoked);
-        assertNull(defaultOffsetCommitCallback.exception);
+        assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked);
+        assertNull(mockOffsetCommitCallback.exception);
     }
 
     @Test
@@ -1059,14 +1058,14 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCommitOffsetAsyncFailedWithDefaultCallback() {
-        int invokedBeforeTest = defaultOffsetCommitCallback.invoked;
+        int invokedBeforeTest = mockOffsetCommitCallback.invoked;
         client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
         coordinator.ensureCoordinatorReady();
         
client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, 
Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code())));
-        coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new 
OffsetAndMetadata(100L)), null);
+        coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new 
OffsetAndMetadata(100L)), mockOffsetCommitCallback);
         coordinator.invokeCompletedOffsetCommitCallbacks();
-        assertEquals(invokedBeforeTest + 1, 
defaultOffsetCommitCallback.invoked);
-        assertTrue(defaultOffsetCommitCallback.exception instanceof 
RetriableCommitFailedException);
+        assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked);
+        assertTrue(mockOffsetCommitCallback.exception instanceof 
RetriableCommitFailedException);
     }
 
     @Test
@@ -1220,12 +1219,12 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testCommitAsyncNegativeOffset() {
-        int invokedBeforeTest = defaultOffsetCommitCallback.invoked;
+        int invokedBeforeTest = mockOffsetCommitCallback.invoked;
         client.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE.code()));
-        coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new 
OffsetAndMetadata(-1L)), null);
+        coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new 
OffsetAndMetadata(-1L)), mockOffsetCommitCallback);
         coordinator.invokeCompletedOffsetCommitCallbacks();
-        assertEquals(invokedBeforeTest + 1, 
defaultOffsetCommitCallback.invoked);
-        assertTrue(defaultOffsetCommitCallback.exception instanceof 
IllegalArgumentException);
+        assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked);
+        assertTrue(mockOffsetCommitCallback.exception instanceof 
IllegalArgumentException);
     }
 
     @Test
@@ -1531,7 +1530,6 @@ public class ConsumerCoordinatorTest {
                 "consumer" + groupId,
                 time,
                 retryBackoffMs,
-                defaultOffsetCommitCallback,
                 autoCommitEnabled,
                 autoCommitIntervalMs,
                 null,

Reply via email to