Re: Kafka High Level Consumer Fetch All Messages From Topic Using Java API (Equivalent to --from-beginning)

2014-02-14 Thread pushkar priyadarshi
I don't think there is any direct high-level API equivalent to this. Every
time you read messages using the high-level API, your offset gets synced in
ZooKeeper. auto.offset.reset is for cases where the last read offset has, for
example, been purged, and rather than getting an exception you want to fall
back to either the most current or the oldest message offset.
But more experienced opinions on this would be great.
Regards,
Pushkar
On Feb 14, 2014 4:40 PM, jpa...@yahoo.com wrote:

 Good Morning,

 I am testing the Kafka High Level Consumer using the ConsumerGroupExample
 code from the Kafka site. I would like to retrieve all the existing
 messages on the topic called "test" that I have in the Kafka server config.
 Looking at other blogs, auto.offset.reset should be set to "smallest" to be
 able to get all messages:
 private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
     Properties props = new Properties();
     props.put("zookeeper.connect", a_zookeeper);
     props.put("group.id", a_groupId);
     props.put("auto.offset.reset", "smallest");
     props.put("zookeeper.session.timeout.ms", "1");
     return new ConsumerConfig(props);
 }
 The question I really have is this: what is the Java API call for the High
 Level Consumer that is equivalent to:
 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
 --from-beginning
 Thx for your help!!
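
One way to get the effect of --from-beginning with the high-level consumer:
since auto.offset.reset only applies when the group has no offset committed in
ZooKeeper yet, read with a previously unused group.id and
auto.offset.reset=smallest. A minimal sketch against the 0.8 high-level
consumer API (the group name below is illustrative):

import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181");
// A fresh group.id has no offsets in ZooKeeper, so auto.offset.reset applies:
props.put("group.id", "from-beginning-" + System.currentTimeMillis());
props.put("auto.offset.reset", "smallest"); // start from the earliest offset
ConsumerConnector consumer =
    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));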


Linkedin Camus vs kafka-hadoop-loader vs hadoop-consumer

2014-02-14 Thread Marcelo Valle
Hello,

I've been studying different options for consuming messages from Kafka into
Hadoop (HDFS) and found three candidates.

Linkedin Camus - https://github.com/linkedin/camus
kafka-hadoop-loader - https://github.com/michal-harish/kafka-hadoop-loader
hadoop-consumer -
https://github.com/apache/kafka/tree/0.8/contrib/hadoop-consumer

I suppose Camus is the most robust tool, and from a performance point of view
it is the best too, but it is more complex to use and develop against than the
other options. It also does not support raw text messages; only Avro-serialized
messages can be used.

kafka-hadoop-loader has had no support for about a year and doesn't work
with Hadoop 2, so I discarded it.

hadoop-consumer is native in the Kafka trunk; it is simple and easy to use and
supports Avro and raw text, but I have doubts about its performance and fault
tolerance.

Am I right in my conclusions?
Do you know of any alternatives?
Can you help me choose the best one?

Thanks!


Re: kafka consumer not consuming messages

2014-02-14 Thread Jun Rao
I don't see the log in your email. Perhaps you can send out a link to
things like pastebin?

Thanks,

Jun


On Thu, Feb 13, 2014 at 8:06 AM, Arjun Kota ar...@socialtwist.com wrote:

 Yes, I have set it to trace as it will help me debug things.
 Have you found any issue in it?
 On Feb 13, 2014 9:12 PM, Jun Rao jun...@gmail.com wrote:

  The request log is in trace. Take a look at the log4j property file in
  config/.
 
  Thanks,
 
  Jun
 
 
  On Wed, Feb 12, 2014 at 9:45 PM, Arjun ar...@socialtwist.com wrote:
 
    I am sorry, but I could not locate the offset in the request log. I have
    turned on debug for the logs but couldn't find it. Do you know any
    pattern I can look for?
  
   Thanks
   Arjun Narasimha Kota
  
  
   On Wednesday 12 February 2014 09:26 PM, Jun Rao wrote:
  
    Interesting. So you have 4 messages in the broker. The checkpointed
    offset for the consumer is at the 3rd message. Did you change the default
    setting of auto.commit.enable? Also, if you look at the request log,
    what's the offset in the fetch request from this consumer?
   Thanks,
   Jun
  
  
   On Tue, Feb 11, 2014 at 10:07 PM, Arjun ar...@socialtwist.com
 wrote:
  
    The topic name is correct; the output of ConsumerOffsetChecker is:

    arjunn@arjunn-lt:~/Downloads/Kafka0.8/new/kafka_2.8.0-0.8.0$
    bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group group1
    --zkconnect 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 --topic
    taf.referral.emails.service

    Group   Topic                        Pid  Offset  logSize  Lag  Owner
    group1  taf.referral.emails.service  0    2       4        2    group1_arjunn-lt-1392133080519-e24b249b-0
    group1  taf.referral.emails.service  1    2       4        2    group1_arjunn-lt-1392133080519-e24b249b-0
  
   thanks
   Arjun Narasimha Kota
  
  
  
  
   On Wednesday 12 February 2014 10:21 AM, Jun Rao wrote:
  
    Could you double check that you used the correct topic name? If so,
    could you run ConsumerOffsetChecker as described in
    https://cwiki.apache.org/confluence/display/KAFKA/FAQ and see if there is
    any lag?
  
   Thanks,
  
   Jun
  
  
   On Tue, Feb 11, 2014 at 8:45 AM, Arjun Kota ar...@socialtwist.com
   wrote:
  
    fetch.wait.max.ms=1
    fetch.min.bytes=128

    My message size is much more than that.
   On Feb 11, 2014 9:21 PM, Jun Rao jun...@gmail.com wrote:
  
 What's the fetch.wait.max.ms and fetch.min.bytes you used?
  
   Thanks,
  
   Jun
  
  
   On Tue, Feb 11, 2014 at 12:54 AM, Arjun ar...@socialtwist.com
   wrote:
  
    With the same group id, it's working fine from the console consumer.
  
  
   On Tuesday 11 February 2014 01:59 PM, Guozhang Wang wrote:
  
 Arjun,
  
    Are you using the same group name for the console consumer and the java
    consumer?
  
   Guozhang
  
  
   On Mon, Feb 10, 2014 at 11:38 PM, Arjun ar...@socialtwist.com
  
wrote:
  
  Hi Jun,
  
    No, it's not that problem. I am not getting what the problem is; can you
    please help?
  
   thanks
   Arjun Narasimha Kota
  
  
   On Monday 10 February 2014 09:10 PM, Jun Rao wrote:
  
    Does
    https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whydoesmyconsumernevergetanydata?
    apply?
  
   Thanks,
  
   Jun
  
  
   On Sun, Feb 9, 2014 at 10:27 PM, Arjun ar...@socialtwist.com
 
  
wrote:
  
    Hi,

    I started using Kafka some time back and was experimenting with 0.8. My
    problem is that the consumer is unable to consume messages. My
    configuration is a Kafka broker and ZooKeeper on the local host. I have
    only one broker and one consumer at present.

    What have I done:
      1) I used the java examples in the kafka src and pushed some 600
         messages to the broker.
      2) I used the console consumer to check whether the messages are there
         in the broker or not. The console consumer printed all 600 messages.
      3) Then I used the java consumer code and tried to get those messages.
         It is not printing any messages; it just gets stuck.

    When was it working earlier:
      - When I tried with three brokers and three consumers on the same
        machine, with the same configuration, it worked fine.
      - I changed the properties accordingly when I tried to make it work
        with one broker and one consumer.

    What does the log say:
      - I am attaching the logs.

    If someone points out where I am going wrong, it would be helpful.

    Thanks
    Arjun Narasimha Kota
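
For context on the fetch settings discussed in this thread: the broker holds a
fetch request until at least fetch.min.bytes of data is available or
fetch.wait.max.ms elapses, whichever comes first. A minimal consumer-config
sketch using the 0.8 defaults (values are illustrative, not a recommendation
for this case):

fetch.min.bytes=1        # return as soon as any data is available (default)
fetch.wait.max.ms=100    # upper bound on broker-side blocking (default)
auto.commit.enable=true  # default; consumed offsets are checkpointed to ZooKeeper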


Re: Surprisingly high network traffic between kafka servers

2014-02-14 Thread Jun Rao
Hi, Zhong,

Thanks for sharing this. We probably should add a sanity check in the
broker to make sure that replica.fetch.max.bytes >= message.max.bytes.
Could you file a jira for that?

Jun


On Thu, Feb 13, 2014 at 8:01 PM, zhong dong zea...@qq.com wrote:

 We encountered this problem, too.

 Our problem was that we set message.max.bytes larger than
 replica.fetch.max.bytes.

 After we changed replica.fetch.max.bytes to a larger number, the
 problem was solved.
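
A broker-config sketch of the constraint in question (the byte values are
illustrative only):

# server.properties -- the replica fetch size must cover the largest message,
# or followers can never replicate it:
message.max.bytes=1000000
replica.fetch.max.bytes=1048576   # keep this >= message.max.bytes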




Re: New Consumer API discussion

2014-02-14 Thread Jay Kreps
Oh, interesting. So I am assuming the following implementation:
1. We have an in-memory fetch position which controls the next fetch offset.
2. Changing this has no effect until you poll again, at which point your
fetch request will be from the newly specified offset.
3. We then have an in-memory but also remotely stored committed offset.
4. Calling commit has the effect of saving the fetch position as both the
in-memory committed position and the remotely stored one.
5. Auto-commit is the same as periodically calling commit on all positions.

So batching on commit as well as getting the committed position makes
sense, but batching the fetch position wouldn't, right? I think you are
actually thinking of a different approach.

-Jay
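
A toy model of the semantics in points 1-5 above (hypothetical code, purely to
pin down the proposal; OffsetStore and the field names are invented for this
sketch, and TopicPartition is assumed to be the class from the javadoc, with
proper equals/hashCode):

import java.util.HashMap;
import java.util.Map;

class PositionModel {
    // Stand-in for the remote offset store (invented for the sketch):
    interface OffsetStore { void commit(TopicPartition tp, long offset); }

    private final Map<TopicPartition, Long> fetchPosition = new HashMap<>();   // point 1
    private final Map<TopicPartition, Long> committedOffset = new HashMap<>(); // point 3, in-memory half
    private final OffsetStore remote;                                          // point 3, remote half

    PositionModel(OffsetStore remote) { this.remote = remote; }

    void seek(TopicPartition tp, long offset) {
        fetchPosition.put(tp, offset); // point 2: only affects the next poll
    }

    void commit(TopicPartition tp) {
        long pos = fetchPosition.get(tp);
        committedOffset.put(tp, pos);  // point 4: saved in memory...
        remote.commit(tp, pos);        // ...and in the remote store
    }
}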


On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

 I think you are saying both, i.e. if you
 have committed on a partition it returns you that value but if you haven't
 it does a remote lookup?

 Correct.

 The other argument for making committed batched is that commit() is
 batched, so there is symmetry.

 position() and seek() are always in memory changes (I assume) so there is
 no need to batch them.

 I'm not as sure as you are about that assumption being true. Basically in
 my example above, the batching argument for committed() also applies to
 position() since one purpose of fetching a partition's offset is to use it
 to set the position of the consumer to that offset. Since that might lead
 to a remote OffsetRequest call, I think we probably would be better off
 batching it.

 Another option for naming would be position/reposition instead
 of position/seek.

 I think position/seek is better since it aligns with Java file APIs.

 I also think your suggestion about ConsumerPosition makes sense.

 Thanks,
 Neha
 On Feb 13, 2014 9:22 PM, Jay Kreps jay.kr...@gmail.com wrote:

  Hey Neha,
 
  I actually wasn't proposing the name TopicOffsetPosition, that was just a
  typo. I meant TopicPartitionOffset, and I was just referencing what was
 in
  the javadoc. So to restate my proposal without the typo, using just the
  existing classes (that naming is a separate question):
 long position(TopicPartition tp)
 void seek(TopicPartitionOffset p)
 long committed(TopicPartition tp)
 void commit(TopicPartitionOffset...);
 
  So I may be unclear on committed() (AKA lastCommittedOffset). Is it
  returning the in-memory value from the last commit by this consumer, or is
  it doing a remote fetch, or both? I think you are saying both, i.e. if you
  have committed on a partition it returns you that value but if you haven't
  it does a remote lookup?
 
  The other argument for making committed batched is that commit() is
  batched, so there is symmetry.
 
  position() and seek() are always in memory changes (I assume) so there is
  no need to batch them.
 
  So taking all that into account what if we revise it to
 long position(TopicPartition tp)
 void seek(TopicPartitionOffset p)
 Map<TopicPartition, Long> committed(TopicPartition tp);
 void commit(TopicPartitionOffset...);
 
  This is not symmetric between position/seek and commit/committed but it is
  convenient. Another option for naming would be position/reposition instead
  of position/seek.
 
  With respect to the name TopicPartitionOffset, what I was trying to say is
  that I recommend we change that to something shorter. I think TopicPosition
  or ConsumerPosition might be better. Position does not refer to the
  variables in the object, it refers to the meaning of the object--it
  represents a position within a topic. The offset field in that object is
  still called the offset. TopicOffset, PartitionOffset, or ConsumerOffset
  would all be workable too. Basically I am just objecting to concatenating
  three nouns together. :-)
 
  -Jay
 
 
 
 
 
  On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede neha.narkh...@gmail.com
  wrote:
 
   2. It returns a list of results. But how can you use the list? The only
   way to use the list is to make a map of tp=>offset and then look up
   results in this map (or do a for loop over the list for the partition you
   want). I recommend that if this is an in-memory check we just do one at a
   time. E.g. long committedPosition(TopicPosition).
  
   This was discussed in the previous emails. There is a choice between
   returning a map or a list. Some people found the map to be more usable.
  
   What if we made it:
  long position(TopicPartition tp)
  void seek(TopicOffsetPosition p)
  long committed(TopicPartition tp)
  void commit(TopicOffsetPosition...);
  
   This is fine, but TopicOffsetPosition doesn't make sense. Offset and
   Position is confusing. Also both fetch and commit positions are related to
   partitions, not topics. Some more options are TopicPartitionPosition or
   TopicPartitionOffset. And we should use either position everywhere in
   Kafka or offset, but having both is confusing.
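
To make the proposed shape concrete, a hypothetical usage sketch (assuming the
TopicPartition/TopicPartitionOffset classes referenced above, with a
TopicPartitionOffset(topic, partition, offset) constructor, and the batched
Map-returning committed(); none of this is final API):

TopicPartition tp = new TopicPartition("events", 0);
// May do a remote offset lookup if this consumer hasn't committed tp yet:
long lastCommitted = consumer.committed(tp).get(tp);
// In-memory reposition; takes effect on the next poll:
consumer.seek(new TopicPartitionOffset("events", 0, lastCommitted));
long pos = consumer.position(tp);  // == lastCommitted until the next fetch
// Save the current fetch position both in memory and in the remote store:
consumer.commit(new TopicPartitionOffset("events", 0, pos));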
  
  

Re: Surprisingly high network traffic between kafka servers

2014-02-14 Thread Jay Kreps
Yeah that is a bug. We should be giving an error here rather than retrying.

-Jay


On Fri, Feb 14, 2014 at 7:54 AM, Jun Rao jun...@gmail.com wrote:

 Hi, Zhong,

 Thanks for sharing this. We probably should add a sanity check in the
 broker to make sure that replica.fetch.max.bytes >= message.max.bytes.
 Could you file a jira for that?

 Jun


 On Thu, Feb 13, 2014 at 8:01 PM, zhong dong zea...@qq.com wrote:

  We encountered this problem, too.

  Our problem was that we set message.max.bytes larger than
  replica.fetch.max.bytes.

  After we changed replica.fetch.max.bytes to a larger number, the
  problem was solved.
 
 



broker offline

2014-02-14 Thread Yu, Libo
Hi team,

We have three brokers on our production cluster. I noticed two of them
somehow went offline and then re-registered with zookeeper and came back
online. It seems the issue was caused by some zookeeper issue, so I want to
know what may be the possible causes of the issue. If I want to reproduce the
issue, is there any way to do it? Thanks.

Regards,

Libo



Re: Surprisingly high network traffic between kafka servers

2014-02-14 Thread Carl Lerche
Hey, thanks so much for pointing this out. I think that this is likely
what is happening for us. I will attempt this fix.

Cheers,
Carl

On Thu, Feb 13, 2014 at 8:01 PM, zhong dong zea...@qq.com wrote:
 We encountered this problem, too.

 Our problem was that we set message.max.bytes larger than
 replica.fetch.max.bytes.

 After we changed replica.fetch.max.bytes to a larger number, the
 problem was solved.



ZK 3.4.5 compatibility

2014-02-14 Thread Clark Breyman
Is anyone running 0.8 (or pre-0.8.1) with the latest Zookeeper? Any known
compatibility issues? I didn't see any in JIRA but thought I'd give a
shout.


Re: broker offline

2014-02-14 Thread Guozhang Wang
Hello Libo,

When ZK resumes from a soft failure, like a long GC pause, it will mark the
ephemeral nodes as session timed out, and the brokers will try to re-register
upon receiving the session timeout. You can reproduce this issue by pausing
the ZK process with a signal (e.g., kill -STOP, then kill -CONT to resume).

Guozhang


On Fri, Feb 14, 2014 at 12:15 PM, Yu, Libo libo...@citi.com wrote:

 Hi team,

 We have three brokers on our production cluster. I noticed two of them
 somehow went offline and then re-registered with zookeeper and came back
 online. It seems the issue was caused by some zookeeper issue, so I want to
 know what may be the possible causes of the issue. If I want to reproduce
 the issue, is there any way to do it? Thanks.

 Regards,

 Libo




-- 
-- Guozhang


Re: some brokers cannot register themselves with zookeeper

2014-02-14 Thread Clark Breyman
Guozhang,

All the patches for KAFKA-992 look like they were committed in August,
which was before 0.8 was shipped. Should we really be seeing this on 0.8?

Thanks, Clark


Re: ZK 3.4.5 compatibility

2014-02-14 Thread Jun Rao
It should be API compatible. Not sure how stable ZK 3.4.5 is, though.

Thanks,

Jun


On Fri, Feb 14, 2014 at 4:32 PM, Clark Breyman cl...@breyman.com wrote:

 Is anyone running 0.8 (or pre-0.8.1) with the latest Zookeeper? Any known
 compatibility issues? I didn't see any in JIRA but thought I'd give a
 shout.



trunk unit test failure

2014-02-14 Thread Bae, Jae Hyeon
The following tests failed on trunk:

   - kafka.server.LogOffsetTest.testEmptyLogsGetOffsets
   - kafka.server.LogOffsetTest.testGetOffsetsBeforeEarliestTime
   - kafka.server.LogOffsetTest.testGetOffsetsBeforeLatestTime
   - kafka.server.LogOffsetTest.testGetOffsetsBeforeNow
   - kafka.test.ProducerSendTest.testSendToPartition

Can I trust trunk? :)