Repository: kafka Updated Branches: refs/heads/0.10.2 61024c9d2 -> a50635219
MINOR: Logging improvements in consumer internals Author: Jason Gustafson <ja...@confluent.io> Reviewers: Manikumar reddy O <manikumar.re...@gmail.com>, Ewen Cheslack-Postava <e...@confluent.io>, Ismael Juma <ism...@juma.me.uk> Closes #2469 from hachikuji/improve-consumer-logging (cherry picked from commit 5afe959647fcad9d01365427f4b455e1586b1fd5) Signed-off-by: Jason Gustafson <ja...@confluent.io> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a5063521 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a5063521 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a5063521 Branch: refs/heads/0.10.2 Commit: a5063521943f8d0f6940b18d4f0d57045aa395ae Parents: 61024c9 Author: Jason Gustafson <ja...@confluent.io> Authored: Tue Jan 31 12:27:00 2017 -0800 Committer: Jason Gustafson <ja...@confluent.io> Committed: Tue Jan 31 12:28:36 2017 -0800 ---------------------------------------------------------------------- .../org/apache/kafka/clients/NetworkClient.java | 9 +++++ .../kafka/clients/consumer/KafkaConsumer.java | 8 ++-- .../consumer/internals/AbstractCoordinator.java | 33 ++++++++++------ .../consumer/internals/ConsumerCoordinator.java | 41 ++++++++++++-------- .../internals/ConsumerNetworkClient.java | 9 +++-- .../clients/consumer/internals/Fetcher.java | 5 ++- .../apache/kafka/common/utils/KafkaThread.java | 9 +++++ .../clients/consumer/KafkaConsumerTest.java | 2 - .../internals/ConsumerCoordinatorTest.java | 30 +++++++------- 9 files changed, 90 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/a5063521/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 0eb7670..3a75288 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -427,14 +427,23 @@ public class NetworkClient implements KafkaClient { int currInflight = this.inFlightRequests.inFlightRequestCount(node.idString()); if (currInflight == 0 && this.connectionStates.isReady(node.idString())) { // if we find an established connection with no in-flight requests we can stop right away + log.trace("Found least loaded node {} connected with no in-flight requests", node); return node; } else if (!this.connectionStates.isBlackedOut(node.idString(), now) && currInflight < inflight) { // otherwise if this is the best we have found so far, record that inflight = currInflight; found = node; + } else if (log.isTraceEnabled()) { + log.trace("Removing node {} from least loaded node selection: is-blacked-out: {}, in-flight-requests: {}", + node, this.connectionStates.isBlackedOut(node.idString(), now), currInflight); } } + if (found != null) + log.trace("Found least loaded node {}", found); + else + log.trace("Least loaded node selection failed to find an available node"); + return found; } http://git-wip-us.apache.org/repos/asf/kafka/blob/a5063521/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 6064c39..ed3d607 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -78,8 +78,8 @@ import java.util.regex.Pattern; * <h3>Cross-Version Compatibility</h3> * This client can communicate with brokers that are version 0.10.0 or newer. Older or newer brokers may not support * certain features. For example, 0.10.0 brokers do not support offsetsForTimes, because this feature was added - * in version 0.10.1. You will receive an UnsupportedVersionException when invoking an API that is not available on the - * running broker version. + * in version 0.10.1. You will receive an {@link org.apache.kafka.common.errors.UnsupportedVersionException} + * when invoking an API that is not available on the running broker version. * <p> * * <h3>Offsets and Consumer Position</h3> @@ -685,7 +685,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { metricGrpPrefix, this.time, retryBackoffMs, - new ConsumerCoordinator.DefaultOffsetCommitCallback(), config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), this.interceptors, @@ -1443,7 +1442,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * than or equal to the target timestamp. {@code null} will be returned for the partition if there is no * such message. * @throws IllegalArgumentException if the target timestamp is negative. - * @throws UnsupportedVersionException if the broker does not support looking up the offsets by timestamp. + * @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not support looking up + * the offsets by timestamp. */ @Override public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) { http://git-wip-us.apache.org/repos/asf/kafka/blob/a5063521/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 7d77c0b..6eea045 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -43,6 +43,7 @@ import org.apache.kafka.common.requests.LeaveGroupResponse; import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.common.requests.SyncGroupRequest; import org.apache.kafka.common.requests.SyncGroupResponse; +import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -211,6 +212,7 @@ public abstract class AbstractCoordinator implements Closeable { if (remainingMs <= 0) break; + log.debug("Coordinator discovery failed for group {}, refreshing metadata", groupId); client.awaitMetadataUpdate(remainingMs); } else throw future.exception(); @@ -236,6 +238,7 @@ public abstract class AbstractCoordinator implements Closeable { if (node == null) { // TODO: If there are no brokers left, perhaps we should use the bootstrap set // from configuration? + log.debug("No broker available to send GroupCoordinator request for group {}", groupId); return RequestFuture.noBrokersAvailable(); } else findCoordinatorFuture = sendGroupCoordinatorRequest(node); @@ -419,7 +422,7 @@ public abstract class AbstractCoordinator implements Closeable { public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) { Errors error = Errors.forCode(joinResponse.errorCode()); if (error == Errors.NONE) { - log.debug("Received successful join group response for group {}: {}", groupId, joinResponse); + log.debug("Received successful JoinGroup response for group {}: {}", groupId, joinResponse); sensors.joinLatency.record(response.requestLatencyMs()); synchronized (AbstractCoordinator.this) { @@ -542,7 +545,7 @@ public abstract class AbstractCoordinator implements Closeable { */ private RequestFuture<Void> sendGroupCoordinatorRequest(Node node) { // initiate the group metadata request - log.debug("Sending coordinator request for group {} to broker {}", groupId, node); + log.debug("Sending GroupCoordinator request for group {} to broker {}", groupId, node); GroupCoordinatorRequest.Builder requestBuilder = new GroupCoordinatorRequest.Builder(this.groupId); return client.send(node, requestBuilder) @@ -553,7 +556,7 @@ public abstract class AbstractCoordinator implements Closeable { @Override public void onSuccess(ClientResponse resp, RequestFuture<Void> future) { - log.debug("Received group coordinator response {}", resp); + log.debug("Received GroupCoordinator response {} for group {}", resp, groupId); GroupCoordinatorResponse groupCoordinatorResponse = (GroupCoordinatorResponse) resp.responseBody(); // use MAX_VALUE - node.id as the coordinator id to mimic separate connections @@ -671,6 +674,7 @@ public abstract class AbstractCoordinator implements Closeable { if (!coordinatorUnknown() && state != MemberState.UNJOINED && generation != Generation.NO_GENERATION) { // this is a minimal effort attempt to leave the group. we do not // attempt any resending if the request fails or times out. + log.debug("Sending LeaveGroup request to coordinator {} for group {}", coordinator, groupId); LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(groupId, generation.memberId); client.send(coordinator, request) @@ -697,6 +701,7 @@ public abstract class AbstractCoordinator implements Closeable { // visible for testing synchronized RequestFuture<Void> sendHeartbeatRequest() { + log.debug("Sending Heartbeat request for group {} to coordinator {}", groupId, coordinator); HeartbeatRequest.Builder requestBuilder = new HeartbeatRequest.Builder(this.groupId, this.generation.generationId, this.generation.memberId); return client.send(coordinator, requestBuilder) @@ -709,24 +714,24 @@ public abstract class AbstractCoordinator implements Closeable { sensors.heartbeatLatency.record(response.requestLatencyMs()); Errors error = Errors.forCode(heartbeatResponse.errorCode()); if (error == Errors.NONE) { - log.debug("Received successful heartbeat response for group {}", groupId); + log.debug("Received successful Heartbeat response for group {}", groupId); future.complete(null); } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR_FOR_GROUP) { - log.debug("Attempt to heart beat failed for group {} since coordinator {} is either not started or not valid.", + log.debug("Attempt to heartbeat failed for group {} since coordinator {} is either not started or not valid.", groupId, coordinator()); coordinatorDead(); future.raise(error); } else if (error == Errors.REBALANCE_IN_PROGRESS) { - log.debug("Attempt to heart beat failed for group {} since it is rebalancing.", groupId); + log.debug("Attempt to heartbeat failed for group {} since it is rebalancing.", groupId); requestRejoin(); future.raise(Errors.REBALANCE_IN_PROGRESS); } else if (error == Errors.ILLEGAL_GENERATION) { - log.debug("Attempt to heart beat failed for group {} since generation id is not legal.", groupId); + log.debug("Attempt to heartbeat failed for group {} since generation id is not legal.", groupId); resetGeneration(); future.raise(Errors.ILLEGAL_GENERATION); } else if (error == Errors.UNKNOWN_MEMBER_ID) { - log.debug("Attempt to heart beat failed for group {} since member id is not valid.", groupId); + log.debug("Attempt to heartbeat failed for group {} since member id is not valid.", groupId); resetGeneration(); future.raise(Errors.UNKNOWN_MEMBER_ID); } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { @@ -818,18 +823,18 @@ public abstract class AbstractCoordinator implements Closeable { } } - private class HeartbeatThread extends Thread { + private class HeartbeatThread extends KafkaThread { private boolean enabled = false; private boolean closed = false; private AtomicReference<RuntimeException> failed = new AtomicReference<>(null); - HeartbeatThread() { - super("kafka-coordinator-heartbeat-thread" + (groupId.isEmpty() ? "" : " | " + groupId)); - setDaemon(true); + private HeartbeatThread() { + super("kafka-coordinator-heartbeat-thread" + (groupId.isEmpty() ? "" : " | " + groupId), true); } public void enable() { synchronized (AbstractCoordinator.this) { + log.trace("Enabling heartbeat thread for group {}", groupId); this.enabled = true; heartbeat.resetTimeouts(time.milliseconds()); AbstractCoordinator.this.notify(); @@ -838,6 +843,7 @@ public abstract class AbstractCoordinator implements Closeable { public void disable() { synchronized (AbstractCoordinator.this) { + log.trace("Disabling heartbeat thread for group {}", groupId); this.enabled = false; } } @@ -860,6 +866,7 @@ public abstract class AbstractCoordinator implements Closeable { @Override public void run() { try { + log.debug("Heartbeat thread for group {} started", groupId); while (true) { synchronized (AbstractCoordinator.this) { if (closed) @@ -936,6 +943,8 @@ public abstract class AbstractCoordinator implements Closeable { } catch (RuntimeException e) { log.error("Heartbeat thread for group {} failed due to unexpected error" , groupId, e); this.failed.set(e); + } finally { + log.debug("Heartbeat thread for group {} has closed", groupId); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/a5063521/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 03b767a..8669527 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -100,7 +100,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator { String metricGrpPrefix, Time time, long retryBackoffMs, - OffsetCommitCallback defaultOffsetCommitCallback, boolean autoCommitEnabled, int autoCommitIntervalMs, ConsumerInterceptors<?, ?> interceptors, @@ -117,7 +116,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { this.metadata = metadata; this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch()); this.subscriptions = subscriptions; - this.defaultOffsetCommitCallback = defaultOffsetCommitCallback; + this.defaultOffsetCommitCallback = new DefaultOffsetCommitCallback(); this.autoCommitEnabled = autoCommitEnabled; this.autoCommitIntervalMs = autoCommitIntervalMs; this.assignors = assignors; @@ -355,13 +354,15 @@ public final class ConsumerCoordinator extends AbstractCoordinator { if (!assignedTopics.containsAll(allSubscribedTopics)) { Set<String> notAssignedTopics = new HashSet<>(allSubscribedTopics); notAssignedTopics.removeAll(assignedTopics); - log.warn("The following subscribed topics are not assigned to any members in the group {} : {} ", groupId, notAssignedTopics); + log.warn("The following subscribed topics are not assigned to any members in the group {} : {} ", groupId, + notAssignedTopics); } if (!allSubscribedTopics.containsAll(assignedTopics)) { Set<String> newlyAddedTopics = new HashSet<>(assignedTopics); newlyAddedTopics.removeAll(allSubscribedTopics); - log.info("The following not-subscribed topics are assigned to group {}, and their metadata will be fetched from the brokers : {}", groupId, newlyAddedTopics); + log.info("The following not-subscribed topics are assigned to group {}, and their metadata will be " + + "fetched from the brokers : {}", groupId, newlyAddedTopics); allSubscribedTopics.addAll(assignedTopics); this.subscriptions.groupSubscribe(allSubscribedTopics); @@ -487,7 +488,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator { } } - public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) { invokeCompletedOffsetCommitCallbacks(); @@ -612,15 +612,19 @@ public final class ConsumerCoordinator extends AbstractCoordinator { } private void doAutoCommitOffsetsAsync() { - commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() { + Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed(); + log.debug("Sending asynchronous auto-commit of offsets {} for group {}", allConsumedOffsets, groupId); + + commitOffsetsAsync(allConsumedOffsets, new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception != null) { - log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage()); + log.warn("Auto-commit of offsets {} failed for group {}: {}", offsets, groupId, + exception.getMessage()); if (exception instanceof RetriableException) nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline); } else { - log.debug("Completed autocommit of offsets {} for group {}", offsets, groupId); + log.debug("Completed auto-commit of offsets {} for group {}", offsets, groupId); } } }); @@ -628,25 +632,30 @@ public final class ConsumerCoordinator extends AbstractCoordinator { private void maybeAutoCommitOffsetsSync(long timeoutMs) { if (autoCommitEnabled) { + Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed(); try { - if (!commitOffsetsSync(subscriptions.allConsumed(), timeoutMs)) - log.debug("Automatic commit of offsets {} for group {} timed out before completion", subscriptions.allConsumed(), groupId); + log.debug("Sending synchronous auto-commit of offsets {} for group {}", allConsumedOffsets, groupId); + if (!commitOffsetsSync(allConsumedOffsets, timeoutMs)) + log.debug("Auto-commit of offsets {} for group {} timed out before completion", + allConsumedOffsets, groupId); } catch (WakeupException | InterruptException e) { - log.debug("Automatic commit of offsets {} for group {} was interrupted before completion", subscriptions.allConsumed(), groupId); + log.debug("Auto-commit of offsets {} for group {} was interrupted before completion", + allConsumedOffsets, groupId); // rethrow wakeups since they are triggered by the user throw e; } catch (Exception e) { // consistent with async auto-commit failures, we do not propagate the exception - log.warn("Auto offset commit failed for group {}: {}", groupId, e.getMessage()); + log.warn("Auto-commit of offsets {} failed for group {}: {}", allConsumedOffsets, groupId, + e.getMessage()); } } } - public static class DefaultOffsetCommitCallback implements OffsetCommitCallback { + private class DefaultOffsetCommitCallback implements OffsetCommitCallback { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception != null) - log.error("Offset commit failed.", exception); + log.error("Offset commit with offsets {} failed for group {}", offsets, groupId, exception); } } @@ -694,7 +703,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { setMemberId(generation.memberId). setRetentionTime(OffsetCommitRequest.DEFAULT_RETENTION_TIME); - log.trace("Sending offset-commit request with {} to coordinator {} for group {}", offsets, coordinator, groupId); + log.trace("Sending OffsetCommit request with {} to coordinator {} for group {}", offsets, coordinator, groupId); return client.send(coordinator, builder) .compose(new OffsetCommitResponseHandler(offsets)); @@ -704,7 +713,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { private final Map<TopicPartition, OffsetAndMetadata> offsets; - public OffsetCommitResponseHandler(Map<TopicPartition, OffsetAndMetadata> offsets) { + private OffsetCommitResponseHandler(Map<TopicPartition, OffsetAndMetadata> offsets) { this.offsets = offsets; } http://git-wip-us.apache.org/repos/asf/kafka/blob/a5063521/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index ea92ab7..e5c5cf6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -90,8 +90,7 @@ public class ConsumerNetworkClient implements Closeable { * @param requestBuilder A builder for the request payload * @return A future which indicates the result of the send. */ - public RequestFuture<ClientResponse> send(Node node, - AbstractRequest.Builder<?> requestBuilder) { + public RequestFuture<ClientResponse> send(Node node, AbstractRequest.Builder<?> requestBuilder) { long now = time.milliseconds(); RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler(); ClientRequest clientRequest = client.newClientRequest(node.idString(), requestBuilder, now, true, @@ -157,6 +156,7 @@ public class ConsumerNetworkClient implements Closeable { public void wakeup() { // wakeup should be safe without holding the client lock since it simply delegates to // Selector's wakeup, which is threadsafe + log.trace("Received user wakeup"); this.wakeup.set(true); this.client.wakeup(); } @@ -406,6 +406,7 @@ public class ConsumerNetworkClient implements Closeable { private void maybeTriggerWakeup() { if (wakeupDisabledCount == 0 && wakeup.get()) { + log.trace("Raising wakeup exception in response to user wakeup"); wakeup.set(false); throw new WakeupException(); } @@ -446,7 +447,7 @@ public class ConsumerNetworkClient implements Closeable { /** * Find whether a previous connection has failed. Note that the failure state will persist until either - * {@link #tryConnect(Node)} or {@link #send(Node, ApiKeys, AbstractRequest)} has been called. + * {@link #tryConnect(Node)} or {@link #send(Node, AbstractRequest.Builder)} has been called. * @param node Node to connect to if possible */ public boolean connectionFailed(Node node) { @@ -457,7 +458,7 @@ public class ConsumerNetworkClient implements Closeable { /** * Initiate a connection if currently possible. This is only really useful for resetting the failed - * status of a socket. If there is an actual request to send, then {@link #send(Node, ApiKeys, AbstractRequest)} + * status of a socket. If there is an actual request to send, then {@link #send(Node, AbstractRequest.Builder)} * should be used. * @param node The node to connect to */ http://git-wip-us.apache.org/repos/asf/kafka/blob/a5063521/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index d4ecfc6..6a13d46 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -177,6 +177,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener { final FetchRequest.Builder request = fetchEntry.getValue(); final Node fetchTarget = fetchEntry.getKey(); + log.debug("Sending fetch for partitions {} to broker {}", request.fetchData().keySet(), fetchTarget); client.send(fetchTarget, request) .addListener(new RequestFutureListener<ClientResponse>() { @Override @@ -208,7 +209,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener { @Override public void onFailure(RuntimeException e) { - log.debug("Fetch request to {} failed", fetchTarget, e); + log.debug("Fetch request to {} for partitions {} failed", fetchTarget, request.fetchData().keySet(), e); } }); } @@ -722,7 +723,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener { long position = this.subscriptions.position(partition); fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize)); - log.trace("Added fetch request for partition {} at offset {}", partition, position); + log.trace("Added fetch request for partition {} at offset {} to node {}", partition, position, node); } else { log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node); } http://git-wip-us.apache.org/repos/asf/kafka/blob/a5063521/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java b/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java index 57247c8..faca685 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/KafkaThread.java @@ -22,8 +22,17 @@ public class KafkaThread extends Thread { private final Logger log = LoggerFactory.getLogger(getClass()); + public KafkaThread(final String name, boolean daemon) { + super(name); + configureThread(name, daemon); + } + public KafkaThread(final String name, Runnable runnable, boolean daemon) { super(runnable, name); + configureThread(name, daemon); + } + + private void configureThread(final String name, boolean daemon) { setDaemon(daemon); setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { public void uncaughtException(Thread t, Throwable e) { http://git-wip-us.apache.org/repos/asf/kafka/blob/a5063521/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 8346e93..4aaa172 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -1486,7 +1486,6 @@ public class KafkaConsumerTest { Deserializer<String> valueDeserializer = new StringDeserializer(); OffsetResetStrategy autoResetStrategy = OffsetResetStrategy.EARLIEST; - OffsetCommitCallback defaultCommitCallback = new ConsumerCoordinator.DefaultOffsetCommitCallback(); List<PartitionAssignor> assignors = Arrays.asList(assignor); ConsumerInterceptors<String, String> interceptors = null; @@ -1506,7 +1505,6 @@ public class KafkaConsumerTest { metricGroupPrefix, time, retryBackoffMs, - defaultCommitCallback, autoCommitEnabled, autoCommitIntervalMs, interceptors, http://git-wip-us.apache.org/repos/asf/kafka/blob/a5063521/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index e13d49f..e11bf30 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -112,10 +112,9 @@ public class ConsumerCoordinatorTest { private Metrics metrics; private ConsumerNetworkClient consumerClient; private MockRebalanceListener rebalanceListener; - private MockCommitCallback defaultOffsetCommitCallback; + private MockCommitCallback mockOffsetCommitCallback; private ConsumerCoordinator coordinator; - @Before public void setup() { this.time = new MockTime(); @@ -126,7 +125,7 @@ public class ConsumerCoordinatorTest { this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000); this.metrics = new Metrics(time); this.rebalanceListener = new MockRebalanceListener(); - this.defaultOffsetCommitCallback = new MockCommitCallback(); + this.mockOffsetCommitCallback = new MockCommitCallback(); this.partitionAssignor.clear(); client.setNode(node); @@ -1010,14 +1009,14 @@ public class ConsumerCoordinatorTest { @Test public void testCommitOffsetAsyncWithDefaultCallback() { - int invokedBeforeTest = defaultOffsetCommitCallback.invoked; + int invokedBeforeTest = mockOffsetCommitCallback.invoked; client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE.code()))); - coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), null); + coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), mockOffsetCommitCallback); coordinator.invokeCompletedOffsetCommitCallbacks(); - assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked); - assertNull(defaultOffsetCommitCallback.exception); + assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked); + assertNull(mockOffsetCommitCallback.exception); } @Test @@ -1059,14 +1058,14 @@ public class ConsumerCoordinatorTest { @Test public void testCommitOffsetAsyncFailedWithDefaultCallback() { - int invokedBeforeTest = defaultOffsetCommitCallback.invoked; + int invokedBeforeTest = mockOffsetCommitCallback.invoked; client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()))); - coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), null); + coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), mockOffsetCommitCallback); coordinator.invokeCompletedOffsetCommitCallbacks(); - assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked); - assertTrue(defaultOffsetCommitCallback.exception instanceof RetriableCommitFailedException); + assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked); + assertTrue(mockOffsetCommitCallback.exception instanceof RetriableCommitFailedException); } @Test @@ -1220,12 +1219,12 @@ public class ConsumerCoordinatorTest { @Test public void testCommitAsyncNegativeOffset() { - int invokedBeforeTest = defaultOffsetCommitCallback.invoked; + int invokedBeforeTest = mockOffsetCommitCallback.invoked; client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); - coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(-1L)), null); + coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(-1L)), mockOffsetCommitCallback); coordinator.invokeCompletedOffsetCommitCallbacks(); - assertEquals(invokedBeforeTest + 1, defaultOffsetCommitCallback.invoked); - assertTrue(defaultOffsetCommitCallback.exception instanceof IllegalArgumentException); + assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked); + assertTrue(mockOffsetCommitCallback.exception instanceof IllegalArgumentException); } @Test @@ -1531,7 +1530,6 @@ public class ConsumerCoordinatorTest { "consumer" + groupId, time, retryBackoffMs, - defaultOffsetCommitCallback, autoCommitEnabled, autoCommitIntervalMs, null,