Re: Kafka New(Java) Producer Connection reset by peer error and LB

2015-02-09 Thread Bhavesh Mistry
Hi Kafka Team,

Please confirm if you would like me to open a Jira issue to track this?

Thanks,

Bhavesh

On Mon, Feb 9, 2015 at 12:39 PM, Bhavesh Mistry 
wrote:

> Hi Kafka Team,
>
> We are getting this connection reset by peer a couple of minutes after
> start-up of the producer, due to infrastructure deployment strategies we have
> adopted from LinkedIn.
>
> We have the LB hostname and port as the seed server, and all producers are getting
> the following exception because of the TCP idle connection timeout set on the LB (which
> is 2 minutes, while the Kafka TCP connection idle timeout is set to 10 minutes). This
> seems to be a minor bug: the connection to the seed server should be closed immediately
> after discovering that it is not part of the brokers list.
>
>
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcher.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
> at sun.nio.ch.IOUtil.read(IOUtil.java:171)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:245)
> at
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:60)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:242)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:178)
> at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
> at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> at java.lang.Thread.run(Thread.java:662)
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcher.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
> at sun.nio.ch.IOUtil.read(IOUtil.java:171)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:245)
> at
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:60)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:242)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:178)
> at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
> at
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> at java.lang.Thread.run(Thread.java:662)
>
>
> Thanks,
>
> Bhavesh
>
>


Re: Get Latest Offset for Specific Topic for All Partition

2015-02-09 Thread Bhavesh Mistry
Hi Gwen,

These JMX stats are good for calculating the injection rate per partition. I do
not have to depend on ZK to figure out who the leader is or what the latest
offset is.

One quick question: what is the Size number? Is it the number of bytes a particular
partition has on disk? Unfortunately, the MBean description is very limited
and does not help much (it is "Information on the management interface of
the MBean"). It is a gauge; that is all I can tell.

1189855393 LogEndOffset
1165330350 Size
1176813232 LogStartOffset
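
For reference, a minimal sketch of reading one of these per-partition gauges over
JMX from Java is below. The JMX port and the exact MBean ObjectName pattern vary by
broker version and are assumptions here; copy the real name from jconsole first.

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class LogEndOffsetReader {
    public static void main(String[] args) throws Exception {
        // Hypothetical broker JMX endpoint -- adjust host/port to your environment.
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://broker-1:9999/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection mbsc = connector.getMBeanServerConnection();
            // Assumed ObjectName pattern -- verify against your broker with jconsole.
            ObjectName gauge = new ObjectName(
                    "kafka.log:type=Log,name=LogEndOffset,topic=my-topic,partition=0");
            // Yammer gauges are exposed through a single "Value" attribute.
            Object value = mbsc.getAttribute(gauge, "Value");
            System.out.println("LogEndOffset = " + value);
        } finally {
            connector.close();
        }
    }
}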

Thanks for your help!!

Thanks,

Bhavesh

On Thu, Feb 5, 2015 at 12:55 PM, Gwen Shapira  wrote:

> You can use the metrics Kafka publishes.  I think the relevant metrics are:
> Log.LogEndOffset
> Log.LogStartOffset
> Log.size
>
>
> Gwen
>
> On Thu, Feb 5, 2015 at 11:54 AM, Bhavesh Mistry
>  wrote:
> > HI All,
> >
> > I just need to get the latest offset # for topic (not for consumer
> group).
> > Which API to get this info ?
> >
> > My use case is to analyze the data injection rate to each of  partition
> is
> > uniform or not (close). For this,  I am planing to dump the latest offset
> > into graphite  for each partition and look at derivative over time.
> >
> > Thanks,
> >
> > Bhavesh
>


Re: Kafka producer perf script throw java.io.IOException

2015-02-09 Thread Xinyi Su
Hi, Jun
I created a Jira issue to track it. Please have a look at it.

https://issues.apache.org/jira/browse/KAFKA-1939

Thanks.
Xinyi

On 10 February 2015 at 10:38, Jun Rao  wrote:

> The new producer uses a different built-in metrics package. Currently, it
> only supports a jmx reporter for the metrics. So you will have to get the
> metrics from jmx.
>
> We can add the csv reporter in ProducerPerformance for the new producer by
> using the new metrics api. Could you file a jira for that?
>
> Thanks,
>
> Jun
>
> On Wed, Feb 4, 2015 at 11:48 PM, Xinyi Su  wrote:
>
> > Hi,
> >
> > You are right. The file is created by Kafka and Kafka producer csv
> reporter
> > append metrics continuously during perf test. It should not be created
> > continuously during test but create once only.
> >
> > Another thing is when I turn on "--new-producer" and "--sync" option,
> > nothing is appended into  ProducerRequestSize.csv. From source code,  I
> > have not got any NewShinyProducer record stats code. It may be the
> reason.
> >
> > Thanks.
> > Xinyi
> >
> > On 5 February 2015 at 15:26, Jaikiran Pai 
> > wrote:
> >
> > > >> java.io.IOException: Unable to create /tmp/PerfTopic22_1/
> > > ProducerRequestSize.csv
> > >
> > > It looks like a file with that exact same name already exists which is
> > > causing that file creation request to fail. This indicates that
> probably
> > > the metric name (ProducerRequestSize) from which the file is created,
> is
> > > duplicate for whatever reason.
> > >
> > > -Jaikiran
> > >
> > >
> > > On Thursday 05 February 2015 12:49 PM, Xinyi Su wrote:
> > >
> > >> Hi,
> > >>
> > >> I need to get more metrics from csv reporter.  If turn off
> csv-reporter,
> > >> few output is shown.
> > >>
> > >> Thanks.
> > >> Xinyi
> > >>
> > >> On 5 February 2015 at 13:09, tao xiao  wrote:
> > >>
> > >>  Hi,
> > >>>
> > >>> In order to get it work you can turn off csv-reporter.
> > >>>
> > >>> On Thu, Feb 5, 2015 at 1:06 PM, Xinyi Su  wrote:
> > >>>
> >  Hi,
> > 
> >  Today I updated Kafka cluster from 0.8.2-beta to 0.8.2.0 and run kafka
> >  producer performance test.
> > 
> >  The test cannot continue because of some exceptions thrown which does not
> >  occur at 0.8.2-beta. My perf library is kafka-perf_2.9.2-0.8.0.jar which is
> >  the latest version on maven repository.
> > 
> >  -bash-4.1$ bin/kafka-producer-perf-test.sh --broker-list <broker list>
> >  --topics PerfTopic22 --sync --initial-message-id 1 --messages 20
> >  --csv-reporter-enabled --metrics-dir /tmp/PerfTopic22_1
> >  --message-send-gap-ms 20 --request-num-acks -1 --batch-size 1
> > 
> >  java.io.IOException: Unable to create /tmp/PerfTopic22_1/ProducerRequestSize.csv
> >  at com.yammer.metrics.reporting.CsvReporter.createStreamForMetric(CsvReporter.java:141)
> >  at com.yammer.metrics.reporting.CsvReporter.getPrintStream(CsvReporter.java:257)
> >  at com.yammer.metrics.reporting.CsvReporter.access$000(CsvReporter.java:22)
> >  at com.yammer.metrics.reporting.CsvReporter$1.getStream(CsvReporter.java:156)
> >  at com.yammer.metrics.reporting.CsvReporter.processHistogram(CsvReporter.java:194)
> >  at com.yammer.metrics.reporting.CsvReporter.processHistogram(CsvReporter.java:22)
> >  at com.yammer.metrics.core.Histogram.processWith(Histogram.java:231)
> >  at com.yammer.metrics.reporting.CsvReporter.run(CsvReporter.java:163)
> >  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> >  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
> >  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
> >  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> >  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> >  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >  at java.lang.Thread.run(Thread.java:745)
> > 
> > 
> > >>>
> > >>> --
> > >>> Regards,
> > >>> Tao
> > >>>
> > >>>
> > >
> >
>


Re: Delete topic functionality can't use in 0.8.1

2015-02-09 Thread Stephen Boesch
Ryco, you are correct: delete topic is a new feature for 0.8.2
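
For reference, on 0.8.2 the deletion is issued roughly as below (the ZooKeeper
address and topic name are placeholders), and the brokers must be running with
delete.topic.enable=true for the delete to actually be carried out:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic my-topic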

2015-02-09 19:53 GMT-08:00 Ryco Xiao :

> When I exec the delete command, the returned information is below:
> It says kafka-topics.sh does not support the delete parameter.
> My package was compiled by myself.
>
>
>


Re: How to fetch old messages from kafka

2015-02-09 Thread Snehalata Nagaje

Hi Mayuresh,


Does it mean we need to maintain the offset in the db? After every message post,
we can check the last offset maintained in the db, e.g. it is 20; then on every
message post we can check the latest offset present in the given topic, and if it
is greater than 30, we can add one more entry in the db.

And while consuming in the reverse direction, we can give offset 20 with a certain
size of bytes using the simple consumer; once we reach the next offset 30 while
reading, we stop consuming and display that set of messages?

But are the offset numbers sequential? Can I assume one message is represented by
one offset?

Thanks,
Snehalata



- Original Message -
From: "Mayuresh Gharat" 
To: users@kafka.apache.org
Sent: Friday, February 6, 2015 3:48:42 AM
Subject: Re: How to fetch old messages from kafka

If you see the code for getOffsetsBefore() :

 /**
   *  Get a list of valid offsets (up to maxSize) before the given time.
   *
   *  @param request a [[kafka.javaapi.OffsetRequest]] object.
   *  @return a [[kafka.javaapi.OffsetResponse]] object.
   */
  def getOffsetsBefore(request: *OffsetRequest*):
kafka.javaapi.OffsetResponse = {
import kafka.javaapi.Implicits._
underlying.getOffsetsBefore(request.underlying)
  }


It says get Offsets before a valid time.

Also if you dig deeper in to OffsetRequest :

case class OffsetRequest(requestInfo: Map[TopicAndPartition,
*PartitionOffsetRequestInfo*],
 versionId: Short = OffsetRequest.CurrentVersion,
 correlationId: Int = 0,
 clientId: String = OffsetRequest.DefaultClientId,
 replicaId: Int = Request.OrdinaryConsumerId)
extends RequestOrResponse(Some(RequestKeys.OffsetsKey))


case class PartitionOffsetRequestInfo(*time: Long*, maxNumOffsets: Int)


So I don't think you can use it.
What you can do is maintain the beginning and end offsets for each page,
for your example it would be range 10 and use those to do the resetting.
You can maintain the max offsets the user can go back, it would be like
maintaining a sliding window and moving the window back only some
configurable max number of times.
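
To make the page idea concrete, here is a minimal sketch of reading one page from a
stored start offset with the 0.8.x SimpleConsumer; the broker host, client id, fetch
size, and timeouts are placeholders, not values from this thread:

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class PageFetcher {
    // Fetch messages with offsets in [pageStartOffset, pageEndOffset) from one partition.
    public static void fetchPage(String topic, int partition,
                                 long pageStartOffset, long pageEndOffset) {
        SimpleConsumer consumer =
                new SimpleConsumer("broker-1", 9092, 100000, 64 * 1024, "chat-pager");
        try {
            FetchRequest request = new FetchRequestBuilder()
                    .clientId("chat-pager")
                    .addFetch(topic, partition, pageStartOffset, 1024 * 1024) // fetch size in bytes
                    .build();
            FetchResponse response = consumer.fetch(request);
            for (MessageAndOffset messageAndOffset : response.messageSet(topic, partition)) {
                if (messageAndOffset.offset() >= pageEndOffset) {
                    break; // stop once the stored page boundary is reached
                }
                System.out.println("offset " + messageAndOffset.offset()); // render the message here
            }
        } finally {
            consumer.close();
        }
    }
}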


Thanks,

Mayuresh



On Wed, Feb 4, 2015 at 1:29 AM, Snehalata Nagaje <
snehalata.nag...@harbingergroup.com> wrote:

>
>
> Hi Mayuresh,
>
>
> Thanks for quick response.
>
> We can reset the offset and get first 10 messages, but since we need to
> back in reverse sequence, suppose user has consumed messages upto 100
> offset , currently there are only last 10 messages are visible, from 100
> -90, now I want to retrieve messages from 80 to 90, how can we do that?
>
> Can we use getOffsetBefore() function to get valid offset before given
> time, this will return all valid offsets. we can get all valid offsets
> before latest time.
>
> Then we can fetch messages from any given valid offset returned from
> getOffsetBefore().
>
> is this correct approach?
>
> Thanks,
> Snehalata
>
> - Original Message -
> From: gharatmayures...@gmail.com
> To: users@kafka.apache.org
> Sent: Wednesday, February 4, 2015 11:24:46 AM
> Subject: Re: How to fetch old messages from kafka
>
> In that case you will have to maintain the offsets consumed and reset the
> offsets in case you need to consume from past.
>
> For example, suppose you have a userA for which you have a partitionA for
> topic TopicA. Each page shown to user increments the offset by 10. You have
> consumed till offset 100 and the user wants to go back 1 page you will have
> to reset the offset for TopicA partitionA in the zookeeper. Since you are
> using simple consumer the offset management has to be done by your
> application.
>
> Thanks,
>
> Mayuresh
>
> Sent from my iPhone
>
> > On Feb 3, 2015, at 9:18 PM, Snehalata Nagaje <
> snehalata.nag...@harbingergroup.com> wrote:
> >
> >
> >
> > Hi ,
> >
> >
> > We are using kafka for storing messages in chat application.
> >
> > Currently we divided each topic in multiple partitions. each partition
> stores data for given customer who uses the application.
> >
> > Right now on very first request, application fetches log from kafka from
> earliest valid offset to maxiumum 10 bytes. hence it reads all messages
> for given topic
> >
> > for given partition. Now we want to apply pagination as linkedin,
> facebook does. Only latest 10-15 messages should be displayed. And then on
> scroll down
> >
> > fetch next set of previous messages, we are using Simple consumer to
> fetch messages.
> >
> > Can you please guide on this?
> >
> >
> > Thanks,
> > Snehalata
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: could new java producer miss callbacks after successful send?

2015-02-09 Thread Steven Wu
I don't have strong evidence that this is a bug yet. Let me write a test
program and see if I can confirm/reproduce the issue.

On Mon, Feb 9, 2015 at 7:59 PM, Jay Kreps  wrote:

> Hmm, that does sound like a bug, we haven't seen that. How easy is it to
> reproduce this?
>
> -Jay
>
> On Mon, Feb 9, 2015 at 5:19 PM, Steven Wu  wrote:
>
> > We observed some small discrepancy in messages sent per second reported
> at
> > different points. 1) and 4) matches very close. 2) and 3) matches very
> > close but are about *5-6% lower* compared to 1) and 4).
> > 1) send attempt from producer
> > 2) send success from producer
> > 3) record-send-rate reported by kafka producer
> > 4) MessagesInPerSecond reported by kafka broker
> >
> > note that send success for 2) is incremented when onCompletion is called
> > without error/exception). there is also a send error count when
> > onCompletion is called with error. it is always zero.
> >
> > that's why I am wondering whether there are some callback misses?
> >
> > some info about the setup
> > * producer: 0.8.2-beta
> > * broker: 0.8.1.1
> > * acks=1
> >
> > Thanks,
> > Steven
> >
>


Delete topic functionality can't use in 0.8.1

2015-02-09 Thread Ryco Xiao
When I exec the delete command, the returned information is below:
It says kafka-topics.sh does not support the delete parameter.
My package was compiled by myself.




Re: could new java producer miss callbacks after successful send?

2015-02-09 Thread Jay Kreps
Hmm, that does sound like a bug, we haven't seen that. How easy is it to
reproduce this?

-Jay

On Mon, Feb 9, 2015 at 5:19 PM, Steven Wu  wrote:

> We observed some small discrepancy in messages sent per second reported at
> different points. 1) and 4) matches very close. 2) and 3) matches very
> close but are about *5-6% lower* compared to 1) and 4).
> 1) send attempt from producer
> 2) send success from producer
> 3) record-send-rate reported by kafka producer
> 4) MessagesInPerSecond reported by kafka broker
>
> note that send success for 2) is incremented when onCompletion is called
> without error/exception). there is also a send error count when
> onCompletion is called with error. it is always zero.
>
> that's why I am wondering whether there are some callback misses?
>
> some info about the setup
> * producer: 0.8.2-beta
> * broker: 0.8.1.1
> * acks=1
>
> Thanks,
> Steven
>


Re: Kafka producer perf script throw java.io.IOException

2015-02-09 Thread Jun Rao
The new producer uses a different built-in metrics package. Currently, it
only supports a jmx reporter for the metrics. So you will have to get the
metrics from jmx.
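
Besides JMX, the new producer also exposes the same metrics programmatically; a
minimal sketch of dumping them (producer construction not shown, names of the
individual metrics depend on the client version) could look like this:

import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class ProducerMetricsDump {
    public static void dump(KafkaProducer<byte[], byte[]> producer) {
        Map<MetricName, ? extends Metric> metrics = producer.metrics();
        for (MetricName name : metrics.keySet()) {
            // e.g. record-send-rate, request-latency-avg, outgoing-byte-rate, ...
            System.out.println(name.name() + " = " + metrics.get(name).value());
        }
    }
}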

We can add the csv reporter in ProducerPerformance for the new producer by
using the new metrics api. Could you file a jira for that?

Thanks,

Jun

On Wed, Feb 4, 2015 at 11:48 PM, Xinyi Su  wrote:

> Hi,
>
> You are right. The file is created by Kafka and Kafka producer csv reporter
> append metrics continuously during perf test. It should not be created
> continuously during test but create once only.
>
> Another thing is when I turn on "--new-producer" and "--sync" option,
> nothing is appended into  ProducerRequestSize.csv. From source code,  I
> have not got any NewShinyProducer record stats code. It may be the reason.
>
> Thanks.
> Xinyi
>
> On 5 February 2015 at 15:26, Jaikiran Pai 
> wrote:
>
> > >> java.io.IOException: Unable to create /tmp/PerfTopic22_1/
> > ProducerRequestSize.csv
> >
> > It looks like a file with that exact same name already exists which is
> > causing that file creation request to fail. This indicates that probably
> > the metric name (ProducerRequestSize) from which the file is created, is
> > duplicate for whatever reason.
> >
> > -Jaikiran
> >
> >
> > On Thursday 05 February 2015 12:49 PM, Xinyi Su wrote:
> >
> >> Hi,
> >>
> >> I need to get more metrics from csv reporter.  If turn off csv-reporter,
> >> few output is shown.
> >>
> >> Thanks.
> >> Xinyi
> >>
> >> On 5 February 2015 at 13:09, tao xiao  wrote:
> >>
> >>  Hi,
> >>>
> >>> In order to get it work you can turn off csv-reporter.
> >>>
> >>> On Thu, Feb 5, 2015 at 1:06 PM, Xinyi Su  wrote:
> >>>
>  Hi,
> 
>  Today I updated Kafka cluster from 0.8.2-beta to 0.8.2.0 and run kafka
>  producer performance test.
> 
>  The test cannot continue because of some exceptions thrown which does not
>  occur at 0.8.2-beta. My perf library is kafka-perf_2.9.2-0.8.0.jar which is
>  the latest version on maven repository.
> 
>  -bash-4.1$ bin/kafka-producer-perf-test.sh --broker-list <broker list>
>  --topics PerfTopic22 --sync --initial-message-id 1 --messages 20
>  --csv-reporter-enabled --metrics-dir /tmp/PerfTopic22_1
>  --message-send-gap-ms 20 --request-num-acks -1 --batch-size 1
> 
>  java.io.IOException: Unable to create /tmp/PerfTopic22_1/ProducerRequestSize.csv
>  at com.yammer.metrics.reporting.CsvReporter.createStreamForMetric(CsvReporter.java:141)
>  at com.yammer.metrics.reporting.CsvReporter.getPrintStream(CsvReporter.java:257)
>  at com.yammer.metrics.reporting.CsvReporter.access$000(CsvReporter.java:22)
>  at com.yammer.metrics.reporting.CsvReporter$1.getStream(CsvReporter.java:156)
>  at com.yammer.metrics.reporting.CsvReporter.processHistogram(CsvReporter.java:194)
>  at com.yammer.metrics.reporting.CsvReporter.processHistogram(CsvReporter.java:22)
>  at com.yammer.metrics.core.Histogram.processWith(Histogram.java:231)
>  at com.yammer.metrics.reporting.CsvReporter.run(CsvReporter.java:163)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:745)
> 
> 
> >>>
> >>> --
> >>> Regards,
> >>> Tao
> >>>
> >>>
> >
>


Re: Got ClosedByInterruptException when closing ConsumerConnector

2015-02-09 Thread Guozhang Wang
This exception should be transient (and that is why we captured it as INFO
level log entries) and can be ignored.

We are currently working on the new consumer APIs, and will improve our
logging pattern to avoid such confusing information.

Guozhang

On Mon, Feb 9, 2015 at 5:33 PM, tao xiao  wrote:

> It happens every time I shutdown the connector. It doesn't block the
> shutdown process though
>
> On Tue, Feb 10, 2015 at 1:09 AM, Guozhang Wang  wrote:
>
> > Is this exception transient or consistent and blocking the shutdown
> > process?
> >
> > On Mon, Feb 9, 2015 at 3:07 AM, tao xiao  wrote:
> >
> > > Hi team,
> > >
> > > I got java.nio.channels.ClosedByInterruptException when
> > > closing ConsumerConnector using kafka 0.8.2
> > >
> > > Here is the exception
> > >
> > > 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> > > [test12345_localhost], ZKConsumerConnector shutting down
> > >
> > > 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> > > [ConsumerFetcherManager-1423479848796] Stopping leader finder thread
> > >
> > > 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> > > [test12345_localhost], Shutting down
> > >
> > > 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> > > [test12345_localhost], Shutdown completed
> > >
> > > 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> > > [ConsumerFetcherManager-1423479848796] Stopping all fetchers
> > >
> > > 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> > > [test12345_localhost], Stopped
> > >
> > > 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> > > [ConsumerFetcherThread-test12345_localhost], Shutting down
> > >
> > > 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 - Reconnect due
> to
> > > socket error: java.nio.channels.ClosedByInterruptException
> > >
> > > 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> > > [ConsumerFetcherThread-test12345_localhost], Stopped
> > >
> > > 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> > > [ConsumerFetcherThread-test12345_localhost], Shutdown completed
> > >
> > > 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> > > [ConsumerFetcherManager-1423479848796] All connections stopped
> > >
> > > 2015-02-09 19:04:19 INFO  org.I0Itec.zkclient.ZkEventThread:82 -
> > Terminate
> > > ZkClient event thread.
> > >
> > > 2015-02-09 19:04:19 INFO  org.apache.zookeeper.ZooKeeper:684 - Session:
> > > 0x14b6dd8fcf80011 closed
> > >
> > > 2015-02-09 19:04:19 INFO
> > org.apache.zookeeper.ClientCnxn$EventThread:512 -
> > > EventThread shut down
> > >
> > > 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> > > [test12345_localhost], ZKConsumerConnector shutdown completed in 86 ms
> > >
> > >
> > > --
> > > Regards,
> > > Tao
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> Regards,
> Tao
>



-- 
-- Guozhang


could new java producer miss callbacks after successful send?

2015-02-09 Thread Steven Wu
We observed a small discrepancy in the messages-sent-per-second rate reported at
different points. 1) and 4) match very closely. 2) and 3) match very closely with
each other, but are about *5-6% lower* compared to 1) and 4).
1) send attempt from producer
2) send success from producer
3) record-send-rate reported by kafka producer
4) MessagesInPerSecond reported by kafka broker

Note that the send-success count for 2) is incremented when onCompletion is called
without an error/exception. There is also a send-error count for when onCompletion
is called with an error; it is always zero.

That's why I am wondering whether there are some callback misses.

some info about the setup
* producer: 0.8.2-beta
* broker: 0.8.1.1
* acks=1
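
For comparison, a minimal sketch of the counting described above (attempts at
send() time, successes/errors inside the callback) is below; it is illustrative
only, not Steven's actual code:

import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CountingSender {
    private final AtomicLong attempts = new AtomicLong();   // 1) send attempts
    private final AtomicLong successes = new AtomicLong();  // 2) send successes
    private final AtomicLong failures = new AtomicLong();   // send errors

    public void send(KafkaProducer<byte[], byte[]> producer, ProducerRecord<byte[], byte[]> record) {
        attempts.incrementAndGet();
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    successes.incrementAndGet();
                } else {
                    failures.incrementAndGet();
                }
            }
        });
    }
}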

Thanks,
Steven


Re: Got ClosedByInterruptException when closing ConsumerConnector

2015-02-09 Thread tao xiao
It happens every time I shut down the connector. It doesn't block the shutdown
process, though.

On Tue, Feb 10, 2015 at 1:09 AM, Guozhang Wang  wrote:

> Is this exception transient or consistent and blocking the shutdown
> process?
>
> On Mon, Feb 9, 2015 at 3:07 AM, tao xiao  wrote:
>
> > Hi team,
> >
> > I got java.nio.channels.ClosedByInterruptException when
> > closing ConsumerConnector using kafka 0.8.2
> >
> > Here is the exception
> >
> > 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> > [test12345_localhost], ZKConsumerConnector shutting down
> >
> > 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> > [ConsumerFetcherManager-1423479848796] Stopping leader finder thread
> >
> > 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> > [test12345_localhost], Shutting down
> >
> > 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> > [test12345_localhost], Shutdown completed
> >
> > 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> > [ConsumerFetcherManager-1423479848796] Stopping all fetchers
> >
> > 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> > [test12345_localhost], Stopped
> >
> > 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> > [ConsumerFetcherThread-test12345_localhost], Shutting down
> >
> > 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 - Reconnect due to
> > socket error: java.nio.channels.ClosedByInterruptException
> >
> > 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> > [ConsumerFetcherThread-test12345_localhost], Stopped
> >
> > 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> > [ConsumerFetcherThread-test12345_localhost], Shutdown completed
> >
> > 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> > [ConsumerFetcherManager-1423479848796] All connections stopped
> >
> > 2015-02-09 19:04:19 INFO  org.I0Itec.zkclient.ZkEventThread:82 -
> Terminate
> > ZkClient event thread.
> >
> > 2015-02-09 19:04:19 INFO  org.apache.zookeeper.ZooKeeper:684 - Session:
> > 0x14b6dd8fcf80011 closed
> >
> > 2015-02-09 19:04:19 INFO
> org.apache.zookeeper.ClientCnxn$EventThread:512 -
> > EventThread shut down
> >
> > 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> > [test12345_localhost], ZKConsumerConnector shutdown completed in 86 ms
> >
> >
> > --
> > Regards,
> > Tao
> >
>
>
>
> --
> -- Guozhang
>



-- 
Regards,
Tao


Unable to produce messages to topic: failed to send producer

2015-02-09 Thread Charlie Davis
Hey everyone,

I’m hoping someone can help me with an issue I’m having. I’ll be using my
console output so I’m sorry for the console spam. :)

So first, I list my topics:

?  kafka_2.11-0.8.2.0  bin/kafka-topics.sh --list --zookeeper localhost:6002
dog
?  kafka_2.11-0.8.2.0

And I have a topic called “dog.” Next, I do a describe:

?  kafka_2.11-0.8.2.0  bin/kafka-topics.sh --describe --zookeeper localhost:6002
Topic:dog PartitionCount:1 ReplicationFactor:1 Configs:
Topic: dog Partition: 0 Leader: 4 Replicas: 4 Isr: 4
?  kafka_2.11-0.8.2.0

Ok. Cool. Now, here’s where I run into problems, when I actually send a message:

?  kafka_2.11-0.8.2.0  bin/kafka-console-producer.sh --broker-list 
localhost:6001 --topic dog
[2015-02-09 17:14:45,060] WARN Property topic is not valid 
(kafka.utils.VerifiableProperties)
This is a message so why don't you work?
[2015-02-09 17:14:50,235] WARN Failed to send producer request with correlation 
id 2 to broker 4 with data for partitions [dog,0] 
(kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at 
kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
at 
kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
at 
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
at 
kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
at 
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at 
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at 
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:594)
at 
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-02-09 17:14:50,354] WARN Failed to send producer request with correlation 
id 5 to broker 4 with data for partitions [dog,0] 
(kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at 
kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
at 
kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at 
kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)

And my kafka service has a pile of… info messages I guess?

nmskafka_1 | [2015-02-10 01:04:46,704] INFO Closing socket connection to 
/10.0.2.2. (kafka.network.Processor)
nmskafka_1 | [2015-02-10 01:04:46,860] INFO Closing socket connection to 
/10.0.2.2. (kafka.network.Pr

Re: ping kafka server

2015-02-09 Thread Koert Kuipers
A simple Nagios check_tcp works fine. As Gwen indicated, Kafka closes the
connection on me, but this is (supposedly) harmless. I see this in the server logs:
(kafka.network.Processor)
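
For reference, the check is essentially the stock plugin pointed at the broker
port (the host name below is a placeholder):

check_tcp -H kafka-broker-1 -p 9092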


On Mon, Feb 9, 2015 at 6:06 PM, Scott Clasen  wrote:

> I have used nagios in this manner with kafaka before and worked fine.
>
> On Mon, Feb 9, 2015 at 2:48 PM, Koert Kuipers  wrote:
>
> > i would like to be able to ping kafka servers from nagios to confirm they
> > are alive. since kafka servers dont run a http server (web ui) i am not
> > sure how to do this.
> >
> > is it safe to establish a "test" tcp connection (so connect and
> immediately
> > disconnect using telnet or netstat or something like that) to the kafka
> > server on port 9092 to confirm its alive?
> >
> > thanks
> >
>


Re: Current vote - 0.8.2.0-RC1 or 0.8.2.0?

2015-02-09 Thread Stevo Slavić
Thanks for heads up!

Please consider updating versions in JIRA - 0.8.2 --> 0.8.2.0, and labeling
0.8.2.0 as released.

Kind regards,
Stevo Slavic.

On Wed, Jan 14, 2015 at 6:54 PM, Jun Rao  wrote:

> About the versioning, we had released 0.8.1 and 0.8.1.1 before, which is a
> bit inconsistent in terms of versioning format. So picking 0.8.2.0 is
> intended to fix that inconsistency.
>
> Thanks,
>
> Jun
>
> On Wed, Jan 14, 2015 at 9:12 AM, Stevo Slavić  wrote:
>
> > Hello Apache Kafka community,
> >
> > Is currently active vote for 0.8.2.0-RC1 or 0.8.2.0?
> >
> > If the vote is for 0.8.2.0-RC1 why isn't that reflected in artifact
> > metadata? Version should be 0.8.2.0-RC1, 0.8.2-RC1 or something similar
> > (0.8.2 beta release had "-beta" and no ".0" suffix - see
> > http://repo1.maven.org/maven2/org/apache/kafka/kafka_2.10/0.8.2-beta/ )
> > If it stays like this, and final gets released later with same 0.8.2.0
> > version, but different content - repositories, both local and remote will
> > get polluted with junk.
> >
> > If the vote is for 0.8.2.0 final GA release, why call the vote candidate
> 1?
> >
> > Also, version related - none of the previous 0.8.x releases had ".0"
> > release i.e. 0.8.x.0. Is this change in version numbering intentional?
> >
> > Kind regards,
> > Stevo Slavic.
> >
>


Need sample/example of updating offset in SImple Consumer

2015-02-09 Thread Christopher Piggott
Hi,

Can somebody provide me with an example of how to formulate an
OffsetCommitRequest for a single stream/partition using SimpleConsumer from
java?

Both ends, really ... periodically issuing commits, but also how to get the
current offset when starting up.


I can show what I'm attempting ... but failing to connect the objects and
constructors:


TopicAndPartition key = new TopicAndPartition(topic, shardNum);
OffsetMetadataAndError value = new OffsetMetadataAndError(offset); /* ??? */

Map map =
Collections.singletonMap(key, value);

OffsetCommitRequest request = new OffsetCommitRequest(
groupId,
map,
kafka.api.OffsetCommitRequest.CurrentVersion(),
0, /* what do I do with this - correlation id? */
clientName);
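
For what it's worth, here is a rough sketch of both directions built around the
constructors referenced above. It has not been compiled against a real 0.8.2
build: the argument order and the OffsetMetadataAndError constructor are
assumptions that should be verified against the kafka.javaapi javadoc for your
version, and the group/client names are placeholders.

import java.util.Collections;
import java.util.List;
import java.util.Map;
import kafka.common.OffsetMetadataAndError;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetCommitRequest;
import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetFetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class OffsetBookkeeping {

    // Periodically commit the current position for one topic/partition.
    public static void commit(SimpleConsumer consumer, String groupId, String clientName,
                              String topic, int partition, long offset) {
        TopicAndPartition key = new TopicAndPartition(topic, partition);
        // offset, metadata string, error code (assumed constructor shape)
        OffsetMetadataAndError value = new OffsetMetadataAndError(offset, "", (short) 0);
        Map<TopicAndPartition, OffsetMetadataAndError> requestInfo =
                Collections.singletonMap(key, value);
        OffsetCommitRequest request = new OffsetCommitRequest(
                groupId,
                requestInfo,
                kafka.api.OffsetCommitRequest.CurrentVersion(),
                0,            // correlation id: an int used to match the response to this request
                clientName);
        consumer.commitOffsets(request);
    }

    // On startup, read back the last committed position.
    public static long fetch(SimpleConsumer consumer, String groupId, String clientName,
                             String topic, int partition) {
        TopicAndPartition key = new TopicAndPartition(topic, partition);
        List<TopicAndPartition> partitions = Collections.singletonList(key);
        OffsetFetchRequest request = new OffsetFetchRequest(
                groupId, partitions,
                kafka.api.OffsetFetchRequest.CurrentVersion(),
                0,            // correlation id
                clientName);
        OffsetFetchResponse response = consumer.fetchOffsets(request);
        OffsetMetadataAndError stored = response.offsets().get(key);
        return stored.offset();
    }
}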


Re: ping kafka server

2015-02-09 Thread Scott Clasen
I have used Nagios in this manner with Kafka before and it worked fine.

On Mon, Feb 9, 2015 at 2:48 PM, Koert Kuipers  wrote:

> i would like to be able to ping kafka servers from nagios to confirm they
> are alive. since kafka servers dont run a http server (web ui) i am not
> sure how to do this.
>
> is it safe to establish a "test" tcp connection (so connect and immediately
> disconnect using telnet or netstat or something like that) to the kafka
> server on port 9092 to confirm its alive?
>
> thanks
>


Re: ping kafka server

2015-02-09 Thread todd
Hi gwen,

Can you share how you do these end to end latency tests? I am more sysadmin 
than coder and have wanted to get something like that going for my kafka 
clusters. I'd love more details about how you do it, and how you monitor the 
results.

Thanks!

Sent from my BlackBerry 10 smartphone on the TELUS network.
  Original Message  
From: Gwen Shapira
Sent: Monday, February 9, 2015 5:55 PM
To: users@kafka.apache.org
Reply To: users@kafka.apache.org
Subject: Re: ping kafka server

It's safe.

Just note that if you send Kafka anything it does not like, it will close
the connection on you. This is intentional and doesn't signal an issue with
Kafka.

Not sure if Nagios does this, but I like "canary" tests - produce a message
with timestamp every X seconds and have a monitor that consumes the
messages and check timestamps. This way you get both end-to-end monitoring
and latency alerts. What could be more fun? :)

Gwen

On Mon, Feb 9, 2015 at 2:48 PM, Koert Kuipers  wrote:

> i would like to be able to ping kafka servers from nagios to confirm they
> are alive. since kafka servers dont run a http server (web ui) i am not
> sure how to do this.
>
> is it safe to establish a "test" tcp connection (so connect and immediately
> disconnect using telnet or netstat or something like that) to the kafka
> server on port 9092 to confirm its alive?
>
> thanks
>


Re: ping kafka server

2015-02-09 Thread Gwen Shapira
It's safe.

Just note that if you send Kafka anything it does not like, it will close
the connection on you. This is intentional and doesn't signal an issue with
Kafka.

Not sure if Nagios does this, but I like "canary" tests - produce a message
with timestamp every X seconds and have a monitor that consumes the
messages and check timestamps. This way you get both end-to-end monitoring
and latency alerts. What could be more fun? :)

Gwen
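
A minimal sketch of the producing half of such a canary is below; the broker list
and topic name are placeholders, and a matching consumer would subtract the payload
timestamp from its own clock to get the end-to-end latency:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Canary {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-1:9092"); // placeholder broker list
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        while (true) {
            // Payload is the producer-side wall clock at send time.
            String now = Long.toString(System.currentTimeMillis());
            producer.send(new ProducerRecord<String, String>("kafka-canary", now));
            Thread.sleep(10000); // one canary message every 10 seconds
        }
    }
}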

On Mon, Feb 9, 2015 at 2:48 PM, Koert Kuipers  wrote:

> i would like to be able to ping kafka servers from nagios to confirm they
> are alive. since kafka servers dont run a http server (web ui) i am not
> sure how to do this.
>
> is it safe to establish a "test" tcp connection (so connect and immediately
> disconnect using telnet or netstat or something like that) to the kafka
> server on port 9092 to confirm its alive?
>
> thanks
>


ping kafka server

2015-02-09 Thread Koert Kuipers
I would like to be able to ping Kafka servers from Nagios to confirm they
are alive. Since Kafka servers don't run an HTTP server (web UI), I am not
sure how to do this.

Is it safe to establish a "test" TCP connection (so connect and immediately
disconnect using telnet or netstat or something like that) to the Kafka
server on port 9092 to confirm it's alive?

thanks


Kafka-Consumer-perf.test.sh -- how to check if it is running

2015-02-09 Thread nitin sharma
Hi All,

I am running kafka-consumer-perf-test.sh in my test environment to simulate
"consumer" load on my Kafka broker.

Currently I have millions of entries in the log against which this shell script is
running... I would like to know how I can check whether the .sh is running
fine.

All I see is the below entry in the PuTTY terminal after executing the
consumer .sh, and it has been more than 2 hours.

bin/kafka-consumer-perf-test.sh --zookeeper <>:2181 --messages 417600
 --topic <<< >> --thread 2

start.time,end.time,fetch.size, data.consumed.in.MB, MB.sec,
data.consumed.in.nMsg, nMsg.sec



Regards,
Nitin Kumar Sharma.


Kafka New(Java) Producer Connection reset by peer error and LB

2015-02-09 Thread Bhavesh Mistry
Hi Kafka Team,

We are getting this connection reset by peer a couple of minutes after
start-up of the producer, due to infrastructure deployment strategies we have
adopted from LinkedIn.

We have the LB hostname and port as the seed server, and all producers are getting
the following exception because of the TCP idle connection timeout set on the LB (which
is 2 minutes, while the Kafka TCP connection idle timeout is set to 10 minutes). This
seems to be a minor bug: the connection to the seed server should be closed immediately
after discovering that it is not part of the brokers list.
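
For illustration, the deployment described above amounts to bootstrapping the new
(Java) producer through a load-balancer VIP; the host name below is a placeholder
for this environment, not a value from the thread:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class LbBootstrapExample {
    public static KafkaProducer<byte[], byte[]> createProducer() {
        Properties props = new Properties();
        // The LB VIP is only used as the seed/bootstrap server; after the first
        // metadata fetch the producer talks to the real brokers directly, but the
        // now-idle bootstrap socket is what the LB resets after its 2-minute timeout.
        props.put("bootstrap.servers", "kafka-lb.example.com:9092"); // placeholder LB host
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return new KafkaProducer<byte[], byte[]>(props);
    }
}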


java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
at sun.nio.ch.IOUtil.read(IOUtil.java:171)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:245)
at
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:60)
at org.apache.kafka.common.network.Selector.poll(Selector.java:242)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:178)
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
at java.lang.Thread.run(Thread.java:662)
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
at sun.nio.ch.IOUtil.read(IOUtil.java:171)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:245)
at
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:60)
at org.apache.kafka.common.network.Selector.poll(Selector.java:242)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:178)
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
at java.lang.Thread.run(Thread.java:662)


Thanks,

Bhavesh


Re: Can't send a keyedMessage to brokers with partitioner.class=kafka.producer.DefaultPartitioner

2015-02-09 Thread Zijing Guo
Would anyone like to give some help?


On Sunday, February 8, 2015 4:18 AM, Zijing Guo wrote:

Hi,

I have a 2-node kafka cluster with 2 instances of brokers and zookeepers. And then I
create a topic kafka-test with 2 partitions and replication-factor=2. My producer
config is:

{"partitioner.class" "kafka.producer.DefaultPartitioner"
 "metadata.broker.list" "172.32.1.248:9092,172.32.1.251:9092"
 "request.required.acks" "1"}

So for the DefaultPartitioner, it will calculate the hash value and divide by the
num_partitions to decide which partition the data will go to, so I create my
keyed message:

val key-msg = KeyedMessage("kafka-test", "a", "test message!")
prod.send(key-msg)

"a"'s hashValue is 97 and 97 % 2 = 1, so the data should go to partition 1. However,
the data didn't get sent to the brokers (I have a console consumer running that
didn't receive any message from this topic). If I create the key-msg without the key,
it works fine:

val key-msg = KeyedMessage("kafka-test", "test message!")
prod.send(key-msg)

Am I using the key wrong or anything?
Thanks
Edwin

Re: Poor performance consuming multiple topics

2015-02-09 Thread CJ Woolard
Thank you for your help, and I apologize for not adding sufficient detail in my
original question. To elaborate on our use case, we are trying to create a
system tracing/monitoring app (which is of high importance to our business)
where we are trying to read messages from all of our Kafka topics.

We've been profiling a handful of runs with varying permutations of Kafka config
settings, and are struggling to find a combination that gives us decent throughput.
We're specifically trying to have one application consume from ~20 topics (which may
increase over time); some of the topics have several million messages while some are
currently empty.

The behavior we're seeing (regardless of the various config settings we've tried) is
that the consumer will take about 5 to 10 minutes creating its streams, after which
point it appears to pull around ~20,000 messages at a decent rate, and then it starts
to throw consumer timeout exceptions for several minutes. (Depending on the settings
we choose, it then may repeat that loop where it again will pull a batch of messages
at a decent rate and then stall again on more timeouts.)

Our max message size is 20MB, so we set our "fetch.message.max.bytes" to 20MB. We then
set our "consumer.timeout.ms" to "6", so that reading the 20MB doesn't time out;
however, that appears to block threads for those topics which are empty (if we set it
too high it appears to block threads on the empty topics for a long time, if we set it
too low it times out reading large messages).

We've tried increasing the number of streams (to 20 for example), tried increasing the
number of thread pool consumers (to 20 for example), and tried increasing the number
of consumer fetchers (to 4 for example, although in profiling we appear to get 2x the
number of threads that we specify in config, for what it's worth), but have yet to
find a combination of settings that works for us. Again, any direction here would be
greatly appreciated.

Here is an example of a consumer config we've tried (again we've tried several 
combinations in testing):

 "group.id" -> Settings.Kafka.ConsumerGroupId,
 "zookeeper.connect" -> Settings.Kafka.ZkConnectionString,
"num.consumer.fetchers" -> "4",
 "consumer.timeout.ms" -> "6",
 "auto.offset.reset" -> "smallest",
 "fetch.message.max.bytes" -> "2000"

In terms of our code we've tried both the whitelist overload:

val numberOfStreams = 4// (we've varied this number)
val filter = topics.mkString("|")
val topicFilter = new Whitelist(filter)
connector.createMessageStreamsByFilter(topicFilter, numberOfStreams, decoder, 
decoder)

And the topic map overload:

private def createMapOfStreams(topics:Seq[String], numberOfPartitions:Int) = {
val connector = createKafkaConnector()
val decoder = new StringDecoder()
val topicsMap = topics.map(topic=>topic->numberOfPartitions).toMap
connector.createMessageStreams(topicsMap, decoder, decoder)
  }

And our broker settings:

# Socket Server Settings 
#

# The port the socket server listens on
port=9092

# The number of threads handling network requests
num.network.threads=2
 
# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576

# The maximum size of a request that the socket server will accept (protection 
against OOM)
socket.request.max.bytes=104857600


# Hostname the broker will bind to. If not set, the server will bind to all 
interfaces
#host.name=kafka-1

# Hostname the broker will advertise to producers and consumers. If not set, it 
uses the
# value for "host.name" if configured.  Otherwise, it will use the value 
returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=kafka-1.qa.ciq-internal.net

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
advertised.port=9092


Thank you again for your help.
CJ







From: Guozhang Wang 
Sent: Monday, February 9, 2015 10:38 AM
To: users@kafka.apache.org
Subject: Re: Poor performance consuming multiple topics

Hello CJ,

You have to set the fetch size to be >=  the maximum message size possible,
otherwise the consumption will block upon encountering these large messages.

I am wondering by saying "poor performance" what do you mean exactly? Are
you seeing low throughput, and can you share your consumer config values?

Guozhang


On Sun, Feb 8, 2015 at 7:39 AM, Cj  wrote:

>
>
> Hi Kafka team,
>
> We have a use case where we need to consume from ~20 topics (each with 24
> partitions), we have a potenti

Re: Handling multi-line messages?

2015-02-09 Thread Scott Chapman
Yea, I think I figured it out. Didn't realize the person doing the test
created the message using the console-consumer, so I think the newline was
escaped.

On Mon Feb 09 2015 at 11:59:57 AM Gwen Shapira 
wrote:

> Since the console-consumer seems to display strings correctly, it sounds
> like an issue with LogStash parser. Perhaps you'll have better luck asking
> on LogStash mailing list?
>
> Kafka just stores the bytes you put in and gives the same bytes out when
> you read messages. There's no parsing or encoding done in Kafka itself
> (other than the encoder/decoder you use in producer / consumer)
>
> Gwen
>
> On Mon, Feb 9, 2015 at 6:23 AM, Scott Chapman 
> wrote:
>
> > So, avoiding a bit of a long explanation on why I'm doing it this way...
> >
> > But essentially, I am trying to put multi-line messages into kafka and
> then
> > parse them in logstash.
> >
> > What I think I am seeing in kafka (using console-consumer) is this:
> >  "line 1 \nline 2 \nline 3\n"
> >
> > Then when I get it into logstash I am seeing it as:
> >{
> > "message" => "line 1 \\nline 2 \\nline \n",
> > "@version" => "1",
> > "@timestamp" => "2015-02-09T13:55:36.566Z",
> >   }
> >
> > My question is, is this what I should expect? I think I can probably
> figure
> > out take the single line and break it apart in logstash. But do I need
> to?
> >
> > Any thoughts?
> >
>


Re: Got ClosedByInterruptException when closing ConsumerConnector

2015-02-09 Thread Guozhang Wang
Is this exception transient or consistent and blocking the shutdown process?

On Mon, Feb 9, 2015 at 3:07 AM, tao xiao  wrote:

> Hi team,
>
> I got java.nio.channels.ClosedByInterruptException when
> closing ConsumerConnector using kafka 0.8.2
>
> Here is the exception
>
> 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> [test12345_localhost], ZKConsumerConnector shutting down
>
> 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> [ConsumerFetcherManager-1423479848796] Stopping leader finder thread
>
> 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> [test12345_localhost], Shutting down
>
> 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> [test12345_localhost], Shutdown completed
>
> 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> [ConsumerFetcherManager-1423479848796] Stopping all fetchers
>
> 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> [test12345_localhost], Stopped
>
> 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> [ConsumerFetcherThread-test12345_localhost], Shutting down
>
> 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 - Reconnect due to
> socket error: java.nio.channels.ClosedByInterruptException
>
> 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> [ConsumerFetcherThread-test12345_localhost], Stopped
>
> 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> [ConsumerFetcherThread-test12345_localhost], Shutdown completed
>
> 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> [ConsumerFetcherManager-1423479848796] All connections stopped
>
> 2015-02-09 19:04:19 INFO  org.I0Itec.zkclient.ZkEventThread:82 - Terminate
> ZkClient event thread.
>
> 2015-02-09 19:04:19 INFO  org.apache.zookeeper.ZooKeeper:684 - Session:
> 0x14b6dd8fcf80011 closed
>
> 2015-02-09 19:04:19 INFO  org.apache.zookeeper.ClientCnxn$EventThread:512 -
> EventThread shut down
>
> 2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
> [test12345_localhost], ZKConsumerConnector shutdown completed in 86 ms
>
>
> --
> Regards,
> Tao
>



-- 
-- Guozhang


Re: Handling multi-line messages?

2015-02-09 Thread Gwen Shapira
Since the console-consumer seems to display strings correctly, it sounds
like an issue with LogStash parser. Perhaps you'll have better luck asking
on LogStash mailing list?

Kafka just stores the bytes you put in and gives the same bytes out when
you read messages. There's no parsing or encoding done in Kafka itself
(other than the encoder/decoder you use in producer / consumer)

Gwen

On Mon, Feb 9, 2015 at 6:23 AM, Scott Chapman  wrote:

> So, avoiding a bit of a long explanation on why I'm doing it this way...
>
> But essentially, I am trying to put multi-line messages into kafka and then
> parse them in logstash.
>
> What I think I am seeing in kafka (using console-consumer) is this:
>  "line 1 \nline 2 \nline 3\n"
>
> Then when I get it into logstash I am seeing it as:
>{
> "message" => "line 1 \\nline 2 \\nline \n",
> "@version" => "1",
> "@timestamp" => "2015-02-09T13:55:36.566Z",
>   }
>
> My question is, is this what I should expect? I think I can probably figure
> out take the single line and break it apart in logstash. But do I need to?
>
> Any thoughts?
>


Re: Poor performance consuming multiple topics

2015-02-09 Thread Guozhang Wang
Hello CJ,

You have to set the fetch size to be >=  the maximum message size possible,
otherwise the consumption will block upon encountering these large messages.

I am wondering by saying "poor performance" what do you mean exactly? Are
you seeing low throughput, and can you share your consumer config values?

Guozhang


On Sun, Feb 8, 2015 at 7:39 AM, Cj  wrote:

>
>
> Hi Kafka team,
>
> We have a use case where we need to consume from ~20 topics (each with 24
> partitions), we have a potential max message size of 20MB so we've set our
> consumer fetch.size to 20MB but that's causing very poor performance on our
> consumer (most of our messages are in the 10-100k range). Is it possible to
> set the fetch size to a lower number than the max message size and
> gracefully handle larger messages (as a trapped exception for example) in
> order to improve our throughput?
>
> Thank you in advance for your help
> CJ Woolard




-- 
-- Guozhang


Re: New Producer - ONLY sync mode?

2015-02-09 Thread Steve Morin
Jay,
Thanks I'll look at that more closely.

On Sat, Feb 7, 2015 at 1:23 PM, Jay Kreps  wrote:

> Steve
>
> In terms of mimicing the sync behavior, I think that is what .get() does,
> no?
>
> We are always returning the offset and error information. The example I
> gave didn't make use of it, but you definitely can make use of it if you
> want to.
>
> -Jay
>
> On Wed, Feb 4, 2015 at 9:58 AM, Steve Morin  wrote:
>
> > Looking at this thread I would ideally want something at least the right
> > recipe to mimic sync behavior like Otis is talking about.
> >
> > In the second case, would like to be able to individually know if
> messages
> > have failed even regardless if they are in separate batches, sort of like
> > what Kinesis does as Pradeep mentioned.
> > -Steve
> >
> > On Wed, Feb 4, 2015 at 11:19 AM, Jay Kreps  wrote:
> >
> > > Yeah totally. Using a callback is, of course, the Right Thing for this
> > kind
> > > of stuff. But I have found that kind of asynchronous thinking can be
> hard
> > > for people. Even if you get out of the pre-java 8 syntactic pain that
> > > anonymous inner classes inflict just dealing with multiple threads of
> > > control without creating async spaghetti can be a challenge for complex
> > > stuff. That is really the only reason for the futures in the api, they
> > are
> > > strictly less powerful than the callbacks, but at least using them you
> > can
> > > just call .get() and pretend it is blocking.
> > >
> > > -Jay
> > >
> > > On Wed, Feb 4, 2015 at 7:19 AM, Joe Stein 
> wrote:
> > >
> > > > Now that 0.8.2.0 is in the wild I look forward to working with more
> and
> > > > seeing what folks start to-do with this function
> > > >
> > > >
> > >
> >
> https://dist.apache.org/repos/dist/release/kafka/0.8.2.0/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send(org.apache.kafka.clients.producer.ProducerRecord
> > > > ,
> > > > org.apache.kafka.clients.producer.Callback) and keeping it fully non
> > > > blocking.
> > > >
> > > > One sprint I know of coming up is going to have the new producer as a
> > > > component in their reactive calls and handling bookkeeping and
> retries
> > > > through that type of call back approach. Should work well (haven't
> > tried
> > > > but don't see why not) with Akka, ScalaZ, RxJava, Finagle, etc, etc,
> > etc
> > > in
> > > > functional languages and frameworks.
> > > >
> > > > I think as JDK 8 starts to get out in the wild too more (may after
> jdk7
> > > > eol) the use of .get will be reduced (imho) and folks will be
> thinking
> > > more
> > > > about non-blocking vs blocking and not as so much sync vs async but
> my
> > > > crystal ball just back from the shop so well see =8^)
> > > >
> > > > /***
> > > >  Joe Stein
> > > >  Founder, Principal Consultant
> > > >  Big Data Open Source Security LLC
> > > >  http://www.stealth.ly
> > > >  Twitter: @allthingshadoop 
> > > > /
> > > >
> > > > On Tue, Feb 3, 2015 at 10:45 PM, Jay Kreps 
> > wrote:
> > > >
> > > > > Hey guys,
> > > > >
> > > > > I guess the question is whether it really matters how many
> underlying
> > > > > network requests occur? It is very hard for an application to
> depend
> > on
> > > > > this even in the old producer since it depends on the partitions
> > > > placement
> > > > > (a send to two partitions may go to either one machine or two and
> so
> > it
> > > > > will send either one or two requests). So when you send a batch in
> > one
> > > > call
> > > > > you may feel that is "all at once", but that is only actually
> > > guaranteed
> > > > if
> > > > > all messages have the same partition.
> > > > >
> > > > > The challenge is allowing even this in the presence of bounded
> > request
> > > > > sizes which we have in the new producer. The user sends a list of
> > > objects
> > > > > and the serialized size that will result is not very apparent to
> > them.
> > > If
> > > > > you break it up into multiple requests then that is kind of further
> > > > ruining
> > > > > the illusion of a single send. If you don't then you have to just
> > error
> > > > out
> > > > > which is equally annoying to have to handle.
> > > > >
> > > > > But I'm not sure if from your description you are saying you
> actually
> > > > care
> > > > > how many physical requests are issued. I think it is more like it
> is
> > > just
> > > > > syntactically annoying to send a batch of data now because it
> needs a
> > > for
> > > > > loop.
> > > > >
> > > > > Currently to do this you would do:
> > > > >
> > > > > List<Future<RecordMetadata>> responses = new ArrayList<Future<RecordMetadata>>();
> > > > > for(input: recordBatch)
> > > > >     responses.add(producer.send(input));
> > > > > for(response: responses)
> > > > >     response.get();
> > > > >
> > > > > If you don't depend on the offset/error info we could add a flush
> > call
> > > so
> > > > > you could instead do
> > > > > for(input: recordBatch)
> > > > >  
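
Spelled out as a compilable sketch of the two styles being discussed
(futures vs. callback), assuming the 0.8.2 new producer with byte[] key and
value serializers; the class and method names below are illustrative:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class BatchSendStyles {

    // Futures: fire off the whole batch, then block on each result.
    static void sendWithFutures(KafkaProducer<byte[], byte[]> producer,
                                List<ProducerRecord<byte[], byte[]>> recordBatch) throws Exception {
        List<Future<RecordMetadata>> responses = new ArrayList<Future<RecordMetadata>>();
        for (ProducerRecord<byte[], byte[]> record : recordBatch) {
            responses.add(producer.send(record));
        }
        for (Future<RecordMetadata> response : responses) {
            response.get(); // blocks until the broker acknowledges, throws on failure
        }
    }

    // Callback: fully non-blocking; the callback runs on the producer's I/O thread.
    static void sendWithCallbacks(KafkaProducer<byte[], byte[]> producer,
                                  List<ProducerRecord<byte[], byte[]>> recordBatch) {
        for (ProducerRecord<byte[], byte[]> record : recordBatch) {
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        // handle the failure here (logging, retry bookkeeping, ...)
                    }
                }
            });
        }
    }
}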

Re: Kafka on ec2

2015-02-09 Thread Su She
If you mean setting up Kafka on ec2:
https://www.youtube.com/watch?v=ArUHr3Czx-8

The commands may differ depending on which type of EC2 instance you are
using.

Also: http://kafka.apache.org/documentation.html#introduction


On Mon, Feb 9, 2015 at 4:53 AM, Sharath N  wrote:

> Hi, can anyone please help me with how to integrate Kafka and EC2?
>
>
> Thanks and Regards
>
> Sharath N
>


Kafka on ec2

2015-02-09 Thread Sharath N
Hi, can anyone please help me with how to integrate Kafka and EC2?


Thanks and Regards

Sharath N


Handling multi-line messages?

2015-02-09 Thread Scott Chapman
So, avoiding a bit of a long explanation on why I'm doing it this way...

But essentially, I am trying to put multi-line messages into kafka and then
parse them in logstash.

What I think I am seeing in kafka (using console-consumer) is this:
 "line 1 \nline 2 \nline 3\n"

Then when I get it into logstash I am seeing it as:
   {
"message" => "line 1 \\nline 2 \\nline \n",
"@version" => "1",
"@timestamp" => "2015-02-09T13:55:36.566Z",
  }

My question is: is this what I should expect? I think I can probably figure
out how to take the single line and break it apart in logstash. But do I need to?

Any thoughts?


Re: Is auto.commit.enable still applicable when setting offsets.storage to kafka

2015-02-09 Thread Gwen Shapira
Yep, still applicable.

They will do the same thing (commit offsets at regular intervals), only with
Kafka instead of ZooKeeper.
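
For example, a consumer configured along these lines (just a sketch;
addresses, group id and interval are illustrative) will auto-commit its
offsets to Kafka at the configured interval:

import java.util.Properties;

import kafka.consumer.ConsumerConfig;

public class KafkaOffsetStorageConfig {
    public static ConsumerConfig build() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");   // illustrative
        props.put("group.id", "my-group");                   // illustrative
        props.put("offsets.storage", "kafka");               // store offsets in Kafka instead of ZooKeeper
        props.put("dual.commit.enabled", "false");           // do not also commit to ZooKeeper
        props.put("auto.commit.enable", "true");             // still honored with Kafka offset storage
        props.put("auto.commit.interval.ms", "10000");       // commit every 10 seconds
        return new ConsumerConfig(props);
    }
}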

On Mon, Feb 9, 2015 at 2:57 AM, tao xiao  wrote:

> Hi team,
>
> If I set offsets.storage=kafka, can I still use auto.commit.enable to turn
> off auto commit and auto.commit.interval.ms to control the commit interval?
> The documentation mentions that the above two properties are used to
> control offset commits to ZooKeeper.
>
> --
> Regards,
> Tao
>


Re: Apache Kafka 0.8.2 Consumer Example

2015-02-09 Thread Gwen Shapira
I'm guessing the upgrade changed your broker configuration file
(server.properties).

Perhaps take a look and see if things like max.message.bytes are still
where you want them?

Gwen

On Sun, Feb 8, 2015 at 11:24 AM, Ricardo Ferreira <
jricardoferre...@gmail.com> wrote:

> Hi Gwen,
>
> Sorry, both the consumer and the broker are 0.8.2?
> A = Yes
>
> So what's on 0.8.1?
> A = It works fine using 0.8.1 for server AND client.
>
> You probably know the consumer group of your application. Can you use the
> offset checker tool on that?
> A = Yes, I know from the consumer, and the offset checker gave me nothing
> about that group.
>
> Thanks,
>
> Ricardo
>
> On Sun, Feb 8, 2015 at 1:19 PM, Gwen Shapira 
> wrote:
>
>> Sorry, both the consumer and the broker are 0.8.2?
>>
>> So what's on 0.8.1?
>>
>> I seriously doubt downgrading is the solution.
>>
>> You probably know the consumer group of your application. Can you use the
>> offset checker tool on that?
>>
>> Gwen
>>
>> On Sun, Feb 8, 2015 at 9:01 AM, Ricardo Ferreira <
>> jricardoferre...@gmail.com> wrote:
>>
>>> Hi Gwen,
>>>
>>> Thanks for the response.
>>>
>>> In my case, I have both consumer application and the server versions in
>>> 0.8.2, Scala 2.10.
>>>
>>> No errors are thrown, and my zookeeper.session.timeout.ms property is
>>> set to 500, although I tried 5000 and it also didn't work.
>>>
>>> I checked the offset checker tool, but it asks for a group in which I
>>> don't know which group the kafka-console-producer is using. I tried the
>>> consumer group but it gave the following message:
>>>
>>> Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException:
>>> KeeperErrorCode = NoNode for /consumers/test-consumer-group/offsets/test/0.
>>>
>>> Perhaps the solution is to downgrade the consumer libs to 0.8.1?
>>>
>>> Thanks,
>>>
>>> Ricardo
>>>
>>> On Sun, Feb 8, 2015 at 11:27 AM, Gwen Shapira 
>>> wrote:
>>>
 I have a 0.8.1 high level consumer working fine with 0.8.2 server. Few
 of them actually :)
 AFAIK the API did not change.

 Do you see any error messages? Do you have timeout configured on the
 consumer? What does the offset checker tool say?

 On Fri, Feb 6, 2015 at 4:49 PM, Ricardo Ferreira <
 jricardoferre...@gmail.com> wrote:

> Hi,
>
> I had a client running on Kafka 0.8.1 using the High-Level consumer API
> working fine.
>
> Today I updated my Kafka installation for the 0.8.2 version (tried all
> versions of Scala) but the consumer doesn't get any messages. I tested
> using the kafka-console-consumer.sh utility tool and works fine, only
> my
> Java program that not.
>
> Did I miss something? I heard that the API changed, so I'd like to
> know if
> someone can share a simple client with me.
>
> Please, respond directly to me or just reply all because I am not
> currently
> subscribed to the group.
>
> Thanks,
>
> Ricardo Ferreira
>


>>>
>>
>


Re: Poor performance consuming multiple topics

2015-02-09 Thread dinesh kumar
Hi CJ,
I recently ran into a Kafka message-size-related issue and did some digging
around to understand the system. I will describe the details briefly and
hope they help you.

Each consumer connector has fetcher threads and fetcher manager threads
associated with it. The fetcher thread talks to the Kafka brokers and gets
the data for the consumer. The fetcher thread gets the partition information
after the rebalance operation. So say each consumer owns N partitions in a
topic, and M (M < N) of those partitions have Broker i as their leader; the
fetcher thread then sends a request to Broker i for the data. The Kafka
protocol is designed such that the maximum amount of data transferred to one
client in a single request should be less than 2GB (the 2GB also includes
the protocol overhead, but that is only a few bytes and can be ignored for
now).

The data requested by the fetcher thread is in units of chunks per partition.
Each chunk is of the size of fetch.message.max.bytes, a parameter in the
consumer configuration. Each chunk can have many messages in it. Also, if
there is a very large message of say 200 MB that needs to be consumed, then
fetch.message.max.bytes should be at least 200 MB, as incomplete messages
are not allowed (i.e., one large message cannot be broken into multiple
pieces and transferred to the client).

The requests for data made by the fetcher threads are in chunks of
fetch.message.max.bytes, and since they are per partition, it is very easy
to run into a situation where the total amount of data requested by the
fetcher thread crosses 2GB. This results in the consumer getting no data
from the broker.

The data transferred from the broker is put in a blocking queue. The
consumer thread will be blocked on the queue until the fetcher thread puts
some data in it. If you specify a timeout (consumer.timeout.ms) during
initialization of the client, the consumer thread will wait at most
consumer.timeout.ms for the data and will then throw a timeout exception.
If consumer.timeout.ms is -1 (the default value), the consumer thread will
be blocked until the fetcher queues some data.

Kafka supports compression. Compression happens on the producer end and the
data is decompressed by the consumer. The decompression happens only when
the message is processed by the consumer thread, not while it is being added
to the blocking queue (i.e., decompression is done by the consumer thread
and not by the fetcher thread).

So fetch.message.max.bytes should be the maximum message size after
compression. To circumvent the 2GB-per-request limitation in the Kafka
protocol, we can use Kafka compression.


So to summarize and to answer your question: there is no way to get a large
message with a small fetch.message.max.bytes.
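
A rough sketch of the relevant client settings under these constraints
(addresses and sizes are illustrative, not taken from your setup): the
consumer fetch size has to cover the largest compressed message, and
compressing on the producer side keeps that bound small. The broker-side
message.max.bytes and replica.fetch.max.bytes also have to accommodate the
same size.

import java.util.Properties;

public class LargeMessageConfigs {

    // High-level consumer: fetch.message.max.bytes must be >= the largest
    // (compressed) message it may ever receive.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");                        // illustrative
        props.put("group.id", "large-message-group");                            // illustrative
        props.put("fetch.message.max.bytes", String.valueOf(20 * 1024 * 1024));  // 20 MB
        return props;
    }

    // Old producer: compress so that the message stored on the broker (and
    // later fetched) is smaller than the raw payload.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092");   // illustrative
        props.put("compression.codec", "gzip");              // or "snappy"
        return props;
    }
}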


Thanks,

Dinesh

On 8 February 2015 at 21:09, Cj  wrote:

>
>
> Hi Kafka team,
>
> We have a use case where we need to consume from ~20 topics (each with 24
> partitions), we have a potential max message size of 20MB so we've set our
> consumer fetch.size to 20MB but that's causing very poor performance on our
> consumer (most of our messages are in the 10-100k range). Is it possible to
> set the fetch size to a lower number than the max message size and
> gracefully handle larger messages (as a trapped exception for example) in
> order to improve our throughput?
>
> Thank you in advance for your help
> CJ Woolard


Got ClosedByInterruptException when closing ConsumerConnector

2015-02-09 Thread tao xiao
Hi team,

I got java.nio.channels.ClosedByInterruptException when
closing ConsumerConnector using kafka 0.8.2

Here is the exception

2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
[test12345_localhost], ZKConsumerConnector shutting down

2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
[ConsumerFetcherManager-1423479848796] Stopping leader finder thread

2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
[test12345_localhost], Shutting down

2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
[test12345_localhost], Shutdown completed

2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
[ConsumerFetcherManager-1423479848796] Stopping all fetchers

2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
[test12345_localhost], Stopped

2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
[ConsumerFetcherThread-test12345_localhost], Shutting down

2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 - Reconnect due to
socket error: java.nio.channels.ClosedByInterruptException

2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
[ConsumerFetcherThread-test12345_localhost], Stopped

2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
[ConsumerFetcherThread-test12345_localhost], Shutdown completed

2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
[ConsumerFetcherManager-1423479848796] All connections stopped

2015-02-09 19:04:19 INFO  org.I0Itec.zkclient.ZkEventThread:82 - Terminate
ZkClient event thread.

2015-02-09 19:04:19 INFO  org.apache.zookeeper.ZooKeeper:684 - Session:
0x14b6dd8fcf80011 closed

2015-02-09 19:04:19 INFO  org.apache.zookeeper.ClientCnxn$EventThread:512 -
EventThread shut down

2015-02-09 19:04:19 INFO  kafka.utils.Logging$class:68 -
[test12345_localhost], ZKConsumerConnector shutdown completed in 86 ms


-- 
Regards,
Tao


Is auto.commit.enable still applicable when setting offsets.storage to kafka

2015-02-09 Thread tao xiao
Hi team,

If I set offsets.storage=kafka, can I still use auto.commit.enable to turn
off auto commit and auto.commit.interval.ms to control the commit interval?
The documentation mentions that the above two properties are used to
control offset commits to ZooKeeper.

-- 
Regards,
Tao


Re: Topic migration mentionning one removed server

2015-02-09 Thread Anthony Pastor
Any help on this subject, please?

2015-02-05 10:45 GMT+01:00 Anthony Pastor :

> We're using Kafka 0.8.1.1 on debian 7.7
>
> - Logs when i migrate a specific topic (~20GB) from kafka5 to kafka2 (No
> problem that way):
> - controller.log: No logs.
>
> - Logs when i migrate the same specific topic from kafka2 to kafka5 (same
> problems as my original mail):
> - controller.log:
>
> [2015-02-05 10:40:18,693] DEBUG [PartitionsReassignedListener on 1]:
> Partitions reassigned listener fired for path /admin/reassign_partitions.
> Record partitions to be reassigned
> {"version":1,"partitions":[{"topic":"AdBidderZoneDealDailyKey","partition":0,"replicas":[4,5]}]}
> (kafka.controller.PartitionsReassignedListener)
> [2015-02-05 10:40:18,695] INFO [Controller 1]: Handling reassignment of
> partition [AdBidderZoneDealDailyKey,0] to new replicas 4,5
> (kafka.controller.KafkaController)
> [2015-02-05 10:40:18,703] INFO [Controller 1]: New replicas 4,5 for
> partition [AdBidderZoneDealDailyKey,0] being reassigned not yet caught up
> with the leader (kafka.controller.KafkaController)
> [2015-02-05 10:40:18,710] DEBUG [Controller 1]: Updated path
> /brokers/topics/AdBidderZoneDealDailyKey with
> {"version":1,"partitions":{"0":[4,5,2]}} for replica assignment
> (kafka.controller.KafkaController)
> [2015-02-05 10:40:18,710] INFO [Controller 1]: Updated assigned replicas
> for partition [AdBidderZoneDealDailyKey,0] being reassigned to 4,5,2
> (kafka.controller.KafkaController)
> [2015-02-05 10:40:18,710] DEBUG [Controller 1]: Updating leader epoch for
> partition [AdBidderZoneDealDailyKey,0]. (kafka.controller.KafkaController)
> [2015-02-05 10:40:18,718] INFO [Controller 1]: Updated leader epoch for
> partition [AdBidderZoneDealDailyKey,0] to 37
> (kafka.controller.KafkaController)
> [2015-02-05 10:40:18,719] INFO [Replica state machine on controller 1]:
> Invoking state change to NewReplica for replicas
> [Topic=AdBidderZoneDealDailyKey,Partition=0,Replica=5]
> (kafka.controller.ReplicaStateMachine)
> [2015-02-05 10:40:18,721] INFO [Controller 1]: Waiting for new replicas
> 4,5 for partition [AdBidderZoneDealDailyKey,0] being reassigned to catch up
> with the leader (kafka.controller.KafkaController)
> [2015-02-05 10:40:18,721] INFO [AddPartitionsListener on 1]: Add Partition
> triggered {"version":1,"partitions":{"0":[4,5,2]}} for path
> /brokers/topics/AdBidderZoneDealDailyKey
> (kafka.controller.PartitionStateMachine$AddPartitionsListener)
> [2015-02-05 10:40:18,723] DEBUG [ReassignedPartitionsIsrChangeListener on
> controller 1]: Reassigned partitions isr change listener fired for path
> /brokers/topics/AdBidderZoneDealDailyKey/partitions/0/state with children
> {"controller_epoch":5,"leader":4,"version":1,"leader_epoch":37,"isr":[4,2]}
> (kafka.controller.ReassignedPartitionsIsrChangeListener)
> [2015-02-05 10:40:18,724] INFO [ReassignedPartitionsIsrChangeListener on
> controller 1]: 1/2 replicas have caught up with the leader for partition
> [AdBidderZoneDealDailyKey,0] being reassigned.Replica(s) 5 still need to
> catch up (kafka.controller.ReassignedPartitionsIsrChangeListener)
> [2015-02-05 10:40:18,852] INFO [AddPartitionsListener on 1]: Add Partition
> triggered {"version":1,"partitions":{"0":[4,5,2,7]}} for path
> /brokers/topics/AdBidderZoneDealDailyKey
> (kafka.controller.PartitionStateMachine$AddPartitionsListener)
>
> 2015-02-05 6:29 GMT+01:00 Jun Rao :
>
>> Which version of Kafka are you using? Anything interesting from the
>> controller log?
>>
>> Thanks,
>>
>> Jun
>>
>> On Wed, Feb 4, 2015 at 8:19 AM, Anthony Pastor 
>> wrote:
>>
>> > Hello,
>> >
>> > I'm trying to understand an awkward issue we're having on our Kafka
>> > production.
>> > We currently have 8 Kafka servers named:
>> > kafka1, kafka2, kafka3 ...
>> >
>> > A few weeks ago we migrated all topics present on kafka7 and then
>> > shut it down.
>> >
>> > The main issue is a performance issue on kafka5.
>> > Topic migrations are very slow.
>> >
>> > Today I tried to do a topic migration from kafka2 to kafka5.
>> > The awkward part is that we still have kafka7 references, as if it were
>> > still a replica for this topic on that server:
>> >
>> > Here are my commands & comments:
>> >
>> > - Original state of the topic was :
>> >
>> > $ /usr/share/kafka/bin/kafka-topics.sh --zookeeper 172.16.22.1
>> --describe
>> > --topic AdBidderZoneDealDailyKey
>> > Topic:AdBidderZoneDealDailyKeyPartitionCount:1
>> > ReplicationFactor:2Configs:retention.ms=60480
>> > Topic: AdBidderZoneDealDailyKeyPartition: 0Leader: 4
>> > Replicas: 4,2Isr: 4,2
>> >
>> >
>> > - Then i ran a topic migration :
>> >
>> > $ /usr/share/kafka/bin/kafka-reassign-partitions.sh --zookeeper
>> > 172.16.22.1 --reassignment-json-file target-zone-deal-daily.json
>> --execute
>> >
>> > $ cat target-zone-deal-daily.json
>> >
>> >
>> {"version":1,"partitions":[{"topic":"AdBidderZoneDealDailyKey","partition":0,"replicas":[4,5]}]}
>> >
>> >
>> > - Now i've two issues : the replicatio

Re: regarding custom msg

2015-02-09 Thread Manikumar Reddy
Can you post the exception stack-trace?

On Mon, Feb 9, 2015 at 2:58 PM, Gaurav Agarwal 
wrote:

> Hello,
> We are sending a custom message across the producer and consumer, but we
> are getting a ClassCastException. This works fine with a String message
> and the string encoder, but it did not work with the custom message; I got
> a ClassCastException. I have a message with a couple of String attributes.
>


regarding custom msg

2015-02-09 Thread Gaurav Agarwal
Hello,
We are sending a custom message across the producer and consumer, but we
are getting a ClassCastException. This works fine with a String message
and the string encoder, but it did not work with the custom message; I got
a ClassCastException. I have a message with a couple of String attributes.
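
One common cause of a ClassCastException with the old 0.8.x producer is a
serializer.class that does not match the type actually handed to the
producer (for example, keeping kafka.serializer.StringEncoder, or the
default byte[] encoder, while sending a custom object). A minimal sketch of
a custom encoder follows; MyMessage is a hypothetical class standing in for
your message type:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

// Hypothetical message type with a couple of String fields.
class MyMessage implements Serializable {
    String id;
    String payload;
}

// Registered on the producer via the "serializer.class" property.
public class MyMessageEncoder implements Encoder<MyMessage> {

    // Kafka instantiates encoders reflectively and passes in the producer properties.
    public MyMessageEncoder(VerifiableProperties props) {
    }

    public byte[] toBytes(MyMessage message) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(message);
            oos.close();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException("Failed to serialize MyMessage", e);
        }
    }
}

The producer config would then carry
props.put("serializer.class", "com.example.MyMessageEncoder"); (package name
illustrative), and the consumer side needs a matching
kafka.serializer.Decoder<MyMessage> passed to createMessageStreams, otherwise
the payload comes back as raw byte[].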


Re: Apache Kafka 0.8.2 Consumer Example

2015-02-09 Thread Achanta Vamsi Subhash
The high-level consumer of 0.8.1 works fine with 0.8.2. In addition, you can
change the config to use Kafka for offsets storage instead of ZooKeeper.
There are some extra config parameters added as well, as explained in the
docs:
http://kafka.apache.org/documentation.html#consumerconfigs

For low-level consumer, you can look here:
https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
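
For reference, a minimal high-level consumer sketch that should work against
both 0.8.1 and 0.8.2 brokers (topic name, group id and ZooKeeper address are
illustrative):

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class SimpleHighLevelConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");    // illustrative
        props.put("group.id", "test-consumer-group");        // illustrative
        props.put("zookeeper.session.timeout.ms", "6000");   // 500 ms, as in the earlier mail, is likely too aggressive
        props.put("auto.offset.reset", "smallest");          // start from the beginning if no committed offset exists

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // One stream for the "test" topic.
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("test", 1));
        ConsumerIterator<byte[], byte[]> it = streams.get("test").get(0).iterator();

        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
        connector.shutdown();
    }
}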


On Mon, Feb 9, 2015 at 10:23 AM, Jaikiran Pai 
wrote:

> Ricardo,
>
> What does your consumer code look like? If it's too big to have that code
> inline in this mail, you can perhaps put it in a github repo or even a gist.
>
> -Jaikiran
>
>
> On Monday 09 February 2015 12:54 AM, Ricardo Ferreira wrote:
>
>> Hi Gwen,
>>
>> Sorry, both the consumer and the broker are 0.8.2?
>> A = Yes
>>
>> So what's on 0.8.1?
>> A = It works fine using 0.8.1 for server AND client.
>>
>> You probably know the consumer group of your application. Can you use the
>> offset checker tool on that?
>> A = Yes, I know from the consumer, and the offset checker gave me nothing
>> about that group.
>>
>> Thanks,
>>
>> Ricardo
>>
>> On Sun, Feb 8, 2015 at 1:19 PM, Gwen Shapira 
>> wrote:
>>
>>  Sorry, both the consumer and the broker are 0.8.2?
>>>
>>> So what's on 0.8.1?
>>>
>>> I seriously doubt downgrading is the solution.
>>>
>>> You probably know the consumer group of your application. Can you use the
>>> offset checker tool on that?
>>>
>>> Gwen
>>>
>>> On Sun, Feb 8, 2015 at 9:01 AM, Ricardo Ferreira <
>>> jricardoferre...@gmail.com> wrote:
>>>
>>>  Hi Gwen,

 Thanks for the response.

 In my case, I have both consumer application and the server versions in
 0.8.2, Scala 2.10.

 No errors are thrown, and my zookeeper.session.timeout.ms property is
 set to 500, although I tried 5000 and it also didn't work.

 I checked the offset checker tool, but it asks for a group in which I
 don't know which group the kafka-console-producer is using. I tried the
 consumer group but it gave the following message:

 Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException:
 KeeperErrorCode = NoNode for /consumers/test-consumer-
 group/offsets/test/0.

 Perhaps the solution is to downgrade the consumer libs to 0.8.1?

 Thanks,

 Ricardo

 On Sun, Feb 8, 2015 at 11:27 AM, Gwen Shapira 
 wrote:

  I have a 0.8.1 high level consumer working fine with 0.8.2 server. Few
> of them actually :)
> AFAIK the API did not change.
>
> Do you see any error messages? Do you have timeout configured on the
> consumer? What does the offset checker tool say?
>
> On Fri, Feb 6, 2015 at 4:49 PM, Ricardo Ferreira <
> jricardoferre...@gmail.com> wrote:
>
>  Hi,
>>
>> I had a client running on Kafka 0.8.1 using the High-Level consumer
>> API
>> working fine.
>>
>> Today I updated my Kafka installation for the 0.8.2 version (tried all
>> versions of Scala) but the consumer doesn't get any messages. I tested
>> using the kafka-console-consumer.sh utility tool and works fine, only
>> my
>> Java program that not.
>>
>> Did I miss something? I heard that the API changed, so I'd like to
>> know
>> if
>> someone can share a simple client with me.
>>
>> Please, respond directly to me or just reply all because I am not
>> currently
>> subscribed to the group.
>>
>> Thanks,
>>
>> Ricardo Ferreira
>>
>>
>
>


-- 
Regards
Vamsi Subhash