Getting error in fetching topic metadata when sending a message to console producer in Kafka cluster

2013-10-17 Thread Monika Garg
Hi,

I had a 3-node Kafka cluster with default.replication.factor=3 on all
three nodes.
I created a topic "topic1" with 7 partitions and replication factor 3; my
list-topic command gave the output below:

topic: topic1    partition: 0    leader: 1    replicas: 3,2,1    isr: 1,2,3
topic: topic1    partition: 1    leader: 2    replicas: 1,3,2    isr: 2,1,3
topic: topic1    partition: 2    leader: 3    replicas: 2,1,3    isr: 3,1,2
topic: topic1    partition: 3    leader: 1    replicas: 3,1,2    isr: 1,2,3
topic: topic1    partition: 4    leader: 2    replicas: 1,2,3    isr: 2,1,3
topic: topic1    partition: 5    leader: 3    replicas: 2,3,1    isr: 3,1,2
topic: topic1    partition: 6    leader: 1    replicas: 3,2,1    isr: 1,2,3


Then I removed 2 nodes.

Now I have a single-node cluster with the same property
default.replication.factor=3, and the list-topic command gives the
output below:

topic: topic1    partition: 0    leader: 1    replicas: 3,2,1    isr: 1
topic: topic1    partition: 1    leader: 1    replicas: 1,3,2    isr: 1
topic: topic1    partition: 2    leader: 1    replicas: 2,1,3    isr: 1
topic: topic1    partition: 3    leader: 1    replicas: 3,1,2    isr: 1
topic: topic1    partition: 4    leader: 1    replicas: 1,2,3    isr: 1
topic: topic1    partition: 5    leader: 1    replicas: 2,3,1    isr: 1
topic: topic1    partition: 6    leader: 1    replicas: 3,2,1    isr: 1

Now when I start the console producer and send a message, it gives me this
warning:

WARN Fetching topic metadata with correlation id 8 for topics [Set(topic1)] from broker [id:0,host:192.168.145.107,port:2181] failed (kafka.client.ClientUtils$)

Pasting the full stack trace here:

[2013-10-17 13:01:25,776] WARN Fetching topic metadata with correlation id 8 for topics [Set(topic2)] from broker [id:0,host:192.168.145.107,port:2181] failed (kafka.client.ClientUtils$)
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
    at kafka.utils.Utils$.read(Utils.scala:395)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
    at kafka.utils.Utils$.swallow(Utils.scala:186)
    at kafka.utils.Logging$class.swallowError(Logging.scala:105)
    at kafka.utils.Utils$.swallowError(Utils.scala:45)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
    at scala.collection.immutable.Stream.foreach(Stream.scala:254)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
[2013-10-17 13:01:25,778] ERROR fetching topic metadata for topics [Set(topic2)] from broker [ArrayBuffer(id:0,host:192.168.145.107,port:2181)] failed (kafka.utils.Utils$)
kafka.common.KafkaException: fetching topic metadata for topics [Set(topic2)] from broker [ArrayBuffer(id:0,host:192.168.145.107,port:2181)] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
    at kafka.utils.Utils$.swallow(Utils.scala:186)
    at kafka.utils.Logging$class.swallowError(Logging.scala:105)
    at kafka.utils.Utils$.swallowError(Utils.scala:45)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
    at scala.collection.immutable.Stream.foreach(Stream.scala:254)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)

Consumer doesn't start picking existing messages from topic until new data is added

2013-10-17 Thread Tarang Dawer
Hi All

I am facing an issue with the Kafka 0.8 consumer: it is not picking up the
messages that already exist in the topic.



The following is a snippet from my code:


public void run() {
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    System.out.println("In Run : before while loop");   // *1st SYSO*
    while (it.hasNext()) {
        // next() blocks until a message is available
        byte[] message = it.next().message();
        System.out.println("Got New Message, Processing .. Dump in hbase table");   // *2nd SYSO*
    }
}


Steps:
1. Cleaned ZooKeeper and Kafka logs.
2. Started both again and produced 1000 messages into the topic.
3. Started the consumer. I got log output up to the *1st SYSO*, but then no
progress; the consumer kept waiting for the stream to deliver messages.
4. Produced 1000 more messages into the topic. Now consumption started,
but the messages I had produced earlier were not consumed: the HBase table
showed only 1000 messages, which grew as I produced more, but the initial
deficit of 1000 remained.

It seems to me that the first 1000 messages are being lost, although the
topic is created in the Kafka log directory, and the log file shows the
initial messages (written before I start the consumer); the consumer just
does not seem to pick them up.

I am using ZooKeeper 3.4.5.

Kafka settings are at their defaults, except that the number of partitions
for each topic is set to 1.

Please help me out.


Thanks
Tarang Dawer


Re: Consumer doesn't start picking existing messages from topic until new data is added

2013-10-17 Thread Bruno D. Rodrigues
Try this; not sure if it will help though:

props.put("auto.offset.reset", "smallest");



Re: Getting error in fetching topic metadata when sending a message to console producer in Kafka cluster

2013-10-17 Thread Jun Rao
I think you provided the wrong port in broker-list. 2181 is the ZK port.
Kafka port defaults to 9092.

Thanks,

Jun
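
For illustration, the console producer should point at a Kafka broker port rather than the ZooKeeper port; with the host from the logs above and the default broker port, the invocation would look something like:

bin/kafka-console-producer.sh --broker-list 192.168.145.107:9092 --topic topic1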



producer (or consumer?) statistics that was using metrics

2013-10-17 Thread S Ahmed
I remember a while back Jay was looking for someone to work on producer (or
was it consumer?) statistics, which was going to use metrics.

Was this ever implemented?


Fwd: Special Bay Area HUG: Tajo and Samza

2013-10-17 Thread Jay Kreps
FYI.

-- Forwarded message --
From: Jakob Homan 
Date: Thu, Oct 17, 2013 at 11:08 AM
Subject: Special Bay Area HUG: Tajo and Samza
To: d...@samza.incubator.apache.org


Hey everybody-
   Join us at LinkedIn on Nov. 5 for a special HUG dedicated to two awesome
new Incubator projects: Tajo, a low-latency SQL query engine atop YARN, and
Samza.

http://www.meetup.com/hadoop/events/146077932/

-Jakob


Re: Flush configuration per topic

2013-10-17 Thread Steve Morin
Is there only a time delay, or can it be set to flush for every message,
with the obvious performance hit?


On Wed, Oct 16, 2013 at 9:49 AM, Jay Kreps wrote:

> Yes, the change in trunk is that all log configurations are automatically
> available at both the log level and the global default level and can be
> set at topic creation time or changed later without bouncing any servers.
>
> -Jay
>
>
> On Tue, Oct 15, 2013 at 5:47 PM, Simon Hørup Eskildsen wrote:
>
> > Do you mean that it's possible to override log configurations per topic
> > in trunk?
> >
> > Yeah, you're right. :-) I wasn't sure what to call it if not
> > consistency, even though I know that sort of has another meaning in
> > this context.
> >
> >
> > On Tue, Oct 15, 2013 at 6:53 PM, Jay Kreps wrote:
> >
> > > Yeah, looks like you are right, we don't have the per-topic override
> > > in 0.8 :-(
> > >
> > > All log configurations are overridable in trunk, which will be 0.8.1.
> > >
> > > Just to be totally clear, this setting does not impact consistency
> > > (i.e. all replicas will have the same messages in the same order),
> > > nor even durability (if you have replication > 1), but just
> > > recoverability on a single server in the event of a hard machine
> > > crash.
> > >
> > > -Jay
> > >
> > >
> > > On Tue, Oct 15, 2013 at 2:07 PM, Simon Hørup Eskildsen wrote:
> > >
> > > > 0.8, we're not on master, but we definitely can be.
> > > >
> > > >
> > > > On Tue, Oct 15, 2013 at 5:03 PM, Jay Kreps wrote:
> > > >
> > > > > Hey Simon,
> > > > >
> > > > > What version of Kafka are you using?
> > > > >
> > > > > -Jay
> > > > >
> > > > >
> > > > > On Tue, Oct 15, 2013 at 9:56 AM, Simon Hørup Eskildsen wrote:
> > > > >
> > > > > > Hi Kafkas!
> > > > > >
> > > > > > Reading through the documentation and code of Kafka, it seems
> > > > > > there is no feature to set the flushing interval
> > > > > > (messages/time) for a specific topic. I am interested in this
> > > > > > to get consistency for certain topics by flushing after every
> > > > > > message, while having eventual consistency for other topics
> > > > > > (default).
> > > > > >
> > > > > > Would there be interest in such a feature? Then I might be
> > > > > > curious to give the Log module a dive and see if I can foster
> > > > > > enough Scala to add this.
> > > > > >
> > > > > > Thanks!
> > > > > >
> > > > > > --
> > > > > > Simon
> > > > > > http://sirupsen.com/
> > > >
> > > > --
> > > > Simon
> > > > http://sirupsen.com/
> >
> > --
> > Simon
> > http://sirupsen.com/
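
As a follow-up sketch of what Jay describes: in trunk/0.8.1 the per-message flush Steve asks about becomes a per-topic override. This assumes the 0.8.1 kafka-topics.sh tooling and the flush.messages topic-level config (the topic name is a placeholder):

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic important-topic \
    --config flush.messages=1

Setting flush.messages=1 forces a flush after every message, trading throughput for the single-server recoverability Jay mentions.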


Broker bind address versus published hostname in ZooKeeper

2013-10-17 Thread Roger Hoover
Hi all,

I'm getting started experimenting with Kafka and ran into a configuration
issue.

Currently, in server.properties, you can configure host.name, which gets
used for two purposes: 1) to bind the socket, and 2) to publish the broker
details to ZK for clients to use.

There are times when these two settings need to be different. Here's an
example: I want to set up Kafka brokers on OpenStack virtual machines in a
private cloud, but I need producers to connect from elsewhere on the
internal corporate network. With OpenStack, the virtual machines only get
DHCP addresses (typically RFC 1918 private addresses). You can assign
"floating IPs" to a virtual machine, but a floating IP is forwarded using
Network Address Translation and is not exposed directly to the VM. Also,
there's typically no DNS to provide hostname lookup; hosts have names like
"fubar.novalocal" that are not externally routable.

Here's what I want: the broker should bind to the VM's private network IP
but publish its floating IP to ZooKeeper, so that producers can publish
to it.

I propose a new optional parameter, "listen", which would allow you to
specify the socket address to listen on. If not set, the parameter would
default to host.name, which is the current behavior.

#Publish the externally routable IP in ZK
host.name = 
#Accept connections from any interface the VM knows about
listen = *

I'm assuming others will eventually have the same requirement so I've added
a JIRA ticket.

https://issues.apache.org/jira/browse/KAFKA-1092

Thanks for your consideration.

Cheers,

Roger


Re: Broker bind address versus published hostname in ZooKeeper

2013-10-17 Thread Timothy Chen
Hi Roger,

That's exactly what I need on my end; I actually internally created a new
property called zkHost.name to publish a different host to ZK. This is also
needed for deploying Kafka into Azure.

I also created zkHost.port, since the internal and external ports that are
exposed might be different as well.

Tim




Re: Broker bind address versus published hostname in ZooKeeper

2013-10-17 Thread Jun Rao
Tim,

This seems like a reasonable requirement. Would you be interested in
providing a patch to the jira?

Thanks,

Jun




Re: Handling consumer rebalance when implementing synchronous auto-offset commit

2013-10-17 Thread Joel Koshy
We should be able to get this in after 0.8.1 and probably before the client
rewrite.

Thanks,

Joel

On Wednesday, October 16, 2013, Jason Rosenberg wrote:

> This looks great. What is the time frame for this effort?
>
> Jason
>
>
> On Wed, Oct 16, 2013 at 2:19 PM, Joel Koshy wrote:
>
> > Btw, after we complete KAFKA-1000 (offset management in Kafka) it
> > should be reasonable to commit offsets on every message as long as the
> > optional metadata portion of the offset commit request is small/empty.
> >
> > Thanks,
> >
> > Joel
> >
> >
> > On Wed, Oct 16, 2013 at 10:35 AM, Jason Rosenberg wrote:
> > > That would be great. Additionally, in the new API, it would be
> > > awesome to augment the default auto-commit functionality to allow
> > > client code to mark a message for commit only after processing it
> > > successfully!
> > >
> > >
> > > On Wed, Oct 16, 2013 at 7:52 AM, Jun Rao wrote:
> > >
> > >> For manual offset commits, it will be useful to have some kind of
> > >> API that informs the client when a rebalance is going to happen. We
> > >> can think about this when we do the client rewrite.
> > >>
> > >> Thanks,
> > >>
> > >> Jun
> > >>
> > >>
> > >> On Tue, Oct 15, 2013 at 9:21 PM, Jason Rosenberg wrote:
> > >>
> > >> > Jun,
> > >> >
> > >> > Yes, sorry, I think that was the basis for my question. When auto
> > >> > commit is enabled, special care is taken to make sure things are
> > >> > auto-committed during a rebalance. This is needed because when a
> > >> > topic moves off of a consumer thread (since it is being rebalanced
> > >> > to another one), it's as if that topic is being shut down on that
> > >> > connector, and any not-yet-committed messages need to be committed
> > >> > before letting go of the topic.
> > >> >
> > >> > So, my question is around trying to understand if there's a way I
> > >> > can reproduce similar functionality using my own sync auto-commit
> > >> > implementation (and I'm not sure there is). It seems that when
> > >> > there's a rebalance, all processed but not-yet-committed offsets
> > >> > will not be committed, and thus there will be no way to prevent
> > >> > pretty massive duplicate consumption on a rebalance. Is that about
> > >> > right? Or is there some way around this that I'm not seeing?
> > >> >
> > >> > The auto-commit functionality that's built in is so close to being
> > >> > all that anyone would need, except it has a glaring weakness, in
> > >> > that it will cause messages to be lost from time to time, and so I
> > >> > don't know that it will meet the needs of trying to have reliable
> > >> > delivery (with duplicates ok).
> > >> >
> > >> > Jason
> > >> >
> > >> >
> > >> > On Tue, Oct 15, 2013 at 9:00 PM, Jun Rao wrote:
> > >> >
> > >> > > If auto commit is disabled, the consumer connector won't call
> > >> > > commitOffsets during rebalancing.
> > >> > >
> > >> > > Thanks,
> > >> > >
> > >> > > Jun
> > >> > >
> > >> > >
> > >> > > On Tue, Oct 15, 2013 at 4:16 PM, Jason Rosenberg
> > >> > > <j...@squareup.com> wrote:
> > >> > >
> > >> > > > I'm looking at implementing a synchronous auto offset commit
> > >> > > > solution. People have discussed the need for this in previous
> > >> > > > threads. Basically, in my consumer loop, I want to make sure a
> > >> > > > message has actually been processed before allowing its offset
> > >> > > > to be committed. But I d



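For readers following the thread, a minimal sketch of the synchronous pattern under discussion, using the 0.8 high-level consumer with auto.commit.enable=false; process() is a hypothetical application handler, and stream/consumer come from a ConsumerConnector set up as usual. Note that commitOffsets() commits the current position of every partition this connector owns, which is exactly why an ill-timed rebalance can still produce duplicates, as discussed above.

ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
    byte[] message = it.next().message();
    process(message);           // hypothetical application-specific handling
    consumer.commitOffsets();   // commit only after the message is fully processed
}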


Re: Getting error in fetching topic metadata when sending a message to console producer in Kafka cluster

2013-10-17 Thread Monika Garg
Thanks Jun... You are right; by mistake I had given the ZooKeeper port.

Now it's working fine.

