Re: InvalidMessageException: Message is corrupt, Consumer stuck

2015-08-04 Thread Gwen Shapira
The high level consumer stores its state in ZooKeeper. Theoretically, you
should be able to go into ZooKeeper, find the consumer-group, topic and
partition, and increment the offset past the "corrupt" point.
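
For example, a rough sketch of that edit using the plain ZooKeeper Java client (the group, topic, partition, offset and ZooKeeper address below are all made up -- stop the consumer group first and double-check the path in your own ZooKeeper before writing anything):

import java.nio.charset.StandardCharsets;
import org.apache.zookeeper.ZooKeeper;

public class BumpConsumerOffset {
    public static void main(String[] args) throws Exception {
        // High-level consumer offsets live at /consumers/<group>/offsets/<topic>/<partition>.
        String path = "/consumers/my-group/offsets/my-topic/3";      // hypothetical names
        ZooKeeper zk = new ZooKeeper("zkhost:2181", 30000, null);    // hypothetical address
        byte[] current = zk.getData(path, false, null);
        System.out.println("current offset: " + new String(current, StandardCharsets.UTF_8));
        // Write an offset just past the corrupt message; version -1 means "any version".
        zk.setData(path, "1234568".getBytes(StandardCharsets.UTF_8), -1);
        zk.close();
    }
}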

On Tue, Aug 4, 2015 at 10:23 PM, Henry Cai 
wrote:

> Hi,
>
> We are using the Kafka high-level consumer 0.8.1.1, and somehow we got a
> corrupted message in the topic.  We are not sure of the root cause, but the
> problem we are having now is that the HL consumer is stuck at that position:
>
> kafka.message.InvalidMessageException: Message is corrupt (stored crc =
> 537685622, computed crc = 36513351)
>
> at kafka.message.Message.ensureValid(Message.scala:166)
>
> at
> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)
>
> at
> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>
> at
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>
> at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>
>
> If we try to ignore that exception and iterate to the next message, the
> iterator cannot get past that error state:
>
> java.lang.IllegalStateException: Iterator is in failed state
>
> at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
>
>
> By looking at the code, it seems you can only call
> IteratorTemplate.resetState() to clear the state, but this is an internal
> method. Is this the right way to work around this problem?
>


InvalidMessageException: Message is corrupt, Consumer stuck

2015-08-04 Thread Henry Cai
Hi,

We are using the Kafka high-level consumer 0.8.1.1, and somehow we got a
corrupted message in the topic.  We are not sure of the root cause, but the
problem we are having now is that the HL consumer is stuck at that position:

kafka.message.InvalidMessageException: Message is corrupt (stored crc =
537685622, computed crc = 36513351)

at kafka.message.Message.ensureValid(Message.scala:166)

at
kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)

at
kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)

at
kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)

at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)


If we try to ignore that exception and iterate to the next message, the
iterator cannot get past that error state:

java.lang.IllegalStateException: Iterator is in failed state

at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)


By looking at the code, it seems you can only call
IteratorTemplate.resetState() to clear the state, but this is an internal
method. Is this the right way to work around this problem?


Re: kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.

2015-08-04 Thread David Li
You are right.

After running Kafka directly on my localhost, it works just fine.

After further googling, I found that the two parameters below need to be set if
Kafka is running on another machine:

#advertised.host.name=
#advertised.port=

More precisely, if Kafka is running inside a docker container,
advertised.host.name should be set to the docker host IP and advertised.port
should be set to the port mapped on the docker host.
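
For reference, a minimal sketch of the two settings in server.properties, using the docker host address that appears in this thread (adjust the IP and the published port to your own setup):

# broker's server.properties inside the container
advertised.host.name=192.168.144.10
advertised.port=29092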

thanks again.


On Tue, Aug 4, 2015 at 4:58 PM, Jilin Xie  wrote:

> Some suggestions:
>  Check the existence of the topic.
>  Check the firewall of the broker... Try telnet or something to
> make sure it's available.
>  Try running the producer on the broker machine.
>
> Since you get this error, the code itself is functioning. I think some
> configuration or parameter issue is leading to this problem.
>
> On Tue, Aug 4, 2015 at 11:21 AM, David Li  wrote:
>
> > Hi, I have some very simple code to just send out one message; the topic is
> > created automatically, but the message just cannot be sent out. I also
> > tried to change the configuration, and the result is still the same. Sorry to
> > bother you all with this silly question.
> >
> > For your information, the Kafka server is running in a docker container,
> > which runs in an Ubuntu server VM. The test class is run from IntelliJ
> > IDEA on the host, which is a Mac OS X.
> >
> > public static void main(String[] args) {
> > String topic = "test3";
> >
> > Properties props = new Properties();
> > props.put("serializer.class", "kafka.serializer.StringEncoder");
> > props.put("metadata.broker.list", "192.168.144.10:29092");
> > //props.put("retry.backoff.ms", "1000");
> > //props.put("message.send.max.retries", "10");
> > //props.put("topic.metadata.refresh.interval.ms", "0");
> >
> > Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
> >
> > int messageNo = 1;
> > String messageStr = new String("Message_" + messageNo);
> > producer.send(new KeyedMessage<String, String>(topic, messageStr));
> > }
> >
>


Kafka Broker server cannot be connected by telnet from other Kafka Brokers

2015-08-04 Thread Qi Xu
Hi Everyone,
We're trying to deploy Kafka behind a network load balancer, and we have
created a port mapping for each Kafka broker under that load balancer--we only
have one public IP, and the Kafka clients are in another system and thus cannot
access the brokers via their internal IPs directly.

So for example, with the public IP 1.2.3.4, we map 1.2.3.4:9092 to broker1 and
1.2.3.4:9093 to broker2, etc. In server.properties, the advertised host and
port are 1.2.3.4:9092 and 1.2.3.4:9093 for broker 1 and broker 2 respectively.

It works well at the beginning. But after several days under load, the
replication between brokers fails due to connection timeouts---it happens
intermittently. Outside connections to these brokers still work fine
consistently.

Looking at a tcpdump on the destination server, we found that the timeout
happens when the broker does not send a TCP ACK back to the public IP 1.2.3.4.
As you can see, the source IP here is the public IP and the destination is the
broker; the broker server does not send a TCP ACK to the source.
[image: Inline image 1]

Have you seen a similar problem before?  Any information would be much
appreciated.

Thanks,
Tony


Kafka vs RabbitMQ latency

2015-08-04 Thread Yuheng Du
Hi guys,

I was reading a paper today in which the latency of kafka and rabbitmq is
compared:
http://downloads.hindawi.com/journals/js/2015/468047.pdf

To my surprise, Kafka shows some large variations in latency as the
number of records per second increases.

So I am curious why that is. Also, in the
ProducerPerformanceTest: bin/kafka-run-class.sh
org.apache.kafka.clients.tools.ProducerPerformance test7 5000 100 -1
*acks=1* bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092
buffer.memory=67108864 batch.size=8196

Setting acks=1 means the producer will wait for an ack from the leader replica,
right? Could that be the reason it affects latency? If I set it to 0, will the
producers send as fast as possible, so that throughput increases and latency
decreases in the test results?
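
For illustration, a minimal sketch of how acks is set on the new Java producer (the broker address and topic name here are made up):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AcksDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // acks=0: fire-and-forget, lowest latency, no delivery guarantee.
        // acks=1: wait for the partition leader to acknowledge the write.
        // acks=all (or -1): wait for the full in-sync replica set.
        props.put("acks", "1");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        producer.send(new ProducerRecord<String, String>("test-topic", "hello"));
        producer.close();
    }
}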

Thanks for answering.

best,


Re: Decomissioning a broker

2015-08-04 Thread Grant Henke
That's correct. Thanks for catching that.

On Tue, Aug 4, 2015 at 3:27 PM, Andrew Otto  wrote:

> Thanks!
>
> > In fact if you use a "Controlled Shutdown" migrating the replicas and
> > leaders should happen for you as well.
>
> Just to clarify, controlled shutdown will only move the leaders to other
> replicas, right?  It won’t actually migrate any replicas elsewhere.
>
> -Ao
>
>
> > On Aug 4, 2015, at 13:00, Grant Henke  wrote:
> >
> > The broker will actually unregister itself from zookeeper. The brokers id
> > path uses ephemeral nodes so they are automatically destroyed on
> shutdown.
> > In fact if you use a "Controlled Shutdown" migrating the replicas and
> > leaders should happen for you as well. Though, manual reassignment may be
> > preferred in your case.
> >
> > Here is some extra information on controlled shutdowns:
> > http://kafka.apache.org/documentation.html#basic_ops_restarting
> >
> > Thanks,
> > Grant
> >
> > On Thu, Jul 30, 2015 at 4:37 PM, Andrew Otto 
> wrote:
> >
> >> I’m sure this has been asked before, but I can’t seem to find the
> answer.
> >>
> >> I’m planning a Kafka cluster expansion and upgrade to 0.8.2.1.  In doing
> >> so, I will be decommissioning a broker.  I plan to remove this broker
> fully
> >> from the cluster, and then reinstall it and use it for a different
> purpose.
> >>
> >> I understand how to use the reassign-partitions tool to generate new
> >> partition assignments and to move partitions around so that the target
> >> broker no longer has any active replicas.  Once that is done, is there
> >> anything special that needs to happen?  I can shutdown the broker, but
> as
> >> far as I know that broker will still be registered in Zookeeper.
> Should I
> >> just delete the znode for that broker once it has been shut down?
> >>
> >> Thanks!
> >> -Andrew Otto
> >>
> >>
> >
> >
> > --
> > Grant Henke
> > Software Engineer | Cloudera
> > gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
>
>


-- 
Grant Henke
Software Engineer | Cloudera
gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke


Re: message filtering or "selector"

2015-08-04 Thread Gwen Shapira
The way Kafka is currently implemented is that Kafka is not aware of the
content of messages, so there is no Selector logic available.

The way to go is to implement the Selector in your client - i.e. your
consume() loop will get all messages but will throw away those that don't
fit your pattern.
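
For example, a bare-bones sketch of that client-side filtering with the current high-level consumer (the ZooKeeper address, group id, topic name and matching rule are all made up):

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class SelectorStyleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zkhost:2181");
        props.put("group.id", "selector-demo");
        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("my-topic", 1));

        for (MessageAndMetadata<byte[], byte[]> mm : streams.get("my-topic").get(0)) {
            String payload = new String(mm.message());
            // The "selector": act only on messages that match, silently drop the rest.
            if (payload.contains("type=ORDER")) {
                System.out.println("matched: " + payload);
            }
        }
    }
}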


It may be worthwhile to add a ticket for pluggable selector logic in the
new consumer. I can't guarantee it will happen, there are infinite things
that can be plugged into consumers and we need to draw the line somewhere,
but worth a discussion.

On Tue, Aug 4, 2015 at 2:05 PM, Alvaro Gareppe  wrote:

> Is there a way to implement "selector" logic in Kafka (similar to JMS
> selectors)?
>
> That is, to allow consuming a message only if the message contains a certain
> header or content?
>
> I'm evaluating migrating from ActiveMQ to Kafka, and I use the selector
> logic widely in the application.
>
> --
> Ing. Alvaro Gareppe
> agare...@gmail.com
>


Re: Consumer that consumes only local partition?

2015-08-04 Thread Hawin Jiang
Hi  Robert

Here is the Kafka benchmark for your reference.
If you use Flink, Storm, Samza, or Spark, the performance will go down.

821,557 records/sec (78.3 MB/sec)

https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines





Best regards
Hawin



On Tue, Aug 4, 2015 at 11:57 AM, Robert Metzger  wrote:

> Sorry for the very late reply ...
>
> The performance issue was not caused by network latency. I had a job like
> this:
> FlinkKafkaConsumer --> someSimpleOperation --> FlinkKafkaProducer.
>
> I thought that our FlinkKafkaConsumer is slow, but actually our
> FlinkKafkaProducer was using the old producer API of Kafka. Switching to
> the new producer API of Kafka greatly improved our writing performance to
> Kafka. Flink was slowing down the KafkaConsumer because of the producer.
>
> Since we are already talking about performance, let me ask you the
> following question:
> I am using Kafka and Flink on a HDP 2.2 cluster (with 40 machines). What
> would you consider a good read/write performance for 8-byte messages on the
> following setup?
> - 40 brokers,
> - topic with 120 partitions
> - 120 reading threads (on 30 machines)
> - 120 writing threads (on 30 machines)
>
> I'm getting a write throughput of ~75k elements/core/second and a read
> throughput of ~50k el/c/s.
> When I'm stopping the writers, the read throughput goes up to 130k.
> I would expect a higher throughput than (8*75000) / 1024 = 585.9 kb/sec per
> partition .. or are the messages too small and the overhead is very high.
>
> Which system out there would you recommend for getting reference
> performance numbers? Samza, Spark, Storm?
>
>
> On Wed, Jul 15, 2015 at 7:20 PM, Gwen Shapira 
> wrote:
>
> > This is not something you can use the consumer API to simply do easily
> > (consumers don't have locality notion).
> > I can imagine using Kafka's low-level API calls to get a list of
> > partitions and the lead replica, figuring out which are local and
> > using those - but that sounds painful.
> >
> > Are you 100% sure the performance issue is due to network latency? If
> > not, you may want to start optimizing somewhere more productive :)
> > Kafka brokers and clients both have Metrics that may help you track
> > where the performance issues are coming from.
> >
> > Gwen
> >
> > On Wed, Jul 15, 2015 at 9:24 AM, Robert Metzger 
> > wrote:
> > > Hi Shef,
> > >
> > > did you resolve this issue?
> > > I'm facing some performance issues and I was wondering whether reading
> > > locally would resolve them.
> > >
> > > On Mon, Jun 22, 2015 at 11:43 PM, Shef  wrote:
> > >
> > >> Noob question here. I want to have a single consumer for each
> partition
> > >> that consumes only the messages that have been written locally. In
> other
> > >> words, I want the consumer to access the local disk and not pull
> > anything
> > >> across the network. Possible?
> > >>
> > >> How can I discover which partitions are local?
> > >>
> > >>
> > >>
> >
>


Re: Got conflicted ephemeral node exception for several hours

2015-08-04 Thread Jaikiran Pai


I am on Kafka 0.8.2.1 (Java 8) and have happened to run into this same
issue, where the KafkaServer (broker) goes into an indefinite while loop
writing out this message:


[2015-08-04 15:45:12,350] INFO conflict in /brokers/ids/0 data: 
{"jmx_port":-1,"timestamp":"1438661432074","host":"foo-bar","version":1,"port":9092} 
stored data: 
{"jmx_port":-1,"timestamp":"1438661429589","host":"foo-bar","version":1,"port":9092} 
(kafka.utils.ZkUtils$)
[2015-08-04 15:45:12,352] INFO I wrote this conflicted ephemeral node 
[{"jmx_port":-1,"timestamp":"1438661432074","host":"foo-bar","version":1,"port":9092}] 
at /brokers/ids/0 a while back in a different session, hence I will 
backoff for this node to be deleted by Zookeeper and retry 
(kafka.utils.ZkUtils$)


The above 2 lines have been repeating every few seconds for the past
20-odd hours on this broker; the broker has been rendered useless and is
contributing to high CPU usage.


As a result, the consumers have gone into a state where they no longer
consume messages. Furthermore, this continuous looping has put the Kafka
process at the top of CPU usage. I understand that bouncing the consumer
is an option and probably will "fix" this, but in our real production
environments we won't be able to bounce the consumers. I currently have
access to logs (some of which have been pasted here). Is there any chance
these logs help narrow down the issue and fix the root cause?
Can we also please add a maximum retry limit to this Zookeeper node
creation logic instead of going into an indefinite while loop?


I have maintained the original timestamps in the logs so as to help
narrow down the issue. The 1438661432074 (milliseconds) in the log
translates to Aug 03 2015 21:10:32 (PDT), and 1438661429589 translates to
Aug 03 2015 21:10:30 (PDT). I have included that part of the log snippet
from the server.log of the broker (10.95.100.31).



[2015-08-03 21:10:29,805] ERROR Closing socket for /10.95.100.31 because 
of error (kafka.network.Processor)

java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:123)
at kafka.network.MultiSend.writeTo(Transmission.scala:101)
at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
at kafka.network.Processor.write(SocketServer.scala:472)
at kafka.network.Processor.run(SocketServer.scala:342)
at java.lang.Thread.run(Thread.java:745)
[2015-08-03 21:10:29,938] ERROR Closing socket for /10.95.100.31 because 
of error (kafka.network.Processor)

java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at kafka.utils.Utils$.read(Utils.scala:380)
at 
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)

at kafka.network.Processor.read(SocketServer.scala:444)
at kafka.network.Processor.run(SocketServer.scala:340)
at java.lang.Thread.run(Thread.java:745)
[2015-08-03 21:10:30,045] ERROR Closing socket for /10.95.100.31 because 
of error (kafka.network.Processor)

java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:123)
at kafka.network.MultiSend.writeTo(Transmission.scala:101)
at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
at kafka.network.Processor.write(SocketServer.scala:472)
at kafka.network.Processor.run(SocketServer.scala:342)
at java.lang.Thread.run(Thread.java:745)

<<< a lot more similar exceptions >>>


[2015-08-03 21:10:31,304] INFO Closing socket connection to 
/10.95.100.31. (kafka.network.Processor)
[2015-08-03 21:10:31,397] INFO Closing socket connection to 
/10.95.100.31. (kafka.network.Processor)
[2015-08-03 21:10:31,399] INFO Closing socket connection to 
/10.95.100.31. (kafka.network.Processor)
[2015-08-03 21:10:31,445] INFO Closing socket connection to 
/10.95.100.31. (kafka.network.Processor)


 bunch of similar logs as above 

[2015-08-03 21:10:31,784] INFO [ReplicaFetcherManager on broker 0] 
Removed fetcher for partitions [] 
(kafka.server.ReplicaFetcherManager)
[2015-08-03 21:10:31,860] INFO C

Re: Access control in kafka

2015-08-04 Thread Parth Brahmbhatt
If this
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface)
is what you need, then watch for https://reviews.apache.org/r/34492/ to get
committed to trunk.


Thanks
Parth

On 8/4/15, 1:57 PM, "Alvaro Gareppe"  wrote:

>Can someone point me to documentation about access control in Kafka? Is
>there something implemented currently, or planned for future versions?
>
>I need something that allows me to define which users are allowed to connect
>to certain topics, and of course user management.
>
>Thank you guys in advance!
>
>-- 
>Eng. Alvaro Gareppe



message filtering or "selector"

2015-08-04 Thread Alvaro Gareppe
Is there a way to implement "selector" logic in Kafka (similar to JMS
selectors)?

That is, to allow consuming a message only if the message contains a certain
header or content?

I'm evaluating migrating from ActiveMQ to Kafka, and I use the selector
logic widely in the application.

-- 
Ing. Alvaro Gareppe
agare...@gmail.com


Access control in kafka

2015-08-04 Thread Alvaro Gareppe
Can someone point me to documentation about access control in Kafka? Is there
something implemented currently, or planned for future versions?

I need something that allows me to define which users are allowed to connect
to certain topics, and of course user management.

Thank you guys in advance!

-- 
Eng. Alvaro Gareppe


Re: Decomissioning a broker

2015-08-04 Thread Andrew Otto
Thanks!

> In fact if you use a "Controlled Shutdown" migrating the replicas and
> leaders should happen for you as well.

Just to clarify, controlled shutdown will only move the leaders to other 
replicas, right?  It won’t actually migrate any replicas elsewhere.

-Ao


> On Aug 4, 2015, at 13:00, Grant Henke  wrote:
> 
> The broker will actually unregister itself from zookeeper. The brokers id
> path uses ephemeral nodes so they are automatically destroyed on shutdown.
> In fact if you use a "Controlled Shutdown" migrating the replicas and
> leaders should happen for you as well. Though, manual reassignment may be
> preferred in your case.
> 
> Here is some extra information on controlled shutdowns:
> http://kafka.apache.org/documentation.html#basic_ops_restarting
> 
> Thanks,
> Grant
> 
> On Thu, Jul 30, 2015 at 4:37 PM, Andrew Otto  wrote:
> 
>> I’m sure this has been asked before, but I can’t seem to find the answer.
>> 
>> I’m planning a Kafka cluster expansion and upgrade to 0.8.2.1.  In doing
>> so, I will be decommissioning a broker.  I plan to remove this broker fully
>> from the cluster, and then reinstall it and use it for a different purpose.
>> 
>> I understand how to use the reassign-partitions tool to generate new
>> partition assignments and to move partitions around so that the target
>> broker no longer has any active replicas.  Once that is done, is there
>> anything special that needs to happen?  I can shutdown the broker, but as
>> far as I know that broker will still be registered in Zookeeper.  Should I
>> just delete the znode for that broker once it has been shut down?
>> 
>> Thanks!
>> -Andrew Otto
>> 
>> 
> 
> 
> -- 
> Grant Henke
> Software Engineer | Cloudera
> gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke



Re: How to read in batch using HighLevel Consumer?

2015-08-04 Thread Gwen Shapira
To add some internals, the high level consumer actually does read entire
batches from Kafka. It just exposes them to the user in an event loop,
because it's a very natural API. Users can then batch events the way they
prefer.

So if you are worried about batches being more efficient than single
events, you are covered!

Gwen

On Tue, Aug 4, 2015 at 12:04 PM, shahab  wrote:

> Thanks a lot Sharninder for the clarification and thanks Raja for pointing me to
> the example.
>
> best,
> /shahab
>
> On Tue, Aug 4, 2015 at 6:06 PM, Rajasekar Elango 
> wrote:
>
> > Here is an example on what sharninder suggested
> >
> >
> http://ingest.tips/2014/10/12/kafka-high-level-consumer-frequently-missing-pieces/
> >
> > Thanks,
> > Raja.
> >
> > On Tue, Aug 4, 2015 at 12:01 PM, Sharninder 
> wrote:
> >
> > > You can't. Kafka is essentially a queue, so you always read messages
> one
> > > by one. What you can do is disable auto offset commit, read 100
> messages,
> > > process them and then manually commit offset.
> > >
> > > --
> > > Sharninder
> > >
> > > > On 04-Aug-2015, at 9:07 pm, shahab  wrote:
> > > >
> > > > Hi,
> > > >
> > > > While we the producer can put data as batch in kafka server,  I
> > couldn't
> > > > find any API (or any document) saying how we can fetch data as batch
> > from
> > > > Kafka ?
> > > > Even when data is placed as batch in kafka server, still using High
> > Level
> > > > consumer I can only read one by one, and I can not specify. for
> > example,
> > > > read 100 items at once!
> > > >
> > > > Is this correct observation? or I am missing something?
> > > >
> > > > best,
> > > > /Shahab
> > >
> >
> >
> >
> > --
> > Thanks,
> > Raja.
> >
>


Re: How to read in batch using HighLevel Consumer?

2015-08-04 Thread shahab
Thanks a lot Sharninder for the clarification and thanks Raja for pointing me to
the example.

best,
/shahab

On Tue, Aug 4, 2015 at 6:06 PM, Rajasekar Elango 
wrote:

> Here is an example on what sharninder suggested
>
> http://ingest.tips/2014/10/12/kafka-high-level-consumer-frequently-missing-pieces/
>
> Thanks,
> Raja.
>
> On Tue, Aug 4, 2015 at 12:01 PM, Sharninder  wrote:
>
> > You can't. Kafka is essentially a queue, so you always read messages one
> > by one. What you can do is disable auto offset commit, read 100 messages,
> > process them and then manually commit offset.
> >
> > --
> > Sharninder
> >
> > > On 04-Aug-2015, at 9:07 pm, shahab  wrote:
> > >
> > > Hi,
> > >
> > > While we the producer can put data as batch in kafka server,  I
> couldn't
> > > find any API (or any document) saying how we can fetch data as batch
> from
> > > Kafka ?
> > > Even when data is placed as batch in kafka server, still using High
> Level
> > > consumer I can only read one by one, and I can not specify. for
> example,
> > > read 100 items at once!
> > >
> > > Is this correct observation? or I am missing something?
> > >
> > > best,
> > > /Shahab
> >
>
>
>
> --
> Thanks,
> Raja.
>


Re: Consumer that consumes only local partition?

2015-08-04 Thread Robert Metzger
Sorry for the very late reply ...

The performance issue was not caused by network latency. I had a job like
this:
FlinkKafkaConsumer --> someSimpleOperation --> FlinkKafkaProducer.

I thought that our FlinkKafkaConsumer is slow, but actually our
FlinkKafkaProducer was using the old producer API of Kafka. Switching to
the new producer API of Kafka greatly improved our writing performance to
Kafka. Flink was slowing down the KafkaConsumer because of the producer.

Since we are already talking about performance, let me ask you the
following question:
I am using Kafka and Flink on a HDP 2.2 cluster (with 40 machines). What
would you consider a good read/write performance for 8-byte messages on the
following setup?
- 40 brokers,
- topic with 120 partitions
- 120 reading threads (on 30 machines)
- 120 writing threads (on 30 machines)

I'm getting a write throughput of ~75k elements/core/second and a read
throughput of ~50k el/c/s.
When I'm stopping the writers, the read throughput goes up to 130k.
I would expect a higher throughput than (8*75000) / 1024 = 585.9 kb/sec per
partition .. or are the messages too small and the overhead is very high.

Which system out there would you recommend for getting reference
performance numbers? Samza, Spark, Storm?


On Wed, Jul 15, 2015 at 7:20 PM, Gwen Shapira  wrote:

> This is not something you can use the consumer API to simply do easily
> (consumers don't have locality notion).
> I can imagine using Kafka's low-level API calls to get a list of
> partitions and the lead replica, figuring out which are local and
> using those - but that sounds painful.
>
> Are you 100% sure the performance issue is due to network latency? If
> not, you may want to start optimizing somewhere more productive :)
> Kafka brokers and clients both have Metrics that may help you track
> where the performance issues are coming from.
>
> Gwen
>
> On Wed, Jul 15, 2015 at 9:24 AM, Robert Metzger 
> wrote:
> > Hi Shef,
> >
> > did you resolve this issue?
> > I'm facing some performance issues and I was wondering whether reading
> > locally would resolve them.
> >
> > On Mon, Jun 22, 2015 at 11:43 PM, Shef  wrote:
> >
> >> Noob question here. I want to have a single consumer for each partition
> >> that consumes only the messages that have been written locally. In other
> >> words, I want the consumer to access the local disk and not pull
> anything
> >> across the network. Possible?
> >>
> >> How can I discover which partitions are local?
> >>
> >>
> >>
>


Re: Checkpointing with custom metadata

2015-08-04 Thread Jason Gustafson
I couldn't find a jira for this, so I added KAFKA-2403.

-Jason

On Tue, Aug 4, 2015 at 9:36 AM, Jay Kreps  wrote:

> Hey James,
>
> You are right, the intended use of that was to have a way to capture some
> very small metadata about your state at the time of offset commit in an
> atomic way.
>
> That field isn't exposed, but we do need to add it to the new consumer API
> (I think just no one has done it yet).
>
> -Jay
>
> On Mon, Aug 3, 2015 at 1:52 PM, James Cheng  wrote:
>
> > According to
> >
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest
> ,
> > we can store custom metadata with our checkpoints. It looks like the high
> > level consumer does not support committing offsets with metadata, and
> that
> > in order to checkpoint with custom metadata, we have to issue the
> > OffsetCommitRequest ourselves. Is that correct?
> >
> > Thanks,
> > -James
> >
> >
>


Re: Decomissioning a broker

2015-08-04 Thread Grant Henke
The broker will actually unregister itself from zookeeper. The brokers id
path uses ephemeral nodes so they are automatically destroyed on shutdown.
In fact if you use a "Controlled Shutdown" migrating the replicas and
leaders should happen for you as well. Though, manual reassignment may be
preferred in your case.

Here is some extra information on controlled shutdowns:
http://kafka.apache.org/documentation.html#basic_ops_restarting

Thanks,
Grant

On Thu, Jul 30, 2015 at 4:37 PM, Andrew Otto  wrote:

> I’m sure this has been asked before, but I can’t seem to find the answer.
>
> I’m planning a Kafka cluster expansion and upgrade to 0.8.2.1.  In doing
> so, I will be decommissioning a broker.  I plan to remove this broker fully
> from the cluster, and then reinstall it and use it for a different purpose.
>
> I understand how to use the reassign-partitions tool to generate new
> partition assignments and to move partitions around so that the target
> broker no longer has any active replicas.  Once that is done, is there
> anything special that needs to happen?  I can shutdown the broker, but as
> far as I know that broker will still be registered in Zookeeper.  Should I
> just delete the znode for that broker once it has been shut down?
>
> Thanks!
> -Andrew Otto
>
>


-- 
Grant Henke
Software Engineer | Cloudera
gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke


Re: Kafka Zookeeper Issues

2015-08-04 Thread Grant Henke
The /brokers/ids nodes are ephemeral nodes that only exist while the
brokers maintain a session to ZooKeeper. There is more information on
Kafka's ZooKeeper usage here:
   - http://kafka.apache.org/documentation.html (look for "Broker Node Registry")
   - https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper
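
As a quick way to see what is currently registered, a small sketch using the plain ZooKeeper Java client (the connection string is made up):

import org.apache.zookeeper.ZooKeeper;

public class ListBrokerIds {
    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper("zkhost:2181", 30000, null);
        // Each live broker owns one ephemeral child here; a missing id means that
        // broker currently has no ZooKeeper session.
        for (String id : zk.getChildren("/brokers/ids", false)) {
            byte[] data = zk.getData("/brokers/ids/" + id, false, null);
            System.out.println(id + " -> " + new String(data, "UTF-8"));
        }
        zk.close();
    }
}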

Hopefully that helps debug your issue.

Thank you,
Grant

On Mon, Aug 3, 2015 at 5:20 AM, Wollert, Fabian 
wrote:

> hi everyone,
>
> we are trying to deploy Kafka 0.8.2.1 and ZooKeeper on AWS using
> CloudFormation, ASGs and other services. For ZooKeeper we are using
> Netflix's Exhibitor (v1.5.5) to ensure failover stability.
>
> What we are observing right now is that after some days our brokers are no
> longer registered under the "/brokers/ids" path in ZooKeeper. I was trying to
> see when they get deleted by checking the logs, but the ZK transaction logs
> only show the create statements, no deletes or anything similar (even though
> deletes are normally written there). Can someone explain how the mechanism
> for registering and deregistering in ZooKeeper works, or point me to a doc or
> even the source code where this happens? Or maybe someone has an idea of what
> is happening there.
>
> Any experience on what to watch out for when deploying Kafka on AWS (or a
> cloud environment in general) would also be helpful.
>
> Cheers
>
> --
> *Fabian Wollert*
> Business Intelligence
>
> *POSTAL ADDRESS*
> Zalando SE
> 11501 Berlin
>
> *OFFICE*
> Zalando SE
> Mollstraße 1
> 10178 Berlin
> Germany
>
> Phone: +49 30 20968 1819
> Fax:   +49 30 27594 693
> E-Mail: fabian.woll...@zalando.de
> Web: www.zalando.de
> Jobs: jobs.zalando.de
>
> Zalando SE, Tamara-Danz-Straße 1, 10243 Berlin
> Company registration: Amtsgericht Charlottenburg, HRB 158855 B
> Tax ID: 29/560/00596 * VAT registration number: DE 260543043
> Management Board: Robert Gentz, David Schneider, Rubin Ritter
> Chairperson of the Supervisory Board: Cristina Stenbeck
> Registered office: Berlin
>



-- 
Grant Henke
Software Engineer | Cloudera
gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke


Re: Checkpointing with custom metadata

2015-08-04 Thread Jay Kreps
Hey James,

You are right, the intended use of that was to have a way to capture some
very small metadata about your state at the time of offset commit in an
atomic way.

That field isn't exposed, but we do need to add it to the new consumer API
(I think just no one has done it yet).
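
For the curious, the shape this eventually takes in the new consumer is roughly the following (purely illustrative -- as noted above, it is not exposed in the consumer API at the time of this thread):

import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class MetadataCommit {
    // The metadata string is stored atomically alongside the committed offset.
    static void checkpoint(KafkaConsumer<String, String> consumer, TopicPartition tp,
                           long offset, String stateSnapshot) {
        consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(offset, stateSnapshot)));
    }
}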

-Jay

On Mon, Aug 3, 2015 at 1:52 PM, James Cheng  wrote:

> According to
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest,
> we can store custom metadata with our checkpoints. It looks like the high
> level consumer does not support committing offsets with metadata, and that
> in order to checkpoint with custom metadata, we have to issue the
> OffsetCommitRequest ourselves. Is that correct?
>
> Thanks,
> -James
>
>


Re: Lead Broker from kafka.message.MessageAndMetadata

2015-08-04 Thread Grant Henke
Hi Sreeni,

Using the SimpleConsumer you can send a TopicMetadataRequest for a topic,
and the TopicMetadataResponse will contain TopicMetadata for each topic
requested (or all topics), which contains PartitionMetadata for all partitions.
The PartitionMetadata contains the leader, replicas, and ISR.
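
For example, a minimal sketch of that lookup (the broker host/port and topic name are placeholders):

import java.util.Collections;

import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class FindLeader {
    public static void main(String[] args) {
        // Any live broker can answer a metadata request.
        SimpleConsumer consumer =
                new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "leader-lookup");
        TopicMetadataRequest request =
                new TopicMetadataRequest(Collections.singletonList("my-topic"));
        TopicMetadataResponse response = consumer.send(request);

        for (TopicMetadata topic : response.topicsMetadata()) {
            for (PartitionMetadata partition : topic.partitionsMetadata()) {
                // leader() can be null if the partition is currently leaderless.
                System.out.println("partition " + partition.partitionId() + " leader: "
                        + (partition.leader() == null ? "none"
                           : partition.leader().host() + ":" + partition.leader().port()));
            }
        }
        consumer.close();
    }
}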

Is that what you are looking for?

Thanks,
Grant

On Mon, Aug 3, 2015 at 7:26 AM, Sreenivasulu Nallapati <
sreenu.nallap...@gmail.com> wrote:

> Hello,
>
> Is there a way that we can find the lead broker
> from kafka.message.MessageAndMetadata class?
>
> My use case is simple, I have topic and partition and wanted to find out
> the lead broker for that partition.
>
> Please provide your insights
>
>
> Thanks
> Sreeni
>



-- 
Grant Henke
Software Engineer | Cloudera
gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke


Re: new consumer api?

2015-08-04 Thread Jason Gustafson
Hey Simon,

The new consumer has the ability to forego group management and assign
partitions directly. Once assigned, you can seek to any offset you want.

-Jason

On Tue, Aug 4, 2015 at 5:08 AM, Simon Cooper <
simon.coo...@featurespace.co.uk> wrote:

> Reading on the consumer docs, there's no mention of a relatively simple
> consumer that doesn't need groups, coordinators, commits, anything like
> that - just read and poll from specified offsets of specific topic
> partitions - but automatically deals with leadership changes and connection
> losses (so one level up from SimpleConsumer).
>
> Will the new API be able to be used in this relatively simple way?
> SimonC
>
> -Original Message-
> From: Jun Rao [mailto:j...@confluent.io]
> Sent: 03 August 2015 18:19
> To: users@kafka.apache.org
> Subject: Re: new consumer api?
>
> Jalpesh,
>
> We are still iterating on the new consumer a bit and are waiting for some
> of the security jiras to be committed. So now, we are shooting for
> releasing 0.8.3 in Oct (just updated
> https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan).
>
> Thanks,
>
> Jun
>
> On Mon, Aug 3, 2015 at 8:41 AM, Jalpesh Patadia <
> jalpesh.pata...@clickbank.com> wrote:
>
> > Hello guys,
> >
> > A while ago i read that the new consumer api was going to be released
> > sometime in July as part of the 0.8.3/0.9 release.
> > https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan
> >
> >
> > Do we have an update when we think that can happen?
> >
> >
> > Thanks,
> >
> > Jalpesh
> >
> >
> > -- PRIVILEGED AND CONFIDENTIAL This transmission may contain
> > privileged, proprietary or confidential information. If you are not
> > the intended recipient, you are instructed not to review this
> > transmission. If you are not the intended recipient, please notify the
> > sender that you received this message and delete this transmission from
> your system.
> >
>


Re: How to read in batch using HighLevel Consumer?

2015-08-04 Thread Rajasekar Elango
Here is an example on what sharninder suggested
http://ingest.tips/2014/10/12/kafka-high-level-consumer-frequently-missing-pieces/

Thanks,
Raja.

On Tue, Aug 4, 2015 at 12:01 PM, Sharninder  wrote:

> You can't. Kafka is essentially a queue, so you always read messages one
> by one. What you can do is disable auto offset commit, read 100 messages,
> process them and then manually commit offset.
>
> --
> Sharninder
>
> > On 04-Aug-2015, at 9:07 pm, shahab  wrote:
> >
> > Hi,
> >
> > While we the producer can put data as batch in kafka server,  I couldn't
> > find any API (or any document) saying how we can fetch data as batch from
> > Kafka ?
> > Even when data is placed as batch in kafka server, still using High Level
> > consumer I can only read one by one, and I can not specify. for example,
> > read 100 items at once!
> >
> > Is this correct observation? or I am missing something?
> >
> > best,
> > /Shahab
>



-- 
Thanks,
Raja.


Re: How to read in batch using HighLevel Consumer?

2015-08-04 Thread Sharninder
You can't. Kafka is essentially a queue, so you always read messages one by 
one. What you can do is disable auto offset commit, read 100 messages, process 
them and then manually commit offset.
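
As a sketch of that pattern with the high-level consumer (assuming the connector was created with "auto.commit.enable" set to "false"; the batch size and the processing step are placeholders):

import java.util.ArrayList;
import java.util.List;

import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class BatchingConsumer {
    static void consumeInBatches(ConsumerConnector connector,
                                 KafkaStream<byte[], byte[]> stream, int batchSize) {
        List<byte[]> batch = new ArrayList<byte[]>(batchSize);
        for (MessageAndMetadata<byte[], byte[]> mm : stream) {
            batch.add(mm.message());
            if (batch.size() == batchSize) {
                System.out.println("processing " + batch.size() + " messages"); // app-specific work
                connector.commitOffsets();  // commit only after the batch is safely handled
                batch.clear();
            }
        }
    }
}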

--
Sharninder 

> On 04-Aug-2015, at 9:07 pm, shahab  wrote:
> 
> Hi,
> 
> While we the producer can put data as batch in kafka server,  I couldn't
> find any API (or any document) saying how we can fetch data as batch from
> Kafka ?
> Even when data is placed as batch in kafka server, still using High Level
> consumer I can only read one by one, and I can not specify. for example,
> read 100 items at once!
> 
> Is this correct observation? or I am missing something?
> 
> best,
> /Shahab


How to read in batch using HighLevel Consumer?

2015-08-04 Thread shahab
Hi,

While the producer can put data into the Kafka server as a batch, I couldn't
find any API (or any document) saying how we can fetch data as a batch from
Kafka.
Even when data is placed in the Kafka server as a batch, with the High Level
consumer I can only read messages one by one, and I cannot specify, for
example, to read 100 items at once!

Is this a correct observation, or am I missing something?

best,
/Shahab


Get last snapshot from compacted topic

2015-08-04 Thread Aki
I'd like to save a snapshot of a processing node's state in a compacted Kafka
topic. A large number of nodes would save their snapshots in the same
partition.

What is an efficient way for a (restarted) node to find the offset of its
latest snapshot? Using just Kafka (no database, local file, etc.), is there a
more efficient way than consuming the partition from the earliest available
offset (potentially reading a lot of snapshots from other nodes)?

Thanks!

RE: new consumer api?

2015-08-04 Thread Simon Cooper
Reading the consumer docs, there's no mention of a relatively simple
consumer that doesn't need groups, coordinators, commits, or anything like
that - one that just reads and polls from specified offsets of specific topic
partitions, but automatically deals with leadership changes and connection
losses (so one level up from SimpleConsumer).

Will the new API be able to be used in this relatively simple way?
SimonC

-Original Message-
From: Jun Rao [mailto:j...@confluent.io] 
Sent: 03 August 2015 18:19
To: users@kafka.apache.org
Subject: Re: new consumer api?

Jalpesh,

We are still iterating on the new consumer a bit and are waiting for some of 
the security jiras to be committed. So now, we are shooting for releasing 0.8.3 
in Oct (just updated 
https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan).

Thanks,

Jun

On Mon, Aug 3, 2015 at 8:41 AM, Jalpesh Patadia < 
jalpesh.pata...@clickbank.com> wrote:

> Hello guys,
>
> A while ago i read that the new consumer api was going to be released 
> sometime in July as part of the 0.8.3/0.9 release.
> https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan
>
>
> Do we have an update when we think that can happen?
>
>
> Thanks,
>
> Jalpesh
>
>
> -- PRIVILEGED AND CONFIDENTIAL This transmission may contain 
> privileged, proprietary or confidential information. If you are not 
> the intended recipient, you are instructed not to review this 
> transmission. If you are not the intended recipient, please notify the 
> sender that you received this message and delete this transmission from your 
> system.
>


Re: 0.8.3 ETA?

2015-08-04 Thread Stevo Slavić
Thanks, Jun, for the heads up!

On Mon, Aug 3, 2015 at 7:17 PM, Jun Rao  wrote:

> Hi, Stevo,
>
> Yes, we are still iterating on the new consumer a bit and are waiting for
> some of the security jiras to be committed. So now, we are shooting for
> releasing 0.8.3 in Oct (just updated
> https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan).
>
> As we are getting closer, we will clean up the 0.8.3 jiras and push
> non-critical ones to future releases.
>
> Thanks,
>
> Jun
>
> On Mon, Aug 3, 2015 at 5:52 AM, Stevo Slavić  wrote:
>
> > Hello Apache Kafka community,
> >
> > If I recall well, two weeks ago it was mentioned in a discussion that
> Kafka
> > 0.8.3 might be released in a month time.
> >
> > Is this still Kafka dev team goal, in few weeks time to have Kafka 0.8.3
> > released? Or is more (re)work (e.g. more new consumer API changes)
> planned
> > for 0.8.3 than already in JIRA, which would further delay 0.8.3 release?
> >
> > Btw, Kafka JIRA has quite a lot unresolved tickets targeting 0.8.3 as fix
> > version (see here
> > <
> >
> https://issues.apache.org/jira/browse/KAFKA-1853?jql=project%20%3D%20KAFKA%20AND%20fixVersion%20%3D%200.8.3%20AND%20resolution%20%3D%20Unresolved%20ORDER%20BY%20due%20ASC%2C%20priority%20DESC%2C%20created%20ASC
> > >
> > complete list).
> >
> > Kind regards,
> > Stevo Slavic.
> >
>


Re: kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.

2015-08-04 Thread Jilin Xie
Some suggestions:
 Check the existence of the topic.
 Check the firewall of the broker... Try telnet or something to
make sure it's available.
 Try running the producer on the broker machine.

Since you get this error, the code itself is functioning. I think some
configuration or parameter issue is leading to this problem.

On Tue, Aug 4, 2015 at 11:21 AM, David Li  wrote:

> Hi, I have some very simple code to just send out one message; the topic is
> created automatically, but the message just cannot be sent out. I also
> tried to change the configuration, and the result is still the same. Sorry to
> bother you all with this silly question.
>
> For your information, the Kafka server is running in a docker container,
> which runs in an Ubuntu server VM. The test class is run from IntelliJ
> IDEA on the host, which is a Mac OS X.
>
> public static void main(String[] args) {
> String topic = "test3";
>
> Properties props = new Properties();
> props.put("serializer.class", "kafka.serializer.StringEncoder");
> props.put("metadata.broker.list", "192.168.144.10:29092");
> //props.put("retry.backoff.ms", "1000");
> //props.put("message.send.max.retries", "10");
> //props.put("topic.metadata.refresh.interval.ms", "0");
>
> Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
>
> int messageNo = 1;
> String messageStr = new String("Message_" + messageNo);
> producer.send(new KeyedMessage<String, String>(topic, messageStr));
> }
>


Re: New Consumer API and Range Consumption with Fail-over

2015-08-04 Thread Bhavesh Mistry
Hi Jason and Kafka Dev Team,



First of all, thanks for responding; I think you captured the expected
behavior correctly.



The use case is offset range consumption.  Every minute we store the highest
offset for each topic and partition.  So if we need to reload or re-consume
yesterday's data from, say, 8AM to noon, we would have a start offset mapping
at 8AM and an end offset mapping at noon in a time series database.



I was trying to implement this use case with the new Consumer API.  Would you
or the Kafka dev team agree with a request to have an API that takes a topic
and its start/end offsets for a high-level consumer group?  (With the older
consumer API we used the Simple consumer, without fail-over.)  Also, for each
range consumption there will be a different group id, and the group id will
not be reused.  The main purpose is to occasionally reload or process past
data again (due to production bugs, downtime, etc.) while letting the main
consumer group continue to consume the latest records.


void subscribe(TopicPartition[] startOffsetPartitions, TopicPartition[]
endOffsetPartitions)



or something similar that will allow the following:



1)   When the consumer group already exists (meaning it has already consumed
data and committed offsets to a storage system, either Kafka or ZK), ignore the
start offset positions and use the committed offsets. If nothing has been
committed, use the start offset for the partition.

2)   When consumption has reached the end offset for a given partition, pausing
is fine, or the assigned thread can become a fail-over thread or wait for
reassignment.

3)   When the consumer group is done consuming all partitions' offset ranges
(start to end), gracefully shut down the entire consumer group.

4)   While consuming records, if one of the nodes or consuming threads goes
down, automatically fail over to the others (similar to the High Level Consumer
in the OLD consumer API; I am not sure whether High Level and/or Simple
Consumer concepts exist for the new API).



I hope the above explanation clarifies the use case and intended behavior.
Thanks for the clarifications, and you are correct that we need
pause(TopicPartition tp), resume(TopicPartition tp), and/or an API to set the
end offset for each partition.
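
For what it's worth, a single-consumer sketch of the range replay using assign/seek from the new consumer API as it eventually shipped -- it replays a fixed offset range in one process, but does not provide the committed-offset resume or the automatic group fail-over being requested here (topic, partition and offsets are placeholders):

import java.util.Arrays;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RangeReplay {
    // Replays one partition between startOffset (inclusive) and endOffset (exclusive).
    static void replayRange(KafkaConsumer<byte[], byte[]> consumer, TopicPartition tp,
                            long startOffset, long endOffset) {
        consumer.assign(Arrays.asList(tp));
        consumer.seek(tp, startOffset);
        long nextOffset = startOffset;
        while (nextOffset < endOffset) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
            for (ConsumerRecord<byte[], byte[]> record : records) {
                if (record.offset() >= endOffset) return;   // reached the end of the range
                System.out.println("replaying offset " + record.offset()); // app-specific work
                nextOffset = record.offset() + 1;
            }
        }
    }
}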



Please do let us know your preference for supporting the above simple use case.


Thanks,


Bhavesh

On Thu, Jul 30, 2015 at 1:23 PM, Jason Gustafson  wrote:

> Hi Bhavesh,
>
> I'm not totally sure I understand the expected behavior, but I think this
> can work. Instead of seeking to the start of the range before the poll
> loop, you should probably provide a ConsumerRebalanceCallback to get
> notifications when group assignment has changed (e.g. when one of your
> nodes dies). When a new partition is assigned, the callback will be invoked
> by the consumer and you can use it to check if there's a committed position
> in the range or if you need to seek to the beginning of the range. For
> example:
>
> void onPartitionsAssigned(consumer, partitions) {
>   for (partition : partitions) {
>  try {
>offset = consumer.committed(partition)
>consumer.seek(partition, offset)
>  } catch (NoOffsetForPartition) {
>consumer.seek(partition, rangeStart)
>  }
>   }
> }
>
> If a failure occurs, then the partitions will be rebalanced across
> whichever consumers are still active. The case of the entire cluster being
> rebooted is not really different. When the consumers come back, they check
> the committed position and resume where they left off. Does that make
> sense?
>
> After you are finished consuming a partition's range, you can use
> KafkaConsumer.pause(partition) to prevent further fetches from being
> initiated while still maintaining the current assignment. The patch to add
> pause() is not in trunk yet, but it probably will be before too long.
>
> One potential problem is that you wouldn't be able to reuse the same group
> to consume a different range because of the way it depends on the committed
> offsets. Kafka's commit API actually allows some additional metadata to go
> along with a committed offset and that could potentially be used to tie the
> commit to the range, but it's not yet exposed in KafkaConsumer. I assume it
> will be eventually, but I'm not sure whether that will be part of the
> initial release.
>
>
> Hope that helps!
>
> Jason
>
> On Thu, Jul 30, 2015 at 7:54 AM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com>
> wrote:
>
> > Hello Kafka Dev Team,
> >
> >
> > With new Consumer API redesign  (
> >
> >
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
> > ),  is there a capability to consume given the topic and partition
> start/
> > end position.  How would I achieve following use case of range
> consumption
> > with fail-over.
> >
> >
> > Use Case:
> > Ability to reload data given topic and its partition offset start/end
> with
> > High Level Consumer with fail over.   Basically, High Level Range
> > consumption and consumer group dies while main consumer group.
> >
> >
> > Suppose you have a topic called “test-topi