hi, Jason
can you explain the "head of line request blocking" in more detail? I am very
curious, thanks!
below is the code:
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends
KafkaMetricsGroup {
private var responseListeners: List[(Int) => Unit] = Nil
private val requestQueue = new
ArrayBlockingQueue[RequestChannel.Request](queueSize)
private val responseQueues = new
Array[BlockingQueue[RequestChannel.Response]](numProcessors)
for(i <- 0 until numProcessors)
responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()
the requestQueue is consumed by multiple threads, so how it can guarantee the
response order the same as the request order?
------------------ ???????? ------------------
??????: "Jason Gustafson";<[email protected]>;
????????: 2016??9??27??(??????) ????2:11
??????: "dev"<[email protected]>;
????: Re: it this a bug? - message disorder in async send mode -- 0.9.0java
client sdk InFlightRequests
Hi there,
The Kafka server implements head of line request blocking, which means that
it will only handle one request a time from a given socket. That means that
the responses will always be returned in the same order as the requests
were sent.
-Jason
On Sat, Sep 24, 2016 at 1:19 AM, ???????? <[email protected]> wrote:
> We know that in the async send mode, kafka do not guarantee the message
> order even for the same partition.
>
>
> That is, if we send 3 request ( the same topic, the same partition) to a
> kafka server in the async mode,
> the send order is 1, 2, 3 (correlation id is 1, 2, 3), while the kafka
> server maybe save the 3 request in the log by the order 3, 2, 1, and
> return to the client by the order 2, 3, 1??
>
>
> This happens because Kafka server processes requests with multi
> threads(multi KafkaRequestHandler).
>
>
> If the above is true, below in the 0.9.0 java client idk maybe has
> problem:
>
>
> In the class NetworkClient, there is a collection inFlightRequests to
> maintain all the in flight request:
>
>
> private final InFlightRequests inFlightRequests;
> final class InFlightRequests {
>
> private final int maxInFlightRequestsPerConnection;
> private final Map<String, Deque<ClientRequest>> requests = new
> HashMap<String, Deque<ClientRequest>>(); ...}
> It use a Deque to maintain the in flight requests whose response has not
> come back.
> Whenever we send a request, we will enqueue the request, and when the
> response come back, we will dequeue the request.
> private void doSend(ClientRequest request, long now) {
> request.setSendTimeMs(now);
> this.inFlightRequests.add(request);
> selector.send(request.request());
> }private void handleCompletedReceives(List<ClientResponse> responses,
> long now) {
> for (NetworkReceive receive : this.selector.completedReceives()) {
> String source = receive.source();
> ClientRequest req = inFlightRequests.completeNext(source);
> ResponseHeader header = ResponseHeader.parse(receive.payload());
> // Always expect the response version id to be the same as the
> request version id
> short apiKey = req.request().header().apiKey();
> short apiVer = req.request().header().apiVersion();
> Struct body = (Struct) ProtoUtils.responseSchema(apiKey,
> apiVer).read(receive.payload());
> correlate(req.request().header(), header);
> if (!metadataUpdater.maybeHandleCompletedReceive(req, now, body))
> responses.add(new ClientResponse(req, now, false, body));
> }
> }
> but if the request order and the response order does not match, is it the
> Deque suitable? or it should be use a Map to maintain the request?
> By the way, in the above, there is a function correlate(xxx) to check the
> match, if not match, it will throw a exception.private void
> correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
> if (requestHeader.correlationId() != responseHeader.correlationId())
> throw new IllegalStateException("Correlation id for response (" +
> responseHeader.correlationId()
> + ") does not match request (" +
> requestHeader.correlationId() + ")");
> }
> But in the async mode, as mentioned above, the mismatch is normal, and
> likely happen.
> So here is it enough to process the problem by just throwing an exception ?