[ 
https://issues.apache.org/jira/browse/KAFKA-1779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14214841#comment-14214841
 ] 

Guozhang Wang commented on KAFKA-1779:
--------------------------------------

Hi Magnus,

The semantics of maxWait is not for the un-reached offsets, i.e. if you send a 
fetch request whose starting offset is "out-of-range" meaning the offset does 
not exist on the log, it will return you an empty response with the error code 
set to "OutOfRange" immediately.

maxWait is for minBytes, another config in the fetch request indicating the 
minimum number of bytes returned in the response. For example, if you send a 
fetch with starting offset 5 and minBytes 100, and assuming each message is 10 
bytes and the log has message from offset 0 to 10, then if it returns you the 
messageset of 5 to 10 it will only have 60 bytes, so instead it will hold the 
fetch request and wait until more messages are appended to the log, until there 
are messages with offset 14.

> FetchRequest.maxWait has no effect
> ----------------------------------
>
>                 Key: KAFKA-1779
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1779
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.8.2
>            Reporter: Magnus Vojbacke
>            Priority: Minor
>             Fix For: 0.8.2
>
>
> Setting the maxWait field in a kafka.api.FetchRequest  does not appear to 
> have any effect. Whereas my assumption is: If I send a fetch request for 
> messages after offset X for a partition where there are currently no messages 
> with offsets after X, I would expect that a Fetch request built with the 
> maxWait option should block on the broker side for $maxWait milliseconds for 
> a new message to arrive.
> Currently, the request seems to return an empty result immediately. As a 
> result, our client is forced to manually sleep on each fetch request that 
> returns empty results.
> On the mailing list, it was stated that this bug should be fixed in 0.8.2, 
> but I'm still seeing this issue on 0.8.2-beta.
> {code}
>   // Chose a suitable topic / partition / lead broker combination
>   val host = ???
>   val port = ???
>   val topic = ???
>   val partition = ???
>   val cons = new SimpleConsumer(host, port, 50000000, 50000000, "my-id")
>     val topicAndPartition = new TopicAndPartition(topic, partition)
>     println(topicAndPartition)
>     val requestInfo = Map(topicAndPartition -> new 
> PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime, 1))
>     println(requestInfo)
>     val request = new OffsetRequest(requestInfo)
>     println(request)
>     val response: OffsetResponse = cons.getOffsetsBefore(request)
>     println("code=" + 
> response.partitionErrorAndOffsets(topicAndPartition).error)
>     println(response)
>     val offset = 
> response.partitionErrorAndOffsets(topicAndPartition).offsets(0)
>     val req = new FetchRequestBuilder().clientId("my-id").addFetch(topic, 
> partition, offset, 500000).maxWait(10000)
> // The following requests appear to return within a few hundred milliseconds 
> of 
> // eachother, but my assumption is that maxWait 10000 milliseconds should
> // make each request block for at least 10000 milliseconds before returning an
> // empty result.
>     println(System.currentTimeMillis + " " + cons.fetch(req.build()))
>     println(System.currentTimeMillis + " " + cons.fetch(req.build()))
>     println(System.currentTimeMillis + " " + cons.fetch(req.build()))
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to