Repository: kafka Updated Branches: refs/heads/trunk 6eb061fa8 -> e403b3c4b
KAFKA-3318: clean up consumer logging and error messages Author: Jason Gustafson <[email protected]> Reviewers: Ismael Juma Closes #1036 from hachikuji/KAFKA-3318 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e403b3c4 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e403b3c4 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e403b3c4 Branch: refs/heads/trunk Commit: e403b3c4bf8ca308fe180b093da20700f4db73c5 Parents: 6eb061f Author: Jason Gustafson <[email protected]> Authored: Thu Mar 10 11:29:08 2016 -0800 Committer: Gwen Shapira <[email protected]> Committed: Thu Mar 10 11:29:08 2016 -0800 ---------------------------------------------------------------------- .../consumer/internals/AbstractCoordinator.java | 120 +++++++++---------- .../consumer/internals/ConsumerCoordinator.java | 81 +++++++------ .../clients/consumer/internals/Fetcher.java | 6 +- .../main/java/org/apache/kafka/common/Node.java | 2 +- .../apache/kafka/common/protocol/Errors.java | 10 ++ 5 files changed, 115 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/e403b3c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index c6492bc..c79d8e7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -174,7 +174,7 @@ public abstract class AbstractCoordinator implements Closeable { */ public void ensureCoordinatorKnown() { while (coordinatorUnknown()) { - RequestFuture<Void> future = sendGroupMetadataRequest(); + RequestFuture<Void> future = sendGroupCoordinatorRequest(); client.poll(future); if (future.failed()) { @@ -216,7 +216,7 @@ public abstract class AbstractCoordinator implements Closeable { continue; } - RequestFuture<ByteBuffer> future = performGroupJoin(); + RequestFuture<ByteBuffer> future = sendJoinGroupRequest(); client.poll(future); if (future.succeeded()) { @@ -299,12 +299,12 @@ public abstract class AbstractCoordinator implements Closeable { * elected leader by the coordinator. * @return A request future which wraps the assignment returned from the group leader */ - private RequestFuture<ByteBuffer> performGroupJoin() { + private RequestFuture<ByteBuffer> sendJoinGroupRequest() { if (coordinatorUnknown()) return RequestFuture.coordinatorNotAvailable(); // send a join group request to the coordinator - log.debug("(Re-)joining group {}", groupId); + log.info("(Re-)joining group {}", groupId); JoinGroupRequest request = new JoinGroupRequest( groupId, this.sessionTimeoutMs, @@ -312,8 +312,7 @@ public abstract class AbstractCoordinator implements Closeable { protocolType(), metadata()); - // create the request for the coordinator - log.debug("Issuing request ({}: {}) to coordinator {}", ApiKeys.JOIN_GROUP, request, this.coordinator.id()); + log.debug("Sending JoinGroup ({}) to coordinator {}", request, this.coordinator); return client.send(coordinator, ApiKeys.JOIN_GROUP, request) .compose(new JoinGroupResponseHandler()); } @@ -328,10 +327,9 @@ public abstract class AbstractCoordinator implements Closeable { @Override public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) { - // process the response - short errorCode = joinResponse.errorCode(); - if (errorCode == Errors.NONE.code()) { - log.debug("Joined group: {}", joinResponse.toStruct()); + Errors error = Errors.forCode(joinResponse.errorCode()); + if (error == Errors.NONE) { + log.debug("Received successful join group response for group {}: {}", groupId, joinResponse.toStruct()); AbstractCoordinator.this.memberId = joinResponse.memberId(); AbstractCoordinator.this.generation = joinResponse.generationId(); AbstractCoordinator.this.rejoinNeeded = false; @@ -342,37 +340,33 @@ public abstract class AbstractCoordinator implements Closeable { } else { onJoinFollower().chain(future); } - } else if (errorCode == Errors.GROUP_LOAD_IN_PROGRESS.code()) { - log.debug("Attempt to join group {} rejected since coordinator is loading the group.", groupId); + } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) { + log.debug("Attempt to join group {} rejected since coordinator {} is loading the group.", groupId, + coordinator); // backoff and retry - future.raise(Errors.forCode(errorCode)); - } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()) { + future.raise(error); + } else if (error == Errors.UNKNOWN_MEMBER_ID) { // reset the member id and retry immediately AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID; - log.info("Attempt to join group {} failed due to unknown member id, resetting and retrying.", - groupId); + log.debug("Attempt to join group {} failed due to unknown member id.", groupId); future.raise(Errors.UNKNOWN_MEMBER_ID); - } else if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code() - || errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) { + } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE + || error == Errors.NOT_COORDINATOR_FOR_GROUP) { // re-discover the coordinator and retry with backoff coordinatorDead(); - log.info("Attempt to join group {} failed due to obsolete coordinator information, retrying.", - groupId); - future.raise(Errors.forCode(errorCode)); - } else if (errorCode == Errors.INCONSISTENT_GROUP_PROTOCOL.code() - || errorCode == Errors.INVALID_SESSION_TIMEOUT.code() - || errorCode == Errors.INVALID_GROUP_ID.code()) { + log.debug("Attempt to join group {} failed due to obsolete coordinator information: {}", groupId, error.message()); + future.raise(error); + } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL + || error == Errors.INVALID_SESSION_TIMEOUT + || error == Errors.INVALID_GROUP_ID) { // log the error and re-throw the exception - Errors error = Errors.forCode(errorCode); - log.error("Attempt to join group {} failed due to: {}", - groupId, error.exception().getMessage()); + log.error("Attempt to join group {} failed due to fatal error: {}", groupId, error.message()); future.raise(error); - } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) { + } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(new GroupAuthorizationException(groupId)); } else { // unexpected error, throw the exception - future.raise(new KafkaException("Unexpected error in join group response: " - + Errors.forCode(joinResponse.errorCode()).exception().getMessage())); + future.raise(new KafkaException("Unexpected error in join group response: " + error.message())); } } } @@ -381,7 +375,7 @@ public abstract class AbstractCoordinator implements Closeable { // send follower's sync group with an empty assignment SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId, Collections.<String, ByteBuffer>emptyMap()); - log.debug("Issuing follower SyncGroup ({}: {}) to coordinator {}", ApiKeys.SYNC_GROUP, request, this.coordinator.id()); + log.debug("Sending follower SyncGroup for group {} to coordinator {}: {}", groupId, this.coordinator, request); return sendSyncGroupRequest(request); } @@ -392,7 +386,7 @@ public abstract class AbstractCoordinator implements Closeable { joinResponse.members()); SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId, groupAssignment); - log.debug("Issuing leader SyncGroup ({}: {}) to coordinator {}", ApiKeys.SYNC_GROUP, request, this.coordinator.id()); + log.debug("Sending leader SyncGroup for group {} to coordinator {}: {}", groupId, this.coordinator, request); return sendSyncGroupRequest(request); } catch (RuntimeException e) { return RequestFuture.failure(e); @@ -418,7 +412,7 @@ public abstract class AbstractCoordinator implements Closeable { RequestFuture<ByteBuffer> future) { Errors error = Errors.forCode(syncResponse.errorCode()); if (error == Errors.NONE) { - log.debug("Received successful sync group response for group {}: {}", groupId, syncResponse.toStruct()); + log.info("Successfully joined group {} with generation {}", groupId, generation); sensors.syncLatency.record(response.requestLatencyMs()); future.complete(syncResponse.memberAssignment()); } else { @@ -426,20 +420,20 @@ public abstract class AbstractCoordinator implements Closeable { if (error == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(new GroupAuthorizationException(groupId)); } else if (error == Errors.REBALANCE_IN_PROGRESS) { - log.info("SyncGroup for group {} failed due to coordinator rebalance, rejoining the group", groupId); + log.debug("SyncGroup for group {} failed due to coordinator rebalance", groupId); future.raise(error); } else if (error == Errors.UNKNOWN_MEMBER_ID || error == Errors.ILLEGAL_GENERATION) { - log.info("SyncGroup for group {} failed due to {}, rejoining the group", groupId, error); + log.debug("SyncGroup for group {} failed due to {}", groupId, error); AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID; future.raise(error); } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR_FOR_GROUP) { - log.info("SyncGroup for group {} failed due to {}, will find new coordinator and rejoin", groupId, error); + log.debug("SyncGroup for group {} failed due to {}", groupId, error); coordinatorDead(); future.raise(error); } else { - future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.exception().getMessage())); + future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.message())); } } } @@ -450,7 +444,7 @@ public abstract class AbstractCoordinator implements Closeable { * one of the brokers. The returned future should be polled to get the result of the request. * @return A request future which indicates the completion of the metadata request */ - private RequestFuture<Void> sendGroupMetadataRequest() { + private RequestFuture<Void> sendGroupCoordinatorRequest() { // initiate the group metadata request // find a node to ask about the coordinator Node node = this.client.leastLoadedNode(); @@ -460,7 +454,7 @@ public abstract class AbstractCoordinator implements Closeable { return RequestFuture.noBrokersAvailable(); } else { // create a group metadata request - log.debug("Issuing group metadata request to broker {}", node.id()); + log.debug("Sending coordinator request for group {} to broker {}", groupId, node); GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId); return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest) .compose(new RequestFutureAdapter<ClientResponse, Void>() { @@ -473,7 +467,7 @@ public abstract class AbstractCoordinator implements Closeable { } private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture<Void> future) { - log.debug("Group metadata response {}", resp); + log.debug("Received group coordinator response {}", resp); if (!coordinatorUnknown()) { // We already found the coordinator, so ignore the request @@ -483,22 +477,24 @@ public abstract class AbstractCoordinator implements Closeable { // use MAX_VALUE - node.id as the coordinator id to mimic separate connections // for the coordinator in the underlying network client layer // TODO: this needs to be better handled in KAFKA-1935 - short errorCode = groupCoordinatorResponse.errorCode(); - if (errorCode == Errors.NONE.code()) { + Errors error = Errors.forCode(groupCoordinatorResponse.errorCode()); + if (error == Errors.NONE) { this.coordinator = new Node(Integer.MAX_VALUE - groupCoordinatorResponse.node().id(), groupCoordinatorResponse.node().host(), groupCoordinatorResponse.node().port()); + log.info("Discovered coordinator {} for group {}.", coordinator, groupId); + client.tryConnect(coordinator); // start sending heartbeats only if we have a valid generation if (generation > 0) heartbeatTask.reset(); future.complete(null); - } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) { + } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(new GroupAuthorizationException(groupId)); } else { - future.raise(Errors.forCode(errorCode)); + future.raise(error); } } } @@ -524,7 +520,7 @@ public abstract class AbstractCoordinator implements Closeable { */ protected void coordinatorDead() { if (this.coordinator != null) { - log.info("Marking the coordinator {} dead.", this.coordinator.id()); + log.info("Marking the coordinator {} dead for group {}", this.coordinator, groupId); this.coordinator = null; } } @@ -566,7 +562,7 @@ public abstract class AbstractCoordinator implements Closeable { @Override public void onFailure(RuntimeException e) { - log.info("LeaveGroup request failed with error", e); + log.debug("LeaveGroup request for group {} failed with error", groupId, e); } }); @@ -608,33 +604,33 @@ public abstract class AbstractCoordinator implements Closeable { @Override public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) { sensors.heartbeatLatency.record(response.requestLatencyMs()); - short errorCode = heartbeatResponse.errorCode(); - if (errorCode == Errors.NONE.code()) { - log.debug("Received successful heartbeat response."); + Errors error = Errors.forCode(heartbeatResponse.errorCode()); + if (error == Errors.NONE) { + log.debug("Received successful heartbeat response for group {}", groupId); future.complete(null); - } else if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code() - || errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) { - log.info("Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead."); + } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE + || error == Errors.NOT_COORDINATOR_FOR_GROUP) { + log.debug("Attempt to heart beat failed for group {} since coordinator {} is either not started or not valid.", + groupId, coordinator); coordinatorDead(); - future.raise(Errors.forCode(errorCode)); - } else if (errorCode == Errors.REBALANCE_IN_PROGRESS.code()) { - log.info("Attempt to heart beat failed since the group is rebalancing, try to re-join group."); + future.raise(error); + } else if (error == Errors.REBALANCE_IN_PROGRESS) { + log.debug("Attempt to heart beat failed for group {} since it is rebalancing.", groupId); AbstractCoordinator.this.rejoinNeeded = true; future.raise(Errors.REBALANCE_IN_PROGRESS); - } else if (errorCode == Errors.ILLEGAL_GENERATION.code()) { - log.info("Attempt to heart beat failed since generation id is not legal, try to re-join group."); + } else if (error == Errors.ILLEGAL_GENERATION) { + log.debug("Attempt to heart beat failed for group {} since generation id is not legal.", groupId); AbstractCoordinator.this.rejoinNeeded = true; future.raise(Errors.ILLEGAL_GENERATION); - } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()) { - log.info("Attempt to heart beat failed since member id is not valid, reset it and try to re-join group."); + } else if (error == Errors.UNKNOWN_MEMBER_ID) { + log.debug("Attempt to heart beat failed for group {} since member id is not valid.", groupId); memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID; AbstractCoordinator.this.rejoinNeeded = true; future.raise(Errors.UNKNOWN_MEMBER_ID); - } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) { + } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { future.raise(new GroupAuthorizationException(groupId)); } else { - future.raise(new KafkaException("Unexpected errorCode in heartbeat response: " - + Errors.forCode(errorCode).exception().getMessage())); + future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message())); } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/e403b3c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index aa39e11..b6b46c1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -188,15 +188,15 @@ public final class ConsumerCoordinator extends AbstractCoordinator { // execute the user's callback after rebalance ConsumerRebalanceListener listener = subscriptions.listener(); - log.debug("Setting newly assigned partitions {}", subscriptions.assignedPartitions()); + log.info("Setting newly assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId); try { Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions()); listener.onPartitionsAssigned(assigned); } catch (WakeupException e) { throw e; } catch (Exception e) { - log.error("User provided listener " + listener.getClass().getName() - + " failed on partition assignment: ", e); + log.error("User provided listener {} for group {} failed on partition assignment", + listener.getClass().getName(), groupId, e); } } @@ -222,11 +222,12 @@ public final class ConsumerCoordinator extends AbstractCoordinator { metadata.setTopics(this.subscriptions.groupSubscription()); client.ensureFreshMetadata(); - log.debug("Performing {} assignment for subscriptions {}", assignor.name(), subscriptions); + log.debug("Performing assignment for group {} using strategy {} with subscriptions {}", + groupId, assignor.name(), subscriptions); Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions); - log.debug("Finished assignment: {}", assignment); + log.debug("Finished assignment for group {}: {}", groupId, assignment); Map<String, ByteBuffer> groupAssignment = new HashMap<>(); for (Map.Entry<String, Assignment> assignmentEntry : assignment.entrySet()) { @@ -244,15 +245,15 @@ public final class ConsumerCoordinator extends AbstractCoordinator { // execute the user's callback before rebalance ConsumerRebalanceListener listener = subscriptions.listener(); - log.debug("Revoking previously assigned partitions {}", subscriptions.assignedPartitions()); + log.info("Revoking previously assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId); try { Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions()); listener.onPartitionsRevoked(revoked); } catch (WakeupException e) { throw e; } catch (Exception e) { - log.error("User provided listener " + listener.getClass().getName() - + " failed on partition revocation: ", e); + log.error("User provided listener {} for group {} failed on partition revocation", + listener.getClass().getName(), groupId, e); } subscriptions.needReassignment(); @@ -410,7 +411,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { return; if (coordinatorUnknown()) { - log.debug("Cannot auto-commit offsets now since the coordinator is unknown, will retry after backoff"); + log.debug("Cannot auto-commit offsets for group {} since the coordinator is unknown", groupId); client.schedule(this, now + retryBackoffMs); return; } @@ -423,10 +424,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator { if (exception == null) { reschedule(now + interval); } else if (exception instanceof SendFailedException) { - log.debug("Failed to send automatic offset commit, will retry immediately"); + log.debug("Failed to send automatic offset commit for group {}", groupId); reschedule(now); } else { - log.warn("Auto offset commit failed: {}", exception.getMessage()); + log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage()); reschedule(now + interval); } } @@ -447,7 +448,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { throw e; } catch (Exception e) { // consistent with async auto-commit failures, we do not propagate the exception - log.warn("Auto offset commit failed: ", e.getMessage()); + log.warn("Auto offset commit failed for group {}: {}", groupId, e.getMessage()); } } } @@ -481,7 +482,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { OffsetCommitRequest.DEFAULT_RETENTION_TIME, offsetData); - log.trace("Sending offset-commit request with {} to {}", offsets, coordinator); + log.trace("Sending offset-commit request with {} to coordinator {} for group {}", offsets, coordinator, groupId); return client.send(coordinator, ApiKeys.OFFSET_COMMIT, req) .compose(new OffsetCommitResponseHandler(offsets)); @@ -520,12 +521,12 @@ public final class ConsumerCoordinator extends AbstractCoordinator { Errors error = Errors.forCode(entry.getValue()); if (error == Errors.NONE) { - log.debug("Committed offset {} for partition {}", offset, tp); + log.debug("Group {} committed offset {} for partition {}", groupId, offset, tp); if (subscriptions.isAssigned(tp)) // update the local cache only if the partition is still assigned subscriptions.committed(tp, offsetAndMetadata); } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { - log.error("Unauthorized to commit for group {}", groupId); + log.error("Not authorized to commit offsets for group {}", groupId); future.raise(new GroupAuthorizationException(groupId)); return; } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) { @@ -533,18 +534,18 @@ public final class ConsumerCoordinator extends AbstractCoordinator { } else if (error == Errors.OFFSET_METADATA_TOO_LARGE || error == Errors.INVALID_COMMIT_OFFSET_SIZE) { // raise the error to the user - log.info("Offset commit for group {} failed on partition {} due to {}, will retry", groupId, tp, error); + log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message()); future.raise(error); return; } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) { // just retry - log.info("Offset commit for group {} failed due to {}, will retry", groupId, error); + log.debug("Offset commit for group {} failed: {}", groupId, error.message()); future.raise(error); return; } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR_FOR_GROUP || error == Errors.REQUEST_TIMED_OUT) { - log.info("Offset commit for group {} failed due to {}, will find new coordinator and retry", groupId, error); + log.debug("Offset commit for group {} failed: {}", groupId, error.message()); coordinatorDead(); future.raise(error); return; @@ -552,19 +553,24 @@ public final class ConsumerCoordinator extends AbstractCoordinator { || error == Errors.ILLEGAL_GENERATION || error == Errors.REBALANCE_IN_PROGRESS) { // need to re-join group - log.error("Error {} occurred while committing offsets for group {}", error, groupId); + log.debug("Offset commit for group {} failed: {}", groupId, error.message()); subscriptions.needReassignment(); - future.raise(new CommitFailedException("Commit cannot be completed due to group rebalance")); + future.raise(new CommitFailedException("Commit cannot be completed since the group has already " + + "rebalanced and assigned the partitions to another member. This means that the time " + + "between subsequent calls to poll() was longer than the configured session.timeout.ms, " + + "which typically implies that the poll loop is spending too much time message processing. " + + "You can address this either by increasing the session timeout or by reducing the maximum " + + "size of batches returned in poll() with max.poll.records.")); return; } else { - log.error("Error committing partition {} at offset {}: {}", tp, offset, error.exception().getMessage()); - future.raise(new KafkaException("Unexpected error in commit: " + error.exception().getMessage())); + log.error("Group {} failed to commit partition {} at offset {}: {}", groupId, tp, offset, error.message()); + future.raise(new KafkaException("Unexpected error in commit: " + error.message())); return; } } if (!unauthorizedTopics.isEmpty()) { - log.error("Unauthorized to commit to topics {}", unauthorizedTopics); + log.error("Not authorized to commit to topics {} for group {}", unauthorizedTopics, groupId); future.raise(new TopicAuthorizationException(unauthorizedTopics)); } else { future.complete(null); @@ -583,9 +589,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator { if (coordinatorUnknown()) return RequestFuture.coordinatorNotAvailable(); - log.debug("Fetching committed offsets for partitions: {}", partitions); + log.debug("Group {} fetching committed offsets for partitions: {}", groupId, partitions); // construct the request - OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<TopicPartition>(partitions)); + OffsetFetchRequest request = new OffsetFetchRequest(this.groupId, new ArrayList<>(partitions)); // send the request with a callback return client.send(coordinator, ApiKeys.OFFSET_FETCH, request) @@ -606,31 +612,30 @@ public final class ConsumerCoordinator extends AbstractCoordinator { TopicPartition tp = entry.getKey(); OffsetFetchResponse.PartitionData data = entry.getValue(); if (data.hasError()) { - log.debug("Error fetching offset for topic-partition {}: {}", tp, Errors.forCode(data.errorCode) - .exception() - .getMessage()); - if (data.errorCode == Errors.GROUP_LOAD_IN_PROGRESS.code()) { + Errors error = Errors.forCode(data.errorCode); + log.debug("Group {} failed to fetch offset for partition {}: {}", groupId, tp, error.message()); + + if (error == Errors.GROUP_LOAD_IN_PROGRESS) { // just retry - future.raise(Errors.GROUP_LOAD_IN_PROGRESS); - } else if (data.errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) { + future.raise(error); + } else if (error == Errors.NOT_COORDINATOR_FOR_GROUP) { // re-discover the coordinator and retry coordinatorDead(); - future.raise(Errors.NOT_COORDINATOR_FOR_GROUP); - } else if (data.errorCode == Errors.UNKNOWN_MEMBER_ID.code() - || data.errorCode == Errors.ILLEGAL_GENERATION.code()) { + future.raise(error); + } else if (error == Errors.UNKNOWN_MEMBER_ID + || error == Errors.ILLEGAL_GENERATION) { // need to re-join group subscriptions.needReassignment(); - future.raise(Errors.forCode(data.errorCode)); + future.raise(error); } else { - future.raise(new KafkaException("Unexpected error in fetch offset response: " - + Errors.forCode(data.errorCode).exception().getMessage())); + future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message())); } return; } else if (data.offset >= 0) { // record the position with the offset (-1 indicates no committed offset to fetch) offsets.put(tp, new OffsetAndMetadata(data.offset, data.metadata)); } else { - log.debug("No committed offset for partition " + tp); + log.debug("Group {} has no committed offset for partition {}", groupId, tp); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/e403b3c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 7a1a720..b4d5c02 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -501,12 +501,12 @@ public class Fetcher<K, V> { future.complete(offset); } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code() || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) { - log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.", + log.debug("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.", topicPartition); future.raise(Errors.forCode(errorCode)); } else { - log.error("Attempt to fetch offsets for partition {} failed due to: {}", - topicPartition, Errors.forCode(errorCode).exception().getMessage()); + log.warn("Attempt to fetch offsets for partition {} failed due to: {}", + topicPartition, Errors.forCode(errorCode).message()); future.raise(new StaleMetadataException()); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/e403b3c4/clients/src/main/java/org/apache/kafka/common/Node.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/Node.java b/clients/src/main/java/org/apache/kafka/common/Node.java index 644cd71..24cf6f4 100644 --- a/clients/src/main/java/org/apache/kafka/common/Node.java +++ b/clients/src/main/java/org/apache/kafka/common/Node.java @@ -96,7 +96,7 @@ public class Node { @Override public String toString() { - return "Node(" + id + ", " + host + ", " + port + ")"; + return host + ":" + port + " (id: " + idString + ")"; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/e403b3c4/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index ab299af..90be014 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -183,6 +183,16 @@ public enum Errors { } /** + * Get a friendly description of the error (if one is available). + * @return the error message + */ + public String message() { + if (exception != null) + return exception.getMessage(); + return toString(); + } + + /** * Throw the exception if there is one */ public static Errors forCode(short code) {
