KAFKA-1326 Refactor Sender to support consumer.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/548d1ba0 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/548d1ba0 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/548d1ba0 Branch: refs/heads/trunk Commit: 548d1ba0939c43fff14531510140e2c641b1caa5 Parents: dcc8840 Author: Jay Kreps <jay.kr...@gmail.com> Authored: Tue Jun 10 17:41:29 2014 -0700 Committer: Jay Kreps <jay.kr...@gmail.com> Committed: Tue Jun 10 17:41:29 2014 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/clients/ClientRequest.java | 61 ++ .../apache/kafka/clients/ClientResponse.java | 78 +++ .../kafka/clients/ClusterConnectionStates.java | 113 ++++ .../apache/kafka/clients/ConnectionState.java | 20 + .../apache/kafka/clients/InFlightRequests.java | 126 ++++ .../org/apache/kafka/clients/KafkaClient.java | 83 +++ .../org/apache/kafka/clients/NetworkClient.java | 383 +++++++++++ .../kafka/clients/NodeConnectionState.java | 31 + .../kafka/clients/producer/KafkaProducer.java | 18 +- .../clients/producer/internals/Metadata.java | 8 +- .../producer/internals/RecordAccumulator.java | 41 +- .../clients/producer/internals/RecordBatch.java | 6 +- .../clients/producer/internals/Sender.java | 675 +++---------------- .../apache/kafka/common/metrics/Measurable.java | 28 +- .../apache/kafka/common/metrics/stats/Avg.java | 29 +- .../kafka/common/metrics/stats/Count.java | 29 +- .../apache/kafka/common/metrics/stats/Max.java | 29 +- .../apache/kafka/common/metrics/stats/Min.java | 29 +- .../kafka/common/metrics/stats/Percentiles.java | 37 +- .../apache/kafka/common/metrics/stats/Rate.java | 8 +- .../kafka/common/metrics/stats/SampledStat.java | 18 +- .../kafka/common/metrics/stats/Total.java | 28 +- .../kafka/common/network/ByteBufferSend.java | 2 +- .../apache/kafka/common/network/Selector.java | 36 +- .../org/apache/kafka/common/network/Send.java | 26 +- 
.../kafka/common/protocol/types/Schema.java | 34 +- .../kafka/common/record/MemoryRecords.java | 44 +- .../kafka/common/requests/ProduceRequest.java | 71 ++ .../kafka/common/requests/ProduceResponse.java | 87 +-- .../org/apache/kafka/clients/MockClient.java | 96 +++ .../apache/kafka/clients/NetworkClientTest.java | 99 +++ .../clients/producer/RecordAccumulatorTest.java | 7 +- .../kafka/clients/producer/SenderTest.java | 132 +--- .../kafka/common/metrics/MetricsTest.java | 2 +- .../org/apache/kafka/common/utils/MockTime.java | 29 +- .../kafka/api/ProducerFailureHandlingTest.scala | 2 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 1 + 37 files changed, 1582 insertions(+), 964 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java new file mode 100644 index 0000000..d32c319 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.clients; + +import org.apache.kafka.common.requests.RequestSend; + +/** + * A request being sent to the server. This holds both the network send as well as the client-level metadata. + */ +public final class ClientRequest { + + private final long createdMs; + private final boolean expectResponse; + private final RequestSend request; + private final Object attachment; + + /** + * @param createdMs The unix timestamp in milliseconds for the time at which this request was created. + * @param expectResponse Should we expect a response message or is this request complete once it is sent? + * @param request The request + * @param attachment Associated data with the request + */ + public ClientRequest(long createdMs, boolean expectResponse, RequestSend request, Object attachment) { + this.createdMs = createdMs; + this.attachment = attachment; + this.request = request; + this.expectResponse = expectResponse; + } + + @Override + public String toString() { + return "ClientRequest(expectResponse=" + expectResponse + ", payload=" + attachment + ", request=" + request + ")"; + } + + public boolean expectResponse() { + return expectResponse; + } + + public RequestSend request() { + return request; + } + + public Object attachment() { + return attachment; + } + + public long createdTime() { + return createdMs; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java b/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java new file mode 100644 index 0000000..14ef69a --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java @@ -0,0 +1,78 @@ +/** + * Licensed 
to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.clients; + +import org.apache.kafka.common.protocol.types.Struct; + +/** + * A response from the server. Contains both the body of the response as well as the correlated request that was + * originally sent. 
+ */ +public class ClientResponse { + + private final long received; + private final boolean disconnected; + private final ClientRequest request; + private final Struct responseBody; + + /** + * @param request The original request + * @param received The unix timestamp when this response was received + * @param disconnected Whether the client disconnected before fully reading a response + * @param responseBody The response contents (or null) if we disconnected or no response was expected + */ + public ClientResponse(ClientRequest request, long received, boolean disconnected, Struct responseBody) { + super(); + this.received = received; + this.disconnected = disconnected; + this.request = request; + this.responseBody = responseBody; + } + + public long receivedTime() { + return received; + } + + public boolean wasDisconnected() { + return disconnected; + } + + public ClientRequest request() { + return request; + } + + public Struct responseBody() { + return responseBody; + } + + public boolean hasResponse() { + return responseBody != null; + } + + public long requestLatencyMs() { + return receivedTime() - this.request.createdTime(); + } + + @Override + public String toString() { + return "ClientResponse(received=" + received + + ", disconnected=" + + disconnected + + ", request=" + + request + + ", responseBody=" + + responseBody + + ")"; + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java new file mode 100644 index 0000000..d304660 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license 
agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.clients; + +import java.util.HashMap; +import java.util.Map; + +/** + * The state of our connection to each node in the cluster. + * + */ +final class ClusterConnectionStates { + private final long reconnectBackoffMs; + private final Map<Integer, NodeConnectionState> nodeState; + + public ClusterConnectionStates(long reconnectBackoffMs) { + this.reconnectBackoffMs = reconnectBackoffMs; + this.nodeState = new HashMap<Integer, NodeConnectionState>(); + } + + /** + * Return true iff we can currently initiate a new connection to the given node. This will be the case if we are not + * connected and haven't been connected for at least the minimum reconnection backoff period. 
+ * @param node The node id to check + * @param now The current time in MS + * @return true if we can initiate a new connection + */ + public boolean canConnect(int node, long now) { + NodeConnectionState state = nodeState.get(node); + if (state == null) + return true; + else + return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttemptMs >= this.reconnectBackoffMs; + } + + /** + * Return true if we are disconnected from the given node and can't re-establish a connection yet + * @param node The node to check + * @param now The current time in ms + */ + public boolean isBlackedOut(int node, long now) { + NodeConnectionState state = nodeState.get(node); + if (state == null) + return false; + else + return state.state == ConnectionState.DISCONNECTED && now - state.lastConnectAttemptMs < this.reconnectBackoffMs; + } + + /** + * Enter the connecting state for the given node. + * @param node The id of the node we are connecting to + * @param now The current time. + */ + public void connecting(int node, long now) { + nodeState.put(node, new NodeConnectionState(ConnectionState.CONNECTING, now)); + } + + /** + * Return true iff we have a connection to the given node + * @param node The id of the node to check + */ + public boolean isConnected(int node) { + NodeConnectionState state = nodeState.get(node); + return state != null && state.state == ConnectionState.CONNECTED; + } + + /** + * Return true iff we are in the process of connecting to the given node + * @param node The id of the node + */ + public boolean isConnecting(int node) { + NodeConnectionState state = nodeState.get(node); + return state != null && state.state == ConnectionState.CONNECTING; + } + + /** + * Enter the connected state for the given node + * @param node The node we have connected to + */ + public void connected(int node) { + nodeState(node).state = ConnectionState.CONNECTED; + } + + /** + * Enter the disconnected state for the given node + * @param node The node we have 
disconnected from + */ + public void disconnected(int node) { + nodeState(node).state = ConnectionState.DISCONNECTED; + } + + /** + * Get the state of our connection to the given node + * @param node The id of the node + * @return The state of our connection + */ + private NodeConnectionState nodeState(int node) { + NodeConnectionState state = this.nodeState.get(node); + if (state == null) + throw new IllegalStateException("No entry found for node " + node); + return state; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java b/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java new file mode 100644 index 0000000..ab7e322 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java @@ -0,0 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.clients; + +/** + * The states of a node connection + */ +enum ConnectionState { + DISCONNECTED, CONNECTING, CONNECTED +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java new file mode 100644 index 0000000..936487b --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.clients; + +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.Map; + +/** + * The set of requests which have been sent or are being sent but haven't yet received a response + */ +final class InFlightRequests { + + private final int maxInFlightRequestsPerConnection; + private final Map<Integer, Deque<ClientRequest>> requests = new HashMap<Integer, Deque<ClientRequest>>(); + + public InFlightRequests(int maxInFlightRequestsPerConnection) { + this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection; + } + + /** + * Add the given request to the queue for the node it was directed to + */ + public void add(ClientRequest request) { + Deque<ClientRequest> reqs = this.requests.get(request.request().destination()); + if (reqs == null) { + reqs = new ArrayDeque<ClientRequest>(); + this.requests.put(request.request().destination(), reqs); + } + reqs.addFirst(request); + } + + /** + * Get the request queue for the given node + */ + private Deque<ClientRequest> requestQueue(int node) { + Deque<ClientRequest> reqs = requests.get(node); + if (reqs == null || reqs.isEmpty()) + throw new IllegalStateException("Response from server for which there are no in-flight requests."); + return reqs; + } + + /** + * Remove and return the oldest request (the one that will be completed next) for the given node + */ + public ClientRequest completeNext(int node) { + return requestQueue(node).pollLast(); + } + + /** + * Get the last request we sent to the given node (but don't remove it from the queue) + * @param node The node id + */ + public ClientRequest lastSent(int node) { + return requestQueue(node).peekFirst(); + } + + /** + * Complete the last request that was sent to a particular node. 
+ * @param node The node the request was sent to + * @return The request + */ + public ClientRequest completeLastSent(int node) { + return requestQueue(node).pollFirst(); + } + + /** + * Can we send more requests to this node? + * + * @param node Node in question + * @return true iff we have no requests still being sent to the given node + */ + public boolean canSendMore(int node) { + Deque<ClientRequest> queue = requests.get(node); + return queue == null || queue.isEmpty() || + (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection); + } + + /** + * Return the number of inflight requests directed at the given node + * @param node The node + * @return The request count. + */ + public int inFlightRequestCount(int node) { + Deque<ClientRequest> queue = requests.get(node); + return queue == null ? 0 : queue.size(); + } + + /** + * Count all in-flight requests for all nodes + */ + public int inFlightRequestCount() { + int total = 0; + for (Deque<ClientRequest> deque : this.requests.values()) + total += deque.size(); + return total; + } + + /** + * Clear out all the in-flight requests for the given node and return them + * + * @param node The node + * @return All the in-flight requests for that node that have been removed + */ + public Iterable<ClientRequest> clearAll(int node) { + Deque<ClientRequest> reqs = requests.get(node); + if (reqs == null) { + return Collections.emptyList(); + } else { + return requests.remove(node); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java new file mode 100644 index 0000000..29658d4 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java @@ 
-0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.clients; + +import java.util.List; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.requests.RequestHeader; + +/** + * The interface for {@link NetworkClient} + */ +public interface KafkaClient { + + /** + * Check if we are currently ready to send another request to the given node but don't attempt to connect if we + * aren't. + * @param node The node to check + * @param now The current timestamp + */ + public boolean isReady(Node node, long now); + + /** + * Initiate a connection to the given node (if necessary), and return true if already connected. The readiness of a + * node will change only when poll is invoked. + * @param node The node to connect to. + * @param now The current time + * @return true iff we are ready to immediately initiate the sending of another request to the given node. + */ + public boolean ready(Node node, long now); + + /** + * Initiate the sending of the given requests and return any completed responses. Requests can only be sent on ready + * connections. 
+ * @param requests The requests to send + * @param timeout The maximum amount of time to wait for responses in ms + * @param now The current time in ms + * @throws IllegalStateException If a request is sent to an unready node + */ + public List<ClientResponse> poll(List<ClientRequest> requests, long timeout, long now); + + /** + * Choose the node with the fewest outstanding requests. This method will prefer a node with an existing connection, + * but will potentially choose a node for which we don't yet have a connection if all existing connections are in + * use. + * @param now The current time in ms + * @return The node with the fewest in-flight requests. + */ + public Node leastLoadedNode(long now); + + /** + * The number of currently in-flight requests for which we have not yet returned a response + */ + public int inFlightRequestCount(); + + /** + * Generate a request header for the next request + * @param key The API key of the request + */ + public RequestHeader nextRequestHeader(ApiKeys key); + + /** + * Wake up the client if it is currently blocked waiting for I/O + */ + public void wakeup(); + + /** + * Close the client and disconnect from all nodes + */ + public void close(); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java new file mode 100644 index 0000000..522881c --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -0,0 +1,383 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. 
The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.clients; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.Set; + +import org.apache.kafka.clients.producer.internals.Metadata; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.network.NetworkReceive; +import org.apache.kafka.common.network.NetworkSend; +import org.apache.kafka.common.network.Selectable; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.requests.RequestSend; +import org.apache.kafka.common.requests.ResponseHeader; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A network client for asynchronous request/response network i/o. This is an internal class used to implement the + * user-facing producer and consumer clients. + * <p> + * This class is not thread-safe! 
+ */ +public class NetworkClient implements KafkaClient { + + private static final Logger log = LoggerFactory.getLogger(NetworkClient.class); + + /* the selector used to perform network i/o */ + private final Selectable selector; + + /* the current cluster metadata */ + private final Metadata metadata; + + /* the state of each node's connection */ + private final ClusterConnectionStates connectionStates; + + /* the set of requests currently being sent or awaiting a response */ + private final InFlightRequests inFlightRequests; + + /* the socket send buffer size in bytes */ + private final int socketSendBuffer; + + /* the socket receive size buffer in bytes */ + private final int socketReceiveBuffer; + + /* the client id used to identify this client in requests to the server */ + private final String clientId; + + /* a random offset to use when choosing nodes to avoid having all nodes choose the same node */ + private final int nodeIndexOffset; + + /* the current correlation id to use when sending requests to servers */ + private int correlation; + + /* true iff there is a metadata request that has been sent and for which we have not yet received a response */ + private boolean metadataFetchInProgress; + + public NetworkClient(Selectable selector, + Metadata metadata, + String clientId, + int maxInFlightRequestsPerConnection, + long reconnectBackoffMs, + int socketSendBuffer, + int socketReceiveBuffer) { + this.selector = selector; + this.metadata = metadata; + this.clientId = clientId; + this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection); + this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs); + this.socketSendBuffer = socketSendBuffer; + this.socketReceiveBuffer = socketReceiveBuffer; + this.correlation = 0; + this.nodeIndexOffset = new Random().nextInt(Integer.MAX_VALUE); + this.metadataFetchInProgress = false; + } + + /** + * Begin connecting to the given node, return true if we are already connected and ready to 
send to that node. + * @param node The node to check + * @param now The current timestamp + * @return True if we are ready to send to the given node + */ + @Override + public boolean ready(Node node, long now) { + if (isReady(node, now)) + return true; + + if (connectionStates.canConnect(node.id(), now)) + // if we are interested in sending to a node and we don't have a connection to it, initiate one + initiateConnect(node, now); + + return false; + } + + /** + * Check if the given node is ready to send more requests. + * @param node The node to check + * @param now The current time in ms + * @return true if the node is ready + */ + @Override + public boolean isReady(Node node, long now) { + return isReady(node.id(), now); + } + + private boolean isReady(int node, long now) { + if (this.metadata.needsUpdate(now)) + // if we need to update our metadata declare all requests unready to make metadata requests first priority + return false; + else + // otherwise we are ready if we are connected and can send more requests + return connectionStates.isConnected(node) && inFlightRequests.canSendMore(node); + } + + /** + * Initiate the given requests and check for any new responses, waiting up to the specified time. Requests can only + * be sent for ready nodes. + * @param requests The requests to initiate + * @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately + * @param now The current time in milliseconds + * @return The list of responses received + */ + @Override + public List<ClientResponse> poll(List<ClientRequest> requests, long timeout, long now) { + // should we update our metadata? 
+ List<NetworkSend> sends = new ArrayList<NetworkSend>(); + maybeUpdateMetadata(sends, now); + + for (int i = 0; i < requests.size(); i++) { + ClientRequest request = requests.get(i); + int nodeId = request.request().destination(); + if (!isReady(nodeId, now)) + throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready."); + + this.inFlightRequests.add(request); + sends.add(request.request()); + } + + // do the I/O + try { + this.selector.poll(timeout, sends); + } catch (IOException e) { + log.error("Unexpected error during I/O in producer network thread", e); + } + + List<ClientResponse> responses = new ArrayList<ClientResponse>(); + handleCompletedSends(responses, now); + handleCompletedReceives(responses, now); + handleDisconnections(responses, now); + handleConnections(); + + return responses; + } + + /** + * Get the number of in-flight requests + */ + @Override + public int inFlightRequestCount() { + return this.inFlightRequests.inFlightRequestCount(); + } + + /** + * Generate a request header for the given API key + * @param key The api key + * @return A request header with the appropriate client id and correlation id + */ + @Override + public RequestHeader nextRequestHeader(ApiKeys key) { + return new RequestHeader(key.id, clientId, correlation++); + } + + /** + * Interrupt the client if it is blocked waiting on I/O. + */ + @Override + public void wakeup() { + this.selector.wakeup(); + } + + /** + * Close the network client + */ + @Override + public void close() { + this.selector.close(); + } + + /** + * Choose the node with the fewest outstanding requests which is at least eligible for connection. This method will + * prefer a node with an existing connection, but will potentially choose a node for which we don't yet have a + * connection if all existing connections are in use. 
This method will never choose a node for which there is no + * existing connection and from which we have disconnected within the reconnect backoff period. + * @return The node with the fewest in-flight requests. + */ + public Node leastLoadedNode(long now) { + List<Node> nodes = this.metadata.fetch().nodes(); + int inflight = Integer.MAX_VALUE; + Node found = null; + for (int i = 0; i < nodes.size(); i++) { + int idx = Utils.abs((this.nodeIndexOffset + i) % nodes.size()); + Node node = nodes.get(idx); + int currInflight = this.inFlightRequests.inFlightRequestCount(node.id()); + if (currInflight == 0 && this.connectionStates.isConnected(node.id())) { + // if we find an established connection with no in-flight requests we can stop right away + return node; + } else if (!this.connectionStates.isBlackedOut(node.id(), now) && currInflight < inflight) { + // otherwise if this is the best we have found so far, record that + inflight = currInflight; + found = node; + } + } + + return found; + } + + /** + * Handle any completed request send. In particular if no response is expected consider the request complete. + * @param responses The list of responses to update + * @param now The current time + */ + private void handleCompletedSends(List<ClientResponse> responses, long now) { + // if no response is expected then when the send is completed, return it + for (NetworkSend send : this.selector.completedSends()) { + ClientRequest request = this.inFlightRequests.lastSent(send.destination()); + if (!request.expectResponse()) { + this.inFlightRequests.completeLastSent(send.destination()); + responses.add(new ClientResponse(request, now, false, null)); + } + } + } + + /** + * Handle any completed receives and update the response list with the responses received. 
+     * @param responses The list of responses to update
+     * @param now The current time
+     * @throws IllegalStateException If a response's correlation id differs from that of the request it is matched with
+     */
+    private void handleCompletedReceives(List<ClientResponse> responses, long now) {
+        for (NetworkReceive receive : this.selector.completedReceives()) {
+            int source = receive.source();
+            ClientRequest req = inFlightRequests.completeNext(source);
+            RequestHeader requestHeader = req.request().header();
+            ResponseHeader header = ResponseHeader.parse(receive.payload());
+            // check the correlation id before decoding the body: the body schema is picked from the request's api
+            // key, so on a mismatch the decode could fail (or misread the bytes) and hide the real problem
+            correlate(requestHeader, header);
+            short apiKey = requestHeader.apiKey();
+            Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload());
+            if (apiKey == ApiKeys.METADATA.id) {
+                // metadata responses are consumed here and are not added to the caller's response list
+                handleMetadataResponse(requestHeader, body, now);
+            } else {
+                // need to add body/header to response here
+                responses.add(new ClientResponse(req, now, false, body));
+            }
+        }
+    }
+
+    /**
+     * Handle a metadata response: clear the fetch-in-progress flag and, if the response lists at least one node,
+     * update the metadata with the new cluster.
+     * @param header The header of the metadata request being answered (used only for the trace log)
+     * @param body The decoded response body
+     * @param now The current time
+     */
+    private void handleMetadataResponse(RequestHeader header, Struct body, long now) {
+        this.metadataFetchInProgress = false;
+        MetadataResponse response = new MetadataResponse(body);
+        Cluster cluster = response.cluster();
+        // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
+        // created which means we will get errors and no nodes until it exists
+        if (cluster.nodes().size() > 0)
+            this.metadata.update(cluster, now);
+        else
+            log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
+    }
+
+    /**
+     * Handle any disconnected connections
+     * @param responses The list of responses that completed with the disconnection
+     * @param now The current time
+     */
+    private void handleDisconnections(List<ClientResponse> responses, long now) {
+        for (int node : this.selector.disconnected()) {
+            connectionStates.disconnected(node);
+            log.debug("Node {} disconnected.", node);
+            for (ClientRequest request : this.inFlightRequests.clearAll(node)) {
+                log.trace("Cancelled request {} due to node {} being disconnected", request, node);
+                ApiKeys requestKey = ApiKeys.forId(request.request().header().apiKey());
+                // a cancelled metadata request only clears the in-progress flag so that a new fetch can be issued;
+                // every other cancelled request is reported to the caller as a disconnected response
+                if (requestKey == ApiKeys.METADATA)
+                    metadataFetchInProgress = false;
+                else
+                    responses.add(new ClientResponse(request, now, true, null));
+            }
+        }
+        // we got a disconnect so we should probably refresh our metadata and see if that broker is dead
+        if (this.selector.disconnected().size() > 0)
+            this.metadata.forceUpdate();
+    }
+
+    /**
+     * Record any newly completed connections
+     */
+    private void handleConnections() {
+        for (Integer id : this.selector.connected()) {
+            log.debug("Completed connection to node {}", id);
+            this.connectionStates.connected(id);
+        }
+    }
+
+    /**
+     * Validate that the response corresponds to the request we expect or else explode
+     * @param requestHeader The header of the request we believe is being answered
+     * @param responseHeader The header parsed from the received response
+     * @throws IllegalStateException If the two correlation ids differ
+     */
+    private void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
+        if (requestHeader.correlationId() != responseHeader.correlationId())
+            throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId()
+                                            + ") does not match request ("
+                                            + requestHeader.correlationId()
+                                            + ")");
+    }
+
+    /**
+     * Create a metadata request for the given topics
+     * @param now The current time, recorded on the returned request
+     * @param node The id of the node the request is addressed to
+     * @param topics The topics to request metadata for (copied into a new list)
+     * @return The client request wrapping the metadata request
+     */
+    private ClientRequest metadataRequest(long now, int node, Set<String> topics) {
+        MetadataRequest metadata = new MetadataRequest(new ArrayList<String>(topics));
+        RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct());
+        // presumably: a response is expected (true) and there is no attachment (null) - confirm against ClientRequest
+        return new ClientRequest(now, true, send, null);
+    }
+
+    /**
+     * Add a metadata request to the list of sends if we need to make one
+     * <p>
+     * Nothing happens while a fetch is already in progress, when the metadata does not need an update, or when
+     * {@link #leastLoadedNode(long)} finds no node. If the chosen node is connected and can take another request the
+     * metadata request is queued; otherwise, if a connection attempt is allowed, one is started.
+     * @param sends The list of sends to append the metadata request to
+     * @param now The current time
+     */
+    private void maybeUpdateMetadata(List<NetworkSend> sends, long now) {
+        if (this.metadataFetchInProgress || !metadata.needsUpdate(now))
+            return;
+
+        Node node = this.leastLoadedNode(now);
+        if (node == null)
+            return;
+
+        if (connectionStates.isConnected(node.id()) && inFlightRequests.canSendMore(node.id())) {
+            Set<String> topics = metadata.topics();
+            this.metadataFetchInProgress = true;
+            ClientRequest metadataRequest = metadataRequest(now, node.id(), topics);
+            log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
+            sends.add(metadataRequest.request());
+            this.inFlightRequests.add(metadataRequest);
+        } else if (connectionStates.canConnect(node.id(), now)) {
+            // we don't have a connection to this node right now, make one
+            initiateConnect(node, now);
+        }
+    }
+
+    /**
+     * Initiate a connection to the given node
+     * <p>
+     * An IOException from the selector is not propagated: the node is marked disconnected, a metadata update is
+     * forced and the error is logged at debug level.
+     * @param node The node to connect to
+     * @param now The current time, recorded as the time of the connection attempt
+     */
+    private void initiateConnect(Node node, long now) {
+        try {
+            log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
+            selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
+            this.connectionStates.connecting(node.id(), now);
+        } catch (IOException e) {
+            /* attempt failed, we'll try again after the backoff */
+            connectionStates.disconnected(node.id());
+            /* maybe the problem is our metadata, update it */
+            metadata.forceUpdate();
+            log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java b/clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java
new file mode 100644
index 0000000..752a979
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+/**
+ * The state of our connection to a node
+ */
+final class NodeConnectionState {
+
+    /** The connection state recorded for the node */
+    ConnectionState state;
+    /** The time of the last connection attempt, as given to the constructor (the name suggests milliseconds) */
+    long lastConnectAttemptMs;
+
+    /**
+     * @param state The connection state to record
+     * @param lastConnectAttempt The time of the last connection attempt
+     */
+    public NodeConnectionState(ConnectionState state, long lastConnectAttempt) {
+        this.state = state;
+        this.lastConnectAttemptMs = lastConnectAttempt;
+    }
+
+    /**
+     * @return A string of the form <code>NodeState(state, lastConnectAttemptMs)</code>
+     */
+    @Override
+    public String toString() {
+        return "NodeState(" + state + ", " + lastConnectAttemptMs + ")";
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index d15562a..00775ab 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -21,6 +21,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.producer.internals.FutureRecordMetadata; import org.apache.kafka.clients.producer.internals.Metadata; import org.apache.kafka.clients.producer.internals.Partitioner; @@ -119,19 +120,22 @@ public class KafkaProducer implements Producer { metrics, time); List<InetSocketAddress> addresses = 
ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); - this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); - this.sender = new Sender(new Selector(this.metrics, time), + this.metadata.update(Cluster.bootstrap(addresses), 0); + + NetworkClient client = new NetworkClient(new Selector(this.metrics, time), + this.metadata, + clientId, + config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), + config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), + config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), + config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG)); + this.sender = new Sender(client, this.metadata, this.accumulator, - clientId, config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG), - config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)), config.getInt(ProducerConfig.RETRIES_CONFIG), config.getInt(ProducerConfig.TIMEOUT_CONFIG), - config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), - config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG), - config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION), this.metrics, new SystemTime()); this.ioThread = new KafkaThread("kafka-producer-network-thread", this.sender, true); http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java index f47a461..57bc285 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java @@ -105,8 +105,8 @@ public final class Metadata { * since our last update and either (1) an update has been requested or (2) 
the current metadata has expired (more * than metadataExpireMs has passed since the last refresh) */ - public synchronized boolean needsUpdate(long nowMs) { - long msSinceLastUpdate = nowMs - this.lastRefreshMs; + public synchronized boolean needsUpdate(long now) { + long msSinceLastUpdate = now - this.lastRefreshMs; boolean updateAllowed = msSinceLastUpdate >= this.refreshBackoffMs; boolean updateNeeded = this.forceUpdate || msSinceLastUpdate >= this.metadataExpireMs; return updateAllowed && updateNeeded; @@ -129,9 +129,9 @@ public final class Metadata { /** * Update the cluster metadata */ - public synchronized void update(Cluster cluster, long nowMs) { + public synchronized void update(Cluster cluster, long now) { this.forceUpdate = false; - this.lastRefreshMs = nowMs; + this.lastRefreshMs = now; this.cluster = cluster; notifyAll(); log.debug("Updated cluster metadata to {}", cluster); http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 4010d42..1ed3c28 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -13,7 +13,15 @@ package org.apache.kafka.clients.producer.internals; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import org.apache.kafka.clients.producer.Callback; @@ -91,21 
+99,21 @@ public final class RecordAccumulator { metrics.addMetric("waiting-threads", "The number of user threads blocked waiting for buffer memory to enqueue their records", new Measurable() { - public double measure(MetricConfig config, long nowMs) { + public double measure(MetricConfig config, long now) { return free.queued(); } }); metrics.addMetric("buffer-total-bytes", "The maximum amount of buffer memory the client can use (whether or not it is currently used).", new Measurable() { - public double measure(MetricConfig config, long nowMs) { + public double measure(MetricConfig config, long now) { return free.totalMemory(); } }); metrics.addMetric("buffer-available-bytes", "The total amount of buffer memory that is not being used (either unallocated or in the free list).", new Measurable() { - public double measure(MetricConfig config, long nowMs) { + public double measure(MetricConfig config, long now) { return free.availableMemory(); } }); @@ -163,9 +171,9 @@ public final class RecordAccumulator { /** * Re-enqueue the given record batch in the accumulator to retry */ - public void reenqueue(RecordBatch batch, long nowMs) { + public void reenqueue(RecordBatch batch, long now) { batch.attempts++; - batch.lastAttemptMs = nowMs; + batch.lastAttemptMs = now; Deque<RecordBatch> deque = dequeFor(batch.topicPartition); synchronized (deque) { deque.addFirst(batch); @@ -175,8 +183,8 @@ public final class RecordAccumulator { /** * Get a list of nodes whose partitions are ready to be sent. 
* <p> - * A destination node is ready to send data if ANY one of its partition is not backing off the send - * and ANY of the following are true : + * A destination node is ready to send data if ANY one of its partition is not backing off the send and ANY of the + * following are true : * <ol> * <li>The record set is full * <li>The record set has sat in the accumulator for at least lingerMs milliseconds @@ -185,7 +193,7 @@ public final class RecordAccumulator { * <li>The accumulator has been closed * </ol> */ - public Set<Node> ready(Cluster cluster, long nowMs) { + public Set<Node> ready(Cluster cluster, long now) { Set<Node> readyNodes = new HashSet<Node>(); boolean exhausted = this.free.queued() > 0; @@ -198,9 +206,9 @@ public final class RecordAccumulator { synchronized (deque) { RecordBatch batch = deque.peekFirst(); if (batch != null) { - boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs; + boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > now; boolean full = deque.size() > 1 || batch.records.isFull(); - boolean expired = nowMs - batch.createdMs >= lingerMs; + boolean expired = now - batch.createdMs >= lingerMs; boolean sendable = full || expired || exhausted || closed; if (sendable && !backingOff) readyNodes.add(leader); @@ -227,18 +235,17 @@ public final class RecordAccumulator { } /** - * Drain all the data for the given nodes and collate them into a list of - * batches that will fit within the specified size on a per-node basis. - * This method attempts to avoid choosing the same topic-node over and over. + * Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified + * size on a per-node basis. This method attempts to avoid choosing the same topic-node over and over. 
* * @param cluster The current cluster metadata * @param nodes The list of node to drain * @param maxSize The maximum number of bytes to drain - * @param nowMs The current unix time in milliseconds + * @param now The current unix time in milliseconds * @return A list of {@link RecordBatch} for each node specified with total size less than the requested maxSize. * TODO: There may be a starvation issue due to iteration order */ - public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long nowMs) { + public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) { if (nodes.isEmpty()) return Collections.emptyMap(); @@ -266,7 +273,7 @@ public final class RecordAccumulator { batch.records.close(); size += batch.records.sizeInBytes(); ready.add(batch); - batch.drainedMs = nowMs; + batch.drainedMs = now; } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index 5ee5455..dd0af8a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -42,9 +42,9 @@ public final class RecordBatch { private final ProduceRequestResult produceFuture; private final List<Thunk> thunks; - public RecordBatch(TopicPartition tp, MemoryRecords records, long nowMs) { - this.createdMs = nowMs; - this.lastAttemptMs = nowMs; + public RecordBatch(TopicPartition tp, MemoryRecords records, long now) { + this.createdMs = now; + this.lastAttemptMs = now; this.records = records; this.topicPartition = tp; this.produceFuture = new ProduceRequestResult();