KAFKA-2066; Use client-side FetchRequest/FetchResponse on server

Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Jun Rao <jun...@gmail.com>

Closes #2069 from hachikuji/KAFKA-2066

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3b4c3479
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3b4c3479
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3b4c3479

Branch: refs/heads/trunk
Commit: 3b4c347949c02b1e2b1dd473deda0f8d2304d027
Parents: 1f1d450
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Nov 14 16:31:04 2016 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Nov 14 16:31:04 2016 -0800

----------------------------------------------------------------------
 checkstyle/import-control.xml                    |   2 +
 .../org/apache/kafka/clients/ClientRequest.java  |  65 +++---
 .../apache/kafka/clients/ClientResponse.java     |  56 +++--
 .../apache/kafka/clients/InFlightRequests.java   |  46 ++--
 .../org/apache/kafka/clients/KafkaClient.java    |   1 -
 .../kafka/clients/ManualMetadataUpdater.java     |  11 +-
 .../apache/kafka/clients/MetadataUpdater.java    |  10 +-
 .../org/apache/kafka/clients/NetworkClient.java  | 164 ++++++++------
 .../consumer/internals/AbstractCoordinator.java  |  33 +--
 .../consumer/internals/ConsumerCoordinator.java  |  12 -
 .../internals/ConsumerNetworkClient.java         |  30 ++-
 .../clients/consumer/internals/Fetcher.java      |  11 +-
 .../clients/producer/internals/Sender.java       |  68 +++---
 .../kafka/common/network/ByteBufferSend.java     |   5 +-
 .../kafka/common/network/NetworkSend.java        |   4 +-
 .../apache/kafka/common/protocol/ApiKeys.java    |   2 +-
 .../apache/kafka/common/protocol/Protocol.java   |  21 +-
 .../kafka/common/protocol/types/ArrayOf.java     |  13 +-
 .../kafka/common/protocol/types/Schema.java      |  18 +-
 .../kafka/common/protocol/types/Struct.java      |  16 +-
 .../kafka/common/protocol/types/Type.java        |  48 ++++
 .../apache/kafka/common/record/FileRecords.java  | 126 +++++++++++
 .../kafka/common/record/LogInputStream.java      |  35 +++
 .../kafka/common/record/MemoryRecords.java       | 186 ++++------------
 .../org/apache/kafka/common/record/Records.java  |  15 ++
 .../kafka/common/record/RecordsIterator.java     | 170 ++++++++++++++
 .../kafka/common/requests/AbstractRequest.java   |   8 +-
 .../requests/AbstractRequestResponse.java        |   9 +-
 .../kafka/common/requests/AbstractResponse.java  |  86 ++++++++
 .../common/requests/ApiVersionsRequest.java      |   2 +-
 .../common/requests/ApiVersionsResponse.java     |   2 +-
 .../requests/ControlledShutdownRequest.java      |   2 +-
 .../requests/ControlledShutdownResponse.java     |   2 +-
 .../common/requests/CreateTopicsRequest.java     |   2 +-
 .../common/requests/CreateTopicsResponse.java    |   2 +-
 .../common/requests/DeleteTopicsRequest.java     |   2 +-
 .../common/requests/DeleteTopicsResponse.java    |   2 +-
 .../common/requests/DescribeGroupsRequest.java   |   2 +-
 .../common/requests/DescribeGroupsResponse.java  |   2 +-
 .../kafka/common/requests/FetchRequest.java      |  17 +-
 .../kafka/common/requests/FetchResponse.java     | 163 ++++++++----
 .../requests/GroupCoordinatorRequest.java        |   2 +-
 .../requests/GroupCoordinatorResponse.java       |   2 +-
 .../kafka/common/requests/HeartbeatRequest.java  |   2 +-
 .../common/requests/HeartbeatResponse.java       |   2 +-
 .../kafka/common/requests/JoinGroupRequest.java  |   2 +-
 .../common/requests/JoinGroupResponse.java       |   2 +-
 .../common/requests/LeaderAndIsrRequest.java     |   2 +-
 .../common/requests/LeaderAndIsrResponse.java    |   2 +-
 .../common/requests/LeaveGroupRequest.java       |   2 +-
 .../common/requests/LeaveGroupResponse.java      |   2 +-
 .../common/requests/ListGroupsRequest.java       |   2 +-
 .../common/requests/ListGroupsResponse.java      |   2 +-
 .../common/requests/ListOffsetRequest.java       |   2 +-
 .../common/requests/ListOffsetResponse.java      |   2 +-
 .../kafka/common/requests/MetadataRequest.java   |   2 +-
 .../kafka/common/requests/MetadataResponse.java  |   2 +-
 .../common/requests/OffsetCommitRequest.java     |   8 +-
 .../common/requests/OffsetCommitResponse.java    |   2 +-
 .../common/requests/OffsetFetchRequest.java      |  10 +-
 .../common/requests/OffsetFetchResponse.java     |   2 +-
 .../kafka/common/requests/ProduceRequest.java    |  31 +--
 .../kafka/common/requests/ProduceResponse.java   |   8 +-
 .../kafka/common/requests/RecordsSend.java       |  77 +++++++
 .../kafka/common/requests/RequestSend.java       |  55 -----
 .../kafka/common/requests/ResponseSend.java      |  41 ----
 .../common/requests/SaslHandshakeRequest.java    |   2 +-
 .../common/requests/SaslHandshakeResponse.java   |   2 +-
 .../common/requests/StopReplicaRequest.java      |   2 +-
 .../common/requests/StopReplicaResponse.java     |   2 +-
 .../kafka/common/requests/SyncGroupRequest.java  |   2 +-
 .../common/requests/SyncGroupResponse.java       |   2 +-
 .../common/requests/UpdateMetadataRequest.java   |   2 +-
 .../common/requests/UpdateMetadataResponse.java  |   2 +-
 .../authenticator/SaslClientAuthenticator.java   |  57 +++--
 .../authenticator/SaslServerAuthenticator.java   |  71 +++---
 .../kafka/common/utils/AbstractIterator.java     |   4 +-
 .../org/apache/kafka/clients/MockClient.java     | 101 ++++-----
 .../apache/kafka/clients/NetworkClientTest.java  |  31 ++-
 .../clients/consumer/KafkaConsumerTest.java      |  64 +++---
 .../internals/AbstractCoordinatorTest.java       |  64 +++---
 .../internals/ConsumerCoordinatorTest.java       |  85 ++++---
 .../internals/ConsumerNetworkClientTest.java     |  10 +-
 .../clients/consumer/internals/FetcherTest.java  |  40 ++--
 .../clients/producer/internals/SenderTest.java   |  40 ++--
 .../common/requests/RequestResponseTest.java     | 174 +++++++++++----
 .../authenticator/SaslAuthenticatorTest.java     |  54 ++---
 .../distributed/WorkerCoordinatorTest.java       |  41 ++--
 .../main/scala/kafka/admin/AdminClient.scala     |  18 +-
 .../src/main/scala/kafka/api/FetchRequest.scala  |   3 +-
 .../main/scala/kafka/api/FetchResponse.scala     |   3 +-
 .../kafka/api/GenericRequestAndHeader.scala      |   5 +-
 .../kafka/api/GenericResponseAndHeader.scala     |   5 +-
 .../main/scala/kafka/api/ProducerRequest.scala   |   4 +-
 .../scala/kafka/api/RequestOrResponse.scala      |   2 +-
 .../controller/ControllerChannelManager.scala    |  59 +++--
 .../kafka/controller/KafkaController.scala       |   8 +-
 .../kafka/controller/TopicDeletionManager.scala  |   7 +-
 .../main/scala/kafka/log/FileMessageSet.scala    |  35 +--
 .../kafka/message/ByteBufferMessageSet.scala     |  17 +-
 .../main/scala/kafka/message/MessageSet.scala    |  12 +-
 .../scala/kafka/network/RequestChannel.scala     |  45 ++--
 .../main/scala/kafka/server/DelayedFetch.scala   |   9 +-
 .../src/main/scala/kafka/server/KafkaApis.scala  | 219 +++++++++----------
 .../main/scala/kafka/server/KafkaServer.scala    |  58 +++--
 .../kafka/server/ReplicaFetcherThread.scala      |  24 +-
 .../scala/kafka/server/ReplicaManager.scala      |  31 +--
 .../kafka/server/ReplicationQuotaManager.scala   |   2 +-
 .../kafka/utils/NetworkClientBlockingOps.scala   |  14 +-
 .../kafka/api/AuthorizerIntegrationTest.scala    |  20 +-
 .../kafka/message/BaseMessageSetTestCases.scala  |   4 +-
 .../unit/kafka/network/SocketServerTest.scala    |   7 +-
 .../kafka/server/CreateTopicsRequestTest.scala   |   2 +-
 .../unit/kafka/server/EdgeCaseRequestTest.scala  |   7 +-
 .../unit/kafka/server/FetchRequestTest.scala     |  10 +-
 .../unit/kafka/server/LeaderElectionTest.scala   |   4 +-
 .../unit/kafka/server/ProduceRequestTest.scala   |   6 +-
 .../kafka/server/ReplicaManagerQuotasTest.scala  |  11 +-
 .../unit/kafka/server/ReplicaManagerTest.scala   |   9 +-
 .../unit/kafka/server/SimpleFetchTest.scala      |  15 +-
 120 files changed, 1870 insertions(+), 1394 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 7716f43..58525ad 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -81,11 +81,13 @@
   <subpackage name="protocol">
     <allow pkg="org.apache.kafka.common.errors" />
     <allow pkg="org.apache.kafka.common.protocol.types" />
+    <allow pkg="org.apache.kafka.common.record" />
  </subpackage>

  <subpackage name="record">
    <allow pkg="net.jpountz" />
    <allow pkg="org.apache.kafka.common.record" />
+   <allow pkg="org.apache.kafka.common.network" />
  </subpackage>

  <subpackage name="requests">

http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
index 117b0bf..de6e506 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
@@ -12,56 +12,50 @@
  */
 package org.apache.kafka.clients;

-import org.apache.kafka.common.requests.RequestSend;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.RequestHeader;

 /**
  * A request being sent to the server. This holds both the network send as well as the client-level metadata.
  */
 public final class ClientRequest {

+    private final String destination;
+    private final RequestHeader header;
+    private final AbstractRequest body;
     private final long createdTimeMs;
     private final boolean expectResponse;
-    private final RequestSend request;
     private final RequestCompletionHandler callback;
-    private final boolean isInitiatedByNetworkClient;
-    private long sendTimeMs;

     /**
+     * @param destination The brokerId to send the request to
      * @param createdTimeMs The unix timestamp in milliseconds for the time at which this request was created.
      * @param expectResponse Should we expect a response message or is this request complete once it is sent?
-     * @param request The request
+     * @param header The request's header
+     * @param body The request's body
      * @param callback A callback to execute when the response has been received (or null if no callback is necessary)
      */
-    public ClientRequest(long createdTimeMs, boolean expectResponse, RequestSend request,
+    public ClientRequest(String destination,
+                         long createdTimeMs,
+                         boolean expectResponse,
+                         RequestHeader header,
+                         AbstractRequest body,
                          RequestCompletionHandler callback) {
-        this(createdTimeMs, expectResponse, request, callback, false);
-    }
-
-    /**
-     * @param createdTimeMs The unix timestamp in milliseconds for the time at which this request was created.
-     * @param expectResponse Should we expect a response message or is this request complete once it is sent?
- * @param request The request - * @param callback A callback to execute when the response has been received (or null if no callback is necessary) - * @param isInitiatedByNetworkClient Is request initiated by network client, if yes, its - * response will be consumed by network client - */ - public ClientRequest(long createdTimeMs, boolean expectResponse, RequestSend request, - RequestCompletionHandler callback, boolean isInitiatedByNetworkClient) { + this.destination = destination; this.createdTimeMs = createdTimeMs; this.callback = callback; - this.request = request; + this.header = header; + this.body = body; this.expectResponse = expectResponse; - this.isInitiatedByNetworkClient = isInitiatedByNetworkClient; } @Override public String toString() { return "ClientRequest(expectResponse=" + expectResponse + ", callback=" + callback + - ", request=" + request + - (isInitiatedByNetworkClient ? ", isInitiatedByNetworkClient" : "") + + ", header=" + header + + ", body=" + body + ", createdTimeMs=" + createdTimeMs + - ", sendTimeMs=" + sendTimeMs + ")"; } @@ -69,12 +63,16 @@ public final class ClientRequest { return expectResponse; } - public RequestSend request() { - return request; + public RequestHeader header() { + return header; } - public boolean hasCallback() { - return callback != null; + public AbstractRequest body() { + return body; + } + + public String destination() { + return destination; } public RequestCompletionHandler callback() { @@ -85,15 +83,4 @@ public final class ClientRequest { return createdTimeMs; } - public boolean isInitiatedByNetworkClient() { - return isInitiatedByNetworkClient; - } - - public long sendTimeMs() { - return sendTimeMs; - } - - public void setSendTimeMs(long sendTimeMs) { - this.sendTimeMs = sendTimeMs; - } } http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java b/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java index 3b6f955..3cd8f1a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientResponse.java @@ -12,30 +12,45 @@ */ package org.apache.kafka.clients; -import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.RequestHeader; /** - * A response from the server. Contains both the body of the response as well as the correlated request that was - * originally sent. + * A response from the server. Contains both the body of the response as well as the correlated request + * metadata that was originally sent. 
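
To put the ClientRequest change above in context: after this patch a caller builds a request from a destination node id, a RequestHeader, and a typed AbstractRequest body, and reads back a typed AbstractResponse in the callback, instead of passing a pre-serialized RequestSend and parsing a raw Struct. The following is only an illustrative sketch, not part of the patch; the node id "1", the topic name, and the surrounding KafkaClient/Time variables are assumed to exist and be ready.

import java.util.Collections;

import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.utils.Time;

public class ClientRequestSketch {
    // Sketch only: assumes `client` is connected and node "1" is ready to receive requests.
    static void sendMetadataRequest(KafkaClient client, Time time) {
        MetadataRequest body = new MetadataRequest(Collections.singletonList("my-topic"));
        RequestHeader header = client.nextRequestHeader(ApiKeys.METADATA);
        RequestCompletionHandler callback = new RequestCompletionHandler() {
            @Override
            public void onComplete(ClientResponse response) {
                // responseBody() now returns a typed AbstractResponse rather than a raw Struct
                MetadataResponse metadata = (MetadataResponse) response.responseBody();
                System.out.println(metadata.cluster());
            }
        };
        ClientRequest request = new ClientRequest("1", time.milliseconds(), true, header, body, callback);
        client.send(request, time.milliseconds());
    }
}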
*/ public class ClientResponse { + private final RequestHeader requestHeader; + private final RequestCompletionHandler callback; + private final String destination; private final long receivedTimeMs; + private final long latencyMs; private final boolean disconnected; - private final ClientRequest request; - private final Struct responseBody; + private final AbstractResponse responseBody; /** - * @param request The original request + * @param requestHeader The header of the corresponding request + * @param callback The callback to be invoked + * @param createdTimeMs The unix timestamp when the corresponding request was created + * @param destination The node the corresponding request was sent to * @param receivedTimeMs The unix timestamp when this response was received * @param disconnected Whether the client disconnected before fully reading a response * @param responseBody The response contents (or null) if we disconnected or no response was expected */ - public ClientResponse(ClientRequest request, long receivedTimeMs, boolean disconnected, Struct responseBody) { - super(); + public ClientResponse(RequestHeader requestHeader, + RequestCompletionHandler callback, + String destination, + long createdTimeMs, + long receivedTimeMs, + boolean disconnected, + AbstractResponse responseBody) { + this.requestHeader = requestHeader; + this.callback = callback; + this.destination = destination; this.receivedTimeMs = receivedTimeMs; + this.latencyMs = receivedTimeMs - createdTimeMs; this.disconnected = disconnected; - this.request = request; this.responseBody = responseBody; } @@ -47,11 +62,15 @@ public class ClientResponse { return disconnected; } - public ClientRequest request() { - return request; + public RequestHeader requestHeader() { + return requestHeader; } - public Struct responseBody() { + public String destination() { + return destination; + } + + public AbstractResponse responseBody() { return responseBody; } @@ -60,16 +79,23 @@ public class ClientResponse { } public long requestLatencyMs() { - return receivedTimeMs() - this.request.createdTimeMs(); + return latencyMs; + } + + public void onComplete() { + if (callback != null) + callback.onComplete(this); } @Override public String toString() { return "ClientResponse(receivedTimeMs=" + receivedTimeMs + + ", latencyMs=" + + latencyMs + ", disconnected=" + disconnected + - ", request=" + - request + + ", requestHeader=" + + requestHeader + ", responseBody=" + responseBody + ")"; http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java index 91b9dba..f4f753e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java +++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java @@ -26,7 +26,7 @@ import java.util.Map; final class InFlightRequests { private final int maxInFlightRequestsPerConnection; - private final Map<String, Deque<ClientRequest>> requests = new HashMap<>(); + private final Map<String, Deque<NetworkClient.InFlightRequest>> requests = new HashMap<>(); public InFlightRequests(int maxInFlightRequestsPerConnection) { this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection; @@ -35,11 +35,12 @@ final class InFlightRequests { /** * Add the given request to the queue for the connection it 
was directed to */ - public void add(ClientRequest request) { - Deque<ClientRequest> reqs = this.requests.get(request.request().destination()); + public void add(NetworkClient.InFlightRequest request) { + String destination = request.destination; + Deque<NetworkClient.InFlightRequest> reqs = this.requests.get(destination); if (reqs == null) { reqs = new ArrayDeque<>(); - this.requests.put(request.request().destination(), reqs); + this.requests.put(destination, reqs); } reqs.addFirst(request); } @@ -47,8 +48,8 @@ final class InFlightRequests { /** * Get the request queue for the given node */ - private Deque<ClientRequest> requestQueue(String node) { - Deque<ClientRequest> reqs = requests.get(node); + private Deque<NetworkClient.InFlightRequest> requestQueue(String node) { + Deque<NetworkClient.InFlightRequest> reqs = requests.get(node); if (reqs == null || reqs.isEmpty()) throw new IllegalStateException("Response from server for which there are no in-flight requests."); return reqs; @@ -57,7 +58,7 @@ final class InFlightRequests { /** * Get the oldest request (the one that that will be completed next) for the given node */ - public ClientRequest completeNext(String node) { + public NetworkClient.InFlightRequest completeNext(String node) { return requestQueue(node).pollLast(); } @@ -65,7 +66,7 @@ final class InFlightRequests { * Get the last request we sent to the given node (but don't remove it from the queue) * @param node The node id */ - public ClientRequest lastSent(String node) { + public NetworkClient.InFlightRequest lastSent(String node) { return requestQueue(node).peekFirst(); } @@ -74,20 +75,20 @@ final class InFlightRequests { * @param node The node the request was sent to * @return The request */ - public ClientRequest completeLastSent(String node) { + public NetworkClient.InFlightRequest completeLastSent(String node) { return requestQueue(node).pollFirst(); } /** * Can we send more requests to this node? - * + * * @param node Node in question * @return true iff we have no requests still being sent to the given node */ public boolean canSendMore(String node) { - Deque<ClientRequest> queue = requests.get(node); + Deque<NetworkClient.InFlightRequest> queue = requests.get(node); return queue == null || queue.isEmpty() || - (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection); + (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection); } /** @@ -96,7 +97,7 @@ final class InFlightRequests { * @return The request count. */ public int inFlightRequestCount(String node) { - Deque<ClientRequest> queue = requests.get(node); + Deque<NetworkClient.InFlightRequest> queue = requests.get(node); return queue == null ? 
0 : queue.size(); } @@ -105,19 +106,19 @@ final class InFlightRequests { */ public int inFlightRequestCount() { int total = 0; - for (Deque<ClientRequest> deque : this.requests.values()) + for (Deque<NetworkClient.InFlightRequest> deque : this.requests.values()) total += deque.size(); return total; } /** * Clear out all the in-flight requests for the given node and return them - * + * * @param node The node * @return All the in-flight requests for that node that have been removed */ - public Iterable<ClientRequest> clearAll(String node) { - Deque<ClientRequest> reqs = requests.get(node); + public Iterable<NetworkClient.InFlightRequest> clearAll(String node) { + Deque<NetworkClient.InFlightRequest> reqs = requests.get(node); if (reqs == null) { return Collections.emptyList(); } else { @@ -126,7 +127,7 @@ final class InFlightRequests { } /** - * Returns a list of nodes with pending inflight request, that need to be timed out + * Returns a list of nodes with pending in-flight request, that need to be timed out * * @param now current time in milliseconds * @param requestTimeout max time to wait for the request to be completed @@ -134,13 +135,13 @@ final class InFlightRequests { */ public List<String> getNodesWithTimedOutRequests(long now, int requestTimeout) { List<String> nodeIds = new LinkedList<>(); - for (Map.Entry<String, Deque<ClientRequest>> requestEntry : requests.entrySet()) { + for (Map.Entry<String, Deque<NetworkClient.InFlightRequest>> requestEntry : requests.entrySet()) { String nodeId = requestEntry.getKey(); - Deque<ClientRequest> deque = requestEntry.getValue(); + Deque<NetworkClient.InFlightRequest> deque = requestEntry.getValue(); if (!deque.isEmpty()) { - ClientRequest request = deque.peekLast(); - long timeSinceSend = now - request.sendTimeMs(); + NetworkClient.InFlightRequest request = deque.peekLast(); + long timeSinceSend = now - request.sendTimeMs; if (timeSinceSend > requestTimeout) nodeIds.add(nodeId); } @@ -148,4 +149,5 @@ final class InFlightRequests { return nodeIds; } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java index 8c6e39a..f171d13 100644 --- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java @@ -66,7 +66,6 @@ public interface KafkaClient extends Closeable { /** * Queue up the given request for sending. Requests can only be sent on ready connections. 
- * * @param request The request * @param now The current timestamp */ http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java index efbe664..1c9fa79 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java +++ b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java @@ -14,7 +14,8 @@ package org.apache.kafka.clients; import org.apache.kafka.common.Node; -import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.RequestHeader; import java.util.ArrayList; import java.util.List; @@ -60,13 +61,13 @@ public class ManualMetadataUpdater implements MetadataUpdater { } @Override - public boolean maybeHandleDisconnection(ClientRequest request) { - return false; + public void handleDisconnection(String destination) { + // Do nothing } @Override - public boolean maybeHandleCompletedReceive(ClientRequest request, long now, Struct body) { - return false; + public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, AbstractResponse body) { + // Do nothing } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java index 4669a68..21c50bd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java +++ b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java @@ -14,7 +14,8 @@ package org.apache.kafka.clients; import org.apache.kafka.common.Node; -import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.requests.AbstractResponse; +import org.apache.kafka.common.requests.RequestHeader; import java.util.List; @@ -41,7 +42,7 @@ interface MetadataUpdater { * be 0 if an update has been started as a result of this call). * * If the implementation relies on `NetworkClient` to send requests, the completed receive will be passed to - * `maybeHandleCompletedReceive`. + * `handleCompletedMetadataResponse`. * * The semantics of `needed` and `possible` are implementation-dependent and may take into account a number of * factors like node availability, how long since the last metadata update, etc. @@ -53,8 +54,9 @@ interface MetadataUpdater { * * This provides a mechanism for the `MetadataUpdater` implementation to use the NetworkClient instance for its own * requests with special handling for disconnections of such requests. + * @param destination */ - boolean maybeHandleDisconnection(ClientRequest request); + void handleDisconnection(String destination); /** * If `request` is a metadata request, handles it and returns `true`. Otherwise, returns `false`. @@ -62,7 +64,7 @@ interface MetadataUpdater { * This provides a mechanism for the `MetadataUpdater` implementation to use the NetworkClient instance for its own * requests with special handling for completed receives of such requests. 
*/ - boolean maybeHandleCompletedReceive(ClientRequest request, long now, Struct body); + void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, AbstractResponse body); /** * Schedules an update of the current cluster metadata info. A subsequent call to `maybeUpdate` would trigger the http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 29c6d6f..124810d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -21,10 +21,10 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.requests.RequestHeader; -import org.apache.kafka.common.requests.RequestSend; import org.apache.kafka.common.requests.ResponseHeader; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -173,8 +173,9 @@ public class NetworkClient implements KafkaClient { @Override public void close(String nodeId) { selector.close(nodeId); - for (ClientRequest request : inFlightRequests.clearAll(nodeId)) - metadataUpdater.maybeHandleDisconnection(request); + for (InFlightRequest request : inFlightRequests.clearAll(nodeId)) + if (request.isInternalMetadataRequest) + metadataUpdater.handleDisconnection(request.destination); connectionStates.remove(nodeId); } @@ -230,22 +231,38 @@ public class NetworkClient implements KafkaClient { /** * Queue up the given request for sending. Requests can only be sent out to ready nodes. 
- * * @param request The request * @param now The current timestamp */ @Override public void send(ClientRequest request, long now) { - String nodeId = request.request().destination(); + doSend(request, false, now); + } + + private void sendInternalMetadataRequest(MetadataRequest metadataRequest, String nodeConnectionId, long now) { + ClientRequest clientRequest = new ClientRequest(nodeConnectionId, now, true, + nextRequestHeader(ApiKeys.METADATA), metadataRequest, null); + doSend(clientRequest, true, now); + } + + private void doSend(ClientRequest request, boolean isInternalMetadataRequest, long now) { + String nodeId = request.destination(); if (!canSendRequest(nodeId)) throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready."); - doSend(request, now); - } - private void doSend(ClientRequest request, long now) { - request.setSendTimeMs(now); - this.inFlightRequests.add(request); - selector.send(request.request()); + Send send = request.body().toSend(nodeId, request.header()); + InFlightRequest inFlightRequest = new InFlightRequest( + request.header(), + request.createdTimeMs(), + request.destination(), + request.callback(), + request.expectResponse(), + isInternalMetadataRequest, + send, + now); + + this.inFlightRequests.add(inFlightRequest); + selector.send(inFlightRequest.send); } /** @@ -277,12 +294,10 @@ public class NetworkClient implements KafkaClient { // invoke callbacks for (ClientResponse response : responses) { - if (response.request().hasCallback()) { - try { - response.request().callback().onComplete(response); - } catch (Exception e) { - log.error("Uncaught error in request completion:", e); - } + try { + response.onComplete(); + } catch (Exception e) { + log.error("Uncaught error in request completion:", e); } } @@ -376,14 +391,14 @@ public class NetworkClient implements KafkaClient { return found; } - public static Struct parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) { + public static AbstractResponse parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) { ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer); // Always expect the response version id to be the same as the request version id short apiKey = requestHeader.apiKey(); short apiVer = requestHeader.apiVersion(); Struct responseBody = ProtoUtils.responseSchema(apiKey, apiVer).read(responseBuffer); correlate(requestHeader, responseHeader); - return responseBody; + return AbstractResponse.getResponse(apiKey, responseBody); } /** @@ -395,10 +410,12 @@ public class NetworkClient implements KafkaClient { */ private void processDisconnection(List<ClientResponse> responses, String nodeId, long now) { connectionStates.disconnected(nodeId, now); - for (ClientRequest request : this.inFlightRequests.clearAll(nodeId)) { + for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) { log.trace("Cancelled request {} due to node {} being disconnected", request, nodeId); - if (!metadataUpdater.maybeHandleDisconnection(request)) - responses.add(new ClientResponse(request, now, true, null)); + if (request.isInternalMetadataRequest) + metadataUpdater.handleDisconnection(request.destination); + else + responses.add(request.disconnected(now)); } } @@ -432,10 +449,10 @@ public class NetworkClient implements KafkaClient { private void handleCompletedSends(List<ClientResponse> responses, long now) { // if no response is expected then when the send is completed, return it for (Send send : this.selector.completedSends()) { - 
ClientRequest request = this.inFlightRequests.lastSent(send.destination()); - if (!request.expectResponse()) { + InFlightRequest request = this.inFlightRequests.lastSent(send.destination()); + if (!request.expectResponse) { this.inFlightRequests.completeLastSent(send.destination()); - responses.add(new ClientResponse(request, now, false, null)); + responses.add(request.completed(null, now)); } } } @@ -449,10 +466,12 @@ public class NetworkClient implements KafkaClient { private void handleCompletedReceives(List<ClientResponse> responses, long now) { for (NetworkReceive receive : this.selector.completedReceives()) { String source = receive.source(); - ClientRequest req = inFlightRequests.completeNext(source); - Struct body = parseResponse(receive.payload(), req.request().header()); - if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body)) - responses.add(new ClientResponse(req, now, false, body)); + InFlightRequest req = inFlightRequests.completeNext(source); + AbstractResponse body = parseResponse(receive.payload(), req.header); + if (req.isInternalMetadataRequest) + metadataUpdater.handleCompletedMetadataResponse(req.header, now, body); + else + responses.add(req.completed(body, now)); } } @@ -558,33 +577,23 @@ public class NetworkClient implements KafkaClient { } @Override - public boolean maybeHandleDisconnection(ClientRequest request) { - ApiKeys requestKey = ApiKeys.forId(request.request().header().apiKey()); - - if (requestKey == ApiKeys.METADATA && request.isInitiatedByNetworkClient()) { - Cluster cluster = metadata.fetch(); - if (cluster.isBootstrapConfigured()) { - int nodeId = Integer.parseInt(request.request().destination()); - Node node = cluster.nodeById(nodeId); - if (node != null) - log.warn("Bootstrap broker {}:{} disconnected", node.host(), node.port()); - } - - metadataFetchInProgress = false; - return true; + public void handleDisconnection(String destination) { + Cluster cluster = metadata.fetch(); + if (cluster.isBootstrapConfigured()) { + int nodeId = Integer.parseInt(destination); + Node node = cluster.nodeById(nodeId); + if (node != null) + log.warn("Bootstrap broker {}:{} disconnected", node.host(), node.port()); } - return false; + metadataFetchInProgress = false; } @Override - public boolean maybeHandleCompletedReceive(ClientRequest req, long now, Struct body) { - short apiKey = req.request().header().apiKey(); - if (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient()) { - handleResponse(req.request().header(), body, now); - return true; - } - return false; + public void handleCompletedMetadataResponse(RequestHeader requestHeader, long now, AbstractResponse response) { + if (!(response instanceof MetadataResponse)) + throw new IllegalStateException("Unexpected response type in metadata handler: " + response); + handleMetadataResponse(requestHeader, (MetadataResponse) response, now); } @Override @@ -592,9 +601,8 @@ public class NetworkClient implements KafkaClient { this.metadata.requestUpdate(); } - private void handleResponse(RequestHeader header, Struct body, long now) { + private void handleMetadataResponse(RequestHeader header, MetadataResponse response, long now) { this.metadataFetchInProgress = false; - MetadataResponse response = new MetadataResponse(body); Cluster cluster = response.cluster(); // check if any topics metadata failed to get updated Map<String, Errors> errors = response.errors(); @@ -612,14 +620,6 @@ public class NetworkClient implements KafkaClient { } /** - * Create a metadata request for the given topics - */ - 
private ClientRequest request(long now, String node, MetadataRequest metadata) { - RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct()); - return new ClientRequest(now, true, send, null, true); - } - - /** * Return true if there's at least one connection establishment is currently underway */ private boolean isAnyNodeConnecting() { @@ -644,9 +644,9 @@ public class NetworkClient implements KafkaClient { metadataRequest = MetadataRequest.allTopics(); else metadataRequest = new MetadataRequest(new ArrayList<>(metadata.topics())); - ClientRequest clientRequest = request(now, nodeConnectionId, metadataRequest); + log.debug("Sending metadata request {} to node {}", metadataRequest, node.id()); - doSend(clientRequest, now); + sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now); return requestTimeoutMs; } @@ -674,4 +674,42 @@ public class NetworkClient implements KafkaClient { } + static class InFlightRequest { + final RequestHeader header; + final String destination; + final RequestCompletionHandler callback; + final boolean expectResponse; + final boolean isInternalMetadataRequest; // used to flag metadata fetches which are triggered internally by NetworkClient + final Send send; + final long sendTimeMs; + final long createdTimeMs; + + public InFlightRequest(RequestHeader header, + long createdTimeMs, + String destination, + RequestCompletionHandler callback, + boolean expectResponse, + boolean isInternalMetadataRequest, + Send send, + long sendTimeMs) { + this.header = header; + this.destination = destination; + this.callback = callback; + this.expectResponse = expectResponse; + this.isInternalMetadataRequest = isInternalMetadataRequest; + this.send = send; + this.sendTimeMs = sendTimeMs; + this.createdTimeMs = createdTimeMs; + } + + public ClientResponse completed(AbstractResponse response, long timeMs) { + return new ClientResponse(header, callback, destination, createdTimeMs, timeMs, false, response); + } + + public ClientResponse disconnected(long timeMs) { + return new ClientResponse(header, callback, destination, createdTimeMs, timeMs, true, null); + } + + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 59319ef..c205273 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -393,17 +393,11 @@ public abstract class AbstractCoordinator implements Closeable { } private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> { - - @Override - public JoinGroupResponse parse(ClientResponse response) { - return new JoinGroupResponse(response.responseBody()); - } - @Override public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) { Errors error = Errors.forCode(joinResponse.errorCode()); if (error == Errors.NONE) { - log.debug("Received successful join group response for group {}: {}", groupId, joinResponse.toStruct()); + log.debug("Received successful join group response for group {}: {}", groupId, joinResponse); 
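
The NetworkClient changes above also change what a completion handler sees: the ClientResponse now carries the request header, destination, and latency directly, and NetworkClient fires the handler through response.onComplete(). A rough sketch of the callback side under the same assumptions as the earlier snippet (the FetchResponse cast mirrors the Fetcher change later in this patch; the class name and log output are illustrative):

import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.requests.FetchResponse;

public class ResponseHandlerSketch implements RequestCompletionHandler {
    @Override
    public void onComplete(ClientResponse response) {
        if (response.wasDisconnected()) {
            // the correlated request metadata now lives on the response itself,
            // so handlers no longer reach through a ClientRequest/RequestSend pair
            System.err.println("Request " + response.requestHeader().correlationId() +
                    " to node " + response.destination() + " failed after " +
                    response.requestLatencyMs() + " ms");
            return;
        }
        // the body is already parsed into the matching AbstractResponse subclass
        FetchResponse fetchResponse = (FetchResponse) response.responseBody();
        System.out.println("Received fetch response " + fetchResponse);
    }
}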
sensors.joinLatency.record(response.requestLatencyMs()); synchronized (AbstractCoordinator.this) { @@ -483,12 +477,6 @@ public abstract class AbstractCoordinator implements Closeable { } private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> { - - @Override - public SyncGroupResponse parse(ClientResponse response) { - return new SyncGroupResponse(response.responseBody()); - } - @Override public void handle(SyncGroupResponse syncResponse, RequestFuture<ByteBuffer> future) { @@ -540,7 +528,7 @@ public abstract class AbstractCoordinator implements Closeable { public void onSuccess(ClientResponse resp, RequestFuture<Void> future) { log.debug("Received group coordinator response {}", resp); - GroupCoordinatorResponse groupCoordinatorResponse = new GroupCoordinatorResponse(resp.responseBody()); + GroupCoordinatorResponse groupCoordinatorResponse = (GroupCoordinatorResponse) resp.responseBody(); // use MAX_VALUE - node.id as the coordinator id to mimic separate connections // for the coordinator in the underlying network client layer // TODO: this needs to be better handled in KAFKA-1935 @@ -653,12 +641,6 @@ public abstract class AbstractCoordinator implements Closeable { } private class LeaveGroupResponseHandler extends CoordinatorResponseHandler<LeaveGroupResponse, Void> { - - @Override - public LeaveGroupResponse parse(ClientResponse response) { - return new LeaveGroupResponse(response.responseBody()); - } - @Override public void handle(LeaveGroupResponse leaveResponse, RequestFuture<Void> future) { Errors error = Errors.forCode(leaveResponse.errorCode()); @@ -680,12 +662,6 @@ public abstract class AbstractCoordinator implements Closeable { } private class HeartbeatResponseHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> { - - @Override - public HeartbeatResponse parse(ClientResponse response) { - return new HeartbeatResponse(response.responseBody()); - } - @Override public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) { sensors.heartbeatLatency.record(response.requestLatencyMs()); @@ -722,8 +698,6 @@ public abstract class AbstractCoordinator implements Closeable { protected abstract class CoordinatorResponseHandler<R, T> extends RequestFutureAdapter<ClientResponse, T> { protected ClientResponse response; - public abstract R parse(ClientResponse response); - public abstract void handle(R response, RequestFuture<T> future); @Override @@ -735,10 +709,11 @@ public abstract class AbstractCoordinator implements Closeable { } @Override + @SuppressWarnings("unchecked") public void onSuccess(ClientResponse clientResponse, RequestFuture<T> future) { try { this.response = clientResponse; - R responseObj = parse(clientResponse); + R responseObj = (R) clientResponse.responseBody(); handle(responseObj, future); } catch (RuntimeException e) { if (!future.isDone()) http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 2621c09..4889872 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -12,7 +12,6 
@@ */ package org.apache.kafka.clients.consumer.internals; -import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; @@ -624,11 +623,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator { } @Override - public OffsetCommitResponse parse(ClientResponse response) { - return new OffsetCommitResponse(response.responseBody()); - } - - @Override public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) { sensors.commitLatency.record(response.requestLatencyMs()); Set<String> unauthorizedTopics = new HashSet<>(); @@ -718,12 +712,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator { } private class OffsetFetchResponseHandler extends CoordinatorResponseHandler<OffsetFetchResponse, Map<TopicPartition, OffsetAndMetadata>> { - - @Override - public OffsetFetchResponse parse(ClientResponse response) { - return new OffsetFetchResponse(response.responseBody()); - } - @Override public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) { Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(response.responseData().size()); http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index 2495b23..d9baa56 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -25,7 +25,6 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.RequestHeader; -import org.apache.kafka.common.requests.RequestSend; import org.apache.kafka.common.utils.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,14 +97,14 @@ public class ConsumerNetworkClient implements Closeable { } private RequestFuture<ClientResponse> send(Node node, - ApiKeys api, - short version, - AbstractRequest request) { + ApiKeys api, + short version, + AbstractRequest request) { long now = time.milliseconds(); RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler(); RequestHeader header = client.nextRequestHeader(api, version); - RequestSend send = new RequestSend(node.idString(), header, request.toStruct()); - put(node, new ClientRequest(now, true, send, completionHandler)); + ClientRequest clientRequest = new ClientRequest(node.idString(), now, true, header, request, completionHandler); + put(node, clientRequest); // wakeup the client in case it is blocking in poll so that we can send the queued request client.wakeup(); @@ -336,9 +335,9 @@ public class ConsumerNetworkClient implements Closeable { // coordinator failures traversing the unsent list again. 
iterator.remove(); for (ClientRequest request : requestEntry.getValue()) { - RequestFutureCompletionHandler handler = - (RequestFutureCompletionHandler) request.callback(); - handler.onComplete(new ClientResponse(request, now, true, null)); + RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback(); + handler.onComplete(new ClientResponse(request.header(), request.callback(), request.destination(), + request.createdTimeMs(), now, true, null)); } } } @@ -369,8 +368,8 @@ public class ConsumerNetworkClient implements Closeable { synchronized (this) { List<ClientRequest> unsentRequests = unsent.remove(node); if (unsentRequests != null) { - for (ClientRequest request : unsentRequests) { - RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback(); + for (ClientRequest unsentRequest : unsentRequests) { + RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) unsentRequest.callback(); handler.onFailure(e); } } @@ -468,12 +467,11 @@ public class ConsumerNetworkClient implements Closeable { if (e != null) { future.raise(e); } else if (response.wasDisconnected()) { - ClientRequest request = response.request(); - RequestSend send = request.request(); - ApiKeys api = ApiKeys.forId(send.header().apiKey()); - int correlation = send.header().correlationId(); + RequestHeader requestHeader = response.requestHeader(); + ApiKeys api = ApiKeys.forId(requestHeader.apiKey()); + int correlation = requestHeader.correlationId(); log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected", - api, request, correlation, send.destination()); + api, requestHeader, correlation, response.destination()); future.raise(DisconnectException.INSTANCE); } else { future.complete(response); http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index bfc1a0b..703ea29 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -41,7 +41,6 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.InvalidRecordException; import org.apache.kafka.common.record.LogEntry; -import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.requests.FetchRequest; @@ -155,7 +154,7 @@ public class Fetcher<K, V> { .addListener(new RequestFutureListener<ClientResponse>() { @Override public void onSuccess(ClientResponse resp) { - FetchResponse response = new FetchResponse(resp.responseBody()); + FetchResponse response = (FetchResponse) resp.responseBody(); if (!matchesRequestedPartitions(request, response)) { // obviously we expect the broker to always send us valid responses, so this check // is mainly for test cases where mock fetch responses must be manually crafted. 
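
Related to the Fetcher change above: per-partition fetch data now exposes a Records object instead of a raw ByteBuffer, so log entries can be iterated without wrapping the buffer in MemoryRecords first. A sketch only, assuming `response` is the cast FetchResponse from a handler like the one above, `tp` is a fetched TopicPartition, and `position` is the consumer's current position; the responseData() accessor is an assumption here, as it is not shown in this diff.

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.LogEntry;
import org.apache.kafka.common.requests.FetchResponse;

public class FetchRecordsSketch {
    // Sketch only: prints entries at or beyond `position` for one fetched partition.
    static void printEntries(FetchResponse response, TopicPartition tp, long position) {
        FetchResponse.PartitionData partition = response.responseData().get(tp);
        for (LogEntry logEntry : partition.records) {
            // skip entries below the current fetch position, mirroring the Fetcher logic above
            if (logEntry.offset() >= position)
                System.out.println("offset " + logEntry.offset() + ": " + logEntry.record());
        }
    }
}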
@@ -256,7 +255,7 @@ public class Fetcher<K, V> { throw future.exception(); if (future.succeeded()) { - MetadataResponse response = new MetadataResponse(future.value().responseBody()); + MetadataResponse response = (MetadataResponse) future.value().responseBody(); Cluster cluster = response.cluster(); Set<String> unauthorizedTopics = cluster.unauthorizedTopics(); @@ -549,7 +548,7 @@ public class Fetcher<K, V> { .compose(new RequestFutureAdapter<ClientResponse, Map<TopicPartition, OffsetAndTimestamp>>() { @Override public void onSuccess(ClientResponse response, RequestFuture<Map<TopicPartition, OffsetAndTimestamp>> future) { - ListOffsetResponse lor = new ListOffsetResponse(response.responseBody()); + ListOffsetResponse lor = (ListOffsetResponse) response.responseBody(); log.trace("Received ListOffsetResponse {} from broker {}", lor, node); handleListOffsetResponse(timestampsToSearch, lor, future); } @@ -673,10 +672,8 @@ public class Fetcher<K, V> { return null; } - ByteBuffer buffer = partition.recordSet; - MemoryRecords records = MemoryRecords.readableRecords(buffer); List<ConsumerRecord<K, V>> parsed = new ArrayList<>(); - for (LogEntry logEntry : records) { + for (LogEntry logEntry : partition.records) { // Skip the messages earlier than current position. if (logEntry.offset() >= position) { parsed.add(parseRecord(tp, logEntry)); http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index c71bb67..7555b71 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -12,20 +12,13 @@ */ package org.apache.kafka.clients.producer.internals; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - import org.apache.kafka.clients.ClientRequest; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.RequestCompletionHandler; import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.InvalidMetadataException; @@ -34,7 +27,6 @@ import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; -import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Avg; @@ -42,15 +34,22 @@ import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.requests.ProduceRequest; import org.apache.kafka.common.requests.ProduceResponse; 
-import org.apache.kafka.common.requests.RequestSend; +import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + /** * The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata * requests to renew its view of the cluster and then sends produce requests to the appropriate nodes. @@ -210,19 +209,17 @@ public class Sender implements Runnable { this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount); sensors.updateProduceRequestMetrics(batches); - List<ClientRequest> requests = createProduceRequests(batches, now); + // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes // with sendable data that aren't ready to send since they would cause busy looping. long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout); - if (result.readyNodes.size() > 0) { + if (!result.readyNodes.isEmpty()) { log.trace("Nodes with data ready to send: {}", result.readyNodes); - log.trace("Created {} produce requests: {}", requests.size(), requests); pollTimeout = 0; } - for (ClientRequest request : requests) - client.send(request, now); + sendProduceRequests(batches, now); // if some partitions are already ready to be sent, the select time would be 0; // otherwise if some partition already has some data accumulated but not ready yet, @@ -254,20 +251,16 @@ public class Sender implements Runnable { * Handle a produce response */ private void handleProduceResponse(ClientResponse response, Map<TopicPartition, RecordBatch> batches, long now) { - int correlationId = response.request().request().header().correlationId(); + int correlationId = response.requestHeader().correlationId(); if (response.wasDisconnected()) { - log.trace("Cancelled request {} due to node {} being disconnected", response, response.request() - .request() - .destination()); + log.trace("Cancelled request {} due to node {} being disconnected", response, response.destination()); for (RecordBatch batch : batches.values()) completeBatch(batch, Errors.NETWORK_EXCEPTION, -1L, Record.NO_TIMESTAMP, correlationId, now); } else { - log.trace("Received produce response from node {} with correlation id {}", - response.request().request().destination(), - correlationId); + log.trace("Received produce response from node {} with correlation id {}", response.destination(), correlationId); // if we have a response, parse it if (response.hasResponse()) { - ProduceResponse produceResponse = new ProduceResponse(response.responseBody()); + ProduceResponse produceResponse = (ProduceResponse) response.responseBody(); for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) { TopicPartition tp = entry.getKey(); ProduceResponse.PartitionResponse partResp = entry.getValue(); @@ -275,7 +268,7 @@ public class Sender implements Runnable { RecordBatch batch = batches.get(tp); completeBatch(batch, error, partResp.baseOffset, partResp.timestamp, correlationId, now); } - 
this.sensors.recordLatency(response.request().request().destination(), response.requestLatencyMs()); + this.sensors.recordLatency(response.destination(), response.requestLatencyMs()); this.sensors.recordThrottleTime(produceResponse.getThrottleTime()); } else { // this is the acks = 0 case, just complete all requests @@ -339,35 +332,36 @@ public class Sender implements Runnable { /** * Transfer the record batches into a list of produce requests on a per-node basis */ - private List<ClientRequest> createProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) { - List<ClientRequest> requests = new ArrayList<ClientRequest>(collated.size()); + private void sendProduceRequests(Map<Integer, List<RecordBatch>> collated, long now) { for (Map.Entry<Integer, List<RecordBatch>> entry : collated.entrySet()) - requests.add(produceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue())); - return requests; + sendProduceRequest(now, entry.getKey(), acks, requestTimeout, entry.getValue()); } /** * Create a produce request from the given record batches */ - private ClientRequest produceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) { - Map<TopicPartition, ByteBuffer> produceRecordsByPartition = new HashMap<TopicPartition, ByteBuffer>(batches.size()); - final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<TopicPartition, RecordBatch>(batches.size()); + private void sendProduceRequest(long now, int destination, short acks, int timeout, List<RecordBatch> batches) { + Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size()); + final Map<TopicPartition, RecordBatch> recordsByPartition = new HashMap<>(batches.size()); for (RecordBatch batch : batches) { TopicPartition tp = batch.topicPartition; - produceRecordsByPartition.put(tp, batch.records.buffer()); + produceRecordsByPartition.put(tp, batch.records); recordsByPartition.put(tp, batch); } - ProduceRequest request = new ProduceRequest(acks, timeout, produceRecordsByPartition); - RequestSend send = new RequestSend(Integer.toString(destination), - this.client.nextRequestHeader(ApiKeys.PRODUCE), - request.toStruct()); + + ProduceRequest produceRequest = new ProduceRequest(acks, timeout, produceRecordsByPartition); + RequestHeader header = this.client.nextRequestHeader(ApiKeys.PRODUCE); RequestCompletionHandler callback = new RequestCompletionHandler() { public void onComplete(ClientResponse response) { handleProduceResponse(response, recordsByPartition, time.milliseconds()); } }; - return new ClientRequest(now, acks != 0, send, callback); + String nodeId = Integer.toString(destination); + ClientRequest clientRequest = new ClientRequest(nodeId, now, acks != 0, header, produceRequest, callback); + + client.send(clientRequest, now); + log.trace("Sent produce request to {}: {}", nodeId, produceRequest); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java index 9e213ec..3683283 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java @@ -29,11 +29,10 @@ public class ByteBufferSend implements Send { private boolean pending = 
false; public ByteBufferSend(String destination, ByteBuffer... buffers) { - super(); this.destination = destination; this.buffers = buffers; - for (int i = 0; i < buffers.length; i++) - remaining += buffers[i].remaining(); + for (ByteBuffer buffer : buffers) + remaining += buffer.remaining(); this.size = remaining; } http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java index 49964b0..5e4bf2c 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java +++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java @@ -29,8 +29,8 @@ public class NetworkSend extends ByteBufferSend { private static ByteBuffer[] sizeDelimit(ByteBuffer[] buffers) { int size = 0; - for (int i = 0; i < buffers.length; i++) - size += buffers[i].remaining(); + for (ByteBuffer buffer : buffers) + size += buffer.remaining(); ByteBuffer[] delimited = new ByteBuffer[buffers.length + 1]; delimited[0] = ByteBuffer.allocate(4); delimited[0].putInt(size); http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 6178b80..e07c3c3 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -63,7 +63,7 @@ public enum ApiKeys { /** an english description of the api--this is for debugging and can change */ public final String name; - private ApiKeys(int id, String name) { + ApiKeys(int id, String name) { this.id = (short) id; this.name = name; } http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 9e21f3b..cd4e6e3 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -31,6 +31,7 @@ import static org.apache.kafka.common.protocol.types.Type.BYTES; import static org.apache.kafka.common.protocol.types.Type.INT16; import static org.apache.kafka.common.protocol.types.Type.INT32; import static org.apache.kafka.common.protocol.types.Type.INT64; +import static org.apache.kafka.common.protocol.types.Type.RECORDS; import static org.apache.kafka.common.protocol.types.Type.STRING; import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING; @@ -135,7 +136,7 @@ public class Protocol { public static final Schema TOPIC_PRODUCE_DATA_V0 = new Schema(new Field("topic", STRING), new Field("data", new ArrayOf(new Schema(new Field("partition", INT32), - new Field("record_set", BYTES))))); + new Field("record_set", RECORDS))))); public static final Schema PRODUCE_REQUEST_V0 = new Schema(new Field("acks", INT16, @@ -500,14 +501,15 @@ public class Protocol { new ArrayOf(FETCH_REQUEST_TOPIC_V0), "Topics to fetch in 
the order provided.")); - public static final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition", - INT32, - "Topic partition id."), - new Field("error_code", INT16), - new Field("high_watermark", - INT64, - "Last committed offset."), - new Field("record_set", BYTES)); + public static final Schema FETCH_RESPONSE_PARTITION_HEADER_V0 = new Schema(new Field("partition", + INT32, + "Topic partition id."), + new Field("error_code", INT16), + new Field("high_watermark", + INT64, + "Last committed offset.")); + public static final Schema FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition_header", FETCH_RESPONSE_PARTITION_HEADER_V0), + new Field("record_set", RECORDS)); public static final Schema FETCH_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING), new Field("partition_responses", @@ -515,6 +517,7 @@ public class Protocol { public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V0))); + public static final Schema FETCH_RESPONSE_V1 = new Schema(new Field("throttle_time_ms", INT32, "Duration in milliseconds for which the request was throttled" + http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java index 207f108..1ba8e44 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java @@ -54,8 +54,9 @@ public class ArrayOf extends Type { Object[] objs = (Object[]) o; int size = objs.length; buffer.putInt(size); - for (int i = 0; i < size; i++) - type.write(buffer, objs[i]); + + for (Object obj : objs) + type.write(buffer, obj); } @Override @@ -81,8 +82,8 @@ public class ArrayOf extends Type { return size; Object[] objs = (Object[]) o; - for (int i = 0; i < objs.length; i++) - size += type.sizeOf(objs[i]); + for (Object obj : objs) + size += type.sizeOf(obj); return size; } @@ -102,8 +103,8 @@ public class ArrayOf extends Type { return null; Object[] array = (Object[]) item; - for (int i = 0; i < array.length; i++) - type.validate(array[i]); + for (Object obj : array) + type.validate(obj); return array; } catch (ClassCastException e) { throw new SchemaException("Not an Object[]."); http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java index e8dce31..efbfd14 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java @@ -31,7 +31,7 @@ public class Schema extends Type { */ public Schema(Field... 
fs) { this.fields = new Field[fs.length]; - this.fieldsByName = new HashMap<String, Field>(); + this.fieldsByName = new HashMap<>(); for (int i = 0; i < this.fields.length; i++) { Field field = fs[i]; if (fieldsByName.containsKey(field.name)) @@ -47,13 +47,12 @@ public class Schema extends Type { @Override public void write(ByteBuffer buffer, Object o) { Struct r = (Struct) o; - for (int i = 0; i < fields.length; i++) { - Field f = fields[i]; + for (Field field : fields) { try { - Object value = f.type().validate(r.get(f)); - f.type.write(buffer, value); + Object value = field.type().validate(r.get(field)); + field.type.write(buffer, value); } catch (Exception e) { - throw new SchemaException("Error writing field '" + f.name + + throw new SchemaException("Error writing field '" + field.name + "': " + (e.getMessage() == null ? e.getClass().getName() : e.getMessage())); } @@ -85,8 +84,8 @@ public class Schema extends Type { public int sizeOf(Object o) { int size = 0; Struct r = (Struct) o; - for (int i = 0; i < fields.length; i++) - size += fields[i].type.sizeOf(r.get(fields[i])); + for (Field field : fields) + size += field.type.sizeOf(r.get(field)); return size; } @@ -146,8 +145,7 @@ public class Schema extends Type { public Struct validate(Object item) { try { Struct struct = (Struct) item; - for (int i = 0; i < this.fields.length; i++) { - Field field = this.fields[i]; + for (Field field : fields) { try { field.type.validate(struct.get(field)); } catch (SchemaException e) { http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java index 212d701..0165ce6 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java @@ -12,6 +12,8 @@ */ package org.apache.kafka.common.protocol.types; +import org.apache.kafka.common.record.Records; + import java.nio.ByteBuffer; import java.util.Arrays; @@ -108,6 +110,10 @@ public class Struct { return (Byte) get(name); } + public Records getRecords(String name) { + return (Records) get(name); + } + public Short getShort(Field field) { return (Short) get(field); } @@ -272,16 +278,6 @@ public class Struct { this.schema.validate(this); } - /** - * Create a byte buffer containing the serialized form of the values in this struct. This method can choose to break - * the struct into multiple ByteBuffers if need be. 
- */ - public ByteBuffer[] toBytes() { - ByteBuffer buffer = ByteBuffer.allocate(sizeOf()); - writeTo(buffer); - return new ByteBuffer[] {buffer}; - } - @Override public String toString() { StringBuilder b = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java index 43b4a37..e2f7baf 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java @@ -18,6 +18,9 @@ package org.apache.kafka.common.protocol.types; import java.nio.ByteBuffer; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.Records; import org.apache.kafka.common.utils.Utils; /** @@ -424,5 +427,50 @@ public abstract class Type { } }; + public static final Type RECORDS = new Type() { + @Override + public boolean isNullable() { + return true; + } + + @Override + public void write(ByteBuffer buffer, Object o) { + if (o instanceof FileRecords) + throw new IllegalArgumentException("FileRecords must be written to the channel directly"); + MemoryRecords records = (MemoryRecords) o; + NULLABLE_BYTES.write(buffer, records.buffer().duplicate()); + } + + @Override + public Object read(ByteBuffer buffer) { + ByteBuffer recordsBuffer = (ByteBuffer) NULLABLE_BYTES.read(buffer); + return MemoryRecords.readableRecords(recordsBuffer); + } + + @Override + public int sizeOf(Object o) { + if (o == null) + return 4; + + Records records = (Records) o; + return 4 + records.sizeInBytes(); + } + + @Override + public String toString() { + return "RECORDS"; + } + + @Override + public Records validate(Object item) { + if (item == null) + return null; + + if (item instanceof Records) + return (Records) item; + + throw new SchemaException(item + " is not an instance of " + Records.class.getName()); + } + }; } http://git-wip-us.apache.org/repos/asf/kafka/blob/3b4c3479/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java new file mode 100644 index 0000000..bdae08d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ +package org.apache.kafka.common.record; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.network.TransportLayer; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.GatheringByteChannel; + +/** + * File-backed record set. + */ +public class FileRecords implements Records { + private final File file; + private final FileChannel channel; + private final long start; + private final long end; + private final long size; + + public FileRecords(File file, + FileChannel channel, + int start, + int end, + boolean isSlice) throws IOException { + this.file = file; + this.channel = channel; + this.start = start; + this.end = end; + + if (isSlice) + this.size = end - start; + else + this.size = Math.min(channel.size(), end) - start; + } + + @Override + public int sizeInBytes() { + return (int) size; + } + + @Override + public long writeTo(GatheringByteChannel destChannel, long offset, int length) throws IOException { + long newSize = Math.min(channel.size(), end) - start; + if (newSize < size) + throw new KafkaException(String.format("Size of FileRecords %s has been truncated during write: old size %d, new size %d", file.getAbsolutePath(), size, newSize)); + + if (offset > size) + throw new KafkaException(String.format("The requested offset %d is out of range. The size of this FileRecords is %d.", offset, size)); + + long position = start + offset; + long count = Math.min(length, this.size - offset); + if (destChannel instanceof TransportLayer) { + TransportLayer tl = (TransportLayer) destChannel; + return tl.transferFrom(this.channel, position, count); + } else { + return this.channel.transferTo(position, count, destChannel); + } + } + + @Override + public RecordsIterator iterator() { + return new RecordsIterator(new FileLogInputStream(channel, start, end), false); + } + + private static class FileLogInputStream implements LogInputStream { + private long position; + protected final long end; + protected final FileChannel channel; + private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(Records.LOG_OVERHEAD); + + public FileLogInputStream(FileChannel channel, long start, long end) { + this.channel = channel; + this.position = start; + this.end = end; + } + + @Override + public LogEntry nextEntry() throws IOException { + if (position + Records.LOG_OVERHEAD >= end) + return null; + + logHeaderBuffer.rewind(); + channel.read(logHeaderBuffer, position); + if (logHeaderBuffer.hasRemaining()) + return null; + + logHeaderBuffer.rewind(); + long offset = logHeaderBuffer.getLong(); + int size = logHeaderBuffer.getInt(); + if (size < 0) + throw new IllegalStateException("Record with size " + size); + + if (position + Records.LOG_OVERHEAD + size > end) + return null; + + ByteBuffer recordBuffer = ByteBuffer.allocate(size); + channel.read(recordBuffer, position + Records.LOG_OVERHEAD); + if (recordBuffer.hasRemaining()) + return null; + recordBuffer.rewind(); + + Record record = new Record(recordBuffer); + LogEntry logEntry = new LogEntry(offset, record); + position += logEntry.size(); + return logEntry; + } + } +}
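For reference, a minimal sketch (not part of this commit) of how the new RECORDS protocol type behaves: the diff above replaces the raw BYTES record_set field with Type.RECORDS so that a record set can be either a MemoryRecords (heap buffer) or a FileRecords (file-backed, written straight to the channel via TransportLayer.transferFrom rather than copied into a buffer). The sketch below round-trips an empty MemoryRecords through Type.RECORDS using only the classes and methods shown in this diff; the class name RecordsTypeSketch is hypothetical.

import java.nio.ByteBuffer;

import org.apache.kafka.common.protocol.types.Type;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;

public class RecordsTypeSketch {
    public static void main(String[] args) {
        // A read-only, empty record set; readableRecords() simply wraps the buffer.
        MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(0));

        // sizeOf() is a 4-byte length prefix plus the record set itself.
        ByteBuffer buffer = ByteBuffer.allocate(Type.RECORDS.sizeOf(records));

        // write() delegates to NULLABLE_BYTES with a duplicate of the backing buffer;
        // a FileRecords value would be rejected here because it must be transferred
        // to the channel directly instead of serialized into a Struct buffer.
        Type.RECORDS.write(buffer, records);
        buffer.flip();

        // read() rewraps the serialized bytes as a MemoryRecords view.
        Records roundTripped = (Records) Type.RECORDS.read(buffer);
        System.out.println("record set size in bytes: " + roundTripped.sizeInBytes());
    }
}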