Re: InvalidMessageException: Message is corrupt, Consumer stuck
The high-level consumer stores its state in ZooKeeper. In theory, you should be able to go into ZooKeeper, find the consumer group, topic and partition, and increment the offset past the "corrupt" point.

On Tue, Aug 4, 2015 at 10:23 PM, Henry Cai wrote:
> Hi,
>
> We are using the Kafka high-level consumer 8.1.1, and we somehow got a
> corrupted message in the topic. We are not sure of the root cause, but the
> problem we are having now is that the HL consumer is stuck at that position:
>
> kafka.message.InvalidMessageException: Message is corrupt (stored crc =
> 537685622, computed crc = 36513351)
>   at kafka.message.Message.ensureValid(Message.scala:166)
>   at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)
>   at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
>   at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>   at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>
> If we try to ignore that exception and iterate to the next message, the
> iterator cannot get past that error state:
>
> java.lang.IllegalStateException: Iterator is in failed state
>   at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
>
> Looking at the code, it seems the only option is to call
> IteratorTemplate.resetState() to clear the state, but this is an internal
> method. Is this the right way to work around this problem?
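For the archive, a minimal sketch of that workaround using the plain ZooKeeper Java client. The ZooKeeper address, group, topic, partition, and offsets are assumptions chosen to illustrate the idea; the /consumers/... path is the one used by the 0.8 high-level consumer. Stop all consumers in the group before editing the offset, then restart them.

    import org.apache.zookeeper.ZooKeeper;

    public class SkipCorruptMessage {
        public static void main(String[] args) throws Exception {
            // Assumptions: ZooKeeper at localhost:2181, group "my-group",
            // topic "my-topic", partition 0, corrupt message at offset 1042.
            ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, null);
            String path = "/consumers/my-group/offsets/my-topic/0";

            // Inspect the currently committed offset first.
            byte[] current = zk.getData(path, false, null);
            System.out.println("committed offset: " + new String(current, "UTF-8"));

            // Move the offset one past the corrupt message (-1 = any znode version).
            zk.setData(path, "1043".getBytes("UTF-8"), -1);
            zk.close();
        }
    }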
InvalidMessageException: Message is corrupt, Consumer stuck
Hi,

We are using the Kafka high-level consumer 8.1.1, and we somehow got a corrupted message in the topic. We are not sure of the root cause, but the problem we are having now is that the HL consumer is stuck at that position:

kafka.message.InvalidMessageException: Message is corrupt (stored crc = 537685622, computed crc = 36513351)
  at kafka.message.Message.ensureValid(Message.scala:166)
  at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:102)
  at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
  at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
  at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)

If we try to ignore that exception and iterate to the next message, the iterator cannot get past that error state:

java.lang.IllegalStateException: Iterator is in failed state
  at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)

Looking at the code, it seems the only option is to call IteratorTemplate.resetState() to clear the state, but this is an internal method. Is this the right way to work around this problem?
Re: kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
You are right. After I ran Kafka directly on my localhost, it worked just fine. After further googling, I found that the two parameters below need to be set if Kafka is running on another machine:

#advertised.host.name=
#advertised.port=

More precisely, if Kafka is running inside a Docker container, advertised.host.name should be set to the Docker host IP, and advertised.port should be set to the port mapped to the Docker host. Thanks again.

On Tue, Aug 4, 2015 at 4:58 PM, Jilin Xie wrote:
> Some suggestions:
>   Check the existence of the topic.
>   Check the firewall of the broker... Try telnet or something to make sure it's available.
>   Try running the producer on the broker machine.
>
> Since you get this error, the code is functioning. I think it's some
> configuration and parameter issue leading to this problem.
>
> On Tue, Aug 4, 2015 at 11:21 AM, David Li wrote:
>
> > Hi, I have a very simple piece of code to just send out one message. The
> > topic is created automatically, but the message just cannot be sent out. I
> > also tried to change the configuration; the result is still the same. Sorry
> > to bother you all with this silly question.
> >
> > For your information, the Kafka server is running in a Docker container,
> > which runs in an Ubuntu server VM. The test class is run from IntelliJ
> > IDEA on the host, which is a Mac OS X machine.
> >
> > public static void main(String[] args) {
> >     String topic = "test3";
> >
> >     Properties props = new Properties();
> >     props.put("serializer.class", "kafka.serializer.StringEncoder");
> >     props.put("metadata.broker.list", "192.168.144.10:29092");
> >     //props.put("retry.backoff.ms", "1000");
> >     //props.put("message.send.max.retries", "10");
> >     //props.put("topic.metadata.refresh.interval.ms", "0");
> >
> >     Producer<String, String> producer =
> >         new Producer<String, String>(new ProducerConfig(props));
> >
> >     int messageNo = 1;
> >     String messageStr = new String("Message_" + messageNo);
> >     producer.send(new KeyedMessage<String, String>(topic, messageStr));
> > }
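To make the fix concrete, a sketch of the relevant server.properties entries, assuming the container's port 9092 is mapped to 29092 on the Docker host 192.168.144.10 (the address used in the producer above):

    # Address the broker advertises to clients. It must be reachable from
    # outside the container, so use the Docker host's IP and mapped port.
    advertised.host.name=192.168.144.10
    advertised.port=29092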
Kafka Broker server cannot be connected by telnet from other Kafka Brokers
Hi Everyone,

We're trying to deploy Kafka behind a network load balancer, and we have created a port mapping for each Kafka broker under that load balancer--we only have one public IP, and the Kafka clients are in another system and thus cannot access the brokers via their internal IPs directly. So for example, we have the public IP 1.2.3.4, and we map 1.2.3.4:9092 to broker1, 1.2.3.4:9093 to broker2, etc. In server.properties, the advertised host and port are 1.2.3.4:9092 and 1.2.3.4:9093 for broker 1 and broker 2 respectively.

It works well at the beginning. But after several days under load, the replication between brokers fails due to connection timeouts--it happens intermittently. The outside connections to these brokers keep working fine consistently. Looking at the tcpdump on the destination server, we found that the timeout happens when the broker does not send a TCP ACK back to the public IP 1.2.3.4. In the capture, the source IP is the public IP and the destination is the broker, and the broker does not send a TCP ACK to the source. [inline image: tcpdump capture not included]

Have you seen a similar problem before? We would appreciate any information.

Thanks,
Tony
Kafka vs RabbitMQ latency
Hi guys,

I was reading a paper today in which the latency of Kafka and RabbitMQ is compared: http://downloads.hindawi.com/journals/js/2015/468047.pdf

To my surprise, Kafka showed some large variations in latency as the number of records per second increased, so I am curious why that is. Also, in the ProducerPerformance test:

bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance test7 5000 100 -1 *acks=1* bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092 buffer.memory=67108864 batch.size=8196

Setting acks=1 means the producer will wait for an ack from the leader replica, right? Could that be what affects latency? If I set it to 0, producers will send as fast as possible, so throughput would increase and latency decrease in the test results?

Thanks for answering.

best,
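For context, a minimal sketch of how the acks setting is applied on the new Java producer; the broker address and topic are taken from the command above, everything else is illustrative:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class AcksDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "esv4-hcl198.grid.linkedin.com:9092");
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            // acks=0: fire-and-forget; lowest latency, no delivery guarantee.
            // acks=1: leader acks after its local write (the setting in the test above).
            // acks=-1/all: leader waits for the full ISR; highest durability and latency.
            props.put("acks", "1");

            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
            producer.send(new ProducerRecord<String, String>("test7", "hello"));
            producer.close();
        }
    }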
Re: Decomissioning a broker
That's correct. Thanks for catching that.

On Tue, Aug 4, 2015 at 3:27 PM, Andrew Otto wrote:
> Thanks!
>
> > In fact if you use a "Controlled Shutdown" migrating the replicas and
> > leaders should happen for you as well.
>
> Just to clarify, controlled shutdown will only move the leaders to other
> replicas, right? It won’t actually migrate any replicas elsewhere.
>
> -Ao
>
> > On Aug 4, 2015, at 13:00, Grant Henke wrote:
> >
> > The broker will actually unregister itself from zookeeper. The broker's id
> > path uses ephemeral nodes so they are automatically destroyed on shutdown.
> > In fact if you use a "Controlled Shutdown" migrating the replicas and
> > leaders should happen for you as well. Though manual reassignment may be
> > preferred in your case.
> >
> > Here is some extra information on controlled shutdowns:
> > http://kafka.apache.org/documentation.html#basic_ops_restarting
> >
> > Thanks,
> > Grant
> >
> > On Thu, Jul 30, 2015 at 4:37 PM, Andrew Otto wrote:
> >
> >> I’m sure this has been asked before, but I can’t seem to find the answer.
> >>
> >> I’m planning a Kafka cluster expansion and upgrade to 0.8.2.1. In doing
> >> so, I will be decommissioning a broker. I plan to remove this broker fully
> >> from the cluster, and then reinstall it and use it for a different purpose.
> >>
> >> I understand how to use the reassign-partitions tool to generate new
> >> partition assignments and to move partitions around so that the target
> >> broker no longer has any active replicas. Once that is done, is there
> >> anything special that needs to happen? I can shut down the broker, but as
> >> far as I know that broker will still be registered in Zookeeper. Should I
> >> just delete the znode for that broker once it has been shut down?
> >>
> >> Thanks!
> >> -Andrew Otto
> >
> > --
> > Grant Henke
> > Software Engineer | Cloudera
> > gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke

--
Grant Henke
Software Engineer | Cloudera
gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
Re: message filtering or "selector"
Kafka, as currently implemented, is not aware of the content of messages, so there is no selector logic available. The way to go is to implement the selector in your client - i.e. your consume() loop will get all messages but will throw away those that don't fit your pattern.

It may be worthwhile to add a ticket for pluggable selector logic in the new consumer. I can't guarantee it will happen - there are infinite things that can be plugged into consumers and we need to draw the line somewhere - but it is worth a discussion.

On Tue, Aug 4, 2015 at 2:05 PM, Alvaro Gareppe wrote:
> Is there a way to implement "selector" logic in kafka (similar to JMS
> selectors), i.e. to allow consuming a message only if the message contains a
> certain header or content?
>
> I'm evaluating migrating from ActiveMQ to kafka, and I'm using the selector
> logic widely in the application
>
> --
> Ing. Alvaro Gareppe
> agare...@gmail.com
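To illustrate the suggestion, a minimal client-side "selector" sketch against the 0.8 high-level consumer API. The ZooKeeper address, group id, topic, and the string-prefix match are all illustrative assumptions - substitute whatever header/content convention your messages use.

    import java.nio.charset.StandardCharsets;
    import java.util.Collections;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;

    public class SelectorConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181"); // assumption
            props.put("group.id", "selector-demo");           // assumption
            ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

            KafkaStream<byte[], byte[]> stream = connector
                .createMessageStreams(Collections.singletonMap("my-topic", 1))
                .get("my-topic").get(0);

            for (MessageAndMetadata<byte[], byte[]> mm : stream) {
                String message = new String(mm.message(), StandardCharsets.UTF_8);
                // The "selector": discard anything not matching our convention.
                if (!message.startsWith("type=order")) {
                    continue;
                }
                System.out.println("selected: " + message);
            }
        }
    }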
Re: Consumer that consumes only local partition?
Hi Robert,

Here is the Kafka benchmark for your reference: 821,557 records/sec (78.3 MB/sec).
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines

If you put Flink, Storm, Samza or Spark on top, the performance will go down.

Best regards
Hawin

On Tue, Aug 4, 2015 at 11:57 AM, Robert Metzger wrote:
> Sorry for the very late reply ...
>
> The performance issue was not caused by network latency. I had a job like
> this: FlinkKafkaConsumer --> someSimpleOperation --> FlinkKafkaProducer.
>
> I thought that our FlinkKafkaConsumer was slow, but actually our
> FlinkKafkaProducer was using the old producer API of Kafka. Switching to
> the new producer API of Kafka greatly improved our write performance to
> Kafka. Flink was slowing down the KafkaConsumer because of the producer.
>
> Since we are already talking about performance, let me ask you the
> following question:
> I am using Kafka and Flink on a HDP 2.2 cluster (with 40 machines). What
> would you consider a good read/write performance for 8-byte messages on the
> following setup?
> - 40 brokers,
> - topic with 120 partitions
> - 120 reading threads (on 30 machines)
> - 120 writing threads (on 30 machines)
>
> I'm getting a write throughput of ~75k elements/core/second and a read
> throughput of ~50k el/c/s.
> When I stop the writers, the read throughput goes up to 130k.
> I would expect a higher throughput than (8*75000) / 1024 = 585.9 KB/sec per
> partition ... or are the messages too small, so the per-message overhead
> dominates?
>
> Which system out there would you recommend for getting reference
> performance numbers? Samza, Spark, Storm?
>
> On Wed, Jul 15, 2015 at 7:20 PM, Gwen Shapira wrote:
>
> > This is not something you can do easily with the consumer API
> > (consumers don't have a locality notion).
> > I can imagine using Kafka's low-level API calls to get a list of
> > partitions and the lead replica, figuring out which are local and
> > using those - but that sounds painful.
> >
> > Are you 100% sure the performance issue is due to network latency? If
> > not, you may want to start optimizing somewhere more productive :)
> > Kafka brokers and clients both have metrics that may help you track
> > down where the performance issues are coming from.
> >
> > Gwen
> >
> > On Wed, Jul 15, 2015 at 9:24 AM, Robert Metzger wrote:
> > > Hi Shef,
> > >
> > > did you resolve this issue?
> > > I'm facing some performance issues and I was wondering whether reading
> > > locally would resolve them.
> > >
> > > On Mon, Jun 22, 2015 at 11:43 PM, Shef wrote:
> > >
> > >> Noob question here. I want to have a single consumer for each partition
> > >> that consumes only the messages that have been written locally. In other
> > >> words, I want the consumer to access the local disk and not pull anything
> > >> across the network. Possible?
> > >>
> > >> How can I discover which partitions are local?
Re: Got conflicted ephemeral node exception for several hours
I am on Kafka 0.8.2.1 (Java 8) and have run into this same issue, where the KafkaServer (broker) goes into an indefinite loop writing out this message:

[2015-08-04 15:45:12,350] INFO conflict in /brokers/ids/0 data: {"jmx_port":-1,"timestamp":"1438661432074","host":"foo-bar","version":1,"port":9092} stored data: {"jmx_port":-1,"timestamp":"1438661429589","host":"foo-bar","version":1,"port":9092} (kafka.utils.ZkUtils$)
[2015-08-04 15:45:12,352] INFO I wrote this conflicted ephemeral node [{"jmx_port":-1,"timestamp":"1438661432074","host":"foo-bar","version":1,"port":9092}] at /brokers/ids/0 a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)

The above two lines have been repeating every few seconds for the past 20-odd hours on this broker; the broker has been rendered useless and is contributing to high CPU usage. As a result the consumers have gone into a state where they no longer consume messages. Furthermore, this continuous looping has put the Kafka process at the top of CPU usage.

I understand that bouncing the consumer is an option and probably would "fix" this, but in our real production environments we won't be able to bounce the consumers. I currently have access to logs (some of which are pasted here). Is there any chance these logs help in narrowing down the issue and fixing the root cause? Can we also please add a maximum retry limit to this ZooKeeper node-creation logic instead of going into an indefinite loop?

I have kept the original timestamps in the logs to help narrow down the issue. The 1438661432074 (millisecond) timestamp in the log translates to Aug 03 2015 21:10:32 (PDT), and 1438661429589 translates to Aug 03 2015 21:10:30 (PDT). I have included that part of the log snippet from the server.log of the broker (10.95.100.31).
[2015-08-03 21:10:29,805] ERROR Closing socket for /10.95.100.31 because of error (kafka.network.Processor)
java.io.IOException: Broken pipe
  at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
  at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
  at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
  at sun.nio.ch.IOUtil.write(IOUtil.java:65)
  at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
  at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:123)
  at kafka.network.MultiSend.writeTo(Transmission.scala:101)
  at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
  at kafka.network.Processor.write(SocketServer.scala:472)
  at kafka.network.Processor.run(SocketServer.scala:342)
  at java.lang.Thread.run(Thread.java:745)
[2015-08-03 21:10:29,938] ERROR Closing socket for /10.95.100.31 because of error (kafka.network.Processor)
java.io.IOException: Connection reset by peer
  at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
  at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
  at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
  at sun.nio.ch.IOUtil.read(IOUtil.java:197)
  at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
  at kafka.utils.Utils$.read(Utils.scala:380)
  at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
  at kafka.network.Processor.read(SocketServer.scala:444)
  at kafka.network.Processor.run(SocketServer.scala:340)
  at java.lang.Thread.run(Thread.java:745)
[2015-08-03 21:10:30,045] ERROR Closing socket for /10.95.100.31 because of error (kafka.network.Processor)
java.io.IOException: Broken pipe
  at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
  at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
  at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
  at sun.nio.ch.IOUtil.write(IOUtil.java:65)
  at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
  at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:123)
  at kafka.network.MultiSend.writeTo(Transmission.scala:101)
  at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
  at kafka.network.Processor.write(SocketServer.scala:472)
  at kafka.network.Processor.run(SocketServer.scala:342)
  at java.lang.Thread.run(Thread.java:745)
<<< a lot more similar exceptions >>>
[2015-08-03 21:10:31,304] INFO Closing socket connection to /10.95.100.31. (kafka.network.Processor)
[2015-08-03 21:10:31,397] INFO Closing socket connection to /10.95.100.31. (kafka.network.Processor)
[2015-08-03 21:10:31,399] INFO Closing socket connection to /10.95.100.31. (kafka.network.Processor)
[2015-08-03 21:10:31,445] INFO Closing socket connection to /10.95.100.31. (kafka.network.Processor)
<<< a bunch of similar logs as above >>>
[2015-08-03 21:10:31,784] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [] (kafka.server.ReplicaFetcherManager)
[2015-08-03 21:10:31,860] INFO C
Re: Access control in kafka
If this (https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface) is what you need, then watch for https://reviews.apache.org/r/34492/ to get committed to trunk.

Thanks
Parth

On 8/4/15, 1:57 PM, "Alvaro Gareppe" wrote:

>Can someone point me to documentation about access control in kafka? Is
>there something implemented in the current version, or planned for future
>versions?
>
>I need something that allows me to define which users are allowed to
>connect to certain topics, and of course user management.
>
>Thank you guys in advance!
>
>--
>Eng. Alvaro Gareppe
message filtering or "selector"
Is there a way to implement "selector" logic in kafka (similar to JMS selectors), i.e. to allow consuming a message only if the message contains a certain header or content?

I'm evaluating migrating from ActiveMQ to kafka, and I'm using the selector logic widely in the application.

--
Ing. Alvaro Gareppe
agare...@gmail.com
Access control in kafka
Can someone point me to documentation about access control in kafka? Is there something implemented in the current version, or planned for future versions?

I need something that allows me to define which users are allowed to connect to certain topics, and of course user management.

Thank you guys in advance!

--
Eng. Alvaro Gareppe
Re: Decomissioning a broker
Thanks! > In fact if you use a "Controlled Shutdown" migrating the replicas and > leaders should happen for you as well. Just to clarify, controlled shutdown will only move the leaders to other replicas, right? It won’t actually migrate any replicas elsewhere. -Ao > On Aug 4, 2015, at 13:00, Grant Henke wrote: > > The broker will actually unregister itself from zookeeper. The brokers id > path uses ephemeral nodes so they are automatically destroyed on shutdown. > In fact if you use a "Controlled Shutdown" migrating the replicas and > leaders should happen for you as well. Though, manual reassignment may be > preferred in your case. > > Here is some extra information on controlled shutdowns: > http://kafka.apache.org/documentation.html#basic_ops_restarting > > Thanks, > Grant > > On Thu, Jul 30, 2015 at 4:37 PM, Andrew Otto wrote: > >> I’m sure this has been asked before, but I can’t seem to find the answer. >> >> I’m planning a Kafka cluster expansion and upgrade to 0.8.2.1. In doing >> so, I will be decommissioning a broker. I plan to remove this broker fully >> from the cluster, and then reinstall it and use it for a different purpose. >> >> I understand how to use the reassign-partitions tool to generate new >> partition assignments and to move partitions around so that the target >> broker no longer has any active replicas. Once that is done, is there >> anything special that needs to happen? I can shutdown the broker, but as >> far as I know that broker will still be registered in Zookeeper. Should I >> just delete the znode for that broker once it has been shut down? >> >> Thanks! >> -Andrew Otto >> >> > > > -- > Grant Henke > Software Engineer | Cloudera > gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
Re: How to read in batch using HighLevel Consumer?
To add some internals: the high-level consumer actually does read entire batches from Kafka. It just exposes them to the user in an event loop, because that's a very natural API. Users can then batch events the way they prefer. So if you are worried about batches being more efficient than single events, you are covered!

Gwen

On Tue, Aug 4, 2015 at 12:04 PM, shahab wrote:
> Thanks a lot Sharninder for the clarification, and thanks Raja for pointing
> me to the example.
>
> best,
> /shahab
>
> On Tue, Aug 4, 2015 at 6:06 PM, Rajasekar Elango wrote:
>
> > Here is an example of what Sharninder suggested:
> >
> > http://ingest.tips/2014/10/12/kafka-high-level-consumer-frequently-missing-pieces/
> >
> > Thanks,
> > Raja.
> >
> > On Tue, Aug 4, 2015 at 12:01 PM, Sharninder wrote:
> >
> > > You can't. Kafka is essentially a queue, so you always read messages one
> > > by one. What you can do is disable auto offset commit, read 100 messages,
> > > process them, and then manually commit the offset.
> > >
> > > -- Sharninder
> > >
> > > > On 04-Aug-2015, at 9:07 pm, shahab wrote:
> > > >
> > > > Hi,
> > > >
> > > > While the producer can put data into the Kafka server as a batch, I
> > > > couldn't find any API (or any document) saying how we can fetch data as
> > > > a batch from Kafka. Even when data is placed in the Kafka server as a
> > > > batch, using the High Level consumer I can only read messages one by
> > > > one; I cannot specify, for example, "read 100 items at once"!
> > > >
> > > > Is this a correct observation, or am I missing something?
> > > >
> > > > best,
> > > > /Shahab
Re: How to read in batch using HighLevel Consumer?
Thanks a lot Sharninder for the clarification, and thanks Raja for pointing me to the example.

best,
/shahab

On Tue, Aug 4, 2015 at 6:06 PM, Rajasekar Elango wrote:
> Here is an example of what Sharninder suggested:
>
> http://ingest.tips/2014/10/12/kafka-high-level-consumer-frequently-missing-pieces/
>
> Thanks,
> Raja.
>
> On Tue, Aug 4, 2015 at 12:01 PM, Sharninder wrote:
>
> > You can't. Kafka is essentially a queue, so you always read messages one
> > by one. What you can do is disable auto offset commit, read 100 messages,
> > process them, and then manually commit the offset.
> >
> > -- Sharninder
> >
> > > On 04-Aug-2015, at 9:07 pm, shahab wrote:
> > >
> > > Hi,
> > >
> > > While the producer can put data into the Kafka server as a batch, I
> > > couldn't find any API (or any document) saying how we can fetch data as
> > > a batch from Kafka. Even when data is placed in the Kafka server as a
> > > batch, using the High Level consumer I can only read messages one by
> > > one; I cannot specify, for example, "read 100 items at once"!
> > >
> > > Is this a correct observation, or am I missing something?
> > >
> > > best,
> > > /Shahab
Re: Consumer that consumes only local partition?
Sorry for the very late reply ...

The performance issue was not caused by network latency. I had a job like this: FlinkKafkaConsumer --> someSimpleOperation --> FlinkKafkaProducer.

I thought that our FlinkKafkaConsumer was slow, but actually our FlinkKafkaProducer was using the old producer API of Kafka. Switching to the new producer API of Kafka greatly improved our write performance to Kafka. Flink was slowing down the KafkaConsumer because of the producer.

Since we are already talking about performance, let me ask you the following question: I am using Kafka and Flink on a HDP 2.2 cluster (with 40 machines). What would you consider a good read/write performance for 8-byte messages on the following setup?
- 40 brokers,
- topic with 120 partitions
- 120 reading threads (on 30 machines)
- 120 writing threads (on 30 machines)

I'm getting a write throughput of ~75k elements/core/second and a read throughput of ~50k el/c/s. When I stop the writers, the read throughput goes up to 130k. I would expect a higher throughput than (8*75000) / 1024 = 585.9 KB/sec per partition ... or are the messages too small, so the per-message overhead dominates?

Which system out there would you recommend for getting reference performance numbers? Samza, Spark, Storm?

On Wed, Jul 15, 2015 at 7:20 PM, Gwen Shapira wrote:
> This is not something you can do easily with the consumer API
> (consumers don't have a locality notion).
> I can imagine using Kafka's low-level API calls to get a list of
> partitions and the lead replica, figuring out which are local and
> using those - but that sounds painful.
>
> Are you 100% sure the performance issue is due to network latency? If
> not, you may want to start optimizing somewhere more productive :)
> Kafka brokers and clients both have metrics that may help you track
> down where the performance issues are coming from.
>
> Gwen
>
> On Wed, Jul 15, 2015 at 9:24 AM, Robert Metzger wrote:
> > Hi Shef,
> >
> > did you resolve this issue?
> > I'm facing some performance issues and I was wondering whether reading
> > locally would resolve them.
> >
> > On Mon, Jun 22, 2015 at 11:43 PM, Shef wrote:
> >
> >> Noob question here. I want to have a single consumer for each partition
> >> that consumes only the messages that have been written locally. In other
> >> words, I want the consumer to access the local disk and not pull anything
> >> across the network. Possible?
> >>
> >> How can I discover which partitions are local?
Re: Checkpointing with custom metadata
I couldn't find a jira for this, so I added KAFKA-2403.

-Jason

On Tue, Aug 4, 2015 at 9:36 AM, Jay Kreps wrote:
> Hey James,
>
> You are right, the intended use of that was to have a way to capture some
> very small metadata about your state at the time of offset commit in an
> atomic way.
>
> That field isn't exposed, but we do need to add it to the new consumer api
> (I think just no one has done it yet).
>
> -Jay
>
> On Mon, Aug 3, 2015 at 1:52 PM, James Cheng wrote:
>
> > According to
> > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest,
> > we can store custom metadata with our checkpoints. It looks like the high
> > level consumer does not support committing offsets with metadata, and that
> > in order to checkpoint with custom metadata, we have to issue the
> > OffsetCommitRequest ourselves. Is that correct?
> >
> > Thanks,
> > -James
Re: Decomissioning a broker
The broker will actually unregister itself from zookeeper. The broker's id path uses ephemeral nodes so they are automatically destroyed on shutdown. In fact if you use a "Controlled Shutdown" migrating the replicas and leaders should happen for you as well. Though manual reassignment may be preferred in your case.

Here is some extra information on controlled shutdowns:
http://kafka.apache.org/documentation.html#basic_ops_restarting

Thanks,
Grant

On Thu, Jul 30, 2015 at 4:37 PM, Andrew Otto wrote:
> I’m sure this has been asked before, but I can’t seem to find the answer.
>
> I’m planning a Kafka cluster expansion and upgrade to 0.8.2.1. In doing
> so, I will be decommissioning a broker. I plan to remove this broker fully
> from the cluster, and then reinstall it and use it for a different purpose.
>
> I understand how to use the reassign-partitions tool to generate new
> partition assignments and to move partitions around so that the target
> broker no longer has any active replicas. Once that is done, is there
> anything special that needs to happen? I can shut down the broker, but as
> far as I know that broker will still be registered in Zookeeper. Should I
> just delete the znode for that broker once it has been shut down?
>
> Thanks!
> -Andrew Otto

--
Grant Henke
Software Engineer | Cloudera
gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
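For reference, the broker-side switch for this behavior lives in server.properties (if I recall correctly it defaults to true as of 0.8.2, so usually nothing needs to change):

    controlled.shutdown.enable=true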
Re: Kafka Zookeeper Issues
The /brokers/ids nodes are ephemeral nodes that only exist while the brokers maintain a session to zookeeper. There is more information on Kafka's Zookeeper usage here:
- http://kafka.apache.org/documentation.html - look for "Broker Node Registry"
- https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper

Hopefully that helps debug your issue.

Thank you,
Grant

On Mon, Aug 3, 2015 at 5:20 AM, Wollert, Fabian wrote:
> hi everyone,
>
> we are trying to deploy Kafka 0.8.2.1 and Zookeeper on AWS using
> Cloudformation, ASGs and other services. For Zookeeper we are using
> Netflix's Exhibitor (v1.5.5) to ensure failover stability.
>
> What we are observing right now is that after some days our brokers are no
> longer registered in the "/brokers/ids" path in Zookeeper. I was trying to
> see when they get deleted so I could check the logs, but the ZK transaction
> log only shows the create statement, no deletes or anything (though deletes
> do get written there). Can someone explain to me how the mechanism of
> registering and deregistering in Zookeeper works, or point me to a doc or
> even the source code where this happens? Or does someone have an idea of
> what is happening there?
>
> Any experience on what to take care of when deploying Kafka on AWS (or a
> cloud environment in general) would also be helpful.
>
> Cheers
>
> --
> *Fabian Wollert*
> Business Intelligence
> Zalando SE

--
Grant Henke
Software Engineer | Cloudera
gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
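As a quick check while debugging, one can list which broker ids are currently registered using the plain ZooKeeper Java client; a minimal sketch (the ZooKeeper address is an assumption):

    import java.util.List;
    import org.apache.zookeeper.ZooKeeper;

    public class ListBrokerIds {
        public static void main(String[] args) throws Exception {
            ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, null);
            // Ephemeral children vanish as soon as a broker's ZK session dies,
            // so this reflects live registrations only.
            List<String> ids = zk.getChildren("/brokers/ids", false);
            System.out.println("registered broker ids: " + ids);
            zk.close();
        }
    }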
Re: Checkpointing with custom metadata
Hey James,

You are right, the intended use of that was to have a way to capture some very small metadata about your state at the time of offset commit in an atomic way.

That field isn't exposed, but we do need to add it to the new consumer api (I think just no one has done it yet).

-Jay

On Mon, Aug 3, 2015 at 1:52 PM, James Cheng wrote:
> According to
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest,
> we can store custom metadata with our checkpoints. It looks like the high
> level consumer does not support committing offsets with metadata, and that
> in order to checkpoint with custom metadata, we have to issue the
> OffsetCommitRequest ourselves. Is that correct?
>
> Thanks,
> -James
Re: Lead Broker from kafka.message.MessageAndMetadata
Hi Sreeni,

Using the SimpleConsumer you can send a TopicMetadataRequest for a topic, and the TopicMetadataResponse will contain TopicMetadata for each topic requested (or all topics), which contains PartitionMetadata for all partitions. The PartitionMetadata contains the leader, replicas, and isr. Is that what you are looking for?

Thanks,
Grant

On Mon, Aug 3, 2015 at 7:26 AM, Sreenivasulu Nallapati < sreenu.nallap...@gmail.com> wrote:
> Hello,
>
> Is there a way that we can find the lead broker
> from the kafka.message.MessageAndMetadata class?
>
> My use case is simple: I have a topic and partition and want to find out
> the lead broker for that partition.
>
> Please provide your insights.
>
> Thanks
> Sreeni

--
Grant Henke
Software Engineer | Cloudera
gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
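To make that concrete, a minimal leader-lookup sketch using the 0.8 javaapi SimpleConsumer; the broker address, topic, and partition number are assumptions:

    import java.util.Collections;

    import kafka.javaapi.PartitionMetadata;
    import kafka.javaapi.TopicMetadata;
    import kafka.javaapi.TopicMetadataRequest;
    import kafka.javaapi.TopicMetadataResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class FindLeader {
        public static void main(String[] args) {
            // Any live broker can answer a metadata request.
            SimpleConsumer consumer =
                new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "leaderLookup");
            TopicMetadataRequest request =
                new TopicMetadataRequest(Collections.singletonList("my-topic"));
            TopicMetadataResponse response = consumer.send(request);
            for (TopicMetadata topic : response.topicsMetadata()) {
                for (PartitionMetadata part : topic.partitionsMetadata()) {
                    if (part.partitionId() == 0) {
                        System.out.println("leader for partition 0: "
                            + part.leader().host() + ":" + part.leader().port());
                    }
                }
            }
            consumer.close();
        }
    }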
Re: new consumer api?
Hey Simon,

The new consumer has the ability to forgo group management and assign partitions directly. Once assigned, you can seek to any offset you want.

-Jason

On Tue, Aug 4, 2015 at 5:08 AM, Simon Cooper < simon.coo...@featurespace.co.uk> wrote:
> Reading the consumer docs, there's no mention of a relatively simple
> consumer that doesn't need groups, coordinators, commits, anything like
> that - one that just reads and polls from specified offsets of specific
> topic partitions but automatically deals with leadership changes and
> connection losses (so one level up from SimpleConsumer).
>
> Will the new API be usable in this relatively simple way?
> SimonC
>
> -Original Message-
> From: Jun Rao [mailto:j...@confluent.io]
> Sent: 03 August 2015 18:19
> To: users@kafka.apache.org
> Subject: Re: new consumer api?
>
> Jalpesh,
>
> We are still iterating on the new consumer a bit and are waiting for some
> of the security jiras to be committed. So now, we are shooting for
> releasing 0.8.3 in Oct (just updated
> https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan).
>
> Thanks,
>
> Jun
>
> On Mon, Aug 3, 2015 at 8:41 AM, Jalpesh Patadia < jalpesh.pata...@clickbank.com> wrote:
>
> > Hello guys,
> >
> > A while ago I read that the new consumer api was going to be released
> > sometime in July as part of the 0.8.3/0.9 release.
> > https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan
> >
> > Do we have an update on when we think that can happen?
> >
> > Thanks,
> >
> > Jalpesh
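A rough sketch of what this looks like against the new consumer API as it later shipped in 0.9 (the API was still in flux on trunk when this thread was written, so method names may differ); the addresses, topic, and offset are assumptions:

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class AssignedConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
            TopicPartition tp = new TopicPartition("my-topic", 0);
            consumer.assign(Arrays.asList(tp)); // no group management involved
            consumer.seek(tp, 12345L);          // start wherever you like

            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
                for (ConsumerRecord<byte[], byte[]> record : records)
                    System.out.println(record.offset());
            }
        }
    }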
Re: How to read in batch using HighLevel Consumer?
Here is an example of what Sharninder suggested:

http://ingest.tips/2014/10/12/kafka-high-level-consumer-frequently-missing-pieces/

Thanks,
Raja.

On Tue, Aug 4, 2015 at 12:01 PM, Sharninder wrote:
> You can't. Kafka is essentially a queue, so you always read messages one
> by one. What you can do is disable auto offset commit, read 100 messages,
> process them, and then manually commit the offset.
>
> -- Sharninder
>
> > On 04-Aug-2015, at 9:07 pm, shahab wrote:
> >
> > Hi,
> >
> > While the producer can put data into the Kafka server as a batch, I
> > couldn't find any API (or any document) saying how we can fetch data as
> > a batch from Kafka. Even when data is placed in the Kafka server as a
> > batch, using the High Level consumer I can only read messages one by
> > one; I cannot specify, for example, "read 100 items at once"!
> >
> > Is this a correct observation, or am I missing something?
> >
> > best,
> > /Shahab
Re: How to read in batch using HighLevel Consumer?
You can't. Kafka is essentially a queue, so you always read messages one by one. What you can do is disable auto offset commit, read 100 messages, process them, and then manually commit the offset.

-- Sharninder

> On 04-Aug-2015, at 9:07 pm, shahab wrote:
>
> Hi,
>
> While the producer can put data into the Kafka server as a batch, I
> couldn't find any API (or any document) saying how we can fetch data as
> a batch from Kafka. Even when data is placed in the Kafka server as a
> batch, using the High Level consumer I can only read messages one by
> one; I cannot specify, for example, "read 100 items at once"!
>
> Is this a correct observation, or am I missing something?
>
> best,
> /Shahab
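A minimal sketch of that pattern with the 0.8 high-level consumer; the ZooKeeper address, group id, topic, and batch size are assumptions:

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;

    public class BatchingConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181");
            props.put("group.id", "batching-demo");
            props.put("auto.commit.enable", "false"); // we commit manually per batch
            ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            KafkaStream<byte[], byte[]> stream = connector
                .createMessageStreams(Collections.singletonMap("my-topic", 1))
                .get("my-topic").get(0);

            List<String> batch = new ArrayList<String>();
            for (MessageAndMetadata<byte[], byte[]> mm : stream) {
                batch.add(new String(mm.message(), StandardCharsets.UTF_8));
                if (batch.size() == 100) {
                    // Process the batch here, then commit, so a crash replays
                    // at most one unprocessed batch.
                    connector.commitOffsets();
                    batch.clear();
                }
            }
        }
    }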
How to read in batch using HighLevel Consumer?
Hi,

While the producer can put data into the Kafka server as a batch, I couldn't find any API (or any document) saying how we can fetch data as a batch from Kafka. Even when data is placed in the Kafka server as a batch, using the High Level consumer I can only read messages one by one; I cannot specify, for example, "read 100 items at once"!

Is this a correct observation, or am I missing something?

best,
/Shahab
Get last snapshot from compacted topic
I'd like to save a snapshot of a processing node's state in a compacted kafka topic. A large number of nodes would save their snapshots in the same partition. What is an efficient way for a (restarted) node to find the offset of its latest snapshot? Using just Kafka (no database, local file, etc.), is there a more efficient way than consuming the partition from the earliest available offset (potentially reading a lot of snapshots of other nodes)?

Thanks!
RE: new consumer api?
Reading the consumer docs, there's no mention of a relatively simple consumer that doesn't need groups, coordinators, commits, anything like that - one that just reads and polls from specified offsets of specific topic partitions but automatically deals with leadership changes and connection losses (so one level up from SimpleConsumer).

Will the new API be usable in this relatively simple way?
SimonC

-Original Message-
From: Jun Rao [mailto:j...@confluent.io]
Sent: 03 August 2015 18:19
To: users@kafka.apache.org
Subject: Re: new consumer api?

Jalpesh,

We are still iterating on the new consumer a bit and are waiting for some of the security jiras to be committed. So now, we are shooting for releasing 0.8.3 in Oct (just updated https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan).

Thanks,

Jun

On Mon, Aug 3, 2015 at 8:41 AM, Jalpesh Patadia < jalpesh.pata...@clickbank.com> wrote:
> Hello guys,
>
> A while ago I read that the new consumer api was going to be released
> sometime in July as part of the 0.8.3/0.9 release.
> https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan
>
> Do we have an update on when we think that can happen?
>
> Thanks,
>
> Jalpesh
Re: 0.8.3 ETA?
Thanks, Jun, for the heads up!

On Mon, Aug 3, 2015 at 7:17 PM, Jun Rao wrote:
> Hi, Stevo,
>
> Yes, we are still iterating on the new consumer a bit and are waiting for
> some of the security jiras to be committed. So now, we are shooting for
> releasing 0.8.3 in Oct (just updated
> https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan).
>
> As we get closer, we will clean up the 0.8.3 jiras and push
> non-critical ones to future releases.
>
> Thanks,
>
> Jun
>
> On Mon, Aug 3, 2015 at 5:52 AM, Stevo Slavić wrote:
>
> > Hello Apache Kafka community,
> >
> > If I recall correctly, two weeks ago it was mentioned in a discussion
> > that Kafka 0.8.3 might be released in about a month.
> >
> > Is it still the Kafka dev team's goal to have Kafka 0.8.3 released in a
> > few weeks? Or is more (re)work (e.g. more new consumer API changes)
> > planned for 0.8.3 than is already in JIRA, which would further delay the
> > 0.8.3 release?
> >
> > Btw, Kafka JIRA has quite a lot of unresolved tickets targeting 0.8.3 as
> > the fix version (see here
> > <
> > https://issues.apache.org/jira/browse/KAFKA-1853?jql=project%20%3D%20KAFKA%20AND%20fixVersion%20%3D%200.8.3%20AND%20resolution%20%3D%20Unresolved%20ORDER%20BY%20due%20ASC%2C%20priority%20DESC%2C%20created%20ASC
> > >
> > for the complete list).
> >
> > Kind regards,
> > Stevo Slavic.
Re: kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
Some suggestions:
  Check the existence of the topic.
  Check the firewall of the broker... Try telnet or something to make sure it's available.
  Try running the producer on the broker machine.

Since you get this error, the code is functioning. I think it's some configuration and parameter issue leading to this problem.

On Tue, Aug 4, 2015 at 11:21 AM, David Li wrote:
> Hi, I have a very simple piece of code to just send out one message. The
> topic is created automatically, but the message just cannot be sent out. I
> also tried to change the configuration; the result is still the same. Sorry
> to bother you all with this silly question.
>
> For your information, the Kafka server is running in a Docker container,
> which runs in an Ubuntu server VM. The test class is run from IntelliJ
> IDEA on the host, which is a Mac OS X machine.
>
> public static void main(String[] args) {
>     String topic = "test3";
>
>     Properties props = new Properties();
>     props.put("serializer.class", "kafka.serializer.StringEncoder");
>     props.put("metadata.broker.list", "192.168.144.10:29092");
>     //props.put("retry.backoff.ms", "1000");
>     //props.put("message.send.max.retries", "10");
>     //props.put("topic.metadata.refresh.interval.ms", "0");
>
>     Producer<String, String> producer =
>         new Producer<String, String>(new ProducerConfig(props));
>
>     int messageNo = 1;
>     String messageStr = new String("Message_" + messageNo);
>     producer.send(new KeyedMessage<String, String>(topic, messageStr));
> }
Re: New Consumer API and Range Consumption with Fail-over
Hi Jason and the Kafka Dev Team,

First of all, thanks for responding, and I think you understood the expected behavior correctly. The use-case is offset-range consumption. We store each minute's highest offset for each topic partition. So if we need to reload or re-consume yesterday's data from, say, 8AM to noon, we have the start offset mapping at 8AM and the end offset mapping at noon in a time-series database.

I was trying to implement this use case with the new Consumer API. Do you or the Kafka dev team agree with the request to have an API that takes a topic and its start/end offsets for a high-level consumer group? (With the older consumer API we used the SimpleConsumer for this, without fail-over.) Also, each range consumption would get a different group id, and the group id would not be reused. The main purpose is to reload or reprocess past data occasionally (due to production bugs, downtime, etc.) while the main consumer group continues to consume the latest records.

void subscribe(TopicPartition[] startOffsetPartitions, TopicPartition[] endOffsetPartitions)

or something similar which would allow the following:

1) When the consumer group already exists (meaning it has consumed data and committed offsets to the storage system, either Kafka or ZK), ignore the start offset positions and use the committed offsets. If nothing is committed, use the start offset for the partition.

2) When consumption has reached the end offset for a given partition, pausing is fine, or the assigned thread becomes fail-over capacity or waits for reassignment.

3) When the consumer group is done consuming all partitions' offset ranges (start to end), gracefully shut down the entire consumer group.

4) While consuming records, if one of the nodes or consuming threads goes down, automatically fail over to the others (similar to the high-level consumer in the old consumer API; I am not sure whether the high-level and/or simple consumer concepts exist for the new API).

I hope the above explanation clarifies the use-case and intended behavior. Thanks for the clarifications; you are correct that we need pause(TopicPartition tp), resume(TopicPartition tp), and/or an API to set the end offset for each partition. Please do let us know your preference for supporting this simple use-case.

Thanks,

Bhavesh

On Thu, Jul 30, 2015 at 1:23 PM, Jason Gustafson wrote:
> Hi Bhavesh,
>
> I'm not totally sure I understand the expected behavior, but I think this
> can work. Instead of seeking to the start of the range before the poll
> loop, you should probably provide a ConsumerRebalanceCallback to get
> notifications when the group assignment has changed (e.g. when one of your
> nodes dies). When a new partition is assigned, the callback will be invoked
> by the consumer and you can use it to check if there's a committed position
> in the range or if you need to seek to the beginning of the range. For
> example:
>
> void onPartitionsAssigned(consumer, partitions) {
>   for (partition : partitions) {
>     try {
>       offset = consumer.committed(partition)
>       consumer.seek(partition, offset)
>     } catch (NoOffsetForPartition) {
>       consumer.seek(partition, rangeStart)
>     }
>   }
> }
>
> If a failure occurs, the partitions will be rebalanced across
> whichever consumers are still active. The case of the entire cluster being
> rebooted is not really different. When the consumers come back, they check
> the committed position and resume where they left off. Does that make
> sense?
>
> After you are finished consuming a partition's range, you can use
> KafkaConsumer.pause(partition) to prevent further fetches from being
> initiated while still maintaining the current assignment.
> The patch to add
> pause() is not in trunk yet, but it probably will be before too long.
>
> One potential problem is that you wouldn't be able to reuse the same group
> to consume a different range because of the way it depends on the committed
> offsets. Kafka's commit API actually allows some additional metadata to go
> along with a committed offset, and that could potentially be used to tie the
> commit to the range, but it's not yet exposed in KafkaConsumer. I assume it
> will be eventually, but I'm not sure whether that will be part of the
> initial release.
>
> Hope that helps!
>
> Jason
>
> On Thu, Jul 30, 2015 at 7:54 AM, Bhavesh Mistry < mistry.p.bhav...@gmail.com> wrote:
>
> > Hello Kafka Dev Team,
> >
> > With the new Consumer API redesign (
> > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
> > ), is there a capability to consume given the topic and partition start/
> > end position? How would I achieve the following use case of range
> > consumption with fail-over?
> >
> > Use Case:
> > Ability to reload data given a topic and its partition offset start/end
> > with a high-level consumer with fail-over. Basically, high-level range
> > consumption running alongside the main consumer group.
> >
> > Suppose you have a topic called “test-topi