KAFKA-1326 Refactor Sender to support consumer.

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/548d1ba0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/548d1ba0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/548d1ba0

Branch: refs/heads/trunk
Commit: 548d1ba0939c43fff14531510140e2c641b1caa5
Parents: dcc8840
Author: Jay Kreps <jay.kr...@gmail.com>
Authored: Tue Jun 10 17:41:29 2014 -0700
Committer: Jay Kreps <jay.kr...@gmail.com>
Committed: Tue Jun 10 17:41:29 2014 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/ClientRequest.java |  61 ++
 .../apache/kafka/clients/ClientResponse.java    |  78 +++
 .../kafka/clients/ClusterConnectionStates.java  | 113 ++++
 .../apache/kafka/clients/ConnectionState.java   |  20 +
 .../apache/kafka/clients/InFlightRequests.java  | 126 ++++
 .../org/apache/kafka/clients/KafkaClient.java   |  83 +++
 .../org/apache/kafka/clients/NetworkClient.java | 383 +++++++++++
 .../kafka/clients/NodeConnectionState.java      |  31 +
 .../kafka/clients/producer/KafkaProducer.java   |  18 +-
 .../clients/producer/internals/Metadata.java    |   8 +-
 .../producer/internals/RecordAccumulator.java   |  41 +-
 .../clients/producer/internals/RecordBatch.java |   6 +-
 .../clients/producer/internals/Sender.java      | 675 +++----------------
 .../apache/kafka/common/metrics/Measurable.java |  28 +-
 .../apache/kafka/common/metrics/stats/Avg.java  |  29 +-
 .../kafka/common/metrics/stats/Count.java       |  29 +-
 .../apache/kafka/common/metrics/stats/Max.java  |  29 +-
 .../apache/kafka/common/metrics/stats/Min.java  |  29 +-
 .../kafka/common/metrics/stats/Percentiles.java |  37 +-
 .../apache/kafka/common/metrics/stats/Rate.java |   8 +-
 .../kafka/common/metrics/stats/SampledStat.java |  18 +-
 .../kafka/common/metrics/stats/Total.java       |  28 +-
 .../kafka/common/network/ByteBufferSend.java    |   2 +-
 .../apache/kafka/common/network/Selector.java   |  36 +-
 .../org/apache/kafka/common/network/Send.java   |  26 +-
 .../kafka/common/protocol/types/Schema.java     |  34 +-
 .../kafka/common/record/MemoryRecords.java      |  44 +-
 .../kafka/common/requests/ProduceRequest.java   |  71 ++
 .../kafka/common/requests/ProduceResponse.java  |  87 +--
 .../org/apache/kafka/clients/MockClient.java    |  96 +++
 .../apache/kafka/clients/NetworkClientTest.java |  99 +++
 .../clients/producer/RecordAccumulatorTest.java |   7 +-
 .../kafka/clients/producer/SenderTest.java      | 132 +---
 .../kafka/common/metrics/MetricsTest.java       |   2 +-
 .../org/apache/kafka/common/utils/MockTime.java |  29 +-
 .../kafka/api/ProducerFailureHandlingTest.scala |   2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |   1 +
 37 files changed, 1582 insertions(+), 964 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
new file mode 100644
index 0000000..d32c319
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.requests.RequestSend;
+
+/**
+ * A request being sent to the server. This holds both the network send as 
well as the client-level metadata.
+ */
+public final class ClientRequest {
+
+    private final long createdMs;
+    private final boolean expectResponse;
+    private final RequestSend request;
+    private final Object attachment;
+
+    /**
+     * @param createdMs The unix timestamp in milliseconds for the time at 
which this request was created.
+     * @param expectResponse Should we expect a response message or is this 
request complete once it is sent?
+     * @param request The request
+     * @param attachment Data associated with the request
+     */
+    public ClientRequest(long createdMs, boolean expectResponse, RequestSend 
request, Object attachment) {
+        this.createdMs = createdMs;
+        this.attachment = attachment;
+        this.request = request;
+        this.expectResponse = expectResponse;
+    }
+
+    @Override
+    public String toString() {
+        return "ClientRequest(expectResponse=" + expectResponse + ", payload=" 
+ attachment + ", request=" + request + ")";
+    }
+
+    public boolean expectResponse() {
+        return expectResponse;
+    }
+
+    public RequestSend request() {
+        return request;
+    }
+
+    public Object attachment() {
+        return attachment;
+    }
+
+    public long createdTime() {
+        return createdMs;
+    }
+
+}
\ No newline at end of file
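
For illustration, here is a rough sketch (not part of this patch) of how a ClientRequest wraps a RequestSend, mirroring the metadataRequest() helper added to NetworkClient below; the broker id, client id, and topic name are made-up values:

    import java.util.Arrays;
    import org.apache.kafka.clients.ClientRequest;
    import org.apache.kafka.common.protocol.ApiKeys;
    import org.apache.kafka.common.requests.MetadataRequest;
    import org.apache.kafka.common.requests.RequestHeader;
    import org.apache.kafka.common.requests.RequestSend;

    public class ClientRequestSketch {
        public static void main(String[] args) {
            // made-up values: broker id 0, client id "sketch-client", correlation id 0
            RequestHeader header = new RequestHeader(ApiKeys.METADATA.id, "sketch-client", 0);
            MetadataRequest metadataRequest = new MetadataRequest(Arrays.asList("my-topic"));
            RequestSend send = new RequestSend(0, header, metadataRequest.toStruct());

            // the ClientRequest couples the network send with client-level bookkeeping
            ClientRequest request = new ClientRequest(System.currentTimeMillis(), true, send, null);
            System.out.println(request + ", created at " + request.createdTime());
        }
    }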

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java 
b/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
new file mode 100644
index 0000000..14ef69a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.protocol.types.Struct;
+
+/**
+ * A response from the server. Contains both the body of the response as well 
as the correlated request that was
+ * originally sent.
+ */
+public class ClientResponse {
+
+    private final long received;
+    private final boolean disconnected;
+    private final ClientRequest request;
+    private final Struct responseBody;
+
+    /**
+     * @param request The original request
+     * @param received The unix timestamp when this response was received
+     * @param disconnected Whether the client disconnected before fully 
reading a response
+     * @param responseBody The response contents, or null if we disconnected or no response was expected
+     */
+    public ClientResponse(ClientRequest request, long received, boolean 
disconnected, Struct responseBody) {
+        super();
+        this.received = received;
+        this.disconnected = disconnected;
+        this.request = request;
+        this.responseBody = responseBody;
+    }
+
+    public long receivedTime() {
+        return received;
+    }
+
+    public boolean wasDisconnected() {
+        return disconnected;
+    }
+
+    public ClientRequest request() {
+        return request;
+    }
+
+    public Struct responseBody() {
+        return responseBody;
+    }
+
+    public boolean hasResponse() {
+        return responseBody != null;
+    }
+
+    public long requestLatencyMs() {
+        return receivedTime() - this.request.createdTime();
+    }
+
+    @Override
+    public String toString() {
+        return "ClientResponse(received=" + received +
+               ", disconnected=" +
+               disconnected +
+               ", request=" +
+               request +
+               ", responseBody=" +
+               responseBody +
+               ")";
+    }
+
+}
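
And a matching sketch (again not from the patch) of how a caller would read a ClientResponse back; request is the ClientRequest built above and body stands for a parsed response Struct, or null on disconnect:

    ClientResponse response = new ClientResponse(request, System.currentTimeMillis(), false, body);
    if (response.wasDisconnected()) {
        // the connection dropped before a complete response was read; responseBody() is null
    } else if (response.hasResponse()) {
        // round-trip time, measured from the request's creation to its receipt
        long latencyMs = response.requestLatencyMs();
    }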

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
new file mode 100644
index 0000000..d304660
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The state of our connection to each node in the cluster.
+ * 
+ */
+final class ClusterConnectionStates {
+    private final long reconnectBackoffMs;
+    private final Map<Integer, NodeConnectionState> nodeState;
+
+    public ClusterConnectionStates(long reconnectBackoffMs) {
+        this.reconnectBackoffMs = reconnectBackoffMs;
+        this.nodeState = new HashMap<Integer, NodeConnectionState>();
+    }
+
+    /**
+     * Return true iff we can currently initiate a new connection to the given node. This is the case if we are not
+     * connected and at least the minimum reconnection backoff has elapsed since our last connection attempt.
+     * @param node The node id to check
+     * @param now The current time in ms
+     * @return true if we can initiate a new connection
+     */
+    public boolean canConnect(int node, long now) {
+        NodeConnectionState state = nodeState.get(node);
+        if (state == null)
+            return true;
+        else
+            return state.state == ConnectionState.DISCONNECTED && now - 
state.lastConnectAttemptMs >= this.reconnectBackoffMs;
+    }
+
+    /**
+     * Return true if we are disconnected from the given node and can't 
re-establish a connection yet
+     * @param node The node to check
+     * @param now The current time in ms
+     */
+    public boolean isBlackedOut(int node, long now) {
+        NodeConnectionState state = nodeState.get(node);
+        if (state == null)
+            return false;
+        else
+            return state.state == ConnectionState.DISCONNECTED && now - 
state.lastConnectAttemptMs < this.reconnectBackoffMs;
+    }
+
+    /**
+     * Enter the connecting state for the given node.
+     * @param node The id of the node we are connecting to
+     * @param now The current time.
+     */
+    public void connecting(int node, long now) {
+        nodeState.put(node, new 
NodeConnectionState(ConnectionState.CONNECTING, now));
+    }
+
+    /**
+     * Return true iff we have a connection to the given node
+     * @param node The id of the node to check
+     */
+    public boolean isConnected(int node) {
+        NodeConnectionState state = nodeState.get(node);
+        return state != null && state.state == ConnectionState.CONNECTED;
+    }
+
+    /**
+     * Return true iff we are in the process of connecting to the given node
+     * @param node The id of the node
+     */
+    public boolean isConnecting(int node) {
+        NodeConnectionState state = nodeState.get(node);
+        return state != null && state.state == ConnectionState.CONNECTING;
+    }
+
+    /**
+     * Enter the connected state for the given node
+     * @param node The node we have connected to
+     */
+    public void connected(int node) {
+        nodeState(node).state = ConnectionState.CONNECTED;
+    }
+
+    /**
+     * Enter the disconnected state for the given node
+     * @param node The node we have disconnected from
+     */
+    public void disconnected(int node) {
+        nodeState(node).state = ConnectionState.DISCONNECTED;
+    }
+
+    /**
+     * Get the state of our connection to the given node
+     * @param node The id of the node
+     * @return The state of our connection
+     */
+    private NodeConnectionState nodeState(int node) {
+        NodeConnectionState state = this.nodeState.get(node);
+        if (state == null)
+            throw new IllegalStateException("No entry found for node " + node);
+        return state;
+    }
+}
\ No newline at end of file
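
The intended life cycle is attempt -> CONNECTING -> CONNECTED, with a DISCONNECTED node blacked out until the reconnect backoff expires. A sketch of that behaviour (the class is package-private, so this only compiles from org.apache.kafka.clients; the 100 ms backoff and node id 1 are arbitrary):

    ClusterConnectionStates states = new ClusterConnectionStates(100L);
    long now = System.currentTimeMillis();

    states.connecting(1, now);                  // a connection attempt has started
    states.connected(1);                        // the selector reported the connect completed
    assert states.isConnected(1);

    states.disconnected(1);                     // the connection dropped
    assert states.isBlackedOut(1, now);         // still inside the 100 ms backoff window
    assert states.canConnect(1, now + 100L);    // once the backoff has elapsed we may retry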

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java 
b/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java
new file mode 100644
index 0000000..ab7e322
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/ConnectionState.java
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+/**
+ * The states of a node connection
+ */
+enum ConnectionState {
+    DISCONNECTED, CONNECTING, CONNECTED
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
new file mode 100644
index 0000000..936487b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The set of requests which have been sent or are being sent but haven't yet 
received a response
+ */
+final class InFlightRequests {
+
+    private final int maxInFlightRequestsPerConnection;
+    private final Map<Integer, Deque<ClientRequest>> requests = new 
HashMap<Integer, Deque<ClientRequest>>();
+
+    public InFlightRequests(int maxInFlightRequestsPerConnection) {
+        this.maxInFlightRequestsPerConnection = 
maxInFlightRequestsPerConnection;
+    }
+
+    /**
+     * Add the given request to the queue for the node it was directed to
+     */
+    public void add(ClientRequest request) {
+        Deque<ClientRequest> reqs = 
this.requests.get(request.request().destination());
+        if (reqs == null) {
+            reqs = new ArrayDeque<ClientRequest>();
+            this.requests.put(request.request().destination(), reqs);
+        }
+        reqs.addFirst(request);
+    }
+
+    /**
+     * Get the request queue for the given node
+     */
+    private Deque<ClientRequest> requestQueue(int node) {
+        Deque<ClientRequest> reqs = requests.get(node);
+        if (reqs == null || reqs.isEmpty())
+            throw new IllegalStateException("Response from server for which 
there are no in-flight requests.");
+        return reqs;
+    }
+
+    /**
+     * Get the oldest request (the one that will be completed next) for the given node
+     */
+    public ClientRequest completeNext(int node) {
+        return requestQueue(node).pollLast();
+    }
+
+    /**
+     * Get the last request we sent to the given node (but don't remove it 
from the queue)
+     * @param node The node id
+     */
+    public ClientRequest lastSent(int node) {
+        return requestQueue(node).peekFirst();
+    }
+
+    /**
+     * Complete the last request that was sent to a particular node.
+     * @param node The node the request was sent to
+     * @return The request
+     */
+    public ClientRequest completeLastSent(int node) {
+        return requestQueue(node).pollFirst();
+    }
+
+    /**
+     * Can we send more requests to this node?
+     * 
+     * @param node Node in question
+     * @return true iff the previous send to the node has completed and we are below the in-flight request limit
+     */
+    public boolean canSendMore(int node) {
+        Deque<ClientRequest> queue = requests.get(node);
+        return queue == null || queue.isEmpty() ||
+               (queue.peekFirst().request().completed() && queue.size() < 
this.maxInFlightRequestsPerConnection);
+    }
+
+    /**
+     * Return the number of inflight requests directed at the given node
+     * @param node The node
+     * @return The request count.
+     */
+    public int inFlightRequestCount(int node) {
+        Deque<ClientRequest> queue = requests.get(node);
+        return queue == null ? 0 : queue.size();
+    }
+
+    /**
+     * Count all in-flight requests for all nodes
+     */
+    public int inFlightRequestCount() {
+        int total = 0;
+        for (Deque<ClientRequest> deque : this.requests.values())
+            total += deque.size();
+        return total;
+    }
+
+    /**
+     * Clear out all the in-flight requests for the given node and return them
+     * 
+     * @param node The node
+     * @return All the in-flight requests for that node that have been removed
+     */
+    public Iterable<ClientRequest> clearAll(int node) {
+        Deque<ClientRequest> reqs = requests.get(node);
+        if (reqs == null) {
+            return Collections.emptyList();
+        } else {
+            return requests.remove(node);
+        }
+    }
+
+}
\ No newline at end of file
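
A sketch of the per-node queueing semantics (also package-private, so purely illustrative); req1 and req2 stand for ClientRequests whose RequestSend destination is node 0, with req1 sent first:

    InFlightRequests inFlight = new InFlightRequests(5);    // at most 5 in-flight requests per connection

    inFlight.add(req1);                          // the newest request sits at the head of node 0's deque
    inFlight.add(req2);
    assert inFlight.inFlightRequestCount(0) == 2;

    assert inFlight.lastSent(0) == req2;         // peek at the most recently sent request
    ClientRequest first = inFlight.completeNext(0);   // responses complete in send order
    assert first == req1;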

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
new file mode 100644
index 0000000..29658d4
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import java.util.List;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.RequestHeader;
+
+/**
+ * The interface for {@link NetworkClient}
+ */
+public interface KafkaClient {
+
+    /**
+     * Check if we are currently ready to send another request to the given 
node but don't attempt to connect if we
+     * aren't.
+     * @param node The node to check
+     * @param now The current timestamp
+     */
+    public boolean isReady(Node node, long now);
+
+    /**
+     * Initiate a connection to the given node (if necessary), and return true 
if already connected. The readiness of a
+     * node will change only when poll is invoked.
+     * @param node The node to connect to.
+     * @param now The current time
+     * @return true iff we are ready to immediately initiate the sending of 
another request to the given node.
+     */
+    public boolean ready(Node node, long now);
+
+    /**
+     * Initiate the sending of the given requests and return any completed 
responses. Requests can only be sent on ready
+     * connections.
+     * @param requests The requests to send
+     * @param timeout The maximum amount of time to wait for responses in ms
+     * @param now The current time in ms
+     * @throws IllegalStateException If a request is sent to an unready node
+     */
+    public List<ClientResponse> poll(List<ClientRequest> requests, long 
timeout, long now);
+
+    /**
+     * Choose the node with the fewest outstanding requests. This method will 
prefer a node with an existing connection,
+     * but will potentially choose a node for which we don't yet have a 
connection if all existing connections are in
+     * use.
+     * @param now The current time in ms
+     * @return The node with the fewest in-flight requests.
+     */
+    public Node leastLoadedNode(long now);
+
+    /**
+     * The number of currently in-flight requests for which we have not yet 
returned a response
+     */
+    public int inFlightRequestCount();
+
+    /**
+     * Generate a request header for the next request
+     * @param key The API key of the request
+     */
+    public RequestHeader nextRequestHeader(ApiKeys key);
+
+    /**
+     * Wake up the client if it is currently blocked waiting for I/O
+     */
+    public void wakeup();
+
+    /**
+     * Close the client and disconnect from all nodes
+     */
+    public void close();
+
+}
\ No newline at end of file
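
The contract this interface implies for callers such as the refactored Sender is roughly: only hand poll() requests for nodes that reported ready. A hedged sketch, where client is any KafkaClient and node and send are placeholders:

    long now = System.currentTimeMillis();
    List<ClientRequest> toSend = new ArrayList<ClientRequest>();
    if (client.ready(node, now)) {
        // only connected nodes below the in-flight limit may be sent to
        toSend.add(new ClientRequest(now, true, send, null));
    }
    // initiate the sends and collect whatever responses arrive within 100 ms
    List<ClientResponse> responses = client.poll(toSend, 100L, now);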

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
new file mode 100644
index 0000000..522881c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.kafka.clients.producer.internals.Metadata;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.network.NetworkSend;
+import org.apache.kafka.common.network.Selectable;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ProtoUtils;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.requests.RequestSend;
+import org.apache.kafka.common.requests.ResponseHeader;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A network client for asynchronous request/response network i/o. This is an 
internal class used to implement the
+ * user-facing producer and consumer clients.
+ * <p>
+ * This class is not thread-safe!
+ */
+public class NetworkClient implements KafkaClient {
+
+    private static final Logger log = 
LoggerFactory.getLogger(NetworkClient.class);
+
+    /* the selector used to perform network i/o */
+    private final Selectable selector;
+
+    /* the current cluster metadata */
+    private final Metadata metadata;
+
+    /* the state of each node's connection */
+    private final ClusterConnectionStates connectionStates;
+
+    /* the set of requests currently being sent or awaiting a response */
+    private final InFlightRequests inFlightRequests;
+
+    /* the socket send buffer size in bytes */
+    private final int socketSendBuffer;
+
+    /* the socket receive size buffer in bytes */
+    private final int socketReceiveBuffer;
+
+    /* the client id used to identify this client in requests to the server */
+    private final String clientId;
+
+    /* a random offset to use when choosing nodes, so that not every client picks the same node */
+    private final int nodeIndexOffset;
+
+    /* the current correlation id to use when sending requests to servers */
+    private int correlation;
+
+    /* true iff there is a metadata request that has been sent and for which 
we have not yet received a response */
+    private boolean metadataFetchInProgress;
+
+    public NetworkClient(Selectable selector,
+                         Metadata metadata,
+                         String clientId,
+                         int maxInFlightRequestsPerConnection,
+                         long reconnectBackoffMs,
+                         int socketSendBuffer,
+                         int socketReceiveBuffer) {
+        this.selector = selector;
+        this.metadata = metadata;
+        this.clientId = clientId;
+        this.inFlightRequests = new 
InFlightRequests(maxInFlightRequestsPerConnection);
+        this.connectionStates = new 
ClusterConnectionStates(reconnectBackoffMs);
+        this.socketSendBuffer = socketSendBuffer;
+        this.socketReceiveBuffer = socketReceiveBuffer;
+        this.correlation = 0;
+        this.nodeIndexOffset = new Random().nextInt(Integer.MAX_VALUE);
+        this.metadataFetchInProgress = false;
+    }
+
+    /**
+     * Begin connecting to the given node, return true if we are already 
connected and ready to send to that node.
+     * @param node The node to check
+     * @param now The current timestamp
+     * @return True if we are ready to send to the given node
+     */
+    @Override
+    public boolean ready(Node node, long now) {
+        if (isReady(node, now))
+            return true;
+
+        if (connectionStates.canConnect(node.id(), now))
+            // if we are interested in sending to a node and we don't have a 
connection to it, initiate one
+            initiateConnect(node, now);
+
+        return false;
+    }
+
+    /**
+     * Check if the given node is ready to send more requests.
+     * @param node The node to check
+     * @param now The current time in ms
+     * @return true if the node is ready
+     */
+    @Override
+    public boolean isReady(Node node, long now) {
+        return isReady(node.id(), now);
+    }
+
+    private boolean isReady(int node, long now) {
+        if (this.metadata.needsUpdate(now))
+            // if we need to update our metadata, declare all requests unready so that metadata requests get first priority
+            return false;
+        else
+            // otherwise we are ready if we are connected and can send more 
requests
+            return connectionStates.isConnected(node) && 
inFlightRequests.canSendMore(node);
+    }
+
+    /**
+     * Initiate the given requests and check for any new responses, waiting up 
to the specified time. Requests can only
+     * be sent for ready nodes.
+     * @param requests The requests to initiate
+     * @param timeout The maximum amount of time to wait (in ms) for responses 
if there are none immediately
+     * @param now The current time in milliseconds
+     * @return The list of responses received
+     */
+    @Override
+    public List<ClientResponse> poll(List<ClientRequest> requests, long 
timeout, long now) {
+        // should we update our metadata?
+        List<NetworkSend> sends = new ArrayList<NetworkSend>();
+        maybeUpdateMetadata(sends, now);
+
+        for (int i = 0; i < requests.size(); i++) {
+            ClientRequest request = requests.get(i);
+            int nodeId = request.request().destination();
+            if (!isReady(nodeId, now))
+                throw new IllegalStateException("Attempt to send a request to 
node " + nodeId + " which is not ready.");
+
+            this.inFlightRequests.add(request);
+            sends.add(request.request());
+        }
+
+        // do the I/O
+        try {
+            this.selector.poll(timeout, sends);
+        } catch (IOException e) {
+            log.error("Unexpected error during I/O in producer network 
thread", e);
+        }
+
+        List<ClientResponse> responses = new ArrayList<ClientResponse>();
+        handleCompletedSends(responses, now);
+        handleCompletedReceives(responses, now);
+        handleDisconnections(responses, now);
+        handleConnections();
+
+        return responses;
+    }
+
+    /**
+     * Get the number of in-flight requests
+     */
+    @Override
+    public int inFlightRequestCount() {
+        return this.inFlightRequests.inFlightRequestCount();
+    }
+
+    /**
+     * Generate a request header for the given API key
+     * @param key The api key
+     * @return A request header with the appropriate client id and correlation 
id
+     */
+    @Override
+    public RequestHeader nextRequestHeader(ApiKeys key) {
+        return new RequestHeader(key.id, clientId, correlation++);
+    }
+
+    /**
+     * Interrupt the client if it is blocked waiting on I/O.
+     */
+    @Override
+    public void wakeup() {
+        this.selector.wakeup();
+    }
+
+    /**
+     * Close the network client
+     */
+    @Override
+    public void close() {
+        this.selector.close();
+    }
+
+    /**
+     * Choose the node with the fewest outstanding requests which is at least 
eligible for connection. This method will
+     * prefer a node with an existing connection, but will potentially choose 
a node for which we don't yet have a
+     * connection if all existing connections are in use. This method will 
never choose a node for which there is no
+     * existing connection and from which we have disconnected within the 
reconnect backoff period.
+     * @return The node with the fewest in-flight requests.
+     */
+    public Node leastLoadedNode(long now) {
+        List<Node> nodes = this.metadata.fetch().nodes();
+        int inflight = Integer.MAX_VALUE;
+        Node found = null;
+        for (int i = 0; i < nodes.size(); i++) {
+            int idx = Utils.abs((this.nodeIndexOffset + i) % nodes.size());
+            Node node = nodes.get(idx);
+            int currInflight = 
this.inFlightRequests.inFlightRequestCount(node.id());
+            if (currInflight == 0 && 
this.connectionStates.isConnected(node.id())) {
+                // if we find an established connection with no in-flight 
requests we can stop right away
+                return node;
+            } else if (!this.connectionStates.isBlackedOut(node.id(), now) && 
currInflight < inflight) {
+                // otherwise if this is the best we have found so far, record 
that
+                inflight = currInflight;
+                found = node;
+            }
+        }
+
+        return found;
+    }
+
+    /**
+     * Handle any completed request send. In particular if no response is 
expected consider the request complete.
+     * @param responses The list of responses to update
+     * @param now The current time
+     */
+    private void handleCompletedSends(List<ClientResponse> responses, long 
now) {
+        // if no response is expected then when the send is completed, return 
it
+        for (NetworkSend send : this.selector.completedSends()) {
+            ClientRequest request = 
this.inFlightRequests.lastSent(send.destination());
+            if (!request.expectResponse()) {
+                this.inFlightRequests.completeLastSent(send.destination());
+                responses.add(new ClientResponse(request, now, false, null));
+            }
+        }
+    }
+
+    /**
+     * Handle any completed receives and update the response list with the 
responses received.
+     * @param responses The list of responses to update
+     * @param now The current time
+     */
+    private void handleCompletedReceives(List<ClientResponse> responses, long 
now) {
+        for (NetworkReceive receive : this.selector.completedReceives()) {
+            int source = receive.source();
+            ClientRequest req = inFlightRequests.completeNext(source);
+            ResponseHeader header = ResponseHeader.parse(receive.payload());
+            short apiKey = req.request().header().apiKey();
+            Struct body = (Struct) 
ProtoUtils.currentResponseSchema(apiKey).read(receive.payload());
+            correlate(req.request().header(), header);
+            if (apiKey == ApiKeys.METADATA.id) {
+                handleMetadataResponse(req.request().header(), body, now);
+            } else {
+                // need to add body/header to response here
+                responses.add(new ClientResponse(req, now, false, body));
+            }
+        }
+    }
+
+    private void handleMetadataResponse(RequestHeader header, Struct body, 
long now) {
+        this.metadataFetchInProgress = false;
+        MetadataResponse response = new MetadataResponse(body);
+        Cluster cluster = response.cluster();
+        // don't update the cluster if there are no valid nodes...the topic we 
want may still be in the process of being
+        // created which means we will get errors and no nodes until it exists
+        if (cluster.nodes().size() > 0)
+            this.metadata.update(cluster, now);
+        else
+            log.trace("Ignoring empty metadata response with correlation id 
{}.", header.correlationId());
+    }
+
+    /**
+     * Handle any disconnected connections
+     * @param responses The list of responses that completed with the 
disconnection
+     * @param now The current time
+     */
+    private void handleDisconnections(List<ClientResponse> responses, long 
now) {
+        for (int node : this.selector.disconnected()) {
+            connectionStates.disconnected(node);
+            log.debug("Node {} disconnected.", node);
+            for (ClientRequest request : this.inFlightRequests.clearAll(node)) 
{
+                log.trace("Cancelled request {} due to node {} being 
disconnected", request, node);
+                ApiKeys requestKey = 
ApiKeys.forId(request.request().header().apiKey());
+                if (requestKey == ApiKeys.METADATA)
+                    metadataFetchInProgress = false;
+                else
+                    responses.add(new ClientResponse(request, now, true, 
null));
+            }
+        }
+        // we got a disconnect so we should probably refresh our metadata and 
see if that broker is dead
+        if (this.selector.disconnected().size() > 0)
+            this.metadata.forceUpdate();
+    }
+
+    /**
+     * Record any newly completed connections
+     */
+    private void handleConnections() {
+        for (Integer id : this.selector.connected()) {
+            log.debug("Completed connection to node {}", id);
+            this.connectionStates.connected(id);
+        }
+    }
+
+    /**
+     * Validate that the response corresponds to the request we expect or else 
explode
+     */
+    private void correlate(RequestHeader requestHeader, ResponseHeader 
responseHeader) {
+        if (requestHeader.correlationId() != responseHeader.correlationId())
+            throw new IllegalStateException("Correlation id for response (" + 
responseHeader.correlationId() +
+                                            ") does not match request (" +
+                                            requestHeader.correlationId() +
+                                            ")");
+    }
+
+    /**
+     * Create a metadata request for the given topics
+     */
+    private ClientRequest metadataRequest(long now, int node, Set<String> 
topics) {
+        MetadataRequest metadata = new MetadataRequest(new 
ArrayList<String>(topics));
+        RequestSend send = new RequestSend(node, 
nextRequestHeader(ApiKeys.METADATA), metadata.toStruct());
+        return new ClientRequest(now, true, send, null);
+    }
+
+    /**
+     * Add a metadata request to the list of sends if we need to make one
+     */
+    private void maybeUpdateMetadata(List<NetworkSend> sends, long now) {
+        if (this.metadataFetchInProgress || !metadata.needsUpdate(now))
+            return;
+
+        Node node = this.leastLoadedNode(now);
+        if (node == null)
+            return;
+
+        if (connectionStates.isConnected(node.id()) && 
inFlightRequests.canSendMore(node.id())) {
+            Set<String> topics = metadata.topics();
+            this.metadataFetchInProgress = true;
+            ClientRequest metadataRequest = metadataRequest(now, node.id(), 
topics);
+            log.debug("Sending metadata request {} to node {}", 
metadataRequest, node.id());
+            sends.add(metadataRequest.request());
+            this.inFlightRequests.add(metadataRequest);
+        } else if (connectionStates.canConnect(node.id(), now)) {
+            // we don't have a connection to this node right now, make one
+            initiateConnect(node, now);
+        }
+    }
+
+    /**
+     * Initiate a connection to the given node
+     */
+    private void initiateConnect(Node node, long now) {
+        try {
+            log.debug("Initiating connection to node {} at {}:{}.", node.id(), 
node.host(), node.port());
+            selector.connect(node.id(), new InetSocketAddress(node.host(), 
node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
+            this.connectionStates.connecting(node.id(), now);
+        } catch (IOException e) {
+            /* attempt failed, we'll try again after the backoff */
+            connectionStates.disconnected(node.id());
+            /* maybe the problem is our metadata, update it */
+            metadata.forceUpdate();
+            log.debug("Error connecting to node {} at {}:{}:", node.id(), 
node.host(), node.port(), e);
+        }
+    }
+
+}
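
A rough wiring sketch (assumptions: the Metrics and SystemTime classes used by KafkaProducer below, arbitrary buffer sizes and backoff, and a Metadata instance already bootstrapped with the cluster addresses as KafkaProducer does); polling with an empty request list still lets the client connect and refresh metadata on its own:

    NetworkClient client = new NetworkClient(new Selector(new Metrics(), new SystemTime()),
                                             metadata,        // bootstrapped Metadata instance
                                             "sketch-client",
                                             5,               // max in-flight requests per connection
                                             100L,            // reconnect backoff in ms
                                             64 * 1024,       // socket send buffer
                                             64 * 1024);      // socket receive buffer

    List<ClientResponse> responses =
        client.poll(Collections.<ClientRequest>emptyList(), 100L, System.currentTimeMillis());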

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java 
b/clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java
new file mode 100644
index 0000000..752a979
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+/**
+ * The state of our connection to a node
+ */
+final class NodeConnectionState {
+
+    ConnectionState state;
+    long lastConnectAttemptMs;
+
+    public NodeConnectionState(ConnectionState state, long lastConnectAttempt) 
{
+        this.state = state;
+        this.lastConnectAttemptMs = lastConnectAttempt;
+    }
+
+    public String toString() {
+        return "NodeState(" + state + ", " + lastConnectAttemptMs + ")";
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index d15562a..00775ab 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
 import org.apache.kafka.clients.producer.internals.Metadata;
 import org.apache.kafka.clients.producer.internals.Partitioner;
@@ -119,19 +120,22 @@ public class KafkaProducer implements Producer {
                                                  metrics,
                                                  time);
         List<InetSocketAddress> addresses = 
ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
-        this.metadata.update(Cluster.bootstrap(addresses), 
time.milliseconds());
-        this.sender = new Sender(new Selector(this.metrics, time),
+        this.metadata.update(Cluster.bootstrap(addresses), 0);
+
+        NetworkClient client = new NetworkClient(new Selector(this.metrics, 
time),
+                                                 this.metadata,
+                                                 clientId,
+                                                 
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
+                                                 
config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
+                                                 
config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
+                                                 
config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG));
+        this.sender = new Sender(client,
                                  this.metadata,
                                  this.accumulator,
-                                 clientId,
                                  
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
-                                 
config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                                  (short) 
parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                                  config.getInt(ProducerConfig.RETRIES_CONFIG),
                                  config.getInt(ProducerConfig.TIMEOUT_CONFIG),
-                                 
config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
-                                 
config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
-                                 
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                                  this.metrics,
                                  new SystemTime());
         this.ioThread = new KafkaThread("kafka-producer-network-thread", 
this.sender, true);

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
index f47a461..57bc285 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
@@ -105,8 +105,8 @@ public final class Metadata {
      * since our last update and either (1) an update has been requested or 
(2) the current metadata has expired (more
      * than metadataExpireMs has passed since the last refresh)
      */
-    public synchronized boolean needsUpdate(long nowMs) {
-        long msSinceLastUpdate = nowMs - this.lastRefreshMs;
+    public synchronized boolean needsUpdate(long now) {
+        long msSinceLastUpdate = now - this.lastRefreshMs;
         boolean updateAllowed = msSinceLastUpdate >= this.refreshBackoffMs;
         boolean updateNeeded = this.forceUpdate || msSinceLastUpdate >= 
this.metadataExpireMs;
         return updateAllowed && updateNeeded;
@@ -129,9 +129,9 @@ public final class Metadata {
     /**
      * Update the cluster metadata
      */
-    public synchronized void update(Cluster cluster, long nowMs) {
+    public synchronized void update(Cluster cluster, long now) {
         this.forceUpdate = false;
-        this.lastRefreshMs = nowMs;
+        this.lastRefreshMs = now;
         this.cluster = cluster;
         notifyAll();
         log.debug("Updated cluster metadata to {}", cluster);

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 4010d42..1ed3c28 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -13,7 +13,15 @@
 package org.apache.kafka.clients.producer.internals;
 
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.kafka.clients.producer.Callback;
@@ -91,21 +99,21 @@ public final class RecordAccumulator {
         metrics.addMetric("waiting-threads",
                           "The number of user threads blocked waiting for 
buffer memory to enqueue their records",
                           new Measurable() {
-                              public double measure(MetricConfig config, long 
nowMs) {
+                              public double measure(MetricConfig config, long 
now) {
                                   return free.queued();
                               }
                           });
         metrics.addMetric("buffer-total-bytes",
                           "The maximum amount of buffer memory the client can 
use (whether or not it is currently used).",
                           new Measurable() {
-                              public double measure(MetricConfig config, long 
nowMs) {
+                              public double measure(MetricConfig config, long 
now) {
                                   return free.totalMemory();
                               }
                           });
         metrics.addMetric("buffer-available-bytes",
                           "The total amount of buffer memory that is not being 
used (either unallocated or in the free list).",
                           new Measurable() {
-                              public double measure(MetricConfig config, long 
nowMs) {
+                              public double measure(MetricConfig config, long 
now) {
                                   return free.availableMemory();
                               }
                           });
@@ -163,9 +171,9 @@ public final class RecordAccumulator {
     /**
      * Re-enqueue the given record batch in the accumulator to retry
      */
-    public void reenqueue(RecordBatch batch, long nowMs) {
+    public void reenqueue(RecordBatch batch, long now) {
         batch.attempts++;
-        batch.lastAttemptMs = nowMs;
+        batch.lastAttemptMs = now;
         Deque<RecordBatch> deque = dequeFor(batch.topicPartition);
         synchronized (deque) {
             deque.addFirst(batch);
@@ -175,8 +183,8 @@ public final class RecordAccumulator {
     /**
      * Get a list of nodes whose partitions are ready to be sent.
      * <p>
-     * A destination node is ready to send data if ANY one of its partition is 
not backing off the send
-     * and ANY of the following are true :
+     * A destination node is ready to send data if ANY one of its partition is 
not backing off the send and ANY of the
+     * following are true :
      * <ol>
      * <li>The record set is full
      * <li>The record set has sat in the accumulator for at least lingerMs 
milliseconds
@@ -185,7 +193,7 @@ public final class RecordAccumulator {
      * <li>The accumulator has been closed
      * </ol>
      */
-    public Set<Node> ready(Cluster cluster, long nowMs) {
+    public Set<Node> ready(Cluster cluster, long now) {
         Set<Node> readyNodes = new HashSet<Node>();
         boolean exhausted = this.free.queued() > 0;
 
@@ -198,9 +206,9 @@ public final class RecordAccumulator {
                 synchronized (deque) {
                     RecordBatch batch = deque.peekFirst();
                     if (batch != null) {
-                        boolean backingOff = batch.attempts > 0 && 
batch.lastAttemptMs + retryBackoffMs > nowMs;
+                        boolean backingOff = batch.attempts > 0 && 
batch.lastAttemptMs + retryBackoffMs > now;
                         boolean full = deque.size() > 1 || 
batch.records.isFull();
-                        boolean expired = nowMs - batch.createdMs >= lingerMs;
+                        boolean expired = now - batch.createdMs >= lingerMs;
                         boolean sendable = full || expired || exhausted || 
closed;
                         if (sendable && !backingOff)
                             readyNodes.add(leader);
@@ -227,18 +235,17 @@ public final class RecordAccumulator {
     }
 
     /**
-     * Drain all the data for the given nodes and collate them into a list of
-     * batches that will fit within the specified size on a per-node basis.
-     * This method attempts to avoid choosing the same topic-node over and 
over.
+     * Drain all the data for the given nodes and collate them into a list of 
batches that will fit within the specified
+     * size on a per-node basis. This method attempts to avoid choosing the 
same topic-node over and over.
      * 
      * @param cluster The current cluster metadata
      * @param nodes The list of node to drain
      * @param maxSize The maximum number of bytes to drain
-     * @param nowMs The current unix time in milliseconds
+     * @param now The current unix time in milliseconds
      * @return A list of {@link RecordBatch} for each node specified with 
total size less than the requested maxSize.
      *         TODO: There may be a starvation issue due to iteration order
      */
-    public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> 
nodes, int maxSize, long nowMs) {
+    public Map<Integer, List<RecordBatch>> drain(Cluster cluster, Set<Node> 
nodes, int maxSize, long now) {
         if (nodes.isEmpty())
             return Collections.emptyMap();
 
@@ -266,7 +273,7 @@ public final class RecordAccumulator {
                                 batch.records.close();
                                 size += batch.records.sizeInBytes();
                                 ready.add(batch);
-                                batch.drainedMs = nowMs;
+                                batch.drainedMs = now;
                             }
                         }
                     }
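
The ready()/drain() pair is presumably driven by the refactored Sender roughly as follows (a sketch; accumulator, cluster, client, time, and the 16 KB drain size are placeholders):

    long now = time.milliseconds();
    Set<Node> readyNodes = accumulator.ready(cluster, now);

    // drop any node the network client cannot actually send to right now
    Iterator<Node> iter = readyNodes.iterator();
    while (iter.hasNext()) {
        if (!client.ready(iter.next(), now))
            iter.remove();
    }

    // collate up to ~16 KB of batches per ready node, keyed by node id
    Map<Integer, List<RecordBatch>> batches = accumulator.drain(cluster, readyNodes, 16384, now);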

http://git-wip-us.apache.org/repos/asf/kafka/blob/548d1ba0/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
index 5ee5455..dd0af8a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
@@ -42,9 +42,9 @@ public final class RecordBatch {
     private final ProduceRequestResult produceFuture;
     private final List<Thunk> thunks;
 
-    public RecordBatch(TopicPartition tp, MemoryRecords records, long nowMs) {
-        this.createdMs = nowMs;
-        this.lastAttemptMs = nowMs;
+    public RecordBatch(TopicPartition tp, MemoryRecords records, long now) {
+        this.createdMs = now;
+        this.lastAttemptMs = now;
         this.records = records;
         this.topicPartition = tp;
         this.produceFuture = new ProduceRequestResult();
