Repository: kafka Updated Branches: refs/heads/trunk 4c42654b1 -> 8827a5b34
KAFKA-4635; Client Compatibility follow-ups Author: Colin P. Mccabe <[email protected]> Reviewers: Jason Gustafson <[email protected]>, Ismael Juma <[email protected]> Closes #2414 from cmccabe/KAFKA-4635 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8827a5b3 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8827a5b3 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8827a5b3 Branch: refs/heads/trunk Commit: 8827a5b34ed2a2a570219fb8f29d955918867796 Parents: 4c42654 Author: Colin P. Mccabe <[email protected]> Authored: Sat Jan 28 00:58:35 2017 +0000 Committer: Ismael Juma <[email protected]> Committed: Sat Jan 28 01:08:46 2017 +0000 ---------------------------------------------------------------------- .../org/apache/kafka/clients/NetworkClient.java | 23 +++--- .../apache/kafka/clients/NodeApiVersions.java | 72 ++++++++++------- .../kafka/clients/consumer/KafkaConsumer.java | 9 ++- .../clients/consumer/OffsetAndTimestamp.java | 9 +-- .../clients/consumer/internals/Fetcher.java | 82 +++++++++++++------- .../kafka/clients/producer/KafkaProducer.java | 4 + .../common/errors/ObsoleteBrokerException.java | 31 -------- .../common/requests/CreateTopicsRequest.java | 5 +- .../common/requests/ListOffsetRequest.java | 6 +- .../common/requests/OffsetFetchRequest.java | 5 +- .../kafka/clients/NodeApiVersionsTest.java | 29 ++++++- .../common/requests/RequestResponseTest.java | 4 +- .../kafka/tools/ClientCompatibilityTest.java | 6 +- 13 files changed, 171 insertions(+), 114 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/8827a5b3/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
index 5e97eac..0eb7670 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -14,7 +14,6 @@ package org.apache.kafka.clients; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; -import org.apache.kafka.common.errors.ObsoleteBrokerException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.network.Selectable; @@ -289,20 +288,17 @@ public class NetworkClient implements KafkaClient { // the case when sending the initial ApiVersionRequest which fetches the version // information itself. It is also the case when discoverBrokerVersions is set to false. if (versionInfo == null) { - if ((!discoverBrokerVersions) && (log.isTraceEnabled())) - log.trace("No version information found when sending message of type {} to node {}", - clientRequest.apiKey(), nodeId); + if (discoverBrokerVersions && log.isTraceEnabled()) + log.trace("No version information found when sending message of type {} to node {}. " + + "Assuming version {}.", clientRequest.apiKey(), nodeId, builder.version()); } else { short version = versionInfo.usableVersion(clientRequest.apiKey()); - if (log.isTraceEnabled()) - log.trace("When sending message of type {} to node {}, the best usable version is {}", - clientRequest.apiKey(), nodeId, version); builder.setVersion(version); } // The call to build may also throw UnsupportedVersionException, if there are essential // fields that cannot be represented in the chosen version. request = builder.build(); - } catch (ObsoleteBrokerException | UnsupportedVersionException e) { + } catch (UnsupportedVersionException e) { // If the version is not supported, skip sending the request over the wire. // Instead, simply add it to the local queue of aborted requests. 
log.debug("Version mismatch when attempting to send {} to {}", @@ -314,8 +310,15 @@ public class NetworkClient implements KafkaClient { return; } RequestHeader header = clientRequest.makeHeader(); - if (log.isTraceEnabled()) - log.trace("Sending {} to node {}", request, nodeId); + if (log.isDebugEnabled()) { + int latestClientVersion = ProtoUtils.latestVersion(clientRequest.apiKey().id); + if (header.apiVersion() == latestClientVersion) { + log.trace("Sending {} to node {}.", request, nodeId); + } else { + log.debug("Using older server API v{} to send {} to node {}.", + header.apiVersion(), request, nodeId); + } + } Send send = request.toSend(nodeId, header); InFlightRequest inFlightRequest = new InFlightRequest( header, http://git-wip-us.apache.org/repos/asf/kafka/blob/8827a5b3/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java index 1c5d8fb..aa9c85a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java +++ b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java @@ -27,10 +27,18 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.TreeMap; +/** + * An internal class which represents the API versions supported by a particular node. + */ public class NodeApiVersions { - private final Collection<ApiVersion> apiVersions; + private static final Short API_NOT_ON_NODE = null; + private static final short NODE_TOO_OLD = (short) -1; + private static final short NODE_TOO_NEW = (short) -2; + private final Collection<ApiVersion> nodeApiVersions; - // An array of the usable versions of each API, indexed by ApiKeys ID. + /** + * An array of the usable versions of each API, indexed by the ApiKeys ID. 
+ */ private final Map<ApiKeys, Short> usableVersions = new EnumMap<>(ApiKeys.class); /** @@ -67,30 +75,39 @@ public class NodeApiVersions { return new NodeApiVersions(apiVersions); } - public NodeApiVersions(Collection<ApiVersion> apiVersions) { - this.apiVersions = apiVersions; - for (ApiVersion apiVersion : apiVersions) { - int apiKeyId = apiVersion.apiKey; + public NodeApiVersions(Collection<ApiVersion> nodeApiVersions) { + this.nodeApiVersions = nodeApiVersions; + for (ApiVersion nodeApiVersion : nodeApiVersions) { + int nodeApiKey = nodeApiVersion.apiKey; // Newer brokers may support ApiKeys we don't know about, ignore them - if (ApiKeys.hasId(apiKeyId)) { - short version = Utils.min(ProtoUtils.latestVersion(apiKeyId), apiVersion.maxVersion); - if (version >= apiVersion.minVersion && version >= ProtoUtils.oldestVersion(apiKeyId)) - usableVersions.put(ApiKeys.forId(apiKeyId), version); + if (ApiKeys.hasId(nodeApiKey)) { + short v = Utils.min(ProtoUtils.latestVersion(nodeApiKey), nodeApiVersion.maxVersion); + if (v < nodeApiVersion.minVersion) { + usableVersions.put(ApiKeys.forId(nodeApiKey), NODE_TOO_NEW); + } else if (v < ProtoUtils.oldestVersion(nodeApiKey)) { + usableVersions.put(ApiKeys.forId(nodeApiKey), NODE_TOO_OLD); + } else { + usableVersions.put(ApiKeys.forId(nodeApiKey), v); + } } } } /** - * Return the most recent version supported by both the client and the server. + * Return the most recent version supported by both the node and the local software. 
*/ public short usableVersion(ApiKeys apiKey) { Short usableVersion = usableVersions.get(apiKey); - if (usableVersion == null) { - throw new UnsupportedVersionException("The client cannot send an " + - "API request of type " + apiKey + ", because the " + - "server does not understand any of the versions this client supports."); - } - return usableVersion; + if (usableVersion == API_NOT_ON_NODE) + throw new UnsupportedVersionException("The broker does not support " + apiKey); + else if (usableVersion == NODE_TOO_OLD) + throw new UnsupportedVersionException("The broker is too old to support " + apiKey + + " version " + ProtoUtils.oldestVersion(apiKey.id)); + else if (usableVersion == NODE_TOO_NEW) + throw new UnsupportedVersionException("The broker is too new to support " + apiKey + + " version " + ProtoUtils.latestVersion(apiKey.id)); + else + return usableVersion; } /** @@ -113,11 +130,11 @@ public class NodeApiVersions { // a TreeMap before printing it out to ensure that we always print in // ascending order. TreeMap<Short, String> apiKeysText = new TreeMap<>(); - for (ApiVersion apiVersion : this.apiVersions) + for (ApiVersion apiVersion : this.nodeApiVersions) apiKeysText.put(apiVersion.apiKey, apiVersionToText(apiVersion)); - // Also handle the case where some apiKey types are - // unknown, which may happen when either the client or server is newer. + // Also handle the case where some apiKey types are not specified at all in the given ApiVersions, + // which may happen when the remote is too old. 
for (ApiKeys apiKey : ApiKeys.values()) { if (!apiKeysText.containsKey(apiKey.id)) { StringBuilder bld = new StringBuilder(); @@ -156,19 +173,20 @@ public class NodeApiVersions { } if (apiKey != null) { Short usableVersion = usableVersions.get(apiKey); - if (usableVersion == null) { - bld.append(" [usable: NONE]"); - } else { + if (usableVersion == NODE_TOO_OLD) + bld.append(" [unusable: node too old]"); + else if (usableVersion == NODE_TOO_NEW) + bld.append(" [unusable: node too new]"); + else bld.append(" [usable: ").append(usableVersion).append("]"); - } } return bld.toString(); } public ApiVersion apiVersion(ApiKeys apiKey) { - for (ApiVersion apiVersion : apiVersions) { - if (apiVersion.apiKey == apiKey.id) { - return apiVersion; + for (ApiVersion nodeApiVersion : nodeApiVersions) { + if (nodeApiVersion.apiKey == apiKey.id) { + return nodeApiVersion; } } throw new NoSuchElementException(); http://git-wip-us.apache.org/repos/asf/kafka/blob/8827a5b3/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index e21c196..23e7ed6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -75,6 +75,13 @@ import java.util.regex.Pattern; * Failure to close the consumer after use will leak these connections. * The consumer is not thread-safe. See <a href="#multithreaded">Multi-threaded Processing</a> for more details. * + * <h3>Cross-Version Compatibility</h3> + * This client can communicate with brokers that are version 0.10.0 or newer. Older or newer brokers may not support + * certain features. 
For example, 0.10.0 brokers do not support offsetsForTimes, because this feature was added + * in version 0.10.1. You will receive an UnsupportedVersionException when invoking an API that is not available on the + * running broker version. + * <p> + * * <h3>Offsets and Consumer Position</h3> * Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of * a record within that partition, and also denotes the position of the consumer in the partition. For example, a consumer @@ -497,7 +504,6 @@ import java.util.regex.Pattern; * There are many possible variations on this approach. For example each processor thread can have its own queue, and * the consumer threads can hash into these queues using the TopicPartition to ensure in-order consumption and simplify * commit. - * */ public class KafkaConsumer<K, V> implements Consumer<K, V> { @@ -1437,6 +1443,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { * than or equal to the target timestamp. {@code null} will be returned for the partition if there is no * such message. * @throws IllegalArgumentException if the target timestamp is negative. + * @throws UnsupportedVersionException if the broker does not support looking up the offsets by timestamp. 
*/ @Override public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) { http://git-wip-us.apache.org/repos/asf/kafka/blob/8827a5b3/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java index f74a333..2f95291 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java @@ -20,11 +20,6 @@ import org.apache.kafka.common.utils.Utils; /** * A container class for offset and timestamp. - * - * Offset must be non-negative. - * - * The timestamp should never be negative, unless it is invalid. This could happen when - * handling a response from a broker that doesn't support KIP-79. 
*/ public final class OffsetAndTimestamp { private final long timestamp; @@ -32,7 +27,9 @@ public final class OffsetAndTimestamp { public OffsetAndTimestamp(long offset, long timestamp) { this.offset = offset; + assert this.offset >= 0; this.timestamp = timestamp; + assert this.timestamp >= 0; } public long timestamp() { @@ -45,7 +42,7 @@ public final class OffsetAndTimestamp { @Override public String toString() { - return "{timestamp=" + timestamp + ", offset=" + offset + "}"; + return "(timestamp=" + timestamp + ", offset=" + offset + ")"; } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/8827a5b3/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index af82570..d4ecfc6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -131,6 +131,28 @@ public class Fetcher<K, V> implements SubscriptionState.Listener { } /** + * Represents data about an offset returned by a broker. + */ + private static class OffsetData { + /** + * The offset + */ + final long offset; + + /** + * The timestamp. + * + * Will be null if the broker does not support returning timestamps. + */ + final Long timestamp; + + OffsetData(long offset, Long timestamp) { + this.offset = offset; + this.timestamp = timestamp; + } + } + + /** * Return whether we have any completed fetches pending return to the user. This method is thread-safe. 
* @return true if there are completed fetches, false otherwise */ @@ -344,12 +366,12 @@ public class Fetcher<K, V> implements SubscriptionState.Listener { timestamp = ListOffsetRequest.LATEST_TIMESTAMP; else throw new NoOffsetForPartitionException(partition); - Map<TopicPartition, OffsetAndTimestamp> offsetsByTimes = retrieveOffsetsByTimes( + Map<TopicPartition, OffsetData> offsetsByTimes = retrieveOffsetsByTimes( Collections.singletonMap(partition, timestamp), Long.MAX_VALUE, false); - OffsetAndTimestamp offsetAndTimestamp = offsetsByTimes.get(partition); - if (offsetAndTimestamp == null) + OffsetData offsetData = offsetsByTimes.get(partition); + if (offsetData == null) throw new NoOffsetForPartitionException(partition); - long offset = offsetAndTimestamp.offset(); + long offset = offsetData.offset; // we might lose the assignment while fetching the offset, so check it is still active if (subscriptions.isAssigned(partition)) this.subscriptions.seek(partition, offset); @@ -357,10 +379,19 @@ public class Fetcher<K, V> implements SubscriptionState.Listener { public Map<TopicPartition, OffsetAndTimestamp> getOffsetsByTimes(Map<TopicPartition, Long> timestampsToSearch, long timeout) { - return retrieveOffsetsByTimes(timestampsToSearch, timeout, true); + Map<TopicPartition, OffsetData> offsetData = retrieveOffsetsByTimes(timestampsToSearch, timeout, true); + HashMap<TopicPartition, OffsetAndTimestamp> offsetsByTimes = new HashMap<>(offsetData.size()); + for (Map.Entry<TopicPartition, OffsetData> entry : offsetData.entrySet()) { + OffsetData data = entry.getValue(); + if (data == null) + offsetsByTimes.put(entry.getKey(), null); + else + offsetsByTimes.put(entry.getKey(), new OffsetAndTimestamp(data.offset, data.timestamp)); + } + return offsetsByTimes; } - private Map<TopicPartition, OffsetAndTimestamp> retrieveOffsetsByTimes( + private Map<TopicPartition, OffsetData> retrieveOffsetsByTimes( Map<TopicPartition, Long> timestampsToSearch, long timeout, boolean 
requireTimestamps) { if (timestampsToSearch.isEmpty()) return Collections.emptyMap(); @@ -368,7 +399,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener { long startMs = time.milliseconds(); long remaining = timeout; do { - RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> future = + RequestFuture<Map<TopicPartition, OffsetData>> future = sendListOffsetRequests(requireTimestamps, timestampsToSearch); client.poll(future, remaining); @@ -412,9 +443,9 @@ public class Fetcher<K, V> implements SubscriptionState.Listener { for (TopicPartition tp : partitions) timestampsToSearch.put(tp, timestamp); Map<TopicPartition, Long> result = new HashMap<>(); - for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : + for (Map.Entry<TopicPartition, OffsetData> entry : retrieveOffsetsByTimes(timestampsToSearch, timeout, false).entrySet()) { - result.put(entry.getKey(), entry.getValue().offset()); + result.put(entry.getKey(), entry.getValue().offset); } return result; } @@ -503,12 +534,12 @@ public class Fetcher<K, V> implements SubscriptionState.Listener { /** * Search the offsets by target times for the specified partitions. * - * @param requireTimestamps true if we should fail with an ObsoleteBrokerException if the broker does not support - * fetching precise timestamps for offsets + * @param requireTimestamps true if we should fail with an UnsupportedVersionException if the broker does + * not support fetching precise timestamps for offsets * @param timestampsToSearch the mapping between partitions and target time * @return A response which can be polled to obtain the corresponding timestamps and offsets. */ - private RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> sendListOffsetRequests( + private RequestFuture<Map<TopicPartition, OffsetData>> sendListOffsetRequests( final boolean requireTimestamps, final Map<TopicPartition, Long> timestampsToSearch) { // Group the partitions by node. 
@@ -534,14 +565,14 @@ public class Fetcher<K, V> implements SubscriptionState.Listener { } } - final RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> listOffsetRequestsFuture = new RequestFuture<>(); - final Map<TopicPartition, OffsetAndTimestamp> fetchedTimestampOffsets = new HashMap<>(); + final RequestFuture<Map<TopicPartition, OffsetData>> listOffsetRequestsFuture = new RequestFuture<>(); + final Map<TopicPartition, OffsetData> fetchedTimestampOffsets = new HashMap<>(); final AtomicInteger remainingResponses = new AtomicInteger(timestampsToSearchByNode.size()); for (Map.Entry<Node, Map<TopicPartition, Long>> entry : timestampsToSearchByNode.entrySet()) { sendListOffsetRequest(entry.getKey(), entry.getValue(), requireTimestamps) - .addListener(new RequestFutureListener<Map<TopicPartition, OffsetAndTimestamp>>() { + .addListener(new RequestFutureListener<Map<TopicPartition, OffsetData>>() { @Override - public void onSuccess(Map<TopicPartition, OffsetAndTimestamp> value) { + public void onSuccess(Map<TopicPartition, OffsetData> value) { synchronized (listOffsetRequestsFuture) { fetchedTimestampOffsets.putAll(value); if (remainingResponses.decrementAndGet() == 0 && !listOffsetRequestsFuture.isDone()) @@ -570,7 +601,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener { * @param requireTimestamp True if we require a timestamp in the response. * @return A response which can be polled to obtain the corresponding timestamps and offsets. 
*/ - private RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> sendListOffsetRequest(final Node node, + private RequestFuture<Map<TopicPartition, OffsetData>> sendListOffsetRequest(final Node node, final Map<TopicPartition, Long> timestampsToSearch, boolean requireTimestamp) { ListOffsetRequest.Builder builder = new ListOffsetRequest.Builder().setTargetTimes(timestampsToSearch); @@ -581,9 +612,9 @@ public class Fetcher<K, V> implements SubscriptionState.Listener { log.trace("Sending ListOffsetRequest {} to broker {}", builder, node); return client.send(node, builder) - .compose(new RequestFutureAdapter<ClientResponse, Map<TopicPartition, OffsetAndTimestamp>>() { + .compose(new RequestFutureAdapter<ClientResponse, Map<TopicPartition, OffsetData>>() { @Override - public void onSuccess(ClientResponse response, RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> future) { + public void onSuccess(ClientResponse response, RequestFuture<Map<TopicPartition, OffsetData>> future) { ListOffsetResponse lor = (ListOffsetResponse) response.responseBody(); log.trace("Received ListOffsetResponse {} from broker {}", lor, node); handleListOffsetResponse(timestampsToSearch, lor, future); @@ -600,8 +631,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener { @SuppressWarnings("deprecation") private void handleListOffsetResponse(Map<TopicPartition, Long> timestampsToSearch, ListOffsetResponse listOffsetResponse, - RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> future) { - Map<TopicPartition, OffsetAndTimestamp> timestampOffsetMap = new HashMap<>(); + RequestFuture<Map<TopicPartition, OffsetData>> future) { + Map<TopicPartition, OffsetData> timestampOffsetMap = new HashMap<>(); for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) { TopicPartition topicPartition = entry.getKey(); ListOffsetResponse.PartitionData partitionData = listOffsetResponse.responseData().get(topicPartition); @@ -622,17 +653,16 @@ public class Fetcher<K, V> 
implements SubscriptionState.Listener { log.debug("Handling v0 ListOffsetResponse response for {}. Fetched offset {}", topicPartition, offset); if (offset != ListOffsetResponse.UNKNOWN_OFFSET) { - OffsetAndTimestamp offsetAndTimestamp = new OffsetAndTimestamp(offset, -1); - timestampOffsetMap.put(topicPartition, offsetAndTimestamp); + OffsetData offsetData = new OffsetData(offset, null); + timestampOffsetMap.put(topicPartition, offsetData); } } else { // Handle v1 and later response log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}", topicPartition, partitionData.offset, partitionData.timestamp); if (partitionData.offset != ListOffsetResponse.UNKNOWN_OFFSET) { - OffsetAndTimestamp offsetAndTimestamp = - new OffsetAndTimestamp(partitionData.offset, partitionData.timestamp); - timestampOffsetMap.put(topicPartition, offsetAndTimestamp); + OffsetData offsetData = new OffsetData(partitionData.offset, partitionData.timestamp); + timestampOffsetMap.put(topicPartition, offsetData); } } } else if (error == Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) { http://git-wip-us.apache.org/repos/asf/kafka/blob/8827a5b3/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 4415c64..59916a7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -124,6 +124,10 @@ import java.util.concurrent.atomic.AtomicReference; * The <code>key.serializer</code> and <code>value.serializer</code> instruct how to turn the key and value objects the user provides with * their <code>ProducerRecord</code> into bytes. 
You can use the included {@link org.apache.kafka.common.serialization.ByteArraySerializer} or * {@link org.apache.kafka.common.serialization.StringSerializer} for simple string or byte types. + * <p> + * This client can communicate with brokers that are version 0.10.0 or newer. Older or newer brokers may not support + * certain client features. You will receive an UnsupportedVersionException when invoking an API that is not available + * with the running broker version. */ public class KafkaProducer<K, V> implements Producer<K, V> { http://git-wip-us.apache.org/repos/asf/kafka/blob/8827a5b3/clients/src/main/java/org/apache/kafka/common/errors/ObsoleteBrokerException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ObsoleteBrokerException.java b/clients/src/main/java/org/apache/kafka/common/errors/ObsoleteBrokerException.java deleted file mode 100644 index 18f822d..0000000 --- a/clients/src/main/java/org/apache/kafka/common/errors/ObsoleteBrokerException.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License.
- */ -package org.apache.kafka.common.errors; - -/** - * Indicates that a request cannot be completed because an obsolete broker - * does not support the required functionality. - */ -public class ObsoleteBrokerException extends ApiException { - - private static final long serialVersionUID = 1L; - - public ObsoleteBrokerException(String message, Throwable cause) { - super(message, cause); - } - - public ObsoleteBrokerException(String message) { - super(message); - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/8827a5b3/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java index 542c6af..0a4bce0 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.common.requests; -import org.apache.kafka.common.errors.ObsoleteBrokerException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; @@ -115,7 +115,8 @@ public class CreateTopicsRequest extends AbstractRequest { @Override public CreateTopicsRequest build() { if (validateOnly && version() == 0) - throw new ObsoleteBrokerException("validateOnly is not supported in version 0 of CreateTopicsRequest"); + throw new UnsupportedVersionException("validateOnly is not supported in version 0 of " + + "CreateTopicsRequest"); return new CreateTopicsRequest(topics, timeout, validateOnly, version()); } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/8827a5b3/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java index 3361383..79251ed 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java @@ -17,7 +17,6 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.ObsoleteBrokerException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; @@ -88,7 +87,8 @@ public class ListOffsetRequest extends AbstractRequest { public ListOffsetRequest build() { short version = version(); if (version < minVersion) { - throw new ObsoleteBrokerException("The broker is too old to send this request."); + throw new UnsupportedVersionException("Cannot create a v" + version + " ListOffsetRequest because " + + "we require features supported only in " + minVersion + " or later."); } if (version == 0) { if (offsetData == null) { @@ -106,7 +106,7 @@ public class ListOffsetRequest extends AbstractRequest { } } else { if (offsetData != null) { - throw new UnsupportedVersionException("Cannot create a v" + version + " ListOffsetRequest with v0 " + + throw new RuntimeException("Cannot create a v" + version + " ListOffsetRequest with v0 " + "PartitionData."); } else if (partitionTimestamps == null) { throw new RuntimeException("Must set partitionTimestamps when creating a v" + http://git-wip-us.apache.org/repos/asf/kafka/blob/8827a5b3/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java 
---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java index 553fd96..43ddf88 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java @@ -13,7 +13,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.ObsoleteBrokerException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; @@ -60,7 +60,8 @@ public class OffsetFetchRequest extends AbstractRequest { @Override public OffsetFetchRequest build() { if (isAllTopicPartitions() && version() < 2) - throw new ObsoleteBrokerException("The broker is too old to send this request."); + throw new UnsupportedVersionException("The broker only supports OffsetFetchRequest " + + "v" + version() + ", but we need v2 or newer to request all topic partitions."); return new OffsetFetchRequest(groupId, partitions, version()); } http://git-wip-us.apache.org/repos/asf/kafka/blob/8827a5b3/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java index f72137b..8773d9c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java @@ -19,12 +19,14 @@ package org.apache.kafka.clients; import org.apache.kafka.common.errors.UnsupportedVersionException; import 
org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.requests.ApiVersionsResponse; import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion; import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedList; import java.util.List; import static org.junit.Assert.assertEquals; @@ -51,6 +53,8 @@ public class NodeApiVersionsTest { for (ApiKeys apiKey : ApiKeys.values()) { if (apiKey == ApiKeys.CONTROLLED_SHUTDOWN_KEY) { versionList.add(new ApiVersion(apiKey.id, (short) 0, (short) 0)); + } else if (apiKey == ApiKeys.DELETE_TOPICS) { + versionList.add(new ApiVersion(apiKey.id, (short) 10000, (short) 10001)); } else { versionList.add(new ApiVersion(apiKey.id, ProtoUtils.oldestVersion(apiKey.id), ProtoUtils.latestVersion(apiKey.id))); @@ -62,7 +66,9 @@ public class NodeApiVersionsTest { for (ApiKeys apiKey : ApiKeys.values()) { bld.append(prefix); if (apiKey == ApiKeys.CONTROLLED_SHUTDOWN_KEY) { - bld.append("ControlledShutdown(7): 0 [usable: NONE]"); + bld.append("ControlledShutdown(7): 0 [unusable: node too old]"); + } else if (apiKey == ApiKeys.DELETE_TOPICS) { + bld.append("DeleteTopics(20): 10000 to 10001 [unusable: node too new]"); } else { bld.append(apiKey.name).append("("). 
append(apiKey.id).append("): "); @@ -97,4 +103,25 @@ public class NodeApiVersionsTest { } assertEquals(2, versions.usableVersion(ApiKeys.FETCH)); } + + @Test(expected = UnsupportedVersionException.class) + public void testUsableVersionCalculationNoKnownVersions() { + List<ApiVersion> versionList = new ArrayList<>(); + NodeApiVersions versions = new NodeApiVersions(versionList); + versions.usableVersion(ApiKeys.FETCH); + } + + @Test + public void testUsableVersionLatestVersions() { + List<ApiVersion> versionList = new LinkedList<>(); + for (ApiVersion apiVersion: ApiVersionsResponse.API_VERSIONS_RESPONSE.apiVersions()) { + versionList.add(apiVersion); + } + // Add an API key that we don't know about. + versionList.add(new ApiVersion((short) 100, (short) 0, (short) 1)); + NodeApiVersions versions = new NodeApiVersions(versionList); + for (ApiKeys apiKey: ApiKeys.values()) { + assertEquals(ProtoUtils.latestVersion(apiKey.id), versions.usableVersion(apiKey)); + } + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/8827a5b3/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 7b71f8f..69f6276 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -15,9 +15,9 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.ObsoleteBrokerException; import org.apache.kafka.common.errors.NotCoordinatorForGroupException; import org.apache.kafka.common.errors.UnknownServerException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import 
org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.network.Send; import org.apache.kafka.common.protocol.ApiKeys; @@ -269,7 +269,7 @@ public class RequestResponseTest { assertEquals("", deserialized.clientId()); // null is defaulted to "" } - @Test(expected = ObsoleteBrokerException.class) + @Test(expected = UnsupportedVersionException.class) public void testCreateTopicRequestV0FailsIfValidateOnly() { createCreateTopicRequest(0, true); } http://git-wip-us.apache.org/repos/asf/kafka/blob/8827a5b3/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java index d7eb717..6ed97bb 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java +++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java @@ -35,8 +35,8 @@ import org.apache.kafka.common.ClusterResource; import org.apache.kafka.common.ClusterResourceListener; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.ObsoleteBrokerException; import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.utils.Time; @@ -362,8 +362,8 @@ public class ClientCompatibilityTest { try { invoker.run(); log.info("Successfully used feature {}", featureName); - } catch (ObsoleteBrokerException e) { - log.info("Got ObsoleteBrokerException when attempting to use feature {}", featureName); + } catch (UnsupportedVersionException e) { + log.info("Got UnsupportedVersionException when attempting to use feature {}", featureName); if 
(supported) { throw new RuntimeException("Expected " + featureName + " to be supported, but it wasn't.", e); }
