Re: Kafka High Level Consumer Fetch All Messages From Topic Using Java API (Equivalent to --from-beginning)
I don't think there is any direct high-level API equivalent to this. Every time you read messages using the high-level API, your offset gets synced in ZooKeeper. auto.offset.reset is for cases where, for example, the last read offset has been purged, and rather than getting an exception you want to just fall back to either the most current or the oldest message offset. But others' more experienced opinions on this would be great. Regards, Pushkar

On Feb 14, 2014 4:40 PM, jpa...@yahoo.com wrote: Good morning, I am testing the Kafka high-level consumer using the ConsumerGroupExample code from the Kafka site. I would like to retrieve all the existing messages on the topic called "test" that I have in the Kafka server config. Looking at other blogs, auto.offset.reset should be set to "smallest" to be able to get all messages:

private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
    Properties props = new Properties();
    props.put("zookeeper.connect", a_zookeeper);
    props.put("group.id", a_groupId);
    props.put("auto.offset.reset", "smallest");
    props.put("zookeeper.session.timeout.ms", "1");
    return new ConsumerConfig(props);
}

The question I really have is this: what is the Java API call for the high-level consumer that is the equivalent of:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

Thx for your help!!
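A sketch of the usual answer, for reference: the 0.8 high-level consumer has no explicit --from-beginning switch; the trick is to combine auto.offset.reset=smallest with a group.id that has no offsets committed in ZooKeeper yet (e.g. a freshly generated one), because "smallest" only applies when no valid offset exists for the group. The class name and the timeout value below are illustrative assumptions, not part of the thread:

```java
import java.util.Properties;

// Sketch: consumer properties that make a 0.8 high-level consumer start from
// the beginning of a topic. "smallest" only kicks in when the group has no
// committed offset in ZooKeeper, so pair it with a brand-new group id.
public class FromBeginningConfig {
    static Properties createConsumerProps(String zookeeper, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);                     // must have no committed offsets
        props.put("auto.offset.reset", "smallest");         // fall back to oldest offset
        props.put("zookeeper.session.timeout.ms", "10000"); // illustrative value
        return props;
    }
}
```

Passing these properties to new ConsumerConfig(...) then behaves like the console consumer's --from-beginning, as long as the group id really is new.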
Linkedin Camus vs kafka-hadoop-loader vs hadoop-consumer
Hello, I've been studying different options to consume messages from Kafka into Hadoop (HDFS) and found three candidates: LinkedIn Camus - https://github.com/linkedin/camus kafka-hadoop-loader - https://github.com/michal-harish/kafka-hadoop-loader hadoop-consumer - https://github.com/apache/kafka/tree/0.8/contrib/hadoop-consumer I suppose Camus is the most robust tool, and from a performance point of view it is the best too, but it is more complex to use and develop against than the other options. It also does not support raw text messages; only Avro-serialized messages can be used. kafka-hadoop-loader has had no support for about a year and doesn't work with Hadoop 2, so it is discarded. hadoop-consumer is native in the Kafka trunk, is simple and easy to use, and supports Avro and raw text, but I have doubts about its performance and fault tolerance. Am I right in my conclusions? Do you know of any alternative? Can you help me choose the best one? Thanks!
Re: kafka consumer not consuming messages
I don't see the log in your email. Perhaps you can send out a link to something like pastebin? Thanks, Jun

On Thu, Feb 13, 2014 at 8:06 AM, Arjun Kota ar...@socialtwist.com wrote: Yes, I have set it to trace as it will help me debug things. Have you found any issue in it?

On Feb 13, 2014 9:12 PM, Jun Rao jun...@gmail.com wrote: The request log is at trace. Take a look at the log4j property file in config/. Thanks, Jun

On Wed, Feb 12, 2014 at 9:45 PM, Arjun ar...@socialtwist.com wrote: I am sorry, but I could not locate the offset in the request log. I have turned on debug for the logs but couldn't find it. Do you know any pattern I can look for? Thanks, Arjun Narasimha Kota

On Wednesday 12 February 2014 09:26 PM, Jun Rao wrote: Interesting. So you have 4 messages in the broker. The checkpointed offset for the consumer is at the 3rd message. Did you change the default setting of auto.commit.enable? Also, if you look at the request log, what's the offset in the fetch request from this consumer? Thanks, Jun

On Tue, Feb 11, 2014 at 10:07 PM, Arjun ar...@socialtwist.com wrote: The topic name is correct. The output of the ConsumerOffsetChecker is:

arjunn@arjunn-lt:~/Downloads/Kafka0.8/new/kafka_2.8.0-0.8.0$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group group1 --zkconnect 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 --topic taf.referral.emails.service

Group   Topic                        Pid  Offset  logSize  Lag  Owner
group1  taf.referral.emails.service  0    2       4        2    group1_arjunn-lt-1392133080519-e24b249b-0
group1  taf.referral.emails.service  1    2       4        2    group1_arjunn-lt-1392133080519-e24b249b-0

Thanks, Arjun Narasimha Kota

On Wednesday 12 February 2014 10:21 AM, Jun Rao wrote: Could you double check that you used the correct topic name? If so, could you run ConsumerOffsetChecker as described in https://cwiki.apache.org/confluence/display/KAFKA/FAQ and see if there is any lag?
Thanks, Jun

On Tue, Feb 11, 2014 at 8:45 AM, Arjun Kota ar...@socialtwist.com wrote: fetch.wait.max.ms=1, fetch.min.bytes=128. My message size is much more than that.

On Feb 11, 2014 9:21 PM, Jun Rao jun...@gmail.com wrote: What are the fetch.wait.max.ms and fetch.min.bytes you used? Thanks, Jun

On Tue, Feb 11, 2014 at 12:54 AM, Arjun ar...@socialtwist.com wrote: With the same group id from the console consumer it's working fine.

On Tuesday 11 February 2014 01:59 PM, Guozhang Wang wrote: Arjun, are you using the same group name for the console consumer and the Java consumer? Guozhang

On Mon, Feb 10, 2014 at 11:38 PM, Arjun ar...@socialtwist.com wrote: Hi Jun, no, it's not that problem. I am not getting what the problem is. Can you please help? Thanks, Arjun Narasimha Kota

On Monday 10 February 2014 09:10 PM, Jun Rao wrote: Does https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata? apply? Thanks, Jun

On Sun, Feb 9, 2014 at 10:27 PM, Arjun ar...@socialtwist.com wrote: Hi, I started using Kafka some time back. I was experimenting with 0.8. My problem is that Kafka is unable to consume the messages. My configuration is a Kafka broker on the local host and ZooKeeper on the local host. I have only one broker and one consumer at present.

What have I done:
1) I used the Java examples in the Kafka src and pushed some 600 messages to the broker
2) I used the console consumer to check whether the messages are there in the broker or not. The console consumer printed all 600 messages
3) Then I used the Java consumer code and tried to get those messages. It is not printing any messages; it just got stuck

When was it working earlier:
- When I tried with three brokers and three consumers on the same machine, with the same configuration, it worked fine.
- I changed the properties accordingly when I tried to make it work with one broker and one consumer.

What does the log say:
- attaching the logs

Even if someone points me to where I am going wrong, it would be helpful. Thanks, Arjun Narasimha Kota
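For reference, a minimal sketch of the 0.8 high-level consumer knobs this thread keeps circling around (fetch.wait.max.ms, fetch.min.bytes, auto.commit.enable). Every value below is an illustrative assumption, not a recommendation from the thread:

```java
import java.util.Properties;

// Sketch of the consumer settings discussed above. With fetch.min.bytes=1 the
// broker answers a fetch as soon as any data is available, which rules out the
// "consumer never gets data" FAQ case where fetch.min.bytes is larger than the
// data on hand. All values here are illustrative assumptions.
public class ConsumerTuning {
    static Properties tuningProps() {
        Properties p = new Properties();
        p.put("zookeeper.connect", "127.0.0.1:2181");
        p.put("group.id", "group1");
        p.put("fetch.wait.max.ms", "100");   // longest the broker may block a fetch
        p.put("fetch.min.bytes", "1");       // respond as soon as any bytes arrive
        p.put("auto.commit.enable", "true"); // periodically checkpoint offsets to ZK
        return p;
    }
}
```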
Re: Surprisingly high network traffic between kafka servers
Hi, Zhong, Thanks for sharing this. We probably should add a sanity check in the broker to make sure that replica.fetch.max.bytes >= message.max.bytes. Could you file a jira for that? Jun

On Thu, Feb 13, 2014 at 8:01 PM, zhong dong zea...@qq.com wrote: We encountered this problem, too. Our problem was that we had set message.max.bytes larger than replica.fetch.max.bytes. After we changed replica.fetch.max.bytes to a larger number, the problem was solved.
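Until such a sanity check exists, the constraint can be enforced by hand in the broker's server.properties. The sizes below are illustrative assumptions; only the inequality between the two settings comes from the thread:

```
# server.properties (broker): a follower's fetch size must be able to hold the
# largest message the broker will accept, otherwise replication keeps retrying
# a fetch that can never succeed, generating heavy inter-broker traffic.
message.max.bytes=1000000
replica.fetch.max.bytes=1048576
# invariant to keep: replica.fetch.max.bytes >= message.max.bytes
```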
Re: New Consumer API discussion
Oh, interesting. So I am assuming the following implementation:

1. We have an in-memory fetch position which controls the next fetch offset.
2. Changing this has no effect until you poll again, at which point your fetch request will be from the newly specified offset.
3. We then have an in-memory but also remotely stored committed offset.
4. Calling commit has the effect of saving the fetch position as both the in-memory committed position and in the remote store.
5. Auto-commit is the same as periodically calling commit on all positions.

So batching on commit as well as getting the committed position makes sense, but batching the fetch position wouldn't, right? I think you are actually thinking of a different approach. -Jay

On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede neha.narkh...@gmail.com wrote: "I think you are saying both, i.e. if you have committed on a partition it returns you that value but if you haven't it does a remote lookup?" Correct. "The other argument for making committed batched is that commit() is batched, so there is symmetry. position() and seek() are always in-memory changes (I assume) so there is no need to batch them." I'm not as sure as you are about that assumption being true. Basically, in my example above, the batching argument for committed() also applies to position(), since one purpose of fetching a partition's offset is to use it to set the position of the consumer to that offset. Since that might lead to a remote OffsetRequest call, I think we would probably be better off batching it. "Another option for naming would be position/reposition instead of position/seek." I think position/seek is better since it aligns with Java file APIs. I also think your suggestion about ConsumerPosition makes sense. Thanks, Neha

On Feb 13, 2014 9:22 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Neha, I actually wasn't proposing the name TopicOffsetPosition, that was just a typo. I meant TopicPartitionOffset, and I was just referencing what was in the javadoc.
So to restate my proposal without the typo, using just the existing classes (the naming is a separate question):

long position(TopicPartition tp)
void seek(TopicPartitionOffset p)
long committed(TopicPartition tp)
void commit(TopicPartitionOffset...)

So I may be unclear on committed() (AKA lastCommittedOffset). Is it returning the in-memory value from the last commit by this consumer, or is it doing a remote fetch, or both? I think you are saying both, i.e. if you have committed on a partition it returns you that value, but if you haven't it does a remote lookup? The other argument for making committed batched is that commit() is batched, so there is symmetry. position() and seek() are always in-memory changes (I assume), so there is no need to batch them. So taking all that into account, what if we revise it to:

long position(TopicPartition tp)
void seek(TopicPartitionOffset p)
Map<TopicPartition, Long> committed(TopicPartition... tp)
void commit(TopicPartitionOffset...)

This is not symmetric between position/seek and commit/committed, but it is convenient. Another option for naming would be position/reposition instead of position/seek. With respect to the name TopicPartitionOffset, what I was trying to say is that I recommend we change it to something shorter. I think TopicPosition or ConsumerPosition might be better. Position does not refer to the variables in the object, it refers to the meaning of the object: it represents a position within a topic. The offset field in that object is still called the offset. TopicOffset, PartitionOffset, or ConsumerOffset would all be workable too. Basically I am just objecting to concatenating three nouns together. :-) -Jay

On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede neha.narkh...@gmail.com wrote: "2. It returns a list of results. But how can you use the list? The only way to use the list is to make a map of tp => offset and then look up results in this map (or do a for loop over the list for the partition you want). I recommend that if this is an in-memory check we just do one at a time, e.g. long committedPosition(TopicPosition)." This was discussed in the previous emails. There is a choice between returning a map or a list. Some people found the map to be more usable. "What if we made it:

long position(TopicPartition tp)
void seek(TopicOffsetPosition p)
long committed(TopicPartition tp)
void commit(TopicOffsetPosition...)"

This is fine, but TopicOffsetPosition doesn't make sense. Having both Offset and Position in one name is confusing. Also, both fetch and commit positions are related to partitions, not topics. Some more options are TopicPartitionPosition or TopicPartitionOffset. And we should use either "position" everywhere in Kafka or "offset", but having both is confusing.
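The semantics being debated (an in-memory fetch position moved by seek, a separately stored committed offset, and a batched committed() lookup) can be made concrete with a toy implementation. This is purely a hypothetical sketch: the class and method names mirror the thread, but the in-memory "remote store" and all behavior details are assumptions, not the actual proposal:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal value types mirroring the names discussed in the thread.
class TopicPartition {
    final String topic;
    final int partition;
    TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
    @Override public boolean equals(Object o) {
        if (!(o instanceof TopicPartition)) return false;
        TopicPartition t = (TopicPartition) o;
        return topic.equals(t.topic) && partition == t.partition;
    }
    @Override public int hashCode() { return 31 * topic.hashCode() + partition; }
}

class TopicPartitionOffset {
    final TopicPartition tp;
    final long offset;
    TopicPartitionOffset(TopicPartition tp, long offset) { this.tp = tp; this.offset = offset; }
}

// Toy consumer demonstrating the shape of the API under discussion.
class SketchConsumer {
    private final Map<TopicPartition, Long> fetchPosition = new HashMap<>(); // in-memory, controls next fetch
    private final Map<TopicPartition, Long> remoteStore = new HashMap<>();   // stand-in for ZK/broker offset storage

    // In-memory read of the next fetch offset.
    long position(TopicPartition tp) { return fetchPosition.getOrDefault(tp, 0L); }

    // In-memory change; a real consumer would use it on the next poll.
    void seek(TopicPartitionOffset p) { fetchPosition.put(p.tp, p.offset); }

    // Batched lookup of committed offsets; may involve a remote OffsetRequest
    // in the real consumer, which is the argument for batching it.
    Map<TopicPartition, Long> committed(TopicPartition... tps) {
        Map<TopicPartition, Long> result = new HashMap<>();
        for (TopicPartition tp : tps) result.put(tp, remoteStore.getOrDefault(tp, -1L));
        return result;
    }

    // Batched commit: saves offsets to the (stand-in) remote store.
    void commit(TopicPartitionOffset... offsets) {
        for (TopicPartitionOffset p : offsets) remoteStore.put(p.tp, p.offset);
    }
}
```

The toy makes the asymmetry visible: position/seek touch only the local map, while committed/commit go through the store, which is why batching matters for the latter pair.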
Re: Surprisingly high network traffic between kafka servers
Yeah, that is a bug. We should be giving an error here rather than retrying. -Jay

On Fri, Feb 14, 2014 at 7:54 AM, Jun Rao jun...@gmail.com wrote: Hi, Zhong, Thanks for sharing this. We probably should add a sanity check in the broker to make sure that replica.fetch.max.bytes >= message.max.bytes. Could you file a jira for that? Jun On Thu, Feb 13, 2014 at 8:01 PM, zhong dong zea...@qq.com wrote: We encountered this problem, too. Our problem was that we had set message.max.bytes larger than replica.fetch.max.bytes. After we changed replica.fetch.max.bytes to a larger number, the problem was solved.
broker offline
Hi team, We have three brokers on our production cluster. I noticed two of them somehow went offline, then re-registered with ZooKeeper and came back online. It seems the issue was caused by some ZooKeeper problem, so I want to know the possible causes of the issue. If I want to reproduce the issue, is there any way to do it? Thanks. Regards, Libo
Re: Surprisingly high network traffic between kafka servers
Hey, thanks so much for pointing this out. I think this is likely what is happening for us. I will attempt the fix. Cheers, Carl

On Thu, Feb 13, 2014 at 8:01 PM, zhong dong zea...@qq.com wrote: We encountered this problem, too. Our problem was that we had set message.max.bytes larger than replica.fetch.max.bytes. After we changed replica.fetch.max.bytes to a larger number, the problem was solved.
ZK 3.4.5 compatibility
Is anyone running 0.8 (or pre-0.8.1) with the latest Zookeeper? Any known compatibility issues? I didn't see any in JIRA but thought I'd give a shout.
Re: broker offline
Hello Libo, When ZK resumes from a soft failure, such as a long GC pause, it will mark the ephemeral nodes' sessions as timed out, and the brokers will try to re-register upon receiving the session timeout. You can reproduce this issue by pausing the ZK process with a signal. Guozhang

On Fri, Feb 14, 2014 at 12:15 PM, Yu, Libo libo...@citi.com wrote: Hi team, We have three brokers on our production cluster. I noticed two of them somehow went offline, then re-registered with ZooKeeper and came back online. It seems the issue was caused by some ZooKeeper problem, so I want to know the possible causes of the issue. If I want to reproduce the issue, is there any way to do it? Thanks. Regards, Libo

-- Guozhang
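The "pause the ZK process" reproduction can be sketched with SIGSTOP/SIGCONT. Below, a plain sleep process stands in for the ZooKeeper JVM so the sketch is self-contained; on a real box you would send the same signals to the ZK pid (finding that pid, e.g. via pgrep, is an assumption, not something from the thread):

```shell
#!/bin/sh
# Sketch: SIGSTOP freezes a process long enough for its sessions to expire;
# SIGCONT resumes it, at which point brokers see session timeouts and
# re-register. A 'sleep' stands in for the ZooKeeper JVM here.
sleep 60 &
pid=$!
kill -STOP "$pid"     # freeze: process state becomes T (stopped)
sleep 1               # give the kernel a moment to reflect the state change
state_paused=$(ps -o stat= -p "$pid" | tr -d ' ')
kill -CONT "$pid"     # resume: in the real scenario, sessions have expired by now
sleep 1
state_resumed=$(ps -o stat= -p "$pid" | tr -d ' ')
kill "$pid"
echo "paused=$state_paused resumed=$state_resumed"
```

Holding the process stopped for longer than zookeeper.session.timeout.ms is what actually triggers the broker re-registration described above.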
Re: some brokers cannot register themselves with zookeeper
Guozhang, All the patches for KAFKA-992 look like they were committed in August, which was before 0.8 was shipped. Should we really be seeing this on 0.8? Thanks, Clark
Re: ZK 3.4.5 compatibility
It should be API compatible. Not sure how stable ZK 3.4.5 is, though. Thanks, Jun On Fri, Feb 14, 2014 at 4:32 PM, Clark Breyman cl...@breyman.com wrote: Is anyone running 0.8 (or pre-0.8.1) with the latest Zookeeper? Any known compatibility issues? I didn't see any in JIRA but thought I'd give a shout.
trunk unit test failure
- kafka.server.LogOffsetTest.testEmptyLogsGetOffsets
- kafka.server.LogOffsetTest.testGetOffsetsBeforeEarliestTime
- kafka.server.LogOffsetTest.testGetOffsetsBeforeLatestTime
- kafka.server.LogOffsetTest.testGetOffsetsBeforeNow
- kafka.test.ProducerSendTest.testSendToPartition

failed. Can I trust trunk? :)