At-least-once guarantees with high-level consumer
Hi Carl,

[Quoting Carl's original message:]

> ** Disclaimer: I know there's a new consumer API on the way, this mail is about the currently available API. I also apologise if the below has already been discussed previously. I did try to check previous discussions on ConsumerIterator. **
>
> It seems to me that the high-level consumer would be able to support at-least-once messaging, even if one uses auto-commit, by changing kafka.consumer.ConsumerIterator.next() to call currentTopicInfo.resetConsumeOffset(..) _before_ super.next(). This way, a consumer thread for a KafkaStream could just loop:
>
>     while (true) {
>         MyMessage message = iterator.next().message();
>         process(message);
>     }
>
> Each call to iterator.next() then updates the offset to commit to the end of the message that was just processed. When offsets are committed for the ConsumerConnector (either automatically or manually), the commit will not include offsets of messages that haven't been fully processed. I've tested the following ConsumerIterator.next(), and it seems to work as I expect. [...]

What you have proposed seems very reasonable. Essentially we need two sets of offsets: one which is published to ZooKeeper, and one which is used locally on the machine for tracking which event to return from the iterator (it is like a prev/current setup). In my opinion this is how it should have been implemented in the first place, rather than forcing everyone to write additional code just for the at-least-once guarantee. If you push for this in open source you will have my vote at the very least :)

-Abhishek
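[Editor's note: the "additional code" referred to above is the usual manual-commit workaround with the existing high-level consumer. A minimal sketch, assuming the 0.8.x Java API; the topic name, connection settings, and process() method are placeholders:]

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;

    public class AtLeastOnceLoop {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181");
            props.put("group.id", "group1");
            // Disable auto-commit so an offset is never committed before
            // the message it covers has been processed.
            props.put("auto.commit.enable", "false");

            ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(Collections.singletonMap("my-topic", 1));
            KafkaStream<byte[], byte[]> stream = streams.get("my-topic").get(0);

            for (MessageAndMetadata<byte[], byte[]> record : stream) {
                process(record.message());
                // Commits offsets for every partition this connector owns,
                // which is why batching commits is common in practice.
                connector.commitOffsets();
            }
        }

        private static void process(byte[] message) { /* application logic */ }
    }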
Re: Recovering from broker failure with KafkaConsumer
We have a couple of open tickets to address these issues (see KAFKA-1894 and KAFKA-2168). It's definitely something we want to fix.

On Wed, Jun 17, 2015 at 4:21 AM, Jan Stette jan.ste...@gmail.com wrote:

Adding some more details to the previous question: the indefinite wait doesn't happen on calling subscribe() on the consumer, it happens when I (in this case) call seekToEnd(). A related problem is that the seekToEnd() method is synchronized (as are the other access methods on KafkaConsumer), so the client holds a lock while sitting in this wait. This means that if another thread tries to call close(), which is also synchronized, that thread will be blocked too. Holding locks while performing network I/O seems like a bad idea - is this something that's planned to be fixed? Jan

On 17 June 2015 at 10:31, Jan Stette jan.ste...@gmail.com wrote:

I'm trying out the new KafkaConsumer client API in the trunk of the source tree, and while I realise that this is a work in progress, I have a question that perhaps someone can shed some light on. I'm looking at how to handle various error scenarios for a Kafka client, in particular what happens when trying to connect to the broker but it's not available. The behaviour I'm seeing is that the client will retry indefinitely (at the configurable interval), basically looping around in Fetcher.awaitMetadataUpdate() forever. I would like to have some way to fail the connection attempt to avoid the calling thread being blocked forever. Is this possible with the current version of the client? (Snapshot as of 16/6/15.) If not, is that something that's planned for the future? Jan
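[Editor's note: until those tickets land, one client-side stopgap is to bound how long the caller waits by running the blocking call on a worker thread. A sketch under the trunk-snapshot API discussed in this thread (the varargs seekToEnd signature is assumed from that snapshot); note this only frees the caller - the worker stays blocked inside the client and still holds the consumer's lock, so the consumer should be treated as unusable after a timeout:]

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public final class BoundedSeek {
        // Run the potentially-blocking call on a worker thread and bound
        // how long the caller waits for it to complete.
        public static void seekToEndOrGiveUp(final KafkaConsumer<byte[], byte[]> consumer,
                                             final TopicPartition tp,
                                             long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            try {
                Future<?> pending = executor.submit(new Runnable() {
                    public void run() {
                        // May loop forever if no broker is reachable.
                        consumer.seekToEnd(tp);
                    }
                });
                pending.get(timeout, unit); // frees the caller, not the worker
            } finally {
                executor.shutdownNow(); // best effort; the worker may remain stuck
            }
        }
    }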
Broker ISR confusion with soft-failed broker
3 node Kafka - 0.8.2.1
3 node ZK - 3.4.6

We experienced a soft failure on one of our brokers (#2). The process was still running but no logs were being generated, it was not responding to JMX queries, etc. Several consumers were unable to read from certain partitions while this was occurring, partitions that have a replication factor of 3. I have all the server, controller and state-change logs from the event, and am trying to filter out the relevant information. But this sequence of events about one partition in the ISR struck me (just kafka.cluster.Partition); there are others just like it:

qd-kafka8-01 [2015-06-05 07:31:47,878] INFO Partition [birdseed-user-stream,5] on broker 1: Shrinking ISR for partition [birdseed-user-stream,5] from 3,2,1 to 3,1 (kafka.cluster.Partition)
qd-kafka8-01 [2015-06-05 07:31:47,884] INFO Partition [birdseed-user-stream,5] on broker 1: Cached zkVersion [537] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
qd-kafka8-03 [2015-06-05 07:31:57,934] INFO Partition [birdseed-user-stream,5] on broker 3: Shrinking ISR for partition [birdseed-user-stream,5] from 3,2 to 3 (kafka.cluster.Partition)
qd-kafka8-01 [2015-06-05 07:31:59,454] INFO Partition [birdseed-user-stream,5] on broker 1: Shrinking ISR for partition [birdseed-user-stream,5] from 3,2,1 to 1 (kafka.cluster.Partition)
qd-kafka8-01 [2015-06-05 07:31:59,462] INFO Partition [birdseed-user-stream,5] on broker 1: Cached zkVersion [537] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
qd-kafka8-01 [2015-06-05 07:32:07,387] INFO Partition [birdseed-user-stream,5] on broker 1: Shrinking ISR for partition [birdseed-user-stream,5] from 3,2,1 to 2,1 (kafka.cluster.Partition)
qd-kafka8-01 [2015-06-05 07:32:07,397] INFO Partition [birdseed-user-stream,5] on broker 1: Cached zkVersion [537] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
qd-kafka8-03 [2015-06-05 07:32:33,619] INFO Partition [birdseed-user-stream,5] on broker 3: Expanding ISR for partition [birdseed-user-stream,5] from 3 to 3,2 (kafka.cluster.Partition)
qd-kafka8-03 [2015-06-05 07:33:12,038] INFO Partition [birdseed-user-stream,5] on broker 3: Expanding ISR for partition [birdseed-user-stream,5] from 3,2 to 3,2,1 (kafka.cluster.Partition)
qd-kafka8-03 [2015-06-05 07:33:38,959] INFO Partition [birdseed-user-stream,5] on broker 3: Shrinking ISR for partition [birdseed-user-stream,5] from 3,2,1 to 3,1 (kafka.cluster.Partition)
qd-kafka8-03 [2015-06-05 07:34:47,266] INFO Partition [birdseed-user-stream,5] on broker 3: Expanding ISR for partition [birdseed-user-stream,5] from 3,1 to 3,1,2 (kafka.cluster.Partition)
qd-kafka8-03 [2015-06-05 07:37:00,584] INFO Partition [birdseed-user-stream,5] on broker 3: Shrinking ISR for partition [birdseed-user-stream,5] from 3,1,2 to 3 (kafka.cluster.Partition)
qd-kafka8-03 [2015-06-05 07:37:00,590] INFO Partition [birdseed-user-stream,5] on broker 3: Cached zkVersion [543] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)
qd-kafka8-03 [2015-06-05 07:37:09,801] INFO Partition [birdseed-user-stream,5] on broker 3: Shrinking ISR for partition [birdseed-user-stream,5] from 3,1,2 to 3 (kafka.cluster.Partition)
qd-kafka8-03 [2015-06-05 07:37:09,804] INFO Partition [birdseed-user-stream,5] on broker 3: Cached zkVersion [543] not equal to that in zookeeper, skip updating ISR (kafka.cluster.Partition)

The last 2 lines repeat every 10 seconds until broker #2 was bounced. We've seen this twice now in production, and unfortunately did not get a jstack from the frozen VM. What more information can I provide to help with this?

Thanks
Bob Cotton
Rally Software
Re: New Producer API - batched sync mode support
@Shapira You are correct from my perspective. We are using kafka for a system where panels can send multiple events in a single message. The current contract is such that all events fail or succeed as a whole. If there is a failure the panel resends all the events. The existing producer api supports this fine; am I getting left behind here for the sake of brevity? I can get behind not adding every feature people ask for, but taking away something is a different story altogether.

On Wed, Apr 29, 2015 at 9:08 PM, Gwen Shapira gshap...@cloudera.com wrote:

I'm starting to think that the old adage "if two people say you are drunk, lie down" applies here :) The current API seems perfectly clear, useful and logical to everyone who wrote it... but we are getting multiple users asking for the old batch behavior back. One reason to get it back is to make upgrades easier - people won't need to rethink their existing logic if they get an API with the same behavior in the new producer. The other reason is what Ewen mentioned earlier - if everyone re-implements Joel's logic, we can provide something for that.

How about getting the old batch send behavior back by adding a new API:

    public void batchSend(List<ProducerRecord<K,V>> records)

With this implementation (mixes the old behavior with Joel's snippet):
* send records one by one
* flush
* iterate on futures and get them
* log a detailed message on each error
* throw an exception if any send failed

It reproduces the old behavior - which apparently everyone really liked, and I don't think it is overly weird. It is very limited, but anyone who needs more control over their sends already has plenty of options. Thoughts? Gwen

On Tue, Apr 28, 2015 at 5:29 PM, Jay Kreps jay.kr...@gmail.com wrote:

Hey guys, The locking argument is correct for very small records (< 50 bytes); batching will help here because for small records locking becomes the big bottleneck. I think these use cases are rare but not unreasonable. Overall I'd emphasize that the new producer is way faster at virtually all use cases. If there is a use case where that isn't true, let's look at it in a data-driven way by comparing the old producer to the new producer and looking for any areas where things got worse.

I suspect the "reducing allocations" argument to be not a big thing. We do a number of small per-message allocations and it didn't seem to have much impact. I do think there are a couple of big producer memory optimizations we could do by reusing the arrays in the accumulator in the serialization of the request, but I don't think this is one of them.

I'd be skeptical of any api that was too weird--i.e. introduces a new way of partitioning, gives back errors on a per-partition rather than per-message basis (given that partitioning is transparent this is really hard to think about), etc. Bad apis end up causing a ton of churn and just don't end up being a good long-term commitment as we change how the underlying code works over time (i.e. we hyper-optimize for something, then have to maintain some super weird api as it becomes hyper-unoptimized for the client over time).

Roshan--Flush works as you would hope: it blocks on the completion of all outstanding requests. Calling get on the future for the request gives you the associated error code back. Flush doesn't throw any exceptions because waiting for requests to complete doesn't error; the individual requests fail or succeed, which is always reported with each request.
Ivan--The batches you send in the scala producer today actually aren't truly atomic, they just get sent in a single request. One tricky problem to solve when users do batching is size limits on requests. This can be very hard to manage since predicting the serialized size of a bunch of java objects is not always obvious. This was repeatedly a problem before. -Jay

On Tue, Apr 28, 2015 at 4:51 PM, Ivan Balashov ibalas...@gmail.com wrote:

I must agree with @Roshan - it's hard to imagine anything more intuitive and easy to use for atomic batching than the old sync batch api. Also, it's fast. Coupled with a separate instance of producer per broker:port:topic:partition it works very well. I would be glad if it finds its way into the new producer api. On a side-side-side note, could anyone confirm/deny whether SimpleConsumer's fetchSize must be set to at least the batch size in bytes (before or after compression?), otherwise the client risks not getting any messages?
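[Editor's note: for concreteness, here is a minimal sketch of the batchSend() helper Gwen describes above, written against the new producer as a user-side utility rather than a proposed API change; the class wrapper and error-handling details are illustrative:]

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class BatchSendHelper {
        // Send records one by one, flush, then walk the futures so that
        // any failure surfaces to the caller: the whole batch either
        // succeeds or the method throws.
        public static <K, V> void batchSend(KafkaProducer<K, V> producer,
                                            List<ProducerRecord<K, V>> records)
                throws InterruptedException, ExecutionException {
            List<Future<RecordMetadata>> futures =
                new ArrayList<Future<RecordMetadata>>(records.size());
            for (ProducerRecord<K, V> record : records) {
                futures.add(producer.send(record)); // asynchronous
            }
            producer.flush(); // blocks until all outstanding requests complete
            for (Future<RecordMetadata> future : futures) {
                future.get(); // throws ExecutionException if that send failed
            }
        }
    }

Note the batch is still not atomic on the broker: some records may have been written even when the call throws, which matches the old producer's actual behavior as Jay points out above.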
Re: NoSuchMethodError with Consumer Instantiation
You probably have the wrong version of the Kafka jar(s) within your classpath. Which version of Kafka are you using, and how have you set up the classpath?

-Jaikiran

On Thursday 18 June 2015 08:11 AM, Srividhya Anantharamakrishnan wrote:

Hi, I am trying to set up Kafka in our cluster and I am running into the following error when the Consumer is getting instantiated:

    java.lang.NoSuchMethodError: org.apache.kafka.common.utils.Utils.newThread(Ljava/lang/String;Ljava/lang/Runnable;Ljava/lang/Boolean;)Ljava/lang/Thread;
        at kafka.utils.KafkaScheduler$$anon$1.newThread(KafkaScheduler.scala:84)
        at java.util.concurrent.ThreadPoolExecutor$Worker.<init>(ThreadPoolExecutor.java:610)
        at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:924)
        at java.util.concurrent.ThreadPoolExecutor.ensurePrestart(ThreadPoolExecutor.java:1590)
        at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:333)
        at java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:570)
        at kafka.utils.KafkaScheduler.schedule(KafkaScheduler.scala:116)
        at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:136)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:65)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:68)
        at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:120)
        at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)

I am guessing that it is missing certain classpath references. If that is the reason, could someone tell me which jar it is? If not, what is it that I am missing?

KafkaConsumer:

    public KafkaConsumer(String topic) {
        // line where the error is thrown:
        consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "IP:PORT");
        props.put("group.id", "group1");
        props.put("zookeeper.session.timeout.ms", "6000");
        props.put("zookeeper.sync.time.ms", "2000");
        props.put("auto.commit.interval.ms", "6");
        return new ConsumerConfig(props);
    }

TIA!
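[Editor's note: one quick way to confirm a jar mismatch like this is to ask the JVM where the offending class is actually loaded from. A small diagnostic sketch; the wrapper class name is illustrative:]

    import org.apache.kafka.common.utils.Utils;

    // Prints the jar that Utils resolves to at runtime. A NoSuchMethodError
    // like the one above usually means this points at an older kafka-clients
    // jar than the kafka core jar on the classpath expects.
    public class ClasspathCheck {
        public static void main(String[] args) {
            System.out.println(Utils.class.getProtectionDomain()
                                          .getCodeSource()
                                          .getLocation());
        }
    }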
NoSuchMethodError with Consumer Instantiation
Hi, I am trying to set up Kafka in our cluster and I am running into the following error when the Consumer is getting instantiated:

    java.lang.NoSuchMethodError: org.apache.kafka.common.utils.Utils.newThread(Ljava/lang/String;Ljava/lang/Runnable;Ljava/lang/Boolean;)Ljava/lang/Thread;
        at kafka.utils.KafkaScheduler$$anon$1.newThread(KafkaScheduler.scala:84)
        at java.util.concurrent.ThreadPoolExecutor$Worker.<init>(ThreadPoolExecutor.java:610)
        at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:924)
        at java.util.concurrent.ThreadPoolExecutor.ensurePrestart(ThreadPoolExecutor.java:1590)
        at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:333)
        at java.util.concurrent.ScheduledThreadPoolExecutor.scheduleAtFixedRate(ScheduledThreadPoolExecutor.java:570)
        at kafka.utils.KafkaScheduler.schedule(KafkaScheduler.scala:116)
        at kafka.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:136)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:65)
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:68)
        at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:120)
        at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala)

I am guessing that it is missing certain classpath references. If that is the reason, could someone tell me which jar it is? If not, what is it that I am missing?

KafkaConsumer:

    public KafkaConsumer(String topic) {
        // line where the error is thrown:
        consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "IP:PORT");
        props.put("group.id", "group1");
        props.put("zookeeper.session.timeout.ms", "6000");
        props.put("zookeeper.sync.time.ms", "2000");
        props.put("auto.commit.interval.ms", "6");
        return new ConsumerConfig(props);
    }

TIA!
Re: Keeping Zookeeper and Kafka Server Up
kafka-server-start.sh has a -daemon option, but I don't think Zookeeper has it.

On Tue, Jun 16, 2015 at 11:32 PM, Su She suhsheka...@gmail.com wrote:

It seems like nohup has solved this issue; even when the putty window becomes inactive the processes are still running (I didn't need to interact with them). I might look into using screen or tmux as a long term solution. Thanks Terry and Mike! Best, Su

On Tue, Jun 16, 2015 at 3:42 PM, Terry Bates terryjba...@gmail.com wrote:

Greetings, nohup does the trick, as Mr. Bridge has shared. If you want to run these and still have some interactivity with the services, consider using screen or tmux, as these will enable you to run the programs in the foreground, have added windows you can use to access a shell, tail logs, and so on, and enable you to disconnect from the session but still have it available for re-attachment. In addition, using runit for service supervision may enable you to keep the daemons running, but if your services are dying you may need to introspect more deeply on the root cause rather than working around it by restarting them.

Terry Bates
Email: terryjba...@gmail.com
Phone: (412) 215-0881
Skype: terryjbates
GitHub: https://github.com/terryjbates
Linkedin: http://www.linkedin.com/in/terryjbates/

On Tue, Jun 16, 2015 at 3:30 PM, Mike Bridge m...@bridgecanada.com wrote:

Have you tried using nohup?

    nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
    nohup bin/kafka-server-start.sh config/server.properties &

On Tue, Jun 16, 2015 at 3:21 PM, Su She suhsheka...@gmail.com wrote:

Hello Everyone, I'm wondering how to keep Zookeeper and Kafka Server up even when my SSH session (using putty) becomes inactive. I've tried running them in the background (using &), but it seems like they stop sometimes after a couple of hours or so and I'll have to restart zookeeper and/or the kafka server. The only remediation I've found is to export TMOUT=[big number], but there must be another solution. Thank you! Best, Su
Re: Recovering from broker failure with KafkaConsumer
Adding some more details to the previous question: the indefinite wait doesn't happen on calling subscribe() on the consumer, it happens when I (in this case) call seekToEnd(). A related problem is that the seekToEnd() method is synchronized (as are the other access methods on KafkaConsumer), so the client holds a lock while sitting in this wait. This means that if another thread tries to call close(), which is also synchronized, that thread will be blocked too. Holding locks while performing network I/O seems like a bad idea - is this something that's planned to be fixed?

Jan

On 17 June 2015 at 10:31, Jan Stette jan.ste...@gmail.com wrote:

I'm trying out the new KafkaConsumer client API in the trunk of the source tree, and while I realise that this is a work in progress, I have a question that perhaps someone can shed some light on. I'm looking at how to handle various error scenarios for a Kafka client, in particular what happens when trying to connect to the broker but it's not available. The behaviour I'm seeing is that the client will retry indefinitely (at the configurable interval), basically looping around in Fetcher.awaitMetadataUpdate() forever. I would like to have some way to fail the connection attempt to avoid the calling thread being blocked forever. Is this possible with the current version of the client? (Snapshot as of 16/6/15.) If not, is that something that's planned for the future? Jan
Re: OutOfMemoryError in mirror maker
Thank you for the reply. Patch submitted: https://issues.apache.org/jira/browse/KAFKA-2281

On Mon, 15 Jun 2015 at 02:16 Jiangjie Qin j...@linkedin.com.invalid wrote:

Hi Tao, Yes, the issue that ErrorLoggingCallback keeps the value as a local variable has been known for a while, and we probably should fix it, as the value is not used except for logging its size. Can you open a ticket and maybe also submit a patch? As for the unreachable objects, I don't think it is a memory leak. As you said, GC should take care of this. At LinkedIn we are using G1GC with some tunings made by our SRE. You can try that if interested. Thanks, Jiangjie (Becket) Qin

On 6/13/15, 11:39 AM, tao xiao xiaotao...@gmail.com wrote:

Hi, I am using mirror maker in trunk to replicate data across two data centers. While the destination broker was under heavy load and unresponsive, the send rate of the mirror maker was very low and the available producer buffer was quickly filled up. At the end the mirror maker threw an OOME. The detailed exception can be found here: https://gist.github.com/xiaotao183/53e1bf191c1a4d030a25#file-oome-exception-L1

I started up the mirror maker with 1G memory and a 256M producer buffer. I used Eclipse MAT to analyze the heap dump and found that the retained heap size of all RecordBatch objects was more than 500MB. Half of that was used to retain data yet to be sent to the destination broker, which makes sense to me as it is close to the 256MB producer buffer, but the other half was used by kafka.tools.MirrorMaker$MirrorMakerProducerCallback: every producer callback in mirror maker takes the message value and holds it until the message is successfully delivered. In my case, since the destination broker was very unresponsive, the message values held by the callbacks would stay forever, which I think is a waste and a major contributor to the OOME. Screenshot of MAT: https://gist.github.com/xiaotao183/53e1bf191c1a4d030a25#file-mat-screenshot-png

The other interesting problem I observed is that when I turned on unreachable object parsing in MAT, more than 400MB of memory was occupied by unreachable objects. It surprised me that GC didn't clean them up before the OOME was thrown. As suggested in the GC log (https://gist.github.com/xiaotao183/53e1bf191c1a4d030a25#file-oome-gc-log-L1), Full GC was unable to reclaim any memory, and when facing an OOME these unreachable objects should have been cleaned up. So either Eclipse MAT has an issue parsing the heap dump or there is a hidden memory leak that is hard to find.
I attached a sample screenshot of the unreachable objects here: https://gist.github.com/xiaotao183/53e1bf191c1a4d030a25#file-unreachable-objects-png

The consumer properties:

    zookeeper.connect=zk
    zookeeper.connection.timeout.ms=100
    group.id=mm
    auto.offset.reset=smallest
    partition.assignment.strategy=roundrobin

The producer properties:

    bootstrap.servers=brokers
    client.id=mirror-producer
    producer.type=async
    compression.codec=none
    serializer.class=kafka.serializer.DefaultEncoder
    key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    buffer.memory=268435456
    batch.size=1048576
    max.request.size=5242880
    send.buffer.bytes=1048576

The java command to start mirror maker:

    java -Xmx1024M -Xms512M -XX:+HeapDumpOnOutOfMemoryError \
      -XX:HeapDumpPath=/home/kafka/slc-phx-mm-cg.hprof \
      -XX:+PrintTenuringDistribution -XX:MaxTenuringThreshold=3 -server \
      -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled \
      -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true \
      -Xloggc:/var/log/kafka/kafka-phx/cg/mirrormaker-gc.log -verbose:gc \
      -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps \
      -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=10M \
      -Dcom.sun.management.jmxremote \
      -Dcom.sun.management.jmxremote.authenticate=false \
      -Dcom.sun.management.jmxremote.ssl=false \
      -Dkafka.logs.dir=/var/log/kafka/kafka-phx/cg \
      -Dlog4j.configuration=file:/usr/share/kafka/bin/../config/tools-log4j.properties \
      -cp libs/* kafka.tools.MirrorMaker --consumer.config consumer.properties \
      --num.streams 10 --producer.config producer.properties --whitelist test.*
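[Editor's note: the fix direction discussed here (and in KAFKA-2281) is to have the callback capture only what it actually logs - the value's size - instead of holding a reference to the value itself, so the bytes become collectable as soon as the producer's own buffer is done with them. A sketch of that idea; the class and field names are illustrative, not the actual patch:]

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class SizeOnlyLoggingCallback implements Callback {
        private final String topic;
        private final int valueSizeBytes; // retained instead of the byte[] value

        public SizeOnlyLoggingCallback(String topic, byte[] value) {
            this.topic = topic;
            this.valueSizeBytes = value == null ? 0 : value.length;
        }

        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                System.err.println("Error sending " + valueSizeBytes
                        + " byte message to topic " + topic + ": " + exception);
            }
        }
    }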
Re: At-least-once guarantees with high-level consumer
With auto-commit one can only have an at-most-once delivery guarantee: after a commit but before the message is delivered for processing, or even after it is delivered but before it is processed, things can fail, causing the event not to be processed, which is basically the same outcome as if it was not delivered.

On Mon, Jun 15, 2015 at 9:12 PM, Carl Heymann ch.heym...@gmail.com wrote:

Hi

** Disclaimer: I know there's a new consumer API on the way, this mail is about the currently available API. I also apologise if the below has already been discussed previously. I did try to check previous discussions on ConsumerIterator. **

It seems to me that the high-level consumer would be able to support at-least-once messaging, even if one uses auto-commit, by changing kafka.consumer.ConsumerIterator.next() to call currentTopicInfo.resetConsumeOffset(..) _before_ super.next(). This way, a consumer thread for a KafkaStream could just loop:

    while (true) {
        MyMessage message = iterator.next().message();
        process(message);
    }

Each call to iterator.next() then updates the offset to commit to the end of the message that was just processed. When offsets are committed for the ConsumerConnector (either automatically or manually), the commit will not include offsets of messages that haven't been fully processed.

I've tested the following ConsumerIterator.next(), and it seems to work as I expect:

    override def next(): MessageAndMetadata[K, V] = {
      // New code: reset consumer offset to the end of the previously consumed message:
      if (consumedOffset > -1L && currentTopicInfo != null) {
        currentTopicInfo.resetConsumeOffset(consumedOffset)
        val topic = currentTopicInfo.topic
        trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
      }
      // Old code, excluding the reset:
      val item = super.next()
      if (consumedOffset < 0)
        throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
      val topic = currentTopicInfo.topic
      consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
      consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
      item
    }

I've seen several people asking about managing commit offsets manually with the high level consumer. I suspect that this approach (the modified ConsumerIterator) would scale better than having a separate ConsumerConnector per stream just so that you can commit offsets with at-least-once semantics. The downside of this approach is more duplicate deliveries after recovery from hard failure (but this is at-least-once, right, not exactly-once).

I don't propose that the code necessarily be changed like this in trunk, I just want to know if the approach seems reasonable.

Regards
Carl Heymann
How to specify kafka bootstrap jvm options?
I want to tune the Kafka JVM options, but I can't find anywhere to pass options to the Kafka startup script (bin/kafka-server-start.sh). How do I pass in the JVM options?
Recovering from broker failure with KafkaConsumer
I'm trying out the new KafkaConsumer client API in the trunk of the source tree, and while I realise that this is a work in progress, I have a question that perhaps someone can shed some light on. I'm looking at how to handle various error scenarios for a Kafka client, in particular what happens when trying to connect to the broker but it's not available. The behaviour I'm seeing is that the client will retry indefinitely (at the configurable interval), basically looping around in Fetcher.awaitMetadataUpdate() forever. I would like to have some way to fail the connection attempt to avoid the calling thread being blocked forever. Is this possible with the current version of the client? (Snapshot as of 16/6/15). If not, is that something that's planned for the future? Jan
Re: How to specify kafka bootstrap jvm options?
Most of the tuning options are read in kafka-run-class.sh. You can override the relevant environment variables (KAFKA_HEAP_OPTS, KAFKA_JVM_PERFORMANCE_OPTS) when invoking the kafka-server-start.sh script.

On Wed, Jun 17, 2015 at 2:11 PM, luo.fucong bayinam...@gmail.com wrote:

I want to tune the Kafka JVM options, but I can't find anywhere to pass options to the Kafka startup script (bin/kafka-server-start.sh). How do I pass in the JVM options?
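[Editor's note: for example (a sketch; the heap and GC values below are placeholders, not recommendations):]

    KAFKA_HEAP_OPTS="-Xmx4G -Xms4G" \
    KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC" \
    bin/kafka-server-start.sh config/server.properties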
Re: Log compaction not working as expected
Hi, you might want to have a look here: http://kafka.apache.org/documentation.html#topic-config

segment.ms and segment.bytes should allow you to control the time/size at which segments are rolled.

Best Jan

On 16.06.2015 14:05, Shayne S wrote:

Some further information - and is this a bug? I'm using 0.8.2.1. Log compaction will only occur on the non-active segments. Intentional or not, it seems that the last segment is always the active segment. In other words, an expired segment will not be cleaned until a new segment has been created. As a result, a log won't be compacted until new data comes in (per partition). Does this mean I need to send the equivalent of a pig (https://en.wikipedia.org/wiki/Pigging) through each partition in order to force compaction? Or can I force the cleaning somehow?

Here are the steps to recreate:

1. Create a new topic with a 5 minute segment.ms:

    kafka-topics.sh --zookeeper localhost:2181 --create --topic TEST_TOPIC --replication-factor 1 --partitions 1 --config cleanup.policy=compact --config min.cleanable.dirty.ratio=0.01 --config segment.ms=30

2. Repeatedly add messages with identical keys (3x):

    echo "ABC123,{\"test\": 1}" | kafka-console-producer.sh --broker-list localhost:9092 --topic TEST_TOPIC --property parse.key=true --property key.separator=, --new-producer

3. Wait 5+ minutes and confirm no log compaction.

4. Once satisfied, send a new message:

    echo "DEF456,{\"test\": 1}" | kafka-console-producer.sh --broker-list localhost:9092 --topic TEST_TOPIC --property parse.key=true --property key.separator=, --new-producer

5. Log compaction will occur soon after.

Is my use case of infrequent logs not supported? Is this intentional behavior? It's unnecessarily challenging to target each partition with a dummy message to trigger compaction. Also, I believe there is another issue with logs originally configured without a segment timeout that led to my original issue. I still cannot get those logs to compact.

Thanks! Shayne
Re: At-least-once guarantees with high-level consumer
So Carl Heymann's ConsumerIterator.next hack approach is not reasonable?

On 2015-06-17 08:12 +0000, Stevo Slavić wrote:

With auto-commit one can only have an at-most-once delivery guarantee: after a commit but before the message is delivered for processing, or even after it is delivered but before it is processed, things can fail, causing the event not to be processed, which is basically the same outcome as if it was not delivered. [...]
Re: Log compaction not working as expected
Ah, misread that, sorry!

On 17.06.2015 14:26, Shayne S wrote:

Right, you can see I've got segment.ms set. The trick is that segments don't actually roll over until something new arrives. If your topic is idle (not receiving messages), it won't ever roll over to a new segment, and thus the last segment will never be compacted.

Thanks! Shayne

On Wed, Jun 17, 2015 at 5:58 AM, Jan Filipiak jan.filip...@trivago.com wrote:

Hi, you might want to have a look here: http://kafka.apache.org/documentation.html#topic-config - segment.ms and segment.bytes should allow you to control the time/size at which segments are rolled. Best Jan [...]
Re: Keeping Zookeeper and Kafka Server Up
We use supervisord for this. It ensures that the processes are always up and running.

Thanks
Kashyap

On Wednesday, June 17, 2015, Shayne S shaynest...@gmail.com wrote:

kafka-server-start.sh has a -daemon option, but I don't think Zookeeper has it. [...]
Re: QuickStart OK locally, but getting WARN Property topic is not valid and LeaderNotAvailableException remotely
I set this up on EC2 in exactly the same way and had the same errors when accessing it with a producer that was outside EC2. Is there something else I have to configure other than to set advertised.host.name to my external IP address?

On Tue, Jun 16, 2015 at 4:27 PM, Mike Bridge m...@bridgecanada.com wrote:

Running this seems to indicate that there is a leader at 0:

    $ ./bin/kafka-topics.sh --zookeeper MY.EXTERNAL.IP:2181 --describe --topic test123
    Topic:test123  PartitionCount:1  ReplicationFactor:1  Configs:
        Topic: test123  Partition: 0  Leader: 0  Replicas: 0  Isr: 0

I reran this test and my server.log indicates that there is a leader at 0:

    ...
    [2015-06-16 21:58:04,498] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
    [2015-06-16 21:58:04,642] INFO Registered broker 0 at path /brokers/ids/0 with address MY.EXTERNAL.IP:9092. (kafka.utils.ZkUtils$)
    [2015-06-16 21:58:04,670] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
    [2015-06-16 21:58:04,736] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)

I see this error in the logs when I send a message from the producer:

    [2015-06-16 22:18:24,584] ERROR [KafkaApi-0] error when handling request Name: TopicMetadataRequest; Version: 0; CorrelationId: 7; ClientId: console-producer; Topics: test123 (kafka.server.KafkaApis)
    kafka.admin.AdminOperationException: replication factor: 1 larger than available brokers: 0
        at kafka.admin.AdminUtils$.assignReplicasToBrokers(AdminUtils.scala:70)

If it sees my broker when I start it up, why can't it find it later when it receives a message?

Thanks, -Mike

On Tue, Jun 16, 2015 at 3:11 PM, Gwen Shapira gshap...@cloudera.com wrote:

The topic warning is a bug (i.e. the fact that you get a warning on a perfectly valid parameter). We fixed it for the next release. It is also unrelated to the real issue with the LeaderNotAvailable
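[Editor's note: for reference, the broker settings under discussion - a sketch of the relevant server.properties entries for 0.8.x; the address is a placeholder, and the EC2 security group must also allow inbound traffic on the advertised port:]

    # server.properties
    advertised.host.name=MY.EXTERNAL.IP
    advertised.port=9092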
Re: Keeping Zookeeper and Kafka Server Up
supervisord is pretty easy to use. Netflix Exhibitor will manage this all for zookeeper, if you want to try that tool.

On Wed, Jun 17, 2015 at 7:03 AM, Kashyap Mhaisekar kashya...@gmail.com wrote:

We use supervisord for this. It ensures that the processes are always up and running. Thanks Kashyap [...]
Re: Keeping Zookeeper and Kafka Server Up
+1 to using exhibitor. Besides managing the ensemble, it also helps with backups and zk log cleanup (which, if you don't do it, will eventually fill up your machine's disk).

~ Joestein

On Jun 17, 2015 9:44 AM, Dillian Murphey crackshotm...@gmail.com wrote:

supervisord is pretty easy to use. Netflix Exhibitor will manage this all for zookeeper, if you want to try that tool. [...]
Re: duplicate messages at consumer
This is actually an expected consequence of using distributed systems. The kafka FAQ has a good answer: https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoIgetexactly-oncemessagingfromKafka

On Tue, Jun 16, 2015 at 11:06 PM, Kris K squareksc...@gmail.com wrote:

Hi, While testing message delivery using kafka, I realized that a few duplicate messages got delivered by the consumers in the same consumer group (two consumers got the same message a few milliseconds apart). However, I do not see any redundancy at the producer or broker. One more observation: this does not happen when I use only one consumer thread.

I am running 3 brokers (0.8.2.1) with 3 Zookeeper nodes. There are 3 partitions in the topic and the replication factor is 3. For producing, I am using the new producer with compression.type=none. On the consumer end, I have 3 high-level consumers in the same consumer group running with one consumer thread each, on three different hosts. Auto-commit is set to true for the consumers. The size of each message ranges between 0.7 KB and 2 MB. The max volume for this test is 100 messages/hr.

I looked at the controller log for any possibility of a consumer rebalance during this time, but did not find any. In the server log of all the brokers, the error "java.io.IOException: Connection reset by peer" is being written almost continuously.

So, is it possible to achieve exactly-once delivery with the current high-level consumer without needing an extra layer to remove redundancy? Could you please point me to any settings or logs that would help me tune the configuration?

PS: I tried searching for similar discussions, but could not find any. If it's already been answered, please provide the link.

Thanks,
Kris

--
Adam Shannon | Software Engineer | Banno | Jack Henry
206 6th Ave Suite 1020 | Des Moines, IA 50309 | Cell: 515.867.8337
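[Editor's note: as the FAQ linked above explains, exactly-once consumption generally requires deduplication on the consumer side. A minimal sketch of one common approach, assuming each message carries a producer-assigned unique id (Kafka 0.8.x does not supply one itself); the class is illustrative:]

    import java.util.Collections;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Set;

    // Keeps a bounded set of recently seen message ids so duplicate
    // deliveries within the window can be skipped by the consumer.
    public class RecentIdFilter {
        private final Set<String> seen;

        public RecentIdFilter(final int capacity) {
            // LinkedHashMap with removeEldestEntry gives a simple LRU eviction
            // policy, so memory stays bounded.
            this.seen = Collections.newSetFromMap(new LinkedHashMap<String, Boolean>() {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                    return size() > capacity; // evict the oldest id once full
                }
            });
        }

        // Returns true the first time an id is seen, false on duplicates.
        public synchronized boolean firstTime(String messageId) {
            return seen.add(messageId);
        }
    }

Note this only narrows the duplicate window; true exactly-once semantics need the dedup state stored atomically with the processing results (e.g. in the same database transaction).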