getting error in fetching topic metadata,when giving message to console producer in Kafka cluster
Hi, I had 3 nodes Kafka cluster with default.replication.factor=3 in all the three nodes. I create a topic "topic1" with 7 partitions and with rep-factor = 3,my list topic command gave below o/p: topic: topic1partition: 0leader: 1replicas: 3,2,1isr: 1,2,3 topic: topic1partition: 1leader: 2replicas: 1,3,2isr: 2,1,3 topic: topic1partition: 2leader: 3replicas: 2,1,3isr: 3,1,2 topic: topic1partition: 3leader: 1replicas: 3,1,2isr: 1,2,3 topic: topic1partition: 4leader: 2replicas: 1,2,3isr: 2,1,3 topic: topic1partition: 5leader: 3replicas: 2,3,1isr: 3,1,2 topic: topic1partition: 6leader: 1replicas: 3,2,1isr: 1,2,3 Then I removed 2 nodes. Now I have single node cluster with the same property default.replication.factor=3.So the list topic command is giving me the below o/p: topic: topic1partition: 0leader: 1replicas: 3,2,1isr: 1 topic: topic1partition: 1leader: 1replicas: 1,3,2isr: 1 topic: topic1partition: 2leader: 1replicas: 2,1,3isr: 1 topic: topic1partition: 3leader: 1replicas: 3,1,2isr: 1 topic: topic1partition: 4leader: 1replicas: 1,2,3isr: 1 topic: topic1partition: 5leader: 1replicas: 2,3,1isr: 1 topic: topic1partition: 6leader: 1replicas: 3,2,1isr: 1 Now I started consoleProducer and give some message ,it is giving me Warning WARN Fetching topic metadata with correlation id 8 for topics [Set(topic1)] from broker [id:0,host:192.168.145.107,port:2181] failed (kafka.client.ClientUtils$) pasting here the full stack trace: [2013-10-17 13:01:25,776] WARN Fetching topic metadata with correlation id 8 for topics [Set(topic2)] from broker [id:0,host:192.168.145.107,port:2181] failed (kafka.client.ClientUtils$) java.io.EOFException: Received -1 when reading from channel, socket has likely been closed. at kafka.utils.Utils$.read(Utils.scala:395) at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) at kafka.network.Receive$class.readCompletely(Transmission.scala:56) at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74) at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71) at kafka.producer.SyncProducer.send(SyncProducer.scala:112) at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) at kafka.utils.Utils$.swallow(Utils.scala:186) at kafka.utils.Logging$class.swallowError(Logging.scala:105) at kafka.utils.Utils$.swallowError(Utils.scala:45) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67) at scala.collection.immutable.Stream.foreach(Stream.scala:254) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44) [2013-10-17 13:01:25,778] ERROR fetching topic metadata for topics [Set(topic2)] from broker [ArrayBuffer(id:0,host:192.168.145.107,port:2181)] failed (kafka.utils.Utils$) kafka.common.KafkaException: fetching topic metadata for topics [Set(topic2)] from broker [ArrayBuffer(id:0,host:192.168.145.107,port:2181)] failed at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67) at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) at kafka.utils.Utils$.swallow(Utils.scala:186) at kafka.utils.Logging$class.swallowError(Logging.scala:105) at kafka.utils.Utils$.swallowError(Utils.scala:45) at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87) at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67) at scala.collection.immutable.Stream.foreach(Stream.scala:254) at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66) at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.sc
Consumer doesn't start picking existing messages from topic until new data is added
Hi All I am facing issues with the kafka 0.8 consumer with the consumer not picking existing messages in the topic for consumption. Following is a snippet from my code : - public void run() { ConsumerIterator it = stream.iterator(); System.out.println("In Run : before while loop"); //*1st SYSO* while (it.hasNext()) { byte[] message = it.next().message(); System.out.println("Got New Mesage , Processing .. Dump in hbase table"); //*2nd SYSO* } Steps: - 1. Cleaned zookeeper and kafka logs. 2. Started both again and produced 1000 messages in the topic. 3. Started the consumer. the consumer , i got log till *1st SYSO* but no progress. it kept on waiting for the stream to get messages 4. Produced 1000 more messages in the topic, now the data consumption starts , but the messages which i produced earlier in the topic were not consumed, as the hbase table showed only 1000 messages , which increased further depending on the additional number of messages i produced further, but however, the 1st initial deficit is still there. As it seems to me, the 1st 1000 messages are being lost, although the topic is getting created in the kafka-log directory , with the log file showing the initial messages there in the logbefore i start the consumer) , but the consumer does not seems to be picking them up. I am using Zookeeper-3.4.5 . Kafka-Settings are default , except that i the partitions for each topic are set to 1. Please help me out. Thanks Tarang Dawer
Re: Consumer doesn't start picking existing messages from topic until new data is added
try this, not sure if it would help though props.put("auto.offset.reset", "smallest"); A 17/10/2013, às 14:13, Tarang Dawer escreveu: > Hi All > > I am facing issues with the kafka 0.8 consumer with the consumer not > picking existing messages in the topic for consumption. > > > > Following is a snippet from my code : - > > > public void run() { >ConsumerIterator it = stream.iterator(); >System.out.println("In Run : before > while loop"); //*1st SYSO* >while (it.hasNext()) { >byte[] message = it.next().message(); >System.out.println("Got New Mesage , Processing .. Dump in > hbase table"); //*2nd SYSO* > } > > > Steps: - > 1. Cleaned zookeeper and kafka logs. > 2. Started both again and produced 1000 messages in the topic. > 3. Started the consumer. the consumer , i got log till *1st SYSO* but no > progress. it kept on waiting for the stream to get messages > 4. Produced 1000 more messages in the topic, now the data consumption > starts , but the messages which i produced earlier in the topic were not > consumed, as the hbase table showed only 1000 messages , which increased > further depending on the additional number of messages i produced further, > but however, the 1st initial deficit is still there. > > As it seems to me, the 1st 1000 messages are being lost, although the topic > is getting created in the kafka-log directory , with the log file showing > the initial messages there in the logbefore i start the consumer) , but the > consumer does not seems to be picking them up. > > I am using Zookeeper-3.4.5 . > > Kafka-Settings are default , except that i the partitions for each topic > are set to 1. > > Please help me out. > > > Thanks > Tarang Dawer signature.asc Description: Message signed with OpenPGP using GPGMail
Re: getting error in fetching topic metadata,when giving message to console producer in Kafka cluster
I think you provided the wrong port in broker-list. 2181 is the ZK port. Kafka port defaults to 9092. Thanks, Jun On Thu, Oct 17, 2013 at 1:51 AM, Monika Garg wrote: > Hi, > > I had 3 nodes Kafka cluster with default.replication.factor=3 in all the > three nodes. > I create a topic "topic1" with 7 partitions and with rep-factor = 3,my list > topic command gave below o/p: > > topic: topic1partition: 0leader: 1replicas: 3,2,1isr: 1,2,3 > topic: topic1partition: 1leader: 2replicas: 1,3,2isr: 2,1,3 > topic: topic1partition: 2leader: 3replicas: 2,1,3isr: 3,1,2 > topic: topic1partition: 3leader: 1replicas: 3,1,2isr: 1,2,3 > topic: topic1partition: 4leader: 2replicas: 1,2,3isr: 2,1,3 > topic: topic1partition: 5leader: 3replicas: 2,3,1isr: 3,1,2 > topic: topic1partition: 6leader: 1replicas: 3,2,1isr: 1,2,3 > > > Then I removed 2 nodes. > > Now I have single node cluster with the same property > default.replication.factor=3.So the list topic command is giving me the > below o/p: > > topic: topic1partition: 0leader: 1replicas: 3,2,1isr: 1 > topic: topic1partition: 1leader: 1replicas: 1,3,2isr: 1 > topic: topic1partition: 2leader: 1replicas: 2,1,3isr: 1 > topic: topic1partition: 3leader: 1replicas: 3,1,2isr: 1 > topic: topic1partition: 4leader: 1replicas: 1,2,3isr: 1 > topic: topic1partition: 5leader: 1replicas: 2,3,1isr: 1 > topic: topic1partition: 6leader: 1replicas: 3,2,1isr: 1 > > Now I started consoleProducer and give some message ,it is giving me > Warning > WARN Fetching topic metadata with correlation id 8 for topics [Set(topic1)] > from broker [id:0,host:192.168.145.107,port:2181] failed > (kafka.client.ClientUtils$) > > pasting here the full stack trace: > > [2013-10-17 13:01:25,776] WARN Fetching topic metadata with correlation id > 8 for topics [Set(topic2)] from broker > [id:0,host:192.168.145.107,port:2181] failed (kafka.client.ClientUtils$) > java.io.EOFException: Received -1 when reading from channel, socket has > likely been closed. > at kafka.utils.Utils$.read(Utils.scala:395) > at > > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) > at kafka.network.Receive$class.readCompletely(Transmission.scala:56) > at > > kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74) > at > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > at kafka.utils.Utils$.swallow(Utils.scala:186) > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > at kafka.utils.Utils$.swallowError(Utils.scala:45) > at > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > at > > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104) > at > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87) > at > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67) > at scala.collection.immutable.Stream.foreach(Stream.scala:254) > at > > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66) > at > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44) > [2013-10-17 13:01:25,778] ERROR fetching topic metadata for topics > [Set(topic2)] from broker > [ArrayBuffer(id:0,host:192.168.145.107,port:2181)] failed > (kafka.utils.Utils$) > kafka.common.KafkaException: fetching topic metadata for topics > [Set(topic2)] from broker > [ArrayBuffer(id:0,host:192.168.145.107,port:2181)] failed > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > at kafka.utils.Utils$.swallow(Utils.scala:186) > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > at kafka.utils.Utils$.swallowError(Utils.scala:45) > at > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > at > > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104) > at > > kafka.producer.async.ProducerSendThread$$anonfun$proces
producer (or consumer?) statistics that was using metrics
I remember a while back Jay was looking for someone to work on producer (or was it consumer) statisitics which was going to use metrics. Was this ever implemented?
Fwd: Special Bay Area HUG: Tajo and Samza
FYI. -- Forwarded message -- From: Jakob Homan Date: Thu, Oct 17, 2013 at 11:08 AM Subject: Special Bay Area HUG: Tajo and Samza To: d...@samza.incubator.apache.org Hey everybody- Join us at LinkedIn Nov. 5 for a special HUG dedicated to two new awesome Incubator projects, Tajo, a low-latency SQL query engine atop YARN and Samza. http://www.meetup.com/hadoop/events/146077932/ -Jakob
Re: Flush configuration per topic
Is there only time delay or can it be set to flush for every message with the obvious performance hit? On Wed, Oct 16, 2013 at 9:49 AM, Jay Kreps wrote: > Yes, the change in trunk is that all log configurations are automatically > available at both the log level and the global default level and can be set > at topic creation time or changed later without bouncing any servers. > > -Jay > > > On Tue, Oct 15, 2013 at 5:47 PM, Simon Hørup Eskildsen > wrote: > > > Do you mean that it's possible to override log configurations per topic > in > > trunk? > > > > Yeah, you're right. :-) I wasn't sure what to call it if not consistency, > > even though I know that sort of has another meaning in this context. > > > > > > On Tue, Oct 15, 2013 at 6:53 PM, Jay Kreps wrote: > > > > > Yeah, looks like you are right, we don't have the per-topic override in > > 0.8 > > > :-( > > > > > > All log configurations are overridable in trunk which will be 0.8.1. > > > > > > Just to be totally clear this setting does not impact consistency (i.e. > > all > > > replicas will have the same messages in the same order), nor even > > > durability (if you have replication > 1), but just recoverability on a > > > single server in the event of a hard machine crash. > > > > > > -Jay > > > > > > > > > On Tue, Oct 15, 2013 at 2:07 PM, Simon Hørup Eskildsen > > > wrote: > > > > > > > 0.8, we're not on master, but we definitely can be. > > > > > > > > > > > > On Tue, Oct 15, 2013 at 5:03 PM, Jay Kreps > > wrote: > > > > > > > > > Hey Simon, > > > > > > > > > > What version of Kafka are you using? > > > > > > > > > > -Jay > > > > > > > > > > > > > > > On Tue, Oct 15, 2013 at 9:56 AM, Simon Hørup Eskildsen > > > > > wrote: > > > > > > > > > > > Hi Kafkas! > > > > > > > > > > > > Reading through the documentation and code of Kafka, it seems > there > > > is > > > > no > > > > > > feature to set flushing interval (messages/time) for a specific > > > topic. > > > > I > > > > > am > > > > > > interested in this to get consistency for certain topics by > > flushing > > > > > after > > > > > > every message, while having eventual consistency for other topics > > > > > > (default). > > > > > > > > > > > > Would there be interest in such feature? Then I might be curious > to > > > > give > > > > > > the Log module a dive and see if I can foster enough Scala to add > > > this. > > > > > > > > > > > > Thanks! > > > > > > > > > > > > -- > > > > > > Simon > > > > > > http://sirupsen.com/ > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > Simon > > > > http://sirupsen.com/ > > > > > > > > > > > > > > > -- > > Simon > > http://sirupsen.com/ > > >
Broker bind address versus published hostname in ZooKeeper
Hi all, I'm getting started experimenting with Kafka and ran into a configuration issue. Currently, in server.properties, you can configure host.name which gets used for two purposes: 1) to bind the socket 2) to publish the broker details to ZK for clients to use. There are times when these two settings need to be different. Here's an example. I want to setup Kafka brokers on OpenStack virtual machines in a private cloud but I need producers to connect from elsewhere on the internal corporate network. With OpenStack, the virtual machines are only exposed to DHCP addresses (typically RFC 1918 private addresses). You can assign "floating ips" to a virtual machine but it's forwarded using Network Address Translation and not exposed directly to the VM. Also, there's typically no DNS to provide hostname lookup. Hosts have names like "fubar.novalocal" that are not externally routable. Here's what I want. I want the broker to bind to the VM's private network IP but I want it to publish it's floating IP to ZooKeeper so that producers can publish to it. I propose a new optional parameter, "listen", which would allow you to specify the socket address to listen on. If not set, the parameter would default to host.name, which is the current behavior. #Publish the externally routable IP in ZK host.name = #Accept connections from any interface the VM knows about listen = * I'm assuming others will eventually have the same requirement so I've added a JIRA ticket. https://issues.apache.org/jira/browse/KAFKA-1092 Thanks for your consideration. Cheers, Roger
Re: Broker bind address versus published hostname in ZooKeeper
Hi Roger, That's exactly what I need in my end, and actually internally created a new property called zkHost.name to publish a different host to zk. This is also needed for deploying Kafka into Azure. I also created zkHost.port since the internal and external ports that's exposed might be different as well. Tim On Thu, Oct 17, 2013 at 3:13 PM, Roger Hoover wrote: > Hi all, > > I'm getting started experimenting with Kafka and ran into a configuration > issue. > > Currently, in server.properties, you can configure host.name which gets > used for two purposes: 1) to bind the socket 2) to publish the broker > details to ZK for clients to use. > > There are times when these two settings need to be different. Here's an > example. I want to setup Kafka brokers on OpenStack virtual machines in a > private cloud but I need producers to connect from elsewhere on the > internal corporate network. With OpenStack, the virtual machines are only > exposed to DHCP addresses (typically RFC 1918 private addresses). You can > assign "floating ips" to a virtual machine but it's forwarded using Network > Address Translation and not exposed directly to the VM. Also, there's > typically no DNS to provide hostname lookup. Hosts have names like > "fubar.novalocal" that are not externally routable. > > Here's what I want. I want the broker to bind to the VM's private network > IP but I want it to publish it's floating IP to ZooKeeper so that producers > can publish to it. > > I propose a new optional parameter, "listen", which would allow you to > specify the socket address to listen on. If not set, the parameter would > default to host.name, which is the current behavior. > > #Publish the externally routable IP in ZK > host.name = > #Accept connections from any interface the VM knows about > listen = * > > I'm assuming others will eventually have the same requirement so I've added > a JIRA ticket. > > https://issues.apache.org/jira/browse/KAFKA-1092 > > Thanks for your consideration. > > Cheers, > > Roger >
Re: Broker bind address versus published hostname in ZooKeeper
Tim, This seems like a reasonable requirement. Would you be interested in providing a patch to the jira? Thanks, Jun On Thu, Oct 17, 2013 at 3:20 PM, Timothy Chen wrote: > Hi Roger, > > That's exactly what I need in my end, and actually internally created a new > property called zkHost.name to publish a different host to zk. This is also > needed for deploying Kafka into Azure. > > I also created zkHost.port since the internal and external ports that's > exposed might be different as well. > > Tim > > > On Thu, Oct 17, 2013 at 3:13 PM, Roger Hoover >wrote: > > > Hi all, > > > > I'm getting started experimenting with Kafka and ran into a configuration > > issue. > > > > Currently, in server.properties, you can configure host.name which gets > > used for two purposes: 1) to bind the socket 2) to publish the broker > > details to ZK for clients to use. > > > > There are times when these two settings need to be different. Here's an > > example. I want to setup Kafka brokers on OpenStack virtual machines in a > > private cloud but I need producers to connect from elsewhere on the > > internal corporate network. With OpenStack, the virtual machines are only > > exposed to DHCP addresses (typically RFC 1918 private addresses). You can > > assign "floating ips" to a virtual machine but it's forwarded using > Network > > Address Translation and not exposed directly to the VM. Also, there's > > typically no DNS to provide hostname lookup. Hosts have names like > > "fubar.novalocal" that are not externally routable. > > > > Here's what I want. I want the broker to bind to the VM's private network > > IP but I want it to publish it's floating IP to ZooKeeper so that > producers > > can publish to it. > > > > I propose a new optional parameter, "listen", which would allow you to > > specify the socket address to listen on. If not set, the parameter would > > default to host.name, which is the current behavior. > > > > #Publish the externally routable IP in ZK > > host.name = > > #Accept connections from any interface the VM knows about > > listen = * > > > > I'm assuming others will eventually have the same requirement so I've > added > > a JIRA ticket. > > > > https://issues.apache.org/jira/browse/KAFKA-1092 > > > > Thanks for your consideration. > > > > Cheers, > > > > Roger > > >
Re: Handling consumer rebalance when implementing synchronous auto-offset commit
We should be able to get this in after 0.8.1 and probably before the client rewrite. Thanks, Joel On Wednesday, October 16, 2013, Jason Rosenberg wrote: > This looks great. What is the time frame for this effort? > > Jason > > > On Wed, Oct 16, 2013 at 2:19 PM, Joel Koshy wrote: > > > Btw, after we complete KAFKA-1000 (offset management in Kafka) it > > should be reasonable to commit offsets on every message as long as the > > optional metadata portion of the offset commit request is small/empty. > > > > Thanks, > > > > Joel > > > > > > On Wed, Oct 16, 2013 at 10:35 AM, Jason Rosenberg > > wrote: > > > That would be great. Additionally, in the new api, it would be awesome > > > augment the default auto-commit functionality to allow client code to > > mark > > > a message for commit only after processing a message successfully! > > > > > > > > > On Wed, Oct 16, 2013 at 7:52 AM, Jun Rao wrote: > > > > > >> For manual offset commits, it will be useful to have some kind of API > > that > > >> informs the client when a rebalance is going to happen. We can think > > about > > >> this when we do the client rewrite. > > >> > > >> Thanks, > > >> > > >> Jun > > >> > > >> > > >> On Tue, Oct 15, 2013 at 9:21 PM, Jason Rosenberg > > wrote: > > >> > > >> > Jun, > > >> > > > >> > Yes, sorry, I think that was the basis for my question. When auto > > >> commit > > >> > is enabled, special care is taken to make sure things are > > auto-committed > > >> > during a rebalance. This is needed because when a topic moves off > of > > a > > >> > consumer thread (since it is being rebalanced to another one), it's > > as if > > >> > that topic is being shutdown on that connector, and any > > not-yet-committed > > >> > messages need to be committed before letting go of the topic. > > >> > > > >> > So, my question is around trying to understand if there's a way I > can > > >> > reproduce similar functionality using my own sync auto commit > > >> > implementation (and I'm not sure there is). It seems that when > > there's a > > >> > rebalance, all processed but not-yet-committed offsets will not be > > >> > committed, and thus there will be no way to prevent pretty massive > > >> > duplicate consumption on a rebalance. Is that about right? Or is > > there > > >> > someway around this that I'm not seeing? > > >> > > > >> > The auto-commit functionality that's builtin is so close to being > all > > >> that > > >> > anyone would need, except it has a glaring weakness, in that it will > > >> cause > > >> > messages to be lost from time to time, and so I don't know that it > > will > > >> > meet the needs of trying to have reliable delivery (with duplicates > > ok). > > >> > > > >> > Jason > > >> > > > >> > > > >> > On Tue, Oct 15, 2013 at 9:00 PM, Jun Rao wrote: > > >> > > > >> > > If auto commit is disabled, the consumer connector won't call > > >> > commitOffsets > > >> > > during rebalancing. > > >> > > > > >> > > Thanks, > > >> > > > > >> > > Jun > > >> > > > > >> > > > > >> > > On Tue, Oct 15, 2013 at 4:16 PM, Jason Rosenberg < > j...@squareup.com> > > >> > wrote: > > >> > > > > >> > > > I'm looking at implementing a synchronous auto offset commit > > >> solution. > > >> > > > People have discussed the need for this in previous > > >> > > > threads..Basically, in my consumer loop, I want to make > sure a > > >> > > message > > >> > > > has been actually processed before allowing it's offset to be > > >> > committed. > > >> > > > But I d -- Sent from Gmail Mobile
Re: getting error in fetching topic metadata,when giving message to console producer in Kafka cluster
Thanks Jun...You are right,bymistakenly I have given zookeeper port. Now its working fine. On Thu, Oct 17, 2013 at 8:50 PM, Jun Rao wrote: > I think you provided the wrong port in broker-list. 2181 is the ZK port. > Kafka port defaults to 9092. > > Thanks, > > Jun > > > On Thu, Oct 17, 2013 at 1:51 AM, Monika Garg wrote: > > > Hi, > > > > I had 3 nodes Kafka cluster with default.replication.factor=3 in all the > > three nodes. > > I create a topic "topic1" with 7 partitions and with rep-factor = 3,my > list > > topic command gave below o/p: > > > > topic: topic1partition: 0leader: 1replicas: 3,2,1isr: > 1,2,3 > > topic: topic1partition: 1leader: 2replicas: 1,3,2isr: > 2,1,3 > > topic: topic1partition: 2leader: 3replicas: 2,1,3isr: > 3,1,2 > > topic: topic1partition: 3leader: 1replicas: 3,1,2isr: > 1,2,3 > > topic: topic1partition: 4leader: 2replicas: 1,2,3isr: > 2,1,3 > > topic: topic1partition: 5leader: 3replicas: 2,3,1isr: > 3,1,2 > > topic: topic1partition: 6leader: 1replicas: 3,2,1isr: > 1,2,3 > > > > > > Then I removed 2 nodes. > > > > Now I have single node cluster with the same property > > default.replication.factor=3.So the list topic command is giving me the > > below o/p: > > > > topic: topic1partition: 0leader: 1replicas: 3,2,1isr: 1 > > topic: topic1partition: 1leader: 1replicas: 1,3,2isr: 1 > > topic: topic1partition: 2leader: 1replicas: 2,1,3isr: 1 > > topic: topic1partition: 3leader: 1replicas: 3,1,2isr: 1 > > topic: topic1partition: 4leader: 1replicas: 1,2,3isr: 1 > > topic: topic1partition: 5leader: 1replicas: 2,3,1isr: 1 > > topic: topic1partition: 6leader: 1replicas: 3,2,1isr: 1 > > > > Now I started consoleProducer and give some message ,it is giving me > > Warning > > WARN Fetching topic metadata with correlation id 8 for topics > [Set(topic1)] > > from broker [id:0,host:192.168.145.107,port:2181] failed > > (kafka.client.ClientUtils$) > > > > pasting here the full stack trace: > > > > [2013-10-17 13:01:25,776] WARN Fetching topic metadata with correlation > id > > 8 for topics [Set(topic2)] from broker > > [id:0,host:192.168.145.107,port:2181] failed (kafka.client.ClientUtils$) > > java.io.EOFException: Received -1 when reading from channel, socket has > > likely been closed. > > at kafka.utils.Utils$.read(Utils.scala:395) > > at > > > > > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) > > at kafka.network.Receive$class.readCompletely(Transmission.scala:56) > > at > > > > > kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29) > > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100) > > at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:74) > > at > > > > > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71) > > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > > at > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > at > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > > at kafka.utils.Utils$.swallow(Utils.scala:186) > > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > > at kafka.utils.Utils$.swallowError(Utils.scala:45) > > at > > > > > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > > at > > > > > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104) > > at > > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87) > > at > > > > > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67) > > at scala.collection.immutable.Stream.foreach(Stream.scala:254) > > at > > > > > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66) > > at > > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44) > > [2013-10-17 13:01:25,778] ERROR fetching topic metadata for topics > > [Set(topic2)] from broker > > [ArrayBuffer(id:0,host:192.168.145.107,port:2181)] failed > > (kafka.utils.Utils$) > > kafka.common.KafkaException: fetching topic metadata for topics > > [Set(topic2)] from broker > > [ArrayBuffer(id:0,host:192.168.145.107,port:2181)] failed > > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67) > > at > > > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > > at > > > > > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > > at kafka.utils.Utils$.swal