RE: New Consumer API discussion
Jun,

Are you saying it is possible to get events from the high-level consumer regarding various state machine changes? For instance, can we get a notification when a rebalance starts and ends, when a partition is assigned/unassigned, when an offset is committed on a partition, when a leader changes and so on? I call this OOB traffic, since these are not the core streaming messages, but side-band events, yet they are still potentially useful to consumers.

Thank you,
Robert

Robert Withers
Staff Analyst/Developer
o: (720) 514-8963
c: (571) 262-1873

-----Original Message-----
From: Jun Rao [mailto:jun...@gmail.com]
Sent: Sunday, February 23, 2014 4:19 PM
To: users@kafka.apache.org
Subject: Re: New Consumer API discussion

Robert,

For the push-oriented api, you can potentially implement your own MessageHandler with those methods. In the main loop of our new consumer api, you can just call those methods based on the events you get. Also, we already have an api to get the first and the last offset of a partition (getOffsetBefore).

Thanks,
Jun
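A minimal sketch of the pull-to-push adapter Jun describes, driving handler callbacks from the new consumer's main loop. The MessageHandler interface, the Consumer type, and the position/poll signatures used here are assumptions based on the API sketched in this thread, not shipped code:

// Hypothetical pull-to-push adapter over the proposed consumer API.
public interface MessageHandler {
    void onMessage(Object msg);   // per-message push
    void onEndOfStream();         // OOB event, reachable only for a finite range
}

public class PushLoop implements Runnable {
    private final Consumer consumer;       // proposed new consumer (assumed API)
    private final TopicPartition tp;
    private final MessageHandler handler;
    private final long end;                // e.g. consumer.lastOffset(tp)

    public PushLoop(Consumer consumer, TopicPartition tp,
                    MessageHandler handler, long end) {
        this.consumer = consumer;
        this.tp = tp;
        this.handler = handler;
        this.end = end;
    }

    public void run() {
        while (consumer.position(tp) < end) {        // bounded, so it terminates
            for (Object msg : consumer.poll(100)) {  // pull in the main loop...
                handler.onMessage(msg);              // ...push to the handler
            }
        }
        handler.onEndOfStream();                     // side-band completion event
    }
}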
Re: New Consumer API discussion
That’s wonderful. Thanks for kafka.

Rob

On Feb 24, 2014, at 9:58 AM, Guozhang Wang wangg...@gmail.com wrote:

Hi Robert,

Yes, you can check out the callback functions in the new API, onPartitionDeassigned and onPartitionAssigned, and see if they meet your needs.

Guozhang
Re: New Consumer API discussion
We use kafka as a durable buffer for 3rd party event traffic. It acts as the event source in a lambda architecture. We want it to be exactly once and we are close, though we can lose messages aggregating for Hadoop. To really tie this all together, I think there should be an Apache project to implement a proper 3-phase distributed transaction capability, which the Kafka and Hadoop communities could implement together. This paper looks promising. It is a 3 RTT protocol, but it is non-blocking. This could be a part of a new consumer api, at some point. http://ieeexplore.ieee.org/xpl/articleDetails.jsp?arnumber=1703048 regards, Rob
Re: New Consumer API discussion
Jun,

I was originally thinking a non-blocking read from a distributed stream should distinguish between "no local messages, but a fetch is occurring" versus "you have drained the stream". The reason this may be valuable to me is so I can write consumers that read all known traffic then terminate. You caused me to reconsider and I think I am conflating 2 things. One is a sync/async api while the other is whether to have an infinite or finite stream. Is it possible to build a finite KafkaStream on a range of messages? Perhaps a Simple Consumer would do just fine and then I could start off getting the writeOffset from zookeeper and tell it to read a specified range per partition. I've done this and forked a simple consumer runnable for each partition, for one of our analyzers. The great thing about the high-level consumer is the rebalance, so I can fork however many stream readers I want and you just figure it out for me. In that way you offer us control over the resource consumption within a pull model. This is best to regulate message pressure, they say.

Combining that high-level rebalance ability with a ranged partition drain could be really nice... build the stream with an ending position and it is a finite stream, but retain the high-level rebalance. With a finite stream, you would know the difference of the 2 async scenarios: fetch-in-progress versus end-of-stream. With an infinite stream, you never get end-of-stream.

Aside from a high-level consumer over a finite range within each partition, the other feature I can think of is more complicated. A high-level consumer has state machine changes that the client cannot access, to my knowledge. Our use of kafka has us invoke a message handler with each message we consume from the KafkaStream, so we convert a pull-model to a push-model. Including the idea of receiving notifications from state machine changes, what would be really nice is to have a KafkaMessageSource, that is an eventful push model. If it were thread-safe, then we could register listeners for various events:

* opening-stream
* closing-stream
* message-arrived
* end-of-stream/no-more-messages-in-partition (for finite streams)
* rebalance started
* partition assigned
* partition unassigned
* rebalance finished
* partition-offset-committed

Perhaps that is just our use, but instead of a pull-oriented KafkaStream, is there any sense in your providing a push-oriented KafkaMessageSource publishing OOB messages?

thank you,
Robert

On Feb 21, 2014, at 5:59 PM, Jun Rao jun...@gmail.com wrote:

Robert,

Could you explain why you want to distinguish btw FetchingInProgressException and NoMessagePendingException? The nextMsgs() method that you want is exactly what poll() does.

Thanks,
Jun

On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert robert.with...@dish.com wrote:

I am not clear on why the consumer stream should be positionable, especially if it is limited to the in-memory fetched messages. Could someone explain to me, please? I really like the idea of committing the offset specifically on those partitions with changed read offsets, only.

2 items I would like to see added to the KafkaStream are:

* a non-blocking next(), which throws several exceptions (FetchingInProgressException and a NoMessagePendingException or something) to differentiate between fetching or no messages left.
* a nextMsgs() method which returns all locally available messages and kicks off a fetch for the next chunk.

If you are trying to add transactional features, then formally define a DTP capability and pull in other server frameworks to share the implementation. Should it be XA/Open? How about a new peer2peer DTP protocol?

Thank you,
Robert

Robert Withers
Staff Analyst/Developer
o: (720) 514-8963
c: (571) 262-1873

-----Original Message-----
From: Jay Kreps [mailto:jay.kr...@gmail.com]
Sent: Sunday, February 16, 2014 10:13 AM
To: users@kafka.apache.org
Subject: Re: New Consumer API discussion

+1 I think those are good. It is a little weird that changing the fetch point is not batched but changing the commit point is, but I suppose there is no helping that.

-Jay

On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede neha.narkh...@gmail.com wrote:

Jay,

That makes sense. position/seek deal with changing the consumer's in-memory data, so there is no remote rpc there. For some reason, I got committed and seek mixed up in my head at that time :) So we still end up with:

long position(TopicPartition tp)
void seek(TopicPartitionOffset p)
Map<TopicPartition, Long> committed(TopicPartition tp);
void commit(TopicPartitionOffset...);

Thanks,
Neha

On Friday, February 14, 2014, Jay Kreps jay.kr...@gmail.com
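Collected as a sketch, the event list proposed above could surface as a listener interface along these lines. Every name here is illustrative only, since no such API exists in the consumer:

// Hypothetical listener for the OOB events requested above; this is the
// ask itself, not an existing Kafka API.
public interface KafkaMessageSourceListener {
    void openingStream();
    void closingStream();
    void messageArrived(Object msg);
    void endOfStream();                                    // finite streams only
    void rebalanceStarted();
    void partitionAssigned(TopicPartition tp);
    void partitionUnassigned(TopicPartition tp);
    void rebalanceFinished();
    void partitionOffsetCommitted(TopicPartition tp, long offset);
}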
Re: New Consumer API discussion
This is a good idea, too. I would modify it to include stream marking, then you can have:

long end = consumer.lastOffset(tp);
consumer.setMark(end);
while(consumer.beforeMark()) {
   process(consumer.pollToMark());
}

or

long end = consumer.lastOffset(tp);
consumer.setMark(end);
for(Object msg : consumer.iteratorToMark()) {
   process(msg);
}

I actually have 4 suggestions, then:

* pull: stream marking
* pull: finite streams, bound by time range (up-to-now, yesterday) or offset
* pull: async api
* push: KafkaMessageSource, for a push model, with msg and OOB events. Build one in either individual or chunk mode and have a listener for each msg or a listener for a chunk of msgs. Make it composable and policy driven (chunked, range, commitOffsets policy, retry policy, transactional)

Thank you,
Robert

On Feb 22, 2014, at 11:21 AM, Jay Kreps jay.kr...@gmail.com wrote:

I think what Robert is saying is that we need to think through the offset API to enable batch processing of topic data. Think of a process that periodically kicks off to compute a data summary or do a data load or something like that. I think what we need to support this is an api to fetch the last offset from the server for a partition. Something like

long lastOffset(TopicPartition tp)

and for symmetry

long firstOffset(TopicPartition tp)

Likely this would have to be batched. Essentially we should add this use case to our set of code examples to write and think through. The usage would be something like

long end = consumer.lastOffset(tp);
while(consumer.position < end)
   process(consumer.poll());

-Jay
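The mark-based drain above, written out as an annotated sketch. Note that lastOffset, setMark, beforeMark, and pollToMark are the proposal itself, so none of this compiles against any released consumer:

// Bounded drain: snapshot "now" as the end of a finite stream.
long end = consumer.lastOffset(tp);     // last offset on the broker at this instant
consumer.setMark(end);                  // everything before the mark is in range
while (consumer.beforeMark()) {         // false once the position reaches the mark
    for (Object msg : consumer.pollToMark()) {
        process(msg);                   // application handler (assumed)
    }
}
// Reaching here is a well-defined end-of-stream for the range, which removes
// the fetch-in-progress vs. drained ambiguity for this bounded read.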
RE: Looks like consumer fetchers get stopped we are not getting any data
The core problem is our consumers stop consuming and lag increases. We found this FAQ entry: https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Myconsumerseemstohavestopped,why?. It lists 3 possibilities. The page also talks earlier about spurious rebalances, due to improper GC settings, but we couldn't find which GC settings to use. We are considering changing the zookeeper timeouts. We are a little confused about the various issues, the sequence of issues, and what could cause the consumers to stop reading. If the fetchers get shut down, due to a ClosedByInterruptException in the leader_finder thread, which tells the executor_watcher thread to shut down the fetchers, that would be another reason the consumers stop processing data. Is this possible?

Thank you,
rob

-----Original Message-----
From: Seshadri, Balaji [mailto:balaji.sesha...@dish.com]
Sent: Friday, January 10, 2014 11:40 AM
To: users@kafka.apache.org
Subject: RE: Looks like consumer fetchers get stopped we are not getting any data

It would be helpful if you guys can shed some light on why all fetchers are getting stopped.

-----Original Message-----
From: Seshadri, Balaji [mailto:balaji.sesha...@dish.com]
Sent: Friday, January 10, 2014 11:28 AM
To: users@kafka.apache.org
Subject: RE: Looks like consumer fetchers get stopped we are not getting any data

We also got the below error when this happens.

{2014-01-10 00:58:11,292} INFO [account-info-updated-hadoop-consumer_tm1mwdpl04-1389222553159-ad59660b_watcher_executor] (?:?) - [account-info-updated-hadoop-consumer_tm1mwdpl04-1389222553159-ad59660b], exception during rebalance
org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/account-info-updated-hadoop-consumer/ids/account-info-updated-hadoop-consumer_tm1mwdpl04-1389222553159-ad59660b
    at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
    at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
    at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
    at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
    at kafka.utils.ZkUtils$.readData(Unknown Source)
    at kafka.consumer.TopicCount$.constructTopicCount(Unknown Source)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(Unknown Source)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(Unknown Source)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:142)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(Unknown Source)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(Unknown Source)
Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/account-info-updated-hadoop-consumer/ids/account-info-updated-hadoop-consumer_tm1mwdpl04-1389222553159-ad59660b
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
    at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:927)
    at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:956)
    at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
    at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
    at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
    at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
    ... 9 more

-----Original Message-----
From: Seshadri, Balaji [mailto:balaji.sesha...@dish.com]
Sent: Friday, January 10, 2014 10:52 AM
To: users@kafka.apache.org
Subject: Looks like consumer fetchers get stopped we are not getting any data

Please let us know why we are not getting any data from Kafka after this log from Kafka. What could be causing all the associated fetchers to be stopped, and why is it not doing a retry?

{2014-01-10 00:58:09,284} WARN [account-info-updated-hadoop-consumer_tm1mwdpl04-1389222553159-ad59660b-leader-finder-thread] (?:?) - Fetching topic metadata with correlation id 3 for topics [Set(account-info-updated)] from broker [id:1,host:tm1-kafkabroker101,port:9092] failed
java.nio.channels.ClosedByInterruptException
    at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:506)
    at java.nio.channels.SocketChannel.write(SocketChannel.java:493)
    at kafka.network.BoundedByteBufferSend.writeTo(Unknown Source)
    at kafka.network.Send$class.writeCompletely(Unknown Source)
    at kafka.network.BoundedByteBufferSend.writeCompletely(Unknown Source)
    at
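For the spurious-rebalance angle raised above, a hedged sketch of the consumer settings usually tuned, using the 0.8 high-level consumer property names (verify them against the deployed version; the values are illustrative):

import java.util.Properties;
import kafka.consumer.ConsumerConfig;

// Settings commonly adjusted when GC pauses expire the ZooKeeper session
// and trigger spurious rebalances.
Properties props = new Properties();
props.put("zookeeper.connect", "zk1:2181");
props.put("group.id", "account-info-updated-hadoop-consumer");
// A longer session timeout tolerates GC pauses without ZK expiring the session.
props.put("zookeeper.session.timeout.ms", "30000");
props.put("zookeeper.sync.time.ms", "2000");
// More retries with longer backoff give a large group time to settle.
props.put("rebalance.max.retries", "8");
props.put("rebalance.backoff.ms", "5000");
ConsumerConfig config = new ConsumerConfig(props);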
how to force a consumer to start at the beginning
We are creating a consumer with properties and I did not see a property that screamed that it was to start at the beginning of a topic. Is there such a property?

Thanks,
rob

Rob Withers
Staff Analyst/Developer
o: (720) 514-8963
c: (571) 262-1873
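For reference, the usual answer for the 0.8 high-level consumer is the auto.offset.reset property, sketched below. One caveat worth verifying for your version: it only applies when the group has no committed offset in ZooKeeper, so an existing group also needs its stored offsets cleared or a fresh group.id.

import java.util.Properties;

// Where to start when NO committed offset exists for the group:
// "smallest" = beginning of the log; "largest" (the default) = end of the log.
Properties props = new Properties();
props.put("auto.offset.reset", "smallest");
// For an existing group, also clear /consumers/<group>/offsets in ZooKeeper
// or switch to a fresh group.id, since a committed offset wins over this setting.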
RE: C# client for kafka 0.8
Thanks for the information, Tim. I've never coded on the CLR, so I asked only to attempt to entice an app group here, who does use C#. They are still considering the situation.

thanks,
rob

Sent: Tuesday, June 18, 2013 9:47 AM
To: users@kafka.apache.org
Subject: Re: C# client for kafka 0.8

Hi Robert,

The most recent one that I know of is the C# client that the ExactTarget folks did; however, not all calls are up to the 0.8 protocol so it doesn't completely work. I have a slightly more cleaned up version here: https://github.com/tnachen/kafka/tree/feature/et-develop-0.8 It will be great if you are interested in finishing it :)

Tim
C# client for kafka 0.8
I see an old C# client, which is 2 years old. Does anyone have a C# client that works with the kafka 0.8 producer? Thanks, rob
RE: one consumerConnector or many?
Thanks for the info. Are you saying that even with a single connector, with say 3 topics and 3 threads per topic and 3 brokers with 3 partitions for all 3 topics on all 3 brokers, a consumer box would have 9 sockets open? What if there are 6 partitions per topic, would that be 18 open sockets?

I have read somewhere that a high partition count per topic is desirable, to scale out the consumers and to be prepared to dynamically scale out consumption during a traffic spike. Is it so? 100 topics, with 16 brokers and 200 partitions per topic, with 1 consumer connector (just hypothetically so) would be 1600 sockets or 2 sockets? For sure these boxes have plenty of ports. I am just thinking through possible failures and what flexibility we have in configuration of producers/consumers to topics. Really the question is best practices in this area. A producer server handling 100+ msg types could also connect quite a bit. So, perhaps it is best to restrict producer and consumer servers to process a restricted class of types. Certainly if the producer is also hosting a web server, but perhaps not as dire on the consumer side.

thanks,
rob

From: Chris Curtin [curtin.ch...@gmail.com]
Sent: Wednesday, May 29, 2013 7:36 AM
To: users
Subject: Re: one consumerConnector or many?

I'd look at a variation of #2. Can your messages be grouped into a 'class' (for lack of a better term) that are consumed together? For example a 'class' of 'auditing events' or 'sensor events'. The idea would be to then have a topic per 'class'.

A couple of benefits to this:

- you can define your consumption of a 'class's resources by value. So the 'audit' topic may only get a 2-threaded consumer while the 'sensor' class gets a 10-threaded consumer.
- you can stop processing a 'class' of messages if you need to without taking all the consumers offline (assuming you have different processors or a way while running to alter your number of threads per topic).

Since it sounds like you may be frequently adding new message types, this approach also allows you to decide if you want to shut down only a part of your processing to add the new code to handle the message.

Finally, why the concern about socket use? A well configured Windows or Linux machine can have thousands of open sockets without problems. Since 0.8.0 only connects to the Broker with the topic/partition you end up with 1 socket per topic/partition and consumer.

Hope this helps,

Chris

On Wed, May 29, 2013 at 9:13 AM, Rob Withers reefed...@gmail.com wrote:

In thinking about the design of consumption, we have in mind a generic consumer server which would consume from more than one message type. The handling of each type of message would be different. I suppose we could have upwards of say 50 different message types, eventually, maybe 100+ different types. Which of the following designs would be best and why would the other options be bad?

1) Have all message types go through one topic and use a dispatcher pattern to select the correct handler. Use one consumerConnector.
2) Use a different topic for each message type, but still use one consumerConnector and a dispatcher pattern.
3) Use a different topic for each message type and have a separate consumerConnector for each topic.

I am struggling with whether my assumptions are correct. It seems that a single connector for a topic would establish one socket to each broker, as rebalancing assigns various partitions to that thread. Option 2 would pull messages from more than one topic through a single socket to a particular broker, is it so? Would option 3 be reasonable, establishing upwards of 100 sockets per broker?

I am guesstimating that option 2 is the right way forward, to bound socket use, and we'll need to figure out a way to parameterize stream consumption with the right handlers for a particular msg type. If we add a topic, do you think we should create a new connector or restart the original connector with the new topic in the map?

Thanks,
rob
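A minimal sketch of option 2 with Chris's per-class topics, against the 0.8 high-level consumer API. The Handler interface, the handlers map, and the topic names are hypothetical application-side pieces, not Kafka types:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

// Hypothetical per-class handler supplied by the application.
interface Handler {
    void handle(byte[] message);
}

class TopicDispatcher {
    // One connector, one topic per message class, dispatch by topic name.
    static void startDispatch(ConsumerConnector connector,
                              final Map<String, Handler> handlers,
                              ExecutorService executor) {
        Map<String, Integer> topicThreads = new HashMap<String, Integer>();
        topicThreads.put("audit-events", 2);    // lower-value class: fewer threads
        topicThreads.put("sensor-events", 10);  // higher-value class: more threads

        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(topicThreads);

        for (Map.Entry<String, List<KafkaStream<byte[], byte[]>>> entry :
                streams.entrySet()) {
            final Handler handler = handlers.get(entry.getKey()); // topic -> handler
            for (final KafkaStream<byte[], byte[]> stream : entry.getValue()) {
                executor.submit(new Runnable() {
                    public void run() {
                        ConsumerIterator<byte[], byte[]> it = stream.iterator();
                        while (it.hasNext()) {
                            handler.handle(it.next().message()); // dispatch each message
                        }
                    }
                });
            }
        }
    }
}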
RE: one consumerConnector or many?
Thanks, Jun. We have considered doing message filtering in the consumer. However, the thrust of my question below is not filtering, but dispatching. If we take Chris' recommendation and pump a small set of msg types, belonging to the same class of messages, such as Account History, through the same topic, we will want to process all the messages, but we will want to process each msg type within the class differently, so we will want to dispatch to different handlers. I totally see your point that if we only want to process a subset of the messages, then we really ought to filter in the producer and send the filtered message stream to its own topic.

I am leaning toward the architecture of having a different consumerConnector per topic, as there ARE plenty of ports. This allows per-topic control, which is useful. Do you see any issues with this approach?

Thanks,
rob

-----Original Message-----
From: Jun Rao [mailto:jun...@gmail.com]
Sent: Wednesday, May 29, 2013 9:58 AM
To: users@kafka.apache.org
Subject: Re: one consumerConnector or many?

Rob,

You are correct that each instance of consumer will use a single socket to connect to a broker, independent of # topics/partitions. One thing that's good to avoid is to read all data and filter in the consumer, especially when the data is consumed multiple times by different consumers. In this case, it's better to put the filtered data in a separate topic and let all consumers consume the filtered data directly.

Thanks,
Jun
RE: are commitOffsets batched to zookeeper?
Awesome! Thanks for the clarification. I would like to offer my strong vote that this get tackled before a beta, to get it firmly into 0.8. Stabilize everything else to the existing use, but make offset updates batched.

thanks,
rob

From: Neha Narkhede [neha.narkh...@gmail.com]
Sent: Friday, May 17, 2013 7:17 AM
To: users@kafka.apache.org
Subject: RE: are commitOffsets batched to zookeeper?

Sorry I wasn't clear. Zookeeper 3.4.x has this feature. As soon as 0.8 is stable and released, it will be worth looking into when we can use zookeeper 3.4.x.

Thanks,
Neha

On May 16, 2013 10:32 PM, Rob Withers reefed...@gmail.com wrote:

Can a request be made to zookeeper for this feature?

Thanks,
rob

-----Original Message-----
From: Neha Narkhede [mailto:neha.narkh...@gmail.com]
Sent: Thursday, May 16, 2013 9:53 PM
To: users@kafka.apache.org
Subject: Re: are commitOffsets batched to zookeeper?

Currently Kafka depends on zookeeper 3.3.4, which doesn't have a batch write api. So if you commit after every message at a high rate, it will be slow and inefficient. Besides, it will cause zookeeper performance to degrade.

Thanks,
Neha

On May 16, 2013 6:54 PM, Rob Withers reefed...@gmail.com wrote:

We are calling commitOffsets after every message consumption. It looks to be ~60% slower, with 29 partitions. If a single KafkaStream thread is from a connector, and there are 29 partitions, then commitOffsets sends 29 offset updates, correct? Are these offset updates batched in one send to zookeeper?

thanks,
rob
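Until batched ZK writes exist, a common workaround is to commit on a cadence rather than per message, trading a larger replay window for throughput. A sketch against the 0.8 high-level consumer; the interval and the process call are illustrative:

import kafka.consumer.ConsumerIterator;
import kafka.javaapi.consumer.ConsumerConnector;

// connector: the ConsumerConnector that created the stream; it: its iterator.
static void consumeWithBatchedCommits(ConsumerConnector connector,
                                      ConsumerIterator<byte[], byte[]> it) {
    int uncommitted = 0;
    while (it.hasNext()) {
        process(it.next().message());      // application handler (assumed)
        if (++uncommitted >= 1000) {       // commit interval is illustrative
            connector.commitOffsets();     // one ZK flush for all owned partitions
            uncommitted = 0;
        }
    }
}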
RE: possible to shutdown a consumerConnector without flushing the offset
Certainly I will try. Our understanding is that there are 2 scenarios where messages could be replayed:

1. If a consumer falls over hard, there are some message consumptions whose offsets had not yet been flushed to zookeeper, and so when a rebalance occurs, the consumer that starts getting messages from a partition that flipped from the broken consumer will replay some messages.
2. I think a combination of a leader election and a broker failure may replay messages.

We want to demonstrate the first, but our stats and correlation harness needs to keep running. Love to demonstrate the second but egads, it's tricky.

thanks,
rob

From: Neha Narkhede [neha.narkh...@gmail.com]
Sent: Friday, May 17, 2013 7:29 AM
To: users@kafka.apache.org
Subject: Re: possible to shutdown a consumerConnector without flushing the offset

Can you provide more details about what you mean by measuring replay when you kill a consumer?

Thanks,
Neha

On May 17, 2013 6:26 AM, Withers, Robert robert.with...@dish.com wrote:

Would it be possible for someone to provide me with a 0.8 jar that implements a ConsumerConnector.hardShutdown, which would interrupt all threads yet not do a final offset flush? We want to measure replay, so we want to simulate a kill -9, but we want to keep running the process to flush stats and have them available locally as well.

thanks,
rob
RE: API to query messages amount under one topic
Could you add some JMX stats for us, then?

- Queue length, by group offset vs lastOffset
- latency between produce and consume, again by group

Thanks,

Rob Withers
Staff Analyst/Developer
o: (720) 514-8963
c: (571) 262-1873

-----Original Message-----
From: Jun Rao [mailto:jun...@gmail.com]
Sent: Friday, May 17, 2013 8:52 AM
To: users@kafka.apache.org
Subject: Re: API to query messages amount under one topic

For monitoring, we have JMX stats on the broker for both the message and the byte rate.

Thanks,
Jun

On Thu, May 16, 2013 at 10:04 PM, Rob Withers reefed...@gmail.com wrote:

Immediately monitoring. Later, possible thresholding to evoke a reconfiguration of the number of partitions into a new topic, migrate message index/logs, and redirect pubs/subs to the new topic, during a traffic spike.

-----Original Message-----
From: Jun Rao [mailto:jun...@gmail.com]
Sent: Thursday, May 16, 2013 10:00 PM
To: users@kafka.apache.org
Subject: Re: API to query messages amount under one topic

What do you plan to use this information for?

Thanks,
Jun
RE: could an Encoder/Decoder be stateful?
I shall. Thanks!

Thanks,

Rob Withers
Staff Analyst/Developer
o: (720) 514-8963
c: (571) 262-1873

-----Original Message-----
From: Jun Rao [mailto:jun...@gmail.com]
Sent: Friday, May 17, 2013 8:53 AM
To: users@kafka.apache.org
Subject: Re: could an Encoder/Decoder be stateful?

Possible, but definitely a post-0.8 item. If you are interested, could you file a jira to track this?

Thanks,
Jun

On Thu, May 16, 2013 at 10:06 PM, Rob Withers reefed...@gmail.com wrote:

Could the producer be adapted to support the interface of the consumer?

Thanks,
rob

-----Original Message-----
From: Jun Rao [mailto:jun...@gmail.com]
Sent: Thursday, May 16, 2013 10:04 PM
To: users@kafka.apache.org
Subject: Re: could an Encoder/Decoder be stateful?

The encoder instance can't be shared in different producers since it's instantiated through java reflection. The decoder instance can actually be shared by different consumers since it's passed in directly through the consumer api.

Thanks,
Jun

On Thu, May 16, 2013 at 5:58 AM, Withers, Robert robert.with...@dish.com wrote:

Is there a way to share an encoder instance, or inject shared state, across producers/consumers? Guice?

From: Jun Rao [jun...@gmail.com]
Sent: Wednesday, May 15, 2013 11:02 PM
To: users@kafka.apache.org
Subject: Re: could an Encoder/Decoder be stateful?

Each producer/consumer uses a single instance of the encoder/decoder.

Thanks,
Jun

On Wed, May 15, 2013 at 7:20 PM, Rob Withers reefed...@gmail.com wrote:

Or is the same instance used for each (un)marshaling? It would be nice to have a cache and a duplicateMsgChecker function, from the app above, to ensure transactional guarantees, and object ref substitutions during (de)serialization, to enable durable distributed objects and promises.

Thanks,
rob
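Since Jun notes the decoder instance is passed in directly on the consumer side, shared state can simply be injected through the constructor. A sketch assuming the 0.8 Decoder interface; the dedup cache is a hypothetical application-supplied structure:

import java.util.concurrent.ConcurrentHashMap;
import kafka.serializer.Decoder;

// A decoder whose state is shared across consumers by injecting one cache
// instance into every decoder handed to createMessageStreams.
public class DedupStringDecoder implements Decoder<String> {
    private final ConcurrentHashMap<Integer, Boolean> seen;

    public DedupStringDecoder(ConcurrentHashMap<Integer, Boolean> sharedCache) {
        this.seen = sharedCache;   // injected shared state, e.g. wired up via Guice
    }

    public String fromBytes(byte[] bytes) {
        String msg = new String(bytes);
        seen.putIfAbsent(msg.hashCode(), Boolean.TRUE); // record for duplicate checks
        return msg;
    }
}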
RE: API to query messages amount under one topic
Any possibility for a queuedMessageCount(topic, partitionNumber) protocol?

thanks,
rob

From: Jun Rao [jun...@gmail.com]
Sent: Wednesday, May 15, 2013 10:59 PM
To: users@kafka.apache.org
Subject: Re: API to query messages amount under one topic

In 0.8, you can get the earliest and the latest offset using the getOffsetBefore api. The difference btw the two gives the number of messages on the broker.

Thanks,
Jun

On Wed, May 15, 2013 at 6:05 PM, Sining Ma sinin...@aol.com wrote:

Hi,
Is there any APIs in kafka that I can use to query how many messages there are under one topic? This topic already exists in the Kafka server, and a producer is sending messages to one topic in the kafka server.

--
Regards
Sining Ma
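Jun's getOffsetBefore recipe, expanded into a runnable sketch against the 0.8 java API. The broker host, port, topic, and client name are placeholders:

import java.util.HashMap;
import java.util.Map;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

// Message count on the broker for one partition is
// (latest offset - earliest offset), fetched with getOffsetsBefore.
public class PartitionMessageCount {
    static long offsetBefore(SimpleConsumer consumer, String topic, int partition,
                             long whichTime, String clientName) {
        TopicAndPartition tp = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(tp, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        return response.offsets(topic, partition)[0];
    }

    public static void main(String[] args) {
        SimpleConsumer consumer =
                new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "countClient");
        long latest = offsetBefore(consumer, "test", 0,
                kafka.api.OffsetRequest.LatestTime(), "countClient");
        long earliest = offsetBefore(consumer, "test", 0,
                kafka.api.OffsetRequest.EarliestTime(), "countClient");
        System.out.println("messages on broker: " + (latest - earliest));
        consumer.close();
    }
}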
RE: when do you think 0.8 could get promoted from beta to a release?
Excellent news. Thanks, Neha.

From: Neha Narkhede [neha.narkh...@gmail.com]
Sent: Wednesday, May 15, 2013 10:25 PM
To: users@kafka.apache.org
Subject: Re: when do you think 0.8 could get promoted from beta to a release?

We are currently working on fixing a blocker performance issue (KAFKA-901). We expect to resolve it by early next week, after which a beta will be available. We could maybe target the release mid June?

Thanks,
Neha

On May 15, 2013 7:27 PM, Rob Withers reefed...@gmail.com wrote:

We are curious. It would be excellent if around 8/17 could be targeted; perhaps go for 7/17 as RC and let 8/17 be an RC2 date, a month before we would like to see it go to production. An RC with thorough testing with our application may be workable.

Thanks,
rob
only-once consumer groups
Is it technically feasible to use an only-once simple consumer within a consumer group?

thanks,
rob
RE: LeaderNotAvailableException: preventing the running of kafka-console-producer.bat
Hi Jun,

It seems I failed to respond to this post. My apologies. I did test a week ago, with the latest 0.8, and it still failed. The reason I waited so long was to get my home machine set up, which I finished today. At work we are tearing it up and leaving behind issues so we can get message traffic over kafka-0.7.2 into storm. We got this working yesterday afternoon... yay!

I git cloned kafka (with EGit) and then ran sbt.bat update and sbt.bat package. Here are the git log results:

commit 9ff4e8eb10e0ddd86f257e99d55971a132426605
Author: Jay Kreps jay.kr...@gmail.com
Date: Tue Mar 12 11:17:12 2013 -0700
    KAFKA-739 Handle null message payloads in messages and in the log cleaner. R

commit c1ed12e44ddebe41dc464683e3d7eeb4e6d39a45
Author: Jay Kreps jay.kr...@gmail.com
Date: Fri Mar 8 15:07:39 2013 -0800
    KAFKA-554 Dynamic per-topic configuration. This patch adds a mechanism for s

commit 4f2742d60d16f5ba468aa66d2c3ed7aa37479dce
Merge: 82b11aa 92ecebe
Author: Jun Rao jun...@gmail.com
Date: Sun Mar 3 20:20:41 2013 -0800
    merge from 0.8 and resolve conflicts

This problem still persists, unfortunately, running kafka-console-consumer.bat and kafka-console-producer.bat.

Best,
rob

From: Jun Rao [jun...@gmail.com]
Sent: Sunday, April 07, 2013 10:59 PM
To: users@kafka.apache.org
Subject: Re: LeaderNotAvailableException: preventing the running of kafka-console-producer.bat

That hash tag points to a commit on Apr 4 and is probably not what you have. Try git log on the checkout that builds your binary. If you can't figure this out, could you try the latest code in the 0.8 branch and see if the problem still exists?

Thanks,
Jun

On Sat, Apr 6, 2013 at 10:36 AM, Withers, Robert robert.with...@dish.com wrote:

I am not entirely sure, as it has been about a year since I used git. Looking in .git\refs\heads\0.8, I have afecc9f23108b100b27017974b132331d6bab8e6. .git\HEAD says ref: refs/heads/0.8. If this is not what you need, tell me how I can get you the right info.

Thanks,
Rob

From: Jun Rao [jun...@gmail.com]
Sent: Friday, April 05, 2013 10:10 PM
To: users@kafka.apache.org
Subject: Re: LeaderNotAvailableException: preventing the running of kafka-console-producer.bat

The following is the problem. The broker fails to become the leader. Do you know the revision of the code you are using so that we know the exact line that's causing the problem?

Thanks,
Jun

[2013-04-05 10:21:09,660] ERROR Replica Manager on Broker 0: Error processing leaderAndISR request Name: LeaderAndIsrRequest; Version: 0; CorrelationId: 7; ClientId: ; AckTimeoutMs: 1000 ms; ControllerEpoch: 1; PartitionStateInfo: (test,0) - PartitionStateInfo(LeaderIsrAndControllerEpoch({ ISR:0, leader:0, leaderEpoch:0 },1),1); Leaders: id:0,host:MERD7-178041.echostar.com,port:9092 (kafka.server.ReplicaManager)
java.util.NoSuchElementException: key not found: \tmp\kafka-logs
    at scala.collection.MapLike$class.default(MapLike.scala:223)
    at scala.collection.immutable.Map$Map1.default(Map.scala:93)
    at scala.collection.MapLike$class.apply(MapLike.scala:134)
    at scala.collection.immutable.Map$Map1.apply(Map.scala:93)
    at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:81)
    at kafka.cluster.Partition$$anonfun$2.apply(Partition.scala:145)
    at kafka.cluster.Partition$$anonfun$2.apply(Partition.scala:145)

On Fri, Apr 5, 2013 at 9:35 AM, Withers, Robert robert.with...@dish.com wrote:

I deleted all ZK and Kafka data and reran the ZK and one server. I got some log traffic at this point. It looks like the server is elected Leader. The ZK is on port 2181 and the server is on port 9092.

[2013-04-05 10:17:35,523] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2013-04-05 10:17:35,623] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)

In ZK:

[2013-04-05 10:17:35,131] INFO Got user-level KeeperException when processing sessionid:0x13ddafd0fc4 type:create cxid:0x4 zxid:0xfffe txntype:unknown reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NoNode for /brokers (org.apache.zookeeper.server.PrepRequestProcessor)
[2013-04-05 10:17:35,238] INFO Got user-level KeeperException when processing sessionid:0x13ddafd0fc4 type:create cxid:0xa zxid:0xfffe txntype:unknown reqpath:n/a Error Path:/config Error:KeeperErrorCode = NoNode for /config (org.apache.zookeeper.server.PrepRequestProcessor)
[2013-04-05 10:17:35,528] INFO Got user-level KeeperException when processing sessionid:0x13ddafd0fc4 type:setData cxid:0x15 zxid:0xfffe txntype:unknown reqpath:n/a Error Path:/controller_epoch Error:KeeperErrorCode = NoNode for /controller_epoch
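One detail worth noting in that trace: the broker fails looking up the key \tmp\kafka-logs, which suggests a Windows path-separator mismatch between the configured log.dirs string and the key the log manager indexed it under. If that is the cause (an assumption, not confirmed in this thread), a forward-slash path in server.properties is the usual workaround:

# server.properties sketch -- use forward slashes on Windows
log.dirs=C:/tmp/kafka-logs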
RE: LeaderNotAvailableException: preventing the running of kafka-console-producer.bat
Yes, it happens on every message. I ran the zookeeper, server, and console-producer against --broker-list localhost:9092 --topic test, then shut everything down, restarted the zookeeper and server, and ran the console-consumer with --zookeeper localhost:2181 --topic test, and both the server console and consumer console reported WARNs of LeaderNotAvailableException. Since they were WARNs, would that mean that a message got produced and then consumed?

Thanks,
Rob

-----Original Message-----
From: Jun Rao [mailto:jun...@gmail.com]
Sent: Thursday, April 04, 2013 10:03 PM
To: users@kafka.apache.org
Subject: Re: LeaderNotAvailableException: preventing the running of kafka-console-producer.bat

This can happen on the first message in a new topic. Do you see the exception on posting every message? Also, run kafka-console-consumer to see if you can see the message.

Thanks,
Jun
RE: LeaderNotAvailableException: preventing the running of kafka-console-producer.bat
(kafka.consumer.ConsumerFetcherThread)
[2013-04-05 10:32:08,137] INFO [ConsumerFetcherThread-console-consumer-12815_MERD7-178041-1365179527383-1da2aeba-0-0], Stopped (kafka.consumer.ConsumerFetcherThread)
[2013-04-05 10:32:08,137] INFO [ConsumerFetcherThread-console-consumer-12815_MERD7-178041-1365179527383-1da2aeba-0-0], Shutdown completed (kafka.consumer.ConsumerFetcherThread)
[2013-04-05 10:32:08,153] INFO [console-consumer-12815_MERD7-178041-1365179527383-1da2aeba], ZKConsumerConnector shutting down (kafka.consumer.ZookeeperConsumerConnector)
[2013-04-05 10:32:08,154] INFO [ConsumerFetcherManager-1365179527478] shutting down (kafka.consumer.ConsumerFetcherManager)
[2013-04-05 10:32:08,154] INFO [console-consumer-12815_MERD7-178041-1365179527383-1da2aeba-leader-finder-thread], Shutting down (kafka.consumer.ConsumerFetcherManager$$anon$1)
[2013-04-05 10:32:08,154] INFO [console-consumer-12815_MERD7-178041-1365179527383-1da2aeba-leader-finder-thread], Stopped (kafka.consumer.ConsumerFetcherManager$$anon$1)
[2013-04-05 10:32:08,154] INFO [console-consumer-12815_MERD7-178041-1365179527383-1da2aeba-leader-finder-thread], Shutdown completed (kafka.consumer.ConsumerFetcherManager$$anon$1)
[2013-04-05 10:32:08,158] INFO [ConsumerFetcherManager-1365179527478] shutdown completed (kafka.consumer.ConsumerFetcherManager)
Consumed 0 messages
[2013-04-05 10:32:08,294] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2013-04-05 10:32:08,319] INFO Session: 0x13ddafd0fc40001 closed (org.apache.zookeeper.ZooKeeper)
[2013-04-05 10:32:08,320] INFO EventThread shut down (org.apache.zookeeper.ClientCnxn)
[2013-04-05 10:32:08,320] INFO [console-consumer-12815_MERD7-178041-1365179527383-1da2aeba], ZKConsumerConnector shut down completed (kafka.consumer.ZookeeperConsumerConnector)
[2013-04-05 10:32:08,321] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=3 watcher=org.I0Itec.zkclient.ZkClient@3639d41 (org.apache.zookeeper.ZooKeeper)
[2013-04-05 10:32:08,321] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2013-04-05 10:32:08,324] INFO Opening socket connection to server localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
[2013-04-05 10:32:08,325] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2013-04-05 10:32:08,351] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x13ddafd0fc40002, negotiated timeout = 3 (org.apache.zookeeper.ClientCnxn)
[2013-04-05 10:32:08,352] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2013-04-05 10:32:08,543] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2013-04-05 10:32:08,568] INFO Session: 0x13ddafd0fc40002 closed (org.apache.zookeeper.ZooKeeper)
[2013-04-05 10:32:08,568] INFO EventThread shut down (org.apache.zookeeper.ClientCnxn)

Thanks,
Rob

-----Original Message-----
From: Jun Rao [mailto:jun...@gmail.com]
Sent: Friday, April 05, 2013 10:02 AM
To: users@kafka.apache.org
Subject: Re: LeaderNotAvailableException: preventing the running of kafka-console-producer.bat

If this happens on every message, it indicates a problem. Could you wipe out all ZK and Kafka data and try it again? If it still doesn't work, could you try the latest revision in the 0.8 branch?

Thanks,
Jun
NoClassDefFoundError exception when trying to instantiate a ProducerConfig
Hi,

I have an Eclipse java project, with the 2.9.1 scala jar (scala-library-2.9.1.RC4.jar) in the build path, as well as the kafka_2.8.0-0.8-SNAPSHOT.jar. I wrote the simple Producer example and try to run it, and it fails with a NoClassDefFoundError runtime exception. Here is my code and the error. Thanks for any help!

package com.sample;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestKafkaProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zk.connect", "127.0.0.1:2181");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "example.producer.SimplePartitioner");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        KeyedMessage<String, String> data =
                new KeyedMessage<String, String>("page_visits", "kafka", "hello world");
        producer.send(data);
    }
}

And the error:

Exception in thread "main" java.lang.NoClassDefFoundError: scala/Tuple2$mcLL$sp
    at kafka.producer.ProducerConfig.<init>(ProducerConfig.scala:56)
    at com.sample.TestKafkaProducer.main(TestKafkaProducer.java:16)
Caused by: java.lang.ClassNotFoundException: scala.Tuple2$mcLL$sp
    at java.net.URLClassLoader$1.run(Unknown Source)
    at java.net.URLClassLoader$1.run(Unknown Source)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(Unknown Source)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
    at java.lang.ClassLoader.loadClass(Unknown Source)
    ... 2 more

Thanks,
Rob
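The missing class scala/Tuple2$mcLL$sp is a Scala specialization artifact, so this looks like a Scala version mismatch rather than a Kafka bug: kafka_2.8.0-0.8-SNAPSHOT.jar was built against Scala 2.8.0, while the build path carries scala-library-2.9.1.RC4.jar. A hedged sketch of the likely fixes, with the jar names assumed from the post:

// Match the Scala library to the Kafka jar's Scala version:
//   kafka_2.8.0-0.8-SNAPSHOT.jar  pairs with  scala-library-2.8.0.jar
// or rebuild Kafka against the Scala version already in the project,
// e.g. something like: ./sbt "++2.9.1 package"
// Separately, note that the 0.8 producer is configured with broker
// addresses, not ZooKeeper; zk.connect is the old 0.7 producer property:
props.put("metadata.broker.list", "127.0.0.1:9092");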
trouble loading kafka into eclipse
I am struggling to load kafka into eclipse to get started. I have tried to follow the instructions here: https://cwiki.apache.org/KAFKA/developer-setup.html, but I cannot connect to the SVN repo to check out. A co-worker pulled from github, but I seem to be missing a lot of jars. This post mentions over a hundred jars that I should add to the build path: http://grokbase.com/t/kafka/dev/133jqejwvb/kafka-setup-in-eclipse. Furthermore, I can only get scala 2.10 working in Juno, as the 2.9 version does not seem to install correctly (I cannot find a scala project option with 2.9). Can anyone provide workable instructions for getting this puppy up and running?

Thanks,
rob
LeaderNotAvailableException: preventing the running of kafka-console-producer.bat
I am brand new to exploration into kafka, for work. Any help is much appreciated. A co-worker built and updated a package for us to use on windows. I have no idea what version it is. We cannot update through our firewall. :( I am able to start the zookeeper and a kafka server, but when I run the kafka-console-producer.bat file and enter a hello world input to post to the broker, the broker throws the LeaderNotAvailableException. Thanks for any assistance. Rob