Issue with async producer

2014-11-03 Thread Devendra Tagare
Hi,

We are using an async producer to send data to kafka.

The load the sender handles is around 250 rps, and the size of a message is around
25 KB.

The configs used in the producer are :

request.required.acks=0
producer.type=async
batch.num.messages=10
topic.metadata.refresh.interval.ms=3
queue.buffering.max.ms=300
queue.enqueue.timeout.ms=50


While the async producer works perfectly fine at 150-175 rps, where the invoking
method returns in under 10 ms, the invoking method takes around 2 ms to return
when the load increases to 250 rps.

On investigation we noticed QueueFullExceptions in the logs.

On the Kafka side the memory utilization was high. Is this because fsync is not
happening fast enough?
The documentation says the OS-level flush interval should take care of how often
writes reach disk. Should we expedite the writes using

log.flush.interval.messages
and log.flush.interval.ms?
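
For illustration, expediting the flush would presumably look something like this
(the values below are only placeholder assumptions, not recommendations; by default
the broker leaves flushing to the OS page cache):

log.flush.interval.messages=10000
log.flush.interval.ms=1000

That said, QueueFullException is raised by the async producer when its in-memory
queue (sized by queue.buffering.max.messages) stays full for longer than
queue.enqueue.timeout.ms, so the producer-side settings may be worth checking as well.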

Also, we tried the sync producer but were not able to enforce the timeouts on 
it.

We have 45 producers & 11 Kafka servers which handle a load of around 500mn 
events per day.

Some of the server side properties we are using:

log.segment.bytes=536870912
log.retention.check.interval.ms=6
zookeeper.connection.timeout.ms=100

Regards,
Dev


Re: kafka.message.InvalidMessageException: Message is corrupt

2014-11-03 Thread Jun Rao
Are you using the java producer?

Thanks,

Jun

On Mon, Nov 3, 2014 at 3:31 AM, Fredrik S Loekke 
wrote:

>  Hi
>
>
>
> We are experimenting with running the Kafka server on a Windows machine, but
> keep getting exceptions when producing a lot of messages (in the
> neighborhood of 1 million):
>
>
>
> kafka.message.InvalidMessageException: Message is corrupt (stored crc =
> 2963929431, computed crc = 2364631640)
>
> at kafka.message.Message.ensureValid(Message.scala:166)
>
> at kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:330)
>
> at kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:318)
>
> at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>
> at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
>
> at kafka.log.Log.analyzeAndValidateMessageSet(Log.scala:318)
>
> at kafka.log.Log.append(Log.scala:231)
>
>
>
> Any suggestions?
>
>
>
> Best regards / Med venlig hilsen
>
>
>
> *Fredrik Skeel Løkke*
>
> Software Developer ǀ IT & Analysis
>
>
>
> Mob.: +45 3176 8438
>
> f...@lindcapital.com
>
>
>
>
>
>
> Lind Capital A/S
>
> Værkmestergade 3, 2
>
> DK-8000 Aarhus C
>
> www.lindcapital.com
>
>
>
>


Re: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released

2014-11-03 Thread Guozhang Wang
Koert, these two classes belong to the 0.9 consumer API, which is not dev-ready
yet. We only checked in the APIs so people can review and comment on them.

Guozhang
On Nov 1, 2014 8:26 AM, "Koert Kuipers"  wrote:

> joe,
> looking at those 0.8.2 beta javadoc I also see a Consumer api and
> KafkaConsumer class. they look different from what I currently use in
> 8.1.1. Is this new? And this is not the 0.9 consumer?
> thanks, koert
> On Oct 30, 2014 8:01 AM, "Joe Stein"  wrote:
>
> > Hey, yeah!
> >
> > For the new producer
> > https://archive.apache.org/dist/kafka/0.8.2-beta/java-doc/
> >
> > The java consumer is slated in 0.9 more on that here
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
> >
> > /***
> >  Joe Stein
> >  Founder, Principal Consultant
> >  Big Data Open Source Security LLC
> >  http://www.stealth.ly
> >  Twitter: @allthingshadoop 
> > /
> >
> > On Thu, Oct 30, 2014 at 6:08 AM, Pierre-Yves Ritschard  >
> > wrote:
> >
> > > Hi Joe et al.
> > >
> > > Congrats on the beta release!
> > > Do I read correctly that libraries can now rely on
> > > org.apache.kafka/kafka-clients which does not pull in scala anymore ?
> > >
> > > If so, awesome!
> > >
> > >   - pyr
> > >
> > > On Tue, Oct 28, 2014 at 2:01 AM, Libo Yu  wrote:
> > >
> > > > Congrats! When do you think the final 0.82 will be released?
> > > >
> > > > > To: annou...@apache.org; users@kafka.apache.org;
> > d...@kafka.apache.org
> > > > > Subject: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released
> > > > > Date: Tue, 28 Oct 2014 00:50:35 +
> > > > > From: joest...@apache.org
> > > > >
> > > > > The Apache Kafka community is pleased to announce the beta release
> > for
> > > > Apache Kafka 0.8.2.
> > > > >
> > > > > The 0.8.2-beta release introduces many new features, improvements
> and
> > > > fixes including:
> > > > >  - A new Java producer for ease of implementation and enhanced
> > > > performance.
> > > > >  - Delete topic support.
> > > > >  - Per topic configuration of preference for consistency over
> > > > availability.
> > > > >  - Scala 2.11 support and dropping support for Scala 2.8.
> > > > >  - LZ4 Compression.
> > > > >
> > > > > All of the changes in this release can be found:
> > > > https://archive.apache.org/dist/kafka/0.8.2-beta/RELEASE_NOTES.html
> > > > >
> > > > > Apache Kafka is high-throughput, publish-subscribe messaging system
> > > > rethought of as a distributed commit log.
> > > > >
> > > > > ** Fast => A single Kafka broker can handle hundreds of megabytes
> of
> > > > reads and
> > > > > writes per second from thousands of clients.
> > > > >
> > > > > ** Scalable => Kafka is designed to allow a single cluster to serve
> > as
> > > > the central data backbone
> > > > > for a large organization. It can be elastically and transparently
> > > > expanded without downtime.
> > > > > Data streams are partitioned and spread over a cluster of machines
> to
> > > > allow data streams
> > > > > larger than the capability of any single machine and to allow
> > clusters
> > > > of co-ordinated consumers.
> > > > >
> > > > > ** Durable => Messages are persisted on disk and replicated within
> > the
> > > > cluster to prevent
> > > > > data loss. Each broker can handle terabytes of messages without
> > > > performance impact.
> > > > >
> > > > > ** Distributed by Design => Kafka has a modern cluster-centric
> design
> > > > that offers
> > > > > strong durability and fault-tolerance guarantees.
> > > > >
> > > > > You can download the release from:
> > > > http://kafka.apache.org/downloads.html
> > > > >
> > > > > We welcome your help and feedback. For more information on how to
> > > > > report problems, and to get involved, visit the project website at
> > > > http://kafka.apache.org/
> > > > >
> > > >
> > > >
> > >
> >
>


Re: Consumer keeps losing connection

2014-11-03 Thread Jun Rao
It seems that the consumer can't connect to the broker for some reason. Any
other error on the broker? Any issue with the network?

Thanks,

Jun

On Sat, Nov 1, 2014 at 9:46 PM, Chen Wang 
wrote:

> Hello Folks,
> I am using Highlevel consumer, and it seems to drop connections
> intermittently:
>
> 2014-11-01 13:34:40 SimpleConsumer [INFO] Reconnect due to socket error:
> Received -1 when reading from channel, socket has likely been closed.
> 2014-11-01 13:34:40 ConsumerFetcherThread [WARN]
>
> [ConsumerFetcherThread-campaign_open_consumer_targeting_20141031_trgt-storm03-1414801367127-40cc618a-0-5],
> Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 20220;
> ClientId:
>
> campaign_open_consumer_targeting_20141031-ConsumerFetcherThread-campaign_open_consumer_targeting_20141031_trgt-storm03-1414801367127-40cc618a-0-5;
> ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo:
> [test_topic,18] -> PartitionFetchInfo(1681313989,4194304),[test_topic,21]
> -> PartitionFetchInfo(141266339,4194304)
> java.net.ConnectException: Connection refused
> at sun.nio.ch.Net.connect(Native Method)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:534)
> at kafka.network.BlockingChannel.connect(Unknown Source)
> at kafka.consumer.SimpleConsumer.connect(Unknown Source)
> at kafka.consumer.SimpleConsumer.reconnect(Unknown Source)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(Unknown Source)
> at
>
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown
> Source)
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Unknown
> Source)
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown
> Source)
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown
> Source)
> at kafka.metrics.KafkaTimer.time(Unknown Source)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(Unknown Source)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown
> Source)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown
> Source)
> at kafka.metrics.KafkaTimer.time(Unknown Source)
> at kafka.consumer.SimpleConsumer.fetch(Unknown Source)
> at kafka.server.AbstractFetcherThread.processFetchRequest(Unknown
> Source)
> at kafka.server.AbstractFetcherThread.doWork(Unknown Source)
> at kafka.utils.ShutdownableThread.run(Unknown Source)
> 2014-11-01 13:34:40 VerifiableProperties [INFO] Verifying properties
>
> or sometimes:
> 2014-11-01 13:34:40 SimpleConsumer [INFO] Reconnect due to socket error:
> null
> 2014-11-01 13:34:40 ConsumerFetcherThread [WARN]
>
> [ConsumerFetcherThread-campaign_open_consumer_targeting_20141031_trgt-storm03-1414801367127-40cc618a-0-5],
> Error in fetch Name: FetchRequest; Version: 0; CorrelationId: 20222;
> ClientId:
>
> campaign_open_consumer_targeting_20141031-ConsumerFetcherThread-campaign_open_consumer_targeting_20141031_trgt-storm03-1414801367127-40cc618a-0-5;
> ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo:
> [test_topic,18] -> PartitionFetchInfo(1681313989,4194304)
> java.net.ConnectException: Connection refused
> at sun.nio.ch.Net.connect(Native Method)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:534)
> at kafka.network.BlockingChannel.connect(Unknown Source)
> at kafka.consumer.SimpleConsumer.connect(Unknown Source)
> at kafka.consumer.SimpleConsumer.reconnect(Unknown Source)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(Unknown Source)
> at
>
> kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown
> Source)
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Unknown
> Source)
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown
> Source)
> at
>
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(Unknown
> Source)
> at kafka.metrics.KafkaTimer.time(Unknown Source)
> at
> kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(Unknown Source)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown
> Source)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(Unknown
> Source)
> at kafka.metrics.KafkaTimer.time(Unknown Source)
> at kafka.consumer.SimpleConsumer.fetch(Unknown Source)
> at kafka.server.AbstractFetcherThread.processFetchRequest(Unknown
> Source)
> at kafka.server.AbstractFetcherThread.doWork(Unknown Source)
> at kafka.utils.ShutdownableThread.run(Unknown Source)
>
> The config I am using is:
> kafka.config.fetch.message.max.bytes=4194304
> kafka.config.group.id=mygroupid
> kafka.config.rebalance.backoff.ms=6000
> k

Re: High Level Consumer Iterator IllegalStateException Issue

2014-11-03 Thread Jun Rao
Bhavesh,

That example has a lot of code. Could you provide a simpler test that
demonstrates the problem?

Thanks,

Jun

On Fri, Oct 31, 2014 at 10:07 PM, Bhavesh Mistry  wrote:

> Hi Jun,
>
> Here is code base:
>
> https://github.com/bmistry13/kafka-trunk-producer/blob/master/KafkaConsumerWithDelay.java
>
> Please let me know if you can help me determine  the root cause.   Why
> there is illegal state and blocking ?
>
> Thanks,
>
> Bhavesh
>
> On Fri, Oct 31, 2014 at 8:33 AM, Jun Rao  wrote:
>
> > Do you have a simple test that can reproduce this issue?
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Oct 30, 2014 at 8:34 PM, Bhavesh Mistry <
> > mistry.p.bhav...@gmail.com>
> > wrote:
> >
> > > HI Jun,
> > >
> > > Consumer Connector is not closed because I can see the ConsumerFetcher
> > > Thread alive but Blocked on *put* and hasNext() is blocked on *take*.
> > > This is what I see after recovery.
> > >
> > >
> > >
> > > Thanks,
> > >
> > > Bhavesh
> > >
> > > On Thu, Oct 30, 2014 at 11:42 AM, Jun Rao  wrote:
> > >
> > > > Another possibility is that the consumer connector is already closed
> > and
> > > > then you call hasNext() on the iterator.
> > > >
> > > > Thanks,
> > > >
> > > >
> > > > Jun
> > > >
> > > > On Wed, Oct 29, 2014 at 9:06 PM, Bhavesh Mistry <
> > > > mistry.p.bhav...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > The hasNext() itself throws this error.  I have to manually reset
> > state
> > > > and
> > > > > sometime it is able to recover and other it is not. Any other clue
> ?
> > > > >
> > > > > public boolean hasNext() {
> > > > > LOG.info("called of  hasNext() :");
> > > > > int retry = 3;
> > > > > while(retry > 0){
> > > > > try{
> > > > > // this hasNext is blocking call..
> > > > > boolean result = iterator.hasNext();
> > > > > return result;
> > > > > }catch(IllegalStateException exp){
> > > > > iterator.resetState();
> > > > > LOG.error("GOT IllegalStateException arg trying
> > to
> > > > > recover", exp);
> > > > > retry--;
> > > > > }
> > > > > }
> > > > > return false;
> > > > > }
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Bhavesh
> > > > >
> > > > > On Wed, Oct 29, 2014 at 6:36 PM, Jun Rao  wrote:
> > > > >
> > > > > > The IllegalStateException typically happens if you call next()
> > before
> > > > > > hasNext() on the iterator.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Tue, Oct 28, 2014 at 10:50 AM, Bhavesh Mistry <
> > > > > > mistry.p.bhav...@gmail.com
> > > > > > > wrote:
> > > > > >
> > > > > > > Hi Neha,
> > > > > > >
> > > > > > > Thanks for your answer.  Can you please let me know how I can
> > > resolve
> > > > > the
> > > > > > > Iterator IllegalStateException ?  I would appreciate your is
> this
> > > is
> > > > > bug
> > > > > > I
> > > > > > > can file one or let me know if this is use case specific ?
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Bhavesh
> > > > > > >
> > > > > > > On Tue, Oct 28, 2014 at 9:30 AM, Neha Narkhede <
> > > > > neha.narkh...@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > queued.max.message.chunks controls the consumer's fetcher
> > queue.
> > > > > > > >
> > > > > > > > On Mon, Oct 27, 2014 at 9:32 PM, Bhavesh Mistry <
> > > > > > > > mistry.p.bhav...@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > HI Neha,
> > > > > > > > >
> > > > > > > > > If I solved the problem number 1 think and number 2 will be
> > > > solved
> > > > > > > (prob
> > > > > > > > > 1 is causing problem number 2(blocked)).  Can you please
> let
> > me
> > > > > know
> > > > > > > what
> > > > > > > > > controls the queue size for *ConsumerFetcherThread* thread
> ?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Please see the attached java source code which will
> reproduce
> > > the
> > > > > > > > > problem.  You may remove the recovery process...  Please
> > check.
> > > > We
> > > > > > > have
> > > > > > > > to
> > > > > > > > > do some work before we start reading from Kafka Stream
> > > Interator
> > > > > and
> > > > > > > this
> > > > > > > > > seems to cause some issue with java.lang.
> > > > > > > > > IllegalStateException: Iterator is in failed state*.
> > > > > > > > >
> > > > > > > > > Please let me know your finding and recommendation.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Bhavesh
> > > > > > > > >
> > > > > > > > > On Mon, Oct 27, 2014 at 6:24 PM, Neha Narkhede <
> > > > > > > neha.narkh...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > >> >> Sometime it give following exception.
> > > > > > > > >>
> > > > > > > > >> It will help to have a more specific test case that
> > reproduces
> > > > the
> > > > > > >

Re: MBeans, dashes, underscores, and KAFKA-1481

2014-11-03 Thread Jun Rao
Erik,

It seems that we can customize the mbean names with Metrics 2.2.0. Are there any
other reasons that we would need to downgrade to Metrics 2.1.5?

Thanks,

Jun

On Sun, Nov 2, 2014 at 12:10 PM, Erik van Oosten <
e.vanoos...@grons.nl.invalid> wrote:

> Hi Jun,
>
> The quotes are because of a regression in Metrics 2.2.0. IMHO Metrics
> 2.2.0 should not be used because of this. Just downgrade to Metrics 2.1.5
> and you are good.
>
> Of course, upgrading to Metrics 3 would do the trick also.
>
> Kind regards,
> Erik.
>
>
> Jun Rao wrote on 17-10-14 at 20:54:
>
>  Hi, everyone,
>>
>> We are fixing the mbean names in kafka-1482, by adding separate explicit
>> tags in the name for things like clientId and topic. Another thing that
>> some people have complained before is that we use quotes in the jmx name.
>> Should we also just get rid of the quotes as part of kafka-1482? So,
>> instead of
>> "kafka.server":type="BrokerTopicMetrics",name="topic-1-BytesInPerSec"
>> we will have
>> kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=topic-1
>>
>> Thanks,
>>
>> Jun
>>
>>
>
> --
> Erik van Oosten
> http://www.day-to-day-stuff.blogspot.com/
>
>


Re: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released

2014-11-03 Thread Jun Rao
Koert,

The Java consumer in 0.8.2-beta only has the API and hasn't been implemented yet.
The implementation will likely be completed in 0.9.

Thanks,

Jun

On Sat, Nov 1, 2014 at 8:18 AM, Koert Kuipers  wrote:

> joe,
> looking at those 0.8.2 beta javadoc I also see a Consumer api and
> KafkaConsumer class. they look different from what I currently use in
> 8.1.1. Is this new? And this is not the 0.9 consumer?
> thanks, koert
> On Oct 30, 2014 8:01 AM, "Joe Stein"  wrote:
>
> > Hey, yeah!
> >
> > For the new producer
> > https://archive.apache.org/dist/kafka/0.8.2-beta/java-doc/
> >
> > The java consumer is slated in 0.9 more on that here
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
> >
> > /***
> >  Joe Stein
> >  Founder, Principal Consultant
> >  Big Data Open Source Security LLC
> >  http://www.stealth.ly
> >  Twitter: @allthingshadoop 
> > /
> >
> > On Thu, Oct 30, 2014 at 6:08 AM, Pierre-Yves Ritschard  >
> > wrote:
> >
> > > Hi Joe et al.
> > >
> > > Congrats on the beta release!
> > > Do I read correctly that libraries can now rely on
> > > org.apache.kafka/kafka-clients which does not pull in scala anymore ?
> > >
> > > If so, awesome!
> > >
> > >   - pyr
> > >
> > > On Tue, Oct 28, 2014 at 2:01 AM, Libo Yu  wrote:
> > >
> > > > Congrats! When do you think the final 0.82 will be released?
> > > >
> > > > > To: annou...@apache.org; users@kafka.apache.org;
> > d...@kafka.apache.org
> > > > > Subject: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released
> > > > > Date: Tue, 28 Oct 2014 00:50:35 +
> > > > > From: joest...@apache.org
> > > > >
> > > > > The Apache Kafka community is pleased to announce the beta release
> > for
> > > > Apache Kafka 0.8.2.
> > > > >
> > > > > The 0.8.2-beta release introduces many new features, improvements
> and
> > > > fixes including:
> > > > >  - A new Java producer for ease of implementation and enhanced
> > > > performance.
> > > > >  - Delete topic support.
> > > > >  - Per topic configuration of preference for consistency over
> > > > availability.
> > > > >  - Scala 2.11 support and dropping support for Scala 2.8.
> > > > >  - LZ4 Compression.
> > > > >
> > > > > All of the changes in this release can be found:
> > > > https://archive.apache.org/dist/kafka/0.8.2-beta/RELEASE_NOTES.html
> > > > >
> > > > > Apache Kafka is high-throughput, publish-subscribe messaging system
> > > > rethought of as a distributed commit log.
> > > > >
> > > > > ** Fast => A single Kafka broker can handle hundreds of megabytes
> of
> > > > reads and
> > > > > writes per second from thousands of clients.
> > > > >
> > > > > ** Scalable => Kafka is designed to allow a single cluster to serve
> > as
> > > > the central data backbone
> > > > > for a large organization. It can be elastically and transparently
> > > > expanded without downtime.
> > > > > Data streams are partitioned and spread over a cluster of machines
> to
> > > > allow data streams
> > > > > larger than the capability of any single machine and to allow
> > clusters
> > > > of co-ordinated consumers.
> > > > >
> > > > > ** Durable => Messages are persisted on disk and replicated within
> > the
> > > > cluster to prevent
> > > > > data loss. Each broker can handle terabytes of messages without
> > > > performance impact.
> > > > >
> > > > > ** Distributed by Design => Kafka has a modern cluster-centric
> design
> > > > that offers
> > > > > strong durability and fault-tolerance guarantees.
> > > > >
> > > > > You can download the release from:
> > > > http://kafka.apache.org/downloads.html
> > > > >
> > > > > We welcome your help and feedback. For more information on how to
> > > > > report problems, and to get involved, visit the project website at
> > > > http://kafka.apache.org/
> > > > >
> > > >
> > > >
> > >
> >
>


Re: Dynamically adding Kafka brokers

2014-11-03 Thread Jay Kreps
I agree it would be really nice to get KAFKA-1070 figured out.

FWIW, the reason for having a name or id other than ip was to make it
possible to move the identity to another physical server (e.g. scp the data
directory) and have it perform the same role on that new piece of hardware.
Systems that tie the data to a particular host tend to be sort of hated on
since you can't do anything simple/stupid to back them up or replace them.

-Jay

On Mon, Nov 3, 2014 at 2:23 PM, Gwen Shapira  wrote:

> +1
> Thats what we use to generate broker id in automatic deployments.
> This method makes troubleshooting easier (you know where each broker is
> running), and doesn't require keeping extra files around.
>
> On Mon, Nov 3, 2014 at 2:17 PM, Joe Stein  wrote:
>
> > Most folks strip the IP and use that as the broker.id. KAFKA-1070 does
> not
> > yet accommodate for that very widely used method. I think it would be bad
> > if KAFKA-1070 only worked for new installations because that is how
> people
> > use Kafka today (per
> >
> >
> https://issues.apache.org/jira/browse/KAFKA-1070?focusedCommentId=14085808&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14085808
> > )
> >
> > On Mon, Nov 3, 2014 at 2:12 PM, Joel Koshy  wrote:
> >
> > > KAFKA-1070 will help with this and is pending a review.
> > >
> > > On Mon, Nov 03, 2014 at 05:03:20PM -0500, Otis Gospodnetic wrote:
> > > > Hi,
> > > >
> > > > How do people handle situations, and specifically the broker.id
> > > property,
> > > > where the Kafka (broker) cluster is not fully defined right away?
> > > >
> > > > Here's the use case we have at Sematext:
> > > > * Our software ships as a VM
> > > > * All components run in this single VM, including 1 Kafka broker
> > > > * Of course, this is just for a nice OOTB experience, but to scale
> one
> > > > needs to have more instances of this VM, including more Kafka brokers
> > > > * *One can clone our VM and launch N instances of it, but because we
> > > have a
> > > > single Kafka broker config with a single broker.id  >
> > in
> > > > it, one can't just launch more of these VMs and expect to see more
> > Kafka
> > > > brokers join the cluster.  One would have to change the broker.id
> > > >  on each new VM instance.*
> > > >
> > > > How do others handle this in a software that is packages and ships to
> > > user
> > > > and is not in your direct control to allow you to edit configs?
> > > >
> > > > Would it be best to have a script that connect to ZooKeeper to get
> the
> > > list
> > > > of all existing brokers and their IDs and then generate a new
> distinct
> > > ID +
> > > > config for the new Kafka broker?
> > > >
> > > > Or are there slicker ways to do this that people use?
> > > >
> > > > Thanks,
> > > > Otis
> > > > --
> > > > Monitoring * Alerting * Anomaly Detection * Centralized Log
> Management
> > > > Solr & Elasticsearch Support * http://sematext.com/
> > >
> > >
> >
>


Re: Dynamically adding Kafka brokers

2014-11-03 Thread Gwen Shapira
+1
That's what we use to generate broker ids in automatic deployments.
This method makes troubleshooting easier (you know where each broker is
running) and doesn't require keeping extra files around.

On Mon, Nov 3, 2014 at 2:17 PM, Joe Stein  wrote:

> Most folks strip the IP and use that as the broker.id. KAFKA-1070 does not
> yet accommodate for that very widely used method. I think it would be bad
> if KAFKA-1070 only worked for new installations because that is how people
> use Kafka today (per
>
> https://issues.apache.org/jira/browse/KAFKA-1070?focusedCommentId=14085808&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14085808
> )
>
> On Mon, Nov 3, 2014 at 2:12 PM, Joel Koshy  wrote:
>
> > KAFKA-1070 will help with this and is pending a review.
> >
> > On Mon, Nov 03, 2014 at 05:03:20PM -0500, Otis Gospodnetic wrote:
> > > Hi,
> > >
> > > How do people handle situations, and specifically the broker.id
> > property,
> > > where the Kafka (broker) cluster is not fully defined right away?
> > >
> > > Here's the use case we have at Sematext:
> > > * Our software ships as a VM
> > > * All components run in this single VM, including 1 Kafka broker
> > > * Of course, this is just for a nice OOTB experience, but to scale one
> > > needs to have more instances of this VM, including more Kafka brokers
> > > * *One can clone our VM and launch N instances of it, but because we
> > have a
> > > single Kafka broker config with a single broker.id 
> in
> > > it, one can't just launch more of these VMs and expect to see more
> Kafka
> > > brokers join the cluster.  One would have to change the broker.id
> > >  on each new VM instance.*
> > >
> > > How do others handle this in a software that is packages and ships to
> > user
> > > and is not in your direct control to allow you to edit configs?
> > >
> > > Would it be best to have a script that connect to ZooKeeper to get the
> > list
> > > of all existing brokers and their IDs and then generate a new distinct
> > ID +
> > > config for the new Kafka broker?
> > >
> > > Or are there slicker ways to do this that people use?
> > >
> > > Thanks,
> > > Otis
> > > --
> > > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> > > Solr & Elasticsearch Support * http://sematext.com/
> >
> >
>


Re: Dynamically adding Kafka brokers

2014-11-03 Thread Joe Stein
Most folks strip the IP and use that as the broker.id. KAFKA-1070 does not
yet accommodate that very widely used method. I think it would be bad
if KAFKA-1070 only worked for new installations, because that is how people
use Kafka today (per
https://issues.apache.org/jira/browse/KAFKA-1070?focusedCommentId=14085808&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14085808
)
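
As a rough illustration of that approach (a sketch only: the class name is made up,
and the resulting value still has to be written into server.properties by whatever
deployment tooling starts the broker), deriving a numeric broker.id from the host's
IPv4 address could look like:

import java.net.InetAddress;

public class BrokerIdFromIp {
    public static void main(String[] args) throws Exception {
        // e.g. "10.100.70.128" -> 1010070128. Note that broker.id must fit in a
        // signed 32-bit int, so addresses with a large first octet can overflow;
        // many deployments use only the last octet(s) instead.
        String ip = InetAddress.getLocalHost().getHostAddress();
        long id = Long.parseLong(ip.replace(".", ""));
        System.out.println("broker.id=" + id);
    }
}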

On Mon, Nov 3, 2014 at 2:12 PM, Joel Koshy  wrote:

> KAFKA-1070 will help with this and is pending a review.
>
> On Mon, Nov 03, 2014 at 05:03:20PM -0500, Otis Gospodnetic wrote:
> > Hi,
> >
> > How do people handle situations, and specifically the broker.id
> property,
> > where the Kafka (broker) cluster is not fully defined right away?
> >
> > Here's the use case we have at Sematext:
> > * Our software ships as a VM
> > * All components run in this single VM, including 1 Kafka broker
> > * Of course, this is just for a nice OOTB experience, but to scale one
> > needs to have more instances of this VM, including more Kafka brokers
> > * *One can clone our VM and launch N instances of it, but because we
> have a
> > single Kafka broker config with a single broker.id  in
> > it, one can't just launch more of these VMs and expect to see more Kafka
> > brokers join the cluster.  One would have to change the broker.id
> >  on each new VM instance.*
> >
> > How do others handle this in a software that is packages and ships to
> user
> > and is not in your direct control to allow you to edit configs?
> >
> > Would it be best to have a script that connect to ZooKeeper to get the
> list
> > of all existing brokers and their IDs and then generate a new distinct
> ID +
> > config for the new Kafka broker?
> >
> > Or are there slicker ways to do this that people use?
> >
> > Thanks,
> > Otis
> > --
> > Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> > Solr & Elasticsearch Support * http://sematext.com/
>
>


Re: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released

2014-11-03 Thread Joel Koshy
Yes, there are some changes, but they will be checked in prior to the full
release:
https://issues.apache.org/jira/browse/KAFKA-1728

Joel

On Mon, Nov 03, 2014 at 04:46:12PM -0500, Jason Rosenberg wrote:
> Are there any config parameter updates/changes?  I see the doc here:
> http://kafka.apache.org/documentation.html#configuration
> now defaults to 0.8.2-beta.  But it would be useful to know if anything has
> changed from 0.8.1.1, just so we can be sure to update things, etc.
> 
> 
> 
> On Sat, Nov 1, 2014 at 11:18 AM, Koert Kuipers  wrote:
> 
> > joe,
> > looking at those 0.8.2 beta javadoc I also see a Consumer api and
> > KafkaConsumer class. they look different from what I currently use in
> > 8.1.1. Is this new? And this is not the 0.9 consumer?
> > thanks, koert
> > On Oct 30, 2014 8:01 AM, "Joe Stein"  wrote:
> >
> > > Hey, yeah!
> > >
> > > For the new producer
> > > https://archive.apache.org/dist/kafka/0.8.2-beta/java-doc/
> > >
> > > The java consumer is slated in 0.9 more on that here
> > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
> > >
> > > /***
> > >  Joe Stein
> > >  Founder, Principal Consultant
> > >  Big Data Open Source Security LLC
> > >  http://www.stealth.ly
> > >  Twitter: @allthingshadoop 
> > > /
> > >
> > > On Thu, Oct 30, 2014 at 6:08 AM, Pierre-Yves Ritschard  > >
> > > wrote:
> > >
> > > > Hi Joe et al.
> > > >
> > > > Congrats on the beta release!
> > > > Do I read correctly that libraries can now rely on
> > > > org.apache.kafka/kafka-clients which does not pull in scala anymore ?
> > > >
> > > > If so, awesome!
> > > >
> > > >   - pyr
> > > >
> > > > On Tue, Oct 28, 2014 at 2:01 AM, Libo Yu  wrote:
> > > >
> > > > > Congrats! When do you think the final 0.82 will be released?
> > > > >
> > > > > > To: annou...@apache.org; users@kafka.apache.org;
> > > d...@kafka.apache.org
> > > > > > Subject: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released
> > > > > > Date: Tue, 28 Oct 2014 00:50:35 +
> > > > > > From: joest...@apache.org
> > > > > >
> > > > > > The Apache Kafka community is pleased to announce the beta release
> > > for
> > > > > Apache Kafka 0.8.2.
> > > > > >
> > > > > > The 0.8.2-beta release introduces many new features, improvements
> > and
> > > > > fixes including:
> > > > > >  - A new Java producer for ease of implementation and enhanced
> > > > > performance.
> > > > > >  - Delete topic support.
> > > > > >  - Per topic configuration of preference for consistency over
> > > > > availability.
> > > > > >  - Scala 2.11 support and dropping support for Scala 2.8.
> > > > > >  - LZ4 Compression.
> > > > > >
> > > > > > All of the changes in this release can be found:
> > > > > https://archive.apache.org/dist/kafka/0.8.2-beta/RELEASE_NOTES.html
> > > > > >
> > > > > > Apache Kafka is high-throughput, publish-subscribe messaging system
> > > > > rethought of as a distributed commit log.
> > > > > >
> > > > > > ** Fast => A single Kafka broker can handle hundreds of megabytes
> > of
> > > > > reads and
> > > > > > writes per second from thousands of clients.
> > > > > >
> > > > > > ** Scalable => Kafka is designed to allow a single cluster to serve
> > > as
> > > > > the central data backbone
> > > > > > for a large organization. It can be elastically and transparently
> > > > > expanded without downtime.
> > > > > > Data streams are partitioned and spread over a cluster of machines
> > to
> > > > > allow data streams
> > > > > > larger than the capability of any single machine and to allow
> > > clusters
> > > > > of co-ordinated consumers.
> > > > > >
> > > > > > ** Durable => Messages are persisted on disk and replicated within
> > > the
> > > > > cluster to prevent
> > > > > > data loss. Each broker can handle terabytes of messages without
> > > > > performance impact.
> > > > > >
> > > > > > ** Distributed by Design => Kafka has a modern cluster-centric
> > design
> > > > > that offers
> > > > > > strong durability and fault-tolerance guarantees.
> > > > > >
> > > > > > You can download the release from:
> > > > > http://kafka.apache.org/downloads.html
> > > > > >
> > > > > > We welcome your help and feedback. For more information on how to
> > > > > > report problems, and to get involved, visit the project website at
> > > > > http://kafka.apache.org/
> > > > > >
> > > > >
> > > > >
> > > >
> > >
> >



Re: Dynamically adding Kafka brokers

2014-11-03 Thread Joel Koshy
KAFKA-1070 will help with this and is pending a review.

On Mon, Nov 03, 2014 at 05:03:20PM -0500, Otis Gospodnetic wrote:
> Hi,
> 
> How do people handle situations, and specifically the broker.id property,
> where the Kafka (broker) cluster is not fully defined right away?
> 
> Here's the use case we have at Sematext:
> * Our software ships as a VM
> * All components run in this single VM, including 1 Kafka broker
> * Of course, this is just for a nice OOTB experience, but to scale one
> needs to have more instances of this VM, including more Kafka brokers
> * *One can clone our VM and launch N instances of it, but because we have a
> single Kafka broker config with a single broker.id  in
> it, one can't just launch more of these VMs and expect to see more Kafka
> brokers join the cluster.  One would have to change the broker.id
>  on each new VM instance.*
> 
> How do others handle this in a software that is packages and ships to user
> and is not in your direct control to allow you to edit configs?
> 
> Would it be best to have a script that connect to ZooKeeper to get the list
> of all existing brokers and their IDs and then generate a new distinct ID +
> config for the new Kafka broker?
> 
> Or are there slicker ways to do this that people use?
> 
> Thanks,
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/



Re: Dynamically adding Kafka brokers

2014-11-03 Thread Florian Dambrine
I am automating Kafka clusters in the cloud and I am using IPs (stripped to
keep only the numbers) to define the broker ids.

You might have to add a shell script or whatever fits to set the broker id.

I hope this might help a bit.

Regards,
On Nov 3, 2014 23:04, "Otis Gospodnetic"  wrote:

> Hi,
>
> How do people handle situations, and specifically the broker.id property,
> where the Kafka (broker) cluster is not fully defined right away?
>
> Here's the use case we have at Sematext:
> * Our software ships as a VM
> * All components run in this single VM, including 1 Kafka broker
> * Of course, this is just for a nice OOTB experience, but to scale one
> needs to have more instances of this VM, including more Kafka brokers
> * *One can clone our VM and launch N instances of it, but because we have a
> single Kafka broker config with a single broker.id  in
> it, one can't just launch more of these VMs and expect to see more Kafka
> brokers join the cluster.  One would have to change the broker.id
>  on each new VM instance.*
>
> How do others handle this in a software that is packages and ships to user
> and is not in your direct control to allow you to edit configs?
>
> Would it be best to have a script that connect to ZooKeeper to get the list
> of all existing brokers and their IDs and then generate a new distinct ID +
> config for the new Kafka broker?
>
> Or are there slicker ways to do this that people use?
>
> Thanks,
> Otis
> --
> Monitoring * Alerting * Anomaly Detection * Centralized Log Management
> Solr & Elasticsearch Support * http://sematext.com/
>


Dynamically adding Kafka brokers

2014-11-03 Thread Otis Gospodnetic
Hi,

How do people handle situations, and specifically the broker.id property,
where the Kafka (broker) cluster is not fully defined right away?

Here's the use case we have at Sematext:
* Our software ships as a VM
* All components run in this single VM, including 1 Kafka broker
* Of course, this is just for a nice OOTB experience, but to scale one
needs to have more instances of this VM, including more Kafka brokers
* One can clone our VM and launch N instances of it, but because we have a
single Kafka broker config with a single broker.id in it, one can't just launch
more of these VMs and expect to see more Kafka brokers join the cluster. One
would have to change the broker.id on each new VM instance.

How do others handle this in software that is packaged and shipped to users and
is not under your direct control, so you can't just edit configs?

Would it be best to have a script that connects to ZooKeeper to get the list of
all existing brokers and their IDs and then generates a new distinct ID + config
for the new Kafka broker?

Or are there slicker ways to do this that people use?

Thanks,
Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr & Elasticsearch Support * http://sematext.com/


Re: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released

2014-11-03 Thread Jason Rosenberg
Also, that doc refers to the 'new producer' as available in trunk and of
beta quality.

But from the announcement, it seems it's now more properly integrated in
the release?  Also, where can I read about the 'kafka-client' referred to
above?

Thanks,

Jason



On Mon, Nov 3, 2014 at 4:46 PM, Jason Rosenberg  wrote:

> Are there any config parameter updates/changes?  I see the doc here:
> http://kafka.apache.org/documentation.html#configuration
> now defaults to 0.8.2-beta.  But it would be useful to know if anything
> has changed from 0.8.1.1, just so we can be sure to update things, etc.
>
>
>
> On Sat, Nov 1, 2014 at 11:18 AM, Koert Kuipers  wrote:
>
>> joe,
>> looking at those 0.8.2 beta javadoc I also see a Consumer api and
>> KafkaConsumer class. they look different from what I currently use in
>> 8.1.1. Is this new? And this is not the 0.9 consumer?
>> thanks, koert
>> On Oct 30, 2014 8:01 AM, "Joe Stein"  wrote:
>>
>> > Hey, yeah!
>> >
>> > For the new producer
>> > https://archive.apache.org/dist/kafka/0.8.2-beta/java-doc/
>> >
>> > The java consumer is slated in 0.9 more on that here
>> >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
>> >
>> > /***
>> >  Joe Stein
>> >  Founder, Principal Consultant
>> >  Big Data Open Source Security LLC
>> >  http://www.stealth.ly
>> >  Twitter: @allthingshadoop 
>> > /
>> >
>> > On Thu, Oct 30, 2014 at 6:08 AM, Pierre-Yves Ritschard <
>> p...@spootnik.org>
>> > wrote:
>> >
>> > > Hi Joe et al.
>> > >
>> > > Congrats on the beta release!
>> > > Do I read correctly that libraries can now rely on
>> > > org.apache.kafka/kafka-clients which does not pull in scala anymore ?
>> > >
>> > > If so, awesome!
>> > >
>> > >   - pyr
>> > >
>> > > On Tue, Oct 28, 2014 at 2:01 AM, Libo Yu  wrote:
>> > >
>> > > > Congrats! When do you think the final 0.82 will be released?
>> > > >
>> > > > > To: annou...@apache.org; users@kafka.apache.org;
>> > d...@kafka.apache.org
>> > > > > Subject: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released
>> > > > > Date: Tue, 28 Oct 2014 00:50:35 +
>> > > > > From: joest...@apache.org
>> > > > >
>> > > > > The Apache Kafka community is pleased to announce the beta release
>> > for
>> > > > Apache Kafka 0.8.2.
>> > > > >
>> > > > > The 0.8.2-beta release introduces many new features, improvements
>> and
>> > > > fixes including:
>> > > > >  - A new Java producer for ease of implementation and enhanced
>> > > > performance.
>> > > > >  - Delete topic support.
>> > > > >  - Per topic configuration of preference for consistency over
>> > > > availability.
>> > > > >  - Scala 2.11 support and dropping support for Scala 2.8.
>> > > > >  - LZ4 Compression.
>> > > > >
>> > > > > All of the changes in this release can be found:
>> > > > https://archive.apache.org/dist/kafka/0.8.2-beta/RELEASE_NOTES.html
>> > > > >
>> > > > > Apache Kafka is high-throughput, publish-subscribe messaging
>> system
>> > > > rethought of as a distributed commit log.
>> > > > >
>> > > > > ** Fast => A single Kafka broker can handle hundreds of megabytes
>> of
>> > > > reads and
>> > > > > writes per second from thousands of clients.
>> > > > >
>> > > > > ** Scalable => Kafka is designed to allow a single cluster to
>> serve
>> > as
>> > > > the central data backbone
>> > > > > for a large organization. It can be elastically and transparently
>> > > > expanded without downtime.
>> > > > > Data streams are partitioned and spread over a cluster of
>> machines to
>> > > > allow data streams
>> > > > > larger than the capability of any single machine and to allow
>> > clusters
>> > > > of co-ordinated consumers.
>> > > > >
>> > > > > ** Durable => Messages are persisted on disk and replicated within
>> > the
>> > > > cluster to prevent
>> > > > > data loss. Each broker can handle terabytes of messages without
>> > > > performance impact.
>> > > > >
>> > > > > ** Distributed by Design => Kafka has a modern cluster-centric
>> design
>> > > > that offers
>> > > > > strong durability and fault-tolerance guarantees.
>> > > > >
>> > > > > You can download the release from:
>> > > > http://kafka.apache.org/downloads.html
>> > > > >
>> > > > > We welcome your help and feedback. For more information on how to
>> > > > > report problems, and to get involved, visit the project website at
>> > > > http://kafka.apache.org/
>> > > > >
>> > > >
>> > > >
>> > >
>> >
>>
>
>


Re: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released

2014-11-03 Thread Jason Rosenberg
Are there any config parameter updates/changes?  I see the doc here:
http://kafka.apache.org/documentation.html#configuration
now defaults to 0.8.2-beta.  But it would be useful to know if anything has
changed from 0.8.1.1, just so we can be sure to update things, etc.



On Sat, Nov 1, 2014 at 11:18 AM, Koert Kuipers  wrote:

> joe,
> looking at those 0.8.2 beta javadoc I also see a Consumer api and
> KafkaConsumer class. they look different from what I currently use in
> 8.1.1. Is this new? And this is not the 0.9 consumer?
> thanks, koert
> On Oct 30, 2014 8:01 AM, "Joe Stein"  wrote:
>
> > Hey, yeah!
> >
> > For the new producer
> > https://archive.apache.org/dist/kafka/0.8.2-beta/java-doc/
> >
> > The java consumer is slated in 0.9 more on that here
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
> >
> > /***
> >  Joe Stein
> >  Founder, Principal Consultant
> >  Big Data Open Source Security LLC
> >  http://www.stealth.ly
> >  Twitter: @allthingshadoop 
> > /
> >
> > On Thu, Oct 30, 2014 at 6:08 AM, Pierre-Yves Ritschard  >
> > wrote:
> >
> > > Hi Joe et al.
> > >
> > > Congrats on the beta release!
> > > Do I read correctly that libraries can now rely on
> > > org.apache.kafka/kafka-clients which does not pull in scala anymore ?
> > >
> > > If so, awesome!
> > >
> > >   - pyr
> > >
> > > On Tue, Oct 28, 2014 at 2:01 AM, Libo Yu  wrote:
> > >
> > > > Congrats! When do you think the final 0.82 will be released?
> > > >
> > > > > To: annou...@apache.org; users@kafka.apache.org;
> > d...@kafka.apache.org
> > > > > Subject: [ANNOUNCEMENT] Apache Kafka 0.8.2-beta Released
> > > > > Date: Tue, 28 Oct 2014 00:50:35 +
> > > > > From: joest...@apache.org
> > > > >
> > > > > The Apache Kafka community is pleased to announce the beta release
> > for
> > > > Apache Kafka 0.8.2.
> > > > >
> > > > > The 0.8.2-beta release introduces many new features, improvements
> and
> > > > fixes including:
> > > > >  - A new Java producer for ease of implementation and enhanced
> > > > performance.
> > > > >  - Delete topic support.
> > > > >  - Per topic configuration of preference for consistency over
> > > > availability.
> > > > >  - Scala 2.11 support and dropping support for Scala 2.8.
> > > > >  - LZ4 Compression.
> > > > >
> > > > > All of the changes in this release can be found:
> > > > https://archive.apache.org/dist/kafka/0.8.2-beta/RELEASE_NOTES.html
> > > > >
> > > > > Apache Kafka is high-throughput, publish-subscribe messaging system
> > > > rethought of as a distributed commit log.
> > > > >
> > > > > ** Fast => A single Kafka broker can handle hundreds of megabytes
> of
> > > > reads and
> > > > > writes per second from thousands of clients.
> > > > >
> > > > > ** Scalable => Kafka is designed to allow a single cluster to serve
> > as
> > > > the central data backbone
> > > > > for a large organization. It can be elastically and transparently
> > > > expanded without downtime.
> > > > > Data streams are partitioned and spread over a cluster of machines
> to
> > > > allow data streams
> > > > > larger than the capability of any single machine and to allow
> > clusters
> > > > of co-ordinated consumers.
> > > > >
> > > > > ** Durable => Messages are persisted on disk and replicated within
> > the
> > > > cluster to prevent
> > > > > data loss. Each broker can handle terabytes of messages without
> > > > performance impact.
> > > > >
> > > > > ** Distributed by Design => Kafka has a modern cluster-centric
> design
> > > > that offers
> > > > > strong durability and fault-tolerance guarantees.
> > > > >
> > > > > You can download the release from:
> > > > http://kafka.apache.org/downloads.html
> > > > >
> > > > > We welcome your help and feedback. For more information on how to
> > > > > report problems, and to get involved, visit the project website at
> > > > http://kafka.apache.org/
> > > > >
> > > >
> > > >
> > >
> >
>


kafka producer example

2014-11-03 Thread Sa Li
Hi, All

I am running the kafka producer code:

import java.util.*;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {
    public static void main(String[] args) {
        long events = Long.parseLong(args[0]);
        Random rnd = new Random();

        Properties props = new Properties();
        props.put("metadata.broker.list",
                "10.100.70.128:9092,10.100.70.128:9093,10.100.70.128:9094");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "example.producer.SimplePartitioner");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);

        Producer<String, String> producer = new Producer<String, String>(config);

        for (long nEvents = 0; nEvents < events; nEvents++) {
            long runtime = new Date().getTime();
            String ip = "192.168.2." + rnd.nextInt(255);
            String msg = runtime + ",www.example.com," + ip;
            KeyedMessage<String, String> data =
                    new KeyedMessage<String, String>("page_visits", ip, msg);
            producer.send(data);
        }
        producer.close();
    }
}

It should be straightforward, but when I compile it in IntelliJ IDEA I get this
error:

Failed to execute goal
org.apache.maven.plugins:maven-compiler-plugin:2.5.1:compile
(default-compile) on project kafka-producer: Compilation failure
[ERROR]
C:\Users\sa\Desktop\Workload\kafka\kafkaprj\kafka-json-producer\src\main\java\kafka\example\TestProducer.java:[35,20]
error: cannot access Serializable (the error points at the producer object).

Any idea what causes this?

thanks

-- 

Alec Li


Re: Spark Kafka Performance

2014-11-03 Thread Gwen Shapira
Not sure about the throughput, but:

"I mean that the words counted in spark should grow up" - The spark
word-count example doesn't accumulate.
It gets an RDD every n seconds and counts the words in that RDD. So we
don't expect the count to go up.
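
A tiny plain-Java sketch of that distinction (illustrative only, not the Spark API):
each micro-batch is counted on its own, which is what the example reports, as opposed
to a running total that keeps growing.

import java.util.*;

public class BatchVsRunningTotal {
    public static void main(String[] args) {
        // Two hypothetical micro-batches of words arriving n seconds apart.
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("kafka", "spark", "kafka"),
                Arrays.asList("spark", "kafka"));
        Map<String, Integer> runningTotal = new HashMap<>();
        for (List<String> batch : batches) {
            Map<String, Integer> perBatch = new HashMap<>();
            for (String word : batch) {
                perBatch.merge(word, 1, Integer::sum);      // what the example prints
                runningTotal.merge(word, 1, Integer::sum);  // what an accumulating count would show
            }
            System.out.println("this batch: " + perBatch + ", running total: " + runningTotal);
        }
    }
}

In Spark Streaming itself, keeping a running total would need a stateful operation
such as updateStateByKey rather than the per-batch reduce used in the example.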



On Mon, Nov 3, 2014 at 6:57 AM, Eduardo Costa Alfaia  wrote:

> Hi Guys,
> Anyone could explain me how to work Kafka with Spark, I am using the
> JavaKafkaWordCount.java like a test and the line command is:
>
> ./run-example org.apache.spark.streaming.examples.JavaKafkaWordCount
> spark://192.168.0.13:7077 computer49:2181 test-consumer-group unibs.it 3
>
> and like a producer I am using this command:
>
> rdkafka_cachesender -t unibs.nec -p 1 -b 192.168.0.46:9092 -f output.txt
> -l 100 -n 10
>
>
> rdkafka_cachesender is a program that was developed by me which send to
> kafka the output.txt’s content where -l is the length of each send(upper
> bound) and -n is the lines to send in a row. Bellow is the throughput
> calculated by the program:
>
> File is 2235755 bytes
> throughput (b/s) = 699751388
> throughput (b/s) = 723542382
> throughput (b/s) = 662989745
> throughput (b/s) = 505028200
> throughput (b/s) = 471263416
> throughput (b/s) = 446837266
> throughput (b/s) = 409856716
> throughput (b/s) = 373994467
> throughput (b/s) = 366343097
> throughput (b/s) = 373240017
> throughput (b/s) = 386139016
> throughput (b/s) = 373802209
> throughput (b/s) = 369308515
> throughput (b/s) = 366935820
> throughput (b/s) = 365175388
> throughput (b/s) = 362175419
> throughput (b/s) = 358356633
> throughput (b/s) = 357219124
> throughput (b/s) = 352174125
> throughput (b/s) = 348313093
> throughput (b/s) = 355099099
> throughput (b/s) = 348069777
> throughput (b/s) = 348478302
> throughput (b/s) = 340404276
> throughput (b/s) = 339876031
> throughput (b/s) = 339175102
> throughput (b/s) = 327555252
> throughput (b/s) = 324272374
> throughput (b/s) = 322479222
> throughput (b/s) = 319544906
> throughput (b/s) = 317201853
> throughput (b/s) = 317351399
> throughput (b/s) = 315027978
> throughput (b/s) = 313831014
> throughput (b/s) = 310050384
> throughput (b/s) = 307654601
> throughput (b/s) = 305707061
> throughput (b/s) = 307961102
> throughput (b/s) = 296898200
> throughput (b/s) = 296409904
> throughput (b/s) = 294609332
> throughput (b/s) = 293397843
> throughput (b/s) = 293194876
> throughput (b/s) = 291724886
> throughput (b/s) = 290031314
> throughput (b/s) = 289747022
> throughput (b/s) = 289299632
>
> The throughput goes down after some seconds and it does not maintain the
> performance like the initial values:
>
> throughput (b/s) = 699751388
> throughput (b/s) = 723542382
> throughput (b/s) = 662989745
>
> Another question is about spark, after I have started the spark line
> command after 15 sec spark continue to repeat the words counted, but my
> program continue to send words to kafka, so I mean that the words counted
> in spark should grow up. I have attached the log from spark.
>
> My Case is:
>
> ComputerA(Kafka_cachsesender) -> ComputerB(Kakfa-Brokers-Zookeeper) ->
> ComputerC (Spark)
>
> If I don’t explain very well send a reply to me.
>
> Thanks Guys
> --
> Informativa sulla Privacy: http://www.unibs.it/node/8155
>


Re: Rebalance not happening even after increasing max retries causing conflict in ZK

2014-11-03 Thread Mohit Kathuria
Neha,

In my last reply the subject got changed, which is why it got marked as a new
message on
http://mail-archives.apache.org/mod_mbox/kafka-users/201411.mbox/date.
Please ignore that. The text below is the reply in continuation of
http://mail-archives.apache.org/mod_mbox/kafka-users/201410.mbox/%3CCAAALehC3X2i7+n8aaEkXtxBduwUUp6Zk7=-jxn8yrdcjy1f...@mail.gmail.com%3E


This looks like an issue with the consumer rebalance not being able to complete
successfully. We were able to reproduce the issue on a topic with 30 partitions,
3 consumer processes (p1, p2 and p3), and the properties rebalance.max.retries=40
and rebalance.backoff.ms=1 (10s).
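
In properties form (assuming the backoff figure, truncated above, was meant to be
10 seconds, i.e. 10000 ms):

rebalance.max.retries=40
rebalance.backoff.ms=10000

which keeps rebalance.max.retries * rebalance.backoff.ms (about 400 s) well above
the ZooKeeper connection timeout, as noted further down in the thread.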

Before the process p3 was started, partition ownership was as expected:

partitions 0-14 -> owner p1
partitions 15-29 -> owner p2

As process p3 started, a rebalance was triggered. Process p3 successfully
acquired ownership of partitions 20-29, as expected per the rebalance algorithm.
However, process p2, while trying to acquire ownership of partitions 10-19, saw
the rebalance fail after 40 retries.

Attaching the logs from process p2 and process p1. They show that p2 was
attempting to rebalance and was trying to acquire ownership of partitions 10-14,
which were owned by process p1. However, at the same time process p1 did not get
any event telling it to give up ownership of partitions 1-14.
We were expecting a rebalance to have been triggered in p1, but it wasn't, and
hence it did not give up ownership. Is our assumption correct or incorrect? And
if a rebalance does get triggered in p1, how can we tell, apart from the logs,
since the logs on p1 did not show anything?

*2014-11-03 06:57:36 k.c.ZookeeperConsumerConnector [INFO]
[topic_consumerIdString], waiting for the partition ownership to be
deleted: 11*

During and after the failed rebalance on process p2, partition ownership was as
below:
0-14 -> owner p1
15-19 -> none
20-29 -> owner p3

This left the consumers in an inconsistent state, as 5 partitions were never
consumed from and the partition ownership was not balanced either.

However, there was no conflict in creating the ephemeral node, which was the case
last time. Note that the ephemeral node conflict we were seeing earlier also
appeared after a rebalance failed. My hunch is that fixing the rebalance failure
will fix that issue as well.

-Thanks,
Mohit



On Mon, Oct 20, 2014 at 7:48 PM, Neha Narkhede 
wrote:

> Mohit,
>
> I wonder if it is related to
> https://issues.apache.org/jira/browse/KAFKA-1585. When zookeeper expires a
> session, it doesn't delete the ephemeral nodes immediately. So if you end
> up trying to recreate ephemeral nodes quickly, it could either be in the
> valid latest session or from the previously expired session. If you hit
> this problem, then waiting would resolve it. But if not, then this may be a
> legitimate bug in ZK 3.4.6.
>
> Can you try shutting down all your consumers, waiting until session timeout
> and restarting them?
>
> Thanks,
> Neha
>
> On Mon, Oct 20, 2014 at 6:15 AM, Mohit Kathuria 
> wrote:
>
> > Dear Experts,
> >
> > We recently updated to kafka v0.8.1.1 with zookeeper v3.4.5. I have of
> > topic with 30 partitions and 2 replicas. We are using High level consumer
> > api.
> > Each consumer process which is a storm topolofy has 5 streams which
> > connects to 1 or more partitions. We are not using storm's inbuilt kafka
> > spout. Everything runs fine till the 5th consumer process(25 streams) is
> > added for this topic.
> >
> > As soon as the sixth consumer process is added, the newly added partition
> > does not get the ownership of the partitions that it requests for as the
> > already existing owners have not yet given up the ownership.
> >
> > We changed certain properties on consumer :
> >
> > 1. Max Rebalance attempts - 20 ( rebalance.backoff.ms *
> > rebalance.max.retries >> zk connection timeout)
> > 2. Back off ms between rebalances - 1 (10seconds)
> > 3. ZK connection timeout - 100,000 (100 seconds)
> >
> > Although when I am looking in the zookeeper shell when the rebalance is
> > happening, the consumer is registered fine on the zookeeper. Just that
> the
> > rebalance does not happen.
> > After the 20th rebalance gets completed, we get
> >
> >
> > *2014-10-11 11:10:08 k.c.ZookeeperConsumerConnector [INFO]
> > [rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], Committing all
> > offsets after clearing the fetcher queues*
> > *2014-10-11 11:10:10 c.s.m.k.i.c.KafkaFeedStreamer [WARN] Ignoring
> > exception while trying to start streamer threads:
> > rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance
> after
> > 20 retries*
> > *kafka.common.ConsumerRebalanceFailedException:
> > rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance
> after
> > 20 retries*
> > *at
> >
> >
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)
> > ~[stormjar.jar:na]*
> > *at
> >
> >
> kafka.consumer.ZookeeperConsu

Re: Can both uncompressed data and compressed data reside in the same topic (partition)

2014-11-03 Thread Zuoning Yin
Thanks for the clarification, Magnus!

On Mon, Nov 3, 2014 at 11:15 AM, Magnus Edenhill  wrote:

> Hi Zuoning,
> since compression is a per-message(set) attribute a topic can have both
> compressed and uncompressed messages, as Guozhang says,
> and yes, this is supported by both the broker and client (librdkafka in
> this case).
>
> Regards,
> Magnus
>
>
> 2014-10-31 17:14 GMT+01:00 Zuoning Yin :
>
> > Hi Guozhang,
> > Just want to double check: does this have some requirement on the
> Kafka
> > version (we are using 0.8.0) or the kafka client (we are using
> librdkafka)?
> >
> > Thanks,
> > --Zuoning
> >
> > On Fri, Oct 31, 2014 at 12:09 PM, Zuoning Yin 
> > wrote:
> >
> > > Thanks so much for the reply, Guozhang!
> > >
> > > On Fri, Oct 31, 2014 at 12:06 PM, Guozhang Wang 
> > > wrote:
> > >
> > >> Hi Zuoning,
> > >>
> > >> Yes, the same topic's partitions can hold both compressed and
> > uncompressed
> > >> data, and the consumer is able to read them in mixed mode.
> > >>
> > >> Guozhang
> > >>
> > >> On Fri, Oct 31, 2014 at 8:40 AM, Zuoning Yin 
> > >> wrote:
> > >>
> > >> > Hey Guys,
> > >> >   If at the beginning, I configure the producer to not use
> > >> compression
> > >> > and produce a number of message to a topic. Then later, I
> reconfigure
> > >> the
> > >> > producer to use compression and produce another batch of messages to
> > the
> > >> > same topic. Can a consumer correctly read all the messages in the
> > topic?
> > >> > Will the consumer stop reading correctly when it reach the
> compressed
> > >> part?
> > >> >
> > >> > Thanks so much,
> > >> > --Zuoning
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
> > >
> >
>


Re: Can both uncompressed data and compressed data reside in the same topic (partition)

2014-11-03 Thread Magnus Edenhill
Hi Zuoning,
since compression is a per-message(set) attribute, a topic can have both
compressed and uncompressed messages, as Guozhang says,
and yes, this is supported by both the broker and client (librdkafka in
this case).

Regards,
Magnus
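
For illustration, a minimal sketch of producing both kinds of batches to one topic with the 0.8 Java producer API; the broker address and topic name are placeholders, and librdkafka exposes an equivalent compression.codec setting:

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MixedCompressionSketch {
    private static Producer<String, String> makeProducer(String codec) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");            // placeholder broker
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("compression.codec", codec);                           // "none" or "gzip"
        return new Producer<String, String>(new ProducerConfig(props));
    }

    public static void main(String[] args) {
        // First batch: uncompressed messages.
        Producer<String, String> plain = makeProducer("none");
        plain.send(new KeyedMessage<String, String>("mixed-topic", "plain-1"));
        plain.close();

        // Second batch: gzip-compressed messages to the same topic.
        Producer<String, String> gzipped = makeProducer("gzip");
        gzipped.send(new KeyedMessage<String, String>("mixed-topic", "gzip-1"));
        gzipped.close();

        // A single consumer can read both batches; decompression is handled
        // transparently because the codec travels with each message set.
    }
}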


2014-10-31 17:14 GMT+01:00 Zuoning Yin :

> Hi Guozhang,
> Just want to double check: does this have some requirement on the Kafka
> version (we are using 0.8.0) or the kafka client (we are using librdkafka)?
>
> Thanks,
> --Zuoning
>
> On Fri, Oct 31, 2014 at 12:09 PM, Zuoning Yin 
> wrote:
>
> > Thanks so much for the reply, Guozhang!
> >
> > On Fri, Oct 31, 2014 at 12:06 PM, Guozhang Wang 
> > wrote:
> >
> >> Hi Zuoning,
> >>
> >> Yes, the same topic's partitions can hold both compressed and
> uncompressed
> >> data, and the consumer is able to read them in mixed mode.
> >>
> >> Guozhang
> >>
> >> On Fri, Oct 31, 2014 at 8:40 AM, Zuoning Yin 
> >> wrote:
> >>
> >> > Hey Guys,
> >> >   If at the beginning, I configure the producer to not use
> >> compression
> >> > and produce a number of message to a topic. Then later, I reconfigure
> >> the
> >> > producer to use compression and produce another batch of messages to
> the
> >> > same topic. Can a consumer correctly read all the messages in the
> topic?
> >> > Will the consumer stop reading correctly when it reach the compressed
> >> part?
> >> >
> >> > Thanks so much,
> >> > --Zuoning
> >> >
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>


Re: question about console producer behavior

2014-11-03 Thread Guozhang Wang
Hi,

Producers only refresh their metadata periodically when no exceptions are
caught while sending data; you can configure this period via
"topic.metadata.refresh.interval.ms" (default is 600 seconds).

Guozhang
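
For illustration, a minimal sketch of lowering that interval with the 0.8 Java producer; the broker address, topic name and the 60-second value are placeholders, not recommendations:

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MetadataRefreshSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");              // placeholder broker
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Refresh topic metadata every 60s instead of the 600s default, so a newly
        // added partition is picked up sooner (60000 is just an example value).
        props.put("topic.metadata.refresh.interval.ms", "60000");

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("test", "key-1", "value-1"));
        producer.close();
    }
}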

On Mon, Nov 3, 2014 at 6:51 AM, raymond  wrote:

> Hi
>
> It appears to me that, once I have opened a console producer, it does not
> notice a change in the topic's partition count?
>
> Say I have a topic with 3 partitions, and I open a console producer to
> produce data with keys; it will distribute data across the 3 partitions.
> Then I keep this producer open and change the topic's partition count to
> e.g. 4. This producer will not distribute data into the 4th partition,
> while a newly created producer will.
>
> Is this behavior expected?
>
>


-- 
-- Guozhang


Re: Indefinite growth of FetchRequestPurgatory

2014-11-03 Thread Guozhang Wang
Hi Andras,

Could you try 0.8.2-beta and see if this issue shows up again? We fixed a
couple of purgatory issues (e.g. KAFKA-1616) in 0.8.2, but I do not
recall any of them causing an OOM.

Guozhang
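
In the meantime, one knob worth experimenting with on 0.8.1.x is the purgatory purge intervals in server.properties, which control how often satisfied requests are removed; treat the values below as examples to experiment with, not tuned recommendations:

fetch.purgatory.purge.interval.requests=1000
producer.purgatory.purge.interval.requests=1000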

On Mon, Nov 3, 2014 at 5:42 AM, András Serény 
wrote:

> Hi Kafka users,
>
> we're running a cluster of two Kafka 0.8.1.1 brokers, with a twofold
> replication of each topic.
>
> When both brokers are up, after a short while the FetchRequestPurgatory
> starts to grow indefinitely on the leader (detectable via a heap dump and
> also via the "FetchRequestPurgatory"."PurgatorySize" JMX metric),
> eventually leading to an OOM error. When one of the brokers is shut down,
> the purgatory stops growing in size, and the remaining broker runs fine. In
> https://issues.apache.org/jira/browse/KAFKA-1016, I see this can occur
> when a fetcher specifies a too large max wait time, but we don't override
> replica.fetch.wait.max.ms, leaving it at the default 500 ms.
>
> Do you have any suggestions what can be the cause and how to fix it?
>
> Thanks a lot,
> András
>



-- 
-- Guozhang


Re: Kafka Cluster disaster decovery

2014-11-03 Thread Guozhang Wang
Yingkai,

Kafka uses persistent storage, so data written to it will not be lost;
you just need to restart the cluster. During the downtime, however, it will
be unavailable.

Guozhang
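
For reference, that durability comes from the brokers' on-disk logs; the relevant server.properties entries look roughly like this (paths and values are examples only):

# where each broker persists its log segments across restarts (example path)
log.dirs=/var/kafka-logs
# how long data is retained (example: 7 days)
log.retention.hours=168
# replicate each partition so losing a single broker does not lose data
default.replication.factor=2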



On Fri, Oct 31, 2014 at 2:06 PM, Yingkai Hu  wrote:

> Hi All,
>
> I'm new to Kafka, so please direct me to the right place if this is a
> duplicate question.
>
> Basically, I deployed Kafka to a 4-machine cluster. If the whole cluster
> goes down, does Kafka provide any backup/restore mechanism? Please
> advise.
>
> Thanks!
> Yingkai




-- 
-- Guozhang


corrupt message

2014-11-03 Thread Fredrik S Loekke
When running a C# producer against a Kafka 0.8.1.1 server running on a virtual 
Linux machine (VirtualBox, Ubuntu), I keep getting the following error:

[2014-11-03 15:19:08,595] ERROR [KafkaApi-0] Error processing ProducerRequest 
with correlation id 601 from client Kafka-Net on partition [x,0] 
(kafka.server.KafkaApis)
kafka.message.InvalidMessageException: Message is corrupt (stored crc = 
1767811542, computed crc = 1256103753)
at kafka.message.Message.ensureValid(Message.scala:166)
at 
kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:330)
at 
kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala:318)
at scala.collection.Iterator$class.foreach(Iterator.scala:772)
at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
at kafka.log.Log.analyzeAndValidateMessageSet(Log.scala:318)
at kafka.log.Log.append(Log.scala:231)
at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:354)
at 
kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
at 
kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
at 
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
at scala.collection.Iterator$class.foreach(Iterator.scala:772)
at 
scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
at 
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
at scala.collection.mutable.HashMap.map(HashMap.scala:45)
at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
at java.lang.Thread.run(Thread.java:745)

Any suggestion on how to resolve this issue?
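
For context, the broker-side check at kafka.message.Message.ensureValid recomputes a CRC32 over the message bytes and compares it to the CRC the client stored in the message header, so a mismatch usually means the client library built the message incorrectly or the bytes were mangled in transit. A rough Java sketch of the principle (deliberately simplified, not the exact Kafka wire layout):

import java.util.zip.CRC32;

public class CrcCheckSketch {
    // Returns true if the CRC computed over the payload matches the stored CRC.
    static boolean crcMatches(byte[] payload, long storedCrc) {
        CRC32 crc = new CRC32();
        crc.update(payload);
        return crc.getValue() == storedCrc;
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes();
        CRC32 crc = new CRC32();
        crc.update(payload);
        long stored = crc.getValue();

        System.out.println(crcMatches(payload, stored));   // true
        payload[0] ^= 0x01;                                 // flip one bit
        System.out.println(crcMatches(payload, stored));   // false -> "corrupt"
    }
}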

Best regards / Med venlig hilsen

Fredrik Skeel Løkke
Software Developer ǀ IT & Analysis

Mob.: +45 3176 8438
f...@lindcapital.com


Lind Capital A/S
Værkmestergade 3, 2
DK-8000 Aarhus C
www.lindcapital.com




Spark Kafka Performance

2014-11-03 Thread Eduardo Costa Alfaia
Hi Guys,
Could anyone explain to me how to use Kafka with Spark? I am using 
JavaKafkaWordCount.java as a test, and the command line is:

./run-example org.apache.spark.streaming.examples.JavaKafkaWordCount 
spark://192.168.0.13:7077 computer49:2181 test-consumer-group unibs.it 3
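
For reference, a rough sketch of the Kafka receiver that JavaKafkaWordCount sets up, assuming a Spark 1.x-era spark-streaming-kafka API; the zookeeper address, group and topic mirror the command line above, and the 2-second batch interval is an assumption rather than what the packaged example uses:

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class KafkaWordCountSketch {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("KafkaWordCountSketch");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(2000));

        Map<String, Integer> topicThreads = new HashMap<String, Integer>();
        topicThreads.put("unibs.it", 3);   // topic and receiver thread count from the command line

        // High-level consumer receiver: zookeeper quorum, consumer group, topic map.
        JavaPairDStream<String, String> messages =
                KafkaUtils.createStream(jssc, "computer49:2181", "test-consumer-group", topicThreads);

        // The real example splits each message into words and counts them; printing
        // the raw (key, message) pairs is enough to see whether data keeps arriving.
        messages.print();

        jssc.start();
        jssc.awaitTermination();
    }
}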

and as the producer I am using this command:

rdkafka_cachesender -t unibs.nec -p 1 -b 192.168.0.46:9092 -f output.txt -l 100 
-n 10


rdkafka_cachesender is a program I developed that sends output.txt's content to 
Kafka, where -l is the length of each send (upper bound) and -n is the number of 
lines to send in a row. Below is the throughput calculated by the 
program:

File is 2235755 bytes
throughput (b/s) = 699751388
throughput (b/s) = 723542382
throughput (b/s) = 662989745
throughput (b/s) = 505028200
throughput (b/s) = 471263416
throughput (b/s) = 446837266
throughput (b/s) = 409856716
throughput (b/s) = 373994467
throughput (b/s) = 366343097
throughput (b/s) = 373240017
throughput (b/s) = 386139016
throughput (b/s) = 373802209
throughput (b/s) = 369308515
throughput (b/s) = 366935820
throughput (b/s) = 365175388
throughput (b/s) = 362175419
throughput (b/s) = 358356633
throughput (b/s) = 357219124
throughput (b/s) = 352174125
throughput (b/s) = 348313093
throughput (b/s) = 355099099
throughput (b/s) = 348069777
throughput (b/s) = 348478302
throughput (b/s) = 340404276
throughput (b/s) = 339876031
throughput (b/s) = 339175102
throughput (b/s) = 327555252
throughput (b/s) = 324272374
throughput (b/s) = 322479222
throughput (b/s) = 319544906
throughput (b/s) = 317201853
throughput (b/s) = 317351399
throughput (b/s) = 315027978
throughput (b/s) = 313831014
throughput (b/s) = 310050384
throughput (b/s) = 307654601
throughput (b/s) = 305707061
throughput (b/s) = 307961102
throughput (b/s) = 296898200
throughput (b/s) = 296409904
throughput (b/s) = 294609332
throughput (b/s) = 293397843
throughput (b/s) = 293194876
throughput (b/s) = 291724886
throughput (b/s) = 290031314
throughput (b/s) = 289747022
throughput (b/s) = 289299632

The throughput drops after a few seconds and does not stay at the level of the 
initial values:

throughput (b/s) = 699751388
throughput (b/s) = 723542382
throughput (b/s) = 662989745

Another question is about Spark: about 15 seconds after I start the Spark command, 
Spark keeps repeating the same word counts, even though my program continues to 
send words to Kafka, so I would expect the word counts in Spark to keep growing. 
I have attached the log from Spark.
  
My setup is:

ComputerA (rdkafka_cachesender) -> ComputerB (Kafka brokers + Zookeeper) -> 
ComputerC (Spark)
 
If I haven't explained this well, please reply to me.

Thanks Guys
-- 
Privacy notice: http://www.unibs.it/node/8155


question about console producer behavior

2014-11-03 Thread raymond
Hi

It appears to me that, once I have opened a console producer, it does not 
notice a change in the topic's partition count?

Say I have a topic with 3 partitions, and I open a console producer to 
produce data with keys; it will distribute data across the 3 partitions. Then I 
keep this producer open and change the topic's partition count to e.g. 4. This 
producer will not distribute data into the 4th partition, while a newly created 
producer will.

Is this behavior expected? 



Indefinite growth of FetchRequestPurgatory

2014-11-03 Thread András Serény

Hi Kafka users,

we're running a cluster of two Kafka 0.8.1.1 brokers, with a twofold 
replication of each topic.


When both brokers are up, after a short while the FetchRequestPurgatory 
starts to grow indefinitely on the leader (detectable via a heap dump 
and also via the "FetchRequestPurgatory"."PurgatorySize" JMX metric), 
eventually leading to an OOM error. When one of the brokers is shut 
down, the purgatory stops growing in size, and the remaining broker runs 
fine. In https://issues.apache.org/jira/browse/KAFKA-1016, I see this 
can occur when a fetcher specifies a too large max wait time, but we 
don't override replica.fetch.wait.max.ms, leaving it at the default 500 ms.


Do you have any suggestions what can be the cause and how to fix it?

Thanks a lot,
András


Fwd: Rebalance not happening even after increasing max retries causing conflict in ZK

2014-11-03 Thread Mohit Kathuria
Neha,

Looks like an issue with the consumer rebalance not being able to complete
successfully. We were able to reproduce the issue on a topic with 30
partitions, 3 consumer processes (p1, p2 and p3), and the properties
rebalance.max.retries set to 40 and rebalance.backoff.ms set to 10000 (10s).
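
Spelled out as consumer properties, that configuration looks roughly like this; the session timeout value is only an example, and the usual sizing guidance is to keep rebalance.max.retries * rebalance.backoff.ms comfortably above the zookeeper session timeout:

rebalance.max.retries=40
rebalance.backoff.ms=10000
# example value; keep retries * backoff well above this
zookeeper.session.timeout.ms=30000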

Before the process p3 was started, partition ownership was as expected:

partitions 0-14 -> owner p1
partitions 15-29 -> owner p2

When process p3 started, a rebalance was triggered. Process p3 successfully
acquired ownership of partitions 20-29, as expected per the rebalance
algorithm. However, process p2, while trying to acquire ownership of
partitions 10-19, saw the rebalance fail after 40 retries.

Attaching the logs from process p2 and process p1. They show that p2 was
attempting to rebalance and was trying to acquire ownership of partitions
10-14, which were owned by process p1. However, at the same time process p1
did not get any event telling it to give up ownership of partitions
10-14.
We were expecting a rebalance to have been triggered in p1, but it wasn't,
and hence p1 did not give up ownership. Is our assumption correct or incorrect?
And if the rebalance does get triggered in p1, how can we verify that apart from
the logs, since the logs on p1 did not show anything?

*2014-11-03 06:57:36 k.c.ZookeeperConsumerConnector [INFO]
[topic_consumerIdString], waiting for the partition ownership to be
deleted: 11*

During and after the failed rebalance on process p2, partition ownership
was as below:
0-14 -> owner p1
15-19 -> none
20-29 -> owner p3

This left the consumers in an inconsistent state, as 5 partitions were never
consumed from and the partition ownership was not balanced either.

However, there was no conflict in creating the ephemeral node, which was the
case last time. Just to note that the ephemeral node conflict we were
seeing earlier also appeared after the rebalance failed. My hunch is that
fixing the rebalance failure will fix that issue as well.

-Thanks,
Mohit


kafka.message.InvalidMessageException: Message is corrupt

2014-11-03 Thread Fredrik S Loekke
Hi

We are experimenting with running the Kafka server on a Windows machine, but keep 
getting exceptions when producing a lot of messages (in the neighborhood of 1 
million).

kafka.message.InvalidMessageException: Message is corrupt (stored crc = 29639294
31, computed crc = 2364631640)
at kafka.message.Message.ensureValid(Message.scala:166)
at kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala
:330)
at kafka.log.Log$$anonfun$analyzeAndValidateMessageSet$1.apply(Log.scala
:318)
at scala.collection.Iterator$class.foreach(Iterator.scala:772)
at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
at kafka.log.Log.analyzeAndValidateMessageSet(Log.scala:318)
at kafka.log.Log.append(Log.scala:231)

Any suggestions?

Best regards / Med venlig hilsen

Fredrik Skeel Løkke
Software Developer ǀ IT & Analysis

Mob.: +45 3176 8438
f...@lindcapital.com


Lind Capital A/S
Værkmestergade 3, 2
DK-8000 Aarhus C
www.lindcapital.com