I'm finding that if I continuously produce values to a topic (say,
once every 2 seconds) and, in another thread, query the head and tail
offsets of that topic, then sometimes I see the tail offset increasing
and sometimes it's frozen. What's up with that?
I'm using Scala client: 0.8.2 and server: 2.9.2-0.8.1.1
I'm querying the head and tail offsets like this:
-----
import kafka.api.OffsetRequest
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer

// whichTime is OffsetRequest.EarliestTime or OffsetRequest.LatestTime.
private def getOffset(consumer: SimpleConsumer, topic: String,
                      partition: Int, whichTime: Long): Long = {
  val topicAndPartition = new TopicAndPartition(topic, partition)
  consumer.earliestOrLatestOffset(
    topicAndPartition,
    earliestOrLatest = whichTime,
    consumerId = 0)
}

case class HeadAndTailOffsets(head: Long, tail: Long)

// head = earliest available offset, tail = latest offset of the partition.
def getHeadAndTailOffsets(consumer: SimpleConsumer, topic: String,
                          partition: Int = 0): HeadAndTailOffsets =
  HeadAndTailOffsets(
    head = getOffset(consumer, topic, partition, OffsetRequest.EarliestTime),
    tail = getOffset(consumer, topic, partition, OffsetRequest.LatestTime))
-----
I run producer, consumer, and offset-reporter threads. On the
first run I might get something like this:
-----
producer   consumer          offsets
           offset,message    head,tail
"MSG-0"    0, "MSG-0"        0,1
"MSG-1"    1, "MSG-1"        0,2
"MSG-2"    2, "MSG-2"        0,3
...        ...               ...
-----
On subsequent runs, I might see something like this:
-----
producer   consumer          offsets
           offset,message    head,tail
"MSG-0"    10, "MSG-0"       0,21       ** tail is frozen
"MSG-1"    11, "MSG-1"       0,21
"MSG-2"    12, "MSG-2"       0,21       ** lies, damn lies
..
"MSG-31"   31, "MSG-21"      0,21
"MSG-32"   31, "MSG-22"      0,21
...        ...               ...
-----
i.e. the consumer sees the offsets of the received messages increasing,
but the head and tail offsets reported by the other thread stay frozen.
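A minimal sketch of a sanity check that could flag the frozen case from
inside the reporter thread. It assumes the tail (latest) offset is the
offset the next produced message will receive, which matches the first
run above (tail 1 after consuming offset 0). OffsetSanity and isTailStale
are hypothetical names for illustration, not part of the Kafka client:

```scala
// Hypothetical helper, not part of the Kafka client API.
// Assumption: the reported tail is the offset of the next message to be
// written, so it should always be greater than any offset already consumed.
object OffsetSanity {
  // True when a message has already been consumed at or beyond the
  // reported tail, i.e. the reported tail cannot be current.
  def isTailStale(lastConsumedOffset: Long, reportedTail: Long): Boolean =
    reportedTail <= lastConsumedOffset
}
```

With the numbers above, OffsetSanity.isTailStale(31L, 21L) flags the
frozen report, while isTailStale(12L, 21L) does not: the check only fires
once the consumer has moved past the frozen tail value.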
Is this a client bug or an issue with my usage?
I have a fuller code sample here:
http://stackoverflow.com/questions/28663714/why-is-kafkas-latest-offset-report-sometimes-frozen
Thanks
- Stuart