Re: 0.9.0.0 remaining jiras

2015-09-14 Thread Jiangjie Qin
Hi Jun,

Can we also include KAFKA-2448 in 0.9? We have seen this issue a few
times before, and it causes the replica fetcher threads to fail to start up.

Thanks,

Jiangjie (Becket) Qin

On Sat, Sep 12, 2015 at 9:40 AM, Jun Rao  wrote:

> The following is a candidate list of jiras that we want to complete in the
> upcoming release (0.9.0.0). Our goal is to finish at least all of the blockers
> and as many of the non-blockers in that list as possible.
>
>
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20fixVersion%20%3D%200.9.0.0
>
> Is there anything that should be added to or removed from this list?
>
> We are shooting to cut an 0.9.0.0 release branch in early October.
>
> Thanks,
>
> Jun
>


Re: configuring log compaction

2015-08-09 Thread Jiangjie Qin
Actually, Kafka only supports two mutually exclusive log cleanup policies:
1) delete log segments after the retention period has passed, or 2) compact
the log to keep only the latest value for each key.

log.retention.hours is only used by (1). Log compaction currently does not
compact logs based on their age, but based on the dirty ratio in bytes
(uncompacted log size / total log size). The config is
log.cleaner.min.cleanable.ratio.

So it might be a bit hard to enforce a policy like "compact the log after
7 days".
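
For reference, a minimal sketch of the relevant settings (the topic name and
the values below are just examples, not recommendations):

# broker-level defaults (server.properties)
log.cleanup.policy=delete
log.retention.hours=168
log.cleaner.enable=true
log.cleaner.min.cleanable.ratio=0.5

# per-topic override to use compaction instead of deletion
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-compacted-topic \
  --config cleanup.policy=compact --config min.cleanable.dirty.ratio=0.5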

Jiangjie (Becket) Qin



Re: Get last snapshot from compacted topic

2015-08-08 Thread Jiangjie Qin
You are looking for a K-V store here, so the general answer is no. However,
Kafka does have an internal K-V store, although only for consumer offsets, so
there are some tricks we can play:

If your processing node is consuming from Kafka and the offset of a
snapshot is the offset of a Kafka partition, you can simply commit the
offsets to Kafka. Later on, when the node restarts, you can fetch the
committed offset and that will be what you want.

If your snapshot offsets have nothing to do with Kafka, you may do the
following (a sketch is included below):
1. Produce your snapshot to the snapshot checkpoint topic and get back the
offset of the produced message.
2. Use a consumer to commit the offset returned by the broker in step 1 for
the checkpoint topic partition, using the node name as the group name.
3. On restart, fetch the committed offset using the node name as the group
name; that offset is what you need.
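
A minimal Java sketch of steps 1-3, assuming the new Java producer and the
0.9-style KafkaConsumer API (commitSync/committed; the exact method names on
trunk at the time differed slightly). The topic "snapshot-checkpoint", the
broker address, and nodeName are placeholders:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;

public class SnapshotCheckpoint {
    public static void checkpoint(String nodeName, byte[] snapshotBytes) throws Exception {
        Properties pp = new Properties();
        pp.put("bootstrap.servers", "broker1:9092");
        pp.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        pp.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(pp);

        // Step 1: produce the snapshot and get back the offset of the produced message.
        RecordMetadata meta =
            producer.send(new ProducerRecord<>("snapshot-checkpoint", 0, null, snapshotBytes)).get();
        long snapshotOffset = meta.offset();
        producer.close();

        Properties cp = new Properties();
        cp.put("bootstrap.servers", "broker1:9092");
        cp.put("group.id", nodeName);  // node name used as the group name
        cp.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        cp.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(cp);
        TopicPartition tp = new TopicPartition("snapshot-checkpoint", 0);
        consumer.assign(Collections.singletonList(tp));

        // Step 2: commit that offset under the node-name group.
        consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(snapshotOffset)));

        // Step 3 (after a restart): fetch the committed offset and seek to it to re-read the snapshot.
        OffsetAndMetadata committed = consumer.committed(tp);
        consumer.seek(tp, committed.offset());
        consumer.close();
    }
}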

Thanks,

Jiangjie (Becket) Qin

On Tue, Aug 4, 2015 at 3:16 AM, Aki  wrote:

> I'd like to save a snapshot of a processing node's state in a compacted
> kafka topic. A large number of nodes would save their snapshots in the same
> partition.
>
> What is an efficient way for a (restarted) node to find the offset of its
> latest snapshot? Using just Kafka (no database, local file, etc.), is there
> a more efficient way than to consume the partition from the earliest
> available offset (potentially reading a lot of snapshots of other nodes).
>
> Thanks!


Re: Consumer limit for pub-sub mode

2015-08-08 Thread Jiangjie Qin
That's not true, Sharninder. Assuming you are using the high-level consumer,
if all the consumers have different group ids, each of them will get all
the messages.

Another way to think about this is that you could produce the same data to
many different topics, each with one partition, so that each consumer only
needs to consume from one of the topics.
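
For example, a minimal sketch of the consumer configuration for the fan-out
case, assuming the 0.8.x high-level consumer properties (the group id value is
just a placeholder and must be unique per consumer):

# consumer.properties for consumer instance N
zookeeper.connect=zk1:2181
# a unique group id per consumer means every consumer receives all messages
group.id=subscriber-N
auto.offset.reset=largest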

Thanks,

Jiangjie (Becket) Qin

On Mon, Aug 3, 2015 at 10:40 AM, Sharninder  wrote:

> I don't know of any limits as such but I don't think your problem is
> suitable for Kafka. The third point especially wouldn't work with Kafka.
> Using Kafka, only one consumer will get a message out of the 30k
>
>
>
> > On 03-Aug-2015, at 10:39 am, Vaibhav Kirte 
> wrote:
> >
> > Hi,
> >
> > I need to know how many consumers can subscribe to a single topic ( with
> > one partition ).
> >
> > I have a requirement such that,
> >1. The producer will post to 1 topic having 1 partition.
> >    2. *20,000-30,000 consumers* should be able to consume messages.
> >    3. All of the consumers should receive all messages that are produced.
> >
> > will this be possible using kafka ?
> > is there a limit on number of consumers ?
> > what will be the number of machines that I will need to satisfy the
> > requirements ?
> >
> > --
> > Regards,
> > Vaibhav Kirte
>


Re: New consumer - offset storage options

2015-08-01 Thread Jiangjie Qin
Yes, that is correct. The new consumer will only store offsets in Kafka.

On Tue, Jul 21, 2015 at 8:57 AM, Stevo Slavić  wrote:

> Hello Apache Kafka community,
>
> It seems new high level consumer coming in 0.8.3 will support only offset
> storage in Kafka topic.
> Can somebody please confirm/comment?
>
> Kind regards,
> Stevo Slavic.
>


Re: Regarding using of apache kafka

2015-08-01 Thread Jiangjie Qin
Kafka's performance largely depends on the operating system page cache, so
usually people want to use dedicated machines for Kafka. But that also
depends on your performance requirements. If you only have 5 million
messages per month, I think letting Kafka share the machines with other
systems should be fine.

Jiangjie (Becket) Qin

On Mon, Jul 20, 2015 at 6:25 AM, Jeetendra G 
wrote:

> Hi all
>
> Is it good to use Kafka when there are very few events - around 5 million
> events per month?
>
> I am thinking to use apache Kafka because it integrate nicely with spark
> streaming in any of the big data service providers like
> hortonworks,dataBricks
>
> Do i need to setup and assign different machine for Kafka or we can enable
> the Kafka in the same cluster?
>
> Regards
> Jeetendra
>


Re: Connection to zk shell on Kafka

2015-07-30 Thread Jiangjie Qin
This looks like an issue that should be fixed. I created KAFKA-2385 for it.

Thanks,

Jiangjie (Becket) Qin

On Wed, Jul 29, 2015 at 10:33 AM, Chris Barlock  wrote:

> I'm a user of Kafka/ZooKeeper not one of its developers, so I can't give
> you a technical explanation.  I do agree that Kafka should ship the jline
> JAR if its zookeeper-shell depends on it.
>
> Chris
>
>
>
>
> From:   Prabhjot Bharaj 
> To: u...@zookeeper.apache.org, d...@kafka.apache.org
> Cc: users@kafka.apache.org
> Date:   07/29/2015 01:27 PM
> Subject:Re: Connection to zk shell on Kafka
>
>
>
> Sure. It would be great if you could as well explain the reason why the
> absence of the jar creates this problem
>
> Also, I'm surprised that zookeeper that comes bundled with kafka 0.8.2
> does
> not have the jline jar
>
> Regards,
> prabcs
>
> On Wed, Jul 29, 2015 at 10:45 PM, Chris Barlock 
> wrote:
>
> > You need the jline JAR file that ships with ZooKeeper.
> >
> > Chris
> >
> > IBM Tivoli Systems
> > Research Triangle Park, NC
> > (919) 224-2240
> > Internet:  barl...@us.ibm.com
> >
> >
> >
> > From:   Prabhjot Bharaj 
> > To: users@kafka.apache.org, u...@zookeeper.apache.org
> > Date:   07/29/2015 01:13 PM
> > Subject:Connection to zk shell on Kafka
> >
> >
> >
> > Hi folks,
> >
> > */kafka/bin# ./zookeeper-shell.sh localhost:2182/*
> >
> > *Connecting to localhost:2182/*
> >
> > *Welcome to ZooKeeper!*
> >
> > *JLine support is disabled*
> >
> >
> >  *WATCHER::*
> >
> >
> >  *WatchedEvent state:SyncConnected type:None path:null*
> >
> >
> > **
> >
> > I'm running 5 node zookeeper cluster on 5-node kafka cluster (each kafka
> > broker has 1 zookeeper server running)
> >
> > When I try connecting to the shell, the shell never says 'Connected'
> >
> >
> >
> > However, if I try connecting on another standalone zookeeper  which has
> no
> > links to kafka, I'm able to connect:-
> >
> >
> > */kafka/bin# /zookeeper/scripts/zkCli.sh -server 127.0.0.1:2181
> > *
> >
> > *Connecting to 127.0.0.1:2181 *
> >
> > *Welcome to ZooKeeper!*
> >
> > *JLine support is enabled*
> >
> >
> > *WATCHER::*
> >
> >
> > *WatchedEvent state:SyncConnected type:None path:null*
> >
> > *[zk: 127.0.0.1:2181(CONNECTED) 0]*
> >
> > Am I missing something?
> >
> >
> > Thanks,
> >
> > prabcs
> >
> >
>
>
> --
> -
> "There are only 10 types of people in the world: Those who understand
> binary, and those who don't"
>
>


Re: KAfka Mirror Maker

2015-07-29 Thread Jiangjie Qin
Mirror Maker does not have specific restrictions on cluster size.

The error you saw was because the consumer was not able to talk to the broker.
Can you try using kafka-console-consumer to consume some data from your
source cluster and see if it works? It is under KAFKA_HOME/bin/; an example
invocation is sketched below.
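
For example, something along these lines, using the ZooKeeper address and
topic from your logs as placeholders:

bin/kafka-console-consumer.sh --zookeeper 10.2.3.4:2182 \
  --topic fromIndiaWithLove --from-beginning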

Jiangjie (Becket) Qin

On Tue, Jul 28, 2015 at 11:01 AM, Prabhjot Bharaj 
wrote:

> Hi,
>
> I'm using Mirror Maker with a cluster of 3 nodes and cluster of 5 nodes.
>
> I would like to ask - is the number of nodes a restriction for Mirror
> Maker?
> Also, are there any other restrictions or properties that should be common
> across both the clusters so that they continue mirroring.
>
>
> I'm asking this because I've got this error while mirroring:-
>
> [2015-07-28 17:51:10,943] WARN Fetching topic metadata with correlation id
> 0 for topics [Set(fromIndiaWithLove)] from broker
> [id:3,host:a10.2.3.4,port:9092] failed (kafka.client.ClientUtils$)
>
> java.nio.channels.ClosedChannelException
>
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>
> at
>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
>
> at
>
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>
> [2015-07-28 17:51:18,955] WARN Fetching topic metadata with correlation id
> 0 for topics [Set(fromIndiaWithLove)] from broker
> [id:2,host:10.2.3.5,port:9092] failed (kafka.client.ClientUtils$)
>
> java.nio.channels.ClosedChannelException
>
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>
> at
>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
>
> at
>
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>
> [2015-07-28 17:51:27,043] WARN Fetching topic metadata with correlation id
> 0 for topics [Set(fromIndiaWithLove)] from broker
> [id:5,host:a10.2.3.6port:9092] failed (kafka.client.ClientUtils$)
>
> java.nio.channels.ClosedChannelException
>
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>
> at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>
> at
>
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>
> at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
>
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
>
> at
>
> kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
>
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>
>
>
> This is what my *consumer config* looks like:-
>
> *zookeeper.connect=10.2.3.4:2182 *
>
> *zookeeper.connection.timeout.ms
> =100*
>
> *consumer.timeout.ms =-1*
>
> *group.id =dp-mirrorMaker-test-datap1*
>
> *shallow.iterator.enable=true*
>
> *auto.create.topics.enable=true*
>
>
>
> I've used the default* producer.properties* in kafka/config/ which has
> these properteis:-
>
> *metadata.broker.list=localhost:9092*
>
>
> *producer.type=sync*
>
> *compression.codec=none*
>
>
> *serializer.class=kafka.serializer.DefaultEncoder*
>
>
> I'm running Mirror Maker via this command:-
>
>
>  /kafka_2.10-0.8.2.0/bin/kafka-run-class.sh kafka.tools.MirrorMaker
> --consumer.config ~/sourceCluster1Consumer.config  --num.streams 1
> --producer.config producer.properties --whitelist=".*"
>
> Regards,
>
> prabcs
>


Re: consumer memory footprint

2015-07-16 Thread Jiangjie Qin
I think a rough calculation of the max memory footprint for each high-level
consumer would be:

(number of partitions across all topics) * fetch.message.max.bytes *
queued.max.message.chunks + (some decompression memory cost per message)

In your case, with 10 consumer threads, it would be roughly 10 times the above.
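
As a purely hypothetical worked example (the partition count and the
queued.max.message.chunks value below are assumptions, not taken from your
setup): with 15 topics of 4 partitions each (60 partitions total),
fetch.message.max.bytes = 100 MB and queued.max.message.chunks = 2, one
high-level consumer could buffer up to 60 * 100 MB * 2 = 12,000 MB (~12 GB)
before decompression overhead, and 10 such consumers would multiply that by
10 again.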

Thanks,

Jiangjie (Becket) Qin


On 7/16/15, 1:40 PM, "Kris K"  wrote:

>Hi All,
>
>Is there a way to calculate the amount of memory used per thread in case
>of
>a high level consumer?
>
>I am particularly interested in calculating the memory required by a
>process running 10 high level consumer threads for 15 topics with max.
>file
>size set to 100 MB.
>
>Thanks,
>Kris



Re: Idea how to ensure exactly once message deliver without external storage

2015-07-16 Thread Jiangjie Qin
Hey Stefan,

I think if you follow the one-commit-per-message approach, you will be able
to achieve exactly-once semantics. However, this would be very expensive,
and everything has to be synchronous in order to make it work.

In that sense, de-duplication on the client side seems a more favorable option
to me. That implies there has to be some way to persist the delivered
offset on the client side so that we can still de-duplicate in the
crash-then-resume case.
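
A minimal sketch of that client-side de-duplication idea, assuming the 0.8.x
high-level consumer; AtomicStore and process(...) are hypothetical placeholders
for a store that persists the result and the offset in one transaction and for
your own processing logic:

import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;

abstract class DedupConsumer {
    // Hypothetical store that persists the processed result and the offset atomically.
    interface AtomicStore {
        long lastProcessedOffset();                        // -1 if nothing processed yet
        void writeAtomically(byte[] result, long offset);  // result + offset in one transaction
    }

    // Placeholder for the actual processing logic.
    abstract byte[] process(byte[] message);

    void consume(KafkaStream<byte[], byte[]> stream, AtomicStore store) {
        long lastProcessed = store.lastProcessedOffset();
        for (MessageAndMetadata<byte[], byte[]> m : stream) {
            if (m.offset() <= lastProcessed) {
                continue;  // already delivered before the crash; skip the duplicate
            }
            store.writeAtomically(process(m.message()), m.offset());
            lastProcessed = m.offset();
        }
    }
}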

Thanks,

Jiangjie (Becket) Qin

On 7/16/15, 9:25 AM, "Jason Gustafson"  wrote:

>Hey Stefan,
>
>I only see a commit in the failure case. Were you planning to use
>auto-commits otherwise? You'd probably want to handle all commits directly
>or you'd always be left guessing. But even if you did, I think the main
>problem is that your process could fail before a needed commit is sent to
>the broker. After it resumes, it wouldn't know that a commit had been
>pending and might reprocess some messages.
>
>I think some Kafka devs have been thinking about exactly once semantics,
>but I don't think there's anything solid yet.
>
>-Jason
>
>On Thu, Jul 16, 2015 at 4:07 AM, Stefan Miklosovic 
>wrote:
>
>> Hi,
>>
>> In the old consumer, I have got just a simple stream of messages, one
>> by one and if I detected something was wrong, I would destroy my
>> consumer immediately without commit so once I restart consumer, I will
>> get the same messages once again because they will be delivered to me
>> from the last offset committed (if I understand that correctly).
>>
>> While this can work, I have at-least-once delivery guarantee and that
>> is not good in my case. I need to have exactly-once guarantee.
>>
>> While looking into new consumer, I noticed that there is the
>> possiblity to kind of "rewind" in a partition.
>>
>> My new algorithm is something like this:
>>
>> Partition myPartition;
>>
>> consumer.subscribe(myTopic);
>>
>> ConsumerRecords = consumer.poll(0);
>>
>> for (Record record: ConsumerRecords) {
>>
>> processRecord(record);
>>
>> processedMessages++;
>>
>> if (failure) {
>> int offsetOfLastProcessedRecord = record.offset();
>>
>> // this will effectively rewind me back so I get messages
>> which are not processed yet
>>
>> consumer.seek(myPartition, offsetOfLastProcessedRecord -
>> processedMessages);
>>
>> // here i commit the position of the lastly processed record
>> so on the next poll
>> // i should get messages which were polled before but stayed
>> unprocessed because of the
>> // error
>> consumer.commit(map,
>> CommitType.SYNC);
>> }
>> }
>>
>> Does this approach make sense?
>>
>> --
>> Stefan Miklosovic
>>



Re: Load Balancing Kafka

2015-07-15 Thread Jiangjie Qin
Ah... It seems you are focusing more on producer-side workload balance. If
that is the case, please ignore my previous comments.

On 7/15/15, 6:01 PM, "Jiangjie Qin"  wrote:

>If you have pretty balanced traffic on each partition and have set
>auto.leader.rebalance.enabled to true or false, you might not need to do
>further workload balance.
>
>However, in most cases you probably still need to do some sort of load
>balancing based on the traffic and disk utilization of each broker. You
>might want to do leader migration and/or partition reassignment.
>
>Leader migration is a cheaper rebalance and mostly addresses CPU and
>Network unbalance. Partition reassignment is a much more expensive
>operation as it moves actual data, this can help with disk utilization in
>addition to CPU and network.
>
>Thanks,
>
>Jiangjie (Becket) Qin
>
>On 7/15/15, 5:19 PM, "Sandy Waters"  wrote:
>
>>Hi all,
>>
>>Do I need to load balance against the brokers?  I am using the python
>>driver and it seems to only want a single kafka broker host.  However, in
>>a
>>situation where I have 10 brokers, is it still fine to just give it one
>>host.  Does zookeeper and kafka handle the load balancing and redirect my
>>push somewhere else?
>>
>>Would it hurt if I load balanced with Nginx and had it do round robin to
>>the brokers?
>>
>>Much thanks for any help.
>>
>>-Sandy
>



Re: Load Balancing Kafka

2015-07-15 Thread Jiangjie Qin
If you have pretty balanced traffic on each partition, then whether you have
set auto.leader.rebalance.enable to true or false, you might not need to do
any further workload balancing.

However, in most cases you probably still need to do some sort of load
balancing based on the traffic and disk utilization of each broker. You
might want to do leader migration and/or partition reassignment.

Leader migration is a cheaper rebalance and mostly addresses CPU and
network imbalance. Partition reassignment is a much more expensive
operation because it moves actual data; it can help with disk utilization in
addition to CPU and network.
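
As a rough sketch of the tooling involved (the ZooKeeper address and the JSON
file are placeholders; the JSON describes the desired replica assignment):

# leader migration: move leadership back to the preferred replicas
bin/kafka-preferred-replica-election.sh --zookeeper zk1:2181

# partition reassignment: move replicas between brokers (expensive, copies data)
bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
  --reassignment-json-file reassignment.json --execute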

Thanks,

Jiangjie (Becket) Qin

On 7/15/15, 5:19 PM, "Sandy Waters"  wrote:

>Hi all,
>
>Do I need to load balance against the brokers?  I am using the python
>driver and it seems to only want a single kafka broker host.  However, in
>a
>situation where I have 10 brokers, is it still fine to just give it one
>host.  Does zookeeper and kafka handle the load balancing and redirect my
>push somewhere else?
>
>Would it hurt if I load balanced with Nginx and had it do round robin to
>the brokers?
>
>Much thanks for any help.
>
>-Sandy



Re: Offset not committed

2015-07-15 Thread Jiangjie Qin
If that is the case, I guess there might still be some value in trying to run
the broker and clients locally and seeing whether the issue still exists.

Thanks,

Jiangjie (Becket) Qin

On 7/15/15, 1:23 PM, "Vadim Bobrov"  wrote:

>it is pretty random
>
>On Wed, Jul 15, 2015 at 4:22 PM, Jiangjie Qin 
>wrote:
>
>> I’m not sure if it is related to running in cloud. Do you see this
>> disconnection issue always happening on committing offsets or it happens
>> randomly?
>>
>> Jiangjie (becket) qin
>>
>> On 7/15/15, 12:53 PM, "Vadim Bobrov"  wrote:
>>
>> >there are lots of files under logs directory of the broker, just in
>>case I
>> >checked all modified around the time of error and found nothing unusual
>> >both client and broker are 0.8.2.1
>> >
>> >could it have something to do with running it in the cloud? we are on
>> >Linode and I remember having random disconnections problem with MySQL
>>on
>> >other nodes that since gone forever
>> >
>> >On Wed, Jul 15, 2015 at 3:43 PM, Jiangjie Qin
>>
>> >wrote:
>> >
>> >> Is there anything on the broker log?
>> >> Is it possible that your client and broker are not running on the
>>same
>> >> version?
>> >>
>> >> Jiangjie (Becket) Qin
>> >>
>> >> On 7/15/15, 11:40 AM, "Vadim Bobrov"  wrote:
>> >>
>> >> >caught it, thanks for help!
>> >> >any ideas what to do?
>> >> >
>> >> >TRACE 2015-07-15 18:37:58,070 [chaos-akka.actor.jms-dispatcher-1019
>>]
>> >> >kafka.network.BoundedByteBufferSend - 113 bytes written.
>> >> >ERROR 2015-07-15 18:37:58,078 [chaos-akka.actor.jms-dispatcher-1019
>>]
>> >> >kafka.consumer.ZookeeperConsumerConnector -
>> >> >[chaos-test_ds2.outsmartinc.com-1436984542831-4d399e71], Error while
>> >> >committing offsets.
>> >> >java.io.EOFException: Received -1 when reading from channel, socket
>>has
>> >> >likely been closed.
>> >> >at kafka.utils.Utils$.read(Utils.scala:381)
>> >> >at
>> >>
>> 
>>>>>kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferRecei
>>>>>ve
>> >>>.s
>> >> >cala:54)
>> >> >at
>> >> >kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>> >> >at
>> >>
>> 
>>>>>kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBuffe
>>>>>rR
>> >>>ec
>> >> >eive.scala:29)
>> >> >at
>> >> >kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>> >> >at
>> >>
>> 
>>>>>kafka.consumer.ZookeeperConsumerConnector.liftedTree2$1(ZookeeperConsu
>>>>>me
>> >>>rC
>> >> >onnector.scala:313)
>> >> >at
>> >>
>> 
>>>>>kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsu
>>>>>me
>> >>>rC
>> >> >onnector.scala:310)
>> >> >at
>> >>
>> 
>>>>>kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(Zookee
>>>>>pe
>> >>>rC
>> >> >onsumerConnector.scala:111)
>> >> >at
>> >>
>> 
>>>>>com.os.messaging.kafka.KafkaMessageService$class.acknowledge(KafkaMess
>>>>>ag
>> >>>eS
>> >> >ervice.scala:55)
>> >> >at
>> >>com.os.actor.Main$$anonfun$2$$anon$2.acknowledge(Main.scala:84)
>> >> >at
>> >>
>> 
>>>>>com.os.actor.acquisition.MessageListenerActor$$anonfun$8.applyOrElse(M
>>>>>es
>> >>>sa
>> >> >geListenerActor.scala:180)
>> >> >at
>> >>
>> 
>>>>>com.os.actor.acquisition.MessageListenerActor$$anonfun$8.applyOrElse(M
>>>>>es
>> >>>sa
>> >> >geListenerActor.scala:164)
>> >> >at
>> >>scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>> >> >at akka.actor.FSM$class.processEvent(FSM.scala:607)
>> >> >at
>> >>
>> 
>>>>>com.os.actor.acquisition.MessageListenerActor.processEvent(MessageList

Re: Offset not committed

2015-07-15 Thread Jiangjie Qin
I’m not sure if it is related to running in the cloud. Do you see this
disconnection issue always happening when committing offsets, or does it
happen randomly?

Jiangjie (Becket) Qin

On 7/15/15, 12:53 PM, "Vadim Bobrov"  wrote:

>there are lots of files under logs directory of the broker, just in case I
>checked all modified around the time of error and found nothing unusual
>both client and broker are 0.8.2.1
>
>could it have something to do with running it in the cloud? we are on
>Linode and I remember having random disconnections problem with MySQL on
>other nodes that since gone forever
>
>On Wed, Jul 15, 2015 at 3:43 PM, Jiangjie Qin 
>wrote:
>
>> Is there anything on the broker log?
>> Is it possible that your client and broker are not running on the same
>> version?
>>
>> Jiangjie (Becket) Qin
>>
>> On 7/15/15, 11:40 AM, "Vadim Bobrov"  wrote:
>>
>> >caught it, thanks for help!
>> >any ideas what to do?
>> >
>> >TRACE 2015-07-15 18:37:58,070 [chaos-akka.actor.jms-dispatcher-1019 ]
>> >kafka.network.BoundedByteBufferSend - 113 bytes written.
>> >ERROR 2015-07-15 18:37:58,078 [chaos-akka.actor.jms-dispatcher-1019 ]
>> >kafka.consumer.ZookeeperConsumerConnector -
>> >[chaos-test_ds2.outsmartinc.com-1436984542831-4d399e71], Error while
>> >committing offsets.
>> >java.io.EOFException: Received -1 when reading from channel, socket has
>> >likely been closed.
>> >at kafka.utils.Utils$.read(Utils.scala:381)
>> >at
>> 
>>>kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive
>>>.s
>> >cala:54)
>> >at
>> >kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>> >at
>> 
>>>kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferR
>>>ec
>> >eive.scala:29)
>> >at
>> >kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>> >at
>> 
>>>kafka.consumer.ZookeeperConsumerConnector.liftedTree2$1(ZookeeperConsume
>>>rC
>> >onnector.scala:313)
>> >at
>> 
>>>kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsume
>>>rC
>> >onnector.scala:310)
>> >at
>> 
>>>kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(Zookeepe
>>>rC
>> >onsumerConnector.scala:111)
>> >at
>> 
>>>com.os.messaging.kafka.KafkaMessageService$class.acknowledge(KafkaMessag
>>>eS
>> >ervice.scala:55)
>> >at 
>>com.os.actor.Main$$anonfun$2$$anon$2.acknowledge(Main.scala:84)
>> >at
>> 
>>>com.os.actor.acquisition.MessageListenerActor$$anonfun$8.applyOrElse(Mes
>>>sa
>> >geListenerActor.scala:180)
>> >at
>> 
>>>com.os.actor.acquisition.MessageListenerActor$$anonfun$8.applyOrElse(Mes
>>>sa
>> >geListenerActor.scala:164)
>> >at 
>>scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>> >at akka.actor.FSM$class.processEvent(FSM.scala:607)
>> >at
>> 
>>>com.os.actor.acquisition.MessageListenerActor.processEvent(MessageListen
>>>er
>> >Actor.scala:32)
>> >at 
>>akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:598)
>> >at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:592)
>> >at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>> >at
>> 
>>>com.os.actor.acquisition.MessageListenerActor.aroundReceive(MessageListe
>>>ne
>> >rActor.scala:32)
>> >at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>> >at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>> >at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>> >at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>> >at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>> >at
>> >scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> >at
>> 
>>>scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.ja
>>>va
>> >:1339)
>> >at
>> 
>>>scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> >at
>> 
>>>scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.
>>>ja
>> >va:107)
>> >
>> >
>> >On Wed, Jul 1

Re: Offset not committed

2015-07-15 Thread Jiangjie Qin
Is there anything in the broker log?
Is it possible that your client and broker are not running the same
version?

Jiangjie (Becket) Qin

On 7/15/15, 11:40 AM, "Vadim Bobrov"  wrote:

>caught it, thanks for help!
>any ideas what to do?
>
>TRACE 2015-07-15 18:37:58,070 [chaos-akka.actor.jms-dispatcher-1019 ]
>kafka.network.BoundedByteBufferSend - 113 bytes written.
>ERROR 2015-07-15 18:37:58,078 [chaos-akka.actor.jms-dispatcher-1019 ]
>kafka.consumer.ZookeeperConsumerConnector -
>[chaos-test_ds2.outsmartinc.com-1436984542831-4d399e71], Error while
>committing offsets.
>java.io.EOFException: Received -1 when reading from channel, socket has
>likely been closed.
>at kafka.utils.Utils$.read(Utils.scala:381)
>at
>kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.s
>cala:54)
>at 
>kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>at
>kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferRec
>eive.scala:29)
>at 
>kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
>at
>kafka.consumer.ZookeeperConsumerConnector.liftedTree2$1(ZookeeperConsumerC
>onnector.scala:313)
>at
>kafka.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperConsumerC
>onnector.scala:310)
>at
>kafka.javaapi.consumer.ZookeeperConsumerConnector.commitOffsets(ZookeeperC
>onsumerConnector.scala:111)
>at
>com.os.messaging.kafka.KafkaMessageService$class.acknowledge(KafkaMessageS
>ervice.scala:55)
>at com.os.actor.Main$$anonfun$2$$anon$2.acknowledge(Main.scala:84)
>at
>com.os.actor.acquisition.MessageListenerActor$$anonfun$8.applyOrElse(Messa
>geListenerActor.scala:180)
>at
>com.os.actor.acquisition.MessageListenerActor$$anonfun$8.applyOrElse(Messa
>geListenerActor.scala:164)
>at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>at akka.actor.FSM$class.processEvent(FSM.scala:607)
>at
>com.os.actor.acquisition.MessageListenerActor.processEvent(MessageListener
>Actor.scala:32)
>at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:598)
>at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:592)
>at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>at
>com.os.actor.acquisition.MessageListenerActor.aroundReceive(MessageListene
>rActor.scala:32)
>at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>at
>scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>at
>scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java
>:1339)
>at
>scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>at
>scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.ja
>va:107)
>
>
>On Wed, Jul 15, 2015 at 1:36 PM, Vadim Bobrov 
>wrote:
>
>> thanks Joel and Jiangjie,
>> I have figured it out. In addition to my log4j 2 config file I also
>>needed
>> a log4j 1 config file, then it works. Let me trace what happens when the
>> offsets are not committed and report back
>>
>> On Wed, Jul 15, 2015 at 1:33 PM, Joel Koshy  wrote:
>>
>>> - You can also change the log4j level dynamically via the
>>>   kafka.Log4jController mbean.
>>> - You can also look at offset commit request metrics (mbeans) on the
>>>   broker (just to check if _any_ offset commits are coming through
>>>   during the period you see no moving offsets).
>>> - The alternative is to just consume the offsets topic.
>>>
>>> On Wed, Jul 15, 2015 at 05:30:17PM +, Jiangjie Qin wrote:
>>> > I am not sure how your project was setup. But I think it depends on
>>>what
>>> > log4j property file you specified when you started your application.
>>>Can
>>> > you check if you have log4j appender defined and the loggers are
>>> directed
>>> > to the correct appender?
>>> >
>>> > Thanks,
>>> >
>>> > Jiangjie (Becket) Qin
>>> >
>>> > On 7/15/15, 8:10 AM, "Vadim Bobrov"  wrote:
>>> >
>>> > >Thanks Jiangjie,
>>> > >
>>> > >unfortunately turning trace level on does not seem to work (any log
>>> level
>>> > >actually) I am using log4j2 (through slf4j) and de

Re: Offset not committed

2015-07-15 Thread Jiangjie Qin
I am not sure how your project was set up, but I think it depends on which
log4j properties file you specified when you started your application. Can
you check whether you have a log4j appender defined and whether the loggers
are directed to the correct appender?
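
For reference, a minimal log4j 1.x properties sketch (the appender name and
pattern are just examples) that should surface the Kafka client logging:

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d [%t] %-5p %c - %m%n

# turn the Kafka client loggers up to TRACE
log4j.logger.kafka=TRACE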

Thanks,

Jiangjie (Becket) Qin

On 7/15/15, 8:10 AM, "Vadim Bobrov"  wrote:

>Thanks Jiangjie,
>
>unfortunately turning trace level on does not seem to work (any log level
>actually) I am using log4j2 (through slf4j) and despite including log4j1
>bridge and these lines:
>
>
>
>
>in my conf file I could not squeeze out any logging from kafka. Logging
>for
>all other libs (like zookeeper e.g.) work perfectly. Am I doing something
>wrong?
>
>
>On Tue, Jul 14, 2015 at 6:55 PM, Jiangjie Qin 
>wrote:
>
>> Hi Vadim,
>>
>> Can you turn on trace level logging on your consumer and search for
>> "offset commit response² in the log?
>> Also maybe take a look at the log to see if there is any exception
>>thrown.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On 7/14/15, 11:06 AM, "Vadim Bobrov"  wrote:
>>
>> >just caught this error again. I issue commitOffsets - no error but no
>> >committng offsets either. __consumer_offsets watching shows no new
>> >messages
>> >either. Then in a few minutes I issue commitOffsets again - all
>>committed.
>> >Unless I am doing something terribly wrong this is very unreliable
>> >
>> >On Tue, Jul 14, 2015 at 1:49 PM, Joel Koshy 
>>wrote:
>> >
>> >> Actually, how are you committing offsets? Are you using the old
>> >> (zookeeperconsumerconnector) or new KafkaConsumer?
>> >>
>> >> It is true that the current APIs don't return any result, but it
>>would
>> >> help to check if anything is getting into the offsets topic - unless
>> >> you are seeing errors in the logs, the offset commit should succeed
>> >> (if you are indeed explicitly committing offsets).
>> >>
>> >> Thanks,
>> >>
>> >> Joel
>> >>
>> >> On Tue, Jul 14, 2015 at 12:19:01PM -0400, Vadim Bobrov wrote:
>> >> > Thanks, Joel, I will but regardless of my findings the basic
>>problem
>> >>will
>> >> > still be there: there is no guarantee that the offsets will be
>> >>committed
>> >> > after commitOffsets. Because commitOffsets does not return its exit
>> >> status,
>> >> > nor does it block as I understand until offsets are committed. In
>> >>other
>> >> > words, there is no way to know that it has, in fact, commited the
>> >>offsets
>> >> >
>> >> > or am I missing something? And then another question - why does it
>> >>seem
>> >> to
>> >> > depend on the number of consumed messages?
>> >> >
>> >> > On Tue, Jul 14, 2015 at 11:36 AM, Joel Koshy 
>> >> wrote:
>> >> >
>> >> > > Can you take a look at the kafka commit rate mbean on your
>>consumer?
>> >> > > Also, can you consume the offsets topic while you are committing
>> >> > > offsets and see if/what offsets are getting committed?
>> >> > > (http://www.slideshare.net/jjkoshy/offset-management-in-kafka/32)
>> >> > >
>> >> > > Thanks,
>> >> > >
>> >> > > Joel
>> >> > >
>> >> > > On Tue, Jul 14, 2015 at 11:12:03AM -0400, Vadim Bobrov wrote:
>> >> > > > I am trying to replace ActiveMQ with Kafka in our environment
>> >> however I
>> >> > > > have encountered a strange problem that basically prevents from
>> >>using
>> >> > > Kafka
>> >> > > > in production. The problem is that sometimes the offsets are
>>not
>> >> > > committed.
>> >> > > >
>> >> > > > I am using Kafka 0.8.2.1, offset storage = kafka, high level
>> >> consumer,
>> >> > > > auto-commit = off. Every N messages I issue commitOffsets().
>>Now
>> >> here is
>> >> > > > the problem - if N is below a certain number (180 000 for me)
>>it
>> >> works
>> >> > > and
>> >> > > > the offset is moving. If N is 180 000 or more the offset is not
>> >> updated
>> >> > > > after commitOffsets
>> >> > > >
>> >> > > > I am looking at offsets using kafka-run-class.sh
>> >> > > > kafka.tools.ConsumerOffsetChecker
>> >> > > > Any help?
>> >> > >
>> >> > >
>> >>
>> >>
>>
>>



Re: Java API for fetching Consumer group from Kafka Server(Not Zookeeper)

2015-07-15 Thread Jiangjie Qin
It looks like the kafka.admin.ConsumerGroupCommand class is what you need.
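
It is also runnable as a command-line tool; a rough sketch of invoking it from
trunk at the time (flag names may have shifted since then):

bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --zookeeper zk1:2181 --list
bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --zookeeper zk1:2181 \
  --describe --group my-group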

Jiangjie (Becket) Qin

On 7/14/15, 8:23 PM, "Swati Suman"  wrote:

>Hi Team,
>
>Currently, I am able to fetch the Topic,Partition,Leader,Log Size through
>TopicMetadataRequest API available in Kafka.
>
>Is there any java api that gives me the consumer groups?
>
>Best Regards,
>Swati Suman



Re: Offset not committed

2015-07-14 Thread Jiangjie Qin
Hi Vadim, 

Can you turn on trace level logging on your consumer and search for
"offset commit response² in the log?
Also maybe take a look at the log to see if there is any exception thrown.

Thanks,

Jiangjie (Becket) Qin

On 7/14/15, 11:06 AM, "Vadim Bobrov"  wrote:

>just caught this error again. I issue commitOffsets - no error but no
>committng offsets either. __consumer_offsets watching shows no new
>messages
>either. Then in a few minutes I issue commitOffsets again - all committed.
>Unless I am doing something terribly wrong this is very unreliable
>
>On Tue, Jul 14, 2015 at 1:49 PM, Joel Koshy  wrote:
>
>> Actually, how are you committing offsets? Are you using the old
>> (zookeeperconsumerconnector) or new KafkaConsumer?
>>
>> It is true that the current APIs don't return any result, but it would
>> help to check if anything is getting into the offsets topic - unless
>> you are seeing errors in the logs, the offset commit should succeed
>> (if you are indeed explicitly committing offsets).
>>
>> Thanks,
>>
>> Joel
>>
>> On Tue, Jul 14, 2015 at 12:19:01PM -0400, Vadim Bobrov wrote:
>> > Thanks, Joel, I will but regardless of my findings the basic problem
>>will
>> > still be there: there is no guarantee that the offsets will be
>>committed
>> > after commitOffsets. Because commitOffsets does not return its exit
>> status,
>> > nor does it block as I understand until offsets are committed. In
>>other
>> > words, there is no way to know that it has, in fact, commited the
>>offsets
>> >
>> > or am I missing something? And then another question - why does it
>>seem
>> to
>> > depend on the number of consumed messages?
>> >
>> > On Tue, Jul 14, 2015 at 11:36 AM, Joel Koshy 
>> wrote:
>> >
>> > > Can you take a look at the kafka commit rate mbean on your consumer?
>> > > Also, can you consume the offsets topic while you are committing
>> > > offsets and see if/what offsets are getting committed?
>> > > (http://www.slideshare.net/jjkoshy/offset-management-in-kafka/32)
>> > >
>> > > Thanks,
>> > >
>> > > Joel
>> > >
>> > > On Tue, Jul 14, 2015 at 11:12:03AM -0400, Vadim Bobrov wrote:
>> > > > I am trying to replace ActiveMQ with Kafka in our environment
>> however I
>> > > > have encountered a strange problem that basically prevents from
>>using
>> > > Kafka
>> > > > in production. The problem is that sometimes the offsets are not
>> > > committed.
>> > > >
>> > > > I am using Kafka 0.8.2.1, offset storage = kafka, high level
>> consumer,
>> > > > auto-commit = off. Every N messages I issue commitOffsets(). Now
>> here is
>> > > > the problem - if N is below a certain number (180 000 for me) it
>> works
>> > > and
>> > > > the offset is moving. If N is 180 000 or more the offset is not
>> updated
>> > > > after commitOffsets
>> > > >
>> > > > I am looking at offsets using kafka-run-class.sh
>> > > > kafka.tools.ConsumerOffsetChecker
>> > > > Any help?
>> > >
>> > >
>>
>>



Re: Got conflicted ephemeral node exception for several hours

2015-07-12 Thread Jiangjie Qin
Hi Tao,

We see this error from time to time but did not think of it as a big
issue. Is there a reason it bothers you so much?
I'm not sure whether throwing an exception to the user in this case is good
handling or not. What is the user supposed to do in that case other than
retry?

Thanks,

Jiangjie (Becket) Qin

On 7/12/15, 7:16 PM, "tao xiao"  wrote:

>We saw the error again in our cluster.  Anyone has the same issue before?
>
>On Fri, 10 Jul 2015 at 13:26 tao xiao  wrote:
>
>> Bump the thread. Any help would be appreciated.
>>
>> On Wed, 8 Jul 2015 at 20:09 tao xiao  wrote:
>>
>>> Additional info
>>> Kafka version: 0.8.2.1
>>> zookeeper: 3.4.6
>>>
>>> On Wed, 8 Jul 2015 at 20:07 tao xiao  wrote:
>>>
 Hi team,

 I have 10 high level consumers connecting to Kafka and one of them
kept
 complaining "conflicted ephemeral node" for about 8 hours. The log was
 filled with below exception

 [2015-07-07 14:03:51,615] INFO conflict in
 /consumers/group/ids/test-1435856975563-9a9fdc6c data:
 
{"version":1,"subscription":{"test.*":1},"pattern":"white_list","timest
amp":"1436275631510"}
 stored data:
 
{"version":1,"subscription":{"test.*":1},"pattern":"white_list","timest
amp":"1436275558570"}
 (kafka.utils.ZkUtils$)
 [2015-07-07 14:03:51,616] INFO I wrote this conflicted ephemeral node
 
[{"version":1,"subscription":{"test.*":1},"pattern":"white_list","times
tamp":"1436275631510"}]
 at /consumers/group/ids/test-1435856975563-9a9fdc6c a while back in a
 different session, hence I will backoff for this node to be deleted by
 Zookeeper and retry (kafka.utils.ZkUtils$)

 In the meantime zookeeper reported below exception for the same time
span

 2015-07-07 22:45:09,687 [myid:3] - INFO  [ProcessThread(sid:3
 cport:-1)::PrepRequestProcessor@645] - Got user-level KeeperException
 when processing sessionid:0x44e657ff19c0019 type:create cxid:0x7a26
 zxid:0x3015f6e77 txntype:-1 reqpath:n/a Error
 Path:/consumers/group/ids/test-1435856975563-9a9fdc6c
Error:KeeperErrorCode
 = NodeExists for /consumers/group/ids/test-1435856975563-9a9fdc6c

 At the end zookeeper timed out the session and consumers triggered
 rebalance.

 I know that conflicted ephemeral node warning is to handle a zookeeper
 bug that session expiration and ephemeral node deletion are not done
 atomically but as indicated from zookeeper log the zookeeper never
got a
 chance to delete the ephemeral node which made me think that the
session
 was not expired at that time. And for some reason zookeeper fired
session
 expire event which subsequently invoked ZKSessionExpireListener.  I
was
 just wondering if anyone have ever encountered similar issue before
and
 what I can do at zookeeper side to prevent this?

 Another problem is that createEphemeralPathExpectConflictHandleZKBug
 call is wrapped in a while(true) loop which runs forever until the
 ephemeral node is created. Would it be better that we can employ an
 exponential retry policy with a max number of retries so that it has a
 chance to re-throw the exception back to caller and let caller handle
it in
 situation like above?





Re: New consumer API used in mirror maker

2015-07-12 Thread Jiangjie Qin
Yes, we are going to use the new consumer after it is ready.


Jiangjie (Becket) Qin

On 7/12/15, 8:21 PM, "tao xiao"  wrote:

>Hi team,
>
>The trunk code of mirror maker now uses the old consumer API, Is there any
>plan to use new Java consumer api in mirror maker?



Re: Custom topic metadata

2015-07-12 Thread Jiangjie Qin
Hi Stevo,

Kafka does not support custom topic metadata. What we are doing now is
storing such topic metadata in a separate ZooKeeper structure. But I agree
this is probably a good candidate feature to consider.

Thanks,

Jiangjie (Becket) Qin

On 7/12/15, 4:16 PM, "Stevo Slavić"  wrote:

>Hello Apache Kafka Community,
>
>Is it possible to store and retrieve additional custom topic metadata
>along
>with existing Kafka managed ones, using some Kafka API? If not would it be
>a problem (e.g. for Kafka broker or some client APIs) if I was to
>store/retrieve additional custom topic metadata using ZooKeeper API?
>
>Kind regards,
>Stevo Slavic.



Re: How to monitor Kafka offset

2015-07-09 Thread Jiangjie Qin
You can take a look at Burrow. We use it at LinkedIn.

Thanks,

Jiangjie (Becket) Qin

On 7/9/15, 8:53 PM, "Anandh Kumar"  wrote:

>Hi
>
>Is there any monitoring tool which monitors Kafka offsets?
>
>I need some opensource admin tool for kafka.
>
>Please guide me.
>
>Regards,
>-Anandh Kumar



Re: Kafka settings for (more) reliable/durable messaging

2015-07-07 Thread Jiangjie Qin
The replica lag definition is now time based, so as long as a replica can
catch up with the leader within replica.lag.time.max.ms, it stays in the ISR,
no matter how many messages it is behind.

And yes, your understanding is correct - the ACK is sent back either when all
replicas in the ISR have the message or when the request times out.

I had some related slides here that might help a bit:
http://www.slideshare.net/JiangjieQin/no-data-loss-pipeline-with-apache-kafka-49753844
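
For reference, a minimal sketch of the settings discussed in this thread (the
topic name, partition count, and ZooKeeper address are illustrative):

# create the topic with replication factor 3 and min.insync.replicas 2
bin/kafka-topics.sh --zookeeper zk1:2181 --create --topic durable-topic \
  --partitions 8 --replication-factor 3 --config min.insync.replicas=2

# old producer config; the new Java producer equivalent is acks=all
request.required.acks=-1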

Thanks,

Jiangjie (Becket) Qin

On 7/7/15, 9:28 AM, "Stevo Slavić"  wrote:

>Thanks for heads up and code reference!
>
>Traced back required offset to
>https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/serve
>r/ReplicaManager.scala#L303
>
>Have to investigate more, but from initial check was expecting to see
>there
>reference to "replica.lag.max.messages" (so even when replica is between 0
>and maxLagMessages behind to be considered on required offset to be
>considered as insync). Searching through trunk cannot find where in main
>code is "replica.lag.max.messages" configuration property used.
>
>Used search query
>https://github.com/apache/kafka/search?utf8=%E2%9C%93&q=%22replica.lag.max
>.messages%22&type=Code
>
>Maybe it's going to be removed in next release?!
>
>Time based lag is still there.
>
>Anyway, if I understood correctly, with request.required.acks=-1, when a
>message/batch is published, it's first written to lead, then other
>partition replicas either continuously poll and get in sync with lead, or
>through zookeeper get notified that they are behind and poll and get in
>sync with lead, and as soon as enough (min.insync.replicas - 1) replicas
>are detected to be fully in sync with lead, ACK is sent to producer
>(unless
>timeout occurs first).
>
>On Tue, Jul 7, 2015 at 5:15 PM, Gwen Shapira 
>wrote:
>
>> Ah, I think I see the confusion: Replicas don't actually ACK at all.
>> What happens is that the replica manager waits for enough ISR replicas
>> to reach the correct offset
>> Partition.checkEnoughReplicasReachOffset(...) has this logic. A
>> replica can't reach offset of second batch, without first having
>> written the first batch. So I believe we are safe in this scenario.
>>
>> Gwen
>>
>> On Tue, Jul 7, 2015 at 8:01 AM, Stevo Slavić  wrote:
>> > Hello Gwen,
>> >
>> > Thanks for fast response!
>> >
>> > Btw, congrats on officially becoming a Kafka committer and thanks,
>>among
>> > other things, for great "Intro to Kafka" video
>> > http://shop.oreilly.com/product/0636920038603.do !
>> >
>> > Have to read more docs and/or source. I thought this scenario is
>>possible
>> > because replica can fall behind (replica.lag.max.messages) and still
>>be
>> > considered ISR. Then I assumed also write can be ACKed by any ISR, and
>> then
>> > why not by one which has fallen more behind.
>> >
>> > Kind regards,
>> > Stevo Slavic.
>> >
>> > On Tue, Jul 7, 2015 at 4:47 PM, Gwen Shapira 
>> wrote:
>> >
>> >> I am not sure "different replica" can ACK the second back of messages
>> >> while not having the first - from what I can see, it will need to be
>> >> up-to-date on the latest messages (i.e. correct HWM) in order to ACK.
>> >>
>> >> On Tue, Jul 7, 2015 at 7:13 AM, Stevo Slavić 
>>wrote:
>> >> > Hello Apache Kafka community,
>> >> >
>> >> > Documentation for min.insync.replicas in
>> >> > http://kafka.apache.org/documentation.html#brokerconfigs states:
>> >> >
>> >> > "When used together, min.insync.replicas and request.required.acks
>> allow
>> >> > you to enforce greater durability guarantees. A typical scenario
>> would be
>> >> > to create a topic with a replication factor of 3, set
>> min.insync.replicas
>> >> > to 2, and produce with request.required.acks of -1. This will
>>ensure
>> that
>> >> > the producer raises an exception if a majority of replicas do not
>> >> receive a
>> >> > write."
>> >> >
>> >> > Correct me if wrong (doc reference?), I assume min.insync.replicas
>> >> includes
>> >> > lead, so with min.insync.replicas=2, lead and one more replica
>>besides
>> >> lead
>> >> > will have to ACK writes.
>> >> >
>> >> > In such setup, with minimalistic 3 brokers cluster, given that
>> >> > - all 3 replicas are insync
>> >> > - a batch of messages is written and ends up on lead and one
>>replica
>> ACKs
>> >> > - another batch of messages ends up on lead and different replica
>>ACKs
>> >> >
>> >> > Is it possible that when lead crashes, while replicas didn't catch
>>up,
>> >> > (part of) one batch of messages could be lost (since one replica
>> becomes
>> >> a
>> >> > new lead, and it's only serving all reads and requests, and
>> replication
>> >> is
>> >> > one way)?
>> >> >
>> >> > Kind regards,
>> >> > Stevo Slavic.
>> >>
>>



Re: Origin of product name Kafka

2015-07-06 Thread Jiangjie Qin
:) Maybe we should put this into the Kafka FAQ?



On 7/6/15, 7:25 PM, "Gwen Shapira"  wrote:

>Nice :)
>
>I always thought its a reference to the Kafkaesque process of building
>data pipelines in a large organization :)
>
>On Mon, Jul 6, 2015 at 6:52 PM, luo.fucong  wrote:
>> I just found the answer in Quora:
>>
>> 
>>http://www.quora.com/What-is-the-relation-between-Kafka-the-writer-and-Ap
>>ache-Kafka-the-distributed-messaging-system
>>
>>> On July 6, 2015, at 8:30 PM, jakob.vollenwei...@bkw.ch wrote:
>>>
>>> Hi Admin,
>>>
>>> I'm just wondering from where the product name Kafka originated,
>>>particularly since the general groove in Kafka's famous novels is often
>>>marked by a senseless, disorienting, often menacing complexity.
>>>
>>> Thanks,
>>> Jakob
>>>
>>>
>>



Re: How to achieve Failover or HighAvailable in SimpleConsumer?

2015-07-06 Thread Jiangjie Qin
I guess it is called SimpleConsumer for a reason. The simple consumer is
really simple and does not support any failure recovery. You might need to
implement your own logic, though that is probably not trivial. As a
reference, the high-level consumer uses ZooKeeper ephemeral paths to monitor
the liveness of consumers.
Also, you might want to take a look at KafkaConsumer in the latest trunk; it
is designed to replace most SimpleConsumer use cases. If you are going to
implement your own auto-recovery logic, it is probably better to use
KafkaConsumer instead of SimpleConsumer.

Thanks,

Jiangjie (Becket) Qin

On 7/5/15, 7:37 PM, "luo.fucong"  wrote:

>Hi all:
>
>The failover or re-balancing support seems only exist in High Level
>Consumer. But we have some special considerations that we have to go with
>the SimpleConsumer. I googled the problem but there are no answers.
>
>When the SimpleConsumer went down(due to some hardware errors or others
>unexpected), what can I do to resume the consuming **automatically**?
>
>



Re: Questions re: auto-commit and camel-kafka

2015-07-04 Thread Jiangjie Qin
If you disabled auto offset commit, offsets won't be committed unless you do it 
manually. So in your shutdown case, if there is some exception, you should not 
commit offsets. Similarly, if something outbound is still in flight, you need to 
wait rather than commit offsets for it.

For the initial message loss: Kafka actually creates topics asynchronously, which 
means the topic might not have been created yet when you started producing (we 
know this is not ideal). I would try increasing the retry backoff time or the 
number of retries to see if that resolves the issue.
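
For example, with the old (Scala) producer the relevant settings would be 
something like the following (the values are illustrative, not recommendations):

# producer.properties
message.send.max.retries=10
retry.backoff.ms=1000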

Jiangjie (Becket) Qin

From: "Michael J. Kitchin" mailto:mcoyote...@gmail.com>>
Reply-To: "mcoy...@mcoyote.com<mailto:mcoy...@mcoyote.com>" 
mailto:mcoy...@mcoyote.com>>
Date: Friday, July 3, 2015 at 3:01 PM
To: Jiangjie Qin mailto:j...@linkedin.com>>
Cc: "users@kafka.apache.org<mailto:users@kafka.apache.org>" 
mailto:users@kafka.apache.org>>
Subject: Re: Questions re: auto-commit and camel-kafka

Hi there,

Thanks for getting back to me so quickly.

Per auto-commit:
--
That's as I thought. Here is the setup, so far:
- Auto-commit off
- Same/fixed consumer group
- Auto offset reset set to "largest"

When the camel route is shut down in an orderly fashion the consumer and 
pending operation appears to clean up and commit the results, even though 
exchanges are in-flight.

I walked through this in the debugger, FWIW. If interested, you can see the 
code, here:
- 
https://github.com/apache/camel/blob/master/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java

...Specifically:
- Processing is underway line 148: processor.process(exchange);
- Camel triggers a shutdown and this returns, as normal without any exchange 
exception check
- The barrier await hits at line 165: 
berrier.await(endpoint.getBarrierAwaitTimeoutMs(), TimeUnit.MILLISECONDS);
- This aligns with the rest of the streams and triggers the barrier action, 
which in turn performs the commit at line 193: consumer.commitOffsets();

Since any exception from line 148 is suppressed and there's no subsequent 
interrupted() or exchange exception check, it looks like there's no way to 
signal not to commit, and in-flight exchanges are a guaranteed loss.

Does this sound correct?

If so, barring a change from the maintainers I figure I might fork this code 
and (optionally) bypass the consumer.commitOffsets() call during shutdown. Thoughts?

Per the lost initial message:

The consumer is started first and uses a consistent/fixed group id, as 
mentioned. I did notice a suppressed exception when I walked through in the 
debugger, however: "failed to send after 3 tries".

This may be a simpler nut to crack however -- the observed behavior isn't 
consistent and doesn't occur on camel route restart (i.e., when not restarting 
the process), suggesting a race condition. I'm using an embedded broker, so 
it's realistic it may not be completely started when this happens. I'll 
configure the producer to retry again with longer delays and/or delay route 
startup to see if that helps.





Please let me know if I may provide any additional information. Thanks.

-Regards,
MjK

On Fri, Jul 3, 2015 at 1:24 AM, Jiangjie Qin <j...@linkedin.com> wrote:
Hi Michael,

For the consumer side question. Yes, turning off auto offset commit is
what you want. But you should also commit offset manually after you have
written the processed data to somewhere else. Also, the offset is only
associated with a particular consumer group. So if you restart your
consumer with a different consumer group, by default it will consume from
the log end of the partitions. And this might also the reason why you see
the first message "eaten" by producer - if you start producer before
starting consumer and the consumer uses a new group id, it will consume
from log end, which might miss the messages that are already produced.

Can you maybe clarify what did you do with the above?

Thanks,

Jiangjie (Becket) Qin

On 7/2/15, 9:47 PM, "Michael J. Kitchin" <mcoyote...@gmail.com> wrote:

>Hi there,
>
>These are questions re: the official camel-kafka integration. Since the
>issues touch on both camel and kafka I'm sending to both users lists in
>search of answers.
>
>- - - -
>
>I have a simple, inonly, point-to-point, synchronous camel route (a)
>consuming from kafka topic (consumer), (b) running the resulting exchanges
>(messages) through a processor chain and (c) forwarding the outcome on to
>somewhere else.
>
>If the runtime dies while exchanges are in this pipeline, I want things to
>pick up where they left off when restarted. I'm not picky about seeing the
>same data more than once (as long as it's recent), I just can't lose

Re: Questions re: auto-commit and camel-kafka

2015-07-03 Thread Jiangjie Qin
Hi Michael,

For the consumer-side question: yes, turning off auto offset commit is
what you want. But you should also commit offsets manually after you have
written the processed data somewhere else. Also, the offset is only
associated with a particular consumer group, so if you restart your
consumer with a different consumer group, by default it will consume from
the log end of the partitions. This might also be the reason why you see
the first message "eaten" by the producer - if you start the producer before
starting the consumer and the consumer uses a new group id, it will consume
from the log end, which might miss the messages that were already produced.
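
For illustration, a minimal Java sketch of that manual-commit pattern with the
0.8.x high-level consumer (the topic name, group id, ZooKeeper address, and the
process(...) body are placeholders):

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ManualCommitExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk1:2181");
        props.put("group.id", "my-fixed-group");   // keep this fixed across restarts
        props.put("auto.commit.enable", "false");  // commit manually instead
        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            connector.createMessageStreams(Collections.singletonMap("my-topic", 1));
        KafkaStream<byte[], byte[]> stream = streams.get("my-topic").get(0);
        for (MessageAndMetadata<byte[], byte[]> m : stream) {
            process(m);                 // write the processed data somewhere durable first
            connector.commitOffsets();  // only then record the progress in Kafka
        }
    }

    private static void process(MessageAndMetadata<byte[], byte[]> m) {
        // placeholder for the actual processing / forwarding logic
    }
}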

Can you clarify what you did with the above?

Thanks,

Jiangjie (Becket) Qin

On 7/2/15, 9:47 PM, "Michael J. Kitchin"  wrote:

>Hi there,
>
>These are questions re: the official camel-kafka integration. Since the
>issues touch on both camel and kafka I'm sending to both users lists in
>search of answers.
>
>- - - -
>
>I have a simple, inonly, point-to-point, synchronous camel route (a)
>consuming from kafka topic (consumer), (b) running the resulting exchanges
>(messages) through a processor chain and (c) forwarding the outcome on to
>somewhere else.
>
>If the runtime dies while exchanges are in this pipeline, I want things to
>pick up where they left off when restarted. I'm not picky about seeing the
>same data more than once (as long as it's recent), I just can't lose
>anything.
>
>In brief, everything's working great except this failure/recovery part --
>in-flight exchanges are getting lost and there is no, apparent re-delivery
>on restart. My reading of the JMX data suggests the kafka logs are intact.
>
>I think this has to do with consumer auto-commit, which is the default
>behavior. My reading of the kafka and camel-kafka docs suggests disabling
>auto-commit will give me what I want, but when I try it I'm not seeing
>re-delivery kick off when I restart.
>
>So, first question:
>(1) Is auto-commit off the key to getting what I want and/or what else
>might I need to do?
>
>- - - - -
>
>Meanwhile, on the producer side I'm seeing the first (and only the first)
>message apparently get eaten. It's possible it's being buffered, but it
>never seems to timeout. There are no error messages on startup and the
>camel context, routes, etc. appear to have started successfully. The
>second
>message and everything that follows is golden.
>
>The payloads are ~70-character byte arrays, if it makes a difference.
>
>Second question, then:
>(2) Is there a batching setting or something else I might be overlooking
>behind this behavior?
>
>- - - - -
>
>Thanks, in advance for your time and consideration. We've been impressed
>with kafka so far and are looking forward to employing it in production.
>
>
>Please let me know if I may provide any additional information. Thanks.
>
>
>-Regards,
>
>MjK
>
>
>- - - - -
>
>
>
>*Michael J. Kitchin*
>
>Senior Software Engineer
>
>Operational Systems, Inc.
>
>4450 Arapahoe Avenue, Suite 100
>
>Boulder, CO 80303
>
>
>Phone: 719-271-6476
>
>Email: michael.kitc...@opsysinc.com
>
>Web: www.opsysinc.com



Re: EOL JDK 1.6 for Kafka

2015-07-01 Thread Jiangjie Qin
+1

On 7/1/15, 1:00 PM, "Gwen Shapira"  wrote:

>Huge +1.
>
>I don't think there is any other project that still supports 1.6.
>
>On Wed, Jul 1, 2015 at 8:05 AM, Harsha  wrote:
>> Hi,
>> During our SSL Patch KAFKA-1690. Some of the reviewers/users
>> asked for support this config
>> 
>>https://docs.oracle.com/javase/8/docs/api/javax/net/ssl/SSLParameters.htm
>>l#setEndpointIdentificationAlgorithm-java.lang.String-
>> It allows clients to verify the server and prevent potential MITM. This
>> api doesn't exist in Java 1.6.
>> Are there any users still want 1.6 support or can we stop supporting 1.6
>> from next release on wards.
>>
>> Thanks,
>> Harsha



Re: querying messages based on timestamps

2015-06-30 Thread Jiangjie Qin
Yes, we have plans to add better support for this.

Thanks,

Jiangjie (Becket) Qin

On 6/30/15, 4:23 PM, "Zaiming Shi"  wrote:

>Hi Jiangjie !
>Does the word 'Currently' imply any plan in introducing timestamp in log
>entries?
>
>Regards
>/Zaiming
>On Jun 30, 2015 11:08 PM, "Jiangjie Qin" 
>wrote:
>
>> Currently Kafka only have a very coarse solution to find offset by time,
>> which is based on the segment last modified time.
>> This interface is only available in simple consumer. You may issue an
>> OffsetRequest to specify a timestamp. The offset returned will be the
>> first offset of segments whose last modification time is earlier than
>>the
>> timestamp you provided.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On 6/30/15, 1:16 PM, "Adam Dubiel"  wrote:
>>
>> >We faced similar problem and ended up with implementing variant of
>>golden
>> >section search, that reads message using simple consumer and checks the
>> >timestamp (timestamps are appended by our producer though, they do not
>> >come
>> >from any Kafka metadata) till it finds message closest to given date.
>> >
>> >Adam
>> >
>> >2015-06-30 21:52 GMT+02:00 Shushant Arora :
>> >
>> >> Is it possible using low level consumer to get kafka messages based
>>on
>> >> timestamp, say I want to get all messages from last 5 minutes. I
>>don't
>> >>know
>> >> what were offsets of partitions 5 minutes back.
>> >>
>> >> In low level consumer : when I gave epoch for  whichTime , it failed.
>> >>  requestInfo.put(topicAndPartition, new
>> >> PartitionOffsetRequestInfo(whichTime,
>> >> 1));
>> >>
>> >> Is only latest and earliest supported in timestamp,Is there any way
>>to
>> >> filter messages based on timestamp?
>> >>
>> >> Thanks
>> >> Shushant
>> >>
>>
>>



Re: querying messages based on timestamps

2015-06-30 Thread Jiangjie Qin
Currently Kafka only has a very coarse solution for finding an offset by time,
which is based on the segment's last modified time.
This interface is only available in the simple consumer. You may issue an
OffsetRequest that specifies a timestamp. The offsets returned will be the
first offsets of the segments whose last modification time is earlier than the
timestamp you provided.
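
For reference, a rough sketch of issuing such a request from the Java simple
consumer (broker host, port and topic below are placeholders):

import java.util.Collections;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class OffsetByTimeExample {
  public static void main(String[] args) {
    SimpleConsumer consumer =
        new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, "offset-lookup");
    TopicAndPartition tp = new TopicAndPartition("my-topic", 0);
    long targetTime = System.currentTimeMillis() - 5 * 60 * 1000L;  // e.g. "5 minutes ago"

    // Ask for at most 1 offset; the broker returns offsets of segments whose
    // last modification time is earlier than targetTime (coarse granularity).
    OffsetRequest request = new OffsetRequest(
        Collections.singletonMap(tp, new PartitionOffsetRequestInfo(targetTime, 1)),
        kafka.api.OffsetRequest.CurrentVersion(), "offset-lookup");
    OffsetResponse response = consumer.getOffsetsBefore(request);

    long[] offsets = response.offsets("my-topic", 0);  // may be empty if no segment qualifies
    System.out.println("offset to start from: " + (offsets.length > 0 ? offsets[0] : -1L));
    consumer.close();
  }
}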

Thanks,

Jiangjie (Becket) Qin

On 6/30/15, 1:16 PM, "Adam Dubiel"  wrote:

>We faced similar problem and ended up with implementing variant of golden
>section search, that reads message using simple consumer and checks the
>timestamp (timestamps are appended by our producer though, they do not
>come
>from any Kafka metadata) till it finds message closest to given date.
>
>Adam
>
>2015-06-30 21:52 GMT+02:00 Shushant Arora :
>
>> Is it possible using low level consumer to get kafka messages based on
>> timestamp, say I want to get all messages from last 5 minutes. I don't
>>know
>> what were offsets of partitions 5 minutes back.
>>
>> In low level consumer : when I gave epoch for  whichTime , it failed.
>>  requestInfo.put(topicAndPartition, new
>> PartitionOffsetRequestInfo(whichTime,
>> 1));
>>
>> Is only latest and earliest supported in timestamp,Is there any way to
>> filter messages based on timestamp?
>>
>> Thanks
>> Shushant
>>



Re: At-least-once guarantees with high-level consumer

2015-06-21 Thread Jiangjie Qin
Yes, your approach works. I am not sure if we should take this as the default
solution, though. Users can have a simple wrapper + customized rebalance
listener. The tricky part is that the rebalance listener might need
different implementations. So it looks like the current API provides enough
simplicity and enough flexibility.

For the new consumer, if there is only one user thread, this might not be
an issue. If the consumer is shared by multiple threads (which is not
recommended), the same principle applies - commit offsets only after
processing them.

Thanks,

Jiangjie (Becket) Qin

On 6/21/15, 10:50 PM, "Carl Heymann"  wrote:

>Thanks Jiangjie
>
>So you agree that with the modified ConsumerIterator.next() code, the high
>level consumer becomes at-least-once, even with auto-commit enabled? That
>is what I really want to know.
>
>I'll have a look at the rebalancing code. I think I understand: during
>rebalancing, with auto-commit enabled, the offsets are committed
>in ZookeeperConsumerConnector.closeFetchersForQueues(..). Some processing
>might still be happening at this point. The rebalance listener is called
>only after this commit. So the current code (without my change) would lead
>to fewer duplicate messages, because it assumes that these transactions
>normally complete. This seems prudent, since rebalancing happens much more
>frequently than java processes being killed unexpectedly. On the other
>hand
>it means giving up at-least-once guarantees for message processing, when a
>java process actually does die unexpectedly.
>
>So I see it should be better to create a custom offset tracking&commit
>component, with some ability to wait a reasonable amount of time for
>consumer threads on streams to complete their current transaction, on
>rebalance, before committing from a rebalance listener.
>
>Is it OK to block for a second or two
>in consumerRebalanceListener.beforeReleasingPartitions(..), to wait for
>processing threads to complete? Will this hold up the whole cluster's
>rebalancing?
>
>The new KafkaConsumer code doesn't appear to do a commit in the same way
>during rebalance, when autocommit is enabled. So if current users of the
>high level consumer switch to the new consumer, they might get more
>duplicates on rebalance, right?
>
>Regards
>Carl
>
>
>On Sun, Jun 21, 2015 at 1:43 AM, Jiangjie Qin 
>wrote:
>
>> Hi Carl,
>>
>> Generally, you approach works to guarantee at least once consumption -
>> basically people have to commit offset only after they have processed
>>the
>> message.
>> The only problem is that in old high level consumer, during consumer
>> rebalance consumer will (and should) commit offsets. To guarantee
>> at-least-once and avoid unecessary duplicates on rebalance, ideally we
>> should wait until all the messages returned by iterator to be processed
>> before commit offsets.
>>
>> In LinkedIn, we have wrapper around open source consumer iterator where
>>we
>> can implants those logics.
>>
>> Jiangjie (Becket) Qin
>>
>> On 6/19/15, 12:22 AM, "Carl Heymann"  wrote:
>>
>> >Thanks Bhavesh.
>> >
>> >I understand that to get "exactly once" processing of a message
>>requires
>> >some de-duplication. What I'm saying, is that the current high level
>> >consumer, with automatic offset commits enabled, gives neither "at most
>> >once" nor "at least once" guarantees: A consumer group might get
>>duplicate
>> >messages, but might also never fully process some messages (which is a
>> >bigger problem for me).
>> >
>> >With the code change I propose, I think it changes to "at least once",
>> >i.e.
>> >one can then do the deduplication you describe, without worrying about
>> >"losing" messages. Messages should not get committed without being
>>fully
>> >processed. I want to know if this code change has any obvious problems.
>> >
>> >Regards
>> >Carl
>> >
>> >
>> >On Thu, Jun 18, 2015 at 11:19 PM, Bhavesh Mistry
>> >> >> wrote:
>> >
>> >> HI Carl,
>> >>
>> >> Produce side retry can produce duplicated message being sent to
>>brokers
>> >> with different offset with same message. Also, you may get duplicated
>> >>when
>> >> the High Level Consumer offset is not being saved or commit but you
>>have
>> >> processed data and your server restart etc...
>> >>
>> >>
>> >>
>> >> To guaranteed at-least one processing across p

Re: OutOfMemoryError in mirror maker

2015-06-21 Thread Jiangjie Qin
Should we still store the value bytes when logAsString is set to TRUE and
only store the length when logAsString is set to FALSE?
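
Something along these lines is what I mean - just an illustrative sketch (the
class name and constructor here are assumptions, not the actual tools code):
keep the value reference only when logAsString is true, otherwise keep just
the length.

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SizeAwareErrorLoggingCallback implements Callback {
  private final String topic;
  private final byte[] value;        // retained only when logAsString is true
  private final int valueLength;
  private final boolean logAsString;

  public SizeAwareErrorLoggingCallback(String topic, byte[] value, boolean logAsString) {
    this.topic = topic;
    this.logAsString = logAsString;
    this.value = logAsString ? value : null;               // drop the reference otherwise
    this.valueLength = (value == null) ? -1 : value.length;
  }

  @Override
  public void onCompletion(RecordMetadata metadata, Exception exception) {
    if (exception != null) {
      String valueDescription = (logAsString && value != null)
          ? new String(value) : valueLength + " bytes";
      System.err.println("Error when sending message to topic " + topic
          + " with value " + valueDescription + ": " + exception.getMessage());
    }
  }
}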

On 6/21/15, 7:29 PM, "tao xiao"  wrote:

>The patch I submitted did the what you suggested. It store the size only
>and print it out when error occurs.
>
>On Mon, Jun 22, 2015 at 5:26 AM Jiangjie Qin 
>wrote:
>
>> Yes, we can expose a user callback in MM, just like we did for rebalance
>> listener.
>> I still think ErrorLoggingCallback needs some change, though. Can we
>>only
>> store the value bytes when logAsString is set to true? That looks more
>> reasonable to me.
>>
>> Jiangjie (Becket) Qin
>>
>> On 6/21/15, 3:02 AM, "tao xiao"  wrote:
>>
>> >Yes, I agree with that. It is even better if we can supply our own
>> >callback. For people who want to view the content of message when
>>failure
>> >they still can do so
>> >
>> >On Sun, Jun 21, 2015 at 2:20 PM Guozhang Wang 
>>wrote:
>> >
>> >> Hi Tao / Jiangjie,
>> >>
>> >> I think a better fix here may be not letting
>> >>MirrorMakerProducerCallback to
>> >> extend from ErrorLoggingCallback, but rather change the
>> >> ErrorLoggingCallback itself as it defeats the usage of logAsString,
>> >>which I
>> >> think is useful for a general error logging purposes. Rather we can
>> >> let MirrorMakerProducerCallback
>> >> to not take the value bytes itself but just the length if people
>>agree
>> >>that
>> >> for MM we probably are not interested in its message value in
>>callback.
>> >> Thoughts?
>> >>
>> >> Guozhang
>> >>
>> >> On Wed, Jun 17, 2015 at 1:06 AM, tao xiao 
>>wrote:
>> >>
>> >> > Thank you for the reply.
>> >> >
>> >> > Patch submitted https://issues.apache.org/jira/browse/KAFKA-2281
>> >> >
>> >> > On Mon, 15 Jun 2015 at 02:16 Jiangjie Qin
>>
>> >> > wrote:
>> >> >
>> >> > > Hi Tao,
>> >> > >
>> >> > > Yes, the issue that ErrorLoggingCallback keeps value as local
>> >>variable
>> >> is
>> >> > > known for a while and we probably should fix it as the value is
>>not
>> >> used
>> >> > > except logging the its size. Can you open a ticket and maybe also
>> >> submit
>> >> > a
>> >> > > patch?
>> >> > >
>> >> > > For unreachable objects I don't think it is memory leak. As you
>> >>said,
>> >> GC
>> >> > > should take care of this. In LinkedIn we are using G1GC with some
>> >> tunings
>> >> > > made by our SRE. You can try that if interested.
>> >> > >
>> >> > > Thanks,
>> >> > >
>> >> > > Jiangjie (Becket) Qin
>> >> > >
>> >> > > On 6/13/15, 11:39 AM, "tao xiao"  wrote:
>> >> > >
>> >> > > >Hi,
>> >> > > >
>> >> > > >I am using mirror maker in trunk to replica data across two data
>> >> > centers.
>> >> > > >While the destination broker was having busy load and
>>unresponsive
>> >>the
>> >> > > >send
>> >> > > >rate of mirror maker was very low and the available producer
>>buffer
>> >> was
>> >> > > >quickly filled up. At the end mirror maker threw OOME. Detailed
>> >> > exception
>> >> > > >can be found here
>> >> > > >
>> >> > >
>> >> >
>> >>
>> >>
>> 
>>https://gist.github.com/xiaotao183/53e1bf191c1a4d030a25#file-oome-excepti
>> >>o
>> >> > > >n-L1
>> >> > > >
>> >> > > >I started up mirror maker with 1G memory and 256M producer
>>buffer.
>> >>I
>> >> > used
>> >> > > >eclipse MAT to analyze the heap dump and found out the retained
>> >>heap
>> >> > size
>> >> > > >of all RecordBatch objects were more than 500MB half of which
>>were
>> >> used
>> >> > to
>> >> > > >retain data that were to send to destination broker which makes
>> >>sense
>&g

Re: Manual Offset Commits with High Level Consumer skipping messages

2015-06-21 Thread Jiangjie Qin
Hmm, it might be a little bit difficult to tell what happened without
looking at your test code.
Can you try with the settings I mentioned? We can try to use only two
threads here:
1. Producer thread: produce some messages -> stop/start Kafka server ->
produce some more messages.
2. Consumer thread: loop on consume -> process -> commit offset every N
messages. So we can make sure there is no weird race condition.

Thanks,

Jiangjie (Becket) Qin

On 6/21/15, 6:23 AM, "noah"  wrote:

>On Sun, Jun 21, 2015 at 1:10 AM Jiangjie Qin 
>wrote:
>
>> Hey Noah,
>>
>> Carl is right about the offset. The offset to be commit should be the
>> largest-consumed-offset + 1. But this should not break the at least once
>> guarantee.
>> From what I can see, your consumer should not skip messages. Just to
>>make
>> sure I understand your test correctly,
>> 1. There is a consumer running from the start until the end of the test,
>> consuming from one partition. The consumer is committing offsets to
>>Kafka
>> instead of to zookeeper.
>> 2. There is only one Kafka server. The server has one topic with one
>> partition of replication factor equals to 1.
>>
>
>Yes.
>I've tried 2 partitions and 2 consumers as well with slightly different
>results. After the Kafka server shutdown & restart one of the
>consumers/partitions gets a few more messages (including some produced
>after the failure,) but then often stops receiving new messages (in one
>run, one partition fully recovered, but the other did not.) The other
>partition consistently doesn't get anything.
>
>
>> 3. There is a producer producing some messages to the Kafka server, then
>> restarts Kafka server, then produces some more messages.
>> 4. Your consumer test code has a loop to receive messages. After
>>receiving
>> a message, your test code record the message, then commit offset.
>> Is it what your test were doing?
>>
>
>More or less. The offset commit code runs on a separate thread so it wont
>block consumption and with some rate limiting, so it ends up committing
>every 5th offset or so. It's never committing an offset that has not been
>processed though.
>
>
>
>>
>> Jiangjie (Becket) Qin
>>
>> On 6/18/15, 2:21 PM, "noah"  wrote:
>>
>> >We are in a situation where we need at least once delivery. We have a
>> >thread that pulls messages off the consumer, puts them in a queue where
>> >they go through a few async steps, and then after the final step, we
>>want
>> >to commit the offset to the messages we have completed. There may be
>>items
>> >we have not completed still being processed, so
>> >consumerConnector.commitOffsets() isn't an option for us.
>> >
>> >We are manually committing offsets to Kafka (0.8.2.1) (auto commit is
>> >off.)
>> >
>> >We have a simple test case that is supposed to verify that we don't
>>lose
>> >any messages if the Kafka server is shut down:
>> >
>> >// there are 25 messages, we send a few now and a few after the server
>> >comes back up
>> >for (TestMessageClass mess : messages.subList(0, mid)) {
>> >producer.send(mess);
>> >}
>> >
>> >stopKafka(); // in memory KafkaServer
>> >startKafka();
>> >
>> >for (TestMessageClass mess : messages.subList(mid, total)) {
>> >producer.send(mess);
>> >}
>> >
>> >int tries = 0;
>> >while(testConsumer.received.size() < total && tries++ < 10) {
>> >Thread.sleep(200);
>> >}
>> >assertEquals(keys(testConsumer.received),
>> >keys(ImmutableSet.copyOf(messages)));
>> >
>> >The test consumer is very simple:
>> >
>> >ConsumerIterator iterator;
>> >while(iterator.hasNext()) {
>> >process(iterator.next());
>> >}
>> >
>> >// end of process:
>> >   commit(messageAndMetadata.offset());
>> >
>> >commit is basically the commit code from this page:
>> >
>> 
>>https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching
>>+
>> >consumer+offsets+in+Kafka,
>> >but runs the commit in a separate thread so it wont interfere with the
>> >consumer.
>> >
>> >Here is the strange thing: If we do not commit, the test passes every
>> >time.
>> >Kafka comes back up and the high level consumer picks up right where it
>> >left off. But if we do commit, it does not recover, or we lose
>>messages.
>> >With 1 partition, we only get some prefix of the messages produced
>>before
>> >stopKafka(). With 2, one of the partitions never gets any of the
>>messages
>> >sent in the second half, while the other gets a prefix, but not all of
>>the
>> >messages for that partition.
>> >
>> >It seems like the most likely thing is that we are committing the wrong
>> >offsets, but I cannot figure out how that is happening. Does the
>>offset in
>> >MessageAndMetadata not correspond to the offset in OffsetAndMetadata?
>> >
>> >Or do we have to abandon the high level consumer entirely if we want to
>> >manually commit in this way?
>>
>>



Re: OutOfMemoryError in mirror maker

2015-06-21 Thread Jiangjie Qin
Yes, we can expose a user callback in MM, just like we did for rebalance
listener.
I still think ErrorLoggingCallback needs some change, though. Can we only
store the value bytes when logAsString is set to true? That looks more
reasonable to me.

Jiangjie (Becket) Qin

On 6/21/15, 3:02 AM, "tao xiao"  wrote:

>Yes, I agree with that. It is even better if we can supply our own
>callback. For people who want to view the content of message when failure
>they still can do so
>
>On Sun, Jun 21, 2015 at 2:20 PM Guozhang Wang  wrote:
>
>> Hi Tao / Jiangjie,
>>
>> I think a better fix here may be not letting
>>MirrorMakerProducerCallback to
>> extend from ErrorLoggingCallback, but rather change the
>> ErrorLoggingCallback itself as it defeats the usage of logAsString,
>>which I
>> think is useful for a general error logging purposes. Rather we can
>> let MirrorMakerProducerCallback
>> to not take the value bytes itself but just the length if people agree
>>that
>> for MM we probably are not interested in its message value in callback.
>> Thoughts?
>>
>> Guozhang
>>
>> On Wed, Jun 17, 2015 at 1:06 AM, tao xiao  wrote:
>>
>> > Thank you for the reply.
>> >
>> > Patch submitted https://issues.apache.org/jira/browse/KAFKA-2281
>> >
>> > On Mon, 15 Jun 2015 at 02:16 Jiangjie Qin 
>> > wrote:
>> >
>> > > Hi Tao,
>> > >
>> > > Yes, the issue that ErrorLoggingCallback keeps value as local
>>variable
>> is
>> > > known for a while and we probably should fix it as the value is not
>> used
>> > > except logging the its size. Can you open a ticket and maybe also
>> submit
>> > a
>> > > patch?
>> > >
>> > > For unreachable objects I don't think it is memory leak. As you
>>said,
>> GC
>> > > should take care of this. In LinkedIn we are using G1GC with some
>> tunings
>> > > made by our SRE. You can try that if interested.
>> > >
>> > > Thanks,
>> > >
>> > > Jiangjie (Becket) Qin
>> > >
>> > > On 6/13/15, 11:39 AM, "tao xiao"  wrote:
>> > >
>> > > >Hi,
>> > > >
>> > > >I am using mirror maker in trunk to replica data across two data
>> > centers.
>> > > >While the destination broker was having busy load and unresponsive
>>the
>> > > >send
>> > > >rate of mirror maker was very low and the available producer buffer
>> was
>> > > >quickly filled up. At the end mirror maker threw OOME. Detailed
>> > exception
>> > > >can be found here
>> > > >
>> > >
>> >
>> 
>>https://gist.github.com/xiaotao183/53e1bf191c1a4d030a25#file-oome-excepti
>>o
>> > > >n-L1
>> > > >
>> > > >I started up mirror maker with 1G memory and 256M producer buffer.
>>I
>> > used
>> > > >eclipse MAT to analyze the heap dump and found out the retained
>>heap
>> > size
>> > > >of all RecordBatch objects were more than 500MB half of which were
>> used
>> > to
>> > > >retain data that were to send to destination broker which makes
>>sense
>> to
>> > > >me
>> > > >as it is close to 256MB producer buffer but the other half of which
>> were
>> > > >used by kafka.tools.MirrorMaker$MirrorMakerProducerCallback. As
>>every
>> > > >producer callback in mirror maker takes the message value and hold
>>it
>> > > >until
>> > > >the message is successfully delivered. In my case since the
>> destination
>> > > >broker was very unresponsive the message value held by callback
>>would
>> > stay
>> > > >forever which I think is a waste and it is a major contributor to
>>the
>> > OOME
>> > > >issue. screenshot of MAT
>> > > >
>> > >
>> >
>> 
>>https://gist.github.com/xiaotao183/53e1bf191c1a4d030a25#file-mat-screensh
>>o
>> > > >t-png
>> > > >
>> > > >The other interesting problem I observed is that when I turned on
>> > > >unreachable object parsing in MAT more than 400MB memory was
>>occupied
>> by
>> > > >unreachable objects. It surprised me that gc didn't clean them up
>> before
>> > > >OOME was thrown. As suggested in 

Re: Manual Offset Commits with High Level Consumer skipping messages

2015-06-20 Thread Jiangjie Qin
Hey Noah,

Carl is right about the offset. The offset to be committed should be the
largest-consumed-offset + 1. But this should not break the at-least-once
guarantee.
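
In other words, the consume loop should look roughly like this (reusing the
names from your test code; the only change is the +1):

ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
while (iterator.hasNext()) {
    MessageAndMetadata<byte[], byte[]> mm = iterator.next();
    process(mm);               // finish processing first
    commit(mm.offset() + 1);   // commit the next offset to consume, not the one just processed
}
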
From what I can see, your consumer should not skip messages. Just to make
sure I understand your test correctly:
1. There is a consumer running from the start until the end of the test,
consuming from one partition. The consumer is committing offsets to Kafka
instead of to zookeeper.
2. There is only one Kafka server. The server has one topic with one
partition of replication factor equals to 1.
3. There is a producer producing some messages to the Kafka server, then
restarts Kafka server, then produces some more messages.
4. Your consumer test code has a loop to receive messages. After receiving
a message, your test code records the message, then commits the offset.
Is that what your test was doing?

Jiangjie (Becket) Qin

On 6/18/15, 2:21 PM, "noah"  wrote:

>We are in a situation where we need at least once delivery. We have a
>thread that pulls messages off the consumer, puts them in a queue where
>they go through a few async steps, and then after the final step, we want
>to commit the offset to the messages we have completed. There may be items
>we have not completed still being processed, so
>consumerConnector.commitOffsets() isn't an option for us.
>
>We are manually committing offsets to Kafka (0.8.2.1) (auto commit is
>off.)
>
>We have a simple test case that is supposed to verify that we don't lose
>any messages if the Kafka server is shut down:
>
>// there are 25 messages, we send a few now and a few after the server
>comes back up
>for (TestMessageClass mess : messages.subList(0, mid)) {
>producer.send(mess);
>}
>
>stopKafka(); // in memory KafkaServer
>startKafka();
>
>for (TestMessageClass mess : messages.subList(mid, total)) {
>producer.send(mess);
>}
>
>int tries = 0;
>while(testConsumer.received.size() < total && tries++ < 10) {
>Thread.sleep(200);
>}
>assertEquals(keys(testConsumer.received),
>keys(ImmutableSet.copyOf(messages)));
>
>The test consumer is very simple:
>
>ConsumerIterator iterator;
>while(iterator.hasNext()) {
>process(iterator.next());
>}
>
>// end of process:
>   commit(messageAndMetadata.offset());
>
>commit is basically the commit code from this page:
>https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+
>consumer+offsets+in+Kafka,
>but runs the commit in a separate thread so it wont interfere with the
>consumer.
>
>Here is the strange thing: If we do not commit, the test passes every
>time.
>Kafka comes back up and the high level consumer picks up right where it
>left off. But if we do commit, it does not recover, or we lose messages.
>With 1 partition, we only get some prefix of the messages produced before
>stopKafka(). With 2, one of the partitions never gets any of the messages
>sent in the second half, while the other gets a prefix, but not all of the
>messages for that partition.
>
>It seems like the most likely thing is that we are committing the wrong
>offsets, but I cannot figure out how that is happening. Does the offset in
>MessageAndMetadata not correspond to the offset in OffsetAndMetadata?
>
>Or do we have to abandon the high level consumer entirely if we want to
>manually commit in this way?



Re: At-least-once guarantees with high-level consumer

2015-06-20 Thread Jiangjie Qin
Hi Carl,

Generally, your approach works to guarantee at-least-once consumption -
basically you have to commit offsets only after you have processed the
messages.
The only problem is that in the old high-level consumer, the consumer will
(and should) commit offsets during a consumer rebalance. To guarantee
at-least-once and avoid unnecessary duplicates on rebalance, ideally we
should wait until all the messages returned by the iterator have been
processed before committing offsets.

At LinkedIn, we have a wrapper around the open source consumer iterator where
we can implement that logic.

Jiangjie (Becket) Qin

On 6/19/15, 12:22 AM, "Carl Heymann"  wrote:

>Thanks Bhavesh.
>
>I understand that to get "exactly once" processing of a message requires
>some de-duplication. What I'm saying, is that the current high level
>consumer, with automatic offset commits enabled, gives neither "at most
>once" nor "at least once" guarantees: A consumer group might get duplicate
>messages, but might also never fully process some messages (which is a
>bigger problem for me).
>
>With the code change I propose, I think it changes to "at least once",
>i.e.
>one can then do the deduplication you describe, without worrying about
>"losing" messages. Messages should not get committed without being fully
>processed. I want to know if this code change has any obvious problems.
>
>Regards
>Carl
>
>
>On Thu, Jun 18, 2015 at 11:19 PM, Bhavesh Mistry
>> wrote:
>
>> HI Carl,
>>
>> Produce side retry can produce duplicated message being sent to brokers
>> with different offset with same message. Also, you may get duplicated
>>when
>> the High Level Consumer offset is not being saved or commit but you have
>> processed data and your server restart etc...
>>
>>
>>
>> To guaranteed at-least one processing across partitions (and across
>> servers), you will need to store message hash or primary key into
>> distributed LRU cache (with eviction policy )  like Hazelcast
>>  and do dedupping across partitions.
>>
>>
>>
>> I hope this help !
>>
>>
>>
>> Thanks,
>>
>> Bhavesh
>>
>>
>> On Wed, Jun 17, 2015 at 1:49 AM, yewton  wrote:
>>
>> > So Carl Heymann's ConsumerIterator.next hack approach is not
>>reasonable?
>> >
>> >  2015-06-17 08:12:50 + Quoting the message above from Stevo Slavić:
>> >
>> >>
>> >>
>> >>
>> >>
>> >> With auto-commit one can only have at-most-once delivery guarantee -
>> after
>> >>
>> >> commit but before message is delivered for processing, or even after
>>it
>> is
>> >>
>> >> delivered but before it is processed, things can fail, causing event
>>not
>> >> to
>> >>
>> >> be processed, which is basically same outcome as if it was not
>> delivered.
>> >>
>> >>
>> >>
>> >> On Mon, Jun 15, 2015 at 9:12 PM, Carl Heymann 
>> >> wrote:
>> >>
>> >>
>> >>
>> >> > Hi
>> >>
>> >> >
>> >>
>> >> > ** Disclaimer: I know there's a new consumer API on the way, this
>>mail
>> >> is
>> >>
>> >> > about the currently available API. I also apologise if the below
>>has
>> >>
>> >> > already been discussed previously. I did try to check previous
>> >> discussions
>> >>
>> >> > on ConsumerIterator **
>> >>
>> >> >
>> >>
>> >> > It seems to me that the high-level consumer would be able to
>>support
>> >>
>> >> > at-least-once messaging, even if one uses auto-commit, by changing
>> >>
>> >> > kafka.consumer.ConsumerIterator.next() to call
>> >>
>> >> > currentTopicInfo.resetConsumeOffset(..) _before_ super.next(). This
>> >> way, a
>> >>
>> >> > consumer thread for a KafkaStream could just loop:
>> >>
>> >> >
>> >>
>> >> > while (true) {
>> >>
>> >> > MyMessage message = iterator.next().message();
>> >>
>> >> > process(message);
>> >>
>> >> > }
>> >>
>> >> >
>> >>
>> >> > Each call to "iterator.next()" then updates the offset to commit to
>> the
>> >> end
>> >>
>> >> > of the message that was just processed. When offsets are committed
>>for
>> >> the
>> >>
>> >> > ConsumerConnector (either automatically or manually), the commit
>>will
>> >> not
>> >>
>> >> > include offsets of messages that haven't been fully processed.
>> >>
>> >> >
>> >>
>> >> > I've tested the following ConsumerIterator.next(), and it seems to
>> work
>> >> as
>> >>
>> >> > I expect:
>> >>
>> >> >
>> >>
>> >> >   override def next(): MessageAndMetadata[K, V] = {
>> >>
>> >> > // New code: reset consumer offset to the end of the previously
>> >>
>> >> > consumed message:
>> >>
>> >> > if (consumedOffset > -1L && currentTopicInfo != null) {
>> >>
>> >> > currentTopicInfo.resetConsumeOffset(consumedOffset)
>> >>
>> >> > val topic = currentTopicInfo.topic
>> >>
>> >> > trace("Setting %s consumed offset to %d".format(topic,
>> >>
>> >> > consumedOffset))
>> >>
>> >> > }
>> >>
>> >> >
>> >>
>> >> > // Old code, excluding reset:
>> >>
>> >> > val item = super.next()
>> >>
>> >> > if(consumedOffset < 0)
>> >>
>> >> >   throw new KafkaException("Offset returne

Re: Broker Fails to restart

2015-06-20 Thread Jiangjie Qin
It seems that your log.index.size.max.bytes was 1K, which probably was too
small. This will cause the index file to reach its upper limit before the
log segment is fully indexed.
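
For comparison, the defaults look like this in server.properties (shown only
as a reference point; adjust to your own sizing):

# Default is 10 MB per index file; a 1 KB limit fills up long before the segment is fully indexed.
log.index.size.max.bytes=10485760
# Default segment size; larger segments need proportionally more index space.
log.segment.bytes=1073741824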

Jiangjie (Becket) Qin

On 6/18/15, 4:52 PM, "Zakee"  wrote:

>Any ideas on why one of the brokers which was down for a day, fails to
>restart with exception as below? The 10-node cluster has been up and
>running fine for quite a few weeks.
>
>[2015-06-18 16:44:25,746] ERROR [app=broker] [main] There was an error in
>one of the threads during logs loading:
>java.lang.IllegalArgumentException: requirement failed: Attempt to append
>to a full index (size = 128). (kafka.log.LogManager)
>[2015-06-18 16:44:25,747] FATAL [app=broker] [main] [Kafka Server 13],
>Fatal error during KafkaServer startup. Prepare to shutdown
>(kafka.server.KafkaServer)
>java.lang.IllegalArgumentException: requirement failed: Attempt to append
>to a full index (size = 128).
>at scala.Predef$.require(Predef.scala:233)
>at 
>kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:198
>)
>at 
>kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>at 
>kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>at kafka.utils.Utils$.inLock(Utils.scala:535)
>at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>at kafka.log.LogSegment.recover(LogSegment.scala:187)
>at kafka.log.Log.recoverLog(Log.scala:205)
>at kafka.log.Log.loadSegments(Log.scala:177)
>at kafka.log.Log.(Log.scala:67)
>at 
>kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$7$$anon
>fun$apply$1.apply$mcV$sp(LogManager.scala:142)
>at kafka.utils.Utils$$anon$1.run(Utils.scala:54)
>at 
>java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
>at 
>java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>at 
>java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.
>java:895)
>at 
>java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java
>:918)
>at java.lang.Thread.run(Thread.java:662)
>
>
>Thanks
>Zakee
>
>
>
>



Re: Kafka 0.8.3 - New Consumer - user implemented partition.assignment.strategies?

2015-06-18 Thread Jiangjie Qin
Hi Olof,

I am just wondering: what is the benefit of rebalancing with a minimal number
of reassignments here?

I am asking this because in the new consumer, the rebalance is actually quite
cheap on the consumer side - just updating a topic partition set. That
means the actual rebalance time on the consumer side is almost negligible no
matter how many partitions are reassigned.

Is it because the consumer has some sort of sticky partition
requirement? If that is the case, that seems to need an immutable partition
assignment policy.

Just curious about the motivation behind this.

Thanks,

Jiangjie (Becket) Qin


On 6/11/15, 9:30 AM, "Johansson, Olof" 
wrote:

>Thanks Guozhang,
>
>I agree that it seems to be a common reassignment strategy that should be
>one of the pre-defined strategies. Do you have a Jira ticket for this
>specific case, and/or a Jira ticket to add user defined
>partitions.assignment.strategies that I can watch?
>
>/ Olof
>
>On 10/06/2015 14:35, "Guozhang Wang"  wrote:
>
>>Hi Olof,
>>
>>Yes we have plans to allow user defined partitions.assignment strategy to
>>pass in to the consumer coordinator; I am not sure if this feature will
>>not
>>be available in the first release of the new consumer in 0.8.3 though.
>>Currently users still have to choose one from the server-defined strategy
>>upon registering themselves.
>>
>>On the other hand, I think "rebalance with a minimal number of
>>reassignment" should be quite a common reassignment strategy, and I think
>>it is possible to just add it into the server-defined strategies list.
>>
>>Guozhang
>>
>>
>>On Wed, Jun 10, 2015 at 9:32 AM, Johansson, Olof <
>>olof.johans...@thingworx.com> wrote:
>>
>>> Is it possible to have a consumer rebalancing partition assignment
>>> strategy that will rebalance with a minimal number of reassignments?
>>>
>>> Per the "Kafka 0.9 Consumer Rewrite Design" it should be possible to
>>> define my own partition assignment strategy:
>>> "partition.assignment.strategies - may take a comma separated list of
>>> properties that map the strategy's friendly name to to the class that
>>> implements the strategy. This is used for any strategy implemented by
>>>the
>>> user and released to the Kafka cluster. By default, Kafka will include
>>>a
>>> set of strategies that can be used by the consumer."
>>>
>>> Is there a Jira ticket that tracks adding user defined
>>> partitions.assignment.strategies? In the latest source, range, and
>>> roundrobin are still the only possible values (hard-coded).
>>>
>>> I assume that any user implemented strategy would have to implement the
>>> PartitionAssignor trait. If so, by naively looking at the 0.8.3 source,
>>>a
>>> strategy that should do a minimal number of partition reassignments
>>>would
>>> need the ConsumerMetaData. That's not currently available in the
>>> PartitionAssignor contract - assign(topicsPerConsumer,
>>>partitionsPerTopic).
>>> Have there been any discussion to change the contract to pass
>>> ConsumerMetaData?
>>>
>>
>>
>>
>>-- 
>>-- Guozhang
>



Re: OutOfMemoryError in mirror maker

2015-06-14 Thread Jiangjie Qin
Hi Tao,

Yes, the issue that ErrorLoggingCallback keeps the value as a local variable
has been known for a while, and we probably should fix it since the value is
not used except for logging its size. Can you open a ticket and maybe also
submit a patch?

For the unreachable objects, I don't think it is a memory leak. As you said, GC
should take care of this. At LinkedIn we are using G1GC with some tunings
made by our SRE. You can try that if interested.
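
I cannot share the exact settings, but as a rough starting point (these are
just commonly used G1 options, not our exact tunings) the mirror maker command
line would include something like:

-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35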

Thanks,

Jiangjie (Becket) Qin

On 6/13/15, 11:39 AM, "tao xiao"  wrote:

>Hi,
>
>I am using mirror maker in trunk to replica data across two data centers.
>While the destination broker was having busy load and unresponsive the
>send
>rate of mirror maker was very low and the available producer buffer was
>quickly filled up. At the end mirror maker threw OOME. Detailed exception
>can be found here
>https://gist.github.com/xiaotao183/53e1bf191c1a4d030a25#file-oome-exceptio
>n-L1
>
>I started up mirror maker with 1G memory and 256M producer buffer. I used
>eclipse MAT to analyze the heap dump and found out the retained heap size
>of all RecordBatch objects were more than 500MB half of which were used to
>retain data that were to send to destination broker which makes sense to
>me
>as it is close to 256MB producer buffer but the other half of which were
>used by kafka.tools.MirrorMaker$MirrorMakerProducerCallback. As every
>producer callback in mirror maker takes the message value and hold it
>until
>the message is successfully delivered. In my case since the destination
>broker was very unresponsive the message value held by callback would stay
>forever which I think is a waste and it is a major contributor to the OOME
>issue. screenshot of MAT
>https://gist.github.com/xiaotao183/53e1bf191c1a4d030a25#file-mat-screensho
>t-png
>
>The other interesting problem I observed is that when I turned on
>unreachable object parsing in MAT more than 400MB memory was occupied by
>unreachable objects. It surprised me that gc didn't clean them up before
>OOME was thrown. As suggested in gc log
>https://gist.github.com/xiaotao183/53e1bf191c1a4d030a25#file-oome-gc-log-L
>1
>Full GC was unable to reclaim any memory and when facing OOME these
>unreachable objects should have been cleaned up. so either eclipse MAT has
>issue parsing the heap dump or there is hidden memory leak that is hard to
>find. I attached the sample screenshot of the unreachable objects here
>https://gist.github.com/xiaotao183/53e1bf191c1a4d030a25#file-unreachable-o
>bjects-png
>
>The consumer properties
>
>zookeeper.connect=zk
>zookeeper.connection.timeout.ms=100
>group.id=mm
>auto.offset.reset=smallest
>partition.assignment.strategy=roundrobin
>
>The producer properties
>
>bootstrap.servers=brokers
>client.id=mirror-producer
>producer.type=async
>compression.codec=none
>serializer.class=kafka.serializer.DefaultEncoder
>key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
>value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
>buffer.memory=268435456
>batch.size=1048576
>max.request.size=5242880
>send.buffer.bytes=1048576
>
>The java command to start mirror maker
>java -Xmx1024M -Xms512M -XX:+HeapDumpOnOutOfMemoryError
>-XX:HeapDumpPath=/home/kafka/slc-phx-mm-cg.hprof
>-XX:+PrintTenuringDistribution -XX:MaxTenuringThreshold=3 -server
>-XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled
>-XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC
>-Djava.awt.headless=true
>-Xloggc:/var/log/kafka/kafka-phx/cg/mirrormaker-gc.log -verbose:gc
>-XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps
>-XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10
>-XX:GCLogFileSize=10M -Dcom.sun.management.jmxremote
>-Dcom.sun.management.jmxremote.authenticate=false
>-Dcom.sun.management.jmxremote.ssl=false
>-Dkafka.logs.dir=/var/log/kafka/kafka-phx/cg
>-Dlog4j.configuration=file:/usr/share/kafka/bin/../config/tools-log4j.prop
>erties
>-cp libs/* kafka.tools.MirrorMaker --consumer.config
>consumer.properties --num.streams 10 --producer.config
>producer.properties --whitelist test.*



Re: Mirror maker doesn't rebalance after getting ZkNoNodeException

2015-06-09 Thread Jiangjie Qin
Which version of MM are you running?

On 6/9/15, 4:49 AM, "tao xiao"  wrote:

>Hi,
>
>I have two mirror makers A and B both subscripting to the same whitelist.
>During topic rebalancing one of the mirror maker A encountered
>ZkNoNodeException and then stopped all connections. but mirror maker B
>didn't pick up the topics that were consumed by A and left some of the
>topics unassigned. I think this is due to A not releasing ownership of
>those topics. My question is why A didn't release ownership upon receiving
>error?
>
>Here is the stack trace
>
>[2015-06-08 15:55:46,379] INFO [test_some-ip-1433797157389-247c1fc4],
>exception during rebalance  (kafka.consumer.ZookeeperConsumerConnector)
>org.I0Itec.zkclient.exception.ZkNoNodeException:
>org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode =
>NoNode for /consumers/test/ids/test_some-ip-1433797157389-247c1fc4
>at
>org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
>at
>org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
>at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
>at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
>at kafka.utils.ZkUtils$.readData(ZkUtils.scala:473)
>at
>kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:61)
>at
>kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consu
>mer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperCo
>nsumerConnector.scala:657)
>at
>kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$sy
>ncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerCon
>nector.scala:629)
>at 
>scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>at
>kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$sy
>ncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:620)
>at
>kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$sy
>ncedRebalance$1.apply(ZookeeperConsumerConnector.scala:620)
>at
>kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$sy
>ncedRebalance$1.apply(ZookeeperConsumerConnector.scala:620)
>at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>at
>kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebal
>ance(ZookeeperConsumerConnector.scala:619)
>at
>kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run
>(ZookeeperConsumerConnector.scala:572)
>Caused by: org.apache.zookeeper.KeeperException$NoNodeException:
>KeeperErrorCode = NoNode for
>/consumers/test/ids/test_some-ip-1433797157389-247c1fc4
>at
>org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
>at
>org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
>at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1155)
>at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1184)
>at 
>org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
>at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
>at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
>at
>org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
>... 13 more
>
>
>
>Here is the last part of the log
>
>[2015-06-08 15:55:49,848] INFO [ConsumerFetcherManager-1433797157399]
>Stopping leader finder thread (kafka.consumer.ConsumerFetcherManager)
>
>[2015-06-08 15:55:49,848] INFO [ConsumerFetcherManager-1433797157399]
>Stopping all fetchers (kafka.consumer.ConsumerFetcherManager)
>
>[2015-06-08 15:55:49,848] INFO [ConsumerFetcherManager-1433797157399] All
>connections stopped (kafka.consumer.ConsumerFetcherManager)
>
>[2015-06-08 15:55:49,848] INFO [test_some-ip-1433797157389-247c1fc4],
>Cleared all relevant queues for this fetcher
>(kafka.consumer.ZookeeperConsumerConnector)
>
>[2015-06-08 15:55:49,849] INFO [test_some-ip-1433797157389-247c1fc4],
>Cleared the data chunks in all the consumer message iterators
>(kafka.consumer.ZookeeperConsumerConnector)
>
>[2015-06-08 15:55:49,849] INFO [test_some-ip-1433797157389-247c1fc4],
>Invoking rebalance listener before relasing partition ownerships.
>(kafka.consumer.ZookeeperConsumerConnector)
>
>
>As seen in the log Mirror maker A didn't release ownership and it didn't
>attempt to trigger another round of rebalancing either. I checked zk. the
>node that was reported missing actually existed and it was created at the
>same time the error was thrown.
>
>
>I use the latest trunk code



Re: High CPU usage for idle kafka server

2015-06-08 Thread Jiangjie Qin
I guess the Apache mailing list does not support attachments…
Maybe you can paste it somewhere and send the link?

Jiangjie (Becket) Qin

From: "pundlik.anuja" mailto:pundlik.an...@gmail.com>>
Reply-To: "users@kafka.apache.org<mailto:users@kafka.apache.org>" 
mailto:users@kafka.apache.org>>
Date: Monday, June 8, 2015 at 1:53 PM
To: "users@kafka.apache.org<mailto:users@kafka.apache.org>" 
mailto:users@kafka.apache.org>>
Subject: Re: High CPU usage for idle kafka server

Hi Jiangjie,

Resending profiling screenshot for High CPU usage on Idle kafka broker.

Appreciate your help.

Thanks,
Anuja

On Mon, Jun 8, 2015 at 1:45 PM, pundlik.anuja 
mailto:pundlik.an...@gmail.com>> wrote:
Yes, partly it seems to be due to log cleaner thread.

I changed log cleaning settings for each broker (server*.properties) file.


# The minimum age of a log file to be eligible for deletion
log.retention.hours=1

# A size-based retention policy for logs. Segments are pruned from the log as 
long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log 
segment will be created.
log.segment.bytes=536870912

# The interval at which log segments are checked to see if they can be deleted 
according
# to the retention policies
log.retention.check.interval.ms=1 
(** This was 1 ms before)

# By default the log cleaner is disabled and the log retention policy will 
default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual 
logs can then be marked for log compaction.
log.cleaner.enable=false (* this was true before)


Question:
Per my understanding, setting log.cleaner.enable=false will cause deletion of 
logs instead of compaction. Is that correct? Or will it cause the logs to keep 
on accumulating and filling up the disk space?


THanks,
Anuja

On Mon, Jun 8, 2015 at 1:24 PM, Jay Kreps 
mailto:jay.kr...@gmail.com>> wrote:
Could it also be that the log cleaner is running? This will definitely use
some CPU while the cleaning is occurring (it would attempt to use one cpu
per log cleaner thread you configure).

-Jay

On Mon, Jun 8, 2015 at 1:07 PM, Jiangjie Qin 
mailto:j...@linkedin.com.invalid>>
wrote:

> It seems the attachments are lost. But high CPU for ReplicaRetcherThread
> might be related to KAFKA-1461. Can you try to apply that patch and see if
> it solves problem?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> From: "pundlik.anuja" 
> mailto:pundlik.an...@gmail.com> pundlik.an...@gmail.com<mailto:pundlik.an...@gmail.com>>>
> Reply-To: 
> "users@kafka.apache.org<mailto:users@kafka.apache.org><mailto:users@kafka.apache.org<mailto:users@kafka.apache.org>>"
>  <
> users@kafka.apache.org<mailto:users@kafka.apache.org><mailto:users@kafka.apache.org<mailto:users@kafka.apache.org>>>
> Date: Monday, June 8, 2015 at 12:20 PM
> To: 
> "users@kafka.apache.org<mailto:users@kafka.apache.org><mailto:users@kafka.apache.org<mailto:users@kafka.apache.org>>"
>  <
> users@kafka.apache.org<mailto:users@kafka.apache.org><mailto:users@kafka.apache.org<mailto:users@kafka.apache.org>>>
> Subject: Re: High CPU usage for idle kafka server
>
> Seems to be LogCleaner and FetchRequest threads.
> Attached is the profiling screenshot
>
> On Fri, Jun 5, 2015 at 3:06 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> Has this to do with KAFKA-1461?
> Can you see which thread is taking a lot of cpu? Some jconsole plugin can
> get that information.
>
> Jiangjie (Becket) Qin
>
> On 6/5/15, 2:57 PM, "pundlik.anuja" 
> mailto:pundlik.an...@gmail.com> pundlik.an...@gmail.com<mailto:pundlik.an...@gmail.com>>> wrote:
>
> >Hi Jay,
> >
> >Good to hear from you. I met you at the kafka meetup at linkedin.
> >
> >- No, I am running kafka_2.11-0.8.2.1
> >
> >
> >Are there any logs/ any info that I can provide that will help you
> >understand what could be the issue?
> >
> >Thanks,
> >Anuja
> >
> >On Fri, Jun 5, 2015 at 2:36 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >
> >> This sounds a lot like a bug we fixed in 0.8.2.0, no chance you are
> >>running
> >> that pre-release version is there?
> >>
> >> -Jay
> >>
> >> On Wed, Jun 3, 2015 at 9:43 PM, Anuja Pundlik

Re: High CPU usage for idle kafka server

2015-06-08 Thread Jiangjie Qin
It seems the attachments are lost. But high CPU for the ReplicaFetcherThread might 
be related to KAFKA-1461. Can you try to apply that patch and see if it solves the 
problem?

Thanks,

Jiangjie (Becket) Qin

From: "pundlik.anuja" mailto:pundlik.an...@gmail.com>>
Reply-To: "users@kafka.apache.org<mailto:users@kafka.apache.org>" 
mailto:users@kafka.apache.org>>
Date: Monday, June 8, 2015 at 12:20 PM
To: "users@kafka.apache.org<mailto:users@kafka.apache.org>" 
mailto:users@kafka.apache.org>>
Subject: Re: High CPU usage for idle kafka server

Seems to be LogCleaner and FetchRequest threads.
Attached is the profiling screenshot

On Fri, Jun 5, 2015 at 3:06 PM, Jiangjie Qin 
mailto:j...@linkedin.com.invalid>> wrote:
Has this to do with KAFKA-1461?
Can you see which thread is taking a lot of cpu? Some jconsole plugin can
get that information.

Jiangjie (Becket) Qin

On 6/5/15, 2:57 PM, "pundlik.anuja" 
mailto:pundlik.an...@gmail.com>> wrote:

>Hi Jay,
>
>Good to hear from you. I met you at the kafka meetup at linkedin.
>
>- No, I am running kafka_2.11-0.8.2.1
>
>
>Are there any logs/ any info that I can provide that will help you
>understand what could be the issue?
>
>Thanks,
>Anuja
>
>On Fri, Jun 5, 2015 at 2:36 PM, Jay Kreps 
>mailto:jay.kr...@gmail.com>> wrote:
>
>> This sounds a lot like a bug we fixed in 0.8.2.0, no chance you are
>>running
>> that pre-release version is there?
>>
>> -Jay
>>
>> On Wed, Jun 3, 2015 at 9:43 PM, Anuja Pundlik (apundlik) <
>> apund...@cisco.com<mailto:apund...@cisco.com>
>> > wrote:
>>
>> > Hi,
>> >
>> > I am using Kafka 0.8.2.1.
>> > We have 1 zookeeper, 3 kafka brokers.
>> > We have 9 topics, out of which 1 topic has 18 partitions, while
>>another
>> > has 12 partitions. All other topics have 1 partition each.
>> >
>> > We see that idle kafka brokers (not carrying any message) are using
>>more
>> > than 50% of CPU. See top output below.
>> >
>> > Is this a known issue?
>> >
>> >
>> > Thanks
>> >
>> >
>> >
>> > top - 04:42:30 up  2:07,  1 user,  load average: 1.50, 1.31, 0.92
>> > Tasks: 177 total,   1 running, 176 sleeping,   0 stopped,   0 zombie
>> > Cpu(s): 13.5%us,  4.5%sy,  0.0%ni, 81.3%id,  0.2%wa,  0.0%hi,  0.1%si,
>> > 0.4%st
>> > Mem:  65974296k total, 22310524k used, 43663772k free,   112688k
>>buffers
>> > Swap:0k total,0k used,0k free, 13382460k
>>cached
>> >
>> >   PID USER  PR  NI  VIRT  RES  SHR S %CPU %MEMTIME+ COMMAND
>> >  9295 wae   20   0 5212m 894m  12m S   62  1.4  22:50.99 java
>> >  9323 wae   20   0 5502m 894m  12m S   56  1.4  24:28.69 java
>> >  9353 wae   20   0 5072m 896m  12m S   54  1.4  17:04.31 java
>> >
>>




Re: callback handler is not getting called if cluster is down

2015-06-08 Thread Jiangjie Qin
KIP-19 should help in your case.

Jiangjie (Becket) Qin

On 6/8/15, 11:55 AM, "ankit tyagi"  wrote:

>Yes Jiangjie,
>
>I was using 1 broker with 1 replication factor for testing purpose.
>
>*Is there any way to detect broker failure with callback handler or any
>other means  while sending messages ??*
>
>On Mon, Jun 8, 2015 at 10:46 PM, Jiangjie Qin 
>wrote:
>
>> What replication factor are you using?
>>
>> Currently if a partition is offline, the message in producer will not be
>> sent but sit in accumulator until the partition comes back online. Do
>>you
>> mean you want to use the message send callback to detect broker failure?
>>
>> Jiangjie (Becket) Qin
>>
>> On 6/8/15, 12:27 AM, "ankit tyagi"  wrote:
>>
>> >Hi,
>> >
>> >we are using .8.2.0 broker and default async producer to send the
>> >message*.
>> >we recently found out that if whole cluster gets down then callback
>> >handler
>> >is not getting called while we are getting below exception
>>continuously*
>> >
>> >
>> >
>> >*12:36:41,267# WARN  [Selector] - Error in I/O with localhost/127.0.0.1
>> ><http://127.0.0.1>java.net.ConnectException: Connection refusedat
>> >sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)*
>> >at 
>>sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>> >at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
>> >at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
>> >at 
>>org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
>> >at 
>>org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
>> >at java.lang.Thread.run(Thread.java:745)
>> >
>> >I want to know is there any way I can catch to figure out cluster is
>> >down problematically  and log all the failed message so that it can be
>> >retry later after cluster state changes.
>>
>>



Re: New producer very slow to call Callback on error

2015-06-08 Thread Jiangjie Qin
KIP-19 should address this issue.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+ti
meout+to+NetworkClient

Thanks,

Jiangjie (Becket) Qin

On 6/8/15, 10:44 AM, "Christofer Hedbrandh"  wrote:

>I think this question might relate to the very recently posted "callback
>handler is not getting called if cluster is down" topic from "ankit
>tyagi".
>
>I am using the 0.8.2.1 new producer send(ProducerRecord record,
>Callback callback) with a Callback and never calling .get() on the
>Future. I have noticed that it takes ~10-15 minutes before
>Callback.onCompletion() is called with an Exception when I take down the
>Kafka cluster or block connections.
>
>From the documentation it is unclear what the expected producer behavior
>should be. I had assumed that the configuration "retries" and "timeout.ms"
>should control how long it would take for the Callback to be called with
>an
>Exception, but that does not seem to be the case.
>
>How do I know how long it will take for the Callback to be called with an
>Exception. And can I somehow configure this to be much shorter than ~10-15
>minutes.
>
>relevant broker configs:
>zookeeper.session.timeout.ms: 6000
>zookeeper.connection.timeout.m: 100
>
>relevant producer configs:
>retries: 0
>linger.ms: 0
>timeout.ms: 3
>metadata.fetch.timeout.ms: 1
>metadata.max.age.ms: 1
>reconnect.backoff.ms: 10
>retry.backoff.ms: 100
>
>
>Thanks,
>Christofer



Re: simple consumer giving error always

2015-06-08 Thread Jiangjie Qin
That is an offset-out-of-range error. So you are fetching from an offset
either greater than the latest offset or smaller than the earliest offset
available on the broker.
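
If it helps, the usual way to handle it (roughly following the standard
SimpleConsumer example, where getLastOffset is that example's helper,
ErrorMapping is kafka.common.ErrorMapping, and fetchResponse is the
kafka.javaapi.FetchResponse you just got back) is to reset to a valid offset
and retry the fetch:

short code = fetchResponse.errorCode("my-topic", partition);
if (code == ErrorMapping.OffsetOutOfRangeCode()) {  // error code 1
    // The requested offset is outside the range the broker currently has.
    // Reset to the earliest (or latest) valid offset and retry the fetch.
    readOffset = getLastOffset(consumer, "my-topic", partition,
                               kafka.api.OffsetRequest.EarliestTime(), clientName);
}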

Jiangjie (Becket) Qin

On 6/8/15, 2:21 AM, "sunil kalva"  wrote:

>Hi
>For few topics i always get FetchResponse.error code as "1", i am using
>simple consumer 0.8.1. What are the reasons to get this error. and any fix
>for this ?
>
>-- 
>SunilKalva



Re: callback handler is not getting called if cluster is down

2015-06-08 Thread Jiangjie Qin
What replication factor are you using?

Currently, if a partition is offline, the messages in the producer will not be
sent but will sit in the accumulator until the partition comes back online. Do
you mean you want to use the message send callback to detect broker failure?

Jiangjie (Becket) Qin

On 6/8/15, 12:27 AM, "ankit tyagi"  wrote:

>Hi,
>
>we are using .8.2.0 broker and default async producer to send the
>message*.
>we recently found out that if whole cluster gets down then callback
>handler
>is not getting called while we are getting below exception continuously*
>
>
>
>*12:36:41,267# WARN  [Selector] - Error in I/O with localhost/127.0.0.1
>java.net.ConnectException: Connection refusedat
>sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)*
>at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
>at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
>at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
>at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
>at java.lang.Thread.run(Thread.java:745)
>
>I want to know is there any way I can catch to figure out cluster is
>down problematically  and log all the failed message so that it can be
>retry later after cluster state changes.



Re: High CPU usage for idle kafka server

2015-06-05 Thread Jiangjie Qin
Does this have to do with KAFKA-1461?
Can you see which thread is taking a lot of CPU? Some jconsole plugin can
get that information.

Jiangjie (Becket) Qin

On 6/5/15, 2:57 PM, "pundlik.anuja"  wrote:

>Hi Jay,
>
>Good to hear from you. I met you at the kafka meetup at linkedin.
>
>- No, I am running kafka_2.11-0.8.2.1
>
>
>Are there any logs/ any info that I can provide that will help you
>understand what could be the issue?
>
>Thanks,
>Anuja
>
>On Fri, Jun 5, 2015 at 2:36 PM, Jay Kreps  wrote:
>
>> This sounds a lot like a bug we fixed in 0.8.2.0, no chance you are
>>running
>> that pre-release version is there?
>>
>> -Jay
>>
>> On Wed, Jun 3, 2015 at 9:43 PM, Anuja Pundlik (apundlik) <
>> apund...@cisco.com
>> > wrote:
>>
>> > Hi,
>> >
>> > I am using Kafka 0.8.2.1.
>> > We have 1 zookeeper, 3 kafka brokers.
>> > We have 9 topics, out of which 1 topic has 18 partitions, while
>>another
>> > has 12 partitions. All other topics have 1 partition each.
>> >
>> > We see that idle kafka brokers (not carrying any message) are using
>>more
>> > than 50% of CPU. See top output below.
>> >
>> > Is this a known issue?
>> >
>> >
>> > Thanks
>> >
>> >
>> >
>> > top - 04:42:30 up  2:07,  1 user,  load average: 1.50, 1.31, 0.92
>> > Tasks: 177 total,   1 running, 176 sleeping,   0 stopped,   0 zombie
>> > Cpu(s): 13.5%us,  4.5%sy,  0.0%ni, 81.3%id,  0.2%wa,  0.0%hi,  0.1%si,
>> > 0.4%st
>> > Mem:  65974296k total, 22310524k used, 43663772k free,   112688k
>>buffers
>> > Swap:0k total,0k used,0k free, 13382460k
>>cached
>> >
>> >   PID USER  PR  NI  VIRT  RES  SHR S %CPU %MEMTIME+ COMMAND
>> >  9295 wae   20   0 5212m 894m  12m S   62  1.4  22:50.99 java
>> >  9323 wae   20   0 5502m 894m  12m S   56  1.4  24:28.69 java
>> >  9353 wae   20   0 5072m 896m  12m S   54  1.4  17:04.31 java
>> >
>>



Re: How to prevent custom Partitioner from increasing the number of producer's requests?

2015-06-04 Thread Jiangjie Qin
From the code you pasted, that is the old producer.
The new producer class is org.apache.kafka.clients.producer.KafkaProducer.

The new producer does not have the sticky partition behavior. Its default
partitioner uses a round-robin-like approach to spread non-keyed messages
across partitions.
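
A minimal sketch of the new producer (the configuration values are
placeholders, not taken from your setup); records sent without a key are
spread across partitions by the default partitioner:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class NewProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092"); // placeholder hosts
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        // No key is supplied, so the default partitioner picks a partition per record.
        producer.send(new ProducerRecord<String, String>("my-topic", "hello"));
        producer.close();
    }
}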

Jiangjie (Becket) Qin

On 6/3/15, 11:35 PM, "Sebastien Falquier" 
wrote:

>I am using this code (from "org.apache.kafka" % "kafka_2.10" % "0.8.2.0"),
>no idea if it is the old producer or the new one
>
>import kafka.producer.Produced
>import kafka.producer.ProducerConfig
>val prodConfig : ProducerConfig = new ProducerConfig(properties)
>val producer : Producer[Integer,String] = new
>Producer[Integer,String](prodConfig)
>
>How can I know which producer I am using? And what is the behavior of the
>new producer?
>
>Thanks,
>Sébastien
>
>
>2015-06-03 20:04 GMT+02:00 Jiangjie Qin :
>
>>
>> Are you using new producer or old producer?
>> The old producer has 10 min sticky partition behavior while the new
>> producer does not.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On 6/2/15, 11:58 PM, "Sebastien Falquier" 
>> wrote:
>>
>> >Hi Jason,
>> >
>> >The default partitioner does not make the job since my producers
>>haven't a
>> >smooth traffic. What I mean is that they can deliver lots of messages
>> >during 10 minutes and less during the next 10 minutes, that is too say
>>the
>> >first partition will have stacked most of the messages of the last 20
>> >minutes.
>> >
>> >By the way, I don't understand your point about breaking batch into 2
>> >separate partitions. With that code, I jump to a new partition on
>>message
>> >201, 401, 601, ... with batch size = 200, where is my mistake?
>> >
>> >Thanks for your help,
>> >Sébastien
>> >
>> >2015-06-02 16:55 GMT+02:00 Jason Rosenberg :
>> >
>> >> Hi Sebastien,
>> >>
>> >> You might just try using the default partitioner (which is random).
>>It
>> >> works by choosing a random partition each time it re-polls the
>>meta-data
>> >> for the topic.  By default, this happens every 10 minutes for each
>>topic
>> >> you produce to (so it evenly distributes load at a granularity of 10
>> >> minutes).  This is based on 'topic.metadata.refresh.interval.ms'.
>> >>
>> >> I suspect your code is causing double requests for each batch, if
>>your
>> >> partitioning is actually breaking up your batches into 2 separate
>> >> partitions.  Could be an off by 1 error, with your modulo
>>calculation?
>> >> Perhaps you need to use '% 0' instead of '% 1' there?
>> >>
>> >> Jason
>> >>
>> >>
>> >>
>> >> On Tue, Jun 2, 2015 at 3:35 AM, Sebastien Falquier <
>> >> sebastien.falqu...@teads.tv> wrote:
>> >>
>> >> > Hi guys,
>> >> >
>> >> > I am new to Kafka and I am facing a problem I am not able to sort
>>out.
>> >> >
>> >> > To smooth traffic over all my brokers' partitions, I have coded a
>> >>custom
>> >> > Paritioner for my producers, using a simple round robin algorithm
>>that
>> >> > jumps from a partition to another on every batch of messages
>> >> (corresponding
>> >> > to batch.num.messages value). It looks like that :
>> >> > https://gist.github.com/sfalquier/4c0c7f36dd96d642b416
>> >> >
>> >> > With that fix, every partitions are used equally, but the amount of
>> >> > requests from the producers to the brokers have been multiplied by
>>2.
>> >>I
>> >> do
>> >> > not understand since all producers are async with
>> >>batch.num.messages=200
>> >> > and the amount of messages processed is still the same as before.
>>Why
>> >>do
>> >> > producers need more requests to do the job? As internal traffic is
>>a
>> >>bit
>> >> > critical on our platform, I would really like to reduce producers'
>> >> requests
>> >> > volume if possible.
>> >> >
>> >> > Any idea? Any suggestion?
>> >> >
>> >> > Regards,
>> >> > Sébastien
>> >> >
>> >>
>>
>>



Re: How to prevent custom Partitioner from increasing the number of producer's requests?

2015-06-03 Thread Jiangjie Qin

Are you using the new producer or the old producer?
The old producer has the 10-minute sticky partition behavior while the new
producer does not.

Thanks,

Jiangjie (Becket) Qin

On 6/2/15, 11:58 PM, "Sebastien Falquier" 
wrote:

>Hi Jason,
>
>The default partitioner does not make the job since my producers haven't a
>smooth traffic. What I mean is that they can deliver lots of messages
>during 10 minutes and less during the next 10 minutes, that is too say the
>first partition will have stacked most of the messages of the last 20
>minutes.
>
>By the way, I don't understand your point about breaking batch into 2
>separate partitions. With that code, I jump to a new partition on message
>201, 401, 601, ... with batch size = 200, where is my mistake?
>
>Thanks for your help,
>Sébastien
>
>2015-06-02 16:55 GMT+02:00 Jason Rosenberg :
>
>> Hi Sebastien,
>>
>> You might just try using the default partitioner (which is random).  It
>> works by choosing a random partition each time it re-polls the meta-data
>> for the topic.  By default, this happens every 10 minutes for each topic
>> you produce to (so it evenly distributes load at a granularity of 10
>> minutes).  This is based on 'topic.metadata.refresh.interval.ms'.
>>
>> I suspect your code is causing double requests for each batch, if your
>> partitioning is actually breaking up your batches into 2 separate
>> partitions.  Could be an off by 1 error, with your modulo calculation?
>> Perhaps you need to use '% 0' instead of '% 1' there?
>>
>> Jason
>>
>>
>>
>> On Tue, Jun 2, 2015 at 3:35 AM, Sebastien Falquier <
>> sebastien.falqu...@teads.tv> wrote:
>>
>> > Hi guys,
>> >
>> > I am new to Kafka and I am facing a problem I am not able to sort out.
>> >
>> > To smooth traffic over all my brokers' partitions, I have coded a
>>custom
>> > Paritioner for my producers, using a simple round robin algorithm that
>> > jumps from a partition to another on every batch of messages
>> (corresponding
>> > to batch.num.messages value). It looks like that :
>> > https://gist.github.com/sfalquier/4c0c7f36dd96d642b416
>> >
>> > With that fix, every partitions are used equally, but the amount of
>> > requests from the producers to the brokers have been multiplied by 2.
>>I
>> do
>> > not understand since all producers are async with
>>batch.num.messages=200
>> > and the amount of messages processed is still the same as before. Why
>>do
>> > producers need more requests to do the job? As internal traffic is a
>>bit
>> > critical on our platform, I would really like to reduce producers'
>> requests
>> > volume if possible.
>> >
>> > Any idea? Any suggestion?
>> >
>> > Regards,
>> > Sébastien
>> >
>>



Re: Kafka Not Commiting Messages

2015-05-28 Thread Jiangjie Qin
Actually the name should be log4j.logger.kafka.network.RequestChannel$
It should be there in 0.8.2.1. Can you check it again?
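
For example, the relevant lines in the broker's config/log4j.properties would
look roughly like this (the requestAppender name follows the stock file
shipped with Kafka; adjust if yours differs):

log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender
log4j.additivity.kafka.network.RequestChannel$=false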

From: Charlie Mason <charlie@gmail.com>
Reply-To: charlie@gmail.com
Date: Thursday, May 28, 2015 at 1:09 PM
To: Jiangjie Qin <j...@linkedin.com>
Cc: users@kafka.apache.org
Subject: Re: Kafka Not Commiting Messages

Hi Jiangjie,

Thanks for your message. Unfortunately that setting doesn't appear to be
in log4j.properties in Kafka 0.8.2.1.

I tried adding that property to the log file however it doesn't seem to have 
any effect.

Is there a different logger I need to configure for 0.8.2.1?


Thanks,

Charlie M

On Thu, May 28, 2015 at 1:15 AM, Jiangjie Qin <j...@linkedin.com> wrote:
Can you turn on TRACE level logging for kafka-request.log and see if
broker received the producer request or not?
You can go to KAKFA_FOLDER/config/log4j.properties and change
log4j.logger.kafka.network.RequestChannels to TRACE.

Jiangjie (Becket) Qin

On 5/27/15, 12:12 PM, "Charlie Mason" <charlie@gmail.com> wrote:

>Hi All,
>
>So I have done some more tests and found something I really don't
>understand.
>
>I found a simple example of the Kafka Java producer so I ran that pointing
>at the same topic as my last test. That failed when run from my local
>machine. I uploaded it to the VM where Kafka is installed and it worked
>perfectly. Bare in mind this exactly the same code configured to talk to
>exactly the same IP address.
>
>The thing I don't understand is I can see a connection via Wireshark to
>Kafka when the code is running on my machine. I can even query for the
>topic metadata successfully from my local machine. The only thing it won't
>do is commit a message. From what I have read the Kafka protocol is a
>fairly straight forward single TCP connection to the server from the
>client.
>
>Does anyone have any ideas what be causing this? Obviously people run in
>production with the producer on different nodes from the brokers, so I
>can't see whats different here.
>
>Thanks,
>
>Charlie M
>
>
>
>
>On Tue, May 26, 2015 at 9:09 PM, Charlie Mason <charlie@gmail.com>
>wrote:
>
>> Hi All,
>>
>> I have been trying to get started with Kafka. I have set up an 0.8.2
>> broker as per the quick start. With a single node broker I am able to
>>run
>> the scripts in the bin folder to successfully produce and consume
>>messages.
>>
>> I then tried to write some Scala code to use the new 0.8 Producer API to
>> produce messages. However nothing seems to appear on the consumer. I
>> modified the producer code to wait for the broker's metadata response.
>>That
>> blocks and then times out.
>>
>> The only difference I can see apart from using the new Producer API is
>>all
>> my previous tests were run inside the VM Kafka is installed on. Where
>>as my
>> code is running on the host machine and connecting into Kafka. I put
>>some
>> code into get the topics metadata to confirm connectivity to Kafka. That
>> prints the topic metadata correctly before hanging on the
>>send(...).get().
>> I have also checked the topic is set for a replication factor of 1. I
>>can't
>> see anything in the Kafka logs either. All I see on the broker is a
>>message
>> when the Producer times out saying the client disconnected.
>>
>> Anyone got any ideas what might be making Kafka fail to commit the
>> messages?
>>
>> I really want to start playing with Kafka however I seem to have fallen
>>at
>> the first hurdle.
>>
>> Thanks,
>>
>> Charlie M
>>




Re: Java - High Level Consumer

2015-05-28 Thread Jiangjie Qin
auto.offset.reset only comes into play when:
1. the consumer fetches messages from an offset that is out of range of the
offsets available on the broker side, or
2. the consumer group has no committed offset.

So in your case, I guess the high level consumer has already committed its
offsets before, and those offsets are not out of range. When you start
consuming later with the same consumer group id, it will resume from the last
committed offset.
If you start a new consumer with a new consumer group and set
auto.offset.reset to largest, you should see only new messages.
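
A sketch of the relevant high-level consumer settings (the group name is a
placeholder); a brand-new group.id has no committed offsets, so
auto.offset.reset applies and "largest" makes the consumer start from new
messages only:

group.id=my-new-consumer-group
auto.offset.reset=largest
zookeeper.connect=localhost:2181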

Jiangjie (Becket) Qin

On 5/28/15, 9:31 AM, "Panda, Samaresh"  wrote:

>I'm following this page
>https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
>and am able to consume messages using four threads. The threads keep
>receiving messages, which is good. However, I just want to receive only
>NEW messages and not the old ones.
>
>Since the default for auto.offset.reset is 'largest' I expected to
>receive only new ones, but the messages keep coming including some really
>old ones.
>
>Any other configuration or property change that lets me accomplish this?
>
>Thanks
>
>
>



Re: Compatibility of 0.8.2 client API (new Producer API) and 0.8.1 Kafka server

2015-05-27 Thread Jiangjie Qin
It should work, but usually we prefer the server version to be no lower
than the client version.

On 5/27/15, 3:12 PM, "Zhuo Liu"  wrote:

>Dear all,
>
>
>In 0.8.2.1 Kafka, there is new Producer API (KafkaProducer etc.).
>
>My question is: will 0.8.2.1 new Producer API
>
>be able to successfully talk to a Kafka server cluster running
>
>with 0.8.1.1 Kafka?
>
>
>Thanks very much!
>
>
>Best Regards,
>Zhuo Liu
>Ph.D. Student, CSSE department
>Auburn University, AL 36849
>http://www.auburn.edu/~zzl0014/



Re: Kafka Not Commiting Messages

2015-05-27 Thread Jiangjie Qin
Can you turn on TRACE level logging for kafka-request.log and see if the
broker received the producer request or not?
You can go to KAFKA_FOLDER/config/log4j.properties and change
log4j.logger.kafka.network.RequestChannels to TRACE.

Jiangjie (Becket) Qin

On 5/27/15, 12:12 PM, "Charlie Mason"  wrote:

>Hi All,
>
>So I have done some more tests and found something I really don't
>understand.
>
>I found a simple example of the Kafka Java producer so I ran that pointing
>at the same topic as my last test. That failed when run from my local
>machine. I uploaded it to the VM where Kafka is installed and it worked
>perfectly. Bare in mind this exactly the same code configured to talk to
>exactly the same IP address.
>
>The thing I don't understand is I can see a connection via Wireshark to
>Kafka when the code is running on my machine. I can even query for the
>topic metadata successfully from my local machine. The only thing it won't
>do is commit a message. From what I have read the Kafka protocol is a
>fairly straight forward single TCP connection to the server from the
>client.
>
>Does anyone have any ideas what be causing this? Obviously people run in
>production with the producer on different nodes from the brokers, so I
>can't see whats different here.
>
>Thanks,
>
>Charlie M
>
>
>
>
>On Tue, May 26, 2015 at 9:09 PM, Charlie Mason 
>wrote:
>
>> Hi All,
>>
>> I have been trying to get started with Kafka. I have set up an 0.8.2
>> broker as per the quick start. With a single node broker I am able to
>>run
>> the scripts in the bin folder to successfully produce and consume
>>messages.
>>
>> I then tried to write some Scala code to use the new 0.8 Producer API to
>> produce messages. However nothing seems to appear on the consumer. I
>> modified the producer code to wait for the broker's metadata response.
>>That
>> blocks and then times out.
>>
>> The only difference I can see apart from using the new Producer API is
>>all
>> my previous tests were run inside the VM Kafka is installed on. Where
>>as my
>> code is running on the host machine and connecting into Kafka. I put
>>some
>> code into get the topics metadata to confirm connectivity to Kafka. That
>> prints the topic metadata correctly before hanging on the
>>send(...).get().
>> I have also checked the topic is set for a replication factor of 1. I
>>can't
>> see anything in the Kafka logs either. All I see on the broker is a
>>message
>> when the Producer times out saying the client disconnected.
>>
>> Anyone got any ideas what might be making Kafka fail to commit the
>> messages?
>>
>> I really want to start playing with Kafka however I seem to have fallen
>>at
>> the first hurdle.
>>
>> Thanks,
>>
>> Charlie M
>>



Re: KafkaException: Size of FileMessageSet has been truncated during write

2015-05-27 Thread Jiangjie Qin
This should be just a message fetch failure. The socket was disconnected
while the broker was writing to it. There should be no data loss.

Jiangjie (Becket) Qin

On 5/27/15, 11:00 AM, "Andrey Yegorov"  wrote:

>I've noticed a few exceptions in the logs like the one below, does it
>indicate data loss? should I worry about this?
>What is the possible reason for this to happen?
>I am using kafka 0.8.1.1
>
>ERROR Closing socket for /xx.xxx.xxx.xxx because of error
>(kafka.network.Processor)
>
>kafka.common.KafkaException: Size of FileMessageSet
>/data/kafka/topic-name-11/14340499.log has been truncated
>during write: old size 26935, new size 0
>
>at kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:144)
>
>at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:69)
>
>at kafka.network.MultiSend.writeTo(Transmission.scala:102)
>
>at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:124)
>
>at kafka.network.MultiSend.writeTo(Transmission.scala:102)
>
>at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:219)
>
>at kafka.network.Processor.write(SocketServer.scala:375)
>
>at kafka.network.Processor.run(SocketServer.scala:247)
>
>at java.lang.Thread.run(Thread.java:745)
>
>--
>Andrey Yegorov



Re: Delete topic pending

2015-05-18 Thread Jiangjie Qin
Hmm, which Kafka version are you running? From the code it looks like we
should already ignore the deletion of a topic that does not exist.

Jiangjie (Becket) Qin

On 5/18/15, 10:15 AM, "Dillian Murphey"  wrote:

>If a broker doesn't have the topic, and I run delete topic, that topic
>will
>be in a "pending delete" state forever.
>
>What am I doing wrong here?
>
>Also, what if I have data loss and I just want to delete the dang topic
>form zookeeper directly with non of this pending stuff.
>
>Thanks



Re: Consumers are not load balanced against partitions

2015-05-14 Thread Jiangjie Qin
The default range partition assignment algorithm assigns partitions on a
per-topic basis. If you have more consumer threads than partitions in a topic,
some threads will not be assigned any partition.
If you are consuming from multiple topics, you might want to set
partition.assignment.strategy to "roundrobin" (a config sketch follows the
list below). This will assign partitions based on all the topics the consumer
group is consuming from. But remember there are two restrictions for this
algorithm:
1. All consumers in the same consumer group must have same number of
consumer threads.
2. All consumers in the same consumer group must consume from the same set
of topics.
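
A sketch of the consumer configuration (assuming a 0.8.2+ high-level consumer;
the group name is a placeholder), with the two restrictions above holding for
every consumer in the group:

partition.assignment.strategy=roundrobin
group.id=my-group
zookeeper.connect=localhost:2181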

Thanks.

Jiangjie (Becket) Qin

On 5/14/15, 3:28 PM, "Gomathivinayagam Muthuvinayagam"
 wrote:

>I have started multiple consumers with some time delay. Even after long
>period of time, later joined consumers are not getting any distribution of
>partitions. Only one consumer is loaded with all the partitions. I dont
>see
>any configuration parameter to change this behavior.
>
>Did anyone face similar problem? I started all the consumers with same
>number of consumer threads.  Note that all the consumers are started
>within
>the same host. Also I see consumer ids, on zookeeper.
>
>If you guys can share any to fix this problem that will be great.
>
>Thanks & Regards,



Re: Getting NotLeaderForPartitionException in kafka broker

2015-05-14 Thread Jiangjie Qin

If you can reproduce this problem consistently, then the next time you see
it, can you grep the controller log for the topic partition in question and
see if there is anything interesting?

Thanks.

Jiangjie (Becket) Qin

On 5/14/15, 3:43 AM, "tao xiao"  wrote:

>Yes, it does exist in ZK and the node that had the
>NotLeaderForPartitionException
>is the leader of the topic
>
>On Thu, May 14, 2015 at 6:12 AM, Jiangjie Qin 
>wrote:
>
>> Does this topic exist in Zookeeper?
>>
>> On 5/12/15, 11:35 PM, "tao xiao"  wrote:
>>
>> >Hi,
>> >
>> >Any updates on this issue? I keep seeing this issue happening over and
>> >over
>> >again
>> >
>> >On Thu, May 7, 2015 at 7:28 PM, tao xiao  wrote:
>> >
>> >> Hi team,
>> >>
>> >> I have a 12 nodes cluster that has 800 topics and each of which has
>> >>only 1
>> >> partition. I observed that one of the node keeps generating
>> >> NotLeaderForPartitionException that causes the node to be
>>unresponsive
>> >>to
>> >> all requests. Below is the exception
>> >>
>> >> [2015-05-07 04:16:01,014] ERROR [ReplicaFetcherThread-1-12], Error
>>for
>> >> partition [topic1,0] to broker 12:class
>> >> kafka.common.NotLeaderForPartitionException
>> >> (kafka.server.ReplicaFetcherThread)
>> >>
>> >> All other nodes in the cluster generate lots of replication error
>>too as
>> >> shown below due to unresponsiveness of above node.
>> >>
>> >> [2015-05-07 04:17:34,917] WARN [Replica Manager on Broker 1]: Fetch
>> >> request with correlation id 3630911 from client
>> >>ReplicaFetcherThread-0-1 on
>> >> partition [topic1,0] failed due to Leader not local for partition
>> >> [cg22_user.item_attr_info.lcr,0] on broker 1
>> >>(kafka.server.ReplicaManager)
>> >>
>> >> Any suggestion why the node runs into the unstable stage and any
>> >> configuration I can set to prevent this?
>> >>
>> >> I use kafka 0.8.2.1
>> >>
>> >> And here is the server.properties
>> >>
>> >>
>> >> broker.id=5
>> >> port=9092
>> >> num.network.threads=3
>> >> num.io.threads=8
>> >> socket.send.buffer.bytes=1048576
>> >> socket.receive.buffer.bytes=1048576
>> >> socket.request.max.bytes=104857600
>> >> log.dirs=/mnt/kafka
>> >> num.partitions=1
>> >> num.recovery.threads.per.data.dir=1
>> >> log.retention.hours=1
>> >> log.segment.bytes=1073741824
>> >> log.retention.check.interval.ms=30
>> >> log.cleaner.enable=false
>> >> zookeeper.connect=ip:2181
>> >> zookeeper.connection.timeout.ms=6000
>> >> unclean.leader.election.enable=false
>> >> delete.topic.enable=true
>> >> default.replication.factor=3
>> >> num.replica.fetchers=3
>> >> delete.topic.enable=true
>> >> kafka.metrics.reporters=report.KafkaMetricsCollector
>> >> straas.hubble.conf.file=/etc/kafka/report.conf
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >> Regards,
>> >> Tao
>> >>
>> >
>> >
>> >
>> >--
>> >Regards,
>> >Tao
>>
>>
>
>
>-- 
>Regards,
>Tao



Re: Compression and batching

2015-05-13 Thread Jiangjie Qin
Yes, the old producer doesn't control the compressed message size. The new
producer estimates the compressed size heuristically and decides whether to
close the batch or not. It is not perfect, but at least better than the old
one.

Jiangjie (Becket) Qin

On 5/13/15, 4:00 PM, "Jamie X"  wrote:

>Jiangjie, I changed my code to group by partition, then for each partition
>to group mesages into up to 900kb of uncompressed data, and then sent
>those
>batches out. That worked fine and didn't cause any MessageTooLarge errors.
>So it looks like the issue is that the producer batches all the messages
>of
>a certain partition together and then compresses them, which may end up
>too
>large.
>
>It would be nice if the producer could do something smarter here, but it's
>probably difficult to predict post-compression size and whether it would
>hit a limit and the like :/
>
>
>On Wed, May 13, 2015 at 9:57 AM, Jamie X  wrote:
>
>> (sorry if this messes up the mailing list, I didn't seem to get replies
>>in
>> my inbox)
>>
>> Jiangjie, I am indeed using the old producer, and on sync mode.
>>
>> > Notice that the old producer uses number of messages as batch
>>limitation
>> instead of number of bytes.
>>
>> Can you clarify this? I see a setting batch.num.messages but it is only
>> for async and wouldn't affect sync mode.
>>
>> > But in your case, it seems you have a single message whose compressed
>> size is larger than the max message size Kafka broker accepts. Any idea
>>why?
>>
>> I don't think this is the case as my messages are at most 70kb when
>> uncompressed. (I checked the message sizes it was trying to send)
>>
>> When you say
>> > the list of message will be sent as a batch
>>
>> does that mean that the producer would group the messages by partition,
>> and for each partition, it would batch all the messages for that
>>partition
>> together, regardless of whether it would exceed a size limit? If so that
>> may explain things.
>>
>> Thanks,
>> Jamie
>>
>>
>> On Tue, May 12, 2015 at 4:40 PM, Jamie X  wrote:
>>
>>> Hi,
>>>
>>> I'm wondering when you call kafka.javaapi.Producer.send() with a list
>>>of
>>> messages, and also have compression on (snappy in this case), how does
>>>it
>>> decide how many messages to put together as one?
>>>
>>> The reason I'm asking is that even though my messages are only 70kb
>>> uncompressed, the broker complains that I'm hitting the 1mb message
>>>limit
>>> such as:
>>>
>>>
>>> kafka.common.MessageSizeTooLargeException: Message size is 1035608
>>>bytes
>>> which exceeds the maximum configured message size of 112.
>>> at
>>> 
>>>kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:37
>>>8)
>>> at
>>> 
>>>kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:36
>>>1)
>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> at 
>>>kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
>>> at kafka.log.Log.analyzeAndValidateMessageSet(Log.scala:361)
>>> at kafka.log.Log.append(Log.scala:257)
>>> at
>>> 
>>>kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partitio
>>>n.scala:379)
>>> at
>>> 
>>>kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partitio
>>>n.scala:365)
>>> at kafka.utils.Utils$.inLock(Utils.scala:535)
>>> at kafka.utils.Utils$.inReadLock(Utils.scala:541)
>>> at
>>> kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:365)
>>> at
>>> 
>>>kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala
>>>:291)
>>> at
>>> 
>>>kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala
>>>:282)
>>> at
>>> 
>>>scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.sc
>>>ala:244)
>>> at
>>> 
>>>scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.sc
>>>ala:244)
>>> at
>>> 
>>>scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:
>>>98)
>>> at
>>> 
>>>scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:
>>>98)
>>> at
>>> 
>>>scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:22
>>>6)
>>> at 
>>>scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>>> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>>>
>>> Thanks,
>>> Jamie
>>>
>>
>>



Re: Experiences testing new producer performance across multiple threads/producer counts

2015-05-13 Thread Jiangjie Qin
Thanks for sharing this, Garry. I actually did similar tests before but
unfortunately lost the test data because my laptop rebooted and I forgot
to save the data...

Anyway, several things to verify:

1. Remember that KafkaProducer holds a lock per partition. So if you have
only one partition in the target topic and many application threads, lock
contention could be an issue.

2. It matters how frequently the sender thread wakes up and runs. You can
take a look at the following sensors to further verify whether the sender
thread has really become a bottleneck (a sketch of reading them appears after
this list):
select-rate
io-wait-time-ns-avg
io-time-ns-avg

3. Batch size matters, so take a look at the batch-size-avg sensor and see
if the average batch size makes sense or not.
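
A rough sketch (not from the original test code) of dumping those sensors
from a running producer through its metrics() map:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;

public class SenderMetrics {
    // Print the selector/sender sensors named above, plus the average batch size.
    static void printSenderStats(KafkaProducer<?, ?> producer) {
        for (Metric m : producer.metrics().values()) {
            String name = m.metricName().name();
            if (name.equals("select-rate")
                    || name.equals("io-wait-time-ns-avg")
                    || name.equals("io-time-ns-avg")
                    || name.equals("batch-size-avg")) {
                System.out.println(name + " = " + m.value());
            }
        }
    }
}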

Looking forward to your further profiling. My thinking is that unless you
are sending very small messages to a small number of partitions, you don't
need to worry about using more than one producer.

Thanks.

Jiangjie (Becket) Qin



On 5/13/15, 2:40 PM, "Garry Turkington" 
wrote:

>Hi,
>
>I talked with Gwen at Strata last week and promised to share some of my
>experiences benchmarking an app reliant on the new  producer. I'm using
>relatively meaty boxes running my producer code (24 core/64GB RAM) but I
>wasn't pushing them until I got them on the same 10GB fabric as the Kafka
>cluster they are using (saturating the prior 1GB NICs was just too easy).
>There are 5 brokers, 24 core/192GB RAM/8*2TB disks, running 0.8.2.1.
>
>With lots of cores and a dedicated box the question was then how to
>deploy my application. In particular how many worker threads and how many
>instances of the KafkaProducer  to share amongst them. I also wanted to
>see how things would change as I scale up the thread count.
>
>I ripped out the data retrieval part of my app (it reads from S3) and
>instead replaced it with some code to produce random records of average
>size 500 bytes but varying between 250 and 750. I started the app
>running, ignored the first 25m messages then measured the timing for the
>next 100m and  calculated the average messages/sec written to Kafka
>across that run.
>
>Starting small I created 4 application threads with a range of approaches
>to sharing KafkaProducer instances. The records written to the Kafka
>cluster per second were as follows:
>
>4 threads all sharing 1 client: 332K
>4 threads sharing 2 clients: 333K
>4 threads, dedicated client per thread: 310K
>
>Note that when I had 2 KafkaProducer clients as in the second line above
>each was used by 2 threads. Similar below, number of threads/number of
>clients is the max number of threads per KafkaProducer instance.
>
>As can be seen from the above there's not much in it. Scaling up to 8
>application threads the numbers  looked like:
>
>8 threads sharing 1 client: 387K
>8 threads sharing 2 clients: 609K
>8 threads sharing 4 clients: 628K
>8 threads with dedicated  client per thread: 527K
>
>This time sharing a single producer client  across all threads has by far
>the worse performance and  isn't much better than when using 4
>application threads. The 2 and 4 client options are much better and are
>in the ballpark of 2x the 4 thread performance. A dedicated client per
>thread isn't quite as good but isn't so far off to be unusable. So then
>taking it to 16 application threads:
>
>16 threads sharing 1 client: 380K
>16 threads sharing 2 clients: 675K
>16 threads sharing 4 clients: 869K
>16 threads sharing 8 clients: 733K
>16 threads  with a dedicated client per thread: 491K
>
>This gives a much clearer performance curve. The 16 thread/4 producer
>client is by far the best performance but it is still far from 4x the
>4-thread or 2x the 8-thread mark. At this point I seem to be hitting some
>limiting factor. On the client machine memory was still lightly used,
>network was peaking just over 4GB/sec but CPU load was showing 1 minute
>load averages around 18-20. CPU load did seem to increase with as did the
>number of KafkaProducer instances but that is more a conclusion from
>memory and not backed by hard numbers.
>
>For completeness sake I did do a 24 thread test but the numbers are as
>you'd expect. 1 client and 24 both showed poor performance. 4,6 or 8
>clients (24 has more  ways of dividing it by 2!) all showed performance
>around that of the 16 thread/4 client run above. The other configs were
>in-between.
>
>With my full application I've found the best deployment so far is to have
>  multiple instances running on the same box. I can get much better
>performance from 3 instances each with 8 threads than 1 instance with 24
>threads. This is almost certainly because when adding in my own
>application logic and the AWS clients there is just a lot more contention
>- not to mention much more i/o waits -- in each application instance. The
>benchmark variant doesn't have as much happening but just to compare I
>ran a few concurrent instances:
>
>2 copies of 8 threads sharing 4 clients: 780K total
>2 copies of 8 threads sharing 2 clients: 8

Re: Auto-rebalance not triggering in 2.10-0.8.1.1

2015-05-13 Thread Jiangjie Qin
Automatic preferred leader election hasn't been turned on in 0.8.1.1. It has
been turned on in the latest trunk though.

The config name is "auto.leader.rebalance.enable".
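
A sketch of the relevant broker settings (config names as in trunk; the values
here are illustrative, not 0.8.1.1 defaults), plus the manual trigger that
works on 0.8.1.1:

# server.properties on a trunk/0.8.2+ broker
auto.leader.rebalance.enable=true
leader.imbalance.check.interval.seconds=300
leader.imbalance.per.broker.percentage=10

# manual preferred-replica election on 0.8.1.1 (adjust the Zookeeper address)
# bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181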

Jiangjie (Becket) Qin

On 5/13/15, 10:50 AM, "Stephen Armstrong" 
wrote:

>Does anyone have any insight into this? Am I correct that 0.8.1.1 should
>be
>running the leader election automatically? If this is a known issue, is
>there any reason not to have a cron script that runs the leader election
>regularly?
>
>Thanks
>Steve
>
>On Thu, May 7, 2015 at 2:47 PM, Stephen Armstrong <
>stephen.armstr...@linqia.com> wrote:
>
>> I'm running 2.10-0.8.1.1, and rebalance will not trigger on it's own.
>>From
>> 
>>http://grokbase.com/t/kafka/users/14bj5ps9hp/partition-auto-rebalance#201
>>41118rf39q8cs4sjh6vzjgdw92e37cw
>> I think the leader imbalance means: For a single broker, add up all the
>> partitions it is leading (Y), and count the ones for which it's not the
>> preferred broker (X). The ratio of X:Y is the one being used.
>>
>> I have about 10 topics spread between the 3 brokers, each with 4 or 8
>> partitions. If I restart broker A, wait 5 min, then restart B,
>>leadership
>> ends up entirely on C (even though A was in ISR when B went down).
>>Nothing
>> triggers on it's own. Triggering it manually works (with
>> bin/kafka-preferred-replica-election.sh).
>>
>> Is there something I should be checking, or is there a downside to just
>> adding a cron job to trigger replica election once an hour?
>>
>> Thanks
>> Steve
>>
>>
>>
>>
>
>
>-- 
>Stephen Armstrong
>Senior Software Engineer
>Linqia, Inc - Matching Marketers with Storytellers
>
>www.linqia.com
>Like  | Follow
> | Blog 
>
>Email. stephen.armstr...@linqia.com
>
>The information in this e-mail and in any attachments is confidential and
>solely for the attention and use of the named addressee(s). You are hereby
>notified that any dissemination, distribution or copy of this
>communication
>is prohibited without the prior written consent of Linqia, Inc. If you
>have
>received this communication in error, please, notify the sender
>immediately
>by reply e-mail and delete this e-mail from your system as well as any
>files transmitted with it.



Re: Compression and batching

2015-05-13 Thread Jiangjie Qin
If you are sending in sync mode, the producer will just group the list of
messages you provided as the argument of send() by partition and send them
out. You don't need to worry about batch.num.messages.

There is a potential for a compressed message to be even bigger than the
uncompressed message, though. I'm not sure that is what you are seeing there,
given your message size is only 70KB while the limitation is ~1MB.

Can you send the messages one by one using the batch send - each MessageSet
only has one message - and see which message caused the problem?
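
A rough sketch (assuming the old kafka.javaapi producer in sync mode; names
are illustrative) of sending the records one at a time so the offending
message can be isolated:

import java.util.List;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;

public class OneByOneSend {
    static void sendIndividually(Producer<Integer, String> producer,
                                 List<KeyedMessage<Integer, String>> messages) {
        for (KeyedMessage<Integer, String> m : messages) {
            try {
                producer.send(m);   // one message per request, so it is compressed alone
            } catch (Exception e) {
                System.err.println("Failed on a single message of length "
                        + m.message().length() + ": " + e);
            }
        }
    }
}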

Jiangjie (Becket) Qin

On 5/13/15, 9:57 AM, "Jamie X"  wrote:

>(sorry if this messes up the mailing list, I didn't seem to get replies in
>my inbox)
>
>Jiangjie, I am indeed using the old producer, and on sync mode.
>
>> Notice that the old producer uses number of messages as batch limitation
>instead of number of bytes.
>
>Can you clarify this? I see a setting batch.num.messages but it is only
>for
>async and wouldn't affect sync mode.
>
>> But in your case, it seems you have a single message whose compressed
>size is larger than the max message size Kafka broker accepts. Any idea
>why?
>
>I don't think this is the case as my messages are at most 70kb when
>uncompressed. (I checked the message sizes it was trying to send)
>
>When you say
>> the list of message will be sent as a batch
>
>does that mean that the producer would group the messages by partition,
>and
>for each partition, it would batch all the messages for that partition
>together, regardless of whether it would exceed a size limit? If so that
>may explain things.
>
>Thanks,
>Jamie
>
>On Tue, May 12, 2015 at 4:40 PM, Jamie X  wrote:
>
>> Hi,
>>
>> I'm wondering when you call kafka.javaapi.Producer.send() with a list of
>> messages, and also have compression on (snappy in this case), how does
>>it
>> decide how many messages to put together as one?
>>
>> The reason I'm asking is that even though my messages are only 70kb
>> uncompressed, the broker complains that I'm hitting the 1mb message
>>limit
>> such as:
>>
>>
>> kafka.common.MessageSizeTooLargeException: Message size is 1035608 bytes
>> which exceeds the maximum configured message size of 112.
>> at
>> 
>>kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:378
>>)
>> at
>> 
>>kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:361
>>)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> at 
>>kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
>> at kafka.log.Log.analyzeAndValidateMessageSet(Log.scala:361)
>> at kafka.log.Log.append(Log.scala:257)
>> at
>> 
>>kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition
>>.scala:379)
>> at
>> 
>>kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition
>>.scala:365)
>> at kafka.utils.Utils$.inLock(Utils.scala:535)
>> at kafka.utils.Utils$.inReadLock(Utils.scala:541)
>> at
>> kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:365)
>> at
>> 
>>kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:
>>291)
>> at
>> 
>>kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:
>>282)
>> at
>> 
>>scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.sca
>>la:244)
>> at
>> 
>>scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.sca
>>la:244)
>> at
>> 
>>scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:9
>>8)
>> at
>> 
>>scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:9
>>8)
>> at
>> 
>>scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226
>>)
>> at 
>>scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>>
>> Thanks,
>> Jamie
>>



Re: New Producer Async - Metadata Fetch Timeout

2015-05-13 Thread Jiangjie Qin
Isn’t the producer part of the application? The metadata is stored in
memory. If the application rebooted (process restarted), all the metadata
will be gone.

Jiangjie (Becket) Qin

On 5/13/15, 9:54 AM, "Mohit Gupta"  wrote:

>I meant the producer. ( i.e. application using the producer api to push
>messages into kafka ) .
>
>On Wed, May 13, 2015 at 10:20 PM, Mayuresh Gharat <
>gharatmayures...@gmail.com> wrote:
>
>> By application rebooting, do you mean you bounce the brokers?
>>
>> Thanks,
>>
>> Mayuresh
>>
>> On Wed, May 13, 2015 at 4:06 AM, Mohit Gupta <
>> success.mohit.gu...@gmail.com>
>> wrote:
>>
>> > Thanks Jiangjie. This is helpful.
>> >
>> > Adding to what you have mentioned, I can think of one more scenario
>>which
>> > may not be very rare.
>> > Say, the application is rebooted and the Kafka brokers registered in
>>the
>> > producer are not reachable ( could be due to network issues or those
>> > brokers are actually down ).  Since, no metadata is available the send
>> will
>> > block. Right?
>> >
>> > On Wed, May 13, 2015 at 10:51 AM, Jiangjie Qin
>>> >
>> > wrote:
>> >
>> > >
>> > > Application will not block on each metadata refresh or metadata is
>> > > expired.
>> > > Application will only be blocked when
>> > > 1. It sends the first message to a topic (only for that single
>> message),
>> > or
>> > > 2. The topic has been deleted from broker thus refreshed metadata
>>loses
>> > > the topic info (which is pretty rare).
>> > >
>> > > So I think the async here might mean a little bit different. It
>>means
>> > when
>> > > you send first message to a topic, you wait till you know the topic
>> > exist,
>> > > after that point it is async.
>> > > It is very low chance that your application will block on send. If
>>it
>> is
>> > > then something probably really went wrong and needs immediate
>> attention.
>> > >
>> > > Thanks.
>> > >
>> > > Jiangjie (Becket) Qin
>> > >
>> > > On 5/12/15, 5:08 PM, "Rendy Bambang Junior"
>>
>> > > wrote:
>> > >
>> > > >Thank you for the clarification.
>> > > >
>> > > >I think I agree with Mohit. Sometime blocking on logging is not
>> > acceptable
>> > > >by nature of application who uses kafka.
>> > > >
>> > > >Yes it is not blocking when metadata is still available. But
>> application
>> > > >will be blocked once metada is expired.
>> > > >
>> > > >It might be handled by application, by implementing async call
>>when do
>> > > >send() and manage buffer and async timeout internally, but it makes
>> > async
>> > > >feature in kafka producer has less meaning.
>> > > >
>> > > >Sorry if my understanding is incorrect.
>> > > >
>> > > >Rendy
>> > > >On May 13, 2015 6:59 AM, "Jiangjie Qin" 
>> > > wrote:
>> > > >
>> > > >> Send() will only block if the metadata is *not available* for the
>> > topic.
>> > > >> It won't block if metadata there is stale. The metadata refresh
>>is
>> > async
>> > > >> to send(). However, if you send the message to a topic for the
>>first
>> > > >>time,
>> > > >> send() will trigger a metadata refresh and block until it has
>> metadata
>> > > >>for
>> > > >> that topic.
>> > > >>
>> > > >> Jiangjie (Becket) Qin
>> > > >>
>> > > >> On 5/12/15, 12:58 PM, "Magnus Edenhill" 
>>wrote:
>> > > >>
>> > > >> >I completely agree with Mohit, an application should not have to
>> know
>> > > >>or
>> > > >> >care about
>> > > >> >producer implementation internals.
>> > > >> >Given a message and its delivery constraints (produce retry
>>count
>> and
>> > > >> >timeout) the producer
>> > > >> >should hide any temporal failures until the message is
>>succesfully
>> > > >> >delivered, a permanent
>> > > >> >error is enc

Re: Getting NotLeaderForPartitionException in kafka broker

2015-05-13 Thread Jiangjie Qin
Does this topic exist in Zookeeper?

On 5/12/15, 11:35 PM, "tao xiao"  wrote:

>Hi,
>
>Any updates on this issue? I keep seeing this issue happening over and
>over
>again
>
>On Thu, May 7, 2015 at 7:28 PM, tao xiao  wrote:
>
>> Hi team,
>>
>> I have a 12 nodes cluster that has 800 topics and each of which has
>>only 1
>> partition. I observed that one of the node keeps generating
>> NotLeaderForPartitionException that causes the node to be unresponsive
>>to
>> all requests. Below is the exception
>>
>> [2015-05-07 04:16:01,014] ERROR [ReplicaFetcherThread-1-12], Error for
>> partition [topic1,0] to broker 12:class
>> kafka.common.NotLeaderForPartitionException
>> (kafka.server.ReplicaFetcherThread)
>>
>> All other nodes in the cluster generate lots of replication error too as
>> shown below due to unresponsiveness of above node.
>>
>> [2015-05-07 04:17:34,917] WARN [Replica Manager on Broker 1]: Fetch
>> request with correlation id 3630911 from client
>>ReplicaFetcherThread-0-1 on
>> partition [topic1,0] failed due to Leader not local for partition
>> [cg22_user.item_attr_info.lcr,0] on broker 1
>>(kafka.server.ReplicaManager)
>>
>> Any suggestion why the node runs into the unstable stage and any
>> configuration I can set to prevent this?
>>
>> I use kafka 0.8.2.1
>>
>> And here is the server.properties
>>
>>
>> broker.id=5
>> port=9092
>> num.network.threads=3
>> num.io.threads=8
>> socket.send.buffer.bytes=1048576
>> socket.receive.buffer.bytes=1048576
>> socket.request.max.bytes=104857600
>> log.dirs=/mnt/kafka
>> num.partitions=1
>> num.recovery.threads.per.data.dir=1
>> log.retention.hours=1
>> log.segment.bytes=1073741824
>> log.retention.check.interval.ms=30
>> log.cleaner.enable=false
>> zookeeper.connect=ip:2181
>> zookeeper.connection.timeout.ms=6000
>> unclean.leader.election.enable=false
>> delete.topic.enable=true
>> default.replication.factor=3
>> num.replica.fetchers=3
>> delete.topic.enable=true
>> kafka.metrics.reporters=report.KafkaMetricsCollector
>> straas.hubble.conf.file=/etc/kafka/report.conf
>>
>>
>>
>>
>> --
>> Regards,
>> Tao
>>
>
>
>
>-- 
>Regards,
>Tao



Re: Compression and batching

2015-05-12 Thread Jiangjie Qin
Mayuresh, this is about the old producer instead of the new Java producer.

Jamie,
In the old producer, if you use sync mode, the list of messages will be
sent as a batch. On the other hand, if you are using async mode, the
messages are just put into the queue and batched with other messages.
Notice that the old producer uses the number of messages as the batch
limitation instead of the number of bytes.

But in your case, it seems you have a single message whose compressed size
is larger than the max message size the Kafka broker accepts. Any idea why?

Thanks.

Jiangjie (Becket) Qin


On 5/12/15, 9:11 PM, "Mayuresh Gharat"  wrote:

>Well, the batch size is decided by the value set for the property :
>
> "batch.size";
> "The producer will attempt to batch records together into fewer requests
>whenever multiple records are being sent to the same partition. This helps
>performance on both the client and the server. This configuration controls
>the  default batch size in bytes. No attempt will be made to batch records
>larger than this size. Requests sent to brokers will contain multiple
>batches, one for each partition with data available to be sent. A small
>batch size will make batching less common and may reduce throughput (a
>batch size of zero will disable batching entirely). A very large batch
>size
>may use memory a bit more wastefully as we will always allocate a buffer
>of
>the specified batch size in anticipation of additional records."
>
>Also it may happen that message size may increase due to compression which
>is kind of weird.
>
>Thanks,
>
>Mayuresh
>
>On Tue, May 12, 2015 at 4:40 PM, Jamie X  wrote:
>
>> Hi,
>>
>> I'm wondering when you call kafka.javaapi.Producer.send() with a list of
>> messages, and also have compression on (snappy in this case), how does
>>it
>> decide how many messages to put together as one?
>>
>> The reason I'm asking is that even though my messages are only 70kb
>> uncompressed, the broker complains that I'm hitting the 1mb message
>>limit
>> such as:
>>
>>
>> kafka.common.MessageSizeTooLargeException: Message size is 1035608 bytes
>> which exceeds the maximum configured message size of 112.
>> at
>> 
>>kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:378
>>)
>> at
>> 
>>kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:361
>>)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> at 
>>kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
>> at kafka.log.Log.analyzeAndValidateMessageSet(Log.scala:361)
>> at kafka.log.Log.append(Log.scala:257)
>> at
>>
>> 
>>kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition
>>.scala:379)
>> at
>>
>> 
>>kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition
>>.scala:365)
>> at kafka.utils.Utils$.inLock(Utils.scala:535)
>> at kafka.utils.Utils$.inReadLock(Utils.scala:541)
>> at
>> kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:365)
>> at
>>
>> 
>>kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:
>>291)
>> at
>>
>> 
>>kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:
>>282)
>> at
>>
>> 
>>scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.sca
>>la:244)
>> at
>>
>> 
>>scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.sca
>>la:244)
>> at
>> 
>>scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:9
>>8)
>> at
>> 
>>scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:9
>>8)
>> at
>> 
>>scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226
>>)
>> at 
>>scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>>
>> Thanks,
>> Jamie
>>
>
>
>
>-- 
>-Regards,
>Mayuresh R. Gharat
>(862) 250-7125



Re: New Producer Async - Metadata Fetch Timeout

2015-05-12 Thread Jiangjie Qin

The application will not block on each metadata refresh or when metadata
expires.
The application will only be blocked when
1. it sends the first message to a topic (only for that single message), or
2. the topic has been deleted from the broker and the refreshed metadata
therefore loses the topic info (which is pretty rare).

So I think the async here might mean something a little different. It means
that when you send the first message to a topic, you wait until you know the
topic exists; after that point it is async.
There is a very low chance that your application will block on send. If it
does, then something probably really went wrong and needs immediate attention.
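
A minimal sketch (topic names are placeholders) of forcing that one-time
metadata fetch at startup, so later send() calls never hit the blocking
first-message path:

import java.util.List;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.PartitionInfo;

public class MetadataWarmUp {
    static void warmUp(KafkaProducer<byte[], byte[]> producer, List<String> topics) {
        for (String topic : topics) {
            // partitionsFor() blocks until metadata for the topic is available,
            // subject to metadata.fetch.timeout.ms.
            List<PartitionInfo> partitions = producer.partitionsFor(topic);
            System.out.println(topic + " has " + partitions.size() + " partitions");
        }
    }
}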

Thanks.

Jiangjie (Becket) Qin

On 5/12/15, 5:08 PM, "Rendy Bambang Junior" 
wrote:

>Thank you for the clarification.
>
>I think I agree with Mohit. Sometime blocking on logging is not acceptable
>by nature of application who uses kafka.
>
>Yes it is not blocking when metadata is still available. But application
>will be blocked once metada is expired.
>
>It might be handled by application, by implementing async call when do
>send() and manage buffer and async timeout internally, but it makes async
>feature in kafka producer has less meaning.
>
>Sorry if my understanding is incorrect.
>
>Rendy
>On May 13, 2015 6:59 AM, "Jiangjie Qin"  wrote:
>
>> Send() will only block if the metadata is *not available* for the topic.
>> It won’t block if metadata there is stale. The metadata refresh is async
>> to send(). However, if you send the message to a topic for the first
>>time,
>> send() will trigger a metadata refresh and block until it has metadata
>>for
>> that topic.
>>
>> Jiangjie (Becket) Qin
>>
>> On 5/12/15, 12:58 PM, "Magnus Edenhill"  wrote:
>>
>> >I completely agree with Mohit, an application should not have to know
>>or
>> >care about
>> >producer implementation internals.
>> >Given a message and its delivery constraints (produce retry count and
>> >timeout) the producer
>> >should hide any temporal failures until the message is succesfully
>> >delivered, a permanent
>> >error is encountered or the constraints are hit.
>> >This should also include internal start up sequencing, such as metadata
>> >retrieval.
>> >
>> >
>> >
>> >2015-05-12 21:22 GMT+02:00 Mohit Gupta :
>> >
>> >> I could not follow the reasoning behind blocking the send method if
>>the
>> >> metadata is not up-to-date. Though, I see that it as per design, it
>> >> requires the metadata to batch the message into appropriate
>> >>topicPartition
>> >> queue. Also, if the metadata could not be updated in the specified
>> >> interval, it throws an exception and the message is not queued to be
>> >> retried once the brokers are up.
>> >>
>> >> Should it not be that messages are buffered in another queue ( up-to
>>a
>> >> limit ) if the brokers are down and retried later?
>> >> Is it not a general use case to require producer to be asynchronous
>>in
>> >>all
>> >> the scenarios?
>> >>
>> >>
>> >> On Tue, May 12, 2015 at 10:54 PM, Mayuresh Gharat <
>> >> gharatmayures...@gmail.com> wrote:
>> >>
>> >> > The way it works I suppose is that, the producer will do
>> >>fetchMetadata,
>> >> if
>> >> > the last fetched metadata is stale (the refresh interval has
>>expired)
>> >>or
>> >> if
>> >> > it is not able to send data to a particular broker in its current
>> >> metadata
>> >> > (This might happen in some cases like if the leader moves).
>> >> >
>> >> > It cannot produce without having the right metadata.
>> >> >
>> >> > Thanks,
>> >> >
>> >> > Mayuresh
>> >> >
>> >> > On Tue, May 12, 2015 at 10:09 AM, Jiangjie Qin
>> >>> >> >
>> >> > wrote:
>> >> >
>> >> > > That¹s right. Send() will first try to get metadata of a topic,
>>that
>> >> is a
>> >> > > blocking operation.
>> >> > >
>> >> > > On 5/12/15, 2:48 AM, "Rendy Bambang Junior"
>> >>
>> >> > > wrote:
>> >> > >
>> >> > > >Hi, sorry if my understanding is incorrect.
>> >> > > >
>> >> > > >I am integrating kafka producer with application, when i try to
>> >> shutdown
>> >> > > >all kafka broker (preparing for prod env) I notice that 'send'
>> >>method
>> >> is
>> >> > > >blocking.
>> >> > > >
>> >> > > >Is new producer fetch metadata not async?
>> >> > > >
>> >> > > >Rendy
>> >> > >
>> >> > >
>> >> >
>> >> >
>> >> > --
>> >> > -Regards,
>> >> > Mayuresh R. Gharat
>> >> > (862) 250-7125
>> >> >
>> >>
>> >>
>> >>
>> >> --
>> >> Best Regards,
>> >>
>> >> Mohit Gupta
>> >>
>>
>>



Re: Could this be happening?

2015-05-12 Thread Jiangjie Qin
If you are using the new Java producer, reordering could happen if
max.in.flight.requests.per.connection is set to > 1 and retries are enabled
- which are both default settings.

Can you set max.in.flight.requests.per.connection to 1 and see if this
solves the issue?
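
An illustrative producer override (other settings left as you have them); with
only one in-flight request per connection, a retried batch cannot jump ahead
of a later one, so per-partition ordering is preserved:

max.in.flight.requests.per.connection=1
retries=3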

Jiangjie (Becket) Qin

On 5/12/15, 12:57 PM, "Scott Chapman"  wrote:

>We are using the Java producer API (0.8.2.1 if I am not mistaken). We are
>using producer type of sync though.
>
>On Tue, May 12, 2015 at 3:50 PM Magnus Edenhill 
>wrote:
>
>> Hi Scott,
>>
>> what producer client are you using?
>>
>> Reordering is possible in async producers in the case of temporary
>>broker
>> failures
>> and the combination of request.required.acks != 0 and retries > 0.
>>
>> Consider the case where a producer has 20 messages in-flight to the
>>broker,
>> out of those
>> messages # 1-10 fails due to some temporary failure (?) on the broker
>>side,
>> but message # 11-20 are accepted.
>> When the producer receives error results from the broker for message #
>>1-10
>> it will try to resend
>> these 10 failed messages, that are now accepted, causing them to end up
>> after message #20 in the log - thus reordered.
>>
>> This failure scenario should be rather rare though.
>>
>>
>> Regards,
>> Magnus
>>
>> 2015-05-12 20:18 GMT+02:00 Scott Chapman :
>>
>> > We are basically using kafka as a transport mechanism for multi-line
>>log
>> > files.
>> >
>> > So, for this we are using single partition topics (with a replica for
>> good
>> > measure) writing to a multi-broker cluster.
>> >
>> > Our producer basically reads a file line-by-line (as it is being
>>written
>> > to) and publishes each line as a message to the topic. We are also
>> writing
>> > as quickly as we can (not waiting for ACK).
>> >
>> > What I am seeing is occasionally the messages in the topic appear to
>>be
>> > slightly out of order when compared to the source file they were based
>> on.
>> >
>> > I am wonder if this might happen when the producer switches brokers
>> because
>> > we are not waiting for the ACK before continuing to write.
>> >
>> > Does this make any sense??
>> >
>> > Thanks in advance!
>> >
>> > -Scott
>> >
>>



Re: New Producer Async - Metadata Fetch Timeout

2015-05-12 Thread Jiangjie Qin
Send() will only block if the metadata is *not available* for the topic.
It won't block if the existing metadata is merely stale. The metadata refresh
is async to send(). However, if you send a message to a topic for the first
time, send() will trigger a metadata refresh and block until it has metadata
for that topic.

Jiangjie (Becket) Qin

On 5/12/15, 12:58 PM, "Magnus Edenhill"  wrote:

>I completely agree with Mohit, an application should not have to know or
>care about
>producer implementation internals.
>Given a message and its delivery constraints (produce retry count and
>timeout) the producer
>should hide any temporal failures until the message is succesfully
>delivered, a permanent
>error is encountered or the constraints are hit.
>This should also include internal start up sequencing, such as metadata
>retrieval.
>
>
>
>2015-05-12 21:22 GMT+02:00 Mohit Gupta :
>
>> I could not follow the reasoning behind blocking the send method if the
>> metadata is not up-to-date. Though, I see that it as per design, it
>> requires the metadata to batch the message into appropriate
>>topicPartition
>> queue. Also, if the metadata could not be updated in the specified
>> interval, it throws an exception and the message is not queued to be
>> retried once the brokers are up.
>>
>> Should it not be that messages are buffered in another queue ( up-to a
>> limit ) if the brokers are down and retried later?
>> Is it not a general use case to require producer to be asynchronous in
>>all
>> the scenarios?
>>
>>
>> On Tue, May 12, 2015 at 10:54 PM, Mayuresh Gharat <
>> gharatmayures...@gmail.com> wrote:
>>
>> > The way it works I suppose is that, the producer will do
>>fetchMetadata,
>> if
>> > the last fetched metadata is stale (the refresh interval has expired)
>>or
>> if
>> > it is not able to send data to a particular broker in its current
>> metadata
>> > (This might happen in some cases like if the leader moves).
>> >
>> > It cannot produce without having the right metadata.
>> >
>> > Thanks,
>> >
>> > Mayuresh
>> >
>> > On Tue, May 12, 2015 at 10:09 AM, Jiangjie Qin
>>> >
>> > wrote:
>> >
>> > > That¹s right. Send() will first try to get metadata of a topic, that
>> is a
>> > > blocking operation.
>> > >
>> > > On 5/12/15, 2:48 AM, "Rendy Bambang Junior"
>>
>> > > wrote:
>> > >
>> > > >Hi, sorry if my understanding is incorrect.
>> > > >
>> > > >I am integrating kafka producer with application, when i try to
>> shutdown
>> > > >all kafka broker (preparing for prod env) I notice that 'send'
>>method
>> is
>> > > >blocking.
>> > > >
>> > > >Is new producer fetch metadata not async?
>> > > >
>> > > >Rendy
>> > >
>> > >
>> >
>> >
>> > --
>> > -Regards,
>> > Mayuresh R. Gharat
>> > (862) 250-7125
>> >
>>
>>
>>
>> --
>> Best Regards,
>>
>> Mohit Gupta
>>



Re: New Producer Async - Metadata Fetch Timeout

2015-05-12 Thread Jiangjie Qin
That's right. Send() will first try to get metadata of a topic, that is a
blocking operation.

On 5/12/15, 2:48 AM, "Rendy Bambang Junior" 
wrote:

>Hi, sorry if my understanding is incorrect.
>
>I am integrating kafka producer with application, when i try to shutdown
>all kafka broker (preparing for prod env) I notice that 'send' method is
>blocking.
>
>Is new producer fetch metadata not async?
>
>Rendy



Re: New Java producer broker metadata update stuck

2015-05-08 Thread Jiangjie Qin
Dan,

Just to make sure I understand it correctly, what do you mean by a different
ip -> broker mapping? Do you mean you changed your broker ip? We have
different mechanisms in the consumer and the producer to get the cluster
information. Consumers get all the information from Zookeeper while the
producer has to talk to the brokers directly.

We have found some problems in the producer in current trunk and are fixing
them. Just want to see if your scenario indicates a new issue that we
haven't addressed.

Thanks.

Jiangjie (Becket) Qin

On 5/8/15, 9:48 AM, "Mayuresh Gharat"  wrote:

>It should do a updateMetadataRequest in case it gets NOT_LEADER_FOR
>PARTITION. This looks like a bug.
>
>Thanks,
>
>Mayuresh
>
>On Fri, May 8, 2015 at 8:53 AM, Dan  wrote:
>
>> Hi,
>>
>> We've noticed an issue on our staging environment where all 3 of our
>>Kafka
>> hosts shutdown and came back with a different ip -> broker id mapping. I
>> know this is not good and we're fixing that separately. But what we
>>noticed
>> is all the consumers recovered but the producers got stuck with the
>> following exceptions:
>>
>> WARN  2015-05-08 09:19:56,347
>> org.apache.kafka.clients.producer.internals.Sender: Got error produce
>> response with correlation id 3544968 on topic-partition
>> samza-metrics-0, retrying (2145750068 attempts left). Error:
>> NOT_LEADER_FOR_PARTITION
>> WARN  2015-05-08 09:19:56,448
>> org.apache.kafka.clients.producer.internals.Sender: Got error produce
>> response with correlation id 3544970 on topic-partition
>> samza-metrics-0, retrying (2145750067 attempts left). Error:
>> NOT_LEADER_FOR_PARTITION
>> WARN  2015-05-08 09:19:56,549
>> org.apache.kafka.clients.producer.internals.Sender: Got error produce
>> response with correlation id 3544972 on topic-partition
>> samza-metrics-0, retrying (2145750066 attempts left). Error:
>> NOT_LEADER_FOR_PARTITION
>> WARN  2015-05-08 09:19:56,649
>> org.apache.kafka.clients.producer.internals.Sender: Got error produce
>> response with correlation id 3544974 on topic-partition
>> samza-metrics-0, retrying (2145750065 attempts left). Error:
>> NOT_LEADER_FOR_PARTITION
>> WARN  2015-05-08 09:19:56,749
>> org.apache.kafka.clients.producer.internals.Sender: Got error produce
>> response with correlation id 3544976 on topic-partition
>> samza-metrics-0, retrying (2145750064 attempts left). Error:
>> NOT_LEADER_FOR_PARTITION
>> WARN  2015-05-08 09:19:56,850
>> org.apache.kafka.clients.producer.internals.Sender: Got error produce
>> response with correlation id 3544978 on topic-partition
>> samza-metrics-0, retrying (2145750063 attempts left). Error:
>> NOT_LEADER_FOR_PARTITION
>> WARN  2015-05-08 09:19:56,949
>> org.apache.kafka.clients.producer.internals.Sender: Got error produce
>> response with correlation id 3544980 on topic-partition
>> samza-metrics-0, retrying (2145750062 attempts left). Error:
>> NOT_LEADER_FOR_PARTITION
>> WARN  2015-05-08 09:19:57,049
>> org.apache.kafka.clients.producer.internals.Sender: Got error produce
>> response with correlation id 3544982 on topic-partition
>> samza-metrics-0, retrying (2145750061 attempts left). Error:
>> NOT_LEADER_FOR_PARTITION
>> WARN  2015-05-08 09:19:57,150
>> org.apache.kafka.clients.producer.internals.Sender: Got error produce
>> response with correlation id 3544984 on topic-partition
>> samza-metrics-0, retrying (2145750060 attempts left). Error:
>> NOT_LEADER_FOR_PARTITION
>> WARN  2015-05-08 09:19:57,254
>> org.apache.kafka.clients.producer.internals.Sender: Got error produce
>> response with correlation id 3544986 on topic-partition
>> samza-metrics-0, retrying (2145750059 attempts left). Error:
>> NOT_LEADER_FOR_PARTITION
>> WARN  2015-05-08 09:19:57,351
>> org.apache.kafka.clients.producer.internals.Sender: Got error produce
>> response with correlation id 3544988 on topic-partition
>> samza-metrics-0, retrying (2145750058 attempts left). Error:
>> NOT_LEADER_FOR_PARTITION
>> WARN  2015-05-08 09:19:57,454
>> org.apache.kafka.clients.producer.internals.Sender: Got error produce
>> response with correlation id 3544990 on topic-partition
>> samza-metrics-0, retrying (2145750057 attempts left). Error:
>> NOT_LEADER_FOR_PARTITION
>>
>>
>> So it appears as if the producer did not refresh the metadata once the
>> brokers had come back up. The exceptions carried on for a few hours
>>until
>> we restarted them.
>>
>> We noticed this in both 0.8.2.1 Java clients and via, Kakfa-rest
>> https://github.com/confluentinc/kafka-rest which is using 0.8.2.0-cp.
>>
>> Is this a known issue when all brokers go away, or is it a subtle bug
>>we've
>> hit?
>>
>> Thanks,
>> Dan
>>
>
>
>
>-- 
>-Regards,
>Mayuresh R. Gharat
>(862) 250-7125



Re: Kafka behind AWS ELB

2015-05-04 Thread Jiangjie Qin
Hi Dillian, 

Yeah, ELB + ASG is pretty popular, however it needs further tricks to be
used for Kafka brokers.

As I replied to Chandrashekhar in another email, you can use the ELB address
as bootstrap.servers/metadata.broker.list to cover the client bootstrap use
case. But all the producing/consuming traffic still needs to go to the
brokers directly. Another thing about ASG is that when a new Kafka broker
comes up, it might need to copy a lot of data if its log is empty. That
might take a while. This might not be the case if you are using EBS. I
think Netflix has some practical experience of running Kafka on AWS with a
dynamically changing number of brokers, but they had some tricks to manage
the metadata (broker id, etc.) and log files.

Anyway, I would be surprised if ELB + ASG could be easily used for dynamic
provisioning of Kafka brokers. But I think it probably would be a very
helpful improvement if we could do that.

Jiangjie (Becket) Qin

On 5/4/15, 1:52 PM, "Dillian Murphey"  wrote:

>I'm interested in this topic as well.  If you put kafka brokers inside an
>autoscaling group, then AWS will automatically add brokers if demand
>increases, and the ELB will automatically round-robin across all of your
>kafka instances.  So in your config files and code, you only need to
>provide a single DNS name (the load balancer). You don't need to specify
>all your kafka brokers inside your config file.  If a broker dies, the ELB
>will only route to healthy nodes.
>
>So you get a lot of robustness, scalability, and fault-tolerance by using
>the AWS services. Kafka Brokers will automatically load balance, but the
>question is whether it is ok to put all your brokers behind an ELB and
>expect the system to work properly.
>
>What alternatives are there to dynamic/scalable broker clusters?  I don't
>want to have to modify my config files or code if I add more brokers, and
>I
>want to be able to handle a broker going down. So these are the reasons
>AWS
>questions like this come up.
>
>Thanks for any comments too. :)
>
>
>
>
>On Mon, May 4, 2015 at 9:03 AM, Mayuresh Gharat
>
>wrote:
>
>> Ok. You can deploy kafka in AWS. You can have brokers on AWS servers.
>> Kafka is not a push system. So you will need someone writing to kafka
>>and
>> consuming from kafka. It will work. My suggestion will be to try it out
>>on
>> a smaller instance in AWS and see the effects.
>>
>> As I do not know the actual use case about why you want to use kafka
>>for, I
>> cannot comment on whether it will work for you personalized use case.
>>
>> Thanks,
>>
>> Mayuresh
>>
>> On Mon, May 4, 2015 at 8:55 AM, Chandrashekhar Kotekar <
>> shekhar.kote...@gmail.com> wrote:
>>
>> > I am sorry but I cannot reveal those details due to confidentiality
>> issues.
>> > I hope you understand.
>> >
>> >
>> > Regards,
>> > Chandrash3khar Kotekar
>> > Mobile - +91 8600011455
>> >
>> > On Mon, May 4, 2015 at 9:18 PM, Mayuresh Gharat <
>> > gharatmayures...@gmail.com>
>> > wrote:
>> >
>> > > Hi Chandrashekar,
>> > >
>> > > Can you please elaborate the use case for Kafka here, like how you
>>are
>> > > planning to use it.
>> > >
>> > >
>> > > Thanks,
>> > >
>> > > Mayuresh
>> > >
>> > > On Sat, May 2, 2015 at 9:08 PM, Chandrashekhar Kotekar <
>> > > shekhar.kote...@gmail.com> wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > > I am new to Apache Kafka. I have played with it on my laptop.
>> > > >
>> > > > I want to use Kafka in AWS. Currently we have tomcat web servers
>> based
>> > > REST
>> > > > API. We want to replace REST API with Apache Kafka, web servers
>>are
>> > > behind
>> > > > ELB.
>> > > >
>> > > > I would like to know if we can keep Kafka brokers behind ELB?
>>Will it
>> > > work?
>> > > >
>> > > > Regards,
>> > > > Chandrash3khar Kotekar
>> > > > Mobile - +91 8600011455
>> > > >
>> > >
>> > >
>> > >
>> > > --
>> > > -Regards,
>> > > Mayuresh R. Gharat
>> > > (862) 250-7125
>> > >
>> >
>>
>>
>>
>> --
>> -Regards,
>> Mayuresh R. Gharat
>> (862) 250-7125
>>



Re: Kafka brokers behind AWS ELB

2015-05-03 Thread Jiangjie Qin
Yes.

On 5/3/15, 10:57 AM, "Chandrashekhar Kotekar" 
wrote:

>Thanks a lot for the information. So basically each broker needs to have
>his own public IP address?
>
>
>Regards,
>Chandrash3khar Kotekar
>Mobile - +91 8600011455
>
>On Sun, May 3, 2015 at 11:09 PM, Jiangjie Qin 
>wrote:
>
>> You can use the ELB address as the metadata broker list. But you still need
>> to allow direct traffic from clients to the brokers because clients need to
>> talk to the specific broker that hosts the partition they are interested in.
>>
>> Jiangjie (Becket) Qin
>>
>> On 5/2/15, 11:10 AM, "Chandrashekhar Kotekar"
>>
>> wrote:
>>
>> >Hi,
>> >
>> >I am new to Apache Kafka. I have played with it on my laptop.
>> >
>> >I want to use Kafka in AWS. Currently we have tomcat web servers based
>> >REST
>> >API. We want to replace REST API with Apache Kafka, web servers are
>>behind
>> >ELB.
>> >
>> >I would like to know if we can keep Kafka brokers behind ELB? Will it
>> >work?
>> >
>> >Regards,
>> >Chandrash3khar Kotekar
>> >Mobile - +91 8600011455
>>
>>



Re: Kafka Cluster Issue

2015-05-03 Thread Jiangjie Qin
What do you mean by cluster mode with 3 Zookeeper and 3 Kafka brokers? Do
you mean 1 Zookeeper and 3 brokers?

On 5/2/15, 11:01 PM, "Kamal C"  wrote:

>Any comments on this issue?
>
>On Sat, May 2, 2015 at 9:16 AM, Kamal C  wrote:
>
>> Hi,
>> We are using Kafka_2.10-0.8.2.0, new Kafka producer and Kafka Simple
>> Consumer. In Standalone mode, 1 ZooKeeper and 1 Kafka we haven't faced
>>any
>> problems.
>>
>> In cluster mode, 3 ZooKeeper and 3 Kafka Brokers. We did some sanity
>> testing by bringing a Kafka node down then a random Producer starts to
>> throw Connect Exception continuously and tries to connect with the dead
>> node (not all producers).
>>
>> Is there any configuration available to avoid this exception ?
>>
>> Regards,
>> Kamal C
>>
>>
>>
>>
>>



Re: Kafka brokers behind AWS ELB

2015-05-03 Thread Jiangjie Qin
You can use the ELB address as the metadata broker list. But you still need
to allow direct traffic from clients to the brokers because clients need to
talk to the specific broker that hosts the partition they are interested in.
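
As a sketch of what that looks like with the old high level producer (the
ELB DNS name, port and topic below are placeholders, not real values):

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerBehindElb {
    public static void main(String[] args) {
        Properties props = new Properties();
        // The ELB is only used for bootstrapping metadata; the metadata that
        // comes back lists the real broker host:port pairs, which the client
        // must also be able to reach directly.
        props.put("metadata.broker.list", "kafka-elb.example.com:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        Producer<String, String> producer =
            new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("my-topic", "hello"));
        producer.close();
    }
}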

Jiangjie (Becket) Qin

On 5/2/15, 11:10 AM, "Chandrashekhar Kotekar" 
wrote:

>Hi,
>
>I am new to Apache Kafka. I have played with it on my laptop.
>
>I want to use Kafka in AWS. Currently we have tomcat web servers based
>REST
>API. We want to replace REST API with Apache Kafka, web servers are behind
>ELB.
>
>I would like to know if we can keep Kafka brokers behind ELB? Will it
>work?
>
>Regards,
>Chandrash3khar Kotekar
>Mobile - +91 8600011455



Re: Data replication and zero data loss

2015-04-30 Thread Jiangjie Qin
Which mirror maker version did you look at? The MirrorMaker in trunk
should not have data loss if you just use the default setting.

On 4/30/15, 7:53 PM, "Joong Lee"  wrote:

>Hi,
>We are exploring Kafka to keep two data centers (primary and DR) running
>hosts of elastic search nodes in sync. One key requirement is that we
>can't lose any data. We POC'd use of MirrorMaker and felt it may not meet
>out data loss requirement.
>
>I would like ask the community if we should look for another solution or
>would Kafka be the right solution considering zero data loss requirement.
>
>Thanks



Re: New Producer API - batched sync mode support

2015-04-30 Thread Jiangjie Qin
Roshan,

If I understand correctly, you just want to make sure a number of messages
have been sent successfully. Using a callback might be easier to do so.

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MyCallback implements Callback {
    // The callback runs on the producer's I/O thread, so use a synchronized set.
    // Note that metadata may be null when a send fails; the set is only used
    // here to detect that something failed.
    public Set<RecordMetadata> failedSend =
        Collections.synchronizedSet(new HashSet<RecordMetadata>());

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null)
            failedSend.add(metadata);
    }

    public boolean hasFailure() { return failedSend.size() > 0; }
}

In main code, you just need to do the following:

MyCallback callback = new MyCallback();
for (ProducerRecord<byte[], byte[]> record : records)
    producer.send(record, callback);

producer.flush();
if (callback.hasFailure()) {
    // do something, e.g. resend or alert on the failed records
}

This avoids checking every future in a loop and provides pretty much the same
guarantee as the old producer, if not better.

Jiangjie (Becket) Qin


On 4/30/15, 4:54 PM, "Roshan Naik"  wrote:

>@Gwen, @Ewen,
>  While atomicity of a batch is nice to have, it is not essential. I don't
>think users always expect such atomicity. Atomicity is not even guaranteed
>in many un-batched systems let alone batched systems.
>
>As long as the client gets informed about the ones that failed in the
>batch.. that would suffice.
>
>One issue with the current flush() based batch-sync implementation is that
>the client needs to iterate over *all* futures in order to scan for any
>failed messages. In the common case, it is just wasted CPU cycles as there
>won't be any failures. Would be ideal if the client is informed about only
>problematic messages.
>
>  IMO, adding a new send(batch) API may be meaningful if it can provide
>benefits beyond what user can do with a simple wrapper on existing stuff.
>For example: eliminate the CPU cycles wasted on examining results from
>successful message deliveries, or other efficiencies.
>
>
>
>@Ivan,
>   I am not certain, I am thinking that there is a possibility that the
>first few messages of the batch got accepted, but not the remainder ? At
>the same time based on some comments made earlier it appears underlying
>implementation does have an all-or-none mechanism for a batch going to a
>partition.
>For simplicity, streaming clients may not want to deal explicitly with
>partitions (and get exposed to repartitioning & leader change type issues)
>
>-roshan
>
>
>
>On 4/30/15 2:07 PM, "Gwen Shapira"  wrote:
>
>>Why do we think atomicity is expected, if the old API we are emulating
>>here
>>lacks atomicity?
>>
>>I don't remember emails to the mailing list saying: "I expected this
>>batch
>>to be atomic, but instead I got duplicates when retrying after a failed
>>batch send".
>>Maybe atomicity isn't as strong requirement as we believe? That is,
>>everyone expects some duplicates during failure events and handles them
>>downstream?
>>
>>
>>
>>On Thu, Apr 30, 2015 at 2:02 PM, Ivan Balashov 
>>wrote:
>>
>>> 2015-04-30 8:50 GMT+03:00 Ewen Cheslack-Postava :
>>>
>>> > They aren't going to get this anyway (as Jay pointed out) given the
>>> current
>>> > broker implementation
>>> >
>>>
>>> Is it also incorrect to assume atomicity even if all messages in the
>>>batch
>>> go to the same partition?
>>>
>



Re: Kafka offset using kafka topic - not consuming messages

2015-04-29 Thread Jiangjie Qin
OK, so you turned off auto.commit.enable, and set auto.offset.reset to
largest.

That means when you consume,
1. If you did not commit offsets manually, no offsets will be committed to
Kafka.
2. If you do not have an offset stored in Kafka, you will start from the
log end and ignore the existing messages in the topic.

Another thing you want to check is whether you are using the same group id
all the time.
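
Just as a reference, here is a minimal sketch of the manual-commit pattern
with the 0.8.2 high level consumer. The Zookeeper address, group id, topic
name and the "smallest" reset policy below are illustrative assumptions, not
your exact settings:

import java.util.Collections;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "my-group");          // keep this stable across restarts
        props.put("offsets.storage", "kafka");
        props.put("dual.commit.enabled", "false");
        props.put("auto.commit.enable", "false");    // offsets only move on commitOffsets()
        props.put("auto.offset.reset", "smallest");  // start from the oldest data if no offset exists

        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        KafkaStream<byte[], byte[]> stream = connector
            .createMessageStreams(Collections.singletonMap("my-topic", 1))
            .get("my-topic").get(0);

        for (MessageAndMetadata<byte[], byte[]> msg : stream) {
            System.out.println(new String(msg.message()));
            connector.commitOffsets();  // checkpoint to the offsets topic after processing
        }
    }
}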

Jiangjie (Becket) Qin

On 4/29/15, 3:17 PM, "Gomathivinayagam Muthuvinayagam"
 wrote:

>I am using Kafka 0.8.2 and I am using Kafka based storage for offset.
>Whenever I restart a consumer (high level consumer api) it is not
>consuming
>messages whichever were posted when the consumer was down.
>
>I am using the following consumer properties
>
>Properties props = new Properties();
>
>props.put("zookeeper.connect", zooKeeper);
>
>props.put("group.id", consumerName);
>
>props.put("zookeeper.session.timeout.ms", "6000");
>
>props.put("zookeeper.sync.time.ms", "200");
>
>props.put("auto.commit.enable", "false");
>
>props.put("offsets.storage", "kafka");
>
>props.put("dual.commit.enabled", "false");
>
>props.put("auto.offset.reset", "largest");
>
>
>My offset manager is here
>https://gist.github.com/gomathi/0d63e29385017577ce3a. I am not sure why
>the
>consumer is behaving weird. Please share any updates if you have.
>
>
>
>Thanks & Regards,



Re: Kafka commit offset

2015-04-28 Thread Jiangjie Qin
Yes, if you set the offset storage to Kafka, the high level consumer will
use Kafka for all offset-related operations.

Jiangjie (Becket) Qin

On 4/27/15, 7:03 PM, "Gomathivinayagam Muthuvinayagam"
 wrote:

>I am trying to commit offset request in a background thread. I am able to
>commit  it so far. I am using high level consumer api.
>
>So if I just use high level consumer api, and if I have disabled auto
>commit, with kafka as the storage for offsets, will the high level
>consumer
>api use automatically the offsets from kafka storage.
>
>If high level consumer api does not use the offsets automatically from
>kafka storage, how do I enforce that? Is that possible?
>
>The above things are not clear from kafka documentation.
>
>
>Thanks & Regards,



Re: New Java Producer: Single Producer vs multiple Producers

2015-04-27 Thread Jiangjie Qin
Hi Jay, 

Does o.a.k.clients.tools.ProducerPerformance provide a multi-threaded test? I
did not find one.

I tweaked the test a little bit to make it multi-threaded, and what I found
is that in the single-thread case, with 10-byte messages, a single caller
thread gets ~2M messages/second throughput. 2 threads give ~1.2M
messages/second each, and 10 threads give ~0.11M messages/second each. My
topic has 8 partitions.

It looks like in my test the bottleneck is actually the sender thread (not
network bandwidth) rather than the number of caller threads, because I'm
sending really small uncompressed messages. In this case, you do need more
than one producer.

From what I understand, the main reasons we recommend sharing a producer
are:
1. We have a per-partition lock in the producer, so lock contention should
not be an issue assuming there are many partitions to send data to.
2. The caller threads are usually much slower than the sender thread because
they have to do the compression.

So I guess in general sharing a producer provides good performance and saves
memory footprint (see the sketch below). But if the sender thread becomes a
bottleneck, it is time to have more producers.
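
To make the shared-producer case concrete, this is roughly the kind of test I
mean. The broker address, topic name, thread count and message count are
made-up placeholders, not recommendations:

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SharedProducerTest {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.ByteArraySerializer");

        // One producer instance shared by all caller threads; the producer is
        // thread safe and owns the single background sender thread.
        final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
        final byte[] payload = new byte[10];

        ExecutorService callers = Executors.newFixedThreadPool(10);
        for (int t = 0; t < 10; t++) {
            callers.submit(new Runnable() {
                public void run() {
                    for (int i = 0; i < 1000000; i++)
                        producer.send(new ProducerRecord<byte[], byte[]>("perf-test", payload));
                }
            });
        }
        callers.shutdown();
        callers.awaitTermination(10, TimeUnit.MINUTES);
        producer.close();
    }
}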

Please correct me if I miss something.

Thanks.

Jiangjie (Becket) Qin

On 4/24/15, 3:23 PM, "Jay Kreps"  wrote:

>That should work. I recommend using the performance tool cited in the blog
>linked from the "performance" page of the website. That tool is more
>accurate and uses the new producer.
>
>On Fri, Apr 24, 2015 at 2:29 PM, Roshan Naik 
>wrote:
>
>> Can we use the new 0.8.2 producer perf tool against a 0.8.1 broker ?
>> -roshan
>>
>>
>> On 4/24/15 1:19 PM, "Jay Kreps"  wrote:
>>
>> >Do make sure if you are at all performance sensitive you are using the
>>new
>> >producer api we released in 0.8.2.
>> >
>> >-Jay
>> >
>> >On Fri, Apr 24, 2015 at 12:46 PM, Roshan Naik 
>> >wrote:
>> >
>> >> Yes, I too notice the same behavior (with producer/consumer perf
>>tool on
>> >> 8.1.2) ... adding more threads indeed improved the perf a lot (both
>>with
>> >>and
>> >> without --sync). in --sync mode
>> >>   batch size made almost no diff, larger events improved the perf.
>> >>
>> >> I was doing some 8.1.2 perf testing with a 1 node broker setup
>> >>(machine:
>> >> 32 cpu cores, 256gb RAM, 10gig ethernet, 1 x 15000rpm disks,).
>> >>
>> >> My observations:
>> >>
>> >>
>> >>
>> >> ASYNC MODE:
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> Partition Count: large improvement when going from 1 to 2, beyond 2
>>see
>> >>a
>> >> slight dip
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>   Number of producer threads: perf much better than sync mode with 1
>> >> thread, perf peaks out with ~10 threads, beyond 10 thds perf impacted
>> >> negatively
>> >>
>> >>
>> >>
>> >> SYNC MODE (does not seem to use batch size)
>> >> Batch Size: little to no impact
>> >> Event Size: perf scales linearly with event size
>> >> Number of producer threads: poor perf with one thread, improves with
>> >>more
>> >> threads,peaks around 30 to 50 threads
>> >> socket.send.buffer.bytes : increasing it Made a small but measurable
>> >> difference (~4%)
>> >>
>> >>
>> >> --SYNC mode was much slower.
>> >>
>> >>
>> >> I modified the producer perf tool to use the scala batched producer
>>api
>> >> (not available in v8.2) --sync mode and perf of --sync mode was
>>closer
>> >>to
>> >> async mode.
>> >>
>> >>
>> >> -roshan
>> >>
>> >>
>> >>
>> >> On 4/24/15 11:42 AM, "Navneet Gupta (Tech - BLR)"
>> >>  wrote:
>> >>
>> >> >Hi,
>> >> >
>> >> >I ran some tests on our cluster by sending message from multiple
>> >>clients
>> >> >(machines). Each machine had about 40-100 threads per producer.
>> >> >
>> >> >I thought of trying out having multiple producers per clients with
>>each
>> >> >producer receiving messages from say 10-15 threads. I actually did
>>see
>> >>an
>> >> >increase in throughput in this case. It was not one off cases but a
>> >> >repeatable phenomenon. I called threads to producer ratio
>> >>sharingFactor in
>> >> >my code.
>> >> >
>> >> >I am not planning to use it this way in our clients sending
>>messages to
>> >> >Kafka but it did go against the suggestion to have single producer
>> >>across
>> >> >all threads.
>> >> >
>> >> >
>> >> >
>> >> >On Fri, Apr 24, 2015 at 10:27 PM, Manikumar Reddy
>> >>
>> >> >wrote:
>> >> >
>> >> >> Hi Jay,
>> >> >>
>> >> >> Yes, we are producing from single process/jvm.
>> >> >>
>> >> >> From docs "The producer will attempt to batch records together
>>into
>> >> >>fewer
>> >> >> requests whenever multiple records are being sent to the same
>> >> >>partition."
>> >> >>
>> >> >> If I understand correctly, batching happens at topic/partition
>>level,
>> >> >>not
>> >> >> at Node level. right?
>> >> >>
>> >> >> If yes, then  both (single producer for all topics , separate
>> >>producer
>> >> >>for
>> >> >> each topic) approaches
>> >> >> may give similar performance.
>> >> >>
>> >> >> On Fri, Apr 24, 2015 at 9:29 PM, Jay Kreps 
>> >>wrote:
>> >> >>
>> >> >> > I

Re: Getting java.lang.IllegalMonitorStateException in mirror maker when building fetch request

2015-04-27 Thread Jiangjie Qin
Hi Tao,

KAFKA-2150 has been filed.

Jiangjie

On 4/24/15, 12:38 PM, "tao xiao"  wrote:

>Hi team,
>
>I observed java.lang.IllegalMonitorStateException thrown
>from AbstractFetcherThread in mirror maker when it is trying to build the
>fetchrequst. Below is the error
>
>[2015-04-23 16:16:02,049] ERROR
>[ConsumerFetcherThread-group_id_localhost-1429830778627-4519368f-0-7],
>Error due to  (kafka.consumer.ConsumerFetcherThread)
>
>java.lang.IllegalMonitorStateException
>
>at
>java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.jav
>a:155)
>
>at
>java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueu
>edSynchronizer.java:1260)
>
>at
>java.util.concurrent.locks.AbstractQueuedSynchronizer.fullyRelease(Abstrac
>tQueuedSynchronizer.java:1723)
>
>at
>java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awai
>t(AbstractQueuedSynchronizer.java:2166)
>
>at
>kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:95)
>
>at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>
>I believe this is due to partitionMapCond.await(fetchBackOffMs,
>TimeUnit.MILLISECONDS) being called while not lock is acquired.
>
>below code should fix the issue
>
>inLock(partitionMapLock) {
>partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
>}
>
>Should I file a jira ticket and submit the patch?
>
>I use the latest version of mirror maker in trunk.
>
>
>-- 
>Regards,
>Tao



Re: partition reassignment stuck

2015-04-21 Thread Jiangjie Qin
Hard to say, but if you have producers that keep producing data and they
work well, then you probably don't need to.

On 4/21/15, 5:34 PM, "Wesley Chow"  wrote:

>There is only one broker that thinks it's the controller right now.  The
>double controller situation happened earlier this morning. Do the other
>brokers have to be bounced after the controller situation is fixed? I did
>not do that for all brokers.
>
>Wes
> On Apr 21, 2015 8:25 PM, "Jiangjie Qin" 
>wrote:
>
>>  Yes, should be broker 25 thread 0 from the log.
>> This needs to be resolved, you might need to bounce both of the brokers
>> who think itself as controller respectively. The new controller should
>>be
>> able to continue the partition reassignment.
>>
>>   From: Wes Chow 
>> Reply-To: "users@kafka.apache.org" 
>> Date: Tuesday, April 21, 2015 at 1:29 PM
>> To: "users@kafka.apache.org" 
>> Subject: Re: partition reassignment stuck
>>
>>
>> Quick clarification: you say broker 0, but do you actually mean broker
>>25?
>> 25 one of the replicas for the partition, is currently the one having
>> trouble getting into sync, and 28 is the leader for the partition.
>>
>> Unfortunately, the logs of rotated off so I can't get to what happened
>> around then. However there was a time period of a few hours where we had
>> two brokers that both believed they were controllers. I'm not sure why I
>> didn't think of this before.
>>
>> ZooKeeper data appears to be inconsistent at the moment.
>> /brokers/topics/click_engage says that partition 116's replica set is:
>>[4,
>> 7, 25]. /brokers/topics/click_engage/partitions/116/state says the
>>leader
>> is 28 and the ISR is [28, 15]. Does this need to be resolved, and if so
>>how?
>>
>> Thanks,
>> Wes
>>
>>   Jiangjie Qin 
>> April 21, 2015 at 2:24 PM
>>   This means that the broker 0 thought broker 28 was leader for that
>> partition but broker 28 has actually already received StopReplicaRequest
>> from controller and stopped serving as a replica for that partition.
>> This might happen transiently but broker 0 will be able to find the new
>> leader for the partition once it receive LeaderAndIsrRequest from
>> controller to update the new leader information. If these messages keep
>>got
>> logged for long time then there might be an issue.
>> Can you maybe check the timestamp around [2015-04-21 12:15:36,585] on
>> broker 28 to see if there is some error log. The error log might not
>>have
>> partition info included.
>>
>>   From: Wes Chow 
>> Reply-To: "users@kafka.apache.org" 
>> Date: Tuesday, April 21, 2015 at 10:50 AM
>> To: "users@kafka.apache.org" 
>> Subject: Re: partition reassignment stuck
>>
>>
>> Not for that particular partition, but I am seeing these errors on 28:
>>
>> kafka.common.NotAssignedReplicaException: Leader 28 failed to record
>> follower 25's position 0 for partition [click_engage,116] since the
>>replica
>> 25 is not recognized to be one of the assigned r
>> eplicas  for partition [click_engage,116]
>> at
>> 
>>kafka.cluster.Partition.updateLeaderHWAndMaybeExpandIsr(Partition.scala:2
>>31)
>> at
>> 
>>kafka.server.ReplicaManager.recordFollowerPosition(ReplicaManager.scala:4
>>32)
>> at
>> 
>>kafka.server.KafkaApis$$anonfun$maybeUpdatePartitionHw$2.apply(KafkaApis.
>>scala:460)
>> at
>> 
>>kafka.server.KafkaApis$$anonfun$maybeUpdatePartitionHw$2.apply(KafkaApis.
>>scala:458)
>> at
>> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:176)
>> at
>> 
>>scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:345)
>> at
>> 
>>scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:345)
>> at
>> kafka.server.KafkaApis.maybeUpdatePartitionHw(KafkaApis.scala:458)
>> at 
>>kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:424)
>> at kafka.server.KafkaApis.handle(KafkaApis.scala:186)
>> at
>> kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> What does this mean?
>>
>> Thanks!
>> Wes
>>
>>
>> Wes Chow 
>> April 21, 2015 at 1:50 PM
>>
>> Not for that particular partition, but I am seeing these errors on 28:
>>
>&g

Re: partition reassignment stuck

2015-04-21 Thread Jiangjie Qin
Yes, should be broker 25 thread 0 from the log.
This needs to be resolved; you might need to bounce both of the brokers that
each think they are the controller. The new controller should be able to
continue the partition reassignment.

From: Wes Chow <w...@chartbeat.com>
Reply-To: "users@kafka.apache.org" <users@kafka.apache.org>
Date: Tuesday, April 21, 2015 at 1:29 PM
To: "users@kafka.apache.org" <users@kafka.apache.org>
Subject: Re: partition reassignment stuck


Quick clarification: you say broker 0, but do you actually mean broker 25? 25 
one of the replicas for the partition, is currently the one having trouble 
getting into sync, and 28 is the leader for the partition.

Unfortunately, the logs of rotated off so I can't get to what happened around 
then. However there was a time period of a few hours where we had two brokers 
that both believed they were controllers. I'm not sure why I didn't think of 
this before.

ZooKeeper data appears to be inconsistent at the moment. 
/brokers/topics/click_engage says that partition 116's replica set is: [4, 7, 
25]. /brokers/topics/click_engage/partitions/116/state says the leader is 28 
and the ISR is [28, 15]. Does this need to be resolved, and if so how?

Thanks,
Wes
Jiangjie Qin <j...@linkedin.com.INVALID>
April 21, 2015 at 2:24 PM
This means that the broker 0 thought broker 28 was leader for that partition 
but broker 28 has actually already received StopReplicaRequest from controller 
and stopped serving as a replica for that partition.
This might happen transiently but broker 0 will be able to find the new leader 
for the partition once it receive LeaderAndIsrRequest from controller to update 
the new leader information. If these messages keep got logged for long time 
then there might be an issue.
Can you maybe check the timestamp around [2015-04-21 12:15:36,585] on broker 28 
to see if there is some error log. The error log might not have partition info 
included.

From: Wes Chow <w...@chartbeat.com>
Reply-To: "users@kafka.apache.org" <users@kafka.apache.org>
Date: Tuesday, April 21, 2015 at 10:50 AM
To: "users@kafka.apache.org" <users@kafka.apache.org>
Subject: Re: partition reassignment stuck


Not for that particular partition, but I am seeing these errors on 28:

kafka.common.NotAssignedReplicaException: Leader 28 failed to record follower 
25's position 0 for partition [click_engage,116] since the replica 25 is not 
recognized to be one of the assigned r
eplicas  for partition [click_engage,116]
at 
kafka.cluster.Partition.updateLeaderHWAndMaybeExpandIsr(Partition.scala:231)
at 
kafka.server.ReplicaManager.recordFollowerPosition(ReplicaManager.scala:432)
at 
kafka.server.KafkaApis$$anonfun$maybeUpdatePartitionHw$2.apply(KafkaApis.scala:460)
at 
kafka.server.KafkaApis$$anonfun$maybeUpdatePartitionHw$2.apply(KafkaApis.scala:458)
at 
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:176)
at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:345)
at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:345)
at kafka.server.KafkaApis.maybeUpdatePartitionHw(KafkaApis.scala:458)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:424)
at kafka.server.KafkaApis.handle(KafkaApis.scala:186)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
at java.lang.Thread.run(Thread.java:745)

What does this mean?

Thanks!
Wes


Wes Chow <w...@chartbeat.com>
April 21, 2015 at 1:50 PM

Not for that particular partition, but I am seeing these errors on 28:

kafka.common.NotAssignedReplicaException: Leader 28 failed to record follower 
25's position 0 for partition [click_engage,116] since the replica 25 is not 
recognized to be one of the assigned r
eplicas  for partition [click_engage,116]
at 
kafka.cluster.Partition.updateLeaderHWAndMaybeExpandIsr(Partition.scala:231)
at 
kafka.server.ReplicaManager.recordFollowerPosition(ReplicaManager.scala:432)
at 
kafka.server.KafkaApis$$anonfun$maybeUpdatePartitionHw$2.apply(KafkaApis.scala:460)
at 
kafka.server.KafkaApis$$anonfun$maybeUpdatePartitionHw$2.apply(KafkaApis.scala:458)
at 
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:176)
at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:345)
at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:345)
at kafka.server.KafkaApis.maybeUpdatePartitionHw(KafkaApis.scala:458)

Re: Kafka Zookeeper queries

2015-04-21 Thread Jiangjie Qin
I might not be the best one to answer this question. It would be better if
someone with more operational experience could help out here.

Jiangjie (Becket) Qin

On 4/21/15, 12:00 PM, "Achanta Vamsi Subhash" 
wrote:

>@Qin
>Thanks for the inputs. Could you point us what we should monitor in
>zookeeper if we have 20k partitions (this might go ~1 lakh in the coming 6
>months)? We are not running zookeeper on SSDs and we were facing
>ZkConnectionLoss in HighLevelConsumer, sometimes for all the topics. What
>are the critical things to monitor/optimise wrt. zookeeper.
>
>@Pushkar
>Yes. We are using kafka for storing offsets instead of zookeeper.
>
>On Tue, Apr 21, 2015 at 11:55 PM, pushkar priyadarshi <
>priyadarshi.push...@gmail.com> wrote:
>
>> In my knowledge if you are using 0.8.2.1 which is latest stable you can
>> sync up your consumer offsets in kafka itself instead of Zk which
>>further
>> brings down write load on ZKs.
>>
>> Regards,
>> Pushkar
>>
>> On Tue, Apr 21, 2015 at 1:13 PM, Jiangjie Qin
>>
>> wrote:
>>
>> > 2 partitions should be OK.
>> >
>> > On 4/21/15, 12:33 AM, "Achanta Vamsi Subhash" <
>> achanta.va...@flipkart.com>
>> > wrote:
>> >
>> > >We are planning to have ~2 partitions. Will it be a bottleneck?
>> > >
>> > >On Mon, Apr 20, 2015 at 10:48 PM, Jiangjie Qin
>> > > >
>> > >wrote:
>> > >
>> > >> Producers usually do not query zookeeper at all.
>> > >> Consumers usually query zookeeper at beginning or rebalance. It is
>> > >> supposed to be in frequent if you don¹t have consumers come and go
>>all
>> > >>the
>> > >> time. One exception is that if you are using zookeeper based
>>consumer
>> > >> offset commit, it will commit offset to zookeeper frequently.
>> > >> In Kafka, the most heavily used mechanism for zookeeper is
>>zookeeper
>> > >> listener and they are not fired in a regular frequency.
>> > >>
>> > >> The limitation of Zookeeper usage for Kafka I am aware of is
>>probably
>> > >>the
>> > >> size of each zNode. As long as you don¹t have so many partitions
>>that
>> > >> zNode cannot handle, it should be fine.
>> > >>
>> > >> Thanks.
>> > >>
>> > >> Jiangjie (Becket) Qin
>> > >>
>> > >> On 4/20/15, 5:58 AM, "Achanta Vamsi Subhash"
>> > >>
>> > >> wrote:
>> > >>
>> > >> >Hi,
>> > >> >
>> > >> >Could anyone help with this?
>> > >> >
>> > >> >Thanks.
>> > >> >
>> > >> >On Sun, Apr 19, 2015 at 12:58 AM, Achanta Vamsi Subhash <
>> > >> >achanta.va...@flipkart.com> wrote:
>> > >> >
>> > >> >> Hi,
>> > >> >>
>> > >> >> How often does Kafka query zookeeper while producing and
>>consuming?
>> > >> >>
>> > >> >> Ex:
>> > >> >> If there is a single partition to which we produce and a
>>HighLevel
>> > >> >> consumer running on it, how many read/write queries to zookeeper
>> > >>happen.
>> > >> >>
>> > >> >> Extending further, multiple topics with ~100 partitions each,
>>how
>> > >>many
>> > >> >> zookeeper calls will be made (read/write).
>> > >> >>
>> > >> >> What is the max limit of no of partitions / kafka cluster that
>> > >>zookeeper
>> > >> >> can handle?
>> > >> >>
>> > >> >> --
>> > >> >> Regards
>> > >> >> Vamsi Subhash
>> > >> >>
>> > >> >
>> > >> >
>> > >> >
>> > >> >--
>> > >> >Regards
>> > >> >Vamsi Subhash
>> > >>
>> > >>
>> > >
>> > >
>> > >--
>> > >Regards
>> > >Vamsi Subhash
>> >
>> >
>>
>
>
>
>-- 
>Regards
>Vamsi Subhash



Re: partition reassignment stuck

2015-04-21 Thread Jiangjie Qin
This means that broker 0 thought broker 28 was the leader for that partition
but broker 28 has actually already received a StopReplicaRequest from the
controller and stopped serving as a replica for that partition.
This might happen transiently, but broker 0 will be able to find the new
leader for the partition once it receives a LeaderAndIsrRequest from the
controller to update the new leader information. If these messages keep
getting logged for a long time then there might be an issue.
Can you maybe check the timestamp around [2015-04-21 12:15:36,585] on broker 28
to see if there is some error log? The error log might not have partition info
included.

From: Wes Chow <w...@chartbeat.com>
Reply-To: "users@kafka.apache.org" <users@kafka.apache.org>
Date: Tuesday, April 21, 2015 at 10:50 AM
To: "users@kafka.apache.org" <users@kafka.apache.org>
Subject: Re: partition reassignment stuck


Not for that particular partition, but I am seeing these errors on 28:

kafka.common.NotAssignedReplicaException: Leader 28 failed to record follower 
25's position 0 for partition [click_engage,116] since the replica 25 is not 
recognized to be one of the assigned r
eplicas  for partition [click_engage,116]
at 
kafka.cluster.Partition.updateLeaderHWAndMaybeExpandIsr(Partition.scala:231)
at 
kafka.server.ReplicaManager.recordFollowerPosition(ReplicaManager.scala:432)
at 
kafka.server.KafkaApis$$anonfun$maybeUpdatePartitionHw$2.apply(KafkaApis.scala:460)
at 
kafka.server.KafkaApis$$anonfun$maybeUpdatePartitionHw$2.apply(KafkaApis.scala:458)
at 
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:176)
at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:345)
at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:345)
at kafka.server.KafkaApis.maybeUpdatePartitionHw(KafkaApis.scala:458)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:424)
at kafka.server.KafkaApis.handle(KafkaApis.scala:186)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
at java.lang.Thread.run(Thread.java:745)

What does this mean?

Thanks!
Wes


Jiangjie Qin <j...@linkedin.com.INVALID>
April 21, 2015 at 1:19 PM
Those .index files are for different partitions and
they should be generated if new replicas is assigned to the broker.
We might want to know what caused the UnknownException. Did you see any
error log on broker 28?

Jiangjie (Becket) Qin


Wes Chow <w...@chartbeat.com>
April 21, 2015 at 12:16 PM
I started a partition reassignment (this is a 8.1.1 cluster) some time ago and 
it seems to be stuck. Partitions are no longer getting moved around, but it 
seems like the cluster is operational otherwise. The stuck nodes have a lot of 
.index files, and their logs show errors like:

[2015-04-21 12:15:36,585] 3237789 [ReplicaFetcherThread-0-28] ERROR 
kafka.server.ReplicaFetcherThread  - [ReplicaFetcherThread-0-28], Error for 
partition [pings,227] to broker 28:class kafka.common.UnknownException

I'm at a loss as to what I should be looking at. Any ideas?

Thanks,
Wes



Re: partition reassignment stuck

2015-04-21 Thread Jiangjie Qin

Those .index files are for different partitions and
they should be generated if new replicas are assigned to the broker.
We might want to know what caused the UnknownException. Did you see any
error log on broker 28?

Jiangjie (Becket) Qin

On 4/21/15, 9:16 AM, "Wes Chow"  wrote:

>I started a partition reassignment (this is a 8.1.1 cluster) some time
>ago and it seems to be stuck. Partitions are no longer getting moved
>around, but it seems like the cluster is operational otherwise. The
>stuck nodes have a lot of .index files, and their
>logs show errors like:
>
>[2015-04-21 12:15:36,585] 3237789 [ReplicaFetcherThread-0-28] ERROR
>kafka.server.ReplicaFetcherThread  - [ReplicaFetcherThread-0-28], Error
>for partition [pings,227] to broker 28:class kafka.common.UnknownException
>
>I'm at a loss as to what I should be looking at. Any ideas?
>
>Thanks,
>Wes
>



Re: Kafka Zookeeper queries

2015-04-21 Thread Jiangjie Qin
2 partitions should be OK.

On 4/21/15, 12:33 AM, "Achanta Vamsi Subhash" 
wrote:

>We are planning to have ~2 partitions. Will it be a bottleneck?
>
>On Mon, Apr 20, 2015 at 10:48 PM, Jiangjie Qin 
>wrote:
>
>> Producers usually do not query zookeeper at all.
>> Consumers usually query zookeeper at beginning or rebalance. It is
>> supposed to be in frequent if you don¹t have consumers come and go all
>>the
>> time. One exception is that if you are using zookeeper based consumer
>> offset commit, it will commit offset to zookeeper frequently.
>> In Kafka, the most heavily used mechanism for zookeeper is zookeeper
>> listener and they are not fired in a regular frequency.
>>
>> The limitation of Zookeeper usage for Kafka I am aware of is probably
>>the
>> size of each zNode. As long as you don¹t have so many partitions that
>> zNode cannot handle, it should be fine.
>>
>> Thanks.
>>
>> Jiangjie (Becket) Qin
>>
>> On 4/20/15, 5:58 AM, "Achanta Vamsi Subhash"
>>
>> wrote:
>>
>> >Hi,
>> >
>> >Could anyone help with this?
>> >
>> >Thanks.
>> >
>> >On Sun, Apr 19, 2015 at 12:58 AM, Achanta Vamsi Subhash <
>> >achanta.va...@flipkart.com> wrote:
>> >
>> >> Hi,
>> >>
>> >> How often does Kafka query zookeeper while producing and consuming?
>> >>
>> >> Ex:
>> >> If there is a single partition to which we produce and a HighLevel
>> >> consumer running on it, how many read/write queries to zookeeper
>>happen.
>> >>
>> >> Extending further, multiple topics with ~100 partitions each, how
>>many
>> >> zookeeper calls will be made (read/write).
>> >>
>> >> What is the max limit of no of partitions / kafka cluster that
>>zookeeper
>> >> can handle?
>> >>
>> >> --
>> >> Regards
>> >> Vamsi Subhash
>> >>
>> >
>> >
>> >
>> >--
>> >Regards
>> >Vamsi Subhash
>>
>>
>
>
>-- 
>Regards
>Vamsi Subhash



Re: Kafka Zookeeper queries

2015-04-20 Thread Jiangjie Qin
Producers usually do not query zookeeper at all.
Consumers usually query zookeeper at startup or on rebalance. It is
supposed to be infrequent if you don't have consumers come and go all the
time. One exception is that if you are using zookeeper based consumer
offset commits, offsets will be committed to zookeeper frequently.
In Kafka, the most heavily used mechanism for zookeeper is zookeeper
listeners, and they are not fired at a regular frequency.

The limitation of Zookeeper usage for Kafka I am aware of is probably the
size of each zNode. As long as you don't have so many partitions that the
zNode size limit is exceeded, it should be fine.

Thanks.

Jiangjie (Becket) Qin

On 4/20/15, 5:58 AM, "Achanta Vamsi Subhash" 
wrote:

>Hi,
>
>Could anyone help with this?
>
>Thanks.
>
>On Sun, Apr 19, 2015 at 12:58 AM, Achanta Vamsi Subhash <
>achanta.va...@flipkart.com> wrote:
>
>> Hi,
>>
>> How often does Kafka query zookeeper while producing and consuming?
>>
>> Ex:
>> If there is a single partition to which we produce and a HighLevel
>> consumer running on it, how many read/write queries to zookeeper happen.
>>
>> Extending further, multiple topics with ~100 partitions each, how many
>> zookeeper calls will be made (read/write).
>>
>> What is the max limit of no of partitions / kafka cluster that zookeeper
>> can handle?
>>
>> --
>> Regards
>> Vamsi Subhash
>>
>
>
>
>-- 
>Regards
>Vamsi Subhash



Re: ReplicaFetcherThread Error, Massive Logging, and Leader Flapping

2015-04-16 Thread Jiangjie Qin
It seems there are many different symptoms you are seeing...
Maybe we can start from the leader flapping issue. Any findings in the
controller log?

Jiangjie (Becket) Qin
 


On 4/16/15, 12:09 PM, "Kyle Banker"  wrote:

>Hi,
>
>I've run into a pretty serious production issue with Kafka 0.8.2, and I'm
>wondering what my options are.
>
>
>ReplicaFetcherThread Error
>
>I have a broker on a 9-node cluster that went down for a couple of hours.
>When it came back up, it started spewing constant errors of the following
>form:
>
>INFO Reconnect due to socket error:
>java.nio.channels.ClosedChannelException (kafka.consumer.SimpleConsumer)
>[2015-04-09 22:38:54,580] WARN [ReplicaFetcherThread-0-7], Error in fetch
>Name: FetchRequest; Version: 0; CorrelationId: 767; ClientId:
>ReplicaFetcherThread-0-7; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1
>bytes;
>RequestInfo: [REDACTED] Possible cause: java.io.EOFException: Received -1
>when reading from channel, socket has likely been closed.
>(kafka.server.ReplicaFetcherThread)
>
>
>Massive Logging
>
>This produced around 300GB of new logs in a 24-hour period and rendered
>the
>broker completely unresponsive.
>
>This broker hosts about 500 partitions spanning 40 or so topics (all
>topics
>have a replication factor of 3). One topic contains messages up to 100MB
>in
>size. The remaining topics have messages no larger than 10MB.
>
>It appears that I've hit this bug:
>https://issues.apache.org/jira/browse/KAFKA-1196
>
>
>"Leader Flapping"
>
>I can get the broker to come online without logging massively by reducing
>both max.message.bytes and replica.fetch.max.bytes to ~10MB. It then
>starts
>resyncing all but the largest topic.
>
>Unfortunately, it also starts "leader flapping." That is, it continuously
>acquires and relinquishes partition leadership. There is nothing of note
>in
>the logs while this is happening, but the consumer offset checker clearly
>shows this. The behavior significantly reduces cluster write throughput
>(since producers are constantly failing).
>
>The only solution I have is to leave the broker off. Is this a known
>"catch-22" situation? Is there anything that can be done to fix it?
>
>Many thanks in advance.



Re: Fetch Request Purgatory and Mirrormaker

2015-04-14 Thread Jiangjie Qin
Hey Evan,

Is this issue only observed when mirror maker is consuming? It looks like
for Cluster A you have some other consumers.
Do you mean that if you stop mirror maker the problem goes away?

Jiangjie (Becket) Qin

On 4/14/15, 6:55 AM, "Evan Huus"  wrote:

>Any ideas on this? It's still occurring...
>
>Is there a separate mailing list or project for mirrormaker that I could
>ask?
>
>Thanks,
>Evan
>
>On Thu, Apr 9, 2015 at 4:36 PM, Evan Huus  wrote:
>
>> Hey Folks, we're running into an odd issue with mirrormaker and the
>>fetch
>> request purgatory on the brokers. Our setup consists of two six-node
>> clusters (all running 0.8.2.1 on identical hw with the same config). All
>> "normal" producing and consuming happens on cluster A. Mirrormaker has
>>been
>> set up to copy all topics (except a tiny blacklist) from cluster A to
>> cluster B.
>>
>> Cluster A is completely healthy at the moment. Cluster B is not, which
>>is
>> very odd since it is literally handling the exact same traffic.
>>
>> The graph for Fetch Request Purgatory Size looks like this:
>> 
>>https://www.dropbox.com/s/k87wyhzo40h8gnk/Screenshot%202015-04-09%2016.08
>>.37.png?dl=0
>>
>> Every time the purgatory shrinks, the latency from that causes one or
>>more
>> nodes to drop their leadership (it quickly recovers). We could probably
>> alleviate the symptoms by decreasing
>> `fetch.purgatory.purge.interval.requests` (it is currently at the
>>default
>> value) but I'd rather try and understand/solve the root cause here.
>>
>> Cluster B is handling no outside fetch requests, and turning mirrormaker
>> off "fixes" the problem, so clearly (since mirrormaker is producing to
>>this
>> cluster not consuming from it) the fetch requests must be coming from
>> internal replication. However, the same data is being replicated when
>>it is
>> originally produced in cluster A, and the fetch purgatory size sits
>>stably
>> at ~10k there. There is nothing unusual in the logs on either cluster.
>>
>> I have read all the wiki pages and jira tickets I can find about the new
>> purgatory design in 0.8.2 but nothing stands out as applicable. I'm
>>happy
>> to provide more detailed logs, configuration, etc. if anyone thinks
>>there
>> might be something important in there. I am completely baffled as to
>>what
>> could be causing this.
>>
>> Any suggestions would be appreciated. I'm starting to think at this
>>point
>> that we've completely misunderstood or misconfigured *something*.
>>
>> Thanks,
>> Evan
>>



Re: Consumer offsets in offsets topic 0.8.2

2015-04-13 Thread Jiangjie Qin
Yeah, the current ConsumerOffsetChecker has this issue (maybe a bug as well)
if the offset storage is Kafka and no offset has been committed. It will
throw a ZK exception, which is very confusing. KAFKA-1951 was opened for
this but was not checked in.

Thanks.

Jiangjie (Becket) Qin

On 4/13/15, 9:55 AM, "4mayank" <4may...@gmail.com> wrote:

>I did a similar change - moved from High Level Consumer to Simple
>Consumer.
>Howerver kafka-consumer-offset-checker.sh throws an exception. Its
>searching the zk path /consumers// which does not exist on any of
>my
>zk nodes.
>
>Is there any other tool for getting the offset lag when using Simple
>Consumer? Or am I using kafka-consumer-offset-checker.sh incorrectly for
>Simple Consumer?
>
>Output:
>kafka-consumer-offset-checker.sh --zookeeper 192.168.1.201:2181,
>192.168.1.202:2181,192.168.1.203:2181 --group my-control-group
>Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException:
>KeeperErrorCode = NoNode for /consumers/my-control-group/owners.
>
>
>kafka-consumer-offset-checker.sh --zookeeper 192.168.1.201:2181,
>192.168.1.202:2181,192.168.1.203:2181 --group my-control-group --topic
>my-control
>Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException:
>KeeperErrorCode = NoNode for
>/consumers/my-control-group/offsets/my-control/1.
>
>
>ZK cli output:
>
>./zookeeper-shell.sh 192.168.1.201:2181
>Connecting to 192.168.1.201:2181
>Welcome to ZooKeeper!
>JLine support is disabled
>
>WATCHER::
>
>WatchedEvent state:SyncConnected type:None path:null
>ls /config/topics
>[my-control]
>ls /consumers
>[]
>quit
>
>
>Thanks.
>Mayank.
>
>On Fri, Mar 20, 2015 at 9:54 AM, Jiangjie Qin 
>wrote:
>
>> Hi Vamsi,
>>
>> The ConsumerOffsetChecker.scala or kafka-consumer-offset-checker.sh
>>still
>> works. You can use them to check the offsets.
>> If you need to check the offsets programmatically, you can send
>> an OffsetFetchRequest to the broker using the simple consumer. You may refer
>> to ConsumerOffsetChecker.scala to see how to find the correct broker where
>> the corresponding offset manager resides.
>>
>> Jiangjie (Becket) Qin
>>
>> On 3/19/15, 11:54 PM, "Achanta Vamsi Subhash"
>>
>> wrote:
>>
>> >Hi,
>> >
>> >We are using 0.8.2.1 currently.
>> >
>> >- How to get the consumer offsets from the offsets topic?
>> >​- Is there any built-in function which I could use? (like in
>> >AdminUtils.scala)
>> >- Is it ok to start a simple consumer and read the offsets from the
>>topic?
>> >
>> >​We used to read the offsets from zookeeper previously for the
>> >HighLevelConsumers. But with the new broker this changed as we are
>>using
>> >Kafka topic for offsets.​
>> >
>> >--
>> >Regards
>> >Vamsi Subhash
>>
>>



Re: Topic to broker assignment

2015-04-13 Thread Jiangjie Qin
A quick reference.
http://www.slideshare.net/junrao/kafka-replication-apachecon2013


On 4/12/15, 11:36 PM, "Bill Hastings"  wrote:

>Hi Guys
>
>How do topics get assigned to brokers? I mean if I were to create a topic
>X
>and publish to it how does Kafka assign the topic and the message to a
>particular broker? If I have create a topic with say 10 partitions how
>does
>kafka assign each partition to a different broker?
>
>-- 
>Cheers
>Bill



Re: question about Kafka

2015-04-07 Thread Jiangjie Qin
Yes, you might need to write some code to read from the log end and send
it to Kafka using Kafka’s producer.

On 4/6/15, 2:39 PM, "Sun, Joey"  wrote:

>Thanks for your info, Becket.
>
>Does it mean I should program for it? is there any other app can
>gracefully glue access_log to Kafka's producer?
>
>Thanks
>Liang
>
>-----Original Message-
>From: Jiangjie Qin [mailto:j...@linkedin.com.INVALID]
>Sent: Monday, April 06, 2015 11:46 AM
>To: users@kafka.apache.org
>Subject: Re: question about Kafka
>
>Hey Liang,
>
>Have you looked at the quick start here:
>https://kafka.apache.org/documentation.html#quickstart
>
>In Kafka, on the producer side, there is no concept of "commit". If you
>are producing using KafkaProducer, you can do a send.get(), this is a
>synchronized send so if no exception was thrown, the message has been
>successfully produced to Kafka.
>
>Jiangjie (Becket) Qin
>
>On 4/3/15, 3:38 PM, "Sun, Joey"  wrote:
>
>>Hello, group
>>
>>I am a newbie to Kafka. I am researching on how to commit a new
>>appended log message (e.g. apache access log) to Kafka. Could you
>>please share some ideas/solutions?
>>
>>Thanks
>>Liang
>



Re: question about Kafka

2015-04-06 Thread Jiangjie Qin
Also, if you are using Kafka from the latest trunk, KafkaProducer has a
flush() method that you may call. This will ensure that all the messages
previously passed to send() have been sent to the Kafka server.
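
A rough sketch of the pattern, assuming an already configured
org.apache.kafka.clients.producer.KafkaProducer<byte[], byte[]> named
producer and a made-up topic name:

// send() only hands the record to the producer's background sender thread;
// flush() then blocks until every record handed over so far has either been
// acknowledged or has failed.
producer.send(new ProducerRecord<byte[], byte[]>("access-log",
    "127.0.0.1 - GET /index.html".getBytes()));
producer.flush();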

On 4/3/15, 3:38 PM, "Sun, Joey"  wrote:

>Hello, group
>
>I am a newbie to Kafka. I am researching on how to commit a new appended
>log message (e.g. apache access log) to Kafka. Could you please share
>some ideas/solutions?
>
>Thanks
>Liang



Re: question about Kafka

2015-04-06 Thread Jiangjie Qin
Hey Liang,

Have you looked at the quick start here:
https://kafka.apache.org/documentation.html#quickstart

In Kafka, on the producer side, there is no concept of "commit". If you
are producing using KafkaProducer, you can do a send(...).get(); this is a
synchronous send, so if no exception was thrown, the message has been
successfully produced to Kafka.
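
For example, a minimal sketch, again assuming an existing
KafkaProducer<byte[], byte[]> named producer; the topic name and payload are
placeholders:

// send() returns a Future<RecordMetadata>; get() blocks until the broker
// acknowledges the write and throws if the send failed.
RecordMetadata meta = producer.send(new ProducerRecord<byte[], byte[]>(
    "access-log", "127.0.0.1 - GET /index.html".getBytes())).get();
System.out.println("written to partition " + meta.partition()
    + " at offset " + meta.offset());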

Jiangjie (Becket) Qin

On 4/3/15, 3:38 PM, "Sun, Joey"  wrote:

>Hello, group
>
>I am a newbie to Kafka. I am researching on how to commit a new appended
>log message (e.g. apache access log) to Kafka. Could you please share
>some ideas/solutions?
>
>Thanks
>Liang



Re: Problem with node after restart no partitions?

2015-04-03 Thread Jiangjie Qin
This sounds like a very serious issue... Could you provide the controller log
and the log for the first broker on which you tried the controlled shutdown
and upgrade?

On 4/3/15, 8:57 AM, "Jason Rosenberg"  wrote:

>I'm preparing a longer post here, but we recently ran into a similar
>scenario.  Not sure yet if it's the same thing you saw (but it feels
>similar).  We were also doing a rolling upgrade from 0.8.1.1 to 0.8.2.1,
>and during the controlled shutdown of the first node (of a 4 node
>cluster),
>the controlled shutdown was taking longer than normal (it timed out
>several
>times and was retrying controlled shutdown), and unfortunately, our
>deployment system decided to kill it hard (so it was in the middle of it's
>4th controlled shutdown retry, etc.).
>
>Anyway, when the node came back, it naturally decided to 'restore' most of
>it's partitions, which took some time (but only like 5 minutes).  What's
>weird is it didn't decide to resync data from other replicas, instead it
>just restored partitions locally.  During this time, the rest of the
>cluster failed to elect any new leaders, and so for 5 minutes, those
>partitions were unavailable (and we saw a flood of failed FetcherManager
>exceptions from the other nodes in the cluster).  Most of the partitions
>were empty (e.g. there's no way the other replicas were behind and not in
>the ISR normally).  During this 5 minutes, producers were unable to send
>messages due to NotLeaderForPartition exceptions.  Apparently the
>controller was still sending them to the unavailable broker.
>
>Finally, when the first node finally came up, the other nodes were
>somewhat
>happy again (but a few partitions remained under-replicated indefinitely).
>Because of this, we decided to pause the rolling restart, and try to wait
>for the under-replicated partitions to get in sync.  Unfortunately, about
>an
>hour later, the whole cluster went foobar (e.g. partitions became
>unavailable, brokers logged a flood of Fetcher errors, producers couldn't
>find a valid leader, metadata requests timed out, etc.).  In a panic, we
>reverted that first node back to 0.8.1.1. This did not help,
>unfortunately,
>so, deciding we'd already probably lost data at this point (and producers
>could not send data due to (NotLeaderForPartition exceptions)), we decided
>to just forcibly do the upgrade to 0.8.2.1.  This was all a bad situation,
>of course.
>
>So, now we have the cluster stable at 0.8.2.1, but like you, we are very,
>very nervous about doing any kind of restart to any of our nodes.  We lost
>data, primarily in the form of producers failing to send during the
>periods
>of unavailability.
>
>It looks like the root cause, in our case, was a flood of topics created
>(long-since unused and empty).  This appears to have caused the longer
>than
>normal controlled shutdown, which in turn led to the follow-on problems.
>However, in the past, we've seen a controlled shutdown failure result in
>an
>unclean shutdown, but usually the cluster recovers (e.g. it elects new
>leaders, and when the new node comes back, it recovers its partitions
>that
>were uncleanly shut down).  That did not happen this time (the rest of the
>cluster got in an apparent infinite loop where it tried repeatedly (e.g.
>500K times a minute) to fetch partitions that were unavailable).
>
>I'm preparing a longer post with more detail (will take a bit of time).
>
>Jason
>
>On Thu, Apr 2, 2015 at 10:19 PM, Gwen Shapira 
>wrote:
>
>> wow, that's scary for sure.
>>
>> Just to be clear - all you did is restart *one* broker in the cluster?
>> everything else was ok before the restart? and that was controlled
>> shutdown?
>>
>> Gwen
>>
>> On Wed, Apr 1, 2015 at 11:54 AM, Thunder Stumpges 
>> wrote:
>>
>> > Well it appears we lost all the data on the one node again. It
>>appears to
>> > be all or part of KAFKA-1647<
>> > https://issues.apache.org/jira/browse/KAFKA-1647> as we saw this in
>>our
>> > logs (for all topics):
>> >
>> > [2015-04-01 10:46:58,901] WARN Partition [logactivity-redirect,3] on
>> > broker 6: No checkpointed highwatermark is found for partition
>> > [logactivity-redirect,3] (kafka.cluster.Partition)
>> > [2015-04-01 10:46:58,902] WARN Partition [pageimpression,1] on broker
>>6:
>> > No checkpointed highwatermark is found for partition
>>[pageimpression,1]
>> > (kafka.cluster.Partition)
>> > [2015-04-01 10:46:58,904] WARN Partition [campaignplatformtarget,6] on
>> > broker 6: No checkpointed highwatermark is found for partition
>> > [campaignplatformtarget,6] (kafka.cluster.Partition)
>> > [2015-04-01 10:46:58,905] WARN Partition [trackingtags-c2,1] on
>>broker 6:
>> > No checkpointed highwatermark is found for partition
>>[trackingtags-c2,1]
>> > (kafka.cluster.Partition)
>> >
>> > Followed by:
>> >
>> > [2015-04-01 10:46:58,911] INFO Truncating log trafficshaperlog-3 to
>> offset
>> > 0. (kafka.log.Log)
>> > [2015-04-01 10:46:58,928] INFO Truncating log videorecrequest-0 to
>>offset
>> > 0. (kafka.log.Log)
>> > [2015-04-01 10:

Re: delete.retention.ms in 0.8.1

2015-04-03 Thread Jiangjie Qin
Another thing is that the active log segment will not be deleted, so if
there is only one log segment, the retention policy will not be honored. You
may configure log.roll.ms to make sure you have a rolled-over log segment.
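
A hedged sketch of the relevant broker settings in server.properties; the
exact property names are assumptions and can differ by version (older brokers
expose log.roll.hours rather than log.roll.ms), so please check the
documentation for your broker version:

# Roll a new log segment at least every hour so old segments become
# eligible for deletion even on low-traffic topics (assumed value).
log.roll.ms=3600000
# How often the retention checker looks for deletable segments (assumed value).
log.retention.check.interval.ms=300000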

On 4/3/15, 5:20 AM, "Madhukar Bharti"  wrote:

>Hi Gaurav,
>
>What is your "log.retention.check.interval.ms"? There might be a chance
>that it is set too high, so it is not able to delete segments within the
>expected interval.
>
>Also, in Kafka 0.8.1 the topic-level setting will be "retention.ms". Please
>check this.
>
>
>Regards,
>Madhukar
>
>On Fri, Apr 3, 2015 at 5:01 PM, Gaurav Agarwal 
>wrote:
>
>> hello group,
>> I have created a topic with delete retention ms set to 1000 and sent
>> and consumed messages on it. Nothing happened after that; I checked
>> the log as well, and the messages were not deleted. Please help me
>> understand what is needed.
>>



Re: Which version works for kafka 0.8.2 as consumer?

2015-04-01 Thread Jiangjie Qin
Yes, KafkaConsumer in 0.8.2 is still in development. You probably still
want to use ZookeeperConsumerConnector for now.

On 4/1/15, 9:28 AM, "Mark Zang"  wrote:

>I found that 0.8.2.0 and 0.8.2.1 have a KafkaConsumer, but this class seems
>incomplete and not functional. Lots of methods return null or throw
>NSM. Which version of the consumer should I use with a kafka 0.8.2 broker?
>
>Thanks!
>
>-- 
>Best regards!
>Mike Zang



Re: Can Mirroring Preserve Every Topic's Partition?

2015-03-31 Thread Jiangjie Qin
The mirror maker in trunk now supports mirroring with the partition preserved.
You can wire in a message handler that assigns a partition to each producer
record before handing it to the producer.
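
A rough sketch of the idea (the handler class and method shown here are
assumptions, not the actual plug-in interface in trunk, whose name and
signature may differ):

// Hypothetical partition-preserving handler: copy the source partition
// number onto the ProducerRecord handed to the producer, so the record
// lands in the same partition in the target cluster.
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionPreservingHandler {
    public List<ProducerRecord<byte[], byte[]>> handle(
            String topic, int sourcePartition, byte[] key, byte[] value) {
        return Collections.singletonList(
                new ProducerRecord<byte[], byte[]>(topic, sourcePartition, key, value));
    }
}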

Jiangjie (Becket) Qin

On 3/31/15, 3:41 AM, "Ivan Balashov"  wrote:

>Alex,
>
>Just wondering, did you have any success patching and running MM with
>exact
>partitioning support?
>If so, could you possibly share the patch and, as I hope, your positive
>experience with the process?
>
>Thanks!



Re: Async producer using Sync producer for send

2015-03-30 Thread Jiangjie Qin
The async send() puts the message into a message queue and then returns. When
the messages are pulled out of the queue by the sender thread, it still
uses SyncProducer to send ProducerRequests to the brokers.

Jiangjie (Becket) Qin

On 3/30/15, 10:44 PM, "Madhukar Bharti"  wrote:

>Hi All,
>
>I am using the *async* producer to send the data. When I checked the log, it
>is showing as below:
>
>
>[2015-03-31 11:09:55,915] INFO Verifying properties
>(kafka.utils.VerifiableProperties)
>[2015-03-31 11:09:55,946] INFO Property key.serializer.class is overridden
>to kafka.serializer.StringEncoder (kafka.utils.VerifiableProperties)
>[2015-03-31 11:09:55,947] INFO Property metadata.broker.list is overridden
>to 172.20.6.201:9092,172.20.6.25:9092,172.20.8.62:9092
>(kafka.utils.VerifiableProperties)
>[2015-03-31 11:09:55,947] INFO Property producer.type is overridden to
>async (kafka.utils.VerifiableProperties)
>[2015-03-31 11:09:55,947] INFO Property queue.buffering.max.ms is
>overridden to 300 (kafka.utils.VerifiableProperties)
>[2015-03-31 11:09:55,947] INFO Property queue.enqueue.timeout.ms is
>overridden to 50 (kafka.utils.VerifiableProperties)
>[2015-03-31 11:09:55,947] INFO Property request.required.acks is
>overridden
>to 1 (kafka.utils.VerifiableProperties)
>[2015-03-31 11:09:55,948] INFO Property send.buffer.bytes is overridden to
>4048 (kafka.utils.VerifiableProperties)
>[2015-03-31 11:09:55,948] INFO Property serializer.class is overridden to
>kafka.serializer.StringEncoder (kafka.utils.VerifiableProperties)
>Thread Number0
>Sent: This is message 0, Topic::TestMQ
>Sent: This is message 1, Topic::TestMQ
>Sent: This is message 2, Topic::TestMQ
>Sent: This is message 3, Topic::TestMQ
>Sent: This is message 4, Topic::TestMQ
>Sent: This is message 5, Topic::TestMQ
>Sent: This is message 6, Topic::TestMQ
>Sent: This is message 7, Topic::TestMQ
>Sent: This is message 8, Topic::TestMQ
>Sent: This is message 9, Topic::TestMQ
>[2015-03-31 11:09:56,395] INFO Fetching metadata from broker
>id:1,host:172.20.6.25,port:9092 with correlation id 0 for 1 topic(s)
>Set(TestMQ) (kafka.client.ClientUtils$)
>[2015-03-31 11:09:56,404] INFO Connected to 172.20.6.25:9092 for producing
>(kafka.producer.SyncProducer)
>[2015-03-31 11:09:56,438] INFO Disconnecting from 172.20.6.25:9092
>(kafka.producer.SyncProducer)
>[2015-03-31 11:09:56,479] INFO Connected to 172.20.6.25:9092 for producing
>(kafka.producer.SyncProducer)
>[2015-03-31 11:09:56,573] INFO Connected to 172.20.6.201:9092 for
>producing
>(kafka.producer.SyncProducer)
>[2015-03-31 11:09:56,591] INFO Connected to 172.20.8.62:9092 for producing
>(kafka.producer.SyncProducer)
>
>
>
>My doubt is: why is it using "*kafka.producer.SyncProducer*"? I am using
>kafka 0.8.1.1.
>
>
>Thanks in advance!



Re: Consumer in Java client

2015-03-30 Thread Jiangjie Qin
Hi,

KafkaConsumer is still under development and not ready for wide use.
Currently, it can be used to replace SimpleConsumer (the low-level consumer),
but it cannot replace ZookeeperConsumerConnector (the high-level consumer). So
if you would otherwise use SimpleConsumer, I would suggest using KafkaConsumer
instead; otherwise, you probably still want to use
ZookeeperConsumerConnector.
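
For reference, a minimal high-level consumer sketch (the group id, topic name
and ZooKeeper address below are made up):

// Sketch: high-level consumer through the Java ConsumerConnector API.
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class HighLevelConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");  // assumed address
        props.put("group.id", "example-group");            // assumed group id

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("example-topic", 1));

        // Single stream for the topic; block and print each message as it arrives.
        ConsumerIterator<byte[], byte[]> it = streams.get("example-topic").get(0).iterator();
        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }
}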

- Jiangjie (Becket) Qin

On 3/30/15, 7:32 PM, "LongkerDandy"  wrote:

>Hi
>
>I'm new to kafka and using 0.8.2.1 kafka-clients and kafka_2.10 packages.
>The document says: "We are in the process of rewriting the JVM clients for
>Kafka. As of 0.8.2 Kafka includes a newly rewritten Java producer. The next
>release will include an equivalent Java consumer."
>It seems the kafka-clients package already ships with a KafkaConsumer.
>So should I use the KafkaConsumer from kafka-clients?
>Or should I stick with kafka.javaapi.consumer.ConsumerConnector from
>kafka_2.10?
>
>Regards
>LongkerDandy



Re: Kafka server relocation

2015-03-25 Thread Jiangjie Qin
If you want to do a seamless migration, I think a better way is to temporarily
build a cross-datacenter Kafka cluster. So the process is:
1. Add several new Kafka brokers in your new datacenter and add them to
the old cluster.
2. Use the replica assignment tool to reassign all the partitions to brokers
in the new datacenter (see the sketch below).
3. Perform a controlled shutdown on the brokers in the old datacenter.
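
For step 2, kafka-reassign-partitions.sh takes a JSON file describing the
target replica assignment. A sketch (the topic name, broker ids and file path
are made up, and the flags should be double-checked against the tool's help
output for your version):

{"version": 1,
 "partitions": [
   {"topic": "my-topic", "partition": 0, "replicas": [4, 5]},
   {"topic": "my-topic", "partition": 1, "replicas": [5, 6]}
 ]}

bin/kafka-reassign-partitions.sh --zookeeper zk-host:2181 \
  --reassignment-json-file reassign.json --execute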

Jiangjie (Becket) Qin

On 3/25/15, 2:01 PM, "nitin sharma"  wrote:

>Hi Team,
>
>in my project, we have built a new datacenter for Kafka brokers and want
>to migrate from the current datacenter to the new one.
>
>Switching producers and consumers won't be a problem provided the new
>datacenter has all the messages of the existing datacenter.
>
>
>I have only 1 topic with 2 partitions that needs to be migrated... and it is
>only a one-time activity.
>
>Kindly suggest the best way to deal with this situation.
>
>
>Regards,
>Nitin Kumar Sharma.


