Repository: kafka Updated Branches: refs/heads/trunk 8c3c9548b -> 9fb1e2573
KAFKA-2880: consumer should handle disconnect/timeout for metadata requests Author: Jason Gustafson <[email protected]> Reviewers: Ismael Juma, Guozhang Wang Closes #581 from hachikuji/KAFKA-2880 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9fb1e257 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9fb1e257 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9fb1e257 Branch: refs/heads/trunk Commit: 9fb1e25738f89b75a36ef69f730b0e138ccd55b1 Parents: 8c3c954 Author: Jason Gustafson <[email protected]> Authored: Wed Dec 2 11:27:36 2015 -0800 Committer: Guozhang Wang <[email protected]> Committed: Wed Dec 2 11:27:36 2015 -0800 ---------------------------------------------------------------------- .../kafka/clients/consumer/KafkaConsumer.java | 6 + .../consumer/internals/AbstractCoordinator.java | 35 +-- .../consumer/internals/ConsumerCoordinator.java | 5 +- .../internals/ConsumerNetworkClient.java | 23 +- .../clients/consumer/internals/Fetcher.java | 267 ++++++++++--------- .../common/errors/DisconnectException.java | 1 + .../kafka/common/requests/MetadataResponse.java | 37 ++- .../clients/consumer/internals/FetcherTest.java | 67 ++++- .../org/apache/kafka/common/utils/MockTime.java | 8 + .../main/scala/kafka/admin/AdminClient.scala | 6 +- .../kafka/api/AuthorizerIntegrationTest.scala | 14 + .../kafka/api/PlaintextConsumerTest.scala | 16 +- 12 files changed, 313 insertions(+), 172 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 6c15df3..9b36af6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1136,6 +1136,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this * function is called * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the specified topic + * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before + * expiration of the configured request timeout + * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors */ @Override public List<PartitionInfo> partitionsFor(String topic) { @@ -1160,6 +1163,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * @return The map of topics and its partitions * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this * function is called + * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before + * expiration of the configured request timeout + * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors */ @Override public Map<String, List<PartitionInfo>> listTopics() { http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 9aa1aaf..322de5c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -43,7 +43,6 @@ import org.apache.kafka.common.requests.OffsetCommitRequest; import org.apache.kafka.common.requests.SyncGroupRequest; import org.apache.kafka.common.requests.SyncGroupResponse; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -235,7 +234,7 @@ public abstract class AbstractCoordinator implements Closeable { continue; else if (!future.isRetriable()) throw exception; - Utils.sleep(retryBackoffMs); + time.sleep(retryBackoffMs); } } } @@ -484,11 +483,7 @@ public abstract class AbstractCoordinator implements Closeable { private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture<Void> future) { log.debug("Group metadata response {}", resp); - // parse the response to get the coordinator info if it is not disconnected, - // otherwise we need to request metadata update - if (resp.wasDisconnected()) { - future.raise(new DisconnectException()); - } else if (!coordinatorUnknown()) { + if (!coordinatorUnknown()) { // We already found the coordinator, so ignore the request future.complete(null); } else { @@ -661,25 +656,19 @@ public abstract class AbstractCoordinator implements Closeable { public abstract void handle(R response, RequestFuture<T> future); @Override - public void onSuccess(ClientResponse clientResponse, RequestFuture<T> future) { - this.response = clientResponse; - - if (clientResponse.wasDisconnected()) { - int correlation = response.request().request().header().correlationId(); - log.debug("Cancelled request {} with correlation id {} due to coordinator {} being disconnected", - response.request(), - correlation, - response.request().request().destination()); - - // mark the coordinator as dead + public void onFailure(RuntimeException e, RequestFuture<T> future) { + // mark the coordinator as dead + if (e instanceof DisconnectException) coordinatorDead(); - future.raise(new DisconnectException()); - return; - } + future.raise(e); + } + @Override + public void onSuccess(ClientResponse clientResponse, RequestFuture<T> future) { try { - R response = parse(clientResponse); - handle(response, future); + this.response = clientResponse; + R responseObj = parse(clientResponse); + handle(responseObj, future); } catch (RuntimeException e) { if (!future.isDone()) future.raise(e); http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index f6d1029..8453c7b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -42,7 +42,6 @@ import org.apache.kafka.common.requests.OffsetCommitResponse; import org.apache.kafka.common.requests.OffsetFetchRequest; import org.apache.kafka.common.requests.OffsetFetchResponse; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -294,7 +293,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { if (!future.isRetriable()) throw future.exception(); - Utils.sleep(retryBackoffMs); + time.sleep(retryBackoffMs); } } @@ -358,7 +357,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { if (!future.isRetriable()) throw future.exception(); - Utils.sleep(retryBackoffMs); + time.sleep(retryBackoffMs); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index f2e215d..84c312e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -18,12 +18,15 @@ import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.RequestCompletionHandler; import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.requests.RequestSend; import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; @@ -45,6 +48,8 @@ import java.util.concurrent.atomic.AtomicBoolean; * How we do this may depend on KAFKA-2120, so for now, we retain the simplistic behavior. */ public class ConsumerNetworkClient implements Closeable { + private static final Logger log = LoggerFactory.getLogger(ConsumerNetworkClient.class); + private final KafkaClient client; private final AtomicBoolean wakeup = new AtomicBoolean(false); private final DelayedTaskQueue delayedTasks = new DelayedTaskQueue(); @@ -88,7 +93,9 @@ public class ConsumerNetworkClient implements Closeable { * Send a new request. Note that the request is not actually transmitted on the * network until one of the {@link #poll(long)} variants is invoked. At this * point the request will either be transmitted successfully or will fail. - * Use the returned future to obtain the result of the send. + * Use the returned future to obtain the result of the send. Note that there is no + * need to check for disconnects explicitly on the {@link ClientResponse} object; + * instead, the future will be failed with a {@link DisconnectException}. * @param node The destination of the request * @param api The Kafka API call * @param request The request payload @@ -258,7 +265,7 @@ public class ConsumerNetworkClient implements Closeable { for (ClientRequest request : requestEntry.getValue()) { RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback(); - handler.complete(new ClientResponse(request, now, true, null)); + handler.onComplete(new ClientResponse(request, now, true, null)); } iterator.remove(); } @@ -350,7 +357,17 @@ public class ConsumerNetworkClient implements Closeable { @Override public void onComplete(ClientResponse response) { - complete(response); + if (response.wasDisconnected()) { + ClientRequest request = response.request(); + RequestSend send = request.request(); + ApiKeys api = ApiKeys.forId(send.header().apiKey()); + int correlation = send.header().correlationId(); + log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected", + api, request, correlation, send.destination()); + raise(DisconnectException.INSTANCE); + } else { + complete(response); + } } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index a034264..5708869 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -25,9 +25,11 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.InvalidMetadataException; +import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; @@ -113,7 +115,7 @@ public class Fetcher<K, V> { this.keyDeserializer = keyDeserializer; this.valueDeserializer = valueDeserializer; - this.records = new LinkedList<PartitionRecords<K, V>>(); + this.records = new LinkedList<>(); this.offsetOutOfRangePartitions = new HashMap<>(); this.unauthorizedTopics = new HashSet<>(); this.recordTooLargePartitions = new HashMap<>(); @@ -191,45 +193,82 @@ public class Fetcher<K, V> { if (topics != null && topics.isEmpty()) return Collections.emptyMap(); - final HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>(); - long startTime = time.milliseconds(); + long start = time.milliseconds(); + long remaining = timeout; - while (time.milliseconds() - startTime < timeout) { - RequestFuture<ClientResponse> requestFuture = sendMetadataRequest(topics); - if (requestFuture != null) { - client.poll(requestFuture); + do { + RequestFuture<ClientResponse> future = sendMetadataRequest(topics); + client.poll(future, remaining); - if (requestFuture.succeeded()) { - MetadataResponse response = - new MetadataResponse(requestFuture.value().responseBody()); + if (future.failed() && !future.isRetriable()) + throw future.exception(); - for (String topic : response.cluster().topics()) - topicsPartitionInfos.put( - topic, response.cluster().availablePartitionsForTopic(topic)); + if (future.succeeded()) { + MetadataResponse response = new MetadataResponse(future.value().responseBody()); + Cluster cluster = response.cluster(); + + Set<String> unauthorizedTopics = cluster.unauthorizedTopics(); + if (!unauthorizedTopics.isEmpty()) + throw new TopicAuthorizationException(unauthorizedTopics); + + boolean shouldRetry = false; + if (!response.errors().isEmpty()) { + // if there were errors, we need to check whether they were fatal or whether + // we should just retry + + log.debug("Topic metadata fetch included errors: {}", response.errors()); + + for (Map.Entry<String, Errors> errorEntry : response.errors().entrySet()) { + String topic = errorEntry.getKey(); + Errors error = errorEntry.getValue(); + + if (error == Errors.INVALID_TOPIC_EXCEPTION) + throw new InvalidTopicException("Topic '" + topic + "' is invalid"); + else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) + // if a requested topic is unknown, we just continue and let it be absent + // in the returned map + continue; + else if (error.exception() instanceof RetriableException) + shouldRetry = true; + else + throw new KafkaException("Unexpected error fetching metadata for topic " + topic, + error.exception()); + } + } + if (!shouldRetry) { + HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>(); + for (String topic : cluster.topics()) + topicsPartitionInfos.put(topic, cluster.availablePartitionsForTopic(topic)); return topicsPartitionInfos; } - - if (!requestFuture.isRetriable()) - throw requestFuture.exception(); } - Utils.sleep(retryBackoffMs); - } + long elapsed = time.milliseconds() - start; + remaining = timeout - elapsed; - return topicsPartitionInfos; + if (remaining > 0) { + long backoff = Math.min(remaining, retryBackoffMs); + time.sleep(backoff); + remaining -= backoff; + } + } while (remaining > 0); + + throw new TimeoutException("Timeout expired while fetching topic metadata"); } /** * Send Metadata Request to least loaded node in Kafka cluster asynchronously * @return A future that indicates result of sent metadata request */ - public RequestFuture<ClientResponse> sendMetadataRequest(List<String> topics) { + private RequestFuture<ClientResponse> sendMetadataRequest(List<String> topics) { if (topics == null) topics = Collections.emptyList(); final Node node = client.leastLoadedNode(); - return node == null ? null : - client.send(node, ApiKeys.METADATA, new MetadataRequest(topics)); + if (node == null) + return RequestFuture.noBrokersAvailable(); + else + return client.send(node, ApiKeys.METADATA, new MetadataRequest(topics)); } /** @@ -277,7 +316,7 @@ public class Fetcher<K, V> { if (future.exception() instanceof InvalidMetadataException) client.awaitMetadataUpdate(); else - Utils.sleep(retryBackoffMs); + time.sleep(retryBackoffMs); } } @@ -435,29 +474,25 @@ public class Fetcher<K, V> { private void handleListOffsetResponse(TopicPartition topicPartition, ClientResponse clientResponse, RequestFuture<Long> future) { - if (clientResponse.wasDisconnected()) { - future.raise(new DisconnectException()); + ListOffsetResponse lor = new ListOffsetResponse(clientResponse.responseBody()); + short errorCode = lor.responseData().get(topicPartition).errorCode; + if (errorCode == Errors.NONE.code()) { + List<Long> offsets = lor.responseData().get(topicPartition).offsets; + if (offsets.size() != 1) + throw new IllegalStateException("This should not happen."); + long offset = offsets.get(0); + log.debug("Fetched offset {} for partition {}", offset, topicPartition); + + future.complete(offset); + } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code() + || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) { + log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.", + topicPartition); + future.raise(Errors.forCode(errorCode)); } else { - ListOffsetResponse lor = new ListOffsetResponse(clientResponse.responseBody()); - short errorCode = lor.responseData().get(topicPartition).errorCode; - if (errorCode == Errors.NONE.code()) { - List<Long> offsets = lor.responseData().get(topicPartition).offsets; - if (offsets.size() != 1) - throw new IllegalStateException("This should not happen."); - long offset = offsets.get(0); - log.debug("Fetched offset {} for partition {}", offset, topicPartition); - - future.complete(offset); - } else if (errorCode == Errors.NOT_LEADER_FOR_PARTITION.code() - || errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) { - log.warn("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.", - topicPartition); - future.raise(Errors.forCode(errorCode)); - } else { - log.error("Attempt to fetch offsets for partition {} failed due to: {}", - topicPartition, Errors.forCode(errorCode).exception().getMessage()); - future.raise(new StaleMetadataException()); - } + log.error("Attempt to fetch offsets for partition {} failed due to: {}", + topicPartition, Errors.forCode(errorCode).exception().getMessage()); + future.raise(new StaleMetadataException()); } } @@ -502,84 +537,78 @@ public class Fetcher<K, V> { * The callback for fetch completion */ private void handleFetchResponse(ClientResponse resp, FetchRequest request) { - if (resp.wasDisconnected()) { - int correlation = resp.request().request().header().correlationId(); - log.debug("Cancelled fetch request {} with correlation id {} due to node {} being disconnected", - resp.request(), correlation, resp.request().request().destination()); - } else { - int totalBytes = 0; - int totalCount = 0; - FetchResponse response = new FetchResponse(resp.responseBody()); - for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) { - TopicPartition tp = entry.getKey(); - FetchResponse.PartitionData partition = entry.getValue(); - if (!subscriptions.isFetchable(tp)) { - // this can happen when a rebalance happened or a partition consumption paused - // while fetch is still in-flight - log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp); - } else if (partition.errorCode == Errors.NONE.code()) { - long fetchOffset = request.fetchData().get(tp).offset; - - // we are interested in this fetch only if the beginning offset matches the - // current consumed position - Long consumed = subscriptions.consumed(tp); - if (consumed == null) { - continue; - } else if (consumed != fetchOffset) { - // the fetched position has gotten out of sync with the consumed position - // (which might happen when a rebalance occurs with a fetch in-flight), - // so we need to reset the fetch position so the next fetch is right - subscriptions.fetched(tp, consumed); - continue; - } - - int bytes = 0; - ByteBuffer buffer = partition.recordSet; - MemoryRecords records = MemoryRecords.readableRecords(buffer); - List<ConsumerRecord<K, V>> parsed = new ArrayList<ConsumerRecord<K, V>>(); - for (LogEntry logEntry : records) { - parsed.add(parseRecord(tp, logEntry)); - bytes += logEntry.size(); - } + int totalBytes = 0; + int totalCount = 0; + FetchResponse response = new FetchResponse(resp.responseBody()); + for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) { + TopicPartition tp = entry.getKey(); + FetchResponse.PartitionData partition = entry.getValue(); + if (!subscriptions.isFetchable(tp)) { + // this can happen when a rebalance happened or a partition consumption paused + // while fetch is still in-flight + log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp); + } else if (partition.errorCode == Errors.NONE.code()) { + long fetchOffset = request.fetchData().get(tp).offset; + + // we are interested in this fetch only if the beginning offset matches the + // current consumed position + Long consumed = subscriptions.consumed(tp); + if (consumed == null) { + continue; + } else if (consumed != fetchOffset) { + // the fetched position has gotten out of sync with the consumed position + // (which might happen when a rebalance occurs with a fetch in-flight), + // so we need to reset the fetch position so the next fetch is right + subscriptions.fetched(tp, consumed); + continue; + } - if (!parsed.isEmpty()) { - ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1); - this.subscriptions.fetched(tp, record.offset() + 1); - this.records.add(new PartitionRecords<>(fetchOffset, tp, parsed)); - this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset()); - } else if (buffer.limit() > 0) { - // we did not read a single message from a non-empty buffer - // because that message's size is larger than fetch size, in this case - // record this exception - this.recordTooLargePartitions.put(tp, fetchOffset); - } + int bytes = 0; + ByteBuffer buffer = partition.recordSet; + MemoryRecords records = MemoryRecords.readableRecords(buffer); + List<ConsumerRecord<K, V>> parsed = new ArrayList<ConsumerRecord<K, V>>(); + for (LogEntry logEntry : records) { + parsed.add(parseRecord(tp, logEntry)); + bytes += logEntry.size(); + } - this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, parsed.size()); - totalBytes += bytes; - totalCount += parsed.size(); - } else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code() - || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) { - this.metadata.requestUpdate(); - } else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) { - long fetchOffset = request.fetchData().get(tp).offset; - if (subscriptions.hasDefaultOffsetResetPolicy()) - subscriptions.needOffsetReset(tp); - else - this.offsetOutOfRangePartitions.put(tp, fetchOffset); - log.info("Fetch offset {} is out of range, resetting offset", subscriptions.fetched(tp)); - } else if (partition.errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) { - log.warn("Not authorized to read from topic {}.", tp.topic()); - unauthorizedTopics.add(tp.topic()); - } else if (partition.errorCode == Errors.UNKNOWN.code()) { - log.warn("Unknown error fetching data for topic-partition {}", tp); - } else { - throw new IllegalStateException("Unexpected error code " + partition.errorCode + " while fetching data"); + if (!parsed.isEmpty()) { + ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1); + this.subscriptions.fetched(tp, record.offset() + 1); + this.records.add(new PartitionRecords<>(fetchOffset, tp, parsed)); + this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset()); + } else if (buffer.limit() > 0) { + // we did not read a single message from a non-empty buffer + // because that message's size is larger than fetch size, in this case + // record this exception + this.recordTooLargePartitions.put(tp, fetchOffset); } + + this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, parsed.size()); + totalBytes += bytes; + totalCount += parsed.size(); + } else if (partition.errorCode == Errors.NOT_LEADER_FOR_PARTITION.code() + || partition.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) { + this.metadata.requestUpdate(); + } else if (partition.errorCode == Errors.OFFSET_OUT_OF_RANGE.code()) { + long fetchOffset = request.fetchData().get(tp).offset; + if (subscriptions.hasDefaultOffsetResetPolicy()) + subscriptions.needOffsetReset(tp); + else + this.offsetOutOfRangePartitions.put(tp, fetchOffset); + log.info("Fetch offset {} is out of range, resetting offset", subscriptions.fetched(tp)); + } else if (partition.errorCode == Errors.TOPIC_AUTHORIZATION_FAILED.code()) { + log.warn("Not authorized to read from topic {}.", tp.topic()); + unauthorizedTopics.add(tp.topic()); + } else if (partition.errorCode == Errors.UNKNOWN.code()) { + log.warn("Unknown error fetching data for topic-partition {}", tp); + } else { + throw new IllegalStateException("Unexpected error code " + partition.errorCode + " while fetching data"); } - this.sensors.bytesFetched.record(totalBytes); - this.sensors.recordsFetched.record(totalCount); - this.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime()); } + this.sensors.bytesFetched.record(totalBytes); + this.sensors.recordsFetched.record(totalCount); + this.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime()); this.sensors.fetchLatency.record(resp.requestLatencyMs()); } http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java b/clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java index 18d61a2..557681c 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.errors; * Server disconnected before a request could be completed. */ public class DisconnectException extends RetriableException { + public static final DisconnectException INSTANCE = new DisconnectException(); private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index eb163dd..170e4b8 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -45,9 +45,12 @@ public class MetadataResponse extends AbstractRequestResponse { private static final String TOPIC_ERROR_CODE_KEY_NAME = "topic_error_code"; /** - * Possible error code: + * Possible error codes: * - * TODO + * UnknownTopic (3) + * LeaderNotAvailable (5) + * InvalidTopic (17) + * TopicAuthorizationFailed (29) */ private static final String TOPIC_KEY_NAME = "topic"; @@ -57,9 +60,10 @@ public class MetadataResponse extends AbstractRequestResponse { private static final String PARTITION_ERROR_CODE_KEY_NAME = "partition_error_code"; /** - * Possible error code: + * Possible error codes: * - * TODO + * LeaderNotAvailable (5) + * ReplicaNotAvailable (9) */ private static final String PARTITION_KEY_NAME = "partition_id"; @@ -87,14 +91,20 @@ public class MetadataResponse extends AbstractRequestResponse { } struct.set(BROKERS_KEY_NAME, brokerArray.toArray()); + List<Struct> topicArray = new ArrayList<Struct>(); - for (String topic : cluster.topics()) { + for (Map.Entry<String, Errors> errorEntry : errors.entrySet()) { Struct topicData = struct.instance(TOPIC_METATDATA_KEY_NAME); + topicData.set(TOPIC_KEY_NAME, errorEntry.getKey()); + topicData.set(TOPIC_ERROR_CODE_KEY_NAME, errorEntry.getValue().code()); + topicData.set(PARTITION_METADATA_KEY_NAME, new Struct[0]); + topicArray.add(topicData); + } - topicData.set(TOPIC_KEY_NAME, topic); - if (errors.containsKey(topic)) { - topicData.set(TOPIC_ERROR_CODE_KEY_NAME, errors.get(topic).code()); - } else { + for (String topic : cluster.topics()) { + if (!errors.containsKey(topic)) { + Struct topicData = struct.instance(TOPIC_METATDATA_KEY_NAME); + topicData.set(TOPIC_KEY_NAME, topic); topicData.set(TOPIC_ERROR_CODE_KEY_NAME, Errors.NONE.code()); List<Struct> partitionArray = new ArrayList<Struct>(); for (PartitionInfo fetchPartitionData : cluster.partitionsForTopic(topic)) { @@ -113,14 +123,13 @@ public class MetadataResponse extends AbstractRequestResponse { partitionArray.add(partitionData); } topicData.set(PARTITION_METADATA_KEY_NAME, partitionArray.toArray()); + topicArray.add(topicData); } - - topicArray.add(topicData); } struct.set(TOPIC_METATDATA_KEY_NAME, topicArray.toArray()); this.cluster = cluster; - this.errors = new HashMap<String, Errors>(); + this.errors = errors; } public MetadataResponse(Struct struct) { @@ -183,6 +192,10 @@ public class MetadataResponse extends AbstractRequestResponse { return this.errors; } + public boolean hasError(String topic) { + return this.errors.containsKey(topic); + } + public Cluster cluster() { return this.cluster; } http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index fe9a6aa..1ffff4a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -22,14 +22,16 @@ import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; +import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; @@ -73,7 +75,7 @@ public class FetcherTest { private int maxWaitMs = 0; private int fetchSize = 1000; private long retryBackoffMs = 100; - private MockTime time = new MockTime(); + private MockTime time = new MockTime(1); private MockClient client = new MockClient(time); private Metadata metadata = new Metadata(0, Long.MAX_VALUE); private Cluster cluster = TestUtils.singletonCluster(topicName, 1); @@ -395,7 +397,7 @@ public class FetcherTest { } @Test - public void testGetAllTopics() throws InterruptedException { + public void testGetAllTopics() { // sending response before request, as getTopicMetadata is a blocking call client.prepareResponse( new MetadataResponse(cluster, Collections.<String, Errors>emptyMap()).toStruct()); @@ -405,6 +407,63 @@ public class FetcherTest { assertEquals(cluster.topics().size(), allTopics.size()); } + @Test + public void testGetAllTopicsDisconnect() { + // first try gets a disconnect, next succeeds + client.prepareResponse(null, true); + client.prepareResponse(new MetadataResponse(cluster, Collections.<String, Errors>emptyMap()).toStruct()); + Map<String, List<PartitionInfo>> allTopics = fetcher.getAllTopicMetadata(5000L); + assertEquals(cluster.topics().size(), allTopics.size()); + } + + @Test(expected = TimeoutException.class) + public void testGetAllTopicsTimeout() { + // since no response is prepared, the request should timeout + fetcher.getAllTopicMetadata(50L); + } + + @Test + public void testGetAllTopicsUnauthorized() { + client.prepareResponse(new MetadataResponse(cluster, + Collections.singletonMap(topicName, Errors.TOPIC_AUTHORIZATION_FAILED)).toStruct()); + try { + fetcher.getAllTopicMetadata(10L); + fail(); + } catch (TopicAuthorizationException e) { + assertEquals(Collections.singleton(topicName), e.unauthorizedTopics()); + } + } + + @Test(expected = InvalidTopicException.class) + public void testGetTopicMetadataInvalidTopic() { + client.prepareResponse(new MetadataResponse(cluster, + Collections.singletonMap(topicName, Errors.INVALID_TOPIC_EXCEPTION)).toStruct()); + fetcher.getTopicMetadata(Collections.singletonList(topicName), 5000L); + } + + @Test + public void testGetTopicMetadataUnknownTopic() { + Cluster emptyCluster = new Cluster(this.cluster.nodes(), Collections.<PartitionInfo>emptyList(), + Collections.<String>emptySet()); + client.prepareResponse(new MetadataResponse(emptyCluster, + Collections.singletonMap(topicName, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toStruct()); + + Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(Collections.singletonList(topicName), 5000L); + assertNull(topicMetadata.get(topicName)); + } + + @Test + public void testGetTopicMetadataLeaderNotAvailable() { + Cluster emptyCluster = new Cluster(this.cluster.nodes(), Collections.<PartitionInfo>emptyList(), + Collections.<String>emptySet()); + client.prepareResponse(new MetadataResponse(emptyCluster, + Collections.singletonMap(topicName, Errors.LEADER_NOT_AVAILABLE)).toStruct()); + client.prepareResponse(new MetadataResponse(this.cluster, + Collections.<String, Errors>emptyMap()).toStruct()); + Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(Collections.singletonList(topicName), 5000L); + assertTrue(topicMetadata.containsKey(topicName)); + } + /* * Send multiple requests. Verify that the client side quota metrics have the right values */ @@ -457,7 +516,7 @@ public class FetcherTest { } private Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptions, Metrics metrics) { - return new Fetcher<byte[], byte[]>(consumerClient, + return new Fetcher<>(consumerClient, minBytes, maxWaitMs, fetchSize, http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java b/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java index 387e48f..533f869 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/MockTime.java @@ -20,18 +20,26 @@ import java.util.concurrent.TimeUnit; public class MockTime implements Time { private long nanos = 0; + private long autoTickMs = 0; public MockTime() { this.nanos = System.nanoTime(); } + public MockTime(long autoTickMs) { + this.nanos = System.nanoTime(); + this.autoTickMs = autoTickMs; + } + @Override public long milliseconds() { + this.sleep(autoTickMs); return TimeUnit.MILLISECONDS.convert(this.nanos, TimeUnit.NANOSECONDS); } @Override public long nanoseconds() { + this.sleep(autoTickMs); return nanos; } http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/core/src/main/scala/kafka/admin/AdminClient.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index 53b6fdb..3a7e6de 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -52,11 +52,7 @@ class AdminClient(val time: Time, client.poll(future) if (future.succeeded()) - return if (future.value().wasDisconnected()) { - throw new DisconnectException() - } else { - future.value().responseBody() - } + return future.value().responseBody() now = time.milliseconds() } while (now < deadline && future.exception().isInstanceOf[SendFailedException]) http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index c8ca2a3..26ab885 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -499,6 +499,20 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness { this.consumers.head.position(tp) } + @Test + def testListOffsetsWithNoTopicAccess() { + val e = intercept[TopicAuthorizationException] { + this.consumers.head.partitionsFor(topic); + } + assertEquals(Set(topic), e.unauthorizedTopics().asScala) + } + + @Test + def testListOfsetsWithTopicDescribe() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) + this.consumers.head.partitionsFor(topic); + } + def removeAllAcls() = { servers.head.apis.authorizer.get.getAcls().keys.foreach { resource => servers.head.apis.authorizer.get.removeAcls(resource) http://git-wip-us.apache.org/repos/asf/kafka/blob/9fb1e257/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index 6fabfdc..90e9562 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -18,9 +18,9 @@ import kafka.server.KafkaConfig import kafka.utils.TestUtils import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.serialization.ByteArrayDeserializer -import org.apache.kafka.common.errors.RecordTooLargeException +import org.apache.kafka.common.errors.{InvalidTopicException, RecordTooLargeException} import org.junit.Assert._ import org.junit.Test import scala.collection.mutable.Buffer @@ -249,7 +249,17 @@ class PlaintextConsumerTest extends BaseConsumerTest { val parts = this.consumers(0).partitionsFor("part-test") assertNotNull(parts) assertEquals(2, parts.size) - assertNull(this.consumers(0).partitionsFor("non-exist-topic")) + } + + @Test + def testPartitionsForAutoCreate() { + val partitions = this.consumers(0).partitionsFor("non-exist-topic") + assertFalse(partitions.isEmpty) + } + + @Test(expected=classOf[InvalidTopicException]) + def testPartitionsForInvalidTopic() { + this.consumers(0).partitionsFor(";3# ads,{234") } @Test
