Re: Increased CPU usage with 0.8.2-beta

2015-02-01 Thread Jaikiran Pai

Hi Mathias,

Looking at that thread dump, I think the potential culprit is this one:

TRACE 303545: (thread=200049)
sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:Unknown line)
sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:221)
sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
kafka.utils.Utils$.read(Utils.scala:380)
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
kafka.network.Receive$class.readCompletely(Transmission.scala:56)
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:113)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113)
kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:112)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112)
kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:111)
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:97)
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)


I see many such threads, all triggered through the SimpleConsumer and 
ending up polling. Looking at the code, I can see, in theory, why that 
code path might generate a busy CPU loop. If my guess is right, the 
cause is an issue in how data is read off a channel in a blocking 
manner, and I think this patch might help overcome the problem:


diff --git a/core/src/main/scala/kafka/network/Transmission.scala b/core/src/main/scala/kafka/network/Transmission.scala
index 2827103..0bab9ed 100644
--- a/core/src/main/scala/kafka/network/Transmission.scala
+++ b/core/src/main/scala/kafka/network/Transmission.scala
@@ -54,8 +54,15 @@ trait Receive extends Transmission {
     var totalRead = 0
     while(!complete) {
       val read = readFrom(channel)
-      trace(read + " bytes read.")
-      totalRead += read
+      if (read > 0) {
+        trace(read + " bytes read.")
+        totalRead += read
+      } else if (read == 0) {
+        // it's possible that nothing was read from the backing channel (see the javadoc of ReadableByteChannel#read),
+        // so we wait for a while before polling again, so that we don't end up with a busy CPU loop
+        // TODO: for now, this 30 milliseconds is a random value
+        Thread.sleep(30)
+      }
     }
     totalRead
   }
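For illustration, the idea behind the patch can be sketched outside Kafka as a plain Java read loop. The stub channel and the 30 ms pause below are assumptions for demonstration only; this is not Kafka code:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

// Sketch: a read loop over a channel that may legally return 0 bytes,
// backing off instead of spinning. The 30 ms pause mirrors the
// placeholder value in the patch above.
public class BackoffRead {
    static int readFully(ReadableByteChannel channel, ByteBuffer buffer)
            throws IOException, InterruptedException {
        int totalRead = 0;
        while (buffer.hasRemaining()) {
            int read = channel.read(buffer);
            if (read < 0) {
                break;                // end of stream
            } else if (read == 0) {
                Thread.sleep(30);     // nothing available yet; avoid pegging a core
            } else {
                totalRead += read;
            }
        }
        return totalRead;
    }

    public static void main(String[] args) throws Exception {
        // Stub channel that returns 0 twice before delivering data.
        ReadableByteChannel stub = new ReadableByteChannel() {
            int calls = 0;
            public int read(ByteBuffer dst) {
                if (calls++ < 2) return 0;
                int n = dst.remaining();
                while (dst.hasRemaining()) dst.put((byte) 'x');
                return n;
            }
            public boolean isOpen() { return true; }
            public void close() {}
        };
        System.out.println(readFully(stub, ByteBuffer.allocate(4)));
    }
}
```

The key point is that ReadableByteChannel#read may return 0 without blocking, so a loop that treats every return value as progress can spin at 100% CPU.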

Is this something that you would be able to apply against the latest 
0.8.2 branch of Kafka, build the Kafka binary, try it out and see if it 
improves the situation?


-Jaikiran

On Monday 26 January 2015 11:35 PM, Mathias Söderberg wrote:

Hi Neha,

I sent an e-mail earlier today, but noticed now that it didn't 
actually go through.


Anyhow, I've attached two files: one with output from a 10-minute run 
and one with output from a 30-minute run. I realized that maybe I 
should've done one or two runs with 0.8.1.1 as well, but nevertheless.


I upgraded our staging cluster to 0.8.2.0-rc2, and I'm seeing the same 
CPU usage as with the beta version (basically pegging all cores). If I 
manage to find the time I'll do another run with hprof on the rc2 
version later today.


Best regards,
Mathias

On Tue Dec 09 2014 at 10:08:21 PM Neha Narkhede wrote:


The following should be sufficient:

java -agentlib:hprof=cpu=samples,depth=100,interval=20,lineno=y,thread=y,file=kafka.hprof

You would need to start the Kafka server with the settings above for some
time, until you observe the problem.
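For reference, one way to pass those flags when launching the broker is via KAFKA_OPTS, which the standard start script forwards to the JVM; the paths below are examples, not taken from this thread:

```shell
# Hypothetical launch: attach the hprof sampler to the Kafka broker JVM.
export KAFKA_OPTS="-agentlib:hprof=cpu=samples,depth=100,interval=20,lineno=y,thread=y,file=kafka.hprof"
bin/kafka-server-start.sh config/server.properties
```

The kafka.hprof file is written out when the JVM exits.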

On Tue, Dec 9, 2014 at 3:47 AM, Mathias Söderberg <
mathias.soederb...@gmail.com
> wrote:

> Hi Neha,
>
> Yeah sure. I'm not familiar with hprof, so any particular
options I should
> include or just run with defaults?
>
> Best regards,
> Mathias
>
> On Mon Dec 08 2014 at 7:41:32 PM Neha N

Re: question on the mailing list

2015-02-01 Thread Jaikiran Pai
I use a simple email client (Thunderbird) and have a filter set up so 
that mails to the Kafka user mailing list are moved to a specific 
folder. I then have "thread view" enabled so that the replies/discussion 
shows up in the right context. I have the same for some other mailing 
lists too and haven't felt the need for any other tool.


-Jaikiran
On Wednesday 28 January 2015 11:01 PM, Dillian Murphey wrote:

Hi all,

Sorry for asking, but is there some easier way to use the mailing list?
Maybe a tool which makes reading and replying to messages more like Google
Groups?  I like the Hadoop searcher, but the UI on that is really bad.

tnx





Re: [VOTE] 0.8.2.0 Candidate 3

2015-02-01 Thread Neha Narkhede
+1 (binding). Verified quickstart and unit tests ran ok.

On Sun, Feb 1, 2015 at 9:11 AM, Steven Wu  wrote:

> At Netflix, we have been using a Route 53 DNS name as the bootstrap servers in
> our AWS env. Basically, when a Kafka broker starts, we add it to the Route 53
> DNS name for the cluster. This is like the VIP that Jay suggested.
>
> But we are also moving toward using the Eureka service registry for
> bootstrapping. We are worried that if the DNS name happens to resolve to a bad
> broker, it might impact the bootstrap process/resiliency. We want to get a
> list of brokers from Eureka to pass in as "bootstrap.servers".
>
>
>
> On Sun, Feb 1, 2015 at 5:30 AM, Jay Kreps  wrote:
>
> > You may already know this but the producer doesn't require a complete
> list
> > of brokers in its config, it just requires the connection info for one
> > active broker which it uses to discover the rest of the brokers. We allow
> > you to specify multiple urls here for failover in cases where you aren't
> > using a vip. So if you can put three brokers into the VIP for metadata
> > bootstrapping you can still scale up and down the rest of the cluster.
> >
> > -Jay
> >
> > On Sun, Feb 1, 2015 at 12:17 AM, Alex The Rocker 
> > wrote:
> >
> > > Jun:
> > >
> > > You raise a very good question: let me explain why we use
> > > Broker.getConnectionString(), so maybe we'll get a supported way to
> > > meet our need.
> > >
> > > We use Broker.getConnectionString() because we deploy "Kafka services"
> > > in Amazon EC2 with the following architecture:
> > > * Three VMs dedicated to Zookeeper processes
> > > * At least two VMs with Kafka broker, but depending on load it can be
> > > scaled to more broker VMs. Brokers self-register their address in
> > > Zookeeper by serializing Broker objects in Zk.
> > >
> > > The VMs with Zookeeper have Elastic IPs = stable public IPs,
> > >
> > > These public IPs are fed to the  various "Application services" which
> > > rely on Kafka to stream their logs & monitoring data to our central
> > > Hadoop system.
> > >
> > > Using zkclient and the above-mentioned public ZooKeeper IPs, we get
> > > the list of brokers registered to a given "Kafka service": this is
> > > where we deserialize Broker objects and then use
> > > getConnectionString() to discover the brokers' addresses. These
> > > addresses are then used to initialize the Kafka producer(s).
> > >
> > > The whole trick is that we cannot use Elastic IPs (= stable IPs) for
> > > the Kafka VMs, because of their "elastic" nature: we want to be able
> > > to scale the number of VMs with Kafka brokers up and down.
> > >
> > > Now, we understand that using non-public Kafka APIs is bad: we've been
> > > broken when moving to 0.8.1.1, then again when moving to 0.8.2.0...
> > >
> > > So it's time to raise the right question: what would be the supported
> > > way to configure our producers given our dynamic-IP-for-brokers
> > > context?
> > >
> > > Thanks,
> > > Alex.
> > >
> > > 2015-02-01 8:55 GMT+01:00 VERMEERBERGEN Alexandre
> > > :
> > > >
> > > > -Original Message-
> > > > From: Jun Rao [mailto:j...@confluent.io]
> > > > Sent: Sunday, February 01, 2015 3:03
> > > > To: users@kafka.apache.org; kafka-clie...@googlegroups.com
> > > > Cc: d...@kafka.apache.org
> > > > Subject: Re: [VOTE] 0.8.2.0 Candidate 3
> > > >
> > > > Hi, Alex,
> > > >
> > > > Thanks for testing RC3.
> > > >
> > > > Broker.connectionString() is actually not part of the public api for
> > the
> > > producer. Is there a particular reason that you need to use this api?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Sat, Jan 31, 2015 at 1:53 PM, Alex The Rocker <
> alex.m3...@gmail.com
> > >
> > > > wrote:
> > > >
> > > >> Hello,
> > > >>
> > > >> I have read Broker.scala source code, and I found the answer:
> > > >>  - With Kafka 0.8.1.1 we used Broker.getConnectionString() in our
> Java
> > > >> code.
> > > >>  - With Kafka 0.8.2.0, this method has been replaced by a 0-arity
> > > >> method without the "get" prefix, so we have to change our Java code
> to
> > > >> call
> > > >> Broker.connectionString()
> > > >>
> > > >> So although binary compatibility is broken, we have a workaround.
> > > >> I hope this will help other people relying on this API...
> > > >>
> > > >> and I'm going to continue tests with 0.8.2 rc3..
> > > >>
> > > >> Alex
> > > >>
> > > >> 2015-01-31 21:23 GMT+01:00 Alex The Rocker :
> > > >>
> > > >> > Hello,
> > > >> >
> > > >> > I ran my own tests made with kafka_2.10-0.8.1.1.tgz binaries with
> > > >> > our
> > > >> > application:
> > > >> >
> > > >> > 1st test:
> > > >> > ==
> > > >> >   replace all kafka .jar files in our application on consuming
> side
> > > >> >   (without recompiling anything)
> > > >> >   => tests passed, OK
> > > >> >
> > > >> > 2nd test:
> > > >> > ===
> > > >> >   replace all kafka .jar files in our application on producing
> side
> > > >> >   (without recompiling anything)
> > > >> >   => KO, we get this error:
> > > >> >
>

Re: [VOTE] 0.8.2.0 Candidate 3

2015-02-01 Thread Steven Wu
At Netflix, we have been using a Route 53 DNS name as the bootstrap servers in
our AWS env. Basically, when a Kafka broker starts, we add it to the Route 53
DNS name for the cluster. This is like the VIP that Jay suggested.

But we are also moving toward using the Eureka service registry for
bootstrapping. We are worried that if the DNS name happens to resolve to a bad
broker, it might impact the bootstrap process/resiliency. We want to get a
list of brokers from Eureka to pass in as "bootstrap.servers".



On Sun, Feb 1, 2015 at 5:30 AM, Jay Kreps  wrote:

> You may already know this but the producer doesn't require a complete list
> of brokers in its config, it just requires the connection info for one
> active broker which it uses to discover the rest of the brokers. We allow
> you to specify multiple urls here for failover in cases where you aren't
> using a vip. So if you can put three brokers into the VIP for metadata
> bootstrapping you can still scale up and down the rest of the cluster.
>
> -Jay
>
> On Sun, Feb 1, 2015 at 12:17 AM, Alex The Rocker 
> wrote:
>
> > Jun:
> >
> > You raise a very good question: let me explain why we use
> > Broker.getConnectionString(), so maybe we'll get a supported way to
> > meet our need.
> >
> > We use Broker.getConnectionString() because we deploy "Kafka services"
> > in Amazon EC2 with the following architecture:
> > * Three VMs dedicated to Zookeeper processes
> > * At least two VMs with Kafka broker, but depending on load it can be
> > scaled to more broker VMs. Brokers self-register their address in
> > Zookeeper by serializing Broker objects in Zk.
> >
> > The VMs with Zookeeper have Elastic IPs = stable public IPs,
> >
> > These public IPs are fed to the  various "Application services" which
> > rely on Kafka to stream their logs & monitoring data to our central
> > Hadoop system.
> >
> > Using zkclient and the above-mentioned public ZooKeeper IPs, we get
> > the list of brokers registered to a given "Kafka service": this is
> > where we deserialize Broker objects and then use
> > getConnectionString() to discover the brokers' addresses. These
> > addresses are then used to initialize the Kafka producer(s).
> >
> > The whole trick is that we cannot use Elastic IPs (= stable IPs) for
> > the Kafka VMs, because of their "elastic" nature: we want to be able
> > to scale the number of VMs with Kafka brokers up and down.
> >
> > Now, we understand that using non-public Kafka APIs is bad: we've been
> > broken when moving to 0.8.1.1, then again when moving to 0.8.2.0...
> >
> > So it's time to raise the right question: what would be the supported
> > way to configure our producers given our dynamic-IP-for-brokers
> > context?
> >
> > Thanks,
> > Alex.
> >
> > 2015-02-01 8:55 GMT+01:00 VERMEERBERGEN Alexandre
> > :
> > >
> > > -Original Message-
> > > From: Jun Rao [mailto:j...@confluent.io]
> > > Sent: Sunday, February 01, 2015 3:03
> > > To: users@kafka.apache.org; kafka-clie...@googlegroups.com
> > > Cc: d...@kafka.apache.org
> > > Subject: Re: [VOTE] 0.8.2.0 Candidate 3
> > >
> > > Hi, Alex,
> > >
> > > Thanks for testing RC3.
> > >
> > > Broker.connectionString() is actually not part of the public api for
> the
> > producer. Is there a particular reason that you need to use this api?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Sat, Jan 31, 2015 at 1:53 PM, Alex The Rocker  >
> > > wrote:
> > >
> > >> Hello,
> > >>
> > >> I have read Broker.scala source code, and I found the answer:
> > >>  - With Kafka 0.8.1.1 we used Broker.getConnectionString() in our Java
> > >> code.
> > >>  - With Kafka 0.8.2.0, this method has been replaced by a 0-arity
> > >> method without the "get" prefix, so we have to change our Java code to
> > >> call
> > >> Broker.connectionString()
> > >>
> > >> So although binary compatibility is broken, we have a workaround.
> > >> I hope this will help other people relying on this API...
> > >>
> > >> and I'm going to continue tests with 0.8.2 rc3..
> > >>
> > >> Alex
> > >>
> > >> 2015-01-31 21:23 GMT+01:00 Alex The Rocker :
> > >>
> > >> > Hello,
> > >> >
> > >> > I ran my own tests made with kafka_2.10-0.8.1.1.tgz binaries with
> > >> > our
> > >> > application:
> > >> >
> > >> > 1st test:
> > >> > ==
> > >> >   replace all kafka .jar files in our application on consuming side
> > >> >   (without recompiling anything)
> > >> >   => tests passed, OK
> > >> >
> > >> > 2nd test:
> > >> > ===
> > >> >   replace all kafka .jar files in our application on producing side
> > >> >   (without recompiling anything)
> > >> >   => KO, we get this error:
> > >> >
> > >> > 2015-01-31 20:54:00,094 [Timer-2] ERROR c.d.i.t.StdOutErrRedirect -
> > >> > Exception in thread "Timer-2"
> > >> > 2015-01-31 20:54:00,111 [Timer-2] ERROR c.d.i.t.StdOutErrRedirect -
> > >> > java.lang.NoSuchMethodError:
> > >> > kafka.cluster.Broker.getConnectionString()Ljava/lang/String;
> > >> >
> > >> > Which means that binary compatibility with 0.8.1.1 version has been
> > >> b

Re: [kafka-clients] is RequestTimedOut (error code: 7) retryable in producer?

2015-02-01 Thread Evan Huus
Hi there,

As far as I know, RequestTimedOut is returned exclusively when the
`Timeout` value provided in a Produce Request is exceeded. The message
will likely still be committed on the local broker, but it means that
some replicas may not have received it yet. I'm honestly not sure what
the use case is for this particular feature, but perhaps somebody else
can explain.

Sarama's choice to retry on `UnknownTopicOrPartition` is probably
unnecessary - I'm not actually sure that code could ever be hit, since
in that case the same error would be raised earlier probably via the
call to `assignPartitions`. Certainly the two important ones are
`LeaderNotAvailable` and `NotLeaderForPartition`.
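A producer-side retry decision along the lines discussed above could be sketched as follows. The numeric error codes are from the 0.8 protocol guide; the class and method names are made up for illustration, and the mapping is not Sarama's exact logic:

```java
// Sketch of a retry decision for Kafka 0.8 produce-response error codes.
public class RetryPolicy {
    static final short LEADER_NOT_AVAILABLE = 5;
    static final short NOT_LEADER_FOR_PARTITION = 6;
    static final short REQUEST_TIMED_OUT = 7;

    static boolean shouldRetry(short errorCode) {
        switch (errorCode) {
            case LEADER_NOT_AVAILABLE:
            case NOT_LEADER_FOR_PARTITION:
                return true;   // leadership moved; refresh metadata and retry
            case REQUEST_TIMED_OUT:
                return false;  // may already be committed on the leader;
                               // a blind retry risks duplicate messages
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(shouldRetry(LEADER_NOT_AVAILABLE));
        System.out.println(shouldRetry(REQUEST_TIMED_OUT));
    }
}
```

Whether to retry on `RequestTimedOut` ultimately depends on whether the application can tolerate duplicates, for exactly the reason given above.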

Evan

On Sat, Jan 31, 2015 at 8:19 AM, DeJiang Zhu  wrote:
> Hi all,
>
> I'm working on implementing a Lua Kafka client [1], but I'm getting
> confused: what does `RequestTimedOut` exactly mean?
>
> At first, from the Kafka protocol guide [2], I thought the producer should
> retry when it received `RequestTimedOut`.
>
> But later, I found that sometimes the messages may have been committed even
> though I got `RequestTimedOut`, while sometimes not.
>
> Also, in sarama [3], retry happens only on the `UnknownTopicOrPartition`,
> `NotLeaderForPartition`, and `LeaderNotAvailable` error codes; only
> `LeaderNotAvailableException` and `NotLeaderForPartitionException` extend
> `RetryAbleException` in kafka [4].
>
> I wonder whether I misunderstood the Kafka protocol or was wrong about
> something else I haven't realized.
>
> Looking forward to a reply. Thank you guys in advance. :)
>
>
> [1] https://github.com/doujiang24/lua-resty-kafka
> [2]
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> [3] https://github.com/Shopify/sarama
> [4] https://github.com/apache/kafka
>
> --
> You received this message because you are subscribed to the Google Groups
> "kafka-clients" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to kafka-clients+unsubscr...@googlegroups.com.
> To post to this group, send email to kafka-clie...@googlegroups.com.
> Visit this group at http://groups.google.com/group/kafka-clients.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/kafka-clients/CAEZxTmk8YsF3T%2BENcmoVCidOctnbS6wAa08ap9agFP0B7tOXKA%40mail.gmail.com.
> For more options, visit https://groups.google.com/d/optout.


Re: [VOTE] 0.8.2.0 Candidate 3

2015-02-01 Thread Jay Kreps
You may already know this but the producer doesn't require a complete list
of brokers in its config, it just requires the connection info for one
active broker which it uses to discover the rest of the brokers. We allow
you to specify multiple urls here for failover in cases where you aren't
using a vip. So if you can put three brokers into the VIP for metadata
bootstrapping you can still scale up and down the rest of the cluster.
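As a concrete sketch of that configuration for the new (0.8.2) producer: the broker hostnames below are placeholders, and only a few stable entries are needed, since the client discovers the rest of the cluster from them.

```java
import java.util.Properties;

// Sketch: bootstrap the producer with a handful of stable brokers (or a
// VIP); the remaining brokers are discovered via cluster metadata.
public class BootstrapConfig {
    static Properties producerProps() {
        Properties props = new Properties();
        // Hypothetical hostnames; any one reachable entry is enough.
        props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("bootstrap.servers"));
    }
}
```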

-Jay

On Sun, Feb 1, 2015 at 12:17 AM, Alex The Rocker 
wrote:

> Jun:
>
> You raise a very good question: let me explain why we use
> Broker.getConnectionString(), so maybe we'll get a supported way to
> meet our need.
>
> We use Broker.getConnectionString() because we deploy "Kafka services"
> in Amazon EC2 with the following architecture:
> * Three VMs dedicated to Zookeeper processes
> * At least two VMs with Kafka broker, but depending on load it can be
> scaled to more broker VMs. Brokers self-register their address in
> Zookeeper by serializing Broker objects in Zk.
>
> The VMs with Zookeeper have Elastic IPs = stable public IPs,
>
> These public IPs are fed to the  various "Application services" which
> rely on Kafka to stream their logs & monitoring data to our central
> Hadoop system.
>
> Using zkclient and the above-mentioned public ZooKeeper IPs, we get
> the list of brokers registered to a given "Kafka service": this is
> where we deserialize Broker objects and then use
> getConnectionString() to discover the brokers' addresses. These
> addresses are then used to initialize the Kafka producer(s).
>
> The whole trick is that we cannot use Elastic IPs (= stable IPs) for
> the Kafka VMs, because of their "elastic" nature: we want to be able
> to scale the number of VMs with Kafka brokers up and down.
>
> Now, we understand that using non-public Kafka APIs is bad: we've been
> broken when moving to 0.8.1.1, then again when moving to 0.8.2.0...
>
> So it's time to raise the right question: what would be the supported
> way to configure our producers given our dynamic-IP-for-brokers
> context?
>
> Thanks,
> Alex.
>
> 2015-02-01 8:55 GMT+01:00 VERMEERBERGEN Alexandre
> :
> >
> > -Original Message-
> > From: Jun Rao [mailto:j...@confluent.io]
> > Sent: Sunday, February 01, 2015 3:03
> > To: users@kafka.apache.org; kafka-clie...@googlegroups.com
> > Cc: d...@kafka.apache.org
> > Subject: Re: [VOTE] 0.8.2.0 Candidate 3
> >
> > Hi, Alex,
> >
> > Thanks for testing RC3.
> >
> > Broker.connectionString() is actually not part of the public api for the
> producer. Is there a particular reason that you need to use this api?
> >
> > Thanks,
> >
> > Jun
> >
> > On Sat, Jan 31, 2015 at 1:53 PM, Alex The Rocker 
> > wrote:
> >
> >> Hello,
> >>
> >> I have read Broker.scala source code, and I found the answer:
> >>  - With Kafka 0.8.1.1 we used Broker.getConnectionString() in our Java
> >> code.
> >>  - With Kafka 0.8.2.0, this method has been replaced by a 0-arity
> >> method without the "get" prefix, so we have to change our Java code to
> >> call
> >> Broker.connectionString()
> >>
> >> So although binary compatibility is broken, we have a workaround.
> >> I hope this will help other people relying on this API...
> >>
> >> and I'm going to continue tests with 0.8.2 rc3..
> >>
> >> Alex
> >>
> >> 2015-01-31 21:23 GMT+01:00 Alex The Rocker :
> >>
> >> > Hello,
> >> >
> >> > I ran my own tests made with kafka_2.10-0.8.1.1.tgz binaries with
> >> > our
> >> > application:
> >> >
> >> > 1st test:
> >> > ==
> >> >   replace all kafka .jar files in our application on consuming side
> >> >   (without recompiling anything)
> >> >   => tests passed, OK
> >> >
> >> > 2nd test:
> >> > ===
> >> >   replace all kafka .jar files in our application on producing side
> >> >   (without recompiling anything)
> >> >   => KO, we get this error:
> >> >
> >> > 2015-01-31 20:54:00,094 [Timer-2] ERROR c.d.i.t.StdOutErrRedirect -
> >> > Exception in thread "Timer-2"
> >> > 2015-01-31 20:54:00,111 [Timer-2] ERROR c.d.i.t.StdOutErrRedirect -
> >> > java.lang.NoSuchMethodError:
> >> > kafka.cluster.Broker.getConnectionString()Ljava/lang/String;
> >> >
> >> > Which means that binary compatibility with 0.8.1.1 version has been
> >> broken.
> > >> > We use getConnectionString() to get the Broker's addresses,
> >> > see
> >> this
> >> > answer from Neha:
> >> >
> >> >
> >> >
> >> http://mail-archives.apache.org/mod_mbox/kafka-users/201404.mbox/%3CCA
> >> OG_4QYnWrB=tmrtcryf8-pdagy_cgfe_cxotqbclrkj2+x...@mail.gmail.com%3E
> >> >
> >> > If the kafka.cluster.Broker.getConnectionString() method has been
> >> > removed with Kafka 0.8.2.0, then what is the suitable replacement for
> it ?
> >> >
> >> > Thanks
> >> > Alex
> >> >
> >> >
> >> >> -Original Message-
> >> >> From: Jun Rao [mailto:j...@confluent.io]
> >> >> Sent: Thursday, January 29, 2015 6:22
> >> >> To: d...@kafka.apache.org; users@kafka.apache.org;
> >> >> kafka-clie...@googlegroups.com
> >> >> Subject: [VOTE] 0.8.2.0 Candidate 3
> 

Re: [VOTE] 0.8.2.0 Candidate 3

2015-02-01 Thread Alex The Rocker
Jun:

You raise a very good question: let me explain why we use
Broker.getConnectionString(), so maybe we'll get a supported way to
meet our need.

We use Broker.getConnectionString() because we deploy "Kafka services"
in Amazon EC2 with the following architecture:
* Three VMs dedicated to Zookeeper processes
* At least two VMs with Kafka broker, but depending on load it can be
scaled to more broker VMs. Brokers self-register their address in
Zookeeper by serializing Broker objects in Zk.

The VMs with Zookeeper have Elastic IPs = stable public IPs,

These public IPs are fed to the  various "Application services" which
rely on Kafka to stream their logs & monitoring data to our central
Hadoop system.

Using zkclient and the above-mentioned public ZooKeeper IPs, we get
the list of brokers registered to a given "Kafka service": this is
where we deserialize Broker objects and then use
getConnectionString() to discover the brokers' addresses. These
addresses are then used to initialize the Kafka producer(s).

The whole trick is that we cannot use Elastic IPs (= stable IPs) for
the Kafka VMs, because of their "elastic" nature: we want to be able
to scale the number of VMs with Kafka brokers up and down.

Now, we understand that using non-public Kafka APIs is bad: we've been
broken when moving to 0.8.1.1, then again when moving to 0.8.2.0...

So it's time to raise the right question: what would be the supported
way to configure our producers given our dynamic-IP-for-brokers
context?

Thanks,
Alex.
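Until there is a supported answer, one stopgap (purely illustrative, and still relying on an internal class) is to resolve whichever accessor exists via reflection, so a single binary works against both 0.8.1.1 and 0.8.2.0. `FakeBroker` below is a stand-in for `kafka.cluster.Broker`:

```java
// Hypothetical compatibility shim: getConnectionString() (0.8.1.1) was
// renamed to connectionString() (0.8.2.0); reflection tolerates either.
public class BrokerCompat {
    static String connectionString(Object broker) {
        try {
            try {
                return (String) broker.getClass()
                        .getMethod("connectionString").invoke(broker);
            } catch (NoSuchMethodException e) {
                // fall back to the pre-0.8.2 accessor name
                return (String) broker.getClass()
                        .getMethod("getConnectionString").invoke(broker);
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("no connection-string accessor", e);
        }
    }

    // Stand-in for kafka.cluster.Broker, for demonstration only.
    public static class FakeBroker {
        public String getConnectionString() { return "host1:9092"; }
    }

    public static void main(String[] args) {
        System.out.println(connectionString(new FakeBroker()));
    }
}
```

This avoids a recompile per Kafka version, but the real fix remains a public, supported API for broker discovery.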

2015-02-01 8:55 GMT+01:00 VERMEERBERGEN Alexandre
:
>
> -Original Message-
> From: Jun Rao [mailto:j...@confluent.io]
> Sent: Sunday, February 01, 2015 3:03
> To: users@kafka.apache.org; kafka-clie...@googlegroups.com
> Cc: d...@kafka.apache.org
> Subject: Re: [VOTE] 0.8.2.0 Candidate 3
>
> Hi, Alex,
>
> Thanks for testing RC3.
>
> Broker.connectionString() is actually not part of the public api for the 
> producer. Is there a particular reason that you need to use this api?
>
> Thanks,
>
> Jun
>
> On Sat, Jan 31, 2015 at 1:53 PM, Alex The Rocker 
> wrote:
>
>> Hello,
>>
>> I have read Broker.scala source code, and I found the answer:
>>  - With Kafka 0.8.1.1 we used Broker.getConnectionString() in our Java
>> code.
>>  - With Kafka 0.8.2.0, this method has been replaced by a 0-arity
>> method without the "get" prefix, so we have to change our Java code to
>> call
>> Broker.connectionString()
>>
>> So although binary compatibility is broken, we have a workaround.
>> I hope this will help other people relying on this API...
>>
>> and I'm going to continue tests with 0.8.2 rc3..
>>
>> Alex
>>
>> 2015-01-31 21:23 GMT+01:00 Alex The Rocker :
>>
>> > Hello,
>> >
>> > I ran my own tests made with kafka_2.10-0.8.1.1.tgz binaries with
>> > our
>> > application:
>> >
>> > 1st test:
>> > ==
>> >   replace all kafka .jar files in our application on consuming side
>> >   (without recompiling anything)
>> >   => tests passed, OK
>> >
>> > 2nd test:
>> > ===
>> >   replace all kafka .jar files in our application on producing side
>> >   (without recompiling anything)
>> >   => KO, we get this error:
>> >
>> > 2015-01-31 20:54:00,094 [Timer-2] ERROR c.d.i.t.StdOutErrRedirect -
>> > Exception in thread "Timer-2"
>> > 2015-01-31 20:54:00,111 [Timer-2] ERROR c.d.i.t.StdOutErrRedirect -
>> > java.lang.NoSuchMethodError:
>> > kafka.cluster.Broker.getConnectionString()Ljava/lang/String;
>> >
>> > Which means that binary compatibility with 0.8.1.1 version has been
>> broken.
>> > We use getConnectionString() to get the Broker's addresses,
>> > see
>> this
>> > answer from Neha:
>> >
>> >
>> >
>> http://mail-archives.apache.org/mod_mbox/kafka-users/201404.mbox/%3CCA
>> OG_4QYnWrB=tmrtcryf8-pdagy_cgfe_cxotqbclrkj2+x...@mail.gmail.com%3E
>> >
>> > If the kafka.cluster.Broker.getConnectionString() method has been
>> > removed with Kafka 0.8.2.0, then what is the suitable replacement for it ?
>> >
>> > Thanks
>> > Alex
>> >
>> >
>> >> -Original Message-
>> >> From: Jun Rao [mailto:j...@confluent.io]
>> >> Sent: Thursday, January 29, 2015 6:22
>> >> To: d...@kafka.apache.org; users@kafka.apache.org;
>> >> kafka-clie...@googlegroups.com
>> >> Subject: [VOTE] 0.8.2.0 Candidate 3
>> >>
>> >> This is the third candidate for release of Apache Kafka 0.8.2.0.
>> >>
>> >> Release Notes for the 0.8.2.0 release
>> >>
>> >>
>> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/RELEASE_NOT
>> ES.html
>> >>
>> >> *** Please download, test and vote by Saturday, Jan 31, 11:30pm PT
>> >>
>> >> Kafka's KEYS file containing PGP keys we use to sign the release:
>> >> http://kafka.apache.org/KEYS in addition to the md5, sha1 and sha2
>> >> (SHA256) checksum.
>> >>
>> >> * Release artifacts to be voted upon (source and binary):
>> >> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/
>> >>
>> >> * Maven artifacts to be voted upon prior to release:
>> >> https://repository.apache.org/content/groups/staging/
>> >>
>> >> *