Repository: kafka
Updated Branches:
  refs/heads/0.10.2 903548f1a -> 6829d9615


KAFKA-4635; Client Compatibility follow-ups

Author: Colin P. Mccabe <[email protected]>

Reviewers: Jason Gustafson <[email protected]>, Ismael Juma <[email protected]>

Closes #2414 from cmccabe/KAFKA-4635

(cherry picked from commit 8827a5b34ed2a2a570219fb8f29d955918867796)
Signed-off-by: Ismael Juma <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6829d961
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6829d961
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6829d961

Branch: refs/heads/0.10.2
Commit: 6829d9615ab13d1314a73b77b6781e8daec18ea8
Parents: 903548f
Author: Colin P. Mccabe <[email protected]>
Authored: Sat Jan 28 00:58:35 2017 +0000
Committer: Ismael Juma <[email protected]>
Committed: Sat Jan 28 01:09:07 2017 +0000

----------------------------------------------------------------------
 .../org/apache/kafka/clients/NetworkClient.java | 23 +++---
 .../apache/kafka/clients/NodeApiVersions.java   | 72 ++++++++++-------
 .../kafka/clients/consumer/KafkaConsumer.java   |  9 ++-
 .../clients/consumer/OffsetAndTimestamp.java    |  9 +--
 .../clients/consumer/internals/Fetcher.java     | 82 +++++++++++++-------
 .../kafka/clients/producer/KafkaProducer.java   |  4 +
 .../common/errors/ObsoleteBrokerException.java  | 31 --------
 .../common/requests/CreateTopicsRequest.java    |  5 +-
 .../common/requests/ListOffsetRequest.java      |  6 +-
 .../common/requests/OffsetFetchRequest.java     |  5 +-
 .../kafka/clients/NodeApiVersionsTest.java      | 29 ++++++-
 .../common/requests/RequestResponseTest.java    |  4 +-
 .../kafka/tools/ClientCompatibilityTest.java    |  6 +-
 13 files changed, 171 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6829d961/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 5e97eac..0eb7670 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -14,7 +14,6 @@ package org.apache.kafka.clients;
 
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.errors.ObsoleteBrokerException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.network.Selectable;
@@ -289,20 +288,17 @@ public class NetworkClient implements KafkaClient {
             // the case when sending the initial ApiVersionRequest which 
fetches the version
             // information itself.  It is also the case when 
discoverBrokerVersions is set to false.
             if (versionInfo == null) {
-                if ((!discoverBrokerVersions) && (log.isTraceEnabled()))
-                    log.trace("No version information found when sending 
message of type {} to node {}",
-                            clientRequest.apiKey(), nodeId);
+                if (discoverBrokerVersions && log.isTraceEnabled())
+                    log.trace("No version information found when sending 
message of type {} to node {}. " +
+                            "Assuming version {}.", clientRequest.apiKey(), 
nodeId, builder.version());
             } else {
                 short version = 
versionInfo.usableVersion(clientRequest.apiKey());
-                if (log.isTraceEnabled())
-                    log.trace("When sending message of type {} to node {}, the 
best usable version is {}",
-                            clientRequest.apiKey(), nodeId, version);
                 builder.setVersion(version);
             }
             // The call to build may also throw UnsupportedVersionException, 
if there are essential
             // fields that cannot be represented in the chosen version.
             request = builder.build();
-        } catch (ObsoleteBrokerException | UnsupportedVersionException e) {
+        } catch (UnsupportedVersionException e) {
             // If the version is not supported, skip sending the request over 
the wire.
             // Instead, simply add it to the local queue of aborted requests.
             log.debug("Version mismatch when attempting to send {} to {}",
@@ -314,8 +310,15 @@ public class NetworkClient implements KafkaClient {
             return;
         }
         RequestHeader header = clientRequest.makeHeader();
-        if (log.isTraceEnabled())
-            log.trace("Sending {} to node {}", request,  nodeId);
+        if (log.isDebugEnabled()) {
+            int latestClientVersion = 
ProtoUtils.latestVersion(clientRequest.apiKey().id);
+            if (header.apiVersion() == latestClientVersion) {
+                log.trace("Sending {} to node {}.", request, nodeId);
+            } else {
+                log.debug("Using older server API v{} to send {} to node {}.",
+                    header.apiVersion(), request, nodeId);
+            }
+        }
         Send send = request.toSend(nodeId, header);
         InFlightRequest inFlightRequest = new InFlightRequest(
                 header,

http://git-wip-us.apache.org/repos/asf/kafka/blob/6829d961/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java 
b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
index 1c5d8fb..aa9c85a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
@@ -27,10 +27,18 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.TreeMap;
 
+/**
+ * An internal class which represents the API versions supported by a 
particular node.
+ */
 public class NodeApiVersions {
-    private final Collection<ApiVersion> apiVersions;
+    private static final Short API_NOT_ON_NODE = null;
+    private static final short NODE_TOO_OLD = (short) -1;
+    private static final short NODE_TOO_NEW = (short) -2;
+    private final Collection<ApiVersion> nodeApiVersions;
 
-    // An array of the usable versions of each API, indexed by ApiKeys ID.
+    /**
+     * A map of the usable versions of each API, keyed by ApiKeys.
+     */
     private final Map<ApiKeys, Short> usableVersions = new 
EnumMap<>(ApiKeys.class);
 
     /**
@@ -67,30 +75,39 @@ public class NodeApiVersions {
         return new NodeApiVersions(apiVersions);
     }
 
-    public NodeApiVersions(Collection<ApiVersion> apiVersions) {
-        this.apiVersions = apiVersions;
-        for (ApiVersion apiVersion : apiVersions) {
-            int apiKeyId = apiVersion.apiKey;
+    public NodeApiVersions(Collection<ApiVersion> nodeApiVersions) {
+        this.nodeApiVersions = nodeApiVersions;
+        for (ApiVersion nodeApiVersion : nodeApiVersions) {
+            int nodeApiKey = nodeApiVersion.apiKey;
             // Newer brokers may support ApiKeys we don't know about, ignore 
them
-            if (ApiKeys.hasId(apiKeyId)) {
-                short version = Utils.min(ProtoUtils.latestVersion(apiKeyId), 
apiVersion.maxVersion);
-                if (version >= apiVersion.minVersion && version >= 
ProtoUtils.oldestVersion(apiKeyId))
-                    usableVersions.put(ApiKeys.forId(apiKeyId), version);
+            if (ApiKeys.hasId(nodeApiKey)) {
+                short v = Utils.min(ProtoUtils.latestVersion(nodeApiKey), 
nodeApiVersion.maxVersion);
+                if (v < nodeApiVersion.minVersion) {
+                    usableVersions.put(ApiKeys.forId(nodeApiKey), 
NODE_TOO_NEW);
+                } else if (v < ProtoUtils.oldestVersion(nodeApiKey)) {
+                    usableVersions.put(ApiKeys.forId(nodeApiKey), 
NODE_TOO_OLD);
+                } else {
+                    usableVersions.put(ApiKeys.forId(nodeApiKey), v);
+                }
             }
         }
     }
 
     /**
-     * Return the most recent version supported by both the client and the 
server.
+     * Return the most recent version supported by both the node and the local 
software.
      */
     public short usableVersion(ApiKeys apiKey) {
         Short usableVersion = usableVersions.get(apiKey);
-        if (usableVersion == null) {
-            throw new UnsupportedVersionException("The client cannot send an " 
+
-                    "API request of type " + apiKey + ", because the " +
-                    "server does not understand any of the versions this 
client supports.");
-        }
-        return usableVersion;
+        if (usableVersion == API_NOT_ON_NODE)
+            throw new UnsupportedVersionException("The broker does not support 
" + apiKey);
+        else if (usableVersion == NODE_TOO_OLD)
+            throw new UnsupportedVersionException("The broker is too old to 
support " + apiKey +
+                " version " + ProtoUtils.oldestVersion(apiKey.id));
+        else if (usableVersion == NODE_TOO_NEW)
+            throw new UnsupportedVersionException("The broker is too new to 
support " + apiKey +
+                " version " + ProtoUtils.latestVersion(apiKey.id));
+        else
+            return usableVersion;
     }
 
     /**
@@ -113,11 +130,11 @@ public class NodeApiVersions {
         // a TreeMap before printing it out to ensure that we always print in
         // ascending order.
         TreeMap<Short, String> apiKeysText = new TreeMap<>();
-        for (ApiVersion apiVersion : this.apiVersions)
+        for (ApiVersion apiVersion : this.nodeApiVersions)
             apiKeysText.put(apiVersion.apiKey, apiVersionToText(apiVersion));
 
-        // Also handle the case where some apiKey types are
-        // unknown, which may happen when either the client or server is newer.
+        // Also handle the case where some apiKey types are not specified at 
all in the given ApiVersions,
+        // which may happen when the remote is too old.
         for (ApiKeys apiKey : ApiKeys.values()) {
             if (!apiKeysText.containsKey(apiKey.id)) {
                 StringBuilder bld = new StringBuilder();
@@ -156,19 +173,20 @@ public class NodeApiVersions {
         }
         if (apiKey != null) {
             Short usableVersion = usableVersions.get(apiKey);
-            if (usableVersion == null) {
-                bld.append(" [usable: NONE]");
-            } else {
+            if (usableVersion == NODE_TOO_OLD)
+                bld.append(" [unusable: node too old]");
+            else if (usableVersion == NODE_TOO_NEW)
+                bld.append(" [unusable: node too new]");
+            else
                 bld.append(" [usable: ").append(usableVersion).append("]");
-            }
         }
         return bld.toString();
     }
 
     public ApiVersion apiVersion(ApiKeys apiKey) {
-        for (ApiVersion apiVersion : apiVersions) {
-            if (apiVersion.apiKey == apiKey.id) {
-                return apiVersion;
+        for (ApiVersion nodeApiVersion : nodeApiVersions) {
+            if (nodeApiVersion.apiKey == apiKey.id) {
+                return nodeApiVersion;
             }
         }
         throw new NoSuchElementException();

http://git-wip-us.apache.org/repos/asf/kafka/blob/6829d961/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 2936f0f..6064c39 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -75,6 +75,13 @@ import java.util.regex.Pattern;
  * Failure to close the consumer after use will leak these connections.
  * The consumer is not thread-safe. See <a 
href="#multithreaded">Multi-threaded Processing</a> for more details.
  *
+ * <h3>Cross-Version Compatibility</h3>
+ * This client can communicate with brokers that are version 0.10.0 or newer. 
Older or newer brokers may not support
+ * certain features. For example, 0.10.0 brokers do not support 
offsetsForTimes, because this feature was added
+ * in version 0.10.1. You will receive an UnsupportedVersionException when 
invoking an API that is not available on the
+ * running broker version.
+ * <p>
+ *
  * <h3>Offsets and Consumer Position</h3>
  * Kafka maintains a numerical offset for each record in a partition. This 
offset acts as a unique identifier of
  * a record within that partition, and also denotes the position of the 
consumer in the partition. For example, a consumer
@@ -497,7 +504,6 @@ import java.util.regex.Pattern;
  * There are many possible variations on this approach. For example each 
processor thread can have its own queue, and
  * the consumer threads can hash into these queues using the TopicPartition to 
ensure in-order consumption and simplify
  * commit.
- *
  */
 public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
@@ -1437,6 +1443,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
      *         than or equal to the target timestamp. {@code null} will be 
returned for the partition if there is no
      *         such message.
      * @throws IllegalArgumentException if the target timestamp is negative.
+     * @throws UnsupportedVersionException if the broker does not support 
looking up the offsets by timestamp.
      */
     @Override
     public Map<TopicPartition, OffsetAndTimestamp> 
offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6829d961/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java
index f74a333..2f95291 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java
@@ -20,11 +20,6 @@ import org.apache.kafka.common.utils.Utils;
 
 /**
  * A container class for offset and timestamp.
- *
- * Offset must be non-negative.
- *
- * The timestamp should never be negative, unless it is invalid.  This could 
happen when
- * handling a response from a broker that doesn't support KIP-79.
  */
 public final class OffsetAndTimestamp {
     private final long timestamp;
@@ -32,7 +27,9 @@ public final class OffsetAndTimestamp {
 
     public OffsetAndTimestamp(long offset, long timestamp) {
         this.offset = offset;
+        assert this.offset >= 0;
         this.timestamp = timestamp;
+        assert this.timestamp >= 0;
     }
 
     public long timestamp() {
@@ -45,7 +42,7 @@ public final class OffsetAndTimestamp {
 
     @Override
     public String toString() {
-        return "{timestamp=" + timestamp + ", offset=" + offset + "}";
+        return "(timestamp=" + timestamp + ", offset=" + offset + ")";
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/6829d961/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index af82570..d4ecfc6 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -131,6 +131,28 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener {
     }
 
     /**
+     * Represents data about an offset returned by a broker.
+     */
+    private static class OffsetData {
+        /**
+         * The offset
+         */
+        final long offset;
+
+        /**
+         * The timestamp.
+         *
+         * Will be null if the broker does not support returning timestamps.
+         */
+        final Long timestamp;
+
+        OffsetData(long offset, Long timestamp) {
+            this.offset = offset;
+            this.timestamp = timestamp;
+        }
+    }
+
+    /**
      * Return whether we have any completed fetches pending return to the 
user. This method is thread-safe.
      * @return true if there are completed fetches, false otherwise
      */
@@ -344,12 +366,12 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener {
             timestamp = ListOffsetRequest.LATEST_TIMESTAMP;
         else
             throw new NoOffsetForPartitionException(partition);
-        Map<TopicPartition, OffsetAndTimestamp> offsetsByTimes = 
retrieveOffsetsByTimes(
+        Map<TopicPartition, OffsetData> offsetsByTimes = 
retrieveOffsetsByTimes(
                 Collections.singletonMap(partition, timestamp), 
Long.MAX_VALUE, false);
-        OffsetAndTimestamp offsetAndTimestamp = offsetsByTimes.get(partition);
-        if (offsetAndTimestamp == null)
+        OffsetData offsetData = offsetsByTimes.get(partition);
+        if (offsetData == null)
             throw new NoOffsetForPartitionException(partition);
-        long offset = offsetAndTimestamp.offset();
+        long offset = offsetData.offset;
         // we might lose the assignment while fetching the offset, so check it 
is still active
         if (subscriptions.isAssigned(partition))
             this.subscriptions.seek(partition, offset);
@@ -357,10 +379,19 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener {
 
     public Map<TopicPartition, OffsetAndTimestamp> 
getOffsetsByTimes(Map<TopicPartition, Long> timestampsToSearch,
                                                                      long 
timeout) {
-        return retrieveOffsetsByTimes(timestampsToSearch, timeout, true);
+        Map<TopicPartition, OffsetData> offsetData = 
retrieveOffsetsByTimes(timestampsToSearch, timeout, true);
+        HashMap<TopicPartition, OffsetAndTimestamp> offsetsByTimes = new 
HashMap<>(offsetData.size());
+        for (Map.Entry<TopicPartition, OffsetData> entry : 
offsetData.entrySet()) {
+            OffsetData data = entry.getValue();
+            if (data == null)
+                offsetsByTimes.put(entry.getKey(), null);
+            else
+                offsetsByTimes.put(entry.getKey(), new 
OffsetAndTimestamp(data.offset, data.timestamp));
+        }
+        return offsetsByTimes;
     }
 
-    private Map<TopicPartition, OffsetAndTimestamp> retrieveOffsetsByTimes(
+    private Map<TopicPartition, OffsetData> retrieveOffsetsByTimes(
             Map<TopicPartition, Long> timestampsToSearch, long timeout, 
boolean requireTimestamps) {
         if (timestampsToSearch.isEmpty())
             return Collections.emptyMap();
@@ -368,7 +399,7 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener {
         long startMs = time.milliseconds();
         long remaining = timeout;
         do {
-            RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> future =
+            RequestFuture<Map<TopicPartition, OffsetData>> future =
                     sendListOffsetRequests(requireTimestamps, 
timestampsToSearch);
             client.poll(future, remaining);
 
@@ -412,9 +443,9 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener {
         for (TopicPartition tp : partitions)
             timestampsToSearch.put(tp, timestamp);
         Map<TopicPartition, Long> result = new HashMap<>();
-        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry :
+        for (Map.Entry<TopicPartition, OffsetData> entry :
                 retrieveOffsetsByTimes(timestampsToSearch, timeout, 
false).entrySet()) {
-            result.put(entry.getKey(), entry.getValue().offset());
+            result.put(entry.getKey(), entry.getValue().offset);
         }
         return result;
     }
@@ -503,12 +534,12 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener {
     /**
      * Search the offsets by target times for the specified partitions.
      *
-     * @param requireTimestamps true if we should fail with an 
ObsoleteBrokerException if the broker does not support
-     *                         fetching precise timestamps for offsets
+     * @param requireTimestamps true if we should fail with an 
UnsupportedVersionException if the broker does
+     *                         not support fetching precise timestamps for 
offsets
      * @param timestampsToSearch the mapping between partitions and target time
      * @return A response which can be polled to obtain the corresponding 
timestamps and offsets.
      */
-    private RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> 
sendListOffsetRequests(
+    private RequestFuture<Map<TopicPartition, OffsetData>> 
sendListOffsetRequests(
             final boolean requireTimestamps,
             final Map<TopicPartition, Long> timestampsToSearch) {
         // Group the partitions by node.
@@ -534,14 +565,14 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener {
             }
         }
 
-        final RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> 
listOffsetRequestsFuture = new RequestFuture<>();
-        final Map<TopicPartition, OffsetAndTimestamp> fetchedTimestampOffsets 
= new HashMap<>();
+        final RequestFuture<Map<TopicPartition, OffsetData>> 
listOffsetRequestsFuture = new RequestFuture<>();
+        final Map<TopicPartition, OffsetData> fetchedTimestampOffsets = new 
HashMap<>();
         final AtomicInteger remainingResponses = new 
AtomicInteger(timestampsToSearchByNode.size());
         for (Map.Entry<Node, Map<TopicPartition, Long>> entry : 
timestampsToSearchByNode.entrySet()) {
             sendListOffsetRequest(entry.getKey(), entry.getValue(), 
requireTimestamps)
-                    .addListener(new RequestFutureListener<Map<TopicPartition, 
OffsetAndTimestamp>>() {
+                    .addListener(new RequestFutureListener<Map<TopicPartition, 
OffsetData>>() {
                         @Override
-                        public void onSuccess(Map<TopicPartition, 
OffsetAndTimestamp> value) {
+                        public void onSuccess(Map<TopicPartition, OffsetData> 
value) {
                             synchronized (listOffsetRequestsFuture) {
                                 fetchedTimestampOffsets.putAll(value);
                                 if (remainingResponses.decrementAndGet() == 0 
&& !listOffsetRequestsFuture.isDone())
@@ -570,7 +601,7 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener {
      * @param requireTimestamp  True if we require a timestamp in the response.
      * @return A response which can be polled to obtain the corresponding 
timestamps and offsets.
      */
-    private RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> 
sendListOffsetRequest(final Node node,
+    private RequestFuture<Map<TopicPartition, OffsetData>> 
sendListOffsetRequest(final Node node,
                                                                                
          final Map<TopicPartition, Long> timestampsToSearch,
                                                                                
          boolean requireTimestamp) {
         ListOffsetRequest.Builder builder = new 
ListOffsetRequest.Builder().setTargetTimes(timestampsToSearch);
@@ -581,9 +612,9 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener {
 
         log.trace("Sending ListOffsetRequest {} to broker {}", builder, node);
         return client.send(node, builder)
-                .compose(new RequestFutureAdapter<ClientResponse, 
Map<TopicPartition, OffsetAndTimestamp>>() {
+                .compose(new RequestFutureAdapter<ClientResponse, 
Map<TopicPartition, OffsetData>>() {
                     @Override
-                    public void onSuccess(ClientResponse response, 
RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> future) {
+                    public void onSuccess(ClientResponse response, 
RequestFuture<Map<TopicPartition, OffsetData>> future) {
                         ListOffsetResponse lor = (ListOffsetResponse) 
response.responseBody();
                         log.trace("Received ListOffsetResponse {} from broker 
{}", lor, node);
                         handleListOffsetResponse(timestampsToSearch, lor, 
future);
@@ -600,8 +631,8 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener {
     @SuppressWarnings("deprecation")
     private void handleListOffsetResponse(Map<TopicPartition, Long> 
timestampsToSearch,
                                           ListOffsetResponse 
listOffsetResponse,
-                                          RequestFuture<Map<TopicPartition, 
OffsetAndTimestamp>> future) {
-        Map<TopicPartition, OffsetAndTimestamp> timestampOffsetMap = new 
HashMap<>();
+                                          RequestFuture<Map<TopicPartition, 
OffsetData>> future) {
+        Map<TopicPartition, OffsetData> timestampOffsetMap = new HashMap<>();
         for (Map.Entry<TopicPartition, Long> entry : 
timestampsToSearch.entrySet()) {
             TopicPartition topicPartition = entry.getKey();
             ListOffsetResponse.PartitionData partitionData = 
listOffsetResponse.responseData().get(topicPartition);
@@ -622,17 +653,16 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener {
                     log.debug("Handling v0 ListOffsetResponse response for {}. 
Fetched offset {}",
                             topicPartition, offset);
                     if (offset != ListOffsetResponse.UNKNOWN_OFFSET) {
-                        OffsetAndTimestamp offsetAndTimestamp = new 
OffsetAndTimestamp(offset, -1);
-                        timestampOffsetMap.put(topicPartition, 
offsetAndTimestamp);
+                        OffsetData offsetData = new OffsetData(offset, null);
+                        timestampOffsetMap.put(topicPartition, offsetData);
                     }
                 } else {
                     // Handle v1 and later response
                     log.debug("Handling ListOffsetResponse response for {}. 
Fetched offset {}, timestamp {}",
                             topicPartition, partitionData.offset, 
partitionData.timestamp);
                     if (partitionData.offset != 
ListOffsetResponse.UNKNOWN_OFFSET) {
-                        OffsetAndTimestamp offsetAndTimestamp =
-                                new OffsetAndTimestamp(partitionData.offset, 
partitionData.timestamp);
-                        timestampOffsetMap.put(topicPartition, 
offsetAndTimestamp);
+                        OffsetData offsetData = new 
OffsetData(partitionData.offset, partitionData.timestamp);
+                        timestampOffsetMap.put(topicPartition, offsetData);
                     }
                 }
             } else if (error == Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6829d961/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index c604daa..29defbb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -124,6 +124,10 @@ import java.util.concurrent.atomic.AtomicReference;
  * The <code>key.serializer</code> and <code>value.serializer</code> instruct 
how to turn the key and value objects the user provides with
  * their <code>ProducerRecord</code> into bytes. You can use the included 
{@link org.apache.kafka.common.serialization.ByteArraySerializer} or
  * {@link org.apache.kafka.common.serialization.StringSerializer} for simple 
string or byte types.
+ * <p>
+ * This client can communicate with brokers that are version 0.10.0 or newer. 
Older or newer brokers may not support
+ * certain client features.  You will receive an UnsupportedVersionException 
when invoking an API that is not available
+ * with the running broker version.
  */
 public class KafkaProducer<K, V> implements Producer<K, V> {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6829d961/clients/src/main/java/org/apache/kafka/common/errors/ObsoleteBrokerException.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/ObsoleteBrokerException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/ObsoleteBrokerException.java
deleted file mode 100644
index 18f822d..0000000
--- 
a/clients/src/main/java/org/apache/kafka/common/errors/ObsoleteBrokerException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.common.errors;
-
-/**
- * Indicates that a request cannot be completed because an obsolete broker
- * does not support the required functionality.
- */
-public class ObsoleteBrokerException extends ApiException {
-
-    private static final long serialVersionUID = 1L;
-
-    public ObsoleteBrokerException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public ObsoleteBrokerException(String message) {
-        super(message);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/6829d961/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
index 542c6af..0a4bce0 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.errors.ObsoleteBrokerException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
@@ -115,7 +115,8 @@ public class CreateTopicsRequest extends AbstractRequest {
         @Override
         public CreateTopicsRequest build() {
             if (validateOnly && version() == 0)
-                throw new ObsoleteBrokerException("validateOnly is not supported in version 0 of CreateTopicsRequest");
+                throw new UnsupportedVersionException("validateOnly is not supported in version 0 of " +
+                        "CreateTopicsRequest");
             return new CreateTopicsRequest(topics, timeout, validateOnly, version());
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6829d961/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index 3361383..79251ed 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.ObsoleteBrokerException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
@@ -88,7 +87,8 @@ public class ListOffsetRequest extends AbstractRequest {
         public ListOffsetRequest build() {
             short version = version();
             if (version < minVersion) {
-                throw new ObsoleteBrokerException("The broker is too old to send this request.");
+                throw new UnsupportedVersionException("Cannot create a v" + version + " ListOffsetRequest because " +
+                    "we require features supported only in " + minVersion + " or later.");
             }
             if (version == 0) {
                 if (offsetData == null) {
@@ -106,7 +106,7 @@ public class ListOffsetRequest extends AbstractRequest {
                 }
             } else {
                 if (offsetData != null) {
-                    throw new UnsupportedVersionException("Cannot create a v" + version + " ListOffsetRequest with v0 " +
+                    throw new RuntimeException("Cannot create a v" + version + " ListOffsetRequest with v0 " +
                         "PartitionData.");
                 } else if (partitionTimestamps == null) {
                     throw new RuntimeException("Must set partitionTimestamps when creating a v" +

http://git-wip-us.apache.org/repos/asf/kafka/blob/6829d961/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index 553fd96..43ddf88 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -13,7 +13,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.ObsoleteBrokerException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
@@ -60,7 +60,8 @@ public class OffsetFetchRequest extends AbstractRequest {
         @Override
         public OffsetFetchRequest build() {
             if (isAllTopicPartitions() && version() < 2)
-                throw new ObsoleteBrokerException("The broker is too old to send this request.");
+                throw new UnsupportedVersionException("The broker only supports OffsetFetchRequest " +
+                        "v" + version() + ", but we need v2 or newer to request all topic partitions.");
             return new OffsetFetchRequest(groupId, partitions, version());
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/6829d961/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
index f72137b..8773d9c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
@@ -19,12 +19,14 @@ package org.apache.kafka.clients;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
@@ -51,6 +53,8 @@ public class NodeApiVersionsTest {
         for (ApiKeys apiKey : ApiKeys.values()) {
             if (apiKey == ApiKeys.CONTROLLED_SHUTDOWN_KEY) {
                 versionList.add(new ApiVersion(apiKey.id, (short) 0, (short) 0));
+            } else if (apiKey == ApiKeys.DELETE_TOPICS) {
+                versionList.add(new ApiVersion(apiKey.id, (short) 10000, (short) 10001));
             } else {
                 versionList.add(new ApiVersion(apiKey.id,
                         ProtoUtils.oldestVersion(apiKey.id), ProtoUtils.latestVersion(apiKey.id)));
@@ -62,7 +66,9 @@ public class NodeApiVersionsTest {
         for (ApiKeys apiKey : ApiKeys.values()) {
             bld.append(prefix);
             if (apiKey == ApiKeys.CONTROLLED_SHUTDOWN_KEY) {
-                bld.append("ControlledShutdown(7): 0 [usable: NONE]");
+                bld.append("ControlledShutdown(7): 0 [unusable: node too old]");
+            } else if (apiKey == ApiKeys.DELETE_TOPICS) {
+                bld.append("DeleteTopics(20): 10000 to 10001 [unusable: node too new]");
             } else {
                 bld.append(apiKey.name).append("(").
                         append(apiKey.id).append("): ");
@@ -97,4 +103,25 @@ public class NodeApiVersionsTest {
         }
         assertEquals(2, versions.usableVersion(ApiKeys.FETCH));
     }
+
+    @Test(expected = UnsupportedVersionException.class)
+    public void testUsableVersionCalculationNoKnownVersions() {
+        List<ApiVersion> versionList = new ArrayList<>();
+        NodeApiVersions versions =  new NodeApiVersions(versionList);
+        versions.usableVersion(ApiKeys.FETCH);
+    }
+
+    @Test
+    public void testUsableVersionLatestVersions() {
+        List<ApiVersion> versionList = new LinkedList<>();
+        for (ApiVersion apiVersion: ApiVersionsResponse.API_VERSIONS_RESPONSE.apiVersions()) {
+            versionList.add(apiVersion);
+        }
+        // Add an API key that we don't know about.
+        versionList.add(new ApiVersion((short) 100, (short) 0, (short) 1));
+        NodeApiVersions versions =  new NodeApiVersions(versionList);
+        for (ApiKeys apiKey: ApiKeys.values()) {
+            assertEquals(ProtoUtils.latestVersion(apiKey.id), versions.usableVersion(apiKey));
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6829d961/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 7b71f8f..69f6276 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -15,9 +15,9 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.ObsoleteBrokerException;
 import org.apache.kafka.common.errors.NotCoordinatorForGroupException;
 import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -269,7 +269,7 @@ public class RequestResponseTest {
         assertEquals("", deserialized.clientId()); // null is defaulted to ""
     }
 
-    @Test(expected = ObsoleteBrokerException.class)
+    @Test(expected = UnsupportedVersionException.class)
     public void testCreateTopicRequestV0FailsIfValidateOnly() {
         createCreateTopicRequest(0, true);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6829d961/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
index d7eb717..6ed97bb 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java
@@ -35,8 +35,8 @@ import org.apache.kafka.common.ClusterResource;
 import org.apache.kafka.common.ClusterResourceListener;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.ObsoleteBrokerException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.Time;
@@ -362,8 +362,8 @@ public class ClientCompatibilityTest {
         try {
             invoker.run();
             log.info("Successfully used feature {}", featureName);
-        } catch (ObsoleteBrokerException e) {
-            log.info("Got ObsoleteBrokerException when attempting to use feature {}", featureName);
+        } catch (UnsupportedVersionException e) {
+            log.info("Got UnsupportedVersionException when attempting to use feature {}", featureName);
             if (supported) {
                 throw new RuntimeException("Expected " + featureName + " to be supported, but it wasn't.", e);
             }

Reply via email to