KAFKA-2411; remove usage of blocking channel Author: Ismael Juma <[email protected]>
Reviewers: Jun Rao <[email protected]>, Gwen Shapira <[email protected]> Closes #151 from ijuma/kafka-2411-remove-usage-of-blocking-channel Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d02ca36c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d02ca36c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d02ca36c Branch: refs/heads/trunk Commit: d02ca36ca1cccdb6962191b97f54ce96b9d75abc Parents: d0adf6a Author: Ismael Juma <[email protected]> Authored: Wed Sep 2 11:55:08 2015 -0700 Committer: Jun Rao <[email protected]> Committed: Wed Sep 2 11:55:08 2015 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/clients/ClientUtils.java | 23 +- .../kafka/clients/ClusterConnectionStates.java | 13 +- .../org/apache/kafka/clients/KafkaClient.java | 18 +- .../kafka/clients/ManualMetadataUpdater.java | 76 +++++ .../apache/kafka/clients/MetadataUpdater.java | 72 ++++ .../org/apache/kafka/clients/NetworkClient.java | 318 ++++++++++++------ .../errors/BrokerNotAvailableException.java | 32 ++ .../common/errors/ControllerMovedException.java | 32 ++ .../kafka/common/network/ChannelBuilders.java | 52 +++ .../kafka/common/network/NetworkReceive.java | 10 - .../apache/kafka/common/network/Selectable.java | 20 +- .../apache/kafka/common/network/Selector.java | 8 +- .../apache/kafka/common/protocol/ApiKeys.java | 4 +- .../apache/kafka/common/protocol/Errors.java | 6 +- .../kafka/common/protocol/ProtoUtils.java | 6 + .../apache/kafka/common/protocol/Protocol.java | 111 ++++++- .../kafka/common/requests/AbstractRequest.java | 6 + .../requests/ControlledShutdownRequest.java | 69 ++++ .../requests/ControlledShutdownResponse.java | 91 ++++++ .../common/requests/LeaderAndIsrRequest.java | 212 ++++++++++++ .../common/requests/LeaderAndIsrResponse.java | 105 ++++++ .../common/requests/UpdateMetadataRequest.java | 291 +++++++++++++++++ .../common/requests/UpdateMetadataResponse.java | 59 ++++ .../org/apache/kafka/clients/MockClient.java | 10 + .../apache/kafka/clients/NetworkClientTest.java | 23 +- .../common/requests/RequestResponseTest.java | 119 ++++++- .../org/apache/kafka/test/MockSelector.java | 4 + .../kafka/api/ControlledShutdownRequest.scala | 19 +- .../controller/ControllerChannelManager.scala | 327 ++++++++++++------- .../kafka/controller/KafkaController.scala | 46 ++- .../controller/PartitionStateMachine.scala | 4 +- .../kafka/controller/ReplicaStateMachine.scala | 2 +- .../kafka/controller/TopicDeletionManager.scala | 20 +- .../scala/kafka/network/BlockingChannel.scala | 10 +- .../main/scala/kafka/network/SocketServer.scala | 13 +- .../main/scala/kafka/server/KafkaServer.scala | 184 +++++++++-- .../kafka/utils/NetworkClientBlockingOps.scala | 142 ++++++++ .../controller/ControllerFailoverTest.scala | 18 +- .../unit/kafka/server/LeaderElectionTest.scala | 38 ++- 39 files changed, 2229 insertions(+), 384 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java index ba3bcbe..e7514f8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java @@ -19,10 +19,9 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; +import org.apache.kafka.common.network.ChannelBuilders; import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kafka.common.network.ChannelBuilder; -import org.apache.kafka.common.network.SSLChannelBuilder; -import org.apache.kafka.common.network.PlaintextChannelBuilder; import org.apache.kafka.common.security.ssl.SSLFactory; import org.apache.kafka.common.config.ConfigException; import org.slf4j.Logger; @@ -71,25 +70,13 @@ public class ClientUtils { /** * @param configs client/server configs - * returns ChannelBuilder configured channelBuilder based on the configs. + * @return configured ChannelBuilder based on the configs. */ public static ChannelBuilder createChannelBuilder(Map<String, ?> configs) { SecurityProtocol securityProtocol = SecurityProtocol.valueOf((String) configs.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)); - ChannelBuilder channelBuilder = null; - - switch (securityProtocol) { - case SSL: - channelBuilder = new SSLChannelBuilder(SSLFactory.Mode.CLIENT); - break; - case PLAINTEXT: - channelBuilder = new PlaintextChannelBuilder(); - break; - default: - throw new ConfigException("Invalid SecurityProtocol " + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG); - } - - channelBuilder.configure(configs); - return channelBuilder; + if (securityProtocol != SecurityProtocol.SSL && securityProtocol != SecurityProtocol.PLAINTEXT) + throw new ConfigException("Invalid SecurityProtocol " + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG); + return ChannelBuilders.create(securityProtocol, SSLFactory.Mode.CLIENT, configs); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java index 9ebda5e..6c58211 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java @@ -120,6 +120,17 @@ final class ClusterConnectionStates { NodeConnectionState nodeState = nodeState(id); nodeState.state = ConnectionState.DISCONNECTED; } + + /** + * Remove the given node from the tracked connection states. The main difference between this and `disconnected` + * is the impact on `connectionDelay`: it will be 0 after this call whereas `reconnectBackoffMs` will be taken + * into account after `disconnected` is called. + * + * @param id The connection to remove + */ + public void remove(String id) { + nodeState.remove(id); + } /** * Get the state of a given connection @@ -158,4 +169,4 @@ final class ClusterConnectionStates { return "NodeState(" + state + ", " + lastConnectAttemptMs + ")"; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java index 7ab2503..f46c0d9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java @@ -81,6 +81,13 @@ public interface KafkaClient extends Closeable { public List<ClientResponse> poll(long timeout, long now); /** + * Closes the connection to a particular node (if there is one). + * + * @param nodeId The id of the node + */ + public void close(String nodeId); + + /** * Complete all in-flight requests for a given connection * * @param id The connection to complete requests for @@ -127,8 +134,17 @@ public interface KafkaClient extends Closeable { public RequestHeader nextRequestHeader(ApiKeys key); /** + * Generate a request header for the given API key + * + * @param key The api key + * @param version The api version + * @return A request header with the appropriate client id and correlation id + */ + public RequestHeader nextRequestHeader(ApiKeys key, short version); + + /** * Wake up the client if it is currently blocked waiting for I/O */ public void wakeup(); -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java new file mode 100644 index 0000000..efbe664 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.clients; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.protocol.types.Struct; + +import java.util.ArrayList; +import java.util.List; + +/** + * A simple implementation of `MetadataUpdater` that returns the cluster nodes set via the constructor or via + * `setNodes`. + * + * This is useful in cases where automatic metadata updates are not required. An example is controller/broker + * communication. + * + * This class is not thread-safe! + */ +public class ManualMetadataUpdater implements MetadataUpdater { + + private List<Node> nodes; + + public ManualMetadataUpdater() { + this(new ArrayList<Node>(0)); + } + + public ManualMetadataUpdater(List<Node> nodes) { + this.nodes = nodes; + } + + public void setNodes(List<Node> nodes) { + this.nodes = nodes; + } + + @Override + public List<Node> fetchNodes() { + return new ArrayList<>(nodes); + } + + @Override + public boolean isUpdateDue(long now) { + return false; + } + + @Override + public long maybeUpdate(long now) { + return Long.MAX_VALUE; + } + + @Override + public boolean maybeHandleDisconnection(ClientRequest request) { + return false; + } + + @Override + public boolean maybeHandleCompletedReceive(ClientRequest request, long now, Struct body) { + return false; + } + + @Override + public void requestUpdate() { + // Do nothing + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java new file mode 100644 index 0000000..4669a68 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.clients; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.protocol.types.Struct; + +import java.util.List; + +/** + * The interface used by `NetworkClient` to request cluster metadata info to be updated and to retrieve the cluster nodes + * from such metadata. This is an internal class. + * <p> + * This class is not thread-safe! + */ +interface MetadataUpdater { + + /** + * Gets the current cluster info without blocking. + */ + List<Node> fetchNodes(); + + /** + * Returns true if an update to the cluster metadata info is due. + */ + boolean isUpdateDue(long now); + + /** + * Starts a cluster metadata update if needed and possible. Returns the time until the metadata update (which would + * be 0 if an update has been started as a result of this call). + * + * If the implementation relies on `NetworkClient` to send requests, the completed receive will be passed to + * `maybeHandleCompletedReceive`. + * + * The semantics of `needed` and `possible` are implementation-dependent and may take into account a number of + * factors like node availability, how long since the last metadata update, etc. + */ + long maybeUpdate(long now); + + /** + * If `request` is a metadata request, handles it and return `true`. Otherwise, returns `false`. + * + * This provides a mechanism for the `MetadataUpdater` implementation to use the NetworkClient instance for its own + * requests with special handling for disconnections of such requests. + */ + boolean maybeHandleDisconnection(ClientRequest request); + + /** + * If `request` is a metadata request, handles it and returns `true`. Otherwise, returns `false`. + * + * This provides a mechanism for the `MetadataUpdater` implementation to use the NetworkClient instance for its own + * requests with special handling for completed receives of such requests. + */ + boolean maybeHandleCompletedReceive(ClientRequest request, long now, Struct body); + + /** + * Schedules an update of the current cluster metadata info. A subsequent call to `maybeUpdate` would trigger the + * start of the update if possible (see `maybeUpdate` for more information). + */ + void requestUpdate(); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index b31f7f1..0a6f952 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -48,9 +48,8 @@ public class NetworkClient implements KafkaClient { /* the selector used to perform network i/o */ private final Selectable selector; - - /* the current cluster metadata */ - private final Metadata metadata; + + private final MetadataUpdater metadataUpdater; /* the state of each node's connection */ private final ClusterConnectionStates connectionStates; @@ -73,12 +72,6 @@ public class NetworkClient implements KafkaClient { /* the current correlation id to use when sending requests to servers */ private int correlation; - /* true iff there is a metadata request that has been sent and for which we have not yet received a response */ - private boolean metadataFetchInProgress; - - /* the last timestamp when no broker node is available to connect */ - private long lastNoNodeAvailableMs; - public NetworkClient(Selectable selector, Metadata metadata, String clientId, @@ -86,8 +79,43 @@ public class NetworkClient implements KafkaClient { long reconnectBackoffMs, int socketSendBuffer, int socketReceiveBuffer) { + this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection, + reconnectBackoffMs, socketSendBuffer, socketReceiveBuffer); + } + + public NetworkClient(Selectable selector, + MetadataUpdater metadataUpdater, + String clientId, + int maxInFlightRequestsPerConnection, + long reconnectBackoffMs, + int socketSendBuffer, + int socketReceiveBuffer) { + this(metadataUpdater, null, selector, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs, + socketSendBuffer, socketReceiveBuffer); + } + + private NetworkClient(MetadataUpdater metadataUpdater, + Metadata metadata, + Selectable selector, + String clientId, + int maxInFlightRequestsPerConnection, + long reconnectBackoffMs, + int socketSendBuffer, + int socketReceiveBuffer) { + + /* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not + * possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the + * super constructor is invoked. + */ + if (metadataUpdater == null) { + if (metadata == null) + throw new IllegalArgumentException("`metadata` must not be null"); + this.metadataUpdater = new DefaultMetadataUpdater(metadata); + } else { + this.metadataUpdater = metadataUpdater; + } + this.selector = selector; - this.metadata = metadata; this.clientId = clientId; this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection); this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs); @@ -95,8 +123,6 @@ public class NetworkClient implements KafkaClient { this.socketReceiveBuffer = socketReceiveBuffer; this.correlation = 0; this.nodeIndexOffset = new Random().nextInt(Integer.MAX_VALUE); - this.metadataFetchInProgress = false; - this.lastNoNodeAvailableMs = 0; } /** @@ -119,6 +145,17 @@ public class NetworkClient implements KafkaClient { } /** + * Closes the connection to a particular node (if there is one). + * + * @param nodeId The id of the node + */ + @Override + public void close(String nodeId) { + selector.close(nodeId); + connectionStates.remove(nodeId); + } + + /** * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled * connections. @@ -154,14 +191,9 @@ public class NetworkClient implements KafkaClient { */ @Override public boolean isReady(Node node, long now) { - String nodeId = node.idString(); - if (!this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0) - // if we need to update our metadata now declare all requests unready to make metadata requests first - // priority - return false; - else - // otherwise we are ready if we are connected and can send more requests - return canSendRequest(nodeId); + // if we need to update our metadata now declare all requests unready to make metadata requests first + // priority + return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString()); } /** @@ -193,7 +225,10 @@ public class NetworkClient implements KafkaClient { String nodeId = request.request().destination(); if (!canSendRequest(nodeId)) throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready."); + doSend(request); + } + private void doSend(ClientRequest request) { this.inFlightRequests.add(request); selector.send(request.request()); } @@ -207,16 +242,7 @@ public class NetworkClient implements KafkaClient { */ @Override public List<ClientResponse> poll(long timeout, long now) { - // should we update our metadata? - long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now); - long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0); - long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0; - // if there is no node available to connect, back off refreshing metadata - long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt), - waitForMetadataFetch); - if (metadataTimeout == 0) - maybeUpdateMetadata(now); - // do the I/O + long metadataTimeout = metadataUpdater.maybeUpdate(now); try { this.selector.poll(Math.min(timeout, metadataTimeout)); } catch (IOException e) { @@ -224,7 +250,7 @@ public class NetworkClient implements KafkaClient { } // process completed actions - List<ClientResponse> responses = new ArrayList<ClientResponse>(); + List<ClientResponse> responses = new ArrayList<>(); handleCompletedSends(responses, now); handleCompletedReceives(responses, now); handleDisconnections(responses, now); @@ -304,6 +330,18 @@ public class NetworkClient implements KafkaClient { } /** + * Generate a request header for the given API key and version + * + * @param key The api key + * @param version The api version + * @return A request header with the appropriate client id and correlation id + */ + @Override + public RequestHeader nextRequestHeader(ApiKeys key, short version) { + return new RequestHeader(key.id, version, clientId, correlation++); + } + + /** * Interrupt the client if it is blocked waiting on I/O. */ @Override @@ -327,8 +365,9 @@ public class NetworkClient implements KafkaClient { * * @return The node with the fewest in-flight requests. */ + @Override public Node leastLoadedNode(long now) { - List<Node> nodes = this.metadata.fetch().nodes(); + List<Node> nodes = this.metadataUpdater.fetchNodes(); int inflight = Integer.MAX_VALUE; Node found = null; for (int i = 0; i < nodes.size(); i++) { @@ -378,30 +417,8 @@ public class NetworkClient implements KafkaClient { short apiKey = req.request().header().apiKey(); Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload()); correlate(req.request().header(), header); - if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) { - handleMetadataResponse(req.request().header(), body, now); - } else { - // need to add body/header to response here + if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body)) responses.add(new ClientResponse(req, now, false, body)); - } - } - } - - private void handleMetadataResponse(RequestHeader header, Struct body, long now) { - this.metadataFetchInProgress = false; - MetadataResponse response = new MetadataResponse(body); - Cluster cluster = response.cluster(); - // check if any topics metadata failed to get updated - if (response.errors().size() > 0) { - log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), response.errors()); - } - // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being - // created which means we will get errors and no nodes until it exists - if (cluster.nodes().size() > 0) { - this.metadata.update(cluster, now); - } else { - log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId()); - this.metadata.failedUpdate(now); } } @@ -417,16 +434,13 @@ public class NetworkClient implements KafkaClient { log.debug("Node {} disconnected.", node); for (ClientRequest request : this.inFlightRequests.clearAll(node)) { log.trace("Cancelled request {} due to node {} being disconnected", request, node); - ApiKeys requestKey = ApiKeys.forId(request.request().header().apiKey()); - if (requestKey == ApiKeys.METADATA) - metadataFetchInProgress = false; - else + if (!metadataUpdater.maybeHandleDisconnection(request)) responses.add(new ClientResponse(request, now, true, null)); } } // we got a disconnect so we should probably refresh our metadata and see if that broker is dead if (this.selector.disconnected().size() > 0) - this.metadata.requestUpdate(); + metadataUpdater.requestUpdate(); } /** @@ -449,52 +463,6 @@ public class NetworkClient implements KafkaClient { } /** - * Create a metadata request for the given topics - */ - private ClientRequest metadataRequest(long now, String node, Set<String> topics) { - MetadataRequest metadata = new MetadataRequest(new ArrayList<String>(topics)); - RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct()); - return new ClientRequest(now, true, send, null, true); - } - - /** - * Add a metadata request to the list of sends if we can make one - */ - private void maybeUpdateMetadata(long now) { - // Beware that the behavior of this method and the computation of timeouts for poll() are - // highly dependent on the behavior of leastLoadedNode. - Node node = this.leastLoadedNode(now); - if (node == null) { - log.debug("Give up sending metadata request since no node is available"); - // mark the timestamp for no node available to connect - this.lastNoNodeAvailableMs = now; - return; - } - String nodeConnectionId = node.idString(); - - if (canSendRequest(nodeConnectionId)) { - Set<String> topics = metadata.topics(); - this.metadataFetchInProgress = true; - ClientRequest metadataRequest = metadataRequest(now, nodeConnectionId, topics); - log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); - this.selector.send(metadataRequest.request()); - this.inFlightRequests.add(metadataRequest); - } else if (connectionStates.canConnect(nodeConnectionId, now)) { - // we don't have a connection to this node right now, make one - log.debug("Initialize connection to node {} for sending metadata request", node.id()); - initiateConnect(node, now); - // If initiateConnect failed immediately, this node will be put into blackout and we - // should allow immediately retrying in case there is another candidate node. If it - // is still connecting, the worst case is that we end up setting a longer timeout - // on the next round and then wait for the response. - } else { // connected, but can't send more OR connecting - // In either case, we just need to wait for a network event to let us know the selected - // connection might be usable again. - this.lastNoNodeAvailableMs = now; - } - } - - /** * Initiate a connection to the given node */ private void initiateConnect(Node node, long now) { @@ -510,9 +478,145 @@ public class NetworkClient implements KafkaClient { /* attempt failed, we'll try again after the backoff */ connectionStates.disconnected(nodeConnectionId); /* maybe the problem is our metadata, update it */ - metadata.requestUpdate(); + metadataUpdater.requestUpdate(); log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e); } } + class DefaultMetadataUpdater implements MetadataUpdater { + + /* the current cluster metadata */ + private final Metadata metadata; + + /* true iff there is a metadata request that has been sent and for which we have not yet received a response */ + private boolean metadataFetchInProgress; + + /* the last timestamp when no broker node is available to connect */ + private long lastNoNodeAvailableMs; + + DefaultMetadataUpdater(Metadata metadata) { + this.metadata = metadata; + this.metadataFetchInProgress = false; + this.lastNoNodeAvailableMs = 0; + } + + @Override + public List<Node> fetchNodes() { + return metadata.fetch().nodes(); + } + + @Override + public boolean isUpdateDue(long now) { + return !this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0; + } + + @Override + public long maybeUpdate(long now) { + // should we update our metadata? + long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now); + long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0); + long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0; + // if there is no node available to connect, back off refreshing metadata + long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt), + waitForMetadataFetch); + + if (metadataTimeout == 0) { + // Beware that the behavior of this method and the computation of timeouts for poll() are + // highly dependent on the behavior of leastLoadedNode. + Node node = leastLoadedNode(now); + maybeUpdate(now, node); + } + + return metadataTimeout; + } + + @Override + public boolean maybeHandleDisconnection(ClientRequest request) { + ApiKeys requestKey = ApiKeys.forId(request.request().header().apiKey()); + + if (requestKey == ApiKeys.METADATA) { + metadataFetchInProgress = false; + return true; + } + + return false; + } + + @Override + public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) { + short apiKey = req.request().header().apiKey(); + if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) { + handleResponse(req.request().header(), body, now); + return true; + } + return false; + } + + @Override + public void requestUpdate() { + this.metadata.requestUpdate(); + } + + private void handleResponse(RequestHeader header, Struct body, long now) { + this.metadataFetchInProgress = false; + MetadataResponse response = new MetadataResponse(body); + Cluster cluster = response.cluster(); + // check if any topics metadata failed to get updated + if (response.errors().size() > 0) { + log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), response.errors()); + } + // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being + // created which means we will get errors and no nodes until it exists + if (cluster.nodes().size() > 0) { + this.metadata.update(cluster, now); + } else { + log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId()); + this.metadata.failedUpdate(now); + } + } + + /** + * Create a metadata request for the given topics + */ + private ClientRequest request(long now, String node, Set<String> topics) { + MetadataRequest metadata = new MetadataRequest(new ArrayList<>(topics)); + RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct()); + return new ClientRequest(now, true, send, null, true); + } + + /** + * Add a metadata request to the list of sends if we can make one + */ + private void maybeUpdate(long now, Node node) { + if (node == null) { + log.debug("Give up sending metadata request since no node is available"); + // mark the timestamp for no node available to connect + this.lastNoNodeAvailableMs = now; + return; + } + String nodeConnectionId = node.idString(); + + if (canSendRequest(nodeConnectionId)) { + Set<String> topics = metadata.topics(); + this.metadataFetchInProgress = true; + ClientRequest metadataRequest = request(now, nodeConnectionId, topics); + log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); + doSend(metadataRequest); + } else if (connectionStates.canConnect(nodeConnectionId, now)) { + // we don't have a connection to this node right now, make one + log.debug("Initialize connection to node {} for sending metadata request", node.id()); + initiateConnect(node, now); + // If initiateConnect failed immediately, this node will be put into blackout and we + // should allow immediately retrying in case there is another candidate node. If it + // is still connecting, the worst case is that we end up setting a longer timeout + // on the next round and then wait for the response. + } else { // connected, but can't send more OR connecting + // In either case, we just need to wait for a network event to let us know the selected + // connection might be usable again. + this.lastNoNodeAvailableMs = now; + } + } + + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/errors/BrokerNotAvailableException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/BrokerNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/BrokerNotAvailableException.java new file mode 100644 index 0000000..f78f061 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/BrokerNotAvailableException.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.errors; + +public class BrokerNotAvailableException extends ApiException { + + private static final long serialVersionUID = 1L; + + public BrokerNotAvailableException(String message) { + super(message); + } + + public BrokerNotAvailableException(String message, Throwable cause) { + super(message, cause); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/errors/ControllerMovedException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ControllerMovedException.java b/clients/src/main/java/org/apache/kafka/common/errors/ControllerMovedException.java new file mode 100644 index 0000000..8dd7487 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/ControllerMovedException.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.errors; + +public class ControllerMovedException extends ApiException { + + private static final long serialVersionUID = 1L; + + public ControllerMovedException(String message) { + super(message); + } + + public ControllerMovedException(String message, Throwable cause) { + super(message, cause); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java new file mode 100644 index 0000000..2332d3f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.common.network; + +import org.apache.kafka.common.protocol.SecurityProtocol; +import org.apache.kafka.common.security.ssl.SSLFactory; + +import java.util.Map; + +public class ChannelBuilders { + + private ChannelBuilders() { } + + /** + * @param securityProtocol the securityProtocol + * @param mode the SSL mode, it must be non-null if `securityProcol` is `SSL` and it is ignored otherwise + * @param configs client/server configs + * @return the configured `ChannelBuilder` + * @throws IllegalArgumentException if `mode` invariants described above is not maintained + */ + public static ChannelBuilder create(SecurityProtocol securityProtocol, SSLFactory.Mode mode, Map<String, ?> configs) { + ChannelBuilder channelBuilder = null; + + switch (securityProtocol) { + case SSL: + if (mode == null) + throw new IllegalArgumentException("`mode` must be non-null if `securityProtocol` is `SSL`"); + channelBuilder = new SSLChannelBuilder(mode); + break; + case PLAINTEXT: + case TRACE: + channelBuilder = new PlaintextChannelBuilder(); + break; + default: + throw new IllegalArgumentException("Unexpected securityProtocol " + securityProtocol); + } + + channelBuilder.configure(configs); + return channelBuilder; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java index 2a1568e..409775c 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java +++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java @@ -107,14 +107,4 @@ public class NetworkReceive implements Receive { return this.buffer; } - // Used only by BlockingChannel, so we may be able to get rid of this when/if we get rid of BlockingChannel - @Deprecated - public long readCompletely(ReadableByteChannel channel) throws IOException { - int totalRead = 0; - while (!complete()) { - totalRead += readFromReadableChannel(channel); - } - return totalRead; - } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/network/Selectable.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java index 39eae4a..70e74bd 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selectable.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selectable.java @@ -23,6 +23,11 @@ import java.util.List; public interface Selectable { /** + * See {@link #connect(String, InetSocketAddress, int, int) connect()} + */ + public static final int USE_DEFAULT_BUFFER_SIZE = -1; + + /** * Begin establishing a socket connection to the given address identified by the given address * @param id The id for this connection * @param address The address to connect to @@ -48,7 +53,12 @@ public interface Selectable { public void close(); /** - * Queue the given request for sending in the subsequent {@poll(long)} calls + * Close the connection identified by the given id + */ + public void close(String nodeId); + + /** + * Queue the given request for sending in the subsequent {@link #poll(long) poll()} calls * @param send The request to send */ public void send(Send send); @@ -61,23 +71,23 @@ public interface Selectable { public void poll(long timeout) throws IOException; /** - * The list of sends that completed on the last {@link #poll(long, List) poll()} call. + * The list of sends that completed on the last {@link #poll(long) poll()} call. */ public List<Send> completedSends(); /** - * The list of receives that completed on the last {@link #poll(long, List) poll()} call. + * The list of receives that completed on the last {@link #poll(long) poll()} call. */ public List<NetworkReceive> completedReceives(); /** - * The list of connections that finished disconnecting on the last {@link #poll(long, List) poll()} + * The list of connections that finished disconnecting on the last {@link #poll(long) poll()} * call. */ public List<String> disconnected(); /** - * The list of connections that completed their connection on the last {@link #poll(long, List) poll()} + * The list of connections that completed their connection on the last {@link #poll(long) poll()} * call. */ public List<String> connected(); http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/network/Selector.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index f49d54c..4aa5cbb 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -146,8 +146,10 @@ public class Selector implements Selectable { socketChannel.configureBlocking(false); Socket socket = socketChannel.socket(); socket.setKeepAlive(true); - socket.setSendBufferSize(sendBufferSize); - socket.setReceiveBufferSize(receiveBufferSize); + if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) + socket.setSendBufferSize(sendBufferSize); + if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) + socket.setReceiveBufferSize(receiveBufferSize); socket.setTcpNoDelay(true); try { socketChannel.connect(address); @@ -182,7 +184,7 @@ public class Selector implements Selectable { */ @Override public void disconnect(String id) { - KafkaChannel channel = channelForId(id); + KafkaChannel channel = this.channels.get(id); if (channel != null) channel.disconnect(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index b39e9bb..46ddddb 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -49,7 +49,7 @@ public enum ApiKeys { MAX_API_KEY = maxKey; } - /** the perminant and immutable id of an API--this can't change ever */ + /** the permanent and immutable id of an API--this can't change ever */ public final short id; /** an english description of the api--this is for debugging and can change */ @@ -63,4 +63,4 @@ public enum ApiKeys { public static ApiKeys forId(int id) { return codeToType[id]; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index e17e390..641afa1 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.kafka.common.errors.*; +import org.apache.kafka.common.errors.ControllerMovedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,11 +46,14 @@ public enum Errors { new NotLeaderForPartitionException("This server is not the leader for that topic-partition.")), REQUEST_TIMED_OUT(7, new TimeoutException("The request timed out.")), - // TODO: errorCode 8 for BrokerNotAvailable + BROKER_NOT_AVAILABLE(8, + new BrokerNotAvailableException("The broker is not available.")), REPLICA_NOT_AVAILABLE(9, new ApiException("The replica is not available for the requested topic-partition")), MESSAGE_TOO_LARGE(10, new RecordTooLargeException("The request included a message larger than the max message size the server will accept.")), + STALE_CONTROLLER_EPOCH(11, + new ControllerMovedException("The controller moved to another broker.")), OFFSET_METADATA_TOO_LARGE(12, new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")), NETWORK_EXCEPTION(13, http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java index c2cbbbd..85357ab 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java @@ -29,6 +29,8 @@ public class ProtoUtils { Schema[] versions = schemas[apiKey]; if (version < 0 || version > versions.length) throw new IllegalArgumentException("Invalid version for API key " + apiKey + ": " + version); + if (versions[version] == null) + throw new IllegalArgumentException("Unsupported version for API key " + apiKey + ": " + version); return versions[version]; } @@ -62,4 +64,8 @@ public class ProtoUtils { return (Struct) currentResponseSchema(apiKey).read(buffer); } + public static Struct parseResponse(int apiKey, int version, ByteBuffer buffer) { + return (Struct) responseSchema(apiKey, version).read(buffer); + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index a951e90..b72db4f 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -41,7 +41,7 @@ public class Protocol { public static final Schema METADATA_REQUEST_V0 = new Schema(new Field("topics", new ArrayOf(STRING), - "An array of topics to fetch metadata for. If no topics are specified fetch metadtata for all topics.")); + "An array of topics to fetch metadata for. If no topics are specified fetch metadata for all topics.")); public static final Schema BROKER = new Schema(new Field("node_id", INT32, "The broker id."), new Field("host", STRING, "The hostname of the broker."), @@ -396,6 +396,25 @@ public class Protocol { public static final Schema[] CONSUMER_METADATA_REQUEST = new Schema[] {CONSUMER_METADATA_REQUEST_V0}; public static final Schema[] CONSUMER_METADATA_RESPONSE = new Schema[] {CONSUMER_METADATA_RESPONSE_V0}; + /* Controlled shutdown api */ + public static final Schema CONTROLLED_SHUTDOWN_REQUEST_V1 = new Schema(new Field("broker_id", + INT32, + "The id of the broker for which controlled shutdown has been requested.")); + + public static final Schema CONTROLLED_SHUTDOWN_PARTITION_V1 = new Schema(new Field("topic", STRING), + new Field("partition", + INT32, + "Topic partition id.")); + + public static final Schema CONTROLLED_SHUTDOWN_RESPONSE_V1 = new Schema(new Field("error_code", INT16), + new Field("partitions_remaining", + new ArrayOf(CONTROLLED_SHUTDOWN_PARTITION_V1), + "The partitions that the broker still leads.")); + + /* V0 is not supported as it would require changes to the request header not to include `clientId` */ + public static final Schema[] CONTROLLED_SHUTDOWN_REQUEST = new Schema[] {null, CONTROLLED_SHUTDOWN_REQUEST_V1}; + public static final Schema[] CONTROLLED_SHUTDOWN_RESPONSE = new Schema[] {null, CONTROLLED_SHUTDOWN_RESPONSE_V1}; + /* Join group api */ public static final Schema JOIN_GROUP_REQUEST_V0 = new Schema(new Field("group_id", STRING, @@ -442,6 +461,39 @@ public class Protocol { public static final Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0}; public static final Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0}; + /* Leader and ISR api */ + public static final Schema LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0 = + new Schema(new Field("topic", STRING, "Topic name."), + new Field("partition", INT32, "Topic partition id."), + new Field("controller_epoch", INT32, "The controller epoch."), + new Field("leader", INT32, "The broker id for the leader."), + new Field("leader_epoch", INT32, "The leader epoch."), + new Field("isr", new ArrayOf(INT32), "The in sync replica ids."), + new Field("zk_version", INT32, "The ZK version."), + new Field("replicas", new ArrayOf(INT32), "The replica ids.")); + + public static final Schema LEADER_AND_ISR_REQUEST_LIVE_LEADER_V0 = + new Schema(new Field("id", INT32, "The broker id."), + new Field("host", STRING, "The hostname of the broker."), + new Field("port", INT32, "The port on which the broker accepts requests.")); + + public static final Schema LEADER_AND_ISR_REQUEST_V0 = new Schema(new Field("controller_id", INT32, "The controller id."), + new Field("controller_epoch", INT32, "The controller epoch."), + new Field("partition_states", + new ArrayOf(LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0)), + new Field("live_leaders", new ArrayOf(LEADER_AND_ISR_REQUEST_LIVE_LEADER_V0))); + + public static final Schema LEADER_AND_ISR_RESPONSE_PARTITION_V0 = new Schema(new Field("topic", STRING, "Topic name."), + new Field("partition", INT32, "Topic partition id."), + new Field("error_code", INT16, "Error code.")); + + public static final Schema LEADER_AND_ISR_RESPONSE_V0 = new Schema(new Field("error_code", INT16, "Error code."), + new Field("partitions", + new ArrayOf(LEADER_AND_ISR_RESPONSE_PARTITION_V0))); + + public static final Schema[] LEADER_AND_ISR_REQUEST = new Schema[] {LEADER_AND_ISR_REQUEST_V0}; + public static final Schema[] LEADER_AND_ISR_RESPONSE = new Schema[] {LEADER_AND_ISR_RESPONSE_V0}; + /* Replica api */ public static final Schema STOP_REPLICA_REQUEST_PARTITION_V0 = new Schema(new Field("topic", STRING, "Topic name."), new Field("partition", INT32, "Topic partition id.")); @@ -465,7 +517,50 @@ public class Protocol { public static final Schema[] STOP_REPLICA_REQUEST = new Schema[] {STOP_REPLICA_REQUEST_V0}; public static final Schema[] STOP_REPLICA_RESPONSE = new Schema[] {STOP_REPLICA_RESPONSE_V0}; - /* an array of all requests and responses with all schema versions */ + /* Update metadata api */ + + public static final Schema UPDATE_METADATA_REQUEST_PARTITION_STATE_V0 = LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0; + + public static final Schema UPDATE_METADATA_REQUEST_BROKER_V0 = + new Schema(new Field("id", INT32, "The broker id."), + new Field("host", STRING, "The hostname of the broker."), + new Field("port", INT32, "The port on which the broker accepts requests.")); + + public static final Schema UPDATE_METADATA_REQUEST_V0 = new Schema(new Field("controller_id", INT32, "The controller id."), + new Field("controller_epoch", INT32, "The controller epoch."), + new Field("partition_states", + new ArrayOf(UPDATE_METADATA_REQUEST_PARTITION_STATE_V0)), + new Field("live_brokers", + new ArrayOf(UPDATE_METADATA_REQUEST_BROKER_V0))); + + public static final Schema UPDATE_METADATA_RESPONSE_V0 = new Schema(new Field("error_code", INT16, "Error code.")); + + public static final Schema UPDATE_METADATA_REQUEST_PARTITION_STATE_V1 = UPDATE_METADATA_REQUEST_PARTITION_STATE_V0; + + public static final Schema UPDATE_METADATA_REQUEST_END_POINT_V1 = + // for some reason, V1 sends `port` before `host` while V0 sends `host` before `port + new Schema(new Field("port", INT32, "The port on which the broker accepts requests."), + new Field("host", STRING, "The hostname of the broker."), + new Field("security_protocol_type", INT16, "The security protocol type.")); + + public static final Schema UPDATE_METADATA_REQUEST_BROKER_V1 = + new Schema(new Field("id", INT32, "The broker id."), + new Field("end_points", new ArrayOf(UPDATE_METADATA_REQUEST_END_POINT_V1))); + + public static final Schema UPDATE_METADATA_REQUEST_V1 = new Schema(new Field("controller_id", INT32, "The controller id."), + new Field("controller_epoch", INT32, "The controller epoch."), + new Field("partition_states", + new ArrayOf(UPDATE_METADATA_REQUEST_PARTITION_STATE_V1)), + new Field("live_brokers", + new ArrayOf(UPDATE_METADATA_REQUEST_BROKER_V1))); + + public static final Schema UPDATE_METADATA_RESPONSE_V1 = UPDATE_METADATA_RESPONSE_V0; + + public static final Schema[] UPDATE_METADATA_REQUEST = new Schema[] {UPDATE_METADATA_REQUEST_V0, UPDATE_METADATA_REQUEST_V1}; + public static final Schema[] UPDATE_METADATA_RESPONSE = new Schema[] {UPDATE_METADATA_RESPONSE_V0, UPDATE_METADATA_RESPONSE_V1}; + + /* an array of all requests and responses with all schema versions; a null value in the inner array means that the + * particular version is not supported */ public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][]; public static final Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][]; @@ -477,10 +572,10 @@ public class Protocol { REQUESTS[ApiKeys.FETCH.id] = FETCH_REQUEST; REQUESTS[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_REQUEST; REQUESTS[ApiKeys.METADATA.id] = METADATA_REQUEST; - REQUESTS[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {}; + REQUESTS[ApiKeys.LEADER_AND_ISR.id] = LEADER_AND_ISR_REQUEST; REQUESTS[ApiKeys.STOP_REPLICA.id] = STOP_REPLICA_REQUEST; - REQUESTS[ApiKeys.UPDATE_METADATA_KEY.id] = new Schema[] {}; - REQUESTS[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = new Schema[] {}; + REQUESTS[ApiKeys.UPDATE_METADATA_KEY.id] = UPDATE_METADATA_REQUEST; + REQUESTS[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = CONTROLLED_SHUTDOWN_REQUEST; REQUESTS[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_REQUEST; REQUESTS[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_REQUEST; REQUESTS[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_REQUEST; @@ -492,10 +587,10 @@ public class Protocol { RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE; RESPONSES[ApiKeys.LIST_OFFSETS.id] = LIST_OFFSET_RESPONSE; RESPONSES[ApiKeys.METADATA.id] = METADATA_RESPONSE; - RESPONSES[ApiKeys.LEADER_AND_ISR.id] = new Schema[] {}; + RESPONSES[ApiKeys.LEADER_AND_ISR.id] = LEADER_AND_ISR_RESPONSE; RESPONSES[ApiKeys.STOP_REPLICA.id] = STOP_REPLICA_RESPONSE; - RESPONSES[ApiKeys.UPDATE_METADATA_KEY.id] = new Schema[] {}; - RESPONSES[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = new Schema[] {}; + RESPONSES[ApiKeys.UPDATE_METADATA_KEY.id] = UPDATE_METADATA_RESPONSE; + RESPONSES[ApiKeys.CONTROLLED_SHUTDOWN_KEY.id] = CONTROLLED_SHUTDOWN_RESPONSE; RESPONSES[ApiKeys.OFFSET_COMMIT.id] = OFFSET_COMMIT_RESPONSE; RESPONSES[ApiKeys.OFFSET_FETCH.id] = OFFSET_FETCH_RESPONSE; RESPONSES[ApiKeys.CONSUMER_METADATA.id] = CONSUMER_METADATA_RESPONSE; http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index e316957..a696e80 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -57,6 +57,12 @@ public abstract class AbstractRequest extends AbstractRequestResponse { return HeartbeatRequest.parse(buffer, versionId); case STOP_REPLICA: return StopReplicaRequest.parse(buffer, versionId); + case CONTROLLED_SHUTDOWN_KEY: + return ControlledShutdownRequest.parse(buffer, versionId); + case UPDATE_METADATA_KEY: + return UpdateMetadataRequest.parse(buffer, versionId); + case LEADER_AND_ISR: + return LeaderAndIsrRequest.parse(buffer, versionId); default: return null; } http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java new file mode 100644 index 0000000..57f51d8 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; +import java.util.Collections; + +public class ControlledShutdownRequest extends AbstractRequest { + + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id); + + private static final String BROKER_ID_KEY_NAME = "broker_id"; + + private int brokerId; + + public ControlledShutdownRequest(int brokerId) { + super(new Struct(CURRENT_SCHEMA)); + struct.set(BROKER_ID_KEY_NAME, brokerId); + this.brokerId = brokerId; + } + + public ControlledShutdownRequest(Struct struct) { + super(struct); + brokerId = struct.getInt(BROKER_ID_KEY_NAME); + } + + @Override + public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) { + switch (versionId) { + case 0: + throw new IllegalArgumentException(String.format("Version 0 is not supported. It is only supported by " + + "the Scala request class for controlled shutdown")); + case 1: + return new ControlledShutdownResponse(Errors.forException(e).code(), Collections.<TopicPartition>emptySet()); + default: + throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", + versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id))); + } + } + + public int brokerId() { + return brokerId; + } + + public static ControlledShutdownRequest parse(ByteBuffer buffer, int versionId) { + return new ControlledShutdownRequest(ProtoUtils.parseRequest(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id, versionId, buffer)); + } + + public static ControlledShutdownRequest parse(ByteBuffer buffer) { + return new ControlledShutdownRequest((Struct) CURRENT_SCHEMA.read(buffer)); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java new file mode 100644 index 0000000..15d600d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; +import java.util.*; + +public class ControlledShutdownResponse extends AbstractRequestResponse { + + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id); + + private static final String ERROR_CODE_KEY_NAME = "error_code"; + private static final String PARTITIONS_REMAINING_KEY_NAME = "partitions_remaining"; + + private static final String TOPIC_KEY_NAME = "topic"; + private static final String PARTITION_KEY_NAME = "partition"; + + /** + * Possible error codes: + * + * UNKNOWN(-1) (this is because IllegalStateException may be thrown in `KafkaController.shutdownBroker`, it would be good to improve this) + * BROKER_NOT_AVAILABLE(8) + * STALE_CONTROLLER_EPOCH(11) + */ + private final short errorCode; + + private final Set<TopicPartition> partitionsRemaining; + + public ControlledShutdownResponse(short errorCode, Set<TopicPartition> partitionsRemaining) { + super(new Struct(CURRENT_SCHEMA)); + + struct.set(ERROR_CODE_KEY_NAME, errorCode); + + List<Struct> partitionsRemainingList = new ArrayList<>(partitionsRemaining.size()); + for (TopicPartition topicPartition : partitionsRemaining) { + Struct topicPartitionStruct = struct.instance(PARTITIONS_REMAINING_KEY_NAME); + topicPartitionStruct.set(TOPIC_KEY_NAME, topicPartition.topic()); + topicPartitionStruct.set(PARTITION_KEY_NAME, topicPartition.partition()); + } + struct.set(PARTITIONS_REMAINING_KEY_NAME, partitionsRemainingList.toArray()); + + this.errorCode = errorCode; + this.partitionsRemaining = partitionsRemaining; + } + + public ControlledShutdownResponse(Struct struct) { + super(struct); + errorCode = struct.getShort(ERROR_CODE_KEY_NAME); + Set<TopicPartition> partitions = new HashSet<>(); + for (Object topicPartitionObj : struct.getArray(PARTITIONS_REMAINING_KEY_NAME)) { + Struct topicPartition = (Struct) topicPartitionObj; + String topic = topicPartition.getString(TOPIC_KEY_NAME); + int partition = topicPartition.getInt(PARTITION_KEY_NAME); + partitions.add(new TopicPartition(topic, partition)); + } + partitionsRemaining = partitions; + } + + public short errorCode() { + return errorCode; + } + + public Set<TopicPartition> partitionsRemaining() { + return partitionsRemaining; + } + + public static ControlledShutdownResponse parse(ByteBuffer buffer) { + return new ControlledShutdownResponse((Struct) CURRENT_SCHEMA.read(buffer)); + } + + public static ControlledShutdownResponse parse(ByteBuffer buffer, int version) { + return new ControlledShutdownResponse(ProtoUtils.parseResponse(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id, version, buffer)); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java new file mode 100644 index 0000000..6b16496 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer;; +import java.util.*; + +public class LeaderAndIsrRequest extends AbstractRequest { + + public static class PartitionState { + public final int controllerEpoch; + public final int leader; + public final int leaderEpoch; + public final List<Integer> isr; + public final int zkVersion; + public final Set<Integer> replicas; + + public PartitionState(int controllerEpoch, int leader, int leaderEpoch, List<Integer> isr, int zkVersion, Set<Integer> replicas) { + this.controllerEpoch = controllerEpoch; + this.leader = leader; + this.leaderEpoch = leaderEpoch; + this.isr = isr; + this.zkVersion = zkVersion; + this.replicas = replicas; + } + + } + + public static final class EndPoint { + public final int id; + public final String host; + public final int port; + + public EndPoint(int id, String host, int port) { + this.id = id; + this.host = host; + this.port = port; + } + } + + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LEADER_AND_ISR.id); + + private static final String CONTROLLER_ID_KEY_NAME = "controller_id"; + private static final String CONTROLLER_EPOCH_KEY_NAME = "controller_epoch"; + private static final String PARTITION_STATES_KEY_NAME = "partition_states"; + private static final String LIVE_LEADERS_KEY_NAME = "live_leaders"; + + // partition_states key names + private static final String TOPIC_KEY_NAME = "topic"; + private static final String PARTITION_KEY_NAME = "partition"; + private static final String LEADER_KEY_NAME = "leader"; + private static final String LEADER_EPOCH_KEY_NAME = "leader_epoch"; + private static final String ISR_KEY_NAME = "isr"; + private static final String ZK_VERSION_KEY_NAME = "zk_version"; + private static final String REPLICAS_KEY_NAME = "replicas"; + + // live_leaders key names + private static final String END_POINT_ID_KEY_NAME = "id"; + private static final String HOST_KEY_NAME = "host"; + private static final String PORT_KEY_NAME = "port"; + + private final int controllerId; + private final int controllerEpoch; + private final Map<TopicPartition, PartitionState> partitionStates; + private final Set<EndPoint> liveLeaders; + + public LeaderAndIsrRequest(int controllerId, int controllerEpoch, Map<TopicPartition, PartitionState> partitionStates, + Set<EndPoint> liveLeaders) { + super(new Struct(CURRENT_SCHEMA)); + struct.set(CONTROLLER_ID_KEY_NAME, controllerId); + struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch); + + List<Struct> partitionStatesData = new ArrayList<>(partitionStates.size()); + for (Map.Entry<TopicPartition, PartitionState> entry : partitionStates.entrySet()) { + Struct partitionStateData = struct.instance(PARTITION_STATES_KEY_NAME); + TopicPartition topicPartition = entry.getKey(); + partitionStateData.set(TOPIC_KEY_NAME, topicPartition.topic()); + partitionStateData.set(PARTITION_KEY_NAME, topicPartition.partition()); + PartitionState partitionState = entry.getValue(); + partitionStateData.set(CONTROLLER_EPOCH_KEY_NAME, partitionState.controllerEpoch); + partitionStateData.set(LEADER_KEY_NAME, partitionState.leader); + partitionStateData.set(LEADER_EPOCH_KEY_NAME, partitionState.leaderEpoch); + partitionStateData.set(ISR_KEY_NAME, partitionState.isr.toArray()); + partitionStateData.set(ZK_VERSION_KEY_NAME, partitionState.zkVersion); + partitionStateData.set(REPLICAS_KEY_NAME, partitionState.replicas.toArray()); + partitionStatesData.add(partitionStateData); + } + struct.set(PARTITION_STATES_KEY_NAME, partitionStatesData.toArray()); + + List<Struct> leadersData = new ArrayList<>(liveLeaders.size()); + for (EndPoint leader : liveLeaders) { + Struct leaderData = struct.instance(LIVE_LEADERS_KEY_NAME); + leaderData.set(END_POINT_ID_KEY_NAME, leader.id); + leaderData.set(HOST_KEY_NAME, leader.host); + leaderData.set(PORT_KEY_NAME, leader.port); + leadersData.add(leaderData); + } + struct.set(LIVE_LEADERS_KEY_NAME, leadersData.toArray()); + + this.controllerId = controllerId; + this.controllerEpoch = controllerEpoch; + this.partitionStates = partitionStates; + this.liveLeaders = liveLeaders; + } + + public LeaderAndIsrRequest(Struct struct) { + super(struct); + + Map<TopicPartition, PartitionState> partitionStates = new HashMap<>(); + for (Object partitionStateDataObj : struct.getArray(PARTITION_STATES_KEY_NAME)) { + Struct partitionStateData = (Struct) partitionStateDataObj; + String topic = partitionStateData.getString(TOPIC_KEY_NAME); + int partition = partitionStateData.getInt(PARTITION_KEY_NAME); + int controllerEpoch = partitionStateData.getInt(CONTROLLER_EPOCH_KEY_NAME); + int leader = partitionStateData.getInt(LEADER_KEY_NAME); + int leaderEpoch = partitionStateData.getInt(LEADER_EPOCH_KEY_NAME); + + Object[] isrArray = partitionStateData.getArray(ISR_KEY_NAME); + List<Integer> isr = new ArrayList<>(isrArray.length); + for (Object r : isrArray) + isr.add((Integer) r); + + int zkVersion = partitionStateData.getInt(ZK_VERSION_KEY_NAME); + + Object[] replicasArray = partitionStateData.getArray(REPLICAS_KEY_NAME); + Set<Integer> replicas = new HashSet<>(replicasArray.length); + for (Object r : replicasArray) + replicas.add((Integer) r); + + PartitionState partitionState = new PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas); + partitionStates.put(new TopicPartition(topic, partition), partitionState); + + } + + Set<EndPoint> leaders = new HashSet<>(); + for (Object leadersDataObj : struct.getArray(LIVE_LEADERS_KEY_NAME)) { + Struct leadersData = (Struct) leadersDataObj; + int id = leadersData.getInt(END_POINT_ID_KEY_NAME); + String host = leadersData.getString(HOST_KEY_NAME); + int port = leadersData.getInt(PORT_KEY_NAME); + leaders.add(new EndPoint(id, host, port)); + } + + controllerId = struct.getInt(CONTROLLER_ID_KEY_NAME); + controllerEpoch = struct.getInt(CONTROLLER_EPOCH_KEY_NAME); + this.partitionStates = partitionStates; + this.liveLeaders = leaders; + } + + @Override + public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) { + Map<TopicPartition, Short> responses = new HashMap<>(partitionStates.size()); + for (TopicPartition partition : partitionStates.keySet()) { + responses.put(partition, Errors.forException(e).code()); + } + + switch (versionId) { + case 0: + return new LeaderAndIsrResponse(Errors.NONE.code(), responses); + default: + throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", + versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.LEADER_AND_ISR.id))); + } + } + + public int controllerId() { + return controllerId; + } + + public int controllerEpoch() { + return controllerEpoch; + } + + public Map<TopicPartition, PartitionState> partitionStates() { + return partitionStates; + } + + public Set<EndPoint> liveLeaders() { + return liveLeaders; + } + + public static LeaderAndIsrRequest parse(ByteBuffer buffer, int versionId) { + return new LeaderAndIsrRequest(ProtoUtils.parseRequest(ApiKeys.LEADER_AND_ISR.id, versionId, buffer)); + } + + public static LeaderAndIsrRequest parse(ByteBuffer buffer) { + return new LeaderAndIsrRequest((Struct) CURRENT_SCHEMA.read(buffer)); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/d02ca36c/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java new file mode 100644 index 0000000..3a6f4ee --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class LeaderAndIsrResponse extends AbstractRequestResponse { + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LEADER_AND_ISR.id); + + private static final String ERROR_CODE_KEY_NAME = "error_code"; + private static final String PARTITIONS_KEY_NAME = "partitions"; + + private static final String PARTITIONS_TOPIC_KEY_NAME = "topic"; + private static final String PARTITIONS_PARTITION_KEY_NAME = "partition"; + private static final String PARTITIONS_ERROR_CODE_KEY_NAME = "error_code"; + + /** + * Possible error code: + * + * STALE_CONTROLLER_EPOCH (11) + */ + private final short errorCode; + + private final Map<TopicPartition, Short> responses; + + public LeaderAndIsrResponse(Map<TopicPartition, Short> responses) { + this(Errors.NONE.code(), responses); + } + + public LeaderAndIsrResponse(short errorCode, Map<TopicPartition, Short> responses) { + super(new Struct(CURRENT_SCHEMA)); + + struct.set(ERROR_CODE_KEY_NAME, errorCode); + + List<Struct> responseDatas = new ArrayList<>(responses.size()); + for (Map.Entry<TopicPartition, Short> response : responses.entrySet()) { + Struct partitionData = struct.instance(PARTITIONS_KEY_NAME); + TopicPartition partition = response.getKey(); + partitionData.set(PARTITIONS_TOPIC_KEY_NAME, partition.topic()); + partitionData.set(PARTITIONS_PARTITION_KEY_NAME, partition.partition()); + partitionData.set(PARTITIONS_ERROR_CODE_KEY_NAME, response.getValue()); + responseDatas.add(partitionData); + } + + struct.set(PARTITIONS_KEY_NAME, responseDatas.toArray()); + struct.set(ERROR_CODE_KEY_NAME, errorCode); + + this.responses = responses; + this.errorCode = errorCode; + } + + public LeaderAndIsrResponse(Struct struct) { + super(struct); + + responses = new HashMap<>(); + for (Object responseDataObj : struct.getArray(PARTITIONS_KEY_NAME)) { + Struct responseData = (Struct) responseDataObj; + String topic = responseData.getString(PARTITIONS_TOPIC_KEY_NAME); + int partition = responseData.getInt(PARTITIONS_PARTITION_KEY_NAME); + short errorCode = responseData.getShort(PARTITIONS_ERROR_CODE_KEY_NAME); + responses.put(new TopicPartition(topic, partition), errorCode); + } + + errorCode = struct.getShort(ERROR_CODE_KEY_NAME); + } + + public Map<TopicPartition, Short> responses() { + return responses; + } + + public short errorCode() { + return errorCode; + } + + public static LeaderAndIsrResponse parse(ByteBuffer buffer, int version) { + return new LeaderAndIsrResponse(ProtoUtils.parseResponse(ApiKeys.LEADER_AND_ISR.id, version, buffer)); + } + + public static LeaderAndIsrResponse parse(ByteBuffer buffer) { + return new LeaderAndIsrResponse((Struct) CURRENT_SCHEMA.read(buffer)); + } + +}
