Re: create topic does not really executed successfully

2015-02-02 Thread Xinyi Su
Hi, Harsha

I did not collect logs while creating topics. I will continue to
pay attention to this issue and collect logs if it occurs again.


Thanks.
Xinyi

On 3 February 2015 at 12:46, Harsha  wrote:

> Xinyi,
> Do you have any logs from when kafka-topics.sh was unable to create
> the topic dirs? Apart from this, make sure you point to a different
> dir than /tmp/kafka-logs, since that dir gets deleted when
> your machine restarts and is not a place to store your topic data.
> -Harsha
>
> On Mon, Feb 2, 2015, at 07:03 PM, Xinyi Su wrote:
> > Hi,
> >
> > -bash-4.1$ bin/kafka-topics.sh  --zookeeper :2181 --create
> > --topic
> > zerg.hydra --partitions 3 --replication-factor 2
> > Created topic "zerg.hydra".
> >
> > -bash-4.1$ ls -lrt /tmp/kafka-logs/zerg.hydra-2
> > total 0
> > -rw-r--r-- 1 users0 Feb  3 02:58 .log
> > -rw-r--r-- 1 users 10485760 Feb  3 02:58 .index
> >
> > We can see the topic partition directory is created right after the shell
> > command is executed, even though I have not sent any data yet.
> > But this shell command does not always succeed; sometimes it
> > fails to create the directory for the topic-partition.
> >
> > Besides, the number of brokers is greater than the replication factor in my
> > kafka cluster.
> >
> > Thanks.
> > Xinyi
> >
> > On 2 February 2015 at 22:24, Gwen Shapira  wrote:
> >
> > > IIRC, the directory is only created after you send data to the topic.
> > >
> > > Do you get errors when your producer sends data?
> > >
> > > Another common issue is that you specify replication-factor 3 when you
> > > have fewer than 3 brokers.
> > >
> > > Gwen
> > >
> > > On Mon, Feb 2, 2015 at 2:34 AM, Xinyi Su  wrote:
> > > > Hi,
> > > >
> > > > I am using Kafka_2.9.2-0.8.2-beta.  When I use kafka-topic.sh to
> create
> > > > topic, I observed sometimes the topic is not really created
> successfully
> > > as
> > > > the output shows in console.
> > > >
> > > > Below is my command line:
> > > >
> > > > # bin/kafka-topics.sh  --zookeeper :2181 --create --topic
> zerg.hydra
> > > > --partitions 3 --replication-factor 3
> > > >
> > > > The command prompts "created topic xxx", but local storage directory
> used
> > > > for this topic under "log.dirs" does not created at all. Normally,
> there
> > > > should be some folders like zerg.hydra-0, zerg.hydra-1... just named
> > > > according to partion id and assignment policy.
> > > >
> > > > I come across this issue about four times, the disk is not full and
> > > > directory access permission is legal. Do you know about the cause of
> this
> > > > issue?
> > > >
> > > > Thanks.
> > > >
> > > > Xinyi
> > >
>
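
A quick way to check whether the create actually took effect, independent of any one broker's log.dirs, is to ask for the topic's metadata. A minimal sketch — the ZooKeeper address is a placeholder and the script is assumed to be run from the Kafka installation directory:

```shell
# Placeholder connection string; point this at your ZooKeeper ensemble.
ZK="${ZK:-localhost:2181}"
TOPIC="zerg.hydra"

# Describe the topic: lists partitions, leaders, replicas, and ISR.
if [ -x bin/kafka-topics.sh ]; then
  bin/kafka-topics.sh --zookeeper "$ZK" --describe --topic "$TOPIC"
else
  echo "run from the Kafka installation directory to describe $TOPIC"
fi

# The assignment is also recorded in ZooKeeper itself, e.g.:
#   zkCli.sh -server "$ZK" get "/brokers/topics/$TOPIC"
```

If --describe shows the partitions but the data directories never appear, the broker-side log creation is the part that failed, which narrows down where to look for errors.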


Re: Kafka ETL Camus Question

2015-02-02 Thread Pradeep Gollakota
Hi Bhavesh,

At Lithium, we don't run Camus in our pipelines yet, though we plan to. But
I just wanted to comment regarding speculative execution. We have it
disabled at the cluster level and typically don't need it for most of our
jobs. Especially with something like Camus, I don't see any need to run
parallel copies of the same task.

On Mon, Feb 2, 2015 at 10:36 PM, Bhavesh Mistry 
wrote:

> Hi Jun,
>
> Thanks for info.  I did not get answer  to my question there so I thought I
> try my luck here :)
>
> Thanks,
>
> Bhavesh
>
> On Mon, Feb 2, 2015 at 9:46 PM, Jun Rao  wrote:
>
> > You can probably ask the Camus mailing list.
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Jan 29, 2015 at 1:59 PM, Bhavesh Mistry <
> > mistry.p.bhav...@gmail.com>
> > wrote:
> >
> > > Hi Kafka Team or Linked-In  Team,
> > >
> > > I would like to know if you guys run Camus ETL job with speculative
> > > execution true or false.  Does it make sense to set this to false ?
> > Having
> > > true, it creates additional load on brokers for each map task (create a
> > map
> > > task to pull same partition twice).  Is there any advantage to this
> > having
> > > it on vs off ?
> > >
> > > mapred.map.tasks.speculative.execution
> > >
> > > Thanks,
> > >
> > > Bhavesh
> > >
> >
>
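
For reference, speculative execution can also be turned off per job rather than cluster-wide by passing the property on the job's command line. A hedged sketch — the jar path and properties file below are placeholders for your own Camus build, and the driver class name should be verified against your Camus version:

```shell
# Hypothetical jar/properties paths; substitute your Camus build's.
CAMUS_JAR="${CAMUS_JAR:-camus-example.jar}"
if command -v hadoop >/dev/null 2>&1 && [ -f "$CAMUS_JAR" ]; then
  # -D overrides the cluster default for this job only.
  hadoop jar "$CAMUS_JAR" com.linkedin.camus.etl.kafka.CamusJob \
    -D mapred.map.tasks.speculative.execution=false \
    -P camus.properties
else
  echo "hadoop/Camus not available; command shown for reference only"
fi
```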


Re: Kafka ETL Camus Question

2015-02-02 Thread Bhavesh Mistry
Hi Jun,

Thanks for the info.  I did not get an answer to my question there, so I
thought I'd try my luck here :)

Thanks,

Bhavesh

On Mon, Feb 2, 2015 at 9:46 PM, Jun Rao  wrote:

> You can probably ask the Camus mailing list.
>
> Thanks,
>
> Jun
>
> On Thu, Jan 29, 2015 at 1:59 PM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com>
> wrote:
>
> > Hi Kafka Team or Linked-In  Team,
> >
> > I would like to know if you guys run Camus ETL job with speculative
> > execution true or false.  Does it make sense to set this to false ?
> Having
> > true, it creates additional load on brokers for each map task (create a
> map
> > task to pull same partition twice).  Is there any advantage to this
> having
> > it on vs off ?
> >
> > mapred.map.tasks.speculative.execution
> >
> > Thanks,
> >
> > Bhavesh
> >
>


Re: Question on ETL while replay

2015-02-02 Thread Jun Rao
You can't change existing messages. You can republish messages with new
fields and manually set the consumer offsets.

Thanks,

Jun

On Thu, Jan 29, 2015 at 1:12 PM, Joshua Schumacher 
wrote:

> What's the best way to add two 'fields' to my kafka messages once they are
> stored?  Can I just do a replay on all of them and add the field?  How
> would I throw out the old kafka messages that don't have the field then?  I
> am using Druid to process the data, but not sure of how to propagate data
> changes all the way down to kafka.
>
> Thanks,
> Josh
>
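
Jun's suggestion — republish the messages with the new fields added — can be sketched with the stock console tools and jq. This is purely illustrative: the topic names and the two added fields are made up, and the pipeline assumes the messages are JSON:

```shell
# Hypothetical topics/fields, for illustration only.
ZK="${ZK:-localhost:2181}"
BROKERS="${BROKERS:-localhost:9092}"
if command -v jq >/dev/null 2>&1 && [ -x bin/kafka-console-consumer.sh ]; then
  # Read every existing JSON message, add the two new fields, and
  # republish into a fresh topic; consumers then switch to the new
  # topic (or have their offsets reset) instead of mutating messages.
  bin/kafka-console-consumer.sh --zookeeper "$ZK" --topic events --from-beginning \
    | jq -c '. + {source: "replay", schema_version: 2}' \
    | bin/kafka-console-producer.sh --broker-list "$BROKERS" --topic events.v2
else
  echo "kafka CLI tools or jq unavailable; pipeline shown for reference"
fi
```

Pointing the downstream consumer (Druid, here) at the new topic also answers the "how do I throw out the old messages" part: the old topic simply ages out under retention.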


Re: Kafka ETL Camus Question

2015-02-02 Thread Jun Rao
You can probably ask the Camus mailing list.

Thanks,

Jun

On Thu, Jan 29, 2015 at 1:59 PM, Bhavesh Mistry 
wrote:

> Hi Kafka Team or Linked-In  Team,
>
> I would like to know if you run the Camus ETL job with speculative
> execution set to true or false.  Does it make sense to set it to false?
> With it set to true, extra load is put on the brokers for each map task (a
> duplicate map task is created to pull the same partition twice).  Is there
> any advantage to having it on vs. off?
>
> mapred.map.tasks.speculative.execution
>
> Thanks,
>
> Bhavesh
>


Re: Can't start Zookeeper on a EC2 instance in a public subnet

2015-02-02 Thread Jun Rao
It seems there is another process using the Zookeeper port.

Thanks,

Jun

On Thu, Jan 29, 2015 at 11:57 AM, Su She  wrote:

> Hello Everyone,
>
> I previously had my EC2 instances in a private subnet, but I spun up a new
> cluster in a public subnet. However, it seems to have taken me a step back
> as now I can't even start the zookeeper. I am getting this error:
>
> ERROR Unexpected exception, exiting abnormally
> (org.apache.zookeeper.server.ZooKeeperServerMain)
> java.net.BindException: Address already in use
> at sun.nio.ch.Net.bind0(Native Method)
> at sun.nio.ch.Net.bind(Net.java:444)
> at sun.nio.ch.Net.bind(Net.java:436)
> at
> sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
> at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
> at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:67)
> at
>
> org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95)
> at
>
> org.apache.zookeeper.server.ZooKeeperServerMain.runFromConfig(ZooKeeperServerMain.java:111)
> at
>
> org.apache.zookeeper.server.ZooKeeperServerMain.initializeAndRun(ZooKeeperServerMain.java:86)
> at
>
> org.apache.zookeeper.server.ZooKeeperServerMain.main(ZooKeeperServerMain.java:52)
> at
>
> org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:116)
> at
>
> org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:78)
> I am confused as to how to move forward as the zookeeper.properties file
> does not allow for many configurations. Thank you!
>
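
Jun's diagnosis can be confirmed directly: find out what is already bound to the ZooKeeper client port before changing any configuration. A minimal sketch (2181 is assumed to be the configured clientPort):

```shell
PORT=2181  # assumed clientPort from zookeeper.properties
# Either tool may be missing on a given AMI, so try both.
if command -v lsof >/dev/null 2>&1; then
  lsof -i TCP:"$PORT" || echo "nothing bound to port $PORT via lsof"
elif command -v netstat >/dev/null 2>&1; then
  netstat -tln 2>/dev/null | grep ":$PORT " || echo "nothing listening on port $PORT"
else
  echo "install lsof or netstat to inspect port $PORT"
fi
```

A common culprit is a second ZooKeeper instance left over from an earlier start script, in which case killing it (or changing clientPort) resolves the BindException.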


Re: Detecting lost connection in high level consumer

2015-02-02 Thread Jun Rao
That's actually how consumer.timeout.ms works. The iterator only gets an
exception if there is no message after the timeout. The error handling of
the connection is done in the consumer client library for you.

Thanks,

Jun

On Wed, Jan 28, 2015 at 6:21 PM, harikiran  wrote:

> Hi
>
> I am using the 0.8.1.1 Kafka high-level consumer and I have configured "
> consumer.timeout.ms" to a value that is not -1, say 5000ms.
>
> I create the consumer iterator and invoke hasNext() method on it.
>
> Irrespective of whether kafka broker was shutdown or there was no message
> written to kafka, I see a ConsumerTimeOut exception after 5000ms.
>
> My goal is to detect lost connection and reconnect but I cannot figure out
> a way.
>
> Any kind of help is appreciated.
>
> Thanks
> Hari
>
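
In other words, the timeout below fires purely on message absence — it is not a connectivity signal, and reconnection to brokers is handled inside the high-level consumer. A sketch of the relevant consumer properties (values are illustrative):

```properties
# Throw ConsumerTimeoutException from the iterator if no message
# arrives within 5 s; -1 (the default) blocks forever instead.
consumer.timeout.ms=5000
# Broker/ZK connectivity handling lives inside the client library;
# these only tune how aggressively it detects and retries failures.
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=6000
```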


Re: WARN Error in I/O with NetworkReceive.readFrom(NetworkReceive.java

2015-02-02 Thread Jun Rao
Any error on the broker log?

Thanks,

Jun

On Wed, Jan 28, 2015 at 8:36 AM, Dillian Murphey 
wrote:

> Running the performance test. What is the nature of this error??  I'm
> running a very high end cluster on aws. Tried this even within the same
> subnet on aws.
>
> bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
> topic9 5000 100 -1 acks=1 bootstrap.servers=$IP:9092
> buffer.memory=67108864 batch.size=8196
>
>
>
> 2015-01-28 16:32:22,178] WARN Error in I/O with /
> (org.apache.kafka.common.network.Selector)
> java.io.EOFException
> at
>
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
> at java.lang.Thread.run(Thread.java:745)
> 65910 records sent, 13158.3 records/sec (1.25 MB/sec), 38525.9 ms avg
> latency, 39478.0 max latency.
>
> Thanks for any ideas
>


Re: Replication stop working

2015-02-02 Thread Jun Rao
One thing that you need to be aware of is that if a broker goes down, the
affected partitions will remain under-replicated until the broker is
restarted and catches up again.

Thanks,

Jun

On Tue, Jan 27, 2015 at 10:59 AM, Dong, John  wrote:

> Hi,
>
> I am new to this forum and I am not sure this is the correct mailing list
> for sending question. If not, please let me know and I will stop.
>
> I am looking for help to resolve replication issue. Replication stopped
> working a while back.
>
> Kafka environment: Kafka 0.8.1.1, Centos 6.5, 7 node cluster, default
> replication-factor 2, 10 partition per topic.
>
> Initially each partition is residing on two different nodes. It has been
> this way for several months and working. Starting two weeks ago, two things
> happened.
>
>   1.  one node's disk usage got to 100% and crashed kafka process. So we
> had to delete some *.log and *.index and restarted kafka process.
>   2.  In another case, some other node's disk usage reached 90%. Someone
> deleted some *.log and *.index files without shutting down kafka process.
> This caused issues and kafka was unable to restart. I had to delete all
> *.log and *.index on this node to bring kafka back online.
>
> Now replication is all broken. Most of the partition has only one leader
> and one in ISR, even though replication is setup with two broker ids.
> Whenever I shutdown kafka process on a node, whatever leader running on
> this node will get moved to another node that is defined in replication.
> After I restart kafka on this node, it will never become a follower and its
> data directory never get updated.
>
> I tried the following:
>
>
>   1.  I had turned on TRACE/DEBUG level with kafka and zookeeper. I did
> not find anything that can help.
>   2.   I also tried to manipulate replication configuration in zookeeper
> using zkCLI.sh, like adding a follower to ISR list. That did not initiate a
> fetcher process to make itself become a follower.
>   3.   I also created new topic with replication working initially. But as
> soon as I shutdown kafka on one of its two nodes, that partition loses one
> replica in ISR and never come back. This confirms that it is reproducible.
>   4.  I ran kafka preferred replication election tool to force re-election
> of leader. That did not do anything. It is like nothing happen to the
> cluster.
>   5.  I added num.replica.fetchers=10 to server.properties and restarted
> kafka. That did not do anything.
>
> Has anyone have any experience with this ? Or any advice where to look and
> what the next steps are for trouble-shooting ? There are only two things
> that I may have to do.
>
>
>   1.  Shutdown all kafka and zookeeper and restart them. I really do not
> want to go this route unless I have to. I would like to identify the root
> cause of it and not to randomly restart the whole cluster.
>   2.  Move all topics to another kafka cluster, and rebuild it. This will
> be very time consuming and a lot of changes in the application.
>
> Thanks.
>
> John Dong
>
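
When diagnosing a state like this, it helps to get the cluster's own view of which partitions are degraded before hand-editing anything in ZooKeeper. A sketch using the stock tool — the ZooKeeper address is a placeholder, and the flag should be verified against your Kafka version:

```shell
ZK="${ZK:-localhost:2181}"
if [ -x bin/kafka-topics.sh ]; then
  # Lists every partition whose ISR is smaller than its assigned replica set.
  bin/kafka-topics.sh --zookeeper "$ZK" --describe --under-replicated-partitions
else
  echo "run from the Kafka installation directory"
fi
```

Comparing this list before and after each broker restart shows whether followers are re-entering the ISR at all, which separates a stuck-fetcher problem from a stale-assignment problem.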


Re: Consumers closing sockets abruptly?

2015-02-02 Thread Jun Rao
Is there another broker running on that ip? If the replication factor is
larger than 1, the follower will be fetching data from the leader just like
a regular consumer.

Thanks,

Jun

On Tue, Jan 27, 2015 at 9:52 AM, Scott Reynolds 
wrote:

> On my brokers I am seeing this error log message:
>
> Closing socket for /X because of error (X is the ip address of a consumer)
> > 2015-01-27_17:32:58.29890 java.io.IOException: Connection reset by peer
> > 2015-01-27_17:32:58.29890   at
> > sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
> > 2015-01-27_17:32:58.29891   at
> > sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:433)
> > 2015-01-27_17:32:58.29892   at
> > sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:565)
> > 2015-01-27_17:32:58.29892   at
> > kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:147)
> > 2015-01-27_17:32:58.29893   at
> > kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:69)
> > 2015-01-27_17:32:58.29893   at
> > kafka.network.MultiSend.writeTo(Transmission.scala:102)
> > 2015-01-27_17:32:58.29894   at
> > kafka.api.TopicDataSend.writeTo(FetchResponse.scala:124)
> > 2015-01-27_17:32:58.29895   at
> > kafka.network.MultiSend.writeTo(Transmission.scala:102)
> > 2015-01-27_17:32:58.29895   at
> > kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:219)
> > 2015-01-27_17:32:58.29896   at
> > kafka.network.Processor.write(SocketServer.scala:375)
> > 2015-01-27_17:32:58.29896   at
> > kafka.network.Processor.run(SocketServer.scala:247)
> > 2015-01-27_17:32:58.29897   at java.lang.Thread.run(Thread.java:745)
> >
>
> This is because the Processor doesn't handle java.io.IOException and it
> falls through to the catch all.
>
> My consumers seem actually really happy. So I don't think there is a real
> issue here. But I could use some help figuring out if there is.
>
> We are using the Java consumer like so:
>
> > final ConsumerConnector consumer =
> > kafka.consumer.Consumer.createJavaConsumerConnector(config);
> > Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> > topicCountMap.put(topicName, new Integer(1));
> > final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > consumer.createMessageStreams(topicCountMap);
> > final KafkaStream<byte[], byte[]> stream =
> > consumerMap.get(topicName).get(0);
> >
>
> and we just iterate over the stream
>
> Questions:
> 1. What class is the one that makes the network connection to the consumer?
> 2. Do legit cases exist where the consumer would close its socket
> connection ? Zookeeper issues ? Consumer too far behind ?
>


Re: create topic does not really executed successfully

2015-02-02 Thread Harsha
Xinyi,
Do you have any logs from when kafka-topics.sh was unable to create
the topic dirs? Apart from this, make sure you point to a different
dir than /tmp/kafka-logs, since that dir gets deleted when
your machine restarts and is not a place to store your topic data.
-Harsha

On Mon, Feb 2, 2015, at 07:03 PM, Xinyi Su wrote:
> Hi,
> 
> -bash-4.1$ bin/kafka-topics.sh  --zookeeper :2181 --create
> --topic
> zerg.hydra --partitions 3 --replication-factor 2
> Created topic "zerg.hydra".
> 
> -bash-4.1$ ls -lrt /tmp/kafka-logs/zerg.hydra-2
> total 0
> -rw-r--r-- 1 users0 Feb  3 02:58 .log
> -rw-r--r-- 1 users 10485760 Feb  3 02:58 .index
> 
> We can see the topic partition directory is created after the shell
> command
> is executed since I have not sent any data yet.
> But this shell command is not always executed successfully, sometimes it
> fails to create the directory for topic-partition.
> 
> Besides, the broker number is greater than replication factor in my kafka
> cluster.
> 
> Thanks.
> Xinyi
> 
> On 2 February 2015 at 22:24, Gwen Shapira  wrote:
> 
> > IIRC, the directory is only created after you send data to the topic.
> >
> > Do you get errors when your producer sends data?
> >
> > Another common issue is that you specify replication-factor 3 when you
> > have fewer than 3 brokers.
> >
> > Gwen
> >
> > On Mon, Feb 2, 2015 at 2:34 AM, Xinyi Su  wrote:
> > > Hi,
> > >
> > > I am using Kafka_2.9.2-0.8.2-beta.  When I use kafka-topic.sh to create
> > > topic, I observed sometimes the topic is not really created successfully
> > as
> > > the output shows in console.
> > >
> > > Below is my command line:
> > >
> > > # bin/kafka-topics.sh  --zookeeper :2181 --create --topic zerg.hydra
> > > --partitions 3 --replication-factor 3
> > >
> > > The command prompts "created topic xxx", but local storage directory used
> > > for this topic under "log.dirs" does not created at all. Normally, there
> > > should be some folders like zerg.hydra-0, zerg.hydra-1... just named
> > > according to partion id and assignment policy.
> > >
> > > I come across this issue about four times, the disk is not full and
> > > directory access permission is legal. Do you know about the cause of this
> > > issue?
> > >
> > > Thanks.
> > >
> > > Xinyi
> >


Re: [kafka-clients] Re: [VOTE] 0.8.2.0 Candidate 3

2015-02-02 Thread Jay Kreps
Yay!

-Jay

On Mon, Feb 2, 2015 at 2:23 PM, Neha Narkhede  wrote:

> Great! Thanks Jun for helping with the release and everyone involved for
> your contributions.
>
> On Mon, Feb 2, 2015 at 1:32 PM, Joe Stein  wrote:
>
> > Huzzah!
> >
> > Thanks Jun for preparing the release candidates and getting this out to
> the
> > community.
> >
> > - Joe Stein
> >
> > On Mon, Feb 2, 2015 at 2:27 PM, Jun Rao  wrote:
> >
> > > The following are the results of the votes.
> > >
> > > +1 binding = 3 votes
> > > +1 non-binding = 1 votes
> > > -1 = 0 votes
> > > 0 = 0 votes
> > >
> > > The vote passes.
> > >
> > > I will release artifacts to maven central, update the dist svn and
> > download
> > > site. Will send out an announce after that.
> > >
> > > Thanks everyone that contributed to the work in 0.8.2.0!
> > >
> > > Jun
> > >
> > > On Wed, Jan 28, 2015 at 9:22 PM, Jun Rao  wrote:
> > >
> > >> This is the third candidate for release of Apache Kafka 0.8.2.0.
> > >>
> > >> Release Notes for the 0.8.2.0 release
> > >>
> > >>
> >
> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/RELEASE_NOTES.html
> > >>
> > >> *** Please download, test and vote by Saturday, Jan 31, 11:30pm PT
> > >>
> > >> Kafka's KEYS file containing PGP keys we use to sign the release:
> > >> http://kafka.apache.org/KEYS in addition to the md5, sha1 and sha2
> > >> (SHA256) checksum.
> > >>
> > >> * Release artifacts to be voted upon (source and binary):
> > >> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/
> > >>
> > >> * Maven artifacts to be voted upon prior to release:
> > >> https://repository.apache.org/content/groups/staging/
> > >>
> > >> * scala-doc
> > >> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/scaladoc/
> > >>
> > >> * java-doc
> > >> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/javadoc/
> > >>
> > >> * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0 tag
> > >>
> > >>
> >
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=223ac42a7a2a0dab378cc411f4938a9cea1eb7ea
> > >> (commit 7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4)
> > >>
> > >> /***
> > >>
> > >> Thanks,
> > >>
> > >> Jun
> > >>
> > >>
> > >  --
> > > You received this message because you are subscribed to the Google
> Groups
> > > "kafka-clients" group.
> > > To unsubscribe from this group and stop receiving emails from it, send
> an
> > > email to kafka-clients+unsubscr...@googlegroups.com.
> > > To post to this group, send email to kafka-clie...@googlegroups.com.
> > > Visit this group at http://groups.google.com/group/kafka-clients.
> > > To view this discussion on the web visit
> > >
> >
> https://groups.google.com/d/msgid/kafka-clients/CAFc58G-XRpw9ik35%2BCmsYm-uc2hjHet6fOpw4bF90ka9Z%3DhH%3Dw%40mail.gmail.com
> > > <
> >
> https://groups.google.com/d/msgid/kafka-clients/CAFc58G-XRpw9ik35%2BCmsYm-uc2hjHet6fOpw4bF90ka9Z%3DhH%3Dw%40mail.gmail.com?utm_medium=email&utm_source=footer
> > >
> > > .
> > >
> > > For more options, visit https://groups.google.com/d/optout.
> > >
> >
>
>
>
> --
> Thanks,
> Neha
>


Re: Increased CPU usage with 0.8.2-beta

2015-02-02 Thread Jaikiran Pai

On Monday 02 February 2015 11:03 PM, Jun Rao wrote:

Jaikiran,

The fix you provided is probably unnecessary. The channel that we use in
SimpleConsumer (BlockingChannel) is configured to be blocking. So even
though the read from the socket is in a loop, each read blocks if there are
no bytes received from the broker. So, that shouldn't cause extra CPU
consumption.

Hi Jun,

Of course, you are right! I forgot that while reading the thread dump in 
hprof output, one has to be aware that the thread state isn't shown and 
the thread need not necessarily be doing any CPU activity.


-Jaikiran




Thanks,

Jun

On Mon, Jan 26, 2015 at 10:05 AM, Mathias Söderberg <
mathias.soederb...@gmail.com> wrote:


Hi Neha,

I sent an e-mail earlier today, but noticed now that it didn't actually go
through.

Anyhow, I've attached two files, one with output from a 10 minute run and
one with output from a 30 minute run. Realized that maybe I should've done
one or two runs with 0.8.1.1 as well, but nevertheless.

I upgraded our staging cluster to 0.8.2.0-rc2, and I'm seeing the same CPU
usage as with the beta version (basically pegging all cores). If I manage
to find the time I'll do another run with hprof on the rc2 version later
today.

Best regards,
Mathias

On Tue Dec 09 2014 at 10:08:21 PM Neha Narkhede  wrote:


The following should be sufficient

java -agentlib:hprof=cpu=samples,depth=100,interval=20,lineno=y,thread=y,file=kafka.hprof


You would need to start the Kafka server with the settings above for
sometime until you observe the problem.

On Tue, Dec 9, 2014 at 3:47 AM, Mathias Söderberg <
mathias.soederb...@gmail.com> wrote:


Hi Neha,

Yeah sure. I'm not familiar with hprof, so any particular options I should
include or just run with defaults?

Best regards,
Mathias

On Mon Dec 08 2014 at 7:41:32 PM Neha Narkhede  wrote:

Thanks for reporting the issue. Would you mind running hprof and sending
the output?

On Mon, Dec 8, 2014 at 1:25 AM, Mathias Söderberg <
mathias.soederb...@gmail.com> wrote:


Good day,

I upgraded a Kafka cluster from v0.8.1.1 to v0.8.2-beta and noticed that
the CPU usage on the broker machines went up by roughly 40%, from ~60% to
~100%, and am wondering if anyone else has experienced something similar?
The load average also went up by 2x-3x.

We're running on EC2 and the cluster currently consists of four m1.xlarge,
with roughly 1100 topics / 4000 partitions. Using Java 7 (1.7.0_65 to be
exact) and Scala 2.9.2. Configurations can be found over here:
https://gist.github.com/mthssdrbrg/7df34a795e07eef10262.

I'm assuming that this is not expected behaviour for 0.8.2-beta?

Best regards,
Mathias




--
Thanks,
Neha




--
Thanks,
Neha
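
For anyone repeating this exercise: the hprof agent string suggested in this thread has to reach the broker's JVM. The start scripts pick up extra JVM flags from the KAFKA_OPTS environment variable, so a sketch like the following avoids editing the scripts — treat the exact variable handling as an assumption to verify against your Kafka version:

```shell
# hprof agent flags, as suggested in the thread above.
export KAFKA_OPTS="-agentlib:hprof=cpu=samples,depth=100,interval=20,lineno=y,thread=y,file=kafka.hprof"
if [ -x bin/kafka-server-start.sh ]; then
  bin/kafka-server-start.sh config/server.properties
else
  echo "KAFKA_OPTS is set; start the broker from the Kafka directory"
fi
```

After reproducing the load for a while, stop the broker cleanly and read kafka.hprof; the CPU=samples section at the end ranks stack traces by sample count.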





Re: create topic does not really executed successfully

2015-02-02 Thread Xinyi Su
Hi,

-bash-4.1$ bin/kafka-topics.sh  --zookeeper :2181 --create --topic
zerg.hydra --partitions 3 --replication-factor 2
Created topic "zerg.hydra".

-bash-4.1$ ls -lrt /tmp/kafka-logs/zerg.hydra-2
total 0
-rw-r--r-- 1 users0 Feb  3 02:58 .log
-rw-r--r-- 1 users 10485760 Feb  3 02:58 .index

We can see the topic partition directory is created right after the shell
command is executed, even though I have not sent any data yet.
But this shell command does not always succeed; sometimes it
fails to create the directory for the topic-partition.

Besides, the number of brokers is greater than the replication factor in my
kafka cluster.

Thanks.
Xinyi

On 2 February 2015 at 22:24, Gwen Shapira  wrote:

> IIRC, the directory is only created after you send data to the topic.
>
> Do you get errors when your producer sends data?
>
> Another common issue is that you specify replication-factor 3 when you
> have fewer than 3 brokers.
>
> Gwen
>
> On Mon, Feb 2, 2015 at 2:34 AM, Xinyi Su  wrote:
> > Hi,
> >
> > I am using Kafka_2.9.2-0.8.2-beta.  When I use kafka-topic.sh to create
> > topic, I observed sometimes the topic is not really created successfully
> as
> > the output shows in console.
> >
> > Below is my command line:
> >
> > # bin/kafka-topics.sh  --zookeeper :2181 --create --topic zerg.hydra
> > --partitions 3 --replication-factor 3
> >
> > The command prompts "created topic xxx", but the local storage directory
> > used for this topic under "log.dirs" is not created at all. Normally, there
> > should be some folders like zerg.hydra-0, zerg.hydra-1... just named
> > according to partition id and assignment policy.
> >
> > I have come across this issue about four times; the disk is not full and
> > the directory access permissions are fine. Do you know the cause of this
> > issue?
> >
> > Thanks.
> >
> > Xinyi
>


Re: Consumer not getting data when there is a big lag in the topic

2015-02-02 Thread Guozhang Wang
Dinesh,

I took a look at your logs, first it seems error_logs_kafka_request.log is
also from the consumer side, not the server side.

And the error logs point to an EOF on the server side while reading the
data; one possibility is that your socket buffer size is not configured
large enough to hold the fetch response data when the consumer is catching
up and hence getting a huge fetch response each time.

Guozhang
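
A hedged sketch of what "socket buffer size" maps to in the 0.8.x broker configuration — the values below are illustrative only, and with messages approaching 180 MB every one of these knobs needs to be rechecked against your workload:

```properties
# TCP send/receive buffer sizes used by the broker's socket server.
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
# Upper bound on a single request the broker will accept; must be
# large enough for the biggest fetch/produce the clients will issue.
socket.request.max.bytes=209715200
```

On the consumer side, fetch.message.max.bytes still has to exceed the largest single message, or fetches for the partition holding it will stall.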

On Wed, Jan 28, 2015 at 12:28 PM, dinesh kumar  wrote:

> Hi Guozhang,
> Sorry for the delayed response. We had some hardware issues and I was not
> able to give you the logs you asked for. We upgraded to 0.8.2-beta version
> of kafka
> in our cluster but that did not help solve the issue.
>
> To make more lucid, I am attaching a sample code that can be used to repro
> the issue on 0.8.1.1 version of  kafka (running locally is also fine).
> Also I have put the log files from my experiments in the attachment under
> logs/ directory.
>
> *logs/error_entires_consumer.log *- error entries that show up in the
> consumer logs
> *logs/error_logs_kafka_request.log *- error from the kafka-request.log in
> the server.
>
> To run the repro, please go through the attached README file.
>
> Let me know if you need any more info.
>
> Thanks,
> Dinesh
>
>
> On 15 January 2015 at 06:00, Guozhang Wang  wrote:
>
>> Could you check both the server logs and the consumer logs (with and
>> without the config specified) and see if there are any error entries /
>> exception logs?
>>
>> Guozhang
>>
>> On Wed, Jan 14, 2015 at 1:53 PM, dinesh kumar 
>> wrote:
>>
>> > We don't have any compression on Kafka
>> >
>> > On 14 January 2015 at 22:54, Guozhang Wang  wrote:
>> >
>> > > Did you have compression enabled on Kafka?
>> > >
>> > > On Tue, Jan 13, 2015 at 10:33 AM, dinesh kumar 
>> > > wrote:
>> > >
>> > > > We are using 0.8.1.1 version of Kafka and *not 0.8.2 *as mentioned
>> > above.
>> > > >
>> > > > Thanks,
>> > > > Dinesh
>> > > >
>> > > > On 13 January 2015 at 23:35, dinesh kumar 
>> wrote:
>> > > >
>> > > > > Hi Guozhang,
>> > > > > Sorry for the misinformation. We have file sizes around 50 - 100
>> MB.
>> > So
>> > > > we
>> > > > > set *fetch.message.max.bytes* conservatively around 188743680.
>> Can
>> > you
>> > > > > please explain me the reason for this behavior?
>> > > > >
>> > > > > Thanks,
>> > > > > Dinesh
>> > > > >
>> > > > > On 13 January 2015 at 21:42, Guozhang Wang 
>> > wrote:
>> > > > >
>> > > > >> Dinesh,
>> > > > >>
>> > > > >> Your fetch.message.max.bytes is 188743680 < 155MB, but you said
>> some
>> > > > >> messages can be as large as 180MB. Could you try to set it to be
>> > > larger
>> > > > >> than, say 200MB and see if it helps?
>> > > > >>
>> > > > >> Guozhang
>> > > > >>
>> > > > >> On Tue, Jan 13, 2015 at 4:18 AM, dinesh kumar <
>> dinesh...@gmail.com>
>> > > > >> wrote:
>> > > > >>
>> > > > >> > Hi,
>> > > > >> > I am been facing some JAVA high level consumer related issues
>> > lately
>> > > > and
>> > > > >> > would like to understand more on this.
>> > > > >> >
>> > > > >> > We have 9 bare-metals (48 core, 250 GB, Terabytes of Hard
>> disks)
>> > > > running
>> > > > >> > *Kafka
>> > > > >> > 0.8.2* and 5 independent VM (8 core, 60 GB) running zookeeper.
>> > > > >> >
>> > > > >> > I have a topic that has key as metadata and value as a file.
>> The
>> > > file
>> > > > >> can
>> > > > >> > be as large as *180 MB.* We have a topic with 90 partitions.
>> > > Sometimes
>> > > > >> > there will be only one consumer consuming from the topic. When
>> the
>> > > > >> consumer
>> > > > >> > group for my topic has a *lag in the range of 200's* and when I
>> > > start
>> > > > a
>> > > > >> > consumer (no other consumer running before) there is *no data*
>> > > coming
>> > > > >> > through to the consumer.
>> > > > >> >
>> > > > >> > Please find below my consumer parameters.
>> > > > >> >
>> > > > >> > "zookeeper.connect"=> ,
>> > > > >> > "group.id" => "default",
>> > > > >> > "consumer.timeout.ms"  => "-1",
>> > > > >> > "auto.offset.reset"=> "smallest",
>> > > > >> > "auto.commit.enable"   => "false",
>> > > > >> > "consumer.timeout.ms"  => "-1",
>> > > > >> > "zookeeper.session.timeout.ms" => "10",
>> > > > >> > "zookeeper.connection.timeout.ms"  => "6000",
>> > > > >> > "zookeeper.sync.time.ms"   => "2000",
>> > > > >> > "rebalance.backoff.ms" =>  "2",
>> > > > >> > "rebalance.max.retries"=> "50"
>> > > > >> > "fetch.message.max.bytes"  => "188743680",
>> > > > >> > "fetch.size"   => "18874368"
>> > > > >> >
>> > > > >> >
>> > > > >> >
>> > > > >> > This problem occurs only when the *auto.offset.reset *property
>> is
>> > > > >> > *smallest.
>> > > > >> > *I am able to get data if the offset is largest. I tried using
>> the
>> > > > >> *console
>> > > > >> > consumer* for the same topic and consumer group with
>> > > > *--f

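A quick sanity check on the size arithmetic in the thread above (plain Python, not Kafka code; the 200 MB figure is Guozhang's suggested threshold):

```python
# Sanity-check the fetch.message.max.bytes arithmetic from the thread.
fetch_max_bytes = 188_743_680           # value from the consumer config
fetch_max_mb = fetch_max_bytes / (1024 * 1024)
print(fetch_max_mb)                     # 180.0 -- exactly 180 MB

largest_message_mb = 180                # largest file sent through the topic
# A fetch must hold at least one whole message plus protocol overhead,
# so the configured limit leaves zero headroom over the largest message:
headroom_mb = fetch_max_mb - largest_message_mb
print(headroom_mb)                      # 0.0

# Raising the limit comfortably above the largest message (e.g. 200 MB)
# restores headroom:
suggested_bytes = 200 * 1024 * 1024
assert suggested_bytes > fetch_max_bytes
```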
Re: New Producer - ONLY sync mode?

2015-02-02 Thread Pradeep Gollakota
I looked at the newly added batch API to Kinesis for inspiration. The
response on the batch put is a list of message-ids and their status (offset
if success else a failure code).

Ideally, I think the server should fail the entire batch or succeed the
entire batch (i.e. no duplicates), but this is pretty hard to implement.
Given that, what Kinesis did is probably a good compromise (perhaps while we
wait for exactly once semantics :))

In addition, perhaps adding a flush() method to the producer to allow for
control over when flushes happen might be another good starting point. With
the addition of a flush, it's easier to implement a "SyncProducer" by doing
something like flush() -> n x send() -> flush(). This doesn't guarantee
that a particular batch isn't broken into two, but with sane batch sizes
and sane record sizes, we can assume the guarantee.
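The flush-bracketed batch idea above can be sketched like this (a toy illustration; `ToyAsyncProducer`, `send`, and `flush` here are stand-ins, not the Kafka producer API):

```python
# Toy async producer illustrating the flush() -> n x send() -> flush()
# pattern for building a synchronous batch send on top of an async API.
class ToyAsyncProducer:
    def __init__(self):
        self._buffer = []
        self.delivered = []          # stands in for records acked by the broker

    def send(self, record):
        self._buffer.append(record)  # buffered, not yet "on the wire"

    def flush(self):
        # Drain the buffer; a real client would block until acks arrive.
        self.delivered.extend(self._buffer)
        self._buffer.clear()

def sync_send_batch(producer, records):
    """flush() -> n x send() -> flush(): the whole batch has been
    delivered by the time this call returns."""
    producer.flush()                 # clear anything previously buffered
    for record in records:
        producer.send(record)
    producer.flush()                 # block until this batch is delivered

p = ToyAsyncProducer()
sync_send_batch(p, ["a", "b", "c"])
print(p.delivered)                   # ['a', 'b', 'c']
```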

On Mon, Feb 2, 2015 at 1:48 PM, Gwen Shapira  wrote:

> I've been thinking about that too, since both Flume and Sqoop rely on
> send(List) API of the old API.
>
> I'd like to see this API come back, but I'm debating how we'd handle
> errors. IIRC, the old API would fail an entire batch on a single
> error, which can lead to duplicates. Having N callbacks lets me retry
> / save / whatever just the messages that had issues.
>
> If messages had identifiers from the producer side, we could have the
> API call the callback with a list of message-ids and their status. But
> they don't :)
>
> Any thoughts on how you'd like it to work?
>
> Gwen
>
>
> On Mon, Feb 2, 2015 at 1:38 PM, Pradeep Gollakota 
> wrote:
> > This is a great question Otis. Like Gwen said, you can accomplish Sync
> mode
> > by setting the batch size to 1. But this does highlight a shortcoming of
> > the new producer API.
> >
> > I really like the design of the new API and it has really great
> properties
>> > > and I'm enjoying working with it. However, one API that I think we're
> > lacking is a "batch" API. Currently, I have to iterate over a batch and
> > call .send() on each record, which returns n callbacks instead of 1
> > callback for the whole batch. This significantly complicates recovery
> logic
>> > > where we need to commit a batch as opposed to 1 record at a time.
> >
> > Do you guys have any plans to add better semantics around batches?
> >
> > On Mon, Feb 2, 2015 at 1:34 PM, Gwen Shapira 
> wrote:
> >
> >> If I understood the code and Jay correctly - if you wait for the
> >> future it will be a similar delay to that of the old sync producer.
> >>
> >> Put another way, if you test it out and see longer delays than the
> >> sync producer had, we need to find out why and fix it.
> >>
> >> Gwen
> >>
> >> On Mon, Feb 2, 2015 at 1:27 PM, Otis Gospodnetic
> >>  wrote:
> >> > Hi,
> >> >
> >> > Nope, unfortunately it can't do that.  X is a remote app, doesn't
> listen
> >> to
> >> > anything external, calls Y via HTTPS.  So X has to decide what to do
> with
> >> > its data based on Y's synchronous response.  It has to block until Y
> >> > responds.  And it wouldn't be pretty, I think, because nobody wants to
> >> run
> >> > apps that talk to remote servers and hang on to connections more than
> >> they
> >> > have to.  But perhaps that is the only way?  Or maybe the answer to
> "I'm
> >> > guessing the delay would be more or less the same as if the Producer
> was
> >> > using SYNC mode?" is YES, in which case the connection from X to Y
> would
> >> be
> >> > open for just as long as with a SYNC producer running in Y?
> >> >
> >> > Thanks,
> >> > Otis
> >> > --
> >> > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> >> > Solr & Elasticsearch Support * http://sematext.com/
> >> >
> >> >
> >> > On Mon, Feb 2, 2015 at 4:03 PM, Gwen Shapira 
> >> wrote:
> >> >
> >> >> Can Y have a callback that will handle the notification to X?
> >> >> In this case, perhaps Y can be async and X can buffer the data until
> >> >> the callback triggers and says "all good" (or resend if the callback
> >> >> indicates an error)
> >> >>
> >> >> On Mon, Feb 2, 2015 at 12:56 PM, Otis Gospodnetic
> >> >>  wrote:
> >> >> > Hi,
> >> >> >
> >> >> > Thanks for the info.  Here's the use case.  We have something up
> >> stream
> >> >> > sending data, say a log shipper called X.  It sends it to some
> remote
> >> >> > component Y.  Y is the Kafka Producer and it puts data into Kafka.
> >> But Y
> >> >> > needs to send a reply to X and tell it whether it successfully put
> all
> >> >> its
> >> >> > data into Kafka.  If it did not, Y wants to tell X to buffer data
> >> locally
> >> >> > and resend it later.
> >> >> >
> >> >> > If producer is ONLY async, Y can't easily do that.  Or maybe Y
> would
> >> just
> >> >> > need to wait for the Future to come back and only then send the
> >> response
> >> >> > back to X?  If so, I'm guessing the delay would be more or less the
> >> same
> >> >> as
> >> >> > if the Producer was using SYNC mode?
> >> >> >
> >> >> > Thanks,
> >> >> > Otis
> >> >> > --
> >> >> > Monitoring * Al

Re: [kafka-clients] Re: [VOTE] 0.8.2.0 Candidate 3

2015-02-02 Thread Neha Narkhede
Great! Thanks Jun for helping with the release and everyone involved for
your contributions.

On Mon, Feb 2, 2015 at 1:32 PM, Joe Stein  wrote:

> Huzzah!
>
> Thanks Jun for preparing the release candidates and getting this out to the
> community.
>
> - Joe Stein
>
> On Mon, Feb 2, 2015 at 2:27 PM, Jun Rao  wrote:
>
> > The following are the results of the votes.
> >
> > +1 binding = 3 votes
> > +1 non-binding = 1 votes
> > -1 = 0 votes
> > 0 = 0 votes
> >
> > The vote passes.
> >
> > I will release artifacts to maven central, update the dist svn and
> download
> > site. Will send out an announce after that.
> >
> > Thanks everyone that contributed to the work in 0.8.2.0!
> >
> > Jun
> >
> > On Wed, Jan 28, 2015 at 9:22 PM, Jun Rao  wrote:
> >
> >> This is the third candidate for release of Apache Kafka 0.8.2.0.
> >>
> >> Release Notes for the 0.8.2.0 release
> >>
> >>
> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/RELEASE_NOTES.html
> >>
> >> *** Please download, test and vote by Saturday, Jan 31, 11:30pm PT
> >>
> >> Kafka's KEYS file containing PGP keys we use to sign the release:
> >> http://kafka.apache.org/KEYS in addition to the md5, sha1 and sha2
> >> (SHA256) checksum.
> >>
> >> * Release artifacts to be voted upon (source and binary):
> >> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/
> >>
> >> * Maven artifacts to be voted upon prior to release:
> >> https://repository.apache.org/content/groups/staging/
> >>
> >> * scala-doc
> >> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/scaladoc/
> >>
> >> * java-doc
> >> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/javadoc/
> >>
> >> * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0 tag
> >>
> >>
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=223ac42a7a2a0dab378cc411f4938a9cea1eb7ea
> >> (commit 7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4)
> >>
> >> /***
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >>
> >  --
> > You received this message because you are subscribed to the Google Groups
> > "kafka-clients" group.
> > To unsubscribe from this group and stop receiving emails from it, send an
> > email to kafka-clients+unsubscr...@googlegroups.com.
> > To post to this group, send email to kafka-clie...@googlegroups.com.
> > Visit this group at http://groups.google.com/group/kafka-clients.
> > To view this discussion on the web visit
> >
> https://groups.google.com/d/msgid/kafka-clients/CAFc58G-XRpw9ik35%2BCmsYm-uc2hjHet6fOpw4bF90ka9Z%3DhH%3Dw%40mail.gmail.com
> > <
> https://groups.google.com/d/msgid/kafka-clients/CAFc58G-XRpw9ik35%2BCmsYm-uc2hjHet6fOpw4bF90ka9Z%3DhH%3Dw%40mail.gmail.com?utm_medium=email&utm_source=footer
> >
> > .
> >
> > For more options, visit https://groups.google.com/d/optout.
> >
>



-- 
Thanks,
Neha


Re: Any Java 7 compatibility issues for 0.8.1.1?

2015-02-02 Thread Mark Reddy
>
> I don't think there are any issues.
>

+1, I've been running Kafka with Java 7 for quite some time now and haven't
experienced any issues.


Regards,
Mark

On 2 February 2015 at 19:09, Otis Gospodnetic 
wrote:

> I don't think there are any issues.  We use 0.8.1.1 under Oracle Java 7.
>
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/
>
>
> On Mon, Feb 2, 2015 at 5:02 AM, Yury Ruchin  wrote:
>
> > Hello,
> >
> > I wonder if there are any known issues with running Kafka 0.8.1.1 against
> > Oracle JDK 7? Any unsupported JVM options in startup scripts, runtime
> > issues, etc.?  I'm trying to understand how easy Kafka migration from
> JDK 6
> > to 7 would be.
> >
> > Thanks,
> > Yury
> >
>


Re: Increased CPU usage with 0.8.2-beta

2015-02-02 Thread Mathias Söderberg
Jun,

Yeah, sure, I'll take it for a spin tomorrow.

On Mon Feb 02 2015 at 11:08:42 PM Jun Rao  wrote:

> Mathias,
>
> Thanks for the info. I took a quick look. The biggest difference I saw is
> the org.xerial.snappy.SnappyNative.rawCompress() call. In 0.8.1.1, it uses
> about 0.05% of the CPU. In 0.8.2.0, it uses about 0.10% of the CPU. We did
> upgrade snappy from 1.0.5 in 0.8.1.1 to 1.1.1.6 in 0.8.2.0. Could you try
> to use the same version of snappy in 0.8.1.1 and 0.8.2.0 and rerun your
> tests?
>
> Jun
>
> On Mon, Feb 2, 2015 at 1:15 PM, Mathias Söderberg <
> mathias.soederb...@gmail.com> wrote:
>
> > Hi all,
> >
> > I ran the same hprof test on 0.8.1.1, and also did a re-run on
> > 0.8.2.0-rc2, attached logs from both runs. Both runs lasted for 30-40
> > minutes. The configurations used can be found over here:
> > https://gist.github.com/mthssdrbrg/5fcb9fbdb851d8cc66a2. The
> > configuration used for the first run (on 0.8.2-beta) can be found here:
> > https://gist.github.com/mthssdrbrg/7df34a795e07eef10262.
> >
> > The cluster is running on EC2, currently on 4 m1.xlarge, with a ZooKeeper
> > cluster consisting of 3 nodes. There's 1248 topics, and a total of 4143
> > partitions, using replication factor 3.
> >
> > Let me know if there's any other information I can provide.
> >
> > Best regards,
> > Mathias
> >
> > On Mon Feb 02 2015 at 6:50:54 PM Jay Kreps  wrote:
> >
> >> Looking at that profile:
> >>
> >> Misc. blocking socket activity, not actual CPU work:
> >>1  5.24%  5.24%   67781 300447 java.net.PlainSocketImpl.socketAccept
> >>2  5.24% 10.49%   67775 300515 java.net.PlainSocketImpl.socketAccept
> >>3  5.24% 15.73%   67773 300567 java.net.PlainSocketImpl.socketAccept
> >>4  5.24% 20.97%   67683 301396 sun.nio.ch.ServerSocketChannelImpl.accept0
> >>5  5.23% 26.20%   67582 301395 sun.nio.ch.EPollArrayWrapper.epollWait
> >>6  5.23% 31.42%   67529 301519 sun.nio.ch.EPollArrayWrapper.epollWait
> >>7  5.13% 36.55%   66297 302447 sun.nio.ch.EPollArrayWrapper.epollWait
> >>8  3.95% 40.51%   51071 302446 sun.nio.ch.EPollArrayWrapper.epollWait
> >>9  3.65% 44.16%   47234 303479 sun.nio.ch.EPollArrayWrapper.epollWait
> >>   10  3.64% 47.80%   47019 302444 sun.nio.ch.EPollArrayWrapper.epollWait
> >>   11  3.62% 51.42%   46768 302445 sun.nio.ch.EPollArrayWrapper.epollWait
> >>   12  3.27% 54.69%   42237 303475 sun.nio.ch.EPollArrayWrapper.epollWait
> >>   13  3.16% 57.85%   40892 303476 sun.nio.ch.EPollArrayWrapper.epollWait
> >>   14  3.14% 60.99%   40556 303478 sun.nio.ch.EPollArrayWrapper.epollWait
> >>   15  3.05% 64.04%   39428 303480 sun.nio.ch.EPollArrayWrapper.epollWait
> >>   16  2.68% 66.72%   34673 303477 sun.nio.ch.EPollArrayWrapper.epollWait
> >>   17  1.23% 67.95%   15867 303520 sun.nio.ch.EPollArrayWrapper.epollWait
> >>   18  0.98% 68.93%   12663 303541 sun.nio.ch.EPollArrayWrapper.epollWait
> >>   19  0.92% 69.85%   11920 303536 sun.nio.ch.EPollArrayWrapper.epollWait
> >>   20  0.85% 70.70%   11015 303546 sun.nio.ch.EPollArrayWrapper.epollWait
> >>   21  0.82% 71.53%   10625 303534 sun.nio.ch.EPollArrayWrapper.epollWait
> >>   22  0.69% 72.21%8866 303512 sun.nio.ch.EPollArrayWrapper.epollWait
> >>
> >>   TransferTo to write data to socket, not actual CPU work (mostly)
> >>   23  0.36% 72.57%4592 304991 sun.nio.ch.FileDispatcherImpl.write0
> >>   24  0.36% 72.92%4589 305021 sun.nio.ch.FileDispatcherImpl.write0
> >>   25  0.35% 73.27%4503 304992 sun.nio.ch.FileDispatcherImpl.write0
> >>   26  0.32% 73.60%4198 305022 sun.nio.ch.FileDispatcherImpl.write0
> >>   27  0.25% 73.85%3250 305246 sun.nio.ch.FileDispatcherImpl.write0
> >>   28  0.25% 74.10%3249 305497 sun.nio.ch.FileDispatcherImpl.write0
> >>
> >>   Request channel - Actual CPU
> >>   29  0.22% 74.32%2862 305000 sun.misc.Unsafe.unpark
> >>   30  0.17% 74.49%2163 304838 sun.misc.Unsafe.unpark
> >>   31  0.14% 74.63%1795 305240 sun.misc.Unsafe.unpark
> >>
> >>   Purgatory - Actual CPU
> >>   32  0.12% 74.75%1553 305137 scala.collection.immutable.HashMap.$plus
> >>   33  0.12% 74.87%1546 305100 java.util.concurrent.ConcurrentHashMap.get
> >>   34  0.12% 74.98%1531 305181 java.util.concurrent.ConcurrentHashMap.get
> >>   35  0.12% 75.10%1526 305234 scala.collection.immutable.HashMap.$plus
> >>   36  0.12% 75.22%1521 305401 scala.collection.immutable.HashMap.$plus
> >>   37  0.12% 75.34%1519 305186 java.util.concurrent.ConcurrentHashMap.get
> >>   38  0.12% 75.46%1517 305264 java.util.concurrent.ConcurrentHashMap.get
> >>   39  0.12% 75.57%1514 305271 java.util.concurrent.ConcurrentHashMap.get
> >>   40  0.12% 75.69%1511 305250 scala.collection.immutable.HashMap.$plus
> >>   41  0.12% 75.81%1499 305155 java.util.concurrent.ConcurrentHashMap.get
> >>   42  0.12% 75.92%1496 305113 scala.collection.immutabl

Re: Increased CPU usage with 0.8.2-beta

2015-02-02 Thread Jun Rao
Mathias,

Thanks for the info. I took a quick look. The biggest difference I saw is
the org.xerial.snappy.SnappyNative.rawCompress() call. In 0.8.1.1, it uses
about 0.05% of the CPU. In 0.8.2.0, it uses about 0.10% of the CPU. We did
upgrade snappy from 1.0.5 in 0.8.1.1 to 1.1.1.6 in 0.8.2.0. Could you try
to use the same version of snappy in 0.8.1.1 and 0.8.2.0 and rerun your
tests?

Jun

On Mon, Feb 2, 2015 at 1:15 PM, Mathias Söderberg <
mathias.soederb...@gmail.com> wrote:

> Hi all,
>
> I ran the same hprof test on 0.8.1.1, and also did a re-run on
> 0.8.2.0-rc2, attached logs from both runs. Both runs lasted for 30-40
> minutes. The configurations used can be found over here:
> https://gist.github.com/mthssdrbrg/5fcb9fbdb851d8cc66a2. The
> configuration used for the first run (on 0.8.2-beta) can be found here:
> https://gist.github.com/mthssdrbrg/7df34a795e07eef10262.
>
> The cluster is running on EC2, currently on 4 m1.xlarge, with a ZooKeeper
> cluster consisting of 3 nodes. There's 1248 topics, and a total of 4143
> partitions, using replication factor 3.
>
> Let me know if there's any other information I can provide.
>
> Best regards,
> Mathias
>
> On Mon Feb 02 2015 at 6:50:54 PM Jay Kreps  wrote:
>
>> Looking at that profile:
>>
>> Misc. blocking socket activity, not actual CPU work:
>>1  5.24%  5.24%   67781 300447 java.net.PlainSocketImpl.socketAccept
>>2  5.24% 10.49%   67775 300515 java.net.PlainSocketImpl.socketAccept
>>3  5.24% 15.73%   67773 300567 java.net.PlainSocketImpl.socketAccept
>>4  5.24% 20.97%   67683 301396 sun.nio.ch.ServerSocketChannelImpl.
>> accept0
>>5  5.23% 26.20%   67582 301395 sun.nio.ch.EPollArrayWrapper.epollWait
>>6  5.23% 31.42%   67529 301519 sun.nio.ch.EPollArrayWrapper.epollWait
>>7  5.13% 36.55%   66297 302447 sun.nio.ch.EPollArrayWrapper.epollWait
>>8  3.95% 40.51%   51071 302446 sun.nio.ch.EPollArrayWrapper.epollWait
>>9  3.65% 44.16%   47234 303479 sun.nio.ch.EPollArrayWrapper.epollWait
>>   10  3.64% 47.80%   47019 302444 sun.nio.ch.EPollArrayWrapper.epollWait
>>   11  3.62% 51.42%   46768 302445 sun.nio.ch.EPollArrayWrapper.epollWait
>>   12  3.27% 54.69%   42237 303475 sun.nio.ch.EPollArrayWrapper.epollWait
>>   13  3.16% 57.85%   40892 303476 sun.nio.ch.EPollArrayWrapper.epollWait
>>   14  3.14% 60.99%   40556 303478 sun.nio.ch.EPollArrayWrapper.epollWait
>>   15  3.05% 64.04%   39428 303480 sun.nio.ch.EPollArrayWrapper.epollWait
>>   16  2.68% 66.72%   34673 303477 sun.nio.ch.EPollArrayWrapper.epollWait
>>   17  1.23% 67.95%   15867 303520 sun.nio.ch.EPollArrayWrapper.epollWait
>>   18  0.98% 68.93%   12663 303541 sun.nio.ch.EPollArrayWrapper.epollWait
>>   19  0.92% 69.85%   11920 303536 sun.nio.ch.EPollArrayWrapper.epollWait
>>   20  0.85% 70.70%   11015 303546 sun.nio.ch.EPollArrayWrapper.epollWait
>>   21  0.82% 71.53%   10625 303534 sun.nio.ch.EPollArrayWrapper.epollWait
>>   22  0.69% 72.21%8866 303512 sun.nio.ch.EPollArrayWrapper.epollWait
>>
>>   TransferTo to write data to socket, not actual CPU work (mostly)
>>   23  0.36% 72.57%4592 304991 sun.nio.ch.FileDispatcherImpl.write0
>>   24  0.36% 72.92%4589 305021 sun.nio.ch.FileDispatcherImpl.write0
>>   25  0.35% 73.27%4503 304992 sun.nio.ch.FileDispatcherImpl.write0
>>   26  0.32% 73.60%4198 305022 sun.nio.ch.FileDispatcherImpl.write0
>>   27  0.25% 73.85%3250 305246 sun.nio.ch.FileDispatcherImpl.write0
>>   28  0.25% 74.10%3249 305497 sun.nio.ch.FileDispatcherImpl.write0
>>
>>   Request channel - Actual CPU
>>   29  0.22% 74.32%2862 305000 sun.misc.Unsafe.unpark
>>   30  0.17% 74.49%2163 304838 sun.misc.Unsafe.unpark
>>   31  0.14% 74.63%1795 305240 sun.misc.Unsafe.unpark
>>
>>   Purgatory - Actual CPU
>>   32  0.12% 74.75%1553 305137 scala.collection.immutable.HashMap.$plus
>>   33  0.12% 74.87%1546 305100 java.util.concurrent.ConcurrentHashMap.get
>>   34  0.12% 74.98%1531 305181 java.util.concurrent.ConcurrentHashMap.get
>>   35  0.12% 75.10%1526 305234 scala.collection.immutable.HashMap.$plus
>>   36  0.12% 75.22%1521 305401 scala.collection.immutable.HashMap.$plus
>>   37  0.12% 75.34%1519 305186 java.util.concurrent.ConcurrentHashMap.get
>>   38  0.12% 75.46%1517 305264 java.util.concurrent.ConcurrentHashMap.get
>>   39  0.12% 75.57%1514 305271 java.util.concurrent.ConcurrentHashMap.get
>>   40  0.12% 75.69%1511 305250 scala.collection.immutable.HashMap.$plus
>>   41  0.12% 75.81%1499 305155 java.util.concurrent.ConcurrentHashMap.get
>>   42  0.12% 75.92%1496 305113 scala.collection.immutable.HashMap.$plus
>>   43  0.12% 76.04%1496 305263 scala.collection.immutable.HashMap.$plus
>>   44  0.11% 76.15%1480 305235 scala.collection.immutable.HashMap.$plus
>>   45  0.11% 76.26%1444 305185 scala.collection.immutable.HashMap.$plus
>>   46  0.11% 76.37%1428 305102 java.util.concurrent.ConcurrentHashMap.

Re: New Producer - ONLY sync mode?

2015-02-02 Thread Gwen Shapira
I've been thinking about that too, since both Flume and Sqoop rely on
send(List) API of the old API.

I'd like to see this API come back, but I'm debating how we'd handle
errors. IIRC, the old API would fail an entire batch on a single
error, which can lead to duplicates. Having N callbacks lets me retry
/ save / whatever just the messages that had issues.

If messages had identifiers from the producer side, we could have the
API call the callback with a list of message-ids and their status. But
they don't :)

Any thoughts on how you'd like it to work?

Gwen
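The per-record-callback recovery described above might look like this (a Python sketch with a fake send; `flaky_send` and the callback signature are illustrative assumptions, not a real client API):

```python
# Sketch of per-record callback recovery: send a batch through an async
# API that invokes one callback per record, then retry only the failures.
def flaky_send(record, callback, fail=frozenset()):
    # Stand-in for producer.send(record, callback); records in `fail`
    # get an error, everything else succeeds.
    callback(record, None if record not in fail else "send failed")

def send_batch(records, send):
    failures = []
    def on_completion(record, error):
        if error is not None:
            failures.append(record)   # keep just the records that had issues
    for record in records:
        send(record, on_completion)
    return failures

# First attempt: record "b" fails; the N callbacks let us retry just "b".
failed = send_batch(["a", "b", "c"],
                    lambda r, cb: flaky_send(r, cb, fail={"b"}))
print(failed)                         # ['b']
retry_failed = send_batch(failed, flaky_send)
print(retry_failed)                   # [] -- all good on retry
```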


On Mon, Feb 2, 2015 at 1:38 PM, Pradeep Gollakota  wrote:
> This is a great question Otis. Like Gwen said, you can accomplish Sync mode
> by setting the batch size to 1. But this does highlight a shortcoming of
> the new producer API.
>
> I really like the design of the new API and it has really great properties
> and I'm enjoying working with it. However, one API that I think we're
> lacking is a "batch" API. Currently, I have to iterate over a batch and
> call .send() on each record, which returns n callbacks instead of 1
> callback for the whole batch. This significantly complicates recovery logic
> where we need to commit a batch as opposed to 1 record at a time.
>
> Do you guys have any plans to add better semantics around batches?
>
> On Mon, Feb 2, 2015 at 1:34 PM, Gwen Shapira  wrote:
>
>> If I understood the code and Jay correctly - if you wait for the
>> future it will be a similar delay to that of the old sync producer.
>>
>> Put another way, if you test it out and see longer delays than the
>> sync producer had, we need to find out why and fix it.
>>
>> Gwen
>>
>> On Mon, Feb 2, 2015 at 1:27 PM, Otis Gospodnetic
>>  wrote:
>> > Hi,
>> >
>> > Nope, unfortunately it can't do that.  X is a remote app, doesn't listen
>> to
>> > anything external, calls Y via HTTPS.  So X has to decide what to do with
>> > its data based on Y's synchronous response.  It has to block until Y
>> > responds.  And it wouldn't be pretty, I think, because nobody wants to
>> run
>> > apps that talk to remote servers and hang on to connections more than
>> they
>> > have to.  But perhaps that is the only way?  Or maybe the answer to "I'm
>> > guessing the delay would be more or less the same as if the Producer was
>> > using SYNC mode?" is YES, in which case the connection from X to Y would
>> be
>> > open for just as long as with a SYNC producer running in Y?
>> >
>> > Thanks,
>> > Otis
>> > --
>> > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
>> > Solr & Elasticsearch Support * http://sematext.com/
>> >
>> >
>> > On Mon, Feb 2, 2015 at 4:03 PM, Gwen Shapira 
>> wrote:
>> >
>> >> Can Y have a callback that will handle the notification to X?
>> >> In this case, perhaps Y can be async and X can buffer the data until
>> >> the callback triggers and says "all good" (or resend if the callback
>> >> indicates an error)
>> >>
>> >> On Mon, Feb 2, 2015 at 12:56 PM, Otis Gospodnetic
>> >>  wrote:
>> >> > Hi,
>> >> >
>> >> > Thanks for the info.  Here's the use case.  We have something up
>> stream
>> >> > sending data, say a log shipper called X.  It sends it to some remote
>> >> > component Y.  Y is the Kafka Producer and it puts data into Kafka.
>> But Y
>> >> > needs to send a reply to X and tell it whether it successfully put all
>> >> its
>> >> > data into Kafka.  If it did not, Y wants to tell X to buffer data
>> locally
>> >> > and resend it later.
>> >> >
>> >> > If producer is ONLY async, Y can't easily do that.  Or maybe Y would
>> just
>> >> > need to wait for the Future to come back and only then send the
>> response
>> >> > back to X?  If so, I'm guessing the delay would be more or less the
>> same
>> >> as
>> >> > if the Producer was using SYNC mode?
>> >> >
>> >> > Thanks,
>> >> > Otis
>> >> > --
>> >> > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
>> >> > Solr & Elasticsearch Support * http://sematext.com/
>> >> >
>> >> >
>> >> > On Mon, Feb 2, 2015 at 3:13 PM, Jay Kreps 
>> wrote:
>> >> >
>> >> >> Yeah as Gwen says there is no sync/async mode anymore. There is a new
>> >> >> configuration which does a lot of what async did in terms of allowing
>> >> >> batching:
>> >> >>
>> >> >> batch.size - This is the target amount of data per partition the
>> server
>> >> >> will attempt to batch together.
>> >> >> linger.ms - This is the time the producer will wait for more data
>> to be
>> >> >> sent to better batch up writes. The default is 0 (send immediately).
>> So
>> >> if
>> >> >> you set this to 50 ms the client will send immediately if it has
>> already
>> >> >> filled up its batch, otherwise it will wait to accumulate the number
>> of
>> >> >> bytes given by batch.size.
>> >> >>
>> >> >> To send asynchronously you do
>> >> >>producer.send(record)
>> >> >> whereas to block on a response you do
>> >> >>producer.send(record).get();
>> >> >> which will wait for acknowledgement from the server.
>> >> >>
>> >>

Re: New Producer - ONLY sync mode?

2015-02-02 Thread Pradeep Gollakota
This is a great question Otis. Like Gwen said, you can accomplish Sync mode
by setting the batch size to 1. But this does highlight a shortcoming of
the new producer API.

I really like the design of the new API and it has really great properties
and I'm enjoying working with it. However, one API that I think we're
lacking is a "batch" API. Currently, I have to iterate over a batch and
call .send() on each record, which returns n callbacks instead of 1
callback for the whole batch. This significantly complicates recovery logic
where we need to commit a batch as opposed to 1 record at a time.

Do you guys have any plans to add better semantics around batches?

On Mon, Feb 2, 2015 at 1:34 PM, Gwen Shapira  wrote:

> If I understood the code and Jay correctly - if you wait for the
> future it will be a similar delay to that of the old sync producer.
>
> Put another way, if you test it out and see longer delays than the
> sync producer had, we need to find out why and fix it.
>
> Gwen
>
> On Mon, Feb 2, 2015 at 1:27 PM, Otis Gospodnetic
>  wrote:
> > Hi,
> >
> > Nope, unfortunately it can't do that.  X is a remote app, doesn't listen
> to
> > anything external, calls Y via HTTPS.  So X has to decide what to do with
> > its data based on Y's synchronous response.  It has to block until Y
> > responds.  And it wouldn't be pretty, I think, because nobody wants to
> run
> > apps that talk to remote servers and hang on to connections more than
> they
> > have to.  But perhaps that is the only way?  Or maybe the answer to "I'm
> > guessing the delay would be more or less the same as if the Producer was
> > using SYNC mode?" is YES, in which case the connection from X to Y would
> be
> > open for just as long as with a SYNC producer running in Y?
> >
> > Thanks,
> > Otis
> > --
> > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> > Solr & Elasticsearch Support * http://sematext.com/
> >
> >
> > On Mon, Feb 2, 2015 at 4:03 PM, Gwen Shapira 
> wrote:
> >
> >> Can Y have a callback that will handle the notification to X?
> >> In this case, perhaps Y can be async and X can buffer the data until
> >> the callback triggers and says "all good" (or resend if the callback
> >> indicates an error)
> >>
> >> On Mon, Feb 2, 2015 at 12:56 PM, Otis Gospodnetic
> >>  wrote:
> >> > Hi,
> >> >
> >> > Thanks for the info.  Here's the use case.  We have something up
> stream
> >> > sending data, say a log shipper called X.  It sends it to some remote
> >> > component Y.  Y is the Kafka Producer and it puts data into Kafka.
> But Y
> >> > needs to send a reply to X and tell it whether it successfully put all
> >> its
> >> > data into Kafka.  If it did not, Y wants to tell X to buffer data
> locally
> >> > and resend it later.
> >> >
> >> > If producer is ONLY async, Y can't easily do that.  Or maybe Y would
> just
> >> > need to wait for the Future to come back and only then send the
> response
> >> > back to X?  If so, I'm guessing the delay would be more or less the
> same
> >> as
> >> > if the Producer was using SYNC mode?
> >> >
> >> > Thanks,
> >> > Otis
> >> > --
> >> > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> >> > Solr & Elasticsearch Support * http://sematext.com/
> >> >
> >> >
> >> > On Mon, Feb 2, 2015 at 3:13 PM, Jay Kreps 
> wrote:
> >> >
> >> >> Yeah as Gwen says there is no sync/async mode anymore. There is a new
> >> >> configuration which does a lot of what async did in terms of allowing
> >> >> batching:
> >> >>
> >> >> batch.size - This is the target amount of data per partition the
> server
> >> >> will attempt to batch together.
> >> >> linger.ms - This is the time the producer will wait for more data
> to be
> >> >> sent to better batch up writes. The default is 0 (send immediately).
> So
> >> if
> >> >> you set this to 50 ms the client will send immediately if it has
> already
> >> >> filled up its batch, otherwise it will wait to accumulate the number
> of
> >> >> bytes given by batch.size.
> >> >>
> >> >> To send asynchronously you do
> >> >>producer.send(record)
> >> >> whereas to block on a response you do
> >> >>producer.send(record).get();
> >> >> which will wait for acknowledgement from the server.
> >> >>
> >> >> One advantage of this model is that the client will do its best to
> >> batch
> >> >> under the covers even if linger.ms=0. It will do this by batching
> any
> >> data
> >> >> that arrives while another send is in progress into a single
> >> >> request--giving a kind of "group commit" effect.
> >> >>
> >> >> The hope is that this will be both simpler to understand (a single
> api
> >> that
> >> >> always works the same) and more powerful (you always get a response
> with
> >> >> error and offset information whether or not you choose to use it).
> >> >>
> >> >> -Jay
> >> >>
> >> >>
> >> >> On Mon, Feb 2, 2015 at 11:15 AM, Gwen Shapira  >
> >> >> wrote:
> >> >>
> >> >> > If you want to emulate the old sync producer behavior, you need to
>

Re: New Producer - ONLY sync mode?

2015-02-02 Thread Gwen Shapira
If I understood the code and Jay correctly - if you wait for the
future it will be a similar delay to that of the old sync producer.

Put another way, if you test it out and see longer delays than the
sync producer had, we need to find out why and fix it.

Gwen

On Mon, Feb 2, 2015 at 1:27 PM, Otis Gospodnetic
 wrote:
> Hi,
>
> Nope, unfortunately it can't do that.  X is a remote app, doesn't listen to
> anything external, calls Y via HTTPS.  So X has to decide what to do with
> its data based on Y's synchronous response.  It has to block until Y
> responds.  And it wouldn't be pretty, I think, because nobody wants to run
> apps that talk to remote servers and hang on to connections more than they
> have to.  But perhaps that is the only way?  Or maybe the answer to "I'm
> guessing the delay would be more or less the same as if the Producer was
> using SYNC mode?" is YES, in which case the connection from X to Y would be
> open for just as long as with a SYNC producer running in Y?
>
> Thanks,
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/
>
>
> On Mon, Feb 2, 2015 at 4:03 PM, Gwen Shapira  wrote:
>
>> Can Y have a callback that will handle the notification to X?
>> In this case, perhaps Y can be async and X can buffer the data until
>> the callback triggers and says "all good" (or resend if the callback
>> indicates an error)
>>
>> On Mon, Feb 2, 2015 at 12:56 PM, Otis Gospodnetic
>>  wrote:
>> > Hi,
>> >
>> > Thanks for the info.  Here's the use case.  We have something up stream
>> > sending data, say a log shipper called X.  It sends it to some remote
>> > component Y.  Y is the Kafka Producer and it puts data into Kafka.  But Y
>> > needs to send a reply to X and tell it whether it successfully put all
>> its
>> > data into Kafka.  If it did not, Y wants to tell X to buffer data locally
>> > and resend it later.
>> >
>> > If producer is ONLY async, Y can't easily do that.  Or maybe Y would just
>> > need to wait for the Future to come back and only then send the response
>> > back to X?  If so, I'm guessing the delay would be more or less the same
>> as
>> > if the Producer was using SYNC mode?
>> >
>> > Thanks,
>> > Otis
>> > --
>> > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
>> > Solr & Elasticsearch Support * http://sematext.com/
>> >
>> >
>> > On Mon, Feb 2, 2015 at 3:13 PM, Jay Kreps  wrote:
>> >
>> >> Yeah as Gwen says there is no sync/async mode anymore. There is a new
>> >> configuration which does a lot of what async did in terms of allowing
>> >> batching:
>> >>
>> >> batch.size - This is the target amount of data per partition the server
>> >> will attempt to batch together.
>> >> linger.ms - This is the time the producer will wait for more data to be
>> >> sent to better batch up writes. The default is 0 (send immediately). So
>> if
>> >> you set this to 50 ms the client will send immediately if it has already
>> >> filled up its batch, otherwise it will wait to accumulate the number of
>> >> bytes given by batch.size.
>> >>
>> >> To send asynchronously you do
>> >>producer.send(record)
>> >> whereas to block on a response you do
>> >>producer.send(record).get();
>> >> which will wait for acknowledgement from the server.
>> >>
> >> One advantage of this model is that the client will do its best to
>> batch
>> >> under the covers even if linger.ms=0. It will do this by batching any
>> data
>> >> that arrives while another send is in progress into a single
>> >> request--giving a kind of "group commit" effect.
>> >>
>> >> The hope is that this will be both simpler to understand (a single api
>> that
>> >> always works the same) and more powerful (you always get a response with
>> >> error and offset information whether or not you choose to use it).
>> >>
>> >> -Jay
>> >>
>> >>
>> >> On Mon, Feb 2, 2015 at 11:15 AM, Gwen Shapira 
>> >> wrote:
>> >>
>> >> > If you want to emulate the old sync producer behavior, you need to set
>> >> > the batch size to 1  (in producer config) and wait on the future you
>> >> > get from Send (i.e. future.get)
>> >> >
>> >> > I can't think of good reasons to do so, though.
>> >> >
>> >> > Gwen
>> >> >
>> >> >
>> >> > On Mon, Feb 2, 2015 at 11:08 AM, Otis Gospodnetic
>> >> >  wrote:
>> >> > > Hi,
>> >> > >
>> >> > > Is the plan for New Producer to have ONLY async mode?  I'm asking
>> >> because
>> >> > > of this info from the Wiki:
>> >> > >
>> >> > >
>> >> > >- The producer will always attempt to batch data and will always
>> >> > >immediately return a SendResponse which acts as a Future to allow
>> >> the
>> >> > >client to await the completion of the request.
>> >> > >
>> >> > >
>> >> > > The word "always" makes me think there will be no sync mode.
>> >> > >
>> >> > > Thanks,
>> >> > > Otis
>> >> > > --
>> >> > > Monitoring * Alerting * Anomaly Detection * Centralized Log
>> Management
> >> > > Solr & Elasticsearch Support * http://sematext.com/

Re: [kafka-clients] Re: [VOTE] 0.8.2.0 Candidate 3

2015-02-02 Thread Joe Stein
Huzzah!

Thanks Jun for preparing the release candidates and getting this out to the
community.

- Joe Stein

On Mon, Feb 2, 2015 at 2:27 PM, Jun Rao  wrote:

> The following are the results of the votes.
>
> +1 binding = 3 votes
> +1 non-binding = 1 votes
> -1 = 0 votes
> 0 = 0 votes
>
> The vote passes.
>
> I will release artifacts to maven central, update the dist svn and download
> site. Will send out an announce after that.
>
> Thanks to everyone who contributed to the work in 0.8.2.0!
>
> Jun
>
> On Wed, Jan 28, 2015 at 9:22 PM, Jun Rao  wrote:
>
>> This is the third candidate for release of Apache Kafka 0.8.2.0.
>>
>> Release Notes for the 0.8.2.0 release
>>
>> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/RELEASE_NOTES.html
>>
>> *** Please download, test and vote by Saturday, Jan 31, 11:30pm PT
>>
>> Kafka's KEYS file containing PGP keys we use to sign the release:
>> http://kafka.apache.org/KEYS in addition to the md5, sha1 and sha2
>> (SHA256) checksum.
>>
>> * Release artifacts to be voted upon (source and binary):
>> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/
>>
>> * Maven artifacts to be voted upon prior to release:
>> https://repository.apache.org/content/groups/staging/
>>
>> * scala-doc
>> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/scaladoc/
>>
>> * java-doc
>> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/javadoc/
>>
>> * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0 tag
>>
>> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=223ac42a7a2a0dab378cc411f4938a9cea1eb7ea
>> (commit 7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4)
>>
>> /***
>>
>> Thanks,
>>
>> Jun
>>
>>
>  --
> You received this message because you are subscribed to the Google Groups
> "kafka-clients" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to kafka-clients+unsubscr...@googlegroups.com.
> To post to this group, send email to kafka-clie...@googlegroups.com.
> Visit this group at http://groups.google.com/group/kafka-clients.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/kafka-clients/CAFc58G-XRpw9ik35%2BCmsYm-uc2hjHet6fOpw4bF90ka9Z%3DhH%3Dw%40mail.gmail.com
> 
> .
>
> For more options, visit https://groups.google.com/d/optout.
>


Re: New Producer - ONLY sync mode?

2015-02-02 Thread Otis Gospodnetic
Hi,

Nope, unfortunately it can't do that.  X is a remote app, doesn't listen to
anything external, calls Y via HTTPS.  So X has to decide what to do with
its data based on Y's synchronous response.  It has to block until Y
responds.  And it wouldn't be pretty, I think, because nobody wants to run
apps that talk to remote servers and hang on to connections more than they
have to.  But perhaps that is the only way?  Or maybe the answer to "I'm
guessing the delay would be more or less the same as if the Producer was
using SYNC mode?" is YES, in which case the connection from X to Y would be
open for just as long as with a SYNC producer running in Y?

Thanks,
Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr & Elasticsearch Support * http://sematext.com/


On Mon, Feb 2, 2015 at 4:03 PM, Gwen Shapira  wrote:

> Can Y have a callback that will handle the notification to X?
> In this case, perhaps Y can be async and X can buffer the data until
> the callback triggers and says "all good" (or resend if the callback
> indicates an error)
>
> On Mon, Feb 2, 2015 at 12:56 PM, Otis Gospodnetic
>  wrote:
> > Hi,
> >
> > Thanks for the info.  Here's the use case.  We have something upstream
> > sending data, say a log shipper called X.  It sends it to some remote
> > component Y.  Y is the Kafka Producer and it puts data into Kafka.  But Y
> > needs to send a reply to X and tell it whether it successfully put all
> its
> > data into Kafka.  If it did not, Y wants to tell X to buffer data locally
> > and resend it later.
> >
> > If producer is ONLY async, Y can't easily do that.  Or maybe Y would just
> > need to wait for the Future to come back and only then send the response
> > back to X?  If so, I'm guessing the delay would be more or less the same
> as
> > if the Producer was using SYNC mode?
> >
> > Thanks,
> > Otis
> > --
> > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> > Solr & Elasticsearch Support * http://sematext.com/
> >
> >
> > On Mon, Feb 2, 2015 at 3:13 PM, Jay Kreps  wrote:
> >
> >> Yeah as Gwen says there is no sync/async mode anymore. There is a new
> >> configuration which does a lot of what async did in terms of allowing
> >> batching:
> >>
> >> batch.size - This is the target amount of data per partition the server
> >> will attempt to batch together.
> >> linger.ms - This is the time the producer will wait for more data to be
> >> sent to better batch up writes. The default is 0 (send immediately). So
> if
> >> you set this to 50 ms the client will send immediately if it has already
> >> filled up its batch, otherwise it will wait to accumulate the number of
> >> bytes given by batch.size.
> >>
> >> To send asynchronously you do
> >>producer.send(record)
> >> whereas to block on a response you do
> >>producer.send(record).get();
> >> which will wait for acknowledgement from the server.
> >>
> >> One advantage of this model is that the client will do its best to
> batch
> >> under the covers even if linger.ms=0. It will do this by batching any
> data
> >> that arrives while another send is in progress into a single
> >> request--giving a kind of "group commit" effect.
> >>
> >> The hope is that this will be both simpler to understand (a single api
> that
> >> always works the same) and more powerful (you always get a response with
> >> error and offset information whether or not you choose to use it).
> >>
> >> -Jay
> >>
> >>
> >> On Mon, Feb 2, 2015 at 11:15 AM, Gwen Shapira 
> >> wrote:
> >>
> >> > If you want to emulate the old sync producer behavior, you need to set
> >> > the batch size to 1  (in producer config) and wait on the future you
> >> > get from Send (i.e. future.get)
> >> >
> >> > I can't think of good reasons to do so, though.
> >> >
> >> > Gwen
> >> >
> >> >
> >> > On Mon, Feb 2, 2015 at 11:08 AM, Otis Gospodnetic
> >> >  wrote:
> >> > > Hi,
> >> > >
> >> > > Is the plan for New Producer to have ONLY async mode?  I'm asking
> >> because
> >> > > of this info from the Wiki:
> >> > >
> >> > >
> >> > >- The producer will always attempt to batch data and will always
> >> > >immediately return a SendResponse which acts as a Future to allow
> >> the
> >> > >client to await the completion of the request.
> >> > >
> >> > >
> >> > > The word "always" makes me think there will be no sync mode.
> >> > >
> >> > > Thanks,
> >> > > Otis
> >> > > --
> >> > > Monitoring * Alerting * Anomaly Detection * Centralized Log
> Management
> >> > > Solr & Elasticsearch Support * http://sematext.com/
> >> >
> >>
>


Re: Increased CPU usage with 0.8.2-beta

2015-02-02 Thread Mathias Söderberg
>   51  0.09% 76.85%1165 305661 org.xerial.snappy.SnappyNative.rawCompress
>   52  0.09% 76.94%1152 307069 org.xerial.snappy.SnappyNative.rawCompress
>   53  0.09% 77.02%1121 305997 org.xerial.snappy.SnappyNative.rawCompress
>   54  0.09% 77.11%1117 307035 org.xerial.snappy.SnappyNative.rawCompress
>   55  0.09% 77.19%1106 306190 org.xerial.snappy.SnappyNative.rawCompress
>
> On Mon, Feb 2, 2015 at 9:39 AM, Jay Kreps  wrote:
>
> > Ah, yeah, you're right. That is just wait time not CPU time. We should
> > check that profile it must be something else on the list.
> >
> > -Jay
> >
> > On Mon, Feb 2, 2015 at 9:33 AM, Jun Rao  wrote:
> >
> >> Hi, Mathias,
> >>
> >> From the hprof output, it seems that the top CPU consumers are
> >> socketAccept() and epollWait(). As far as I am aware, there hasn't been
> >> any
> >> significant changes in the socket server code between 0.8.1 and 0.8.2. Could
> >> you run the same hprof test on 0.8.1 so that we can see the difference?
> >>
> >> Jaikiran,
> >>
> >> The fix you provided is probably unnecessary. The channel that we use in
> >> SimpleConsumer (BlockingChannel) is configured to be blocking. So even
> >> though the read from the socket is in a loop, each read blocks if there
> is
> >> no bytes received from the broker. So, that shouldn't cause extra CPU
> >> consumption.
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >> On Mon, Jan 26, 2015 at 10:05 AM, Mathias Söderberg <
> >> mathias.soederb...@gmail.com> wrote:
> >>
> >> > Hi Neha,
> >> >
> >> > I sent an e-mail earlier today, but noticed now that it didn't
> actually
> >> go
> >> > through.
> >> >
> >> > Anyhow, I've attached two files, one with output from a 10 minute run
> >> and
> >> > one with output from a 30 minute run. Realized that maybe I should've
> >> done
> >> > one or two runs with 0.8.1.1 as well, but nevertheless.
> >> >
> >> > I upgraded our staging cluster to 0.8.2.0-rc2, and I'm seeing the same
> >> CPU
> >> > usage as with the beta version (basically pegging all cores). If I
> >> manage
> >> > to find the time I'll do another run with hprof on the rc2 version
> later
> >> > today.
> >> >
> >> > Best regards,
> >> > Mathias
> >> >
> >> > On Tue Dec 09 2014 at 10:08:21 PM Neha Narkhede 
> >> wrote:
> >> >
> >> >> The following should be sufficient
> >> >>
> >> >> java
> >> >> -agentlib:hprof=cpu=samples,depth=100,interval=20,lineno=
> >> >> y,thread=y,file=kafka.hprof
> >> >> 
> >> >>
> >> >> You would need to start the Kafka server with the settings above for
> >> >> sometime until you observe the problem.
> >> >>
> >> >> On Tue, Dec 9, 2014 at 3:47 AM, Mathias Söderberg <
> >> >> mathias.soederb...@gmail.com> wrote:
> >> >>
> >> >> > Hi Neha,
> >> >> >
> >> >> > Yeah sure. I'm not familiar with hprof, so any particular options I
> >> >> should
> >> >> > include or just run with defaults?
> >> >> >
> >> >> > Best regards,
> >> >> > Mathias
> >> >> >
> >> >> > On Mon Dec 08 2014 at 7:41:32 PM Neha Narkhede 
> >> >> wrote:
> >> >> >
> >> >> > > Thanks for reporting the issue. Would you mind running hprof and
> >> >> sending
> >> >> > > the output?
> >> >> > >
> >> >> > > On Mon, Dec 8, 2014 at 1:25 AM, Mathias Söderberg <
> >> >> > > mathias.soederb...@gmail.com> wrote:
> >> >> > >
> >> >> > > > Good day,
> >> >> > > >
> >> >> > > > I upgraded a Kafka cluster from v0.8.1.1 to v0.8.2-beta and
> >> noticed
> >> >> > that
> >> >> > > > the CPU usage on the broker machines went up by roughly 40%,
> from
> >> >> ~60%
> >> >> > to
> >> >> > > > ~100% and am wondering if anyone else has experienced something
> >> >> > similar?
> >> >> > > > The load average also went up by 2x-3x.
> >> >> > > >
> >> >> > > > We're running on EC2 and the cluster currently consists of four
> >> >> > > m1.xlarge,
> >> >> > > > with roughly 1100 topics / 4000 partitions. Using Java 7
> >> (1.7.0_65
> >> >> to
> >> >> > be
> >> >> > > > exact) and Scala 2.9.2. Configurations can be found over here:
> >> >> > > > https://gist.github.com/mthssdrbrg/7df34a795e07eef10262.
> >> >> > > >
> >> >> > > > I'm assuming that this is not expected behaviour for
> 0.8.2-beta?
> >> >> > > >
> >> >> > > > Best regards,
> >> >> > > > Mathias
> >> >> > > >
> >> >> > >
> >> >> > >
> >> >> > >
> >> >> > > --
> >> >> > > Thanks,
> >> >> > > Neha
> >> >> > >
> >> >> >
> >> >>
> >> >>
> >> >>
> >> >> --
> >> >> Thanks,
> >> >> Neha
> >> >>
> >> >
> >>
> >
> >
>


kafka-0.8.1.1-20150202.hprof.gz
Description: GNU Zip compressed data


kafka-0.8.2.0-20150202.hprof.gz
Description: GNU Zip compressed data


Re: New Producer - ONLY sync mode?

2015-02-02 Thread Gwen Shapira
Can Y have a callback that will handle the notification to X?
In this case, perhaps Y can be async and X can buffer the data until
the callback triggers and says "all good" (or resend if the callback
indicates an error)
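
For illustration, a sketch of what that could look like with the new
producer's Callback interface (X, Y, and respondToX are hypothetical names
from this thread, not real APIs; error codes are made up):

   // Y forwards X's payload to Kafka and answers X from the callback,
   // so Y never has to block a thread waiting on the broker.
   producer.send(record, new Callback() {
       @Override
       public void onCompletion(RecordMetadata metadata, Exception exception) {
           if (exception == null) {
               respondToX(200, "stored at offset " + metadata.offset()); // hypothetical helper
           } else {
               respondToX(503, "buffer locally and resend");             // hypothetical helper
           }
       }
   });

X still learns the fate of each batch, but the connection from X to Y is
held open only until Y accepts the data, not until Kafka acks it.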

On Mon, Feb 2, 2015 at 12:56 PM, Otis Gospodnetic
 wrote:
> Hi,
>
> Thanks for the info.  Here's the use case.  We have something upstream
> sending data, say a log shipper called X.  It sends it to some remote
> component Y.  Y is the Kafka Producer and it puts data into Kafka.  But Y
> needs to send a reply to X and tell it whether it successfully put all its
> data into Kafka.  If it did not, Y wants to tell X to buffer data locally
> and resend it later.
>
> If producer is ONLY async, Y can't easily do that.  Or maybe Y would just
> need to wait for the Future to come back and only then send the response
> back to X?  If so, I'm guessing the delay would be more or less the same as
> if the Producer was using SYNC mode?
>
> Thanks,
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/
>
>
> On Mon, Feb 2, 2015 at 3:13 PM, Jay Kreps  wrote:
>
>> Yeah as Gwen says there is no sync/async mode anymore. There is a new
>> configuration which does a lot of what async did in terms of allowing
>> batching:
>>
>> batch.size - This is the target amount of data per partition the server
>> will attempt to batch together.
>> linger.ms - This is the time the producer will wait for more data to be
>> sent to better batch up writes. The default is 0 (send immediately). So if
>> you set this to 50 ms the client will send immediately if it has already
>> filled up its batch, otherwise it will wait to accumulate the number of
>> bytes given by batch.size.
>>
>> To send asynchronously you do
>>producer.send(record)
>> whereas to block on a response you do
>>producer.send(record).get();
>> which will wait for acknowledgement from the server.
>>
>> One advantage of this model is that the client will do its best to batch
>> under the covers even if linger.ms=0. It will do this by batching any data
>> that arrives while another send is in progress into a single
>> request--giving a kind of "group commit" effect.
>>
>> The hope is that this will be both simpler to understand (a single api that
>> always works the same) and more powerful (you always get a response with
>> error and offset information whether or not you choose to use it).
>>
>> -Jay
>>
>>
>> On Mon, Feb 2, 2015 at 11:15 AM, Gwen Shapira 
>> wrote:
>>
>> > If you want to emulate the old sync producer behavior, you need to set
>> > the batch size to 1  (in producer config) and wait on the future you
>> > get from Send (i.e. future.get)
>> >
>> > I can't think of good reasons to do so, though.
>> >
>> > Gwen
>> >
>> >
>> > On Mon, Feb 2, 2015 at 11:08 AM, Otis Gospodnetic
>> >  wrote:
>> > > Hi,
>> > >
>> > > Is the plan for New Producer to have ONLY async mode?  I'm asking
>> because
>> > > of this info from the Wiki:
>> > >
>> > >
>> > >- The producer will always attempt to batch data and will always
>> > >immediately return a SendResponse which acts as a Future to allow
>> the
>> > >client to await the completion of the request.
>> > >
>> > >
>> > > The word "always" makes me think there will be no sync mode.
>> > >
>> > > Thanks,
>> > > Otis
>> > > --
>> > > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
>> > > Solr & Elasticsearch Support * http://sematext.com/
>> >
>>


Re: New Producer - ONLY sync mode?

2015-02-02 Thread Otis Gospodnetic
Hi,

Thanks for the info.  Here's the use case.  We have something upstream
sending data, say a log shipper called X.  It sends it to some remote
component Y.  Y is the Kafka Producer and it puts data into Kafka.  But Y
needs to send a reply to X and tell it whether it successfully put all its
data into Kafka.  If it did not, Y wants to tell X to buffer data locally
and resend it later.

If producer is ONLY async, Y can't easily do that.  Or maybe Y would just
need to wait for the Future to come back and only then send the response
back to X?  If so, I'm guessing the delay would be more or less the same as
if the Producer was using SYNC mode?

Thanks,
Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr & Elasticsearch Support * http://sematext.com/


On Mon, Feb 2, 2015 at 3:13 PM, Jay Kreps  wrote:

> Yeah as Gwen says there is no sync/async mode anymore. There is a new
> configuration which does a lot of what async did in terms of allowing
> batching:
>
> batch.size - This is the target amount of data per partition the server
> will attempt to batch together.
> linger.ms - This is the time the producer will wait for more data to be
> sent to better batch up writes. The default is 0 (send immediately). So if
> you set this to 50 ms the client will send immediately if it has already
> filled up its batch, otherwise it will wait to accumulate the number of
> bytes given by batch.size.
>
> To send asynchronously you do
>producer.send(record)
> whereas to block on a response you do
>producer.send(record).get();
> which will wait for acknowledgement from the server.
>
> One advantage of this model is that the client will do its best to batch
> under the covers even if linger.ms=0. It will do this by batching any data
> that arrives while another send is in progress into a single
> request--giving a kind of "group commit" effect.
>
> The hope is that this will be both simpler to understand (a single api that
> always works the same) and more powerful (you always get a response with
> error and offset information whether or not you choose to use it).
>
> -Jay
>
>
> On Mon, Feb 2, 2015 at 11:15 AM, Gwen Shapira 
> wrote:
>
> > If you want to emulate the old sync producer behavior, you need to set
> > the batch size to 1  (in producer config) and wait on the future you
> > get from Send (i.e. future.get)
> >
> > I can't think of good reasons to do so, though.
> >
> > Gwen
> >
> >
> > On Mon, Feb 2, 2015 at 11:08 AM, Otis Gospodnetic
> >  wrote:
> > > Hi,
> > >
> > > Is the plan for New Producer to have ONLY async mode?  I'm asking
> because
> > > of this info from the Wiki:
> > >
> > >
> > >- The producer will always attempt to batch data and will always
> > >immediately return a SendResponse which acts as a Future to allow
> the
> > >client to await the completion of the request.
> > >
> > >
> > > The word "always" makes me think there will be no sync mode.
> > >
> > > Thanks,
> > > Otis
> > > --
> > > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> > > Solr & Elasticsearch Support * http://sematext.com/
> >
>


Re: New Producer - ONLY sync mode?

2015-02-02 Thread Jay Kreps
Yeah as Gwen says there is no sync/async mode anymore. There is a new
configuration which does a lot of what async did in terms of allowing
batching:

batch.size - This is the target amount of data per partition the server
will attempt to batch together.
linger.ms - This is the time the producer will wait for more data to be
sent to better batch up writes. The default is 0 (send immediately). So if
you set this to 50 ms the client will send immediately if it has already
filled up its batch, otherwise it will wait to accumulate the number of
bytes given by batch.size.

To send asynchronously you do
   producer.send(record)
whereas to block on a response you do
   producer.send(record).get();
which will wait for acknowledgement from the server.

One advantage of this model is that the client will do its best to batch
under the covers even if linger.ms=0. It will do this by batching any data
that arrives while another send is in progress into a single
request--giving a kind of "group commit" effect.

The hope is that this will be both simpler to understand (a single api that
always works the same) and more powerful (you always get a response with
error and offset information whether or not you choose to use it).
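
Putting the pieces above together, a minimal sketch (this assumes the 0.8.2
org.apache.kafka.clients.producer API; the broker address, topic, and values
are made up):

   Properties props = new Properties();
   props.put("bootstrap.servers", "broker1:9092");
   props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
   props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
   props.put("batch.size", "16384"); // target bytes to batch per partition
   props.put("linger.ms", "50");     // wait up to 50 ms to fill a batch

   KafkaProducer<String, String> producer = new KafkaProducer<>(props);
   ProducerRecord<String, String> record =
       new ProducerRecord<>("my-topic", "key", "value");

   Future<RecordMetadata> f = producer.send(record); // async: returns immediately
   RecordMetadata md = f.get();                      // sync: block for the ack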

-Jay


On Mon, Feb 2, 2015 at 11:15 AM, Gwen Shapira  wrote:

> If you want to emulate the old sync producer behavior, you need to set
> the batch size to 1  (in producer config) and wait on the future you
> get from Send (i.e. future.get)
>
> I can't think of good reasons to do so, though.
>
> Gwen
>
>
> On Mon, Feb 2, 2015 at 11:08 AM, Otis Gospodnetic
>  wrote:
> > Hi,
> >
> > Is the plan for New Producer to have ONLY async mode?  I'm asking because
> > of this info from the Wiki:
> >
> >
> >- The producer will always attempt to batch data and will always
> >immediately return a SendResponse which acts as a Future to allow the
> >client to await the completion of the request.
> >
> >
> > The word "always" makes me think there will be no sync mode.
> >
> > Thanks,
> > Otis
> > --
> > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> > Solr & Elasticsearch Support * http://sematext.com/
>


Re: [VOTE] 0.8.2.0 Candidate 3

2015-02-02 Thread Jun Rao
The following are the results of the votes.

+1 binding = 3 votes
+1 non-binding = 1 votes
-1 = 0 votes
0 = 0 votes

The vote passes.

I will release artifacts to maven central, update the dist svn and download
site. Will send out an announce after that.

Thanks to everyone who contributed to the work in 0.8.2.0!

Jun

On Wed, Jan 28, 2015 at 9:22 PM, Jun Rao  wrote:

> This is the third candidate for release of Apache Kafka 0.8.2.0.
>
> Release Notes for the 0.8.2.0 release
>
> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/RELEASE_NOTES.html
>
> *** Please download, test and vote by Saturday, Jan 31, 11:30pm PT
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS in addition to the md5, sha1 and sha2
> (SHA256) checksum.
>
> * Release artifacts to be voted upon (source and binary):
> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/
>
> * Maven artifacts to be voted upon prior to release:
> https://repository.apache.org/content/groups/staging/
>
> * scala-doc
> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/scaladoc/
>
> * java-doc
> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/javadoc/
>
> * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0 tag
>
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=223ac42a7a2a0dab378cc411f4938a9cea1eb7ea
> (commit 7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4)
>
> /***
>
> Thanks,
>
> Jun
>
>


Re: New Producer - ONLY sync mode?

2015-02-02 Thread Gwen Shapira
If you want to emulate the old sync producer behavior, you need to set
the batch size to 1  (in producer config) and wait on the future you
get from Send (i.e. future.get)

I can't think of good reasons to do so, though.
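
Concretely, that emulation might be sketched as follows (a sketch only:
error handling is minimal and the other required producer configs are
omitted; "record" stands for any ProducerRecord):

   // Emulating the old sync producer, per the suggestion above:
   props.put("batch.size", "1");            // effectively disable batching
   KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
   try {
       producer.send(record).get();         // block until the broker acks, as sync mode did
   } catch (ExecutionException e) {
       // the send failed; surface the error as the old sync producer would
   }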

Gwen


On Mon, Feb 2, 2015 at 11:08 AM, Otis Gospodnetic
 wrote:
> Hi,
>
> Is the plan for New Producer to have ONLY async mode?  I'm asking because
> of this info from the Wiki:
>
>
>- The producer will always attempt to batch data and will always
>immediately return a SendResponse which acts as a Future to allow the
>client to await the completion of the request.
>
>
> The word "always" makes me think there will be no sync mode.
>
> Thanks,
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/


Re: Any Java 7 compatibility issues for 0.8.1.1?

2015-02-02 Thread Otis Gospodnetic
I don't think there are any issues.  We use 0.8.1.1 under Oracle Java 7.

Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr & Elasticsearch Support * http://sematext.com/


On Mon, Feb 2, 2015 at 5:02 AM, Yury Ruchin  wrote:

> Hello,
>
> I wonder if there are any known issues with running Kafka 0.8.1.1 against
> Oracle JDK 7? Any unsupported JVM options in startup scripts, runtime
> issues, etc.?  I'm trying to understand how easy Kafka migration from JDK 6
> to 7 would be.
>
> Thanks,
> Yury
>


New Producer - ONLY sync mode?

2015-02-02 Thread Otis Gospodnetic
Hi,

Is the plan for New Producer to have ONLY async mode?  I'm asking because
of this info from the Wiki:


   - The producer will always attempt to batch data and will always
   immediately return a SendResponse which acts as a Future to allow the
   client to await the completion of the request.


The word "always" makes me think there will be no sync mode.

Thanks,
Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr & Elasticsearch Support * http://sematext.com/


Re: Increased CPU usage with 0.8.2-beta

2015-02-02 Thread Jay Kreps
Looking at that profile:

Misc. blocking socket activity, not actual CPU work:
   1  5.24%  5.24%   67781 300447 java.net.PlainSocketImpl.socketAccept
   2  5.24% 10.49%   67775 300515 java.net.PlainSocketImpl.socketAccept
   3  5.24% 15.73%   67773 300567 java.net.PlainSocketImpl.socketAccept
   4  5.24% 20.97%   67683 301396 sun.nio.ch.ServerSocketChannelImpl.accept0
   5  5.23% 26.20%   67582 301395 sun.nio.ch.EPollArrayWrapper.epollWait
   6  5.23% 31.42%   67529 301519 sun.nio.ch.EPollArrayWrapper.epollWait
   7  5.13% 36.55%   66297 302447 sun.nio.ch.EPollArrayWrapper.epollWait
   8  3.95% 40.51%   51071 302446 sun.nio.ch.EPollArrayWrapper.epollWait
   9  3.65% 44.16%   47234 303479 sun.nio.ch.EPollArrayWrapper.epollWait
  10  3.64% 47.80%   47019 302444 sun.nio.ch.EPollArrayWrapper.epollWait
  11  3.62% 51.42%   46768 302445 sun.nio.ch.EPollArrayWrapper.epollWait
  12  3.27% 54.69%   42237 303475 sun.nio.ch.EPollArrayWrapper.epollWait
  13  3.16% 57.85%   40892 303476 sun.nio.ch.EPollArrayWrapper.epollWait
  14  3.14% 60.99%   40556 303478 sun.nio.ch.EPollArrayWrapper.epollWait
  15  3.05% 64.04%   39428 303480 sun.nio.ch.EPollArrayWrapper.epollWait
  16  2.68% 66.72%   34673 303477 sun.nio.ch.EPollArrayWrapper.epollWait
  17  1.23% 67.95%   15867 303520 sun.nio.ch.EPollArrayWrapper.epollWait
  18  0.98% 68.93%   12663 303541 sun.nio.ch.EPollArrayWrapper.epollWait
  19  0.92% 69.85%   11920 303536 sun.nio.ch.EPollArrayWrapper.epollWait
  20  0.85% 70.70%   11015 303546 sun.nio.ch.EPollArrayWrapper.epollWait
  21  0.82% 71.53%   10625 303534 sun.nio.ch.EPollArrayWrapper.epollWait
  22  0.69% 72.21%8866 303512 sun.nio.ch.EPollArrayWrapper.epollWait

  TransferTo to write data to socket, not actual CPU work (mostly)
  23  0.36% 72.57%4592 304991 sun.nio.ch.FileDispatcherImpl.write0
  24  0.36% 72.92%4589 305021 sun.nio.ch.FileDispatcherImpl.write0
  25  0.35% 73.27%4503 304992 sun.nio.ch.FileDispatcherImpl.write0
  26  0.32% 73.60%4198 305022 sun.nio.ch.FileDispatcherImpl.write0
  27  0.25% 73.85%3250 305246 sun.nio.ch.FileDispatcherImpl.write0
  28  0.25% 74.10%3249 305497 sun.nio.ch.FileDispatcherImpl.write0

  Request channel - Actual CPU
  29  0.22% 74.32%2862 305000 sun.misc.Unsafe.unpark
  30  0.17% 74.49%2163 304838 sun.misc.Unsafe.unpark
  31  0.14% 74.63%1795 305240 sun.misc.Unsafe.unpark

  Purgatory - Actual CPU
  32  0.12% 74.75%1553 305137 scala.collection.immutable.HashMap.$plus
  33  0.12% 74.87%1546 305100 java.util.concurrent.ConcurrentHashMap.get
  34  0.12% 74.98%1531 305181 java.util.concurrent.ConcurrentHashMap.get
  35  0.12% 75.10%1526 305234 scala.collection.immutable.HashMap.$plus
  36  0.12% 75.22%1521 305401 scala.collection.immutable.HashMap.$plus
  37  0.12% 75.34%1519 305186 java.util.concurrent.ConcurrentHashMap.get
  38  0.12% 75.46%1517 305264 java.util.concurrent.ConcurrentHashMap.get
  39  0.12% 75.57%1514 305271 java.util.concurrent.ConcurrentHashMap.get
  40  0.12% 75.69%1511 305250 scala.collection.immutable.HashMap.$plus
  41  0.12% 75.81%1499 305155 java.util.concurrent.ConcurrentHashMap.get
  42  0.12% 75.92%1496 305113 scala.collection.immutable.HashMap.$plus
  43  0.12% 76.04%1496 305263 scala.collection.immutable.HashMap.$plus
  44  0.11% 76.15%1480 305235 scala.collection.immutable.HashMap.$plus
  45  0.11% 76.26%1444 305185 scala.collection.immutable.HashMap.$plus
  46  0.11% 76.37%1428 305102 java.util.concurrent.ConcurrentHashMap.get
  47  0.11% 76.48%1418 305320 java.util.concurrent.ConcurrentHashMap.get

  Compression - Actual CPU
  48  0.09% 76.58%1187 306546 org.xerial.snappy.SnappyNative.rawCompress
  49  0.09% 76.67%1174 305584 org.xerial.snappy.SnappyNative.rawCompress
  50  0.09% 76.76%1173 305545 org.xerial.snappy.SnappyNative.rawCompress
  51  0.09% 76.85%1165 305661 org.xerial.snappy.SnappyNative.rawCompress
  52  0.09% 76.94%1152 307069 org.xerial.snappy.SnappyNative.rawCompress
  53  0.09% 77.02%1121 305997 org.xerial.snappy.SnappyNative.rawCompress
  54  0.09% 77.11%1117 307035 org.xerial.snappy.SnappyNative.rawCompress
  55  0.09% 77.19%1106 306190 org.xerial.snappy.SnappyNative.rawCompress

On Mon, Feb 2, 2015 at 9:39 AM, Jay Kreps  wrote:

> Ah, yeah, you're right. That is just wait time not CPU time. We should
> check that profile it must be something else on the list.
>
> -Jay
>
> On Mon, Feb 2, 2015 at 9:33 AM, Jun Rao  wrote:
>
>> Hi, Mathias,
>>
>> From the hprof output, it seems that the top CPU consumers are
>> socketAccept() and epollWait(). As far as I am aware, there hasn't been
>> any
>> significant changes in the socket server code between 0.8.1 and 0.8.2. Could
>> you run the same hprof test on 0.8.1 so that we can see the difference?
>>
>> Jaikiran,
>>
>> The fix you provided is probably unnecessary. The channel that we use in
>> SimpleConsumer (BlockingChannel) is configured to be blocking. So even
>> though the read from the socket is in a loop, each read blocks if there
>> are no bytes received from the broker. So, that shouldn't cause extra CPU
>> consumption.

Re: Increased CPU usage with 0.8.2-beta

2015-02-02 Thread Jay Kreps
Ah, yeah, you're right. That is just wait time, not CPU time. We should
check that profile; it must be something else on the list.

-Jay

On Mon, Feb 2, 2015 at 9:33 AM, Jun Rao  wrote:

> Hi, Mathias,
>
> From the hprof output, it seems that the top CPU consumers are
> socketAccept() and epollWait(). As far as I am aware, there haven't been any
> significant changes in the socket server code between 0.8.1 and 0.8.2. Could
> you run the same hprof test on 0.8.1 so that we can see the difference?
>
> Jaikiran,
>
> The fix you provided is probably unnecessary. The channel that we use in
> SimpleConsumer (BlockingChannel) is configured to be blocking. So even
> though the read from the socket is in a loop, each read blocks if no
> bytes have been received from the broker. So, that shouldn't cause extra CPU
> consumption.
>
> Thanks,
>
> Jun
>
> On Mon, Jan 26, 2015 at 10:05 AM, Mathias Söderberg <
> mathias.soederb...@gmail.com> wrote:
>
> > Hi Neha,
> >
> > I sent an e-mail earlier today, but noticed now that it didn't actually
> go
> > through.
> >
> > Anyhow, I've attached two files, one with output from a 10 minute run and
> > one with output from a 30 minute run. Realized that maybe I should've
> done
> > one or two runs with 0.8.1.1 as well, but nevertheless.
> >
> > I upgraded our staging cluster to 0.8.2.0-rc2, and I'm seeing the same
> CPU
> > usage as with the beta version (basically pegging all cores). If I manage
> > to find the time I'll do another run with hprof on the rc2 version later
> > today.
> >
> > Best regards,
> > Mathias
> >
> > On Tue Dec 09 2014 at 10:08:21 PM Neha Narkhede 
> wrote:
> >
> >> The following should be sufficient
> >>
> >> java
> >> -agentlib:hprof=cpu=samples,depth=100,interval=20,lineno=
> >> y,thread=y,file=kafka.hprof
> >> 
> >>
> >> You would need to start the Kafka server with the settings above for
> >> sometime until you observe the problem.
> >>
> >> On Tue, Dec 9, 2014 at 3:47 AM, Mathias Söderberg <
> >> mathias.soederb...@gmail.com> wrote:
> >>
> >> > Hi Neha,
> >> >
> >> > Yeah sure. I'm not familiar with hprof, so any particular options I
> >> should
> >> > include or just run with defaults?
> >> >
> >> > Best regards,
> >> > Mathias
> >> >
> >> > On Mon Dec 08 2014 at 7:41:32 PM Neha Narkhede 
> >> wrote:
> >> >
> >> > > Thanks for reporting the issue. Would you mind running hprof and
> >> sending
> >> > > the output?
> >> > >
> >> > > On Mon, Dec 8, 2014 at 1:25 AM, Mathias Söderberg <
> >> > > mathias.soederb...@gmail.com> wrote:
> >> > >
> >> > > > Good day,
> >> > > >
> >> > > > I upgraded a Kafka cluster from v0.8.1.1 to v0.8.2-beta and
> noticed
> >> > that
> >> > > > the CPU usage on the broker machines went up by roughly 40%, from
> >> ~60%
> >> > to
> >> > > > ~100% and am wondering if anyone else has experienced something
> >> > similar?
> >> > > > The load average also went up by 2x-3x.
> >> > > >
> >> > > > We're running on EC2 and the cluster currently consists of four
> >> > > m1.xlarge,
> >> > > > with roughly 1100 topics / 4000 partitions. Using Java 7 (1.7.0_65
> >> to
> >> > be
> >> > > > exact) and Scala 2.9.2. Configurations can be found over here:
> >> > > > https://gist.github.com/mthssdrbrg/7df34a795e07eef10262.
> >> > > >
> >> > > > I'm assuming that this is not expected behaviour for 0.8.2-beta?
> >> > > >
> >> > > > Best regards,
> >> > > > Mathias
> >> > > >
> >> > >
> >> > >
> >> > >
> >> > > --
> >> > > Thanks,
> >> > > Neha
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> Thanks,
> >> Neha
> >>
> >
>


Re: Increased CPU usage with 0.8.2-beta

2015-02-02 Thread Jun Rao
Hi, Mathias,

From the hprof output, it seems that the top CPU consumers are
socketAccept() and epollWait(). As far as I am aware, there haven't been any
significant changes in the socket server code between 0.8.1 and 0.8.2. Could
you run the same hprof test on 0.8.1 so that we can see the difference?

Jaikiran,

The fix you provided is probably unnecessary. The channel that we use in
SimpleConsumer (BlockingChannel) is configured to be blocking. So even
though the read from the socket is in a loop, each read blocks if no
bytes have been received from the broker. So, that shouldn't cause extra CPU
consumption.

Thanks,

Jun

On Mon, Jan 26, 2015 at 10:05 AM, Mathias Söderberg <
mathias.soederb...@gmail.com> wrote:

> Hi Neha,
>
> I sent an e-mail earlier today, but noticed now that it didn't actually go
> through.
>
> Anyhow, I've attached two files, one with output from a 10 minute run and
> one with output from a 30 minute run. Realized that maybe I should've done
> one or two runs with 0.8.1.1 as well, but nevertheless.
>
> I upgraded our staging cluster to 0.8.2.0-rc2, and I'm seeing the same CPU
> usage as with the beta version (basically pegging all cores). If I manage
> to find the time I'll do another run with hprof on the rc2 version later
> today.
>
> Best regards,
> Mathias
>
> On Tue Dec 09 2014 at 10:08:21 PM Neha Narkhede  wrote:
>
>> The following should be sufficient
>>
>> java
>> -agentlib:hprof=cpu=samples,depth=100,interval=20,lineno=
>> y,thread=y,file=kafka.hprof
>> 
>>
>> You would need to start the Kafka server with the settings above for
>> sometime until you observe the problem.
>>
>> On Tue, Dec 9, 2014 at 3:47 AM, Mathias Söderberg <
>> mathias.soederb...@gmail.com> wrote:
>>
>> > Hi Neha,
>> >
>> > Yeah sure. I'm not familiar with hprof, so any particular options I
>> should
>> > include or just run with defaults?
>> >
>> > Best regards,
>> > Mathias
>> >
>> > On Mon Dec 08 2014 at 7:41:32 PM Neha Narkhede 
>> wrote:
>> >
>> > > Thanks for reporting the issue. Would you mind running hprof and
>> sending
>> > > the output?
>> > >
>> > > On Mon, Dec 8, 2014 at 1:25 AM, Mathias Söderberg <
>> > > mathias.soederb...@gmail.com> wrote:
>> > >
>> > > > Good day,
>> > > >
>> > > > I upgraded a Kafka cluster from v0.8.1.1 to v0.8.2-beta and noticed
>> > that
>> > > > the CPU usage on the broker machines went up by roughly 40%, from
>> ~60%
>> > to
>> > > > ~100% and am wondering if anyone else has experienced something
>> > similar?
>> > > > The load average also went up by 2x-3x.
>> > > >
>> > > > We're running on EC2 and the cluster currently consists of four
>> > > m1.xlarge,
>> > > > with roughly 1100 topics / 4000 partitions. Using Java 7 (1.7.0_65
>> to
>> > be
>> > > > exact) and Scala 2.9.2. Configurations can be found over here:
>> > > > https://gist.github.com/mthssdrbrg/7df34a795e07eef10262.
>> > > >
>> > > > I'm assuming that this is not expected behaviour for 0.8.2-beta?
>> > > >
>> > > > Best regards,
>> > > > Mathias
>> > > >
>> > >
>> > >
>> > >
>> > > --
>> > > Thanks,
>> > > Neha
>> > >
>> >
>>
>>
>>
>> --
>> Thanks,
>> Neha
>>
>


Re: Increased CPU usage with 0.8.2-beta

2015-02-02 Thread Jay Kreps
Actually that fetch call blocks on the server side. That is, if there is no
data, the server will wait until data arrives or the timeout occurs to send
a response. This is done to help simplify client development. If that
isn't happening, it is likely a bug or a configuration change in the timeout.

I think we should try to ascertain how widespread this issue is, it could
be pretty serious if it is always happening.

Mathias, could you share your server configuration?

-Jay
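The blocking behavior Jay describes — the broker holds a fetch request until data arrives or the fetch timeout fires — can be sketched as a minimal long-poll loop. This is a simplification for illustration only: the actual broker parks pending requests in a purgatory and signals them, rather than sleep-polling.

```java
import java.util.function.BooleanSupplier;

public class LongPollSketch {
    // Wait until hasData reports true or timeoutMs elapses.
    // Returns true if data became available before the deadline,
    // false on timeout (the broker would then send an empty fetch response).
    static boolean waitForData(BooleanSupplier hasData, long timeoutMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!hasData.getAsBoolean()) {
            if (System.nanoTime() >= deadline) {
                return false; // timeout reached with no data
            }
            try {
                Thread.sleep(1); // placeholder; the real broker signals via purgatory
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(waitForData(() -> true, 100));  // true: data already there
        System.out.println(waitForData(() -> false, 50));  // false: times out after ~50 ms
    }
}
```

If the client is spinning despite this server-side wait, either the wait is not engaging (a bug) or the configured fetch timeout is effectively zero.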

On Sun, Feb 1, 2015 at 11:17 PM, Jaikiran Pai 
wrote:

> Hi Mathias,
>
> Looking at that thread dump, I think the potential culprit is this one:
>
> TRACE 303545: (thread=200049)
> sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:Unknown
> line)
> sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
> sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
> sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:221)
> sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
> kafka.utils.Utils$.read(Utils.scala:380)
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.
> scala:67)
> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> kafka.network.BoundedByteBufferReceive.readCompletely(
> BoundedByteBufferReceive.scala:29)
> kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
> kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$
> $sendRequest(SimpleConsumer.scala:69)
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
> apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:113)
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
> apply$mcV$sp$1.apply(SimpleConsumer.scala:113)
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$
> apply$mcV$sp$1.apply(SimpleConsumer.scala:113)
> kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(
> SimpleConsumer.scala:112)
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
> SimpleConsumer.scala:112)
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(
> SimpleConsumer.scala:112)
> kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:111)
> kafka.server.AbstractFetcherThread.processFetchRequest(
> AbstractFetcherThread.scala:97)
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>
>
> I see many such threads all triggered through the SimpleConsumer and
> ending up polling. Looking at the code, in theory, I can see why there
> might be a busy CPU loop generated by that code path. If my guess is right,
> it could be because of an issue in the implementation of how data is read
> off a channel in a blocking manner and I think this patch might help
> overcome that problem:
>
> diff --git a/core/src/main/scala/kafka/network/Transmission.scala
> b/core/src/main/scala/kafka/network/Transmission.scala
> index 2827103..0bab9ed 100644
> --- a/core/src/main/scala/kafka/network/Transmission.scala
> +++ b/core/src/main/scala/kafka/network/Transmission.scala
> @@ -54,8 +54,15 @@ trait Receive extends Transmission {
>  var totalRead = 0
>  while(!complete) {
>val read = readFrom(channel)
> -  trace(read + " bytes read.")
> -  totalRead += read
> +  if (read > 0) {
> +trace(read + " bytes read.")
> +totalRead += read
> +  } else if (read == 0) {
> +// it's possible that nothing was read (see javadoc of
> ReadableByteChannel#read), from the backing channel,
> +// so we wait for a while before polling again, so that we don't
> end up with a busy CPU loop
> +// TODO: For now, this 30 milli seconds is a random value.
> +Thread.sleep(30)
> +  }
>  }
>  totalRead
>}
>
> Is this something that you would be able to apply against the latest 0.8.2
> branch of Kafka, build the Kafka binary, try it out and see if it improves
> the situation?
>
> -Jaikiran
>
> On Monday 26 January 2015 11:35 PM, Mathias Söderberg wrote:
>
>> Hi Neha,
>>
>> I sent an e-mail earlier today, but noticed now that it didn't actually
>> go through.
>>
>> Anyhow, I've attached two files, one with output from a 10 minute run and
>> one with output from a 30 minute run. Realized that maybe I should've done
>> one or two runs with 0.8.1.1 as well, but nevertheless.
>>
>> I upgraded our staging cluster to 0.8.2.0-rc2, and I'm seeing the same
>> CPU usage as with the beta version (basically pegging all cores). If I
>> manage to find the time I'll do another run with hprof on the rc2 version
>> later today.
>>
>> Best regards,
>> Math

subscribe email

2015-02-02 Thread Xinyi Su



Re: create topic does not really executed successfully

2015-02-02 Thread Gwen Shapira
IIRC, the directory is only created after you send data to the topic.

Do you get errors when your producer sends data?

Another common issue is that you specify replication-factor 3 when you
have fewer than 3 brokers.

Gwen

On Mon, Feb 2, 2015 at 2:34 AM, Xinyi Su  wrote:
> Hi,
>
> I am using Kafka_2.9.2-0.8.2-beta.  When I use kafka-topics.sh to create a
> topic, I have observed that sometimes the topic is not actually created,
> even though the console output indicates success.
>
> Below is my command line:
>
> # bin/kafka-topics.sh  --zookeeper :2181 --create --topic zerg.hydra
> --partitions 3 --replication-factor 3
>
> The command prompts "created topic xxx", but the local storage directory used
> for this topic under "log.dirs" is not created at all. Normally, there
> should be some folders like zerg.hydra-0, zerg.hydra-1... named
> according to partition id and the assignment policy.
>
> I have come across this issue about four times; the disk is not full and
> the directory permissions are correct. Do you know what could cause this
> issue?
>
> Thanks.
>
> Xinyi
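One way to double-check whether creation actually took effect on a broker is to count the `<topic>-<partition>` directories under the configured log directory. The helper below is a hypothetical illustration, not a Kafka tool; the log-dir path is an assumption and should be replaced with the broker's actual `log.dirs` value:

```java
import java.io.File;
import java.util.Arrays;
import java.util.regex.Pattern;

public class TopicDirCheck {
    // Count partition directories named "<topic>-<id>" under a log dir.
    // Returns 0 if the log dir does not exist or is unreadable.
    static long countPartitionDirs(File logDir, String topic) {
        File[] entries = logDir.listFiles();
        if (entries == null) return 0;
        return Arrays.stream(entries)
                .filter(File::isDirectory)
                .filter(f -> f.getName().matches(Pattern.quote(topic) + "-\\d+"))
                .count();
    }

    public static void main(String[] args) {
        // "/tmp/kafka-logs" is only an example default; pass your log.dirs path as an argument
        File logDir = new File(args.length > 0 ? args[0] : "/tmp/kafka-logs");
        System.out.println(countPartitionDirs(logDir, "zerg.hydra"));
    }
}
```

If the count is lower than the partitions assigned to that broker, the creation did not fully complete there and the broker logs are the next place to look.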


create topic does not really executed successfully

2015-02-02 Thread Xinyi Su
Hi,

I am using Kafka_2.9.2-0.8.2-beta.  When I use kafka-topics.sh to create a
topic, I have observed that sometimes the topic is not actually created,
even though the console output indicates success.

Below is my command line:

# bin/kafka-topics.sh  --zookeeper :2181 --create --topic zerg.hydra
--partitions 3 --replication-factor 3

The command prompts "created topic xxx", but the local storage directory used
for this topic under "log.dirs" is not created at all. Normally, there
should be some folders like zerg.hydra-0, zerg.hydra-1... named
according to partition id and the assignment policy.

I have come across this issue about four times; the disk is not full and
the directory permissions are correct. Do you know what could cause this
issue?

Thanks.

Xinyi


Any Java 7 compatibility issues for 0.8.1.1?

2015-02-02 Thread Yury Ruchin
Hello,

I wonder if there are any known issues with running Kafka 0.8.1.1 against
Oracle JDK 7? Any unsupported JVM options in startup scripts, runtime
issues, etc.?  I'm trying to understand how easy Kafka migration from JDK 6
to 7 would be.

Thanks,
Yury


how to fetch old message from kafka

2015-02-02 Thread Snehalata Nagaje

Hi ,


We are using Kafka for storing messages in a chat application.

Currently we have divided each topic into multiple partitions; each partition
stores data for a given customer who uses the application.

Right now, on the very first request, the application fetches the log from
Kafka from the earliest valid offset up to a maximum of 10 bytes, hence it
reads all messages for the given topic and partition. Now we want to apply
pagination as LinkedIn and Facebook do: only the latest 10-15 messages should
be displayed, and then, on scroll down, fetch the next set of older messages.
We are using SimpleConsumer to fetch messages.

Can you please guide on this?


Thanks,
Snehalata
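
A common approach with SimpleConsumer is to page backwards through offsets: resolve the partition's earliest and latest offsets (via an offset request), then fetch fixed-size windows ending at the newest message. The sketch below shows only the offset arithmetic; the names are illustrative and not part of any Kafka API:

```java
public class OffsetPagination {
    // Starting offset for a page of messages, newest-first.
    // page 0 = the latest pageSize messages; the result is clamped
    // at the earliest valid offset so old pages don't underflow.
    static long pageStartOffset(long earliest, long latest, int pageSize, int page) {
        long start = latest - (long) (page + 1) * pageSize;
        return Math.max(start, earliest);
    }

    public static void main(String[] args) {
        // latest = 100, page size = 10: page 0 covers offsets 90..99
        System.out.println(pageStartOffset(0, 100, 10, 0)); // 90
    }
}
```

Each page is then served by a fetch starting at `pageStartOffset(...)` limited to `pageSize` messages; when the start clamps to the earliest offset, there are no older pages left.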



- Original Message -
From: "Jaikiran Pai" 
To: users@kafka.apache.org
Sent: Monday, February 2, 2015 12:47:19 PM
Subject: Re: Increased CPU usage with 0.8.2-beta

Hi Mathias,

Looking at that thread dump, I think the potential culprit is this one:

TRACE 303545: (thread=200049)
sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:Unknown line)
 sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
 sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
 sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:221)
sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
 kafka.utils.Utils$.read(Utils.scala:380)
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
kafka.network.Receive$class.readCompletely(Transmission.scala:56)
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:113)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113)
 kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:112)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112)
 kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
 kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:111)
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:97)
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)


I see many such threads all triggered through the SimpleConsumer and 
ending up polling. Looking at the code, in theory, I can see why there 
might be a busy CPU loop generated by that code path. If my guess is 
right, it could be because of an issue in the implementation of how data 
is read off a channel in a blocking manner and I think this patch might 
help overcome that problem:

diff --git a/core/src/main/scala/kafka/network/Transmission.scala 
b/core/src/main/scala/kafka/network/Transmission.scala
index 2827103..0bab9ed 100644
--- a/core/src/main/scala/kafka/network/Transmission.scala
+++ b/core/src/main/scala/kafka/network/Transmission.scala
@@ -54,8 +54,15 @@ trait Receive extends Transmission {
  var totalRead = 0
  while(!complete) {
val read = readFrom(channel)
-  trace(read + " bytes read.")
-  totalRead += read
+  if (read > 0) {
+trace(read + " bytes read.")
+totalRead += read
+  } else if (read == 0) {
+// it's possible that nothing was read (see javadoc of 
ReadableByteChannel#read), from the backing channel,
+// so we wait for a while before polling again, so that we 
don't end up with a busy CPU loop
+// TODO: For now, this 30 milli seconds is a random value.
+Thread.sleep(30)
+  }
  }
  totalRead
}
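
For context on the patch above: `ReadableByteChannel#read` can legitimately return 0 when the channel is in non-blocking mode, which is exactly what would turn the read loop into a busy spin. The standalone demonstration below uses a `java.nio.channels.Pipe` and is unrelated to Kafka's own channel setup (which, as Jun notes, is configured to be blocking):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

public class NonBlockingReadDemo {
    // Reading an empty channel in non-blocking mode returns 0 immediately
    // instead of blocking -- the precondition for a busy CPU loop.
    static int readEmptyNonBlocking() {
        try {
            Pipe pipe = Pipe.open();
            pipe.source().configureBlocking(false);
            return pipe.source().read(ByteBuffer.allocate(16));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readEmptyNonBlocking()); // 0: no data, and no blocking
    }
}
```

On a channel left in its default blocking mode, the same read would park the thread until data arrives, so the sleep in the patch should only matter if the channel is somehow non-blocking.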

Is this something that you would be able to apply against the latest 
0.8.2 branch of Kafka, build the Kafka binary, try it out and see if it 
improves the situation?

-Jaikiran

On Monday 26 January 2015 11:35 PM, Mathias Söderberg wrote:
> Hi Neha,
>
> I sent an e-mail earlier today, but noticed now that it didn't 
> actually go through.
>
> Anyhow, I've attached two files, one with output from a 10 minute run 
> and one with output from a 30 minute run. Realized that maybe I 
> should've done one or two runs with 0.8.1.1 as well, but nevertheless.
>
> I upgraded our staging cluster to 0.8.2.0-rc2, and I'm seeing the same 
> CPU usage as with the beta version (basically