RE: new consumer api?

2015-08-04 Thread Simon Cooper
Reading the consumer docs, there's no mention of a relatively simple
consumer that doesn't need groups, coordinators, commits, or anything like that -
one that just reads and polls from specified offsets of specific topic partitions, but
automatically deals with leadership changes and connection losses (so one level
up from SimpleConsumer).

Will the new API be able to be used in this relatively simple way?
SimonC

-Original Message-
From: Jun Rao [mailto:j...@confluent.io] 
Sent: 03 August 2015 18:19
To: users@kafka.apache.org
Subject: Re: new consumer api?

Jalpesh,

We are still iterating on the new consumer a bit and are waiting for some of 
the security jiras to be committed. So now, we are shooting for releasing 0.8.3 
in Oct (just updated 
https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan).

Thanks,

Jun

On Mon, Aug 3, 2015 at 8:41 AM, Jalpesh Patadia  
jalpesh.pata...@clickbank.com wrote:

 Hello guys,

 A while ago i read that the new consumer api was going to be released 
 sometime in July as part of the 0.8.3/0.9 release.
 https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan


 Do we have an update when we think that can happen?


 Thanks,

 Jalpesh


 -- PRIVILEGED AND CONFIDENTIAL This transmission may contain 
 privileged, proprietary or confidential information. If you are not 
 the intended recipient, you are instructed not to review this 
 transmission. If you are not the intended recipient, please notify the 
 sender that you received this message and delete this transmission from your 
 system.



Re: New Consumer API and Range Consumption with Fail-over

2015-08-04 Thread Bhavesh Mistry
Hi Jason and Kafka Dev Team,



First of all, thanks for responding - I think you understood the expected
behavior correctly.



The use case is offset range consumption.  Every minute we store the highest
offset for each topic partition in a time-series database.  So if we need to reload or
re-consume data from yesterday, say from 8AM to noon, we have the start offset
mapped at 8AM and the end offset mapped at noon in the time-series database.



I was trying to implement this use case with the new Consumer API.   Would you or the
Kafka dev team agree with a request for an API that takes a topic and its
start/end offsets for a high-level consumer group?  (With the older consumer API we
used SimpleConsumer, without fail-over.)  Also, each range consumption would use
a different group id, and the group id would not be reused.  The main purpose is
to reload or reprocess past data (occasionally, due to production bugs or downtime
etc.) while letting the main consumer group continue to consume the latest
records.


void subscribe(TopicPartition[] startOffsetPartitions, TopicPartition[]
endOffsetPartitions)



or something similar, which would allow the following:



1)   When the consumer group already exists (meaning it has consumed data and
committed offsets to the storage system, either Kafka or ZK), ignore the start
offset positions and use the committed offsets.  If nothing has been committed,
use the start offset for the partition.

2)   When consumption has reached the end offset for a given partition, pausing
is fine, or the assigned thread becomes available for fail-over or waits for
reassignment.

3)   When the consumer group is done consuming all partition offset ranges
(start to end), gracefully shut down the entire consumer group.

4)   While consuming records, if one of the nodes or consuming threads goes down,
automatically fail over to the others (similar to the high-level consumer in the
OLD consumer API.   I am not sure whether the high-level and/or simple consumer
concepts exist for the new API).



I hope the above explanation clarifies the use case and intended behavior.  Thanks
for the clarifications; you are correct that we need pause(TopicPartition tp),
resume(TopicPartition tp), and/or an API to set an end offset for each
partition.
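
As a rough illustration of what we are after (this is only a sketch, assuming the
new consumer's poll/seek/pause methods roughly as proposed; endOffsets,
loadEndOffsetsFromTsdb() and process() are our own placeholders, not Kafka APIs;
assumes java.util.* and org.apache.kafka.clients.consumer.* imports and a
consumer that has already joined the group and seeked to the start offsets):

// endOffsets: per-partition end offsets loaded from our time-series database
Map<TopicPartition, Long> endOffsets = loadEndOffsetsFromTsdb();   // placeholder helper
Set<TopicPartition> finished = new HashSet<>();

while (finished.size() < endOffsets.size()) {
    ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
    for (ConsumerRecord<byte[], byte[]> record : records) {
        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        long end = endOffsets.get(tp);
        if (record.offset() <= end) {
            process(record);                 // placeholder processing step
        }
        if (record.offset() >= end && finished.add(tp)) {
            consumer.pause(tp);              // 2) stop fetching but keep the assignment
        }
    }
}
consumer.close();                            // 3) all ranges consumed, shut down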



Please do let us know your preference for supporting the above simple use case.


Thanks,


Bhavesh

On Thu, Jul 30, 2015 at 1:23 PM, Jason Gustafson ja...@confluent.io wrote:

 Hi Bhavesh,

 I'm not totally sure I understand the expected behavior, but I think this
 can work. Instead of seeking to the start of the range before the poll
 loop, you should probably provide a ConsumerRebalanceCallback to get
 notifications when group assignment has changed (e.g. when one of your
 nodes dies). When a new partition is assigned, the callback will be invoked
 by the consumer and you can use it to check if there's a committed position
 in the range or if you need to seek to the beginning of the range. For
 example:

 void onPartitionsAssigned(consumer, partitions) {
   for (partition : partitions) {
     try {
       offset = consumer.committed(partition)
       consumer.seek(partition, offset)
     } catch (NoOffsetForPartition) {
       consumer.seek(partition, rangeStart)
     }
   }
 }

 If a failure occurs, then the partitions will be rebalanced across
 whichever consumers are still active. The case of the entire cluster being
 rebooted is not really different. When the consumers come back, they check
 the committed position and resume where they left off. Does that make
 sense?

 After you are finished consuming a partition's range, you can use
 KafkaConsumer.pause(partition) to prevent further fetches from being
 initiated while still maintaining the current assignment. The patch to add
 pause() is not in trunk yet, but it probably will be before too long.

 One potential problem is that you wouldn't be able to reuse the same group
 to consume a different range because of the way it depends on the committed
 offsets. Kafka's commit API actually allows some additional metadata to go
 along with a committed offset and that could potentially be used to tie the
 commit to the range, but it's not yet exposed in KafkaConsumer. I assume it
 will be eventually, but I'm not sure whether that will be part of the
 initial release.


 Hope that helps!

 Jason

 On Thu, Jul 30, 2015 at 7:54 AM, Bhavesh Mistry 
 mistry.p.bhav...@gmail.com
 wrote:

  Hello Kafka Dev Team,
 
 
  With new Consumer API redesign  (
 
 
 https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
  ),  is there a capability to consume given the topic and partition start/end
  position?  How would I achieve the following use case of range consumption
  with fail-over?
 
 
  Use Case:
  Ability to reload data given a topic and its partition offset start/end with
  a High Level Consumer with fail-over.   Basically, High Level range
  consumption with fail-over if a consumer in the group dies, while the main
  consumer group continues.
 
 
  Suppose you have a topic called “test-topic” and its partition begin and
  end offset.
 
  {
 
  topic:  

Re: 0.8.3 ETA?

2015-08-04 Thread Stevo Slavić
Thanks Jun for the heads up!

On Mon, Aug 3, 2015 at 7:17 PM, Jun Rao j...@confluent.io wrote:

 Hi, Stevo,

 Yes, we are still iterating on the new consumer a bit and are waiting for
 some of the security jiras to be committed. So now, we are shooting for
 releasing 0.8.3 in Oct (just updated
 https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan).

 As we are getting closer, we will clean up the 0.8.3 jiras and push
 non-critical ones to future releases.

 Thanks,

 Jun

 On Mon, Aug 3, 2015 at 5:52 AM, Stevo Slavić ssla...@gmail.com wrote:

  Hello Apache Kafka community,
 
  If I recall correctly, two weeks ago it was mentioned in a discussion that
  Kafka 0.8.3 might be released in a month's time.
 
  Is it still the Kafka dev team's goal to have Kafka 0.8.3 released in a few
  weeks' time? Or is more (re)work (e.g. more new consumer API changes) planned
  for 0.8.3 than is already in JIRA, which would further delay the 0.8.3 release?
 
  Btw, Kafka JIRA has quite a lot of unresolved tickets targeting 0.8.3 as fix
  version (see here
  https://issues.apache.org/jira/browse/KAFKA-1853?jql=project%20%3D%20KAFKA%20AND%20fixVersion%20%3D%200.8.3%20AND%20resolution%20%3D%20Unresolved%20ORDER%20BY%20due%20ASC%2C%20priority%20DESC%2C%20created%20ASC
  for the complete list).
 
  Kind regards,
  Stevo Slavic.
 



How to read in batch using HighLevel Consumer?

2015-08-04 Thread shahab
Hi,

While the producer can put data into the Kafka server as a batch, I couldn't
find any API (or any documentation) describing how we can fetch data as a batch
from Kafka.
Even when data is placed in the Kafka server as a batch, using the High Level
consumer I can only read messages one by one; I cannot specify, for example,
that I want to read 100 items at once!

Is this a correct observation, or am I missing something?

best,
/Shahab


InvalidMessageException: Message is corrupt, Consumer stuck

2015-08-04 Thread Henry Cai
Hi,

We are using the Kafka high-level consumer 0.8.1.1, and somehow we got a
corrupted message in the topic.  We are not sure of the root cause of this,
but the problem we are having now is that the HL consumer is stuck at that
position:

kafka.message.InvalidMessageException: Message is corrupt (stored crc =
537685622, computed crc = 36513351)

at kafka.message.Message.ensureValid(Message.scala:166)

at
kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)

at
kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)

at
kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)

at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)


If we try to ignore that exception and iterate to the next message, the
iterator cannot get past that error state:

java.lang.IllegalStateException: Iterator is in failed state

at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)


Looking at the code, it seems you can only call
IteratorTemplate.resetState() to clear the state, but this is an internal
method. Is this the right way to work around this problem?


Re: InvalidMessageException: Message is corrupt, Consumer stuck

2015-08-04 Thread Gwen Shapira
The high level consumer stores its state in ZooKeeper. Theoretically, you
should be able to go into ZooKeeper, find the consumer-group, topic and
partition, and increment the offset past the corrupt point.
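
For example, with the consumer group stopped, something like the following in
the ZooKeeper shell (the group, topic, partition and offset value here are all
placeholders - use your own, and an offset just past the corrupt message):

  bin/zookeeper-shell.sh localhost:2181
  get /consumers/my-group/offsets/my-topic/0          # shows the committed offset
  set /consumers/my-group/offsets/my-topic/0 12346    # write an offset past the bad message

Then restart the consumers and they should resume from the new offset.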

On Tue, Aug 4, 2015 at 10:23 PM, Henry Cai h...@pinterest.com.invalid
wrote:

 Hi,

 We are using the Kafka high-level consumer 8.1.1, somehow we got a
 corrupted message in the topic.  We are not sure the root cause of this,
 but the problem we are having now is the HL consumer is stuck in that
 position:

 kafka.message.InvalidMessageException: Message is corrupt (stored crc =
 537685622, computed crc = 36513351)

 at kafka.message.Message.ensureValid(Message.scala:166)

 at
 kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)

 at
 kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)

 at
 kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)

 at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)


 If we try to ignore that exception and iterate to the next message, the
 iterator couldn't pass that error state:

 java.lang.IllegalStateException: Iterator is in failed state

 at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)


 By looking at the code, looks like you can only calling
 IteratorTemplate.resetState() to clear the state, but this is an internal
 method, is this the right way to workaround this problem?



Kafka vs RabbitMQ latency

2015-08-04 Thread Yuheng Du
Hi guys,

I was reading a paper today in which the latency of kafka and rabbitmq is
compared:
http://downloads.hindawi.com/journals/js/2015/468047.pdf

To my surprise, Kafka has shown some large variations in latency as the
number of records per second increases.

So I am curious why that is. Also, in the
ProducerPerformanceTest: bin/kafka-run-class.sh
org.apache.kafka.clients.tools.ProducerPerformance test7 5000 100 -1
*acks=1* bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092
buffer.memory=67108864 batch.size=8196

Setting acks=1 means the producer will wait for an ack from the leader replica,
right? Could that be the reason it affects latency? If I set it to 0, will the
producers send as fast as possible, so that throughput
increases and latency decreases in the test results?
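
(For reference, a minimal sketch of where that setting lives on the new producer;
the broker address, topic and values below are placeholders:)

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092");   // placeholder
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// acks=0: don't wait for any broker ack; acks=1: wait for the leader only;
// acks=all (or -1): wait until the leader has acks from the in-sync replicas
props.put("acks", "1");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("test7", "key", "value"));
producer.close();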

Thanks for answering.

best,


Re: kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.

2015-08-04 Thread David Li
You are right.

After I ran Kafka directly on my localhost, it worked just fine.

After further googling, I found that the two parameters below need to be set if
Kafka is running on some other machine:

#advertised.host.name=hostname routable by clients
#advertised.port=port accessible by clients

More precisely, if Kafka is running within a Docker container,
advertised.host.name should be set to the Docker host IP, and
advertised.port should be set to the port mapped on the Docker host.
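
For example, with my setup from the original mail (the container's Kafka port
published as 29092 on the Docker host 192.168.144.10 - the internal port is an
assumption on my side), the broker's server.properties would contain something
like:

# the Docker host IP that clients can reach
advertised.host.name=192.168.144.10
# the host port that maps to the container's Kafka port
advertised.port=29092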

thanks again.


On Tue, Aug 4, 2015 at 4:58 PM, Jilin Xie jilinxie1...@gmail.com wrote:

 Some suggestions:
  Check the existence of the topic.
  Check the firewall of the broker... Try telnet or something to
 make sure it's available.
  Try run the producer on the broker machine.

 Since you get this error, this code is functioning. I think it's some
 configuration and parameter stuff leading to this problem.

 On Tue, Aug 4, 2015 at 11:21 AM, David Li da...@stargazer.com.sg wrote:

  Hi I have a very simple code to just send out one message, the topic is
  created automatically, but the message just cannot be sent out. I also
  tried to change the configuration, the result is still the same. Sorry to
  bother you all with this silly question.
 
  For your information, the kafka server is running in a docker container,
  which is run in an Ubuntu server VM. The test class is run from IntelliJ
  IDEA on the host, which is a Mac OS X machine.
 
  public static void main(String[] args) {
      String topic = "test3";

      Properties props = new Properties();
      props.put("serializer.class", "kafka.serializer.StringEncoder");
      props.put("metadata.broker.list", "192.168.144.10:29092");
      //props.put("retry.backoff.ms", "1000");
      //props.put("message.send.max.retries", "10");
      //props.put("topic.metadata.refresh.interval.ms", "0");

      Producer<Integer, String> producer =
          new Producer<Integer, String>(new ProducerConfig(props));

      int messageNo = 1;
      String messageStr = new String("Message_" + messageNo);
      producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
  }
 



Re: Consumer that consumes only local partition?

2015-08-04 Thread Hawin Jiang
Hi Robert,

Here is the kafka benchmark for your reference.
If you want to use Flink, Storm, Samza or Spark, the performance will go down.

821,557 records/sec (78.3 MB/sec)

https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines





Best regards
Hawin



On Tue, Aug 4, 2015 at 11:57 AM, Robert Metzger rmetz...@apache.org wrote:

 Sorry for the very late reply ...

 The performance issue was not caused by network latency. I had a job like
 this:
 FlinkKafkaConsumer -- someSimpleOperation -- FlinkKafkaProducer.

 I thought that our FlinkKafkaConsumer is slow, but actually our
 FlinkKafkaProducer was using the old producer API of Kafka. Switching to
 the new producer API of Kafka greatly improved our writing performance to
 Kafka. Flink was slowing down the KafkaConsumer because of the producer.

 Since we are already talking about performance, let me ask you the
 following question:
 I am using Kafka and Flink on a HDP 2.2 cluster (with 40 machines). What
 would you consider a good read/write performance for 8-byte messages on the
 following setup?
 - 40 brokers,
 - topic with 120 partitions
 - 120 reading threads (on 30 machines)
 - 120 writing threads (on 30 machines)

 I'm getting a write throughput of ~75k elements/core/second and a read
 throughput of ~50k el/c/s.
 When I'm stopping the writers, the read throughput goes up to 130k.
 I would expect a higher throughput than (8*75000) / 1024 = 585.9 kb/sec per
 partition .. or are the messages too small and the overhead is very high.

 Which system out there would you recommend for getting reference
 performance numbers? Samza, Spark, Storm?


 On Wed, Jul 15, 2015 at 7:20 PM, Gwen Shapira gshap...@cloudera.com
 wrote:

  This is not something you can use the consumer API to simply do easily
  (consumers don't have locality notion).
  I can imagine using Kafka's low-level API calls to get a list of
  partitions and the lead replica, figuring out which are local and
  using those - but that sounds painful.
 
  Are you 100% sure the performance issue is due to network latency? If
  not, you may want to start optimizing somewhere more productive :)
  Kafka brokers and clients both have Metrics that may help you track
  where the performance issues are coming from.
 
  Gwen
 
  On Wed, Jul 15, 2015 at 9:24 AM, Robert Metzger rmetz...@apache.org
  wrote:
   Hi Shef,
  
   did you resolve this issue?
   I'm facing some performance issues and I was wondering whether reading
   locally would resolve them.
  
   On Mon, Jun 22, 2015 at 11:43 PM, Shef she...@yahoo.com wrote:
  
   Noob question here. I want to have a single consumer for each
 partition
   that consumes only the messages that have been written locally. In
 other
   words, I want the consumer to access the local disk and not pull
  anything
   across the network. Possible?
  
   How can I discover which partitions are local?
  
  
  
 



Re: Decomissioning a broker

2015-08-04 Thread Grant Henke
That's correct. Thanks for catching that.

On Tue, Aug 4, 2015 at 3:27 PM, Andrew Otto ao...@wikimedia.org wrote:

 Thanks!

  In fact if you use a Controlled Shutdown migrating the replicas and
  leaders should happen for you as well.

 Just to clarify, controlled shutdown will only move the leaders to other
 replicas, right?  It won’t actually migrate any replicas elsewhere.

 -Ao


  On Aug 4, 2015, at 13:00, Grant Henke ghe...@cloudera.com wrote:
 
  The broker will actually unregister itself from zookeeper. The brokers id
  path uses ephemeral nodes so they are automatically destroyed on
 shutdown.
  In fact if you use a Controlled Shutdown migrating the replicas and
  leaders should happen for you as well. Though, manual reassignment may be
  preferred in your case.
 
  Here is some extra information on controlled shutdowns:
  http://kafka.apache.org/documentation.html#basic_ops_restarting
 
  Thanks,
  Grant
 
  On Thu, Jul 30, 2015 at 4:37 PM, Andrew Otto ao...@wikimedia.org
 wrote:
 
  I’m sure this has been asked before, but I can’t seem to find the
 answer.
 
  I’m planning a Kafka cluster expansion and upgrade to 0.8.2.1.  In doing
  so, I will be decommissioning a broker.  I plan to remove this broker
 fully
  from the cluster, and then reinstall it and use it for a different
 purpose.
 
  I understand how to use the reassign-partitions tool to generate new
  partition assignments and to move partitions around so that the target
  broker no longer has any active replicas.  Once that is done, is there
  anything special that needs to happen?  I can shutdown the broker, but
 as
  far as I know that broker will still be registered in Zookeeper.
 Should I
  just delete the znode for that broker once it has been shut down?
 
  Thanks!
  -Andrew Otto
 
 
 
 
  --
  Grant Henke
  Software Engineer | Cloudera
  gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke




-- 
Grant Henke
Software Engineer | Cloudera
gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke


Re: Access control in kafka

2015-08-04 Thread Parth Brahmbhatt
If this
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface)
is what you need, then watch for
https://reviews.apache.org/r/34492/ to get committed to trunk.


Thanks
Parth

On 8/4/15, 1:57 PM, Alvaro Gareppe agare...@gmail.com wrote:

Can someone point me to documentation about access control in kafka. There
is something implemented in the current or plan for future versions ?

I need something that allows me to define what users are allowed to
connect
to certain topic, and of course user management.

Thank you guys in advance!

-- 
Eng. Alvaro Gareppe



Re: Got conflicted ephemeral node exception for several hours

2015-08-04 Thread Jaikiran Pai


I am on Kafka 0.8.2.1 (Java 8) and have run into this same issue, where
the KafkaServer (broker) goes into an indefinite while loop writing out
this message:


[2015-08-04 15:45:12,350] INFO conflict in /brokers/ids/0 data:
{"jmx_port":-1,"timestamp":"1438661432074","host":"foo-bar","version":1,"port":9092}
stored data:
{"jmx_port":-1,"timestamp":"1438661429589","host":"foo-bar","version":1,"port":9092}
(kafka.utils.ZkUtils$)
[2015-08-04 15:45:12,352] INFO I wrote this conflicted ephemeral node
[{"jmx_port":-1,"timestamp":"1438661432074","host":"foo-bar","version":1,"port":9092}]
at /brokers/ids/0 a while back in a different session, hence I will
backoff for this node to be deleted by Zookeeper and retry
(kafka.utils.ZkUtils$)


The above 2 lines have been repeating continuously every few seconds
for the past 20-odd hours on this broker; the broker has been rendered
useless and is contributing to high CPU usage.


As a result, the consumers have gone into a state where they no longer
consume messages. Furthermore, this continuous looping has put the Kafka
process at the top of CPU usage. I understand that bouncing the consumer
is an option and probably will fix this, but in our real production
environments we won't be able to bounce the consumers. I currently have
access to logs (some of which have been pasted here). Is there any chance
these logs help in narrowing down the issue and fixing the root cause?
Can we also please add some kind of maximum retry limit to this Zookeeper
node creation logic, instead of going into an indefinite while loop?


I have maintained the original timestamps in the logs so as to help
narrow down the issue. The 1438661432074 (milliseconds) in the log
translates to Aug 03 2015 21:10:32 (PDT) and 1438661429589 translates to
Aug 03 2015 21:10:30 (PDT). I have included that part of the log snippet
from the server.log of the broker (10.95.100.31).



[2015-08-03 21:10:29,805] ERROR Closing socket for /10.95.100.31 because 
of error (kafka.network.Processor)

java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:123)
at kafka.network.MultiSend.writeTo(Transmission.scala:101)
at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
at kafka.network.Processor.write(SocketServer.scala:472)
at kafka.network.Processor.run(SocketServer.scala:342)
at java.lang.Thread.run(Thread.java:745)
[2015-08-03 21:10:29,938] ERROR Closing socket for /10.95.100.31 because 
of error (kafka.network.Processor)

java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at kafka.utils.Utils$.read(Utils.scala:380)
at 
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)

at kafka.network.Processor.read(SocketServer.scala:444)
at kafka.network.Processor.run(SocketServer.scala:340)
at java.lang.Thread.run(Thread.java:745)
[2015-08-03 21:10:30,045] ERROR Closing socket for /10.95.100.31 because 
of error (kafka.network.Processor)

java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:123)
at kafka.network.MultiSend.writeTo(Transmission.scala:101)
at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
at kafka.network.Processor.write(SocketServer.scala:472)
at kafka.network.Processor.run(SocketServer.scala:342)
at java.lang.Thread.run(Thread.java:745)

 a lot more similar exceptions 


[2015-08-03 21:10:31,304] INFO Closing socket connection to 
/10.95.100.31. (kafka.network.Processor)
[2015-08-03 21:10:31,397] INFO Closing socket connection to 
/10.95.100.31. (kafka.network.Processor)
[2015-08-03 21:10:31,399] INFO Closing socket connection to 
/10.95.100.31. (kafka.network.Processor)
[2015-08-03 21:10:31,445] INFO Closing socket connection to 
/10.95.100.31. (kafka.network.Processor)


 bunch of similar logs as above 

[2015-08-03 21:10:31,784] INFO [ReplicaFetcherManager on broker 0] 
Removed fetcher for partitions [partition list] 
(kafka.server.ReplicaFetcherManager)
[2015-08-03 21:10:31,860] INFO Closing socket connection to 
/10.95.100.31. 

Re: Lead Broker from kafka.message.MessageAndMetadata

2015-08-04 Thread Grant Henke
Hi Sreeni,

Using the SimpleConsumer you can send a TopicMetadataRequest for a topic,
and the TopicMetadataResponse will contain TopicMetadata for each topic
requested (or all topics), which contains PartitionMetadata for all partitions.
The PartitionMetadata contains the leader, replicas, and isr.
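
A rough sketch using the 0.8.x javaapi classes (the broker host/port and topic
are placeholders; in real code you would also handle a null leader):

import java.util.Collections;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

SimpleConsumer metadataConsumer =
    new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "leaderLookup");
TopicMetadataRequest request =
    new TopicMetadataRequest(Collections.singletonList("my-topic"));
TopicMetadataResponse response = metadataConsumer.send(request);
for (TopicMetadata topicMetadata : response.topicsMetadata()) {
    for (PartitionMetadata pm : topicMetadata.partitionsMetadata()) {
        // leader() is the broker currently leading this partition
        System.out.println("partition " + pm.partitionId() + " -> leader "
            + pm.leader().host() + ":" + pm.leader().port());
    }
}
metadataConsumer.close();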

Is that what you are looking for?

Thanks,
Grant

On Mon, Aug 3, 2015 at 7:26 AM, Sreenivasulu Nallapati 
sreenu.nallap...@gmail.com wrote:

 Hello,

 Is there a way that we can find the lead broker
 from kafka.message.MessageAndMetadata class?

 My use case is simple, I have topic and partition and wanted to find out
 the lead broker for that partition.

 Please provide your insights


 Thanks
 Sreeni




-- 
Grant Henke
Software Engineer | Cloudera
gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke


Re: Checkpointing with custom metadata

2015-08-04 Thread Jason Gustafson
I couldn't find a jira for this, so I added KAFKA-2403.

-Jason

On Tue, Aug 4, 2015 at 9:36 AM, Jay Kreps j...@confluent.io wrote:

 Hey James,

 You are right the intended use of that was to have a way to capture some
 very small metadata about your state at the time of offset commit in an
 atomic way.

 That field isn't exposed but we do need to add it to the new consumer api
 (I think just no one has done it yet.

 -Jay

 On Mon, Aug 3, 2015 at 1:52 PM, James Cheng jch...@tivo.com wrote:

  According to
 
 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest
 ,
  we can store custom metadata with our checkpoints. It looks like the high
  level consumer does not support committing offsets with metadata, and
 that
  in order to checkpoint with custom metadata, we have to issue the
  OffsetCommitRequest ourselves. Is that correct?
 
  Thanks,
  -James
 
 



Re: Checkpointing with custom metadata

2015-08-04 Thread Jay Kreps
Hey James,

You are right the intended use of that was to have a way to capture some
very small metadata about your state at the time of offset commit in an
atomic way.

That field isn't exposed, but we do need to add it to the new consumer api
(I think just no one has done it yet).

-Jay

On Mon, Aug 3, 2015 at 1:52 PM, James Cheng jch...@tivo.com wrote:

 According to
 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest,
 we can store custom metadata with our checkpoints. It looks like the high
 level consumer does not support committing offsets with metadata, and that
 in order to checkpoint with custom metadata, we have to issue the
 OffsetCommitRequest ourselves. Is that correct?

 Thanks,
 -James




Re: new consumer api?

2015-08-04 Thread Jason Gustafson
Hey Simon,

The new consumer has the ability to forego group management and assign
partitions directly. Once assigned, you can seek to any offset you want.

-Jason

On Tue, Aug 4, 2015 at 5:08 AM, Simon Cooper 
simon.coo...@featurespace.co.uk wrote:

 Reading on the consumer docs, there's no mention of a relatively simple
 consumer that doesn't need groups, coordinators, commits, anything like
 that - just read and poll from specified offsets of specific topic
 partitions - but automatically deals with leadership changes and connection
 losses (so one level up from SimpleConsumer).

 Will the new API be able to be used in this relatively simple way?
 SimonC

 -Original Message-
 From: Jun Rao [mailto:j...@confluent.io]
 Sent: 03 August 2015 18:19
 To: users@kafka.apache.org
 Subject: Re: new consumer api?

 Jalpesh,

 We are still iterating on the new consumer a bit and are waiting for some
 of the security jiras to be committed. So now, we are shooting for
 releasing 0.8.3 in Oct (just updated
 https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan).

 Thanks,

 Jun

 On Mon, Aug 3, 2015 at 8:41 AM, Jalpesh Patadia 
 jalpesh.pata...@clickbank.com wrote:

  Hello guys,
 
  A while ago i read that the new consumer api was going to be released
  sometime in July as part of the 0.8.3/0.9 release.
  https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan
 
 
  Do we have an update when we think that can happen?
 
 
  Thanks,
 
  Jalpesh
 
 
  -- PRIVILEGED AND CONFIDENTIAL This transmission may contain
  privileged, proprietary or confidential information. If you are not
  the intended recipient, you are instructed not to review this
  transmission. If you are not the intended recipient, please notify the
  sender that you received this message and delete this transmission from
 your system.
 



Re: Kafka Zookeeper Issues

2015-08-04 Thread Grant Henke
The /brokers/ids nodes are ephemeral nodes that only exist while the
brokers maintain a session to zookeeper. There is more information on
Kafka's Zookeeper usage here:
   - http://kafka.apache.org/documentation.html
  - look for Broker Node Registry
   -
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper
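
For example, you can watch the registrations directly with the ZooKeeper shell
(broker id 0 and the connect string are placeholders; the znode disappears as
soon as that broker's session is lost):

  bin/zookeeper-shell.sh localhost:2181
  ls /brokers/ids      # ids of the brokers currently registered (i.e. live)
  get /brokers/ids/0   # registration data for broker 0; ephemeral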

Hopefully that helps debug your issue.

Thank you,
Grant

On Mon, Aug 3, 2015 at 5:20 AM, Wollert, Fabian fabian.woll...@zalando.de
wrote:

 hi everyone,

 we are trying to deploy Kafka 0.8.2.1 and Zookeeper on AWS using
 Cloudformation, ASG's and other Services. For Zookeeper we are using
 Netflix' Exhibitor (V 1.5.5) to ensure failover stability.

 What we are observing right now is that after some days our Brokers are not
 registered anymore in the /brokers/ids path in Zookeeper. I was trying to
 see when they get deleted to check the logs, but the ZK Transaction logs
 only shows the create stmt, no deletes or something (though deletes are
 written down there). Can someone explain me how the mechanism works with
 registering and deregistering in Zookeeper or point me to a doc or even
 source code, where this happens? Or some one has even some idea what
 happens there.

 Any experience on what to take care of deploying kafka on AWS (or generally
 a cloud env) would be also helpful.

 Cheers

 --
 *Fabian Wollert*
 Business Intelligence

 *POSTAL ADDRESS*
 Zalando SE
 11501 Berlin

 *OFFICE*
 Zalando SE
 Mollstraße 1
 10178 Berlin
 Germany

 Phone: +49 30 20968 1819
 Fax:   +49 30 27594 693
 E-Mail: fabian.woll...@zalando.de
 Web: www.zalando.de
 Jobs: jobs.zalando.de

 Zalando SE, Tamara-Danz-Straße 1, 10243 Berlin
 Company registration: Amtsgericht Charlottenburg, HRB 158855 B
 Tax ID: 29/560/00596 * VAT registration number: DE 260543043
 Management Board: Robert Gentz, David Schneider, Rubin Ritter
 Chairperson of the Supervisory Board: Cristina Stenbeck
 Registered office: Berlinn




-- 
Grant Henke
Software Engineer | Cloudera
gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke


Re: Decomissioning a broker

2015-08-04 Thread Grant Henke
The broker will actually unregister itself from zookeeper. The broker's id
path uses ephemeral nodes, so they are automatically destroyed on shutdown.
In fact, if you use a Controlled Shutdown, migrating the replicas and
leaders should happen for you as well. Though, manual reassignment may be
preferred in your case.

Here is some extra information on controlled shutdowns:
http://kafka.apache.org/documentation.html#basic_ops_restarting
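
The relevant broker settings look like this (the values shown are only
illustrative; check the defaults for your release):

# server.properties
controlled.shutdown.enable=true
controlled.shutdown.max.retries=3
controlled.shutdown.retry.backoff.ms=5000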

Thanks,
Grant

On Thu, Jul 30, 2015 at 4:37 PM, Andrew Otto ao...@wikimedia.org wrote:

 I’m sure this has been asked before, but I can’t seem to find the answer.

 I’m planning a Kafka cluster expansion and upgrade to 0.8.2.1.  In doing
 so, I will be decommissioning a broker.  I plan to remove this broker fully
 from the cluster, and then reinstall it and use it for a different purpose.

 I understand how to use the reassign-partitions tool to generate new
 partition assignments and to move partitions around so that the target
 broker no longer has any active replicas.  Once that is done, is there
 anything special that needs to happen?  I can shutdown the broker, but as
 far as I know that broker will still be registered in Zookeeper.  Should I
 just delete the znode for that broker once it has been shut down?

 Thanks!
 -Andrew Otto




-- 
Grant Henke
Software Engineer | Cloudera
gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke


Re: Decomissioning a broker

2015-08-04 Thread Andrew Otto
Thanks!

 In fact if you use a Controlled Shutdown migrating the replicas and
 leaders should happen for you as well.

Just to clarify, controlled shutdown will only move the leaders to other 
replicas, right?  It won’t actually migrate any replicas elsewhere.

-Ao


 On Aug 4, 2015, at 13:00, Grant Henke ghe...@cloudera.com wrote:
 
 The broker will actually unregister itself from zookeeper. The brokers id
 path uses ephemeral nodes so they are automatically destroyed on shutdown.
 In fact if you use a Controlled Shutdown migrating the replicas and
 leaders should happen for you as well. Though, manual reassignment may be
 preferred in your case.
 
 Here is some extra information on controlled shutdowns:
 http://kafka.apache.org/documentation.html#basic_ops_restarting
 
 Thanks,
 Grant
 
 On Thu, Jul 30, 2015 at 4:37 PM, Andrew Otto ao...@wikimedia.org wrote:
 
 I’m sure this has been asked before, but I can’t seem to find the answer.
 
 I’m planning a Kafka cluster expansion and upgrade to 0.8.2.1.  In doing
 so, I will be decommissioning a broker.  I plan to remove this broker fully
 from the cluster, and then reinstall it and use it for a different purpose.
 
 I understand how to use the reassign-partitions tool to generate new
 partition assignments and to move partitions around so that the target
 broker no longer has any active replicas.  Once that is done, is there
 anything special that needs to happen?  I can shutdown the broker, but as
 far as I know that broker will still be registered in Zookeeper.  Should I
 just delete the znode for that broker once it has been shut down?
 
 Thanks!
 -Andrew Otto
 
 
 
 
 -- 
 Grant Henke
 Software Engineer | Cloudera
 gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke



message filterin or selector

2015-08-04 Thread Alvaro Gareppe
Is there a way to implement selector logic in Kafka (similar to JMS
selectors)?

That is, to allow consuming a message only if the message contains a certain
header or content?

I'm evaluating migrating from ActiveMQ to Kafka, and I'm using the selector
logic widely in the application.

-- 
Ing. Alvaro Gareppe
agare...@gmail.com


Re: Consumer that consumes only local partition?

2015-08-04 Thread Robert Metzger
Sorry for the very late reply ...

The performance issue was not caused by network latency. I had a job like
this:
FlinkKafkaConsumer -- someSimpleOperation -- FlinkKafkaProducer.

I thought that our FlinkKafkaConsumer is slow, but actually our
FlinkKafkaProducer was using the old producer API of Kafka. Switching to
the new producer API of Kafka greatly improved our writing performance to
Kafka. Flink was slowing down the KafkaConsumer because of the producer.

Since we are already talking about performance, let me ask you the
following question:
I am using Kafka and Flink on a HDP 2.2 cluster (with 40 machines). What
would you consider a good read/write performance for 8-byte messages on the
following setup?
- 40 brokers,
- topic with 120 partitions
- 120 reading threads (on 30 machines)
- 120 writing threads (on 30 machines)

I'm getting a write throughput of ~75k elements/core/second and a read
throughput of ~50k el/c/s.
When I'm stopping the writers, the read throughput goes up to 130k.
I would expect a higher throughput than (8*75000) / 1024 = 585.9 KB/sec per
partition ... or are the messages too small and the overhead very high?

Which system out there would you recommend for getting reference
performance numbers? Samza, Spark, Storm?


On Wed, Jul 15, 2015 at 7:20 PM, Gwen Shapira gshap...@cloudera.com wrote:

 This is not something you can use the consumer API to simply do easily
 (consumers don't have locality notion).
 I can imagine using Kafka's low-level API calls to get a list of
 partitions and the lead replica, figuring out which are local and
 using those - but that sounds painful.

 Are you 100% sure the performance issue is due to network latency? If
 not, you may want to start optimizing somewhere more productive :)
 Kafka brokers and clients both have Metrics that may help you track
 where the performance issues are coming from.

 Gwen

 On Wed, Jul 15, 2015 at 9:24 AM, Robert Metzger rmetz...@apache.org
 wrote:
  Hi Shef,
 
  did you resolve this issue?
  I'm facing some performance issues and I was wondering whether reading
  locally would resolve them.
 
  On Mon, Jun 22, 2015 at 11:43 PM, Shef she...@yahoo.com wrote:
 
  Noob question here. I want to have a single consumer for each partition
  that consumes only the messages that have been written locally. In other
  words, I want the consumer to access the local disk and not pull
 anything
  across the network. Possible?
 
  How can I discover which partitions are local?
 
 
 



Re: How to read in batch using HighLevel Consumer?

2015-08-04 Thread Gwen Shapira
To add some internals: the high-level consumer actually does read entire
batches from Kafka. It just exposes them to the user in an event loop,
because it's a very natural API. Users can then batch events however they
prefer.

So if you are worried about batches being more efficient than single
events, you are covered!
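
A sketch of that pattern with the high-level consumer (auto-commit disabled so
a batch is only committed after it has been processed; the connection details,
"my-topic" and processBatch() are placeholders):

import java.util.*;
import kafka.consumer.*;
import kafka.javaapi.consumer.ConsumerConnector;

Properties props = new Properties();
props.put("zookeeper.connect", "zk1:2181");        // placeholder
props.put("group.id", "batch-reader");             // placeholder
props.put("auto.commit.enable", "false");          // commit manually per batch
ConsumerConnector connector =
    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

KafkaStream<byte[], byte[]> stream = connector
    .createMessageStreams(Collections.singletonMap("my-topic", 1))
    .get("my-topic").get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();

List<byte[]> batch = new ArrayList<>();
while (it.hasNext()) {
    batch.add(it.next().message());
    if (batch.size() >= 100) {
        processBatch(batch);           // placeholder for your batch processing
        connector.commitOffsets();     // commit only after the whole batch succeeded
        batch.clear();
    }
}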

Gwen

On Tue, Aug 4, 2015 at 12:04 PM, shahab shahab.mok...@gmail.com wrote:

 Thanks a lot Shaminder for clarification and thanks Raja for pointing me to
 the example.

 best,
 /shahab

 On Tue, Aug 4, 2015 at 6:06 PM, Rajasekar Elango rela...@salesforce.com
 wrote:

  Here is an example on what sharninder suggested
 
 
 http://ingest.tips/2014/10/12/kafka-high-level-consumer-frequently-missing-pieces/
 
  Thanks,
  Raja.
 
  On Tue, Aug 4, 2015 at 12:01 PM, Sharninder sharnin...@gmail.com
 wrote:
 
   You can't. Kafka is essentially a queue, so you always read messages
 one
   by one. What you can do is disable auto offset commit, read 100
 messages,
   process them and then manually commit offset.
  
   --
   Sharninder
  
On 04-Aug-2015, at 9:07 pm, shahab shahab.mok...@gmail.com wrote:
   
Hi,
   
While we the producer can put data as batch in kafka server,  I
  couldn't
find any API (or any document) saying how we can fetch data as batch
  from
Kafka ?
Even when data is placed as batch in kafka server, still using High
  Level
consumer I can only read one by one, and I can not specify. for
  example,
read 100 items at once!
   
Is this correct observation? or I am missing something?
   
best,
/Shahab
  
 
 
 
  --
  Thanks,
  Raja.
 



Re: How to read in batch using HighLevel Consumer?

2015-08-04 Thread shahab
Thanks a lot Shaminder for clarification and thanks Raja for pointing me to
the example.

best,
/shahab

On Tue, Aug 4, 2015 at 6:06 PM, Rajasekar Elango rela...@salesforce.com
wrote:

 Here is an example on what sharninder suggested

 http://ingest.tips/2014/10/12/kafka-high-level-consumer-frequently-missing-pieces/

 Thanks,
 Raja.

 On Tue, Aug 4, 2015 at 12:01 PM, Sharninder sharnin...@gmail.com wrote:

  You can't. Kafka is essentially a queue, so you always read messages one
  by one. What you can do is disable auto offset commit, read 100 messages,
  process them and then manually commit offset.
 
  --
  Sharninder
 
   On 04-Aug-2015, at 9:07 pm, shahab shahab.mok...@gmail.com wrote:
  
   Hi,
  
   While we the producer can put data as batch in kafka server,  I
 couldn't
   find any API (or any document) saying how we can fetch data as batch
 from
   Kafka ?
   Even when data is placed as batch in kafka server, still using High
 Level
   consumer I can only read one by one, and I can not specify. for
 example,
   read 100 items at once!
  
   Is this correct observation? or I am missing something?
  
   best,
   /Shahab
 



 --
 Thanks,
 Raja.



Access control in kafka

2015-08-04 Thread Alvaro Gareppe
Can someone point me to documentation about access control in Kafka? Is there
something implemented in the current version, or planned for future versions?

I need something that allows me to define which users are allowed to connect
to certain topics, and of course user management.

Thank you guys in advance!

-- 
Eng. Alvaro Gareppe


Re: How to read in batch using HighLevel Consumer?

2015-08-04 Thread Rajasekar Elango
Here is an example on what sharninder suggested
http://ingest.tips/2014/10/12/kafka-high-level-consumer-frequently-missing-pieces/

Thanks,
Raja.

On Tue, Aug 4, 2015 at 12:01 PM, Sharninder sharnin...@gmail.com wrote:

 You can't. Kafka is essentially a queue, so you always read messages one
 by one. What you can do is disable auto offset commit, read 100 messages,
 process them and then manually commit offset.

 --
 Sharninder

  On 04-Aug-2015, at 9:07 pm, shahab shahab.mok...@gmail.com wrote:
 
  Hi,
 
  While we the producer can put data as batch in kafka server,  I couldn't
  find any API (or any document) saying how we can fetch data as batch from
  Kafka ?
  Even when data is placed as batch in kafka server, still using High Level
  consumer I can only read one by one, and I can not specify. for example,
  read 100 items at once!
 
  Is this correct observation? or I am missing something?
 
  best,
  /Shahab




-- 
Thanks,
Raja.