[
https://issues.apache.org/jira/browse/KAFKA-1150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13843400#comment-13843400
]
Guozhang Wang commented on KAFKA-1150:
--------------------------------------
Another way of doing this would be to change highWatermarkValue from an
AtomicLong to an AtomicReference of a pair holding both the logical and the
physical offset. For backward compatibility, the first time the HW gets
incremented, look up the physical offset using the index, just once.
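
To make the idea concrete, here is a rough Java sketch (not the actual broker
code, which is Scala). The names HighWatermark, OffsetPair, advanceTo,
indexLookup and the byteDelta parameter are all made up for illustration, and
the sketch assumes that only one thread advances the HW while any number of
threads may read it. Readers always see a logical and physical offset that
belong together, and the index is consulted only on the first increment.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongUnaryOperator;

/** Hypothetical holder for the high watermark as a (logical, physical) pair. */
final class HighWatermark {
    /** Marker for "physical position not resolved yet" (assumed convention). */
    static final long UNKNOWN = -1L;

    /** Immutable pair, so one reference swap publishes both values together. */
    static final class OffsetPair {
        final long logicalOffset;     // message offset
        final long physicalPosition;  // byte position in the log, or UNKNOWN

        OffsetPair(long logicalOffset, long physicalPosition) {
            this.logicalOffset = logicalOffset;
            this.physicalPosition = physicalPosition;
        }
    }

    private final AtomicReference<OffsetPair> value;
    // Hypothetical stand-in for the offset index: logical offset -> byte position.
    private final LongUnaryOperator indexLookup;

    HighWatermark(long initialLogicalOffset, LongUnaryOperator indexLookup) {
        // Old state only carries the logical offset, so start with UNKNOWN.
        this.value = new AtomicReference<>(new OffsetPair(initialLogicalOffset, UNKNOWN));
        this.indexLookup = indexLookup;
    }

    /** Readers get a consistent snapshot of both offsets. */
    OffsetPair get() {
        return value.get();
    }

    /**
     * Advance the HW. Assumes a single writer thread.
     * byteDelta is the number of bytes between the old and the new HW.
     */
    OffsetPair advanceTo(long newLogicalOffset, long byteDelta) {
        OffsetPair current = value.get();
        if (newLogicalOffset <= current.logicalOffset) {
            return current; // the HW never moves backwards
        }
        long newPhysical = (current.physicalPosition == UNKNOWN)
                ? indexLookup.applyAsLong(newLogicalOffset) // first increment: one index search
                : current.physicalPosition + byteDelta;     // afterwards: plain arithmetic
        OffsetPair next = new OffsetPair(newLogicalOffset, newPhysical);
        value.set(next);
        return next;
    }
}
```

With several writers, advanceTo would need a compareAndSet retry loop and an
absolute physical position instead of a delta; the sketch leaves that out.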
> Fetch on a replicated topic does not return as soon as possible
> ---------------------------------------------------------------
>
> Key: KAFKA-1150
> URL: https://issues.apache.org/jira/browse/KAFKA-1150
> Project: Kafka
> Issue Type: Bug
> Components: core, replication
> Affects Versions: 0.8.0
> Reporter: Andrey Balmin
> Assignee: Neha Narkhede
>
> I see a huge performance difference between replicated and non-replicated
> topics. On my laptop, running two brokers, I see producer-to-consumer latency
> of under 1 ms for topics with one replica.
> However, with two replicas the same latency equals the max fetch delay.
> Here is a simple test I just did:
> one producer thread in a loop sending one message and sleeping for 2500 ms,
> and one consumer thread looping on the long poll with a max fetch delay of
> 1000 ms.
> Here is what happens with no replication:
> Produced 1 key: key1 at time: 15:33:52.822
> Consumed up to 1 at time: 15:33:52.822
> Consumed up to 1 at time: 15:33:53.823
> Consumed up to 1 at time: 15:33:54.825
> Produced 2 key: key2 at time: 15:33:55.324
> Consumed up to 2 at time: 15:33:55.324
> Consumed up to 2 at time: 15:33:56.326
> Consumed up to 2 at time: 15:33:57.328
> Produced 3 key: key3 at time: 15:33:57.827
> Consumed up to 3 at time: 15:33:57.827
> There are no delays between the message being produced and consumed -- this
> is the behavior I expected.
> Here is the same test, but for a topic with two replicas:
> Consumed up to 0 at time: 15:50:29.575
> Produced 1 key: key1 at time: 15:50:29.575
> Consumed up to 1 at time: 15:50:30.577
> Consumed up to 1 at time: 15:50:31.579
> Consumed up to 1 at time: 15:50:32.078
> Produced 2 key: key2 at time: 15:50:32.078
> Consumed up to 2 at time: 15:50:33.081
> Consumed up to 2 at time: 15:50:34.081
> Consumed up to 2 at time: 15:50:34.581
> Produced 3 key: key3 at time: 15:50:34.581
> Consumed up to 3 at time: 15:50:35.584
> Notice how the fetch always returns as soon as the produce request is issued,
> but without the new message, which consistently arrives ~1002 ms later.
> Below is the request log snippet for this part:
> Produced 2 key: key2 at time: 15:50:32.078
> Consumed up to 2 at time: 15:50:33.081
> You can see the first FetchRequest returns at the same time as the replica
> FetchRequest, but this fetch response is *empty* -- the message is not
> committed yet, so it cannot be returned. The message is committed at
> 15:50:32,079. However, the next FetchRequest (that does return the message)
> comes in at 15:50:32,078, but completes only at 15:50:33,081. Why is it
> waiting for the full 1000 ms, instead of returning right away?
> [2013-11-25 15:50:32,077] TRACE Processor 1 received request : Name:
> ProducerRequest; Version: 0; CorrelationId: 5; ClientId: pro; RequiredAcks:
> 1; AckTimeoutMs: 20 ms; TopicAndPartition: [test_topic,0] -> 2078
> (kafka.network.RequestChannel$)
> [2013-11-25 15:50:32,078] TRACE Completed request:Name: FetchRequest;
> Version: 0; CorrelationId: 7; ClientId: con; ReplicaId: -1; MaxWait: 1000 ms;
> MinBytes: 1 bytes; RequestInfo: [test_topic,0] ->
> PartitionFetchInfo(129,1024000) from client
> /0:0:0:0:0:0:0:1%0:63264;totalTime:499,queueTime:0,localTime:0,remoteTime:499,sendTime:0
> (kafka.request.logger)
> [2013-11-25 15:50:32,078] TRACE Completed request:Name: FetchRequest;
> Version: 0; CorrelationId: 3463; ClientId: ReplicaFetcherThread-0-0;
> ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0]
> -> PartitionFetchInfo(129,1048576) from client
> /127.0.0.1:63056;totalTime:499,queueTime:1,localTime:0,remoteTime:498,sendTime:0
> (kafka.request.logger)
> [2013-11-25 15:50:32,078] TRACE Processor 1 received request : Name:
> FetchRequest; Version: 0; CorrelationId: 8; ClientId: con; ReplicaId: -1;
> MaxWait: 1000 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] ->
> PartitionFetchInfo(129,1024000) (kafka.network.RequestChannel$)
> [2013-11-25 15:50:32,078] TRACE Completed request:Name: ProducerRequest;
> Version: 0; CorrelationId: 5; ClientId: pro; RequiredAcks: 1; AckTimeoutMs:
> 20 ms; TopicAndPartition: [test_topic,0] -> 2078 from client
> /0:0:0:0:0:0:0:1%0:63266;totalTime:1,queueTime:0,localTime:1,remoteTime:0,sendTime:0
> (kafka.request.logger)
> [2013-11-25 15:50:32,079] TRACE Processor 0 received request : Name:
> FetchRequest; Version: 0; CorrelationId: 3464; ClientId:
> ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes;
> RequestInfo: [test_topic,0] -> PartitionFetchInfo(130,1048576)
> (kafka.network.RequestChannel$)
> [2013-11-25 15:50:32,581] TRACE Completed request:Name: FetchRequest;
> Version: 0; CorrelationId: 3464; ClientId: ReplicaFetcherThread-0-0;
> ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0]
> -> PartitionFetchInfo(130,1048576) from client
> /127.0.0.1:63056;totalTime:503,queueTime:1,localTime:0,remoteTime:502,sendTime:0
> (kafka.request.logger)
> [2013-11-25 15:50:32,582] TRACE Processor 0 received request : Name:
> FetchRequest; Version: 0; CorrelationId: 3465; ClientId:
> ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes;
> RequestInfo: [test_topic,0] -> PartitionFetchInfo(130,1048576)
> (kafka.network.RequestChannel$)
> [2013-11-25 15:50:33,081] TRACE Completed request:Name: FetchRequest;
> Version: 0; CorrelationId: 8; ClientId: con; ReplicaId: -1; MaxWait: 1000 ms;
> MinBytes: 1 bytes; RequestInfo: [test_topic,0] ->
> PartitionFetchInfo(129,1024000) from client
> /0:0:0:0:0:0:0:1%0:63264;totalTime:1003,queueTime:0,localTime:1,remoteTime:1001,sendTime:1
> (kafka.request.logger)
> ----------
> Environment note: I first noticed this behavior on three brokers running on
> three Ubuntu EC2 instances. I then boiled it down to this simple test running
> two brokers on a Mac laptop.