RE: Details of segment deletion

2018-06-14 Thread Simon Cooper
Thanks, that's answered all my questions!

Simon

-Original Message-
From: Gwen Shapira  
Sent: 13 June 2018 02:42
To: Users 
Subject: Re: Details of segment deletion

See below:

On Mon, Jun 11, 2018 at 3:36 AM, Simon Cooper < 
simon.coo...@featurespace.co.uk> wrote:

> Hi,
>
> I've been trying to work out the details of when exactly kafka log
> segments get deleted due to the retention period, so it would be
> helpful if someone could clarify the behaviour:
>
>
>   *   Is a segment only deleted when all messages in that segment have
> 'timed out', or are messages deleted within each segment?
>

Kafka only deletes entire segments (except for compacted topics, which are a 
different story).



>   *   Does the server artificially limit the messages returned to clients
> to those within the retention period, even if they still exist in the 
> segment file?
>

Older messages can be read if the segment wasn't deleted yet. You can check the 
"beginning of log" offset JMX metric to see the oldest offset available 
to consumers on each partition.
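As a simplified model of the point above (an illustrative sketch, not Kafka's actual code): records older than the retention window stay readable for as long as the whole segment containing them survives.

```python
def stale_but_readable(segments, now_ms, retention_ms):
    """Offsets of records older than retention but still readable.

    segments: list of surviving segments, each a list of
    (offset, timestamp_ms) records. Because deletion is per-segment,
    a too-old record remains visible until its entire segment goes.
    """
    cutoff = now_ms - retention_ms
    return [off for seg in segments for (off, ts) in seg if ts < cutoff]
```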


>   *   Does the segment deletion happen when a new segment is created, or
> is it done as a separate operation by the log cleaner?
>

It's a separate operation by the log cleaner, but note that the active segment 
is never deleted, so sometimes you are waiting for a new segment to be created 
before an old one can be deleted.
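The rule described above can be sketched as follows (a simplified model under stated assumptions - per-segment max timestamps and time-based retention only - not Kafka's actual implementation):

```python
def deletable_segments(segments, now_ms, retention_ms):
    """Indices of whole segments eligible for time-based deletion.

    segments: list of (base_offset, max_timestamp_ms), oldest first;
    the last entry is the active segment. A segment is eligible only
    when every record in it (i.e. its max timestamp) is past retention,
    and the active segment is never eligible.
    """
    cutoff = now_ms - retention_ms
    eligible = []
    for i, (_base, max_ts) in enumerate(segments[:-1]):  # skip active
        if max_ts < cutoff:
            eligible.append(i)
        else:
            break  # segments are time-ordered; stop at first kept one
    return eligible
```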


>
> Thanks for the help!
> Simon Cooper
>



--
*Gwen Shapira*
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog


Details of segment deletion

2018-06-11 Thread Simon Cooper
Hi,

I've been trying to work out the details of when exactly kafka log segments get 
deleted due to the retention period, so it would be helpful if someone could 
clarify the behaviour:


  *   Is a segment only deleted when all messages in that segment have 'timed 
out', or are messages deleted within each segment?
  *   Does the server artificially limit the messages returned to clients to 
those within the retention period, even if they still exist in the segment file?
  *   Does the segment deletion happen when a new segment is created, or is it 
done as a separate operation by the log cleaner?

Thanks for the help!
Simon Cooper


RE: Getting the consumer to operate deterministically with small messages

2017-05-22 Thread Simon Cooper
Forgot to mention - this is on 0.9. We can't upgrade to 0.10 yet, as we haven't 
upgraded our brokers.

-Original Message-
From: Simon Cooper [mailto:simon.coo...@featurespace.co.uk] 
Sent: 22 May 2017 16:05
To: users@kafka.apache.org
Subject: Getting the consumer to operate deterministically with small messages

Hi,

I'm having significant problems getting the kafka consumer to operate 
deterministically with small message numbers & sizes (this is for local 
testing).

I'm controlling the offset manually, and using manual partition/topic 
assignment. I've set auto commit off, and set fetch.min.bytes to 1.

However, with 2 small (4-byte) messages in the queue, calling poll(0) on the 
consumer does not return them - it returns an empty list. poll(1) doesn't 
always return the messages either (sometimes it does, sometimes it doesn't).

How do I get the consumer to operate entirely deterministically - so that when 
I call poll(n) it is guaranteed to return all the messages available in the 
queue at that point, regardless of how small they are or how few messages there 
are in the queue, without blocking?
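For what it's worth, the guarantee asked for above is usually built by snapshotting the end offset first and polling until the position reaches it. The sketch below illustrates the pattern with a FakeConsumer stand-in (the real consumer's end_offsets/position/poll calls are assumed to behave analogously; this is not the 0.9 API):

```python
class FakeConsumer:
    """Stand-in for a Kafka consumer, for illustration only."""
    def __init__(self, records):
        self._records = records
        self._pos = 0
    def end_offsets(self, tps):
        # snapshot of the current high watermark per partition
        return {tp: len(self._records) for tp in tps}
    def position(self, tp):
        return self._pos
    def poll(self, timeout_ms):
        # return at most one record per call, like a small fetch
        if self._pos < len(self._records):
            rec = self._records[self._pos]
            self._pos += 1
            return [rec]
        return []

def drain_to_end(consumer, tp):
    """Poll until the position reaches the end offset captured up front.

    Returns every record that existed at the time of the call; records
    arriving later are not waited for, so this never blocks forever.
    """
    end = consumer.end_offsets([tp])[tp]
    out = []
    while consumer.position(tp) < end:
        out.extend(consumer.poll(timeout_ms=100))
    return out
```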

Thanks,
SimonC


Getting the consumer to operate deterministically with small messages

2017-05-22 Thread Simon Cooper
Hi,

I'm having significant problems getting the kafka consumer to operate 
deterministically with small message numbers & sizes (this is for local 
testing).

I'm controlling the offset manually, and using manual partition/topic 
assignment. I've set auto commit off, and set fetch.min.bytes to 1.

However, with 2 small (4-byte) messages in the queue, calling poll(0) on the 
consumer does not return them - it returns an empty list. poll(1) doesn't 
always return the messages either (sometimes it does, sometimes it doesn't).

How do I get the consumer to operate entirely deterministically - so that when 
I call poll(n) it is guaranteed to return all the messages available in the 
queue at that point, regardless of how small they are or how few messages there 
are in the queue, without blocking?

Thanks,
SimonC


0.9 client persistently high CPU usage

2016-06-21 Thread Simon Cooper
Hi all,

We've got a problem with high CPU usage on a 0.9 client. We've got a monitoring 
system that polls kafka topics for metadata (to get the last message offset) 
every so often, and this has started using very high CPU continuously. We're 
seeing the following being spammed in the logs every 100ms:

2016-06-21T13:21:10,355 | DEBUG | o.a.k.c.NetworkClient [pool-11-thread-8] | 
Initialize connection to node -1 for sending metadata request
2016-06-21T13:21:10,355 | DEBUG | o.a.k.c.NetworkClient [pool-11-thread-8] | 
Initiating connection to node -1 at :9092.
2016-06-21T13:21:10,355 | DEBUG | o.a.k.c.NetworkClient [pool-11-thread-8] | 
Error connecting to node -1 at :9092:
java.nio.channels.ClosedByInterruptException: null
at 
java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
 ~[na:1.8.0_60-ea]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:659) 
~[na:1.8.0_60-ea]
at org.apache.kafka.common.network.Selector.connect(Selector.java:153) 
~[monitoring-collector.jar:na]
at 
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:489) 
[monitoring-collector.jar:na]
at 
org.apache.kafka.clients.NetworkClient.access$400(NetworkClient.java:47) 
[monitoring-collector.jar:na]
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:624)
 [monitoring-collector.jar:na]
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:543)
 [monitoring-collector.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:254) 
[monitoring-collector.jar:na]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
 [monitoring-collector.jar:na]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
 [monitoring-collector.jar:na]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
 [monitoring-collector.jar:na]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
 [monitoring-collector.jar:na]
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:184)
 [monitoring-collector.jar:na]
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.fetchCommittedOffsets(ConsumerCoordinator.java:290)
 [monitoring-collector.jar:na]
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.refreshCommittedOffsetsIfNeeded(ConsumerCoordinator.java:272)
 [monitoring-collector.jar:na]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1299)
 [monitoring-collector.jar:na]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1106)
 [monitoring-collector.jar:na]
...
2016-06-21T13:21:10,355 | DEBUG | o.a.k.c.NetworkClient [pool-11-thread-8] | 
Give up sending metadata request since no node is available

This is a single-broker cluster (id: 1), all on a single machine. There is 
nothing being logged in the broker logs.

Can anyone help work out what is going wrong, and how we could fix it? In 
particular, the '-1' node id is suspicious, but we can't work out where this 
value is coming from.

Thanks,
SimonC


0.9 consumer log spam - Marking the coordinator dead

2016-02-05 Thread Simon Cooper
Hi,

We've just updated to the 0.9 client & broker, and we're suddenly seeing a lot 
of log spam in the consumers:

2016-02-05T03:31:34,182 | INFO | o.a.k.c.c.i.AbstractCoordinator 
[kafkaspout-thread-0] | Response-events:7 | Marking the coordinator 2147483646 
dead.
2016-02-05T03:31:34,182 | INFO | o.a.k.c.c.i.AbstractCoordinator 
[kafkaspout-thread-0] | Response-events:7 | Marking the coordinator 2147483646 
dead.
2016-02-05T03:31:34,182 | INFO | o.a.k.c.c.i.AbstractCoordinator 
[kafkaspout-thread-0] | Response-events:7 | Marking the coordinator 2147483646 
dead.
2016-02-05T03:31:34,182 | INFO | o.a.k.c.c.i.AbstractCoordinator 
[kafkaspout-thread-0] | Response-events:7 | Marking the coordinator 2147483646 
dead.
2016-02-05T03:31:34,182 | INFO | o.a.k.c.c.i.AbstractCoordinator 
[kafkaspout-thread-0] | Response-events:7 | Marking the coordinator 2147483646 
dead.
2016-02-05T03:31:34,182 | INFO | o.a.k.c.c.i.AbstractCoordinator 
[kafkaspout-thread-0] | Response-events:7 | Marking the coordinator 2147483646 
dead.

This is repeated constantly - a burst of messages roughly every 10 minutes. 
Otherwise the system seems to be operating fine.

Can anyone explain what these messages mean, and what, if anything, we need to 
do about it?

Thanks,
SimonC


FW: 0.9 consumer log spam - Marking the coordinator dead

2016-02-05 Thread Simon Cooper
Actually, this is incorrect - it looks like the consumer does not receive any 
messages until the *first* 'coordinator dead' message is logged!

Is anyone able to offer insight into what's going on here?

Simon

-Original Message-
From: Simon Cooper [mailto:simon.coo...@featurespace.co.uk] 
Sent: 05 February 2016 10:02
To: users@kafka.apache.org
Subject: 0.9 consumer log spam - Marking the coordinator dead

Hi,

We've just updated to the 0.9 client & broker, and we're suddenly seeing a lot 
of log spam in the consumers:

2016-02-05T03:31:34,182 | INFO | o.a.k.c.c.i.AbstractCoordinator 
[kafkaspout-thread-0] | Response-events:7 | Marking the coordinator 2147483646 
dead.
2016-02-05T03:31:34,182 | INFO | o.a.k.c.c.i.AbstractCoordinator 
[kafkaspout-thread-0] | Response-events:7 | Marking the coordinator 2147483646 
dead.
2016-02-05T03:31:34,182 | INFO | o.a.k.c.c.i.AbstractCoordinator 
[kafkaspout-thread-0] | Response-events:7 | Marking the coordinator 2147483646 
dead.
2016-02-05T03:31:34,182 | INFO | o.a.k.c.c.i.AbstractCoordinator 
[kafkaspout-thread-0] | Response-events:7 | Marking the coordinator 2147483646 
dead.
2016-02-05T03:31:34,182 | INFO | o.a.k.c.c.i.AbstractCoordinator 
[kafkaspout-thread-0] | Response-events:7 | Marking the coordinator 2147483646 
dead.
2016-02-05T03:31:34,182 | INFO | o.a.k.c.c.i.AbstractCoordinator 
[kafkaspout-thread-0] | Response-events:7 | Marking the coordinator 2147483646 
dead.

This is repeated constantly - a burst of messages roughly every 10 minutes. 
Otherwise the system seems to be operating fine.

Can anyone explain what these messages mean, and what, if anything, we need to 
do about it?

Thanks,
SimonC


RE: FW: 0.9 consumer log spam - Marking the coordinator dead

2016-02-05 Thread Simon Cooper
Thanks, I'll have a look - is there a 0.9.0.1 release planned soon?

Simon

-Original Message-
From: isma...@gmail.com [mailto:isma...@gmail.com] On Behalf Of Ismael Juma
Sent: 05 February 2016 13:15
To: users@kafka.apache.org
Subject: Re: FW: 0.9 consumer log spam - Marking the coordinator dead

Hi Simon,

It may be worth trying the 0.9.0 branch as it includes a number of important 
fixes to the new consumer.

Ismael

On Fri, Feb 5, 2016 at 12:33 PM, Simon Cooper < 
simon.coo...@featurespace.co.uk> wrote:

> Actually, this is incorrect - it looks like the consumer does not 
> receive any messages until the *first* 'coordinator dead' message is logged!
>
> Is anyone able to offer insight into what's going on here?
>
> Simon
>
> -Original Message-
> From: Simon Cooper [mailto:simon.coo...@featurespace.co.uk]
> Sent: 05 February 2016 10:02
> To: users@kafka.apache.org
> Subject: 0.9 consumer log spam - Marking the coordinator dead
>
> Hi,
>
> We've just updated to the 0.9 client & broker, and we're suddenly 
> seeing a lot of log spam in the consumers:
>
> 2016-02-05T03:31:34,182 | INFO | o.a.k.c.c.i.AbstractCoordinator 
> [kafkaspout-thread-0] | Response-events:7 | Marking the coordinator
> 2147483646 dead.
> 2016-02-05T03:31:34,182 | INFO | o.a.k.c.c.i.AbstractCoordinator 
> [kafkaspout-thread-0] | Response-events:7 | Marking the coordinator
> 2147483646 dead.
> 2016-02-05T03:31:34,182 | INFO | o.a.k.c.c.i.AbstractCoordinator 
> [kafkaspout-thread-0] | Response-events:7 | Marking the coordinator
> 2147483646 dead.
> 2016-02-05T03:31:34,182 | INFO | o.a.k.c.c.i.AbstractCoordinator 
> [kafkaspout-thread-0] | Response-events:7 | Marking the coordinator
> 2147483646 dead.
> 2016-02-05T03:31:34,182 | INFO | o.a.k.c.c.i.AbstractCoordinator 
> [kafkaspout-thread-0] | Response-events:7 | Marking the coordinator
> 2147483646 dead.
> 2016-02-05T03:31:34,182 | INFO | o.a.k.c.c.i.AbstractCoordinator 
> [kafkaspout-thread-0] | Response-events:7 | Marking the coordinator
> 2147483646 dead.
>
> This is repeated constantly - a burst of messages roughly every
> 10 minutes. Otherwise the system seems to be operating fine.
>
> Can anyone explain what these messages mean, and what, if anything, we 
> need to do about it?
>
> Thanks,
> SimonC
>


Does MirrorMaker delete topics?

2016-01-05 Thread Simon Cooper
Hi all,

In the kafka docs, it mentions that MirrorMaker will automatically create 
topics on the mirror cluster if the right options are set. Does it 
automatically delete topics on the mirror that are deleted on the main cluster 
as well?

Thanks,
SimonC


RE: Does MirrorMaker delete topics?

2016-01-05 Thread Simon Cooper
Hmm, could this feature be added in the future?

Although a better solution might be to turn mirrormaker into something like 
'hidden replicas', where a certain set of brokers in a cross-site super-cluster 
replicate topics onto the secondary site but don't take part in topic 
leadership - is this on the cards at all?

Thanks,
SimonC

-Original Message-
From: Stevo Slavić [mailto:ssla...@gmail.com] 
Sent: 05 January 2016 14:04
To: users@kafka.apache.org
Subject: Re: Does MirrorMaker delete topics?

AFAIK "if the right options are set" actually means "if 
auto.create.topics.enable is left at its default of true". As on any Kafka 
cluster with this option, topics are created implicitly: if a topic does not 
exist when the first message is published to it, it gets created. There are no 
other guarantees when it comes to syncing topic metadata - topic metadata can 
be completely different, e.g. different number of partitions, different replica 
assignment including preferred leader, and a topic deleted on the source will 
not be deleted on the mirror cluster. MirrorMaker only syncs data, not metadata.

Kind regards,
Stevo Slavic.

On Tue, Jan 5, 2016 at 2:53 PM, Simon Cooper < simon.coo...@featurespace.co.uk> 
wrote:

> Hi all,
>
> In the kafka docs, it mentions that MirrorMaker will automatically 
> create topics on the mirror cluster if the right options are set. Does 
> it automatically delete topics on the mirror that are deleted on the 
> main cluster as well?
>
> Thanks,
> SimonC
>


API to query cluster metadata on-demand

2015-09-03 Thread Simon Cooper
Is there a basic interface in the new client APIs to get the list of topics on 
a cluster, and get information on the topics (offsets, sizes, etc), without 
having to deal with a producer or consumer? I just want a basic synchronous API 
to query the metadata as-is. Does this exist in some form?

Thanks,
Simon


Topic partitions randomly failed on live system

2015-08-17 Thread Simon Cooper
Hi,

We've had an issue on a live system (3 brokers, ~10 topics, some replicated, 
some partitioned) where a partition wasn't properly reassigned, causing several 
other partitions to go down.

First, this exception happened on broker 1 (we weren't doing anything 
particular on the system at the time):

ERROR [AddPartitionsListener on 1]: Error while handling add partitions for 
data path /brokers/topics/topic1 
(kafka.controller.PartitionStateMachine$AddPartitionsListener)
java.util.NoSuchElementException: key not found: [topic1,0]
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:58)
at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
at 
kafka.controller.ControllerContext$$anonfun$replicasForPartition$1.apply(KafkaController.scala:112)
at 
kafka.controller.ControllerContext$$anonfun$replicasForPartition$1.apply(KafkaController.scala:111)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
   at 
scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
at scala.collection.SetLike$class.map(SetLike.scala:93)
at scala.collection.AbstractSet.map(Set.scala:47)
at 
kafka.controller.ControllerContext.replicasForPartition(KafkaController.scala:111)
at 
kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:485)
at 
kafka.controller.PartitionStateMachine$AddPartitionsListener$$anonfun$handleDataChange$1.apply$mcV$sp(PartitionStateMachine.scala:530)
at 
kafka.controller.PartitionStateMachine$AddPartitionsListener$$anonfun$handleDataChange$1.apply(PartitionStateMachine.scala:519)
at 
kafka.controller.PartitionStateMachine$AddPartitionsListener$$anonfun$handleDataChange$1.apply(PartitionStateMachine.scala:519)
at kafka.utils.Utils$.inLock(Utils.scala:535)
at 
kafka.controller.PartitionStateMachine$AddPartitionsListener.handleDataChange(PartitionStateMachine.scala:518)
at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:547)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)

At this point, broker 2 started continually spamming these messages (mentioning 
other topics, not just topic1):

ERROR [ReplicaFetcherThread-0-1], Error for partition [othertopic1,2] to broker 
1:class kafka.common.UnknownException (kafka.server.ReplicaFetcherThread)
ERROR [ReplicaFetcherThread-0-1], Error for partition [othertopic2,0] to broker 
1:class kafka.common.UnknownException (kafka.server.ReplicaFetcherThread)
ERROR [ReplicaFetcherThread-0-1], Error for partition [othertopic3,0] to broker 
1:class kafka.common.UnknownException (kafka.server.ReplicaFetcherThread)
ERROR [ReplicaFetcherThread-0-1], Error for partition [topic1,0] to broker 
1:class kafka.common.UnknownException (kafka.server.ReplicaFetcherThread)

And broker 1 had these messages, but only for topic1:

ERROR [KafkaApi-1] error when handling request Name: FetchRequest; Version: 0; 
CorrelationId: 41182755; ClientId: ReplicaFetcherThread-0-1; ReplicaId: 2; 
MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [topic1,0] - 
PartitionFetchInfo(0,1048576) (kafka.server.KafkaApis)
kafka.common.NotAssignedReplicaException: Leader 1 failed to record follower 
2's position 0 since the replica is not recognized to be one of the assigned 
replicas 1 for partition [topic1,0]
at 
kafka.server.ReplicaManager.updateReplicaLEOAndPartitionHW(ReplicaManager.scala:574)
at 
kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:388)
at 
kafka.server.KafkaApis$$anonfun$recordFollowerLogEndOffsets$2.apply(KafkaApis.scala:386)
at 
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at 
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
at 
kafka.server.KafkaApis.recordFollowerLogEndOffsets(KafkaApis.scala:386)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:351)
at kafka.server.KafkaApis.handle(KafkaApis.scala:60)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
at java.lang.Thread.run(Thread.java:745)

At this time, any topics that had broker 1 as leader were not working. ZK 
thought that everything was ok and in sync.


RE: new consumer api?

2015-08-04 Thread Simon Cooper
Reading the consumer docs, there's no mention of a relatively simple consumer 
that doesn't need groups, coordinators, commits, or anything like that - one 
that just reads and polls from specified offsets of specific topic partitions, 
but automatically deals with leadership changes and connection losses (so one 
level up from SimpleConsumer).

Will the new API be able to be used in this relatively simple way?
SimonC

-Original Message-
From: Jun Rao [mailto:j...@confluent.io] 
Sent: 03 August 2015 18:19
To: users@kafka.apache.org
Subject: Re: new consumer api?

Jalpesh,

We are still iterating on the new consumer a bit and are waiting for some of 
the security jiras to be committed. So now, we are shooting for releasing 0.8.3 
in Oct (just updated 
https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan).

Thanks,

Jun

On Mon, Aug 3, 2015 at 8:41 AM, Jalpesh Patadia  
jalpesh.pata...@clickbank.com wrote:

 Hello guys,

 A while ago i read that the new consumer api was going to be released 
 sometime in July as part of the 0.8.3/0.9 release.
 https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan


 Do we have an update when we think that can happen?


 Thanks,

 Jalpesh





Is the new consumer API ready?

2015-07-10 Thread Simon Cooper
I'm updating the kafka APIs we use to the new standalone ones, but it looks like 
the new consumer isn't ready yet (the code has got lots of placeholders etc), 
and there's only the producer in the Javadoc at 
http://kafka.apache.org/082/javadoc/index.html. Is there an ETA on when the new 
consumer will be ready?

SimonC


SimpleConsumer long poll test case randomly times out

2014-05-15 Thread Simon Cooper
I've got a very strange bug doing a long poll using SimpleConsumer on 0.8.1.1. 
The test app I've got (attached) uses a thread to do a long poll of a topic, 
and then sends single messages to that queue, one at a time, waiting for the 
poll to return the message before sending the next one.

The problem is that when a message is sent to the queue, the long poll returns 
immediately, but that does not contain the just-sent message. So the long poll 
restarts. Sometimes this second poll returns immediately with the sent message, 
but sometimes it waits for the poll to time out before returning the message. 
This second behaviour happens much more on a replicated topic, but is also 
reproducible with the test VMs on https://github.com/stealthly/scala-kafka.

A sample log file is below. The long poll timeout is 5000ms. Messages 1 and 2 
are delayed, messages 0 and 3 aren't. Every message requires two polls to 
actually be received:

410 [pool-1-thread-1] INFO Making request from offset 110
1457 [main] INFO Sending 0
1553 [pool-1-thread-1] WARN Received no messages
1553 [pool-1-thread-1] INFO Making request from offset 110
1560 [pool-1-thread-1] INFO Received 0
1561 [pool-1-thread-1] INFO Making request from offset 111
1561 [main] INFO Got response for 0 in 20ms
1561 [main] INFO Sending 1
1565 [pool-1-thread-1] WARN Received no messages
1565 [pool-1-thread-1] INFO Making request from offset 111
6567 [pool-1-thread-1] INFO Received 1
6567 [pool-1-thread-1] INFO Making request from offset 112
6568 [main] INFO Got response for 1 in 5004ms
6568 [main] INFO Sending 2
6574 [pool-1-thread-1] WARN Received no messages
6574 [pool-1-thread-1] INFO Making request from offset 112
11577 [pool-1-thread-1] INFO Received 2
11577 [pool-1-thread-1] INFO Making request from offset 113
11578 [main] INFO Got response for 2 in 5006ms
11578 [main] INFO Sending 3
11584 [pool-1-thread-1] WARN Received no messages
11584 [pool-1-thread-1] INFO Making request from offset 113
11585 [pool-1-thread-1] INFO Received 3
11586 [pool-1-thread-1] INFO Making request from offset 114
11586 [main] INFO Got response for 3 in 5ms

Ideally, the message sent would be returned immediately by the first long poll. 
But the obvious bug is the artificial delay introduced by the message not being 
returned until the long poll times out, when it should return immediately (as 
the minimum fetch size is 1 byte, and there's a 4-byte message available).
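The expected broker-side behaviour, as I understand it (a simplified model, not the broker's actual code), is: respond as soon as the minimum byte count is available, and only otherwise hold the fetch up to the max wait - so a 4-byte message against a 1-byte minimum should come back with no delay:

```python
def fetch_wait_ms(available_bytes, min_bytes, max_wait_ms):
    """Simplified long-poll rule: a fetch returns immediately once at
    least min_bytes of data are available; only otherwise is it parked
    for up to max_wait_ms."""
    return 0 if available_bytes >= min_bytes else max_wait_ms
```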

This looks like a bug in kafka, but I may be using the consumer API 
incorrectly...

Thanks,
SimonC


Finding out how underreplicated a broker is

2014-05-15 Thread Simon Cooper
In the output of kafka-topics.sh -describe, it gives the ISR. What I want to 
find out is why a particular broker is not in ISR, how far behind it is, and 
when it will get back into the ISR. I can't seem to find this information in 
the logs, cmdline output, or zookeeper. How can I find out how far behind the 
ISR a particular broker is?

SimonC


Consuming from a replica

2013-11-27 Thread Simon Cooper
Hi,

I've been looking at the SimpleConsumer example, and I've noticed that it 
always reads from the leader, and reacts to leader changes by reconnecting to 
the new leader. Is it possible to read from a replica in ISR that's not the 
leader? If so, how does the consumer get notified the replica it's reading from 
is no longer in ISR?

Thanks,
SimonC


Relationship between socket timeout consumer wait timeout

2013-11-26 Thread Simon Cooper
I'm trying to use the SimpleConsumer to consume a queue, using long polling. 
I'm encountering some difficulties when using very long poll times (60s or 
more). It seems to be conflicting with the socket timeout.

I thought the socket timeout was the timeout to actually connect to the broker, 
then the wait timeout determined how long the long poll should be for. This 
doesn't seem to be the case - the poll is throwing a SocketTimeoutException 
even if it seems to have connected to the broker successfully. Can anyone 
advise on this behaviour, and explain the relationship between socket timeout 
and wait timeout?
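One relationship that holds for blocking fetches generally (stated here as an assumption about the behaviour observed, not a quote from the docs): the socket timeout bounds how long the client waits for any response at all, so it must comfortably exceed the long-poll wait, or the broker's deliberately delayed reply looks like a dead connection. A minimal sanity-check sketch:

```python
def check_fetch_timeouts(socket_timeout_ms, max_wait_ms):
    """Simplified check: during a long poll the broker may hold the
    response for up to max_wait_ms, so the client's socket read timeout
    must be longer, or the poll dies with a SocketTimeoutException."""
    if socket_timeout_ms <= max_wait_ms:
        raise ValueError("socket timeout must exceed the fetch max wait")
    return True
```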

Also, is there any way of specifying an indefinite long poll for a consumer?

Thanks,
SimonC