Re: NotEnoughReplication
Does Kafka automatically replicate under-replicated partitions? I looked at these metrics through jmxterm, and the Value of UnderReplicatedPartitions came out to be 0. What additional places or metrics should I look at? There seems to be a lack of documentation on Kafka administration for situations like these. On Sat, Dec 10, 2016 at 6:49 PM, Ewen Cheslack-Postava wrote: > This error doesn't necessarily mean that a broker is down, it can also mean > that too many replicas for that topic partition have fallen behind the > leader. This indicates your replication is lagging for some reason. > > You'll want to be monitoring some of the metrics listed here: > http://kafka.apache.org/documentation.html#monitoring to help you > understand a) when this occurs (e.g. # of under replicated partitions being > a critical one) and b) what the cause might be (e.g. saturating network, > requests processing slow due to some other resource contention, etc). > > -Ewen > > On Fri, Dec 9, 2016 at 5:20 PM, Mohit Anchlia > wrote: > > > What's the best way to fix NotEnoughReplication given all the nodes are > up > > and running? Zookeeper did go down momentarily. We are on Kafka 0.10 > > > > org.apache.kafka.common.errors.NotEnoughReplicasException: Number of > > insync > > replicas for partition [__consumer_offsets,20] is [1], below required > > minimum [2] > > > > > > -- > Thanks, > Ewen >
Re: How to disable auto commit for SimpleConsumer kafka 0.8.1
The simple consumer doesn't do auto-commit. It really only issues individual low-level Kafka protocol request types, so `commitOffsets` is the only way it should write offsets. Is it possible it crashed after the request was sent but before finishing reading the response? Side-note: I know you mentioned 0.8.1, but if at all possible, we'd highly recommend moving to the new consumer. It supports both simple and consumer group modes and is what will be supported in the long term moving forward. -Ewen On Tue, Dec 6, 2016 at 12:47 PM, Anjani Gupta wrote: > I want to disable auto commit for kafka SimpleConsumer. I am using 0.8.1 > version.For High level consumer, config options can be set and passed via > consumerConfig as follows kafka.consumer.Consumer. > createJavaConsumerConnector(this.consumerConfig); > > How can I achieve the same for SimpleConsumer? I mainly want to disable > auto commit. I tried setting auto commit to false in consumer.properties > and restarted kafka server, zookeeper and producer. But, that does not > work. I think I need to apply this setting through code, not in > consumer.properties. Can anyone help here? 
> > Here is how my code looks like > > List topicAndPartitionList = new ArrayList<>(); > topicAndPartitionList.add(topicAndPartition); > OffsetFetchResponse offsetFetchResponse = consumer.fetchOffsets(new > OffsetFetchRequest("testGroup", topicAndPartitionList, (short) 0, > correlationId,clientName)); > > Map offsets = > offsetFetchResponse.offsets(); > FetchRequest req = new FetchRequestBuilder() .clientId(clientName) >.addFetch(a_topic, a_partition, > offsets.get(topicAndPartition).offset(), 10) .build(); > long readOffset = offsets.get(topicAndPartition).offset(); > FetchResponse fetchResponse = consumer.fetch(req); > > //Consume messages from fetchResponse > > > Map requestInfo = new > HashMap<> (); > requestInfo.put(topicAndPartition, new > OffsetMetadataAndError(readOffset, "metadata", (short)0)); > OffsetCommitResponse offsetCommitResponse = consumer.commitOffsets(new > OffsetCommitRequest("testGroup", requestInfo, (short)0, > correlationId, clientName)); > > > If above code crashes before committing offset, I still get latest offset > as result of offsets.get(topicAndPartition).offset() in next run which > makes me to think that auto commit of offset happens as code is executed. > -- Thanks, Ewen
Re: Best approach to frequently restarting consumer process
Consumer groups aren't going to handle 'let it crash' particularly well (and really any session-based services, but particularly consumer groups since a single failure affects the entire group). That said, 'let it crash' doesn't necessarily have to mean 'don't try to clean up at all'. The consumer group will recover *much* more quickly if you make sure any crash path includes a finally { consumer.close(); } block to do some minimal cleanup. This will cause the consumer to make a best effort to explicitly leave the group, allowing rebalancing to complete after the rest of the members rejoin. If you don't do this, your rebalances get much more expensive since the group coordinator needs to wait for the session timeout. This will probably lead to noticeably longer pauses. The one drawback to doing this today is that the close() can potentially block, so it may not fail as fast as you want it to -- it would be good to get a timeout-based close() implemented as well. That said, the LeaveGroup request *is* best effort, so if the consumer was otherwise in a healthy state, this should be very fast. All this said, 'let it crash' isn't the same thing as 'constant crashes are ok'. It's a fault recovery methodology, but crashing every 5 minutes isn't what the telecom industry had in mind... If things are crashing that frequently, there is likely a very common bug/memory leak/etc which can be fixed to significantly reduce the frequency of crashes. Generally 'let it crash' systems also provide a good way to collect debugging information for exactly this purpose. -Ewen On Wed, Dec 7, 2016 at 1:38 AM, Harald Kirsch wrote: > With 'restart' I mean a 'let it crash' setup (as promoted by Erlang and > Akka, e.g. http://doc.akka.io/docs/akka/snapshot/intro/what-is-akka.html). > The consumer gets in trouble due to an OOM or a runaway computation or > whatever that we want to preempt somehow. It crashes or gets killed > externally. 
> > So whether close() is called or not in the dying process, I don't know. > But clearly the subscribe is called after a restart. > > I understand that we are out of luck with this. We would have to separate > the crashing part out into a different operating system process, but must > keep the consumer running all time. :-( > > Thanks for the insight > Harald > > > On 06.12.2016 19:26, Gwen Shapira wrote: > >> Can you clarify what you mean by "restart"? If you call >> consumer.close() and consumer.subscribe() you will definitely trigger >> a rebalance. >> >> It doesn't matter if its "same consumer knocking", we already >> rebalance when you call consumer.close(). >> >> Since we want both consumer.close() and consumer.subscribe() to cause >> rebalance immediately (and not wait for heartbeat), I don't think >> we'll be changing their behavior. >> >> Depending on why consumers need to restart, I'm wondering if you can >> restart other threads in your application but keep the consumer up and >> running to avoid the rebalances. >> >> On Tue, Dec 6, 2016 at 7:18 AM, Harald Kirsch >> wrote: >> >>> We have consumer processes which need to restart frequently, say, every 5 >>> minutes. We have 10 of them so we are facing two restarts every minute on >>> average. >>> >>> 1) It seems that nearly every time a consumer restarts the group is >>> rebalanced. Even if the restart takes less than the heartbeat interval. >>> >>> 2) My guess is that the group manager just cannot know that the same >>> consumer is knocking at the door again. >>> >>> Are my suspicions (1) and (2) correct? Is there a chance to fix this such >>> that a restart within the heartbeat interval does not lead to a >>> re-balance? >>> Would a well defined client.id help? >>> >>> Regards >>> Harald >>> >>> >> >> >> -- Thanks, Ewen
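Editor's sketch of the "clean up on every crash path" advice in Ewen's reply above. It is a minimal illustration only: GroupMember is a hypothetical stand-in for the real consumer (in an application, pollAndProcess() would wrap KafkaConsumer.poll() plus record handling, and close() would be KafkaConsumer.close()), and CrashSafeLoop is an invented name.

```java
/** Hypothetical stand-in for the two consumer calls this sketch needs. */
interface GroupMember {
    void pollAndProcess(); // stands in for poll() plus record handling
    void close();          // stands in for KafkaConsumer.close()
}

final class CrashSafeLoop {
    /** Runs the loop until it fails, always attempting close() on the way out. */
    static void run(GroupMember member) {
        try {
            while (true) {
                member.pollAndProcess();
            }
        } finally {
            // Runs for exceptions and errors alike, so the member can make a
            // best-effort attempt to leave the group before the process dies.
            member.close();
        }
    }
}
```

The failure still propagates to the caller, so the process crashes as before; the only difference is the attempt to leave the group first. As noted in the reply, close() may block, and a process killed externally never reaches the finally block.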
Re: Configuration for low latency and low cpu utilization? java/librdkafka
On the producer side, there's not much you can do to reduce CPU usage if you want low latency and don't have enough throughput to buffer multiple messages -- you're going to end up sending 1 record at a time in order to achieve your desired latency. Note, however, that the producer is thread safe, so if it is possible to combine multiple processes into a single multi-threaded app, you might be able to share a single producer and get better batching. On the consumer side, for the Java client fetch.min.bytes is already set to 1, which will minimize latency -- data will be returned as soon as any data is available. If you are consistently seeing poll() return no messages in your consumers, try increasing fetch.max.wait.ms. It defaults to 500ms, so I'm guessing you're not hitting this, but if your data is spread across enough partitions and brokers, it's possible you are sending out a bunch of fetch requests that aren't returning any data. Also, as with producers, if you have light enough traffic you will benefit by consolidating to fewer consumers if possible. Fetch requests are made one at a time for *all* partitions the consumer is reading from that have the same leader, which means you'll amortize the cost of requests over multiple topic partitions (while maintaining the low latency guarantees when traffic in all the partitions is light anyway). Finally, as always, your best bet is to measure metrics & profile your app to see where the CPU time is going. -Ewen On Thu, Dec 8, 2016 at 7:44 AM, Niklas Ström wrote: > Use case scenario: > We want to have a fairly low latency, say below 20 ms, and we want to be > able to run a few hundred processes (on one machine) both producing and > consuming a handful of topics. The throughput is not high, lets say on > average 10 messages per second for each process. Most messages are 50-500 > bytes large, some may be a few kbytes. > > How should we adjust the configuration parameters for our use case? 
> > Our experiments so far gives us a good latency but at the expence of CPU > utilization. Even with a bad latency, the CPU utilization is not > satisfying. Since we will have a lot of processes we are concerned that > short poll loops will cause an overconsumption of CPU capacity. We are > hoping we might have missed some configuration parameter or that we have > some issues with our environment that we can find and solve. > > We are using both the java client and librdkafka and see similar CPU issues > in both clients. > > We have looked at recommendations from: > https://github.com/edenhill/librdkafka/wiki/How-to- > decrease-message-latency > The only thing that seems to really make a difference for librdkafka is > socket.blocking.max.ms, but reducing that also makes the CPU go up. > > I would really appreciate input on configuration parameters and of any > experience with environment issues that has caused CPU load. Or is our > scenario not feasible at all? > > Cheers > -- Thanks, Ewen
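Editor's sketch of the two consumer settings named in Ewen's reply above, written as plain java.util.Properties so it can be passed to a Java consumer. The class name, the method name and the 1000 ms value are assumptions for illustration; only fetch.min.bytes=1 and the 500 ms default for fetch.max.wait.ms come from the thread.

```java
import java.util.Properties;

final class LowLatencyConsumerConfig {
    /** Builds consumer settings for the light-traffic, low-latency case discussed above. */
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Already the Java client default: the broker answers as soon as any data exists.
        props.setProperty("fetch.min.bytes", "1");
        // Default is 500 ms. The 1000 ms here is an assumed example value: a longer
        // wait means fewer empty fetch responses when partitions are idle.
        props.setProperty("fetch.max.wait.ms", "1000");
        return props;
    }
}
```

Because fetch.min.bytes stays at 1, raising fetch.max.wait.ms should not delay delivery of records that do arrive; it only changes how long an idle fetch is parked on the broker. Whether it helps CPU usage in a given setup is something to measure.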
Re: Upgrading from 0.10.0.1 to 0.10.1.0
Hagen, What does "new consumer doesn't like the old brokers" mean exactly? When upgrading MM, remember that it uses the clients internally so the same compatibility rules apply: you need to upgrade both sets of brokers before you can start using the new version of MM. -Ewen On Thu, Dec 8, 2016 at 6:32 AM, Hagen Rother wrote: > Hi, > > I am testing an upgrade and I am stuck on the mirror maker. > > - New consumer doesn't like the old brokers > - Old consumer comes up, but does nothing and throws > a java.net.SocketTimeoutException after while. > > What's the correct upgrade strategy when mirroring is used? > > Thanks! > Hagen > -- Thanks, Ewen
Re: NotEnoughReplication
This error doesn't necessarily mean that a broker is down, it can also mean that too many replicas for that topic partition have fallen behind the leader. This indicates your replication is lagging for some reason. You'll want to be monitoring some of the metrics listed here: http://kafka.apache.org/documentation.html#monitoring to help you understand a) when this occurs (e.g. # of under replicated partitions being a critical one) and b) what the cause might be (e.g. saturating network, requests processing slow due to some other resource contention, etc). -Ewen On Fri, Dec 9, 2016 at 5:20 PM, Mohit Anchlia wrote: > What's the best way to fix NotEnoughReplication given all the nodes are up > and running? Zookeeper did go down momentarily. We are on Kafka 0.10 > > org.apache.kafka.common.errors.NotEnoughReplicasException: Number of > insync > replicas for partition [__consumer_offsets,20] is [1], below required > minimum [2] > -- Thanks, Ewen
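Editor's sketch of reading the under-replicated-partitions gauge mentioned above over JMX from Java, as an alternative to jmxterm. It assumes the broker was started with remote JMX enabled; the host, port and the class and method names are placeholders, and the MBean name is the one given in the monitoring section of the Kafka documentation.

```java
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

final class UnderReplicatedCheck {
    static final String MBEAN =
            "kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions";

    /** Parses the MBean name once, turning the checked exception into an unchecked one. */
    static ObjectName objectName() {
        try {
            return new ObjectName(MBEAN);
        } catch (MalformedObjectNameException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Reads the gauge's "Value" attribute from an already-open JMX connection. */
    static int read(MBeanServerConnection conn) throws Exception {
        Object value = conn.getAttribute(objectName(), "Value");
        return ((Number) value).intValue();
    }

    /** Connects to a broker's JMX port (placeholder host/port) and reads the gauge. */
    static int readFrom(String host, int port) throws Exception {
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            return read(connector.getMBeanServerConnection());
        }
    }
}
```

The gauge is reported per broker, so a check like this would need to run against every broker in the cluster rather than just one.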
Re: Running mirror maker between two different version of kafka
It's tough to read that stacktrace, but if I understand what you mean by "running the kafka mirroring in destination cluster which is 0.10.1.0 version of kafka", then the problem is that you cannot use 0.10.1.0 mirror maker with a 0.8.1 cluster. MirrorMaker is both a producer and consumer, so the version used has to be client-compatible with both source and destination clusters. For Kafka that means the client (MM) can use at most version min(src cluster version, dest cluster version). -Ewen On Thu, Dec 8, 2016 at 6:24 PM, Vijayanand Rengarajan < vijayanand.rengara...@yahoo.com.invalid> wrote: > Team, > I am trying to mirror few topics from cluster A( version 0.8.1) to Cluster > B (version 0.10.1.0), but due to version incompatibility I am getting below > error.if any one of you had similar issues, please share the work > around/solution to this issue. > I am running the kafka mirroring in destination cluster which is 0.10.1.0 > version of kafka installed. > There is no firewall and iptables between these two clusters. > WARN [ConsumerFetcherThread-console-consumer-27615_ > kafkanode01-1481247967907-68767097-0-30], Error in fetch kafka.consumer. > ConsumerFetcherThread$FetchRequest@26902baa (kafka.consumer. > ConsumerFetcherThread)java.io.EOFException at org.apache.kafka.common. 
> network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83) at > kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:129) > at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) at > kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99) at > kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$ > $sendRequest(SimpleConsumer.scala:83) at kafka.consumer.SimpleConsumer$ > $anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132) > at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$ > apply$mcV$sp$1.apply(SimpleConsumer.scala:132) at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$ > apply$mcV$sp$1.apply(SimpleConsumer.scala:132) at > kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) > at > kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) at > kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130) at > kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:109) > at kafka.consumer.ConsumerFetcherThread.fetch(ConsumerFetcherThread.scala:29) > at kafka.server.AbstractFetcherThread.processFetchRequest( > AbstractFetcherThread.scala:118) at kafka.server. > AbstractFetcherThread.doWork(AbstractFetcherThread.scala:103) at > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) > > Thanks, vijayanand. -- Thanks, Ewen
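Editor's sketch of the min(src cluster version, dest cluster version) rule from Ewen's reply above. The class and method names are invented for illustration; the one subtlety it shows is that Kafka version strings must be compared numerically per component, since a plain string comparison would rank "0.10.1.0" below "0.8.1".

```java
final class MirrorMakerVersion {
    /** Compares dotted numeric versions such as "0.8.1" and "0.10.1.0". */
    static int compare(String a, String b) {
        String[] x = a.split("\\.");
        String[] y = b.split("\\.");
        for (int i = 0; i < Math.max(x.length, y.length); i++) {
            // Missing trailing components count as 0, so "0.9" equals "0.9.0".
            int xi = i < x.length ? Integer.parseInt(x[i]) : 0;
            int yi = i < y.length ? Integer.parseInt(y[i]) : 0;
            if (xi != yi) {
                return Integer.compare(xi, yi);
            }
        }
        return 0;
    }

    /** Returns the older of the two cluster versions: the upper bound for MirrorMaker. */
    static String newestUsableClient(String sourceCluster, String destinationCluster) {
        return compare(sourceCluster, destinationCluster) <= 0
                ? sourceCluster
                : destinationCluster;
    }
}
```

For the clusters in this thread, newestUsableClient("0.8.1", "0.10.1.0") yields "0.8.1", which matches the advice to run a MirrorMaker no newer than the 0.8.1 source cluster.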
Re: Kafka supported on AIX OS?
As documented here http://kafka.apache.org/documentation#os Linux and Solaris have been tested, with Linux being the most common platform and the one regularly tested within the project itself. Since AIX is a Unix it'll probably work fine there, and I believe IBM at least provided documentation if not support for this. Perhaps someone from IBM could chime in. -Ewen On Thu, Dec 8, 2016 at 5:09 PM, Jayanna, Gautham < gautham.jaya...@siemens.com> wrote: > Hi, > We are trying to determine if we can run Kafka on AIX OS, however I could > not find definite information in the wiki page or by searching on internet. > I would greatly appreciate if you could let us know if we can run Kafka on > AIX or if there are plans to support AIX in a future release. > > Regards, > Gautham > > > This message and any attachments are solely for the use of intended > recipients. The information contained herein may include trade secrets, > protected health or personal information, privileged or otherwise > confidential information. Unauthorized review, forwarding, printing, > copying, distributing, or using such information is strictly prohibited and > may be unlawful. If you are not an intended recipient, you are hereby > notified that you received this email in error, and that any review, > dissemination, distribution or copying of this email and any attachment is > strictly prohibited. If you have received this email in error, please > contact the sender and delete the message and any attachment from your > system. Thank you for your cooperation > -- Thanks, Ewen
Re: Error starting kafka server.
Sai, Attachments to the mailing list get filtered, you'll need to paste the relevant info into your email. -Ewen On Fri, Dec 9, 2016 at 1:18 AM, Sai Karthi Golagani < skgolagani...@fishbowl.com> wrote: > Hi team, > > I’ve recently setup kafka server on a edge node and zk on 3 separate > hosts.I’m using zookeeper as a client @port no: 5181.I have 3 hosts having > zk, so I’m using a znode to connect to zk from my kafka host. > > While starting the kafka server, I’m getting this following exception > “FATAL [Kafka Server 0], Fatal error during KafkaServer startup. Prepare to > shutdown (kafka.server.KafkaServer)” > > Im attaching server,zookeeper .properties files along with the server.log. > > Please do the needful. > > > > Kafka version: 0.9.0 > > Zk version : 3.4.5 > -- Thanks, Ewen
Re: Efficient Kafka batch processing
You may actually want this implemented in a Streams app eventually, there is a KIP being discussed to support this type of incremental batch processing in Streams: https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams However, for now the approach you mentioned using a consumer would be the best approach. When you start up the app you can use the endOffsets API to determine what offset you should treat as the last offset: http://docs.confluent.io/3.1.1/clients/javadocs/org/apache/kafka/clients/consumer/KafkaConsumer.html#endOffsets(java.util.Collection) In terms of memory usage, you'll simply need to process in reasonably sized blocks. If you can already handle incremental processing like this then presumably it should be possible to create smaller sub-blocks and just run that process N times if you have too many messages. -Ewen On Sat, Dec 10, 2016 at 10:29 AM, Dominik Safaric wrote: > Hi everyone, > > What is among the most efficient ways to fast consume, transform and > process Kafka messages? Importantly, I am not referring nor interested in > streams, because the Kafka topic from which I would like to process the > messages will eventually stop receiving messages, after which I should > process the messages by extracting certain keys in a batch processing like > manner. > > So far I’ve implemented a a Kafka Consumer group that consumers these > messages, hashes them according to a certain key, and upon retrieval of the > last message starts the processing script. However, I am dealing with > exactly 100.000.000 log messages, each of 16 bytes, meaning that preserving > 1.6GB of data in-memory i.e. on heap is not the most efficient manner - > performance and memory wise. > > Regards, > Dominik > > -- Thanks, Ewen
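Editor's sketch of the "smaller sub-blocks" idea above. It assumes the start offset and the end offset for a partition have already been obtained (the end offset from the endOffsets API Ewen links to) and only shows how that range could be cut into bounded blocks; BoundedBatch and blocks are invented names, and offsets are assumed to be contiguous, which may not hold for compacted topics.

```java
import java.util.ArrayList;
import java.util.List;

final class BoundedBatch {
    /**
     * Splits [startOffset, endOffset) into consecutive blocks of at most
     * blockSize offsets. Each element is {fromInclusive, toExclusive}.
     */
    static List<long[]> blocks(long startOffset, long endOffset, long blockSize) {
        if (blockSize <= 0) {
            throw new IllegalArgumentException("blockSize must be positive");
        }
        List<long[]> result = new ArrayList<>();
        for (long from = startOffset; from < endOffset; from += blockSize) {
            result.add(new long[] {from, Math.min(from + blockSize, endOffset)});
        }
        return result;
    }
}
```

Each block can then be consumed and processed on its own, stopping once the consumer's position reaches the block's exclusive end, so the amount held in memory is bounded by the block size rather than by the full 100.000.000 messages.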
How does 'TimeWindows.of().until()' work?
I've added the 'until()' clause to some aggregation steps and it's working wonders for keeping the size of the state store within useful bounds... But I'm not 100% clear on how it works. What is implied by the '.until()' clause? What determines when to stop receiving further data - is it clock time (since the window was created)? It seems problematic for it to refer to EventTime as this may bounce all over the place. For non-overlapping windows a given record can only fall into a single aggregation period - so when would a value get discarded? I'm using 'groupByKey().aggregate(..., TimeWindows.of(60 * 1000L).until(10 * 1000L))' - but what is this accomplishing?
Another odd error
(Am reporting these as have moved to 0.10.1.0-cp2)

ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group MinuteAgg failed on partition assignment
java.lang.IllegalStateException: task [1_9] Log end offset should not change while restoring
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:245)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:198)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:206)
    at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
    at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64)
    at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
odd error message
This came up a few times today:

2016-12-10 18:45:52,637 [StreamThread-1] ERROR o.a.k.s.p.internals.StreamThread - stream-thread [StreamThread-1] Failed to create an active task %s:
org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Error while creating the state manager
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:72)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:90)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Caused by: java.io.IOException: task [0_0] Failed to lock the state directory: /mnt/extra/space/MinuteAgg/0_0
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:101)
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
    ... 13 common frames omitted
Re: 'swap' space for KStream app - limitations?
Have a look into this thread: http://mail-archives.apache.org/mod_mbox/kafka-users/201606.mbox/%3c49878526-ad9b-42ef-a220-728a5ae13...@parkassist.com%3E On 12/10/16 4:03 AM, Jon Yeargers wrote: > Are there any? My app ran for a few hours and filled a 100G partition (on 5 > machines). > > Any settings to keep this growth in check? Perhaps to estimate how much > space it's going to need? >
Re: checking consumer lag on KStreams app?
It's basically just a consumer like any other. The application.id is used as the consumer group.id, so you can use whatever tools you already use to check consumer lag. -Matthias On 12/9/16 5:49 PM, Jon Yeargers wrote: > How would this be done? >
Re: Deleting a topic without delete.topic.enable=true?
Are you running something else besides the consumers that would maintain a memory of the topics and potentially recreate them by issuing a metadata request? For example, Burrow (the consumer monitoring app I wrote) does this because it maintains a list of all topics in memory, and will end up recreating a topic that has been deleted as it issues a metadata request to try and find out what happened after an offset request for the topic fails. -Todd On Fri, Dec 9, 2016 at 8:37 AM, Tim Visher wrote: > On Fri, Dec 9, 2016 at 11:34 AM, Todd Palino wrote: > > > Given that you said you removed the log directories, and provided that > when > > you did the rmr on Zookeeper it was to the “/brokers/topics/(topic name)” > > path, you did the right things for a manual deletion. It sounds like you > > may have a consumer (or other client) that is recreating the topic. Do > you > > have auto topic creation enabled? > > > > That was the last epiphany we had. We had shut down the producer but not > all the consumers and we do allow auto-topic creation. > > That said, we then proceeded to shut all of them down (the consumers) and > the topic came back. I'm glad that we were doing the right steps though. > > > > > -Todd > > > > > > On Fri, Dec 9, 2016 at 8:25 AM, Tim Visher wrote: > > > > > I did all of that because setting delete.topic.enable=true wasn't > > > effective. We set that across every broker, restarted them, and then > > > deleted the topic, and it was still stuck in existence. > > > > > > On Fri, Dec 9, 2016 at 11:11 AM, Ali Akhtar > > wrote: > > > > > > > You need to also delete / restart zookeeper, its probably storing the > > > > topics there. (Or yeah, just enable it and then delete the topic) > > > > > > > > On Fri, Dec 9, 2016 at 9:09 PM, Rodrigo Sandoval < > > > > rodrigo.madfe...@gmail.com > > > > > wrote: > > > > > > > > > Why did you do all those things instead of just setting > > > > > delete.topic.enable=true? 
> > > > > > > > > > On Dec 9, 2016 13:40, "Tim Visher" wrote: > > > > > > > > > > > Hi Everyone, > > > > > > > > > > > > I'm really confused at the moment. We created a topic with > brokers > > > set > > > > to > > > > > > delete.topic.enable=false. > > > > > > > > > > > > We now need to delete that topic. To do that we shut down all the > > > > > brokers, > > > > > > deleted everything under log.dirs and logs.dir on all the kafka > > > > brokers, > > > > > > `rmr`ed the entire chroot that kafka was storing things under in > > > > > zookeeper, > > > > > > and then brought kafka back up. > > > > > > > > > > > > After doing all that, the topic comes back, every time. > > > > > > > > > > > > What can we do to delete that topic? > > > > > > > > > > > > -- > > > > > > > > > > > > In Christ, > > > > > > > > > > > > Timmy V. > > > > > > > > > > > > http://blog.twonegatives.com/ > > > > > > http://five.sentenc.es/ -- Spend less time on mail > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > *Todd Palino* > > Staff Site Reliability Engineer > > Data Infrastructure Streaming > > > > > > > > linkedin.com/in/toddpalino > > > -- *Todd Palino* Staff Site Reliability Engineer Data Infrastructure Streaming linkedin.com/in/toddpalino
Efficient Kafka batch processing
Hi everyone, What are the most efficient ways to quickly consume, transform and process Kafka messages? Importantly, I am not referring to, nor interested in, streams, because the Kafka topic from which I would like to process the messages will eventually stop receiving messages, after which I should process the messages by extracting certain keys in a batch-processing-like manner. So far I’ve implemented a Kafka consumer group that consumes these messages, hashes them according to a certain key, and upon retrieval of the last message starts the processing script. However, I am dealing with exactly 100.000.000 log messages, each of 16 bytes, meaning that preserving 1.6GB of data in-memory i.e. on heap is not the most efficient manner - performance and memory wise. Regards, Dominik
Connect: SourceTask poll & commit interaction
Hi Kafka Users, I'm looking for a bit of clarification on the documentation for implementing a SourceTask. I'm reading a replication stream from a database in my SourceTask, and I'd like to use commit or commitRecord to advance the other system's replication stream pointer so that it knows I have successfully read & committed the records to Kafka. This allows the other system to discard unneeded transaction logs. But I'm uncertain how to use either of SourceTask's commit or commitRecord correctly. For commit, the documentation says that it should "Commit the offsets, up to the offsets that have been returned by poll().". When commit() is executed, will poll() currently be running on another thread? I assume it must be, because poll should block, and that would imply you can't commit the trailing end of some activity. If commit is invoked while poll is being invoked, I'm concerned that I can't reliably determine where to advance my replication stream pointer to -- if I store the location at the end of poll, commit might be invoked while poll is still returning some records, and advance the pointer further than actually guaranteed. commitRecord on the other hand is invoked per-record. The documentation says "Commit an individual SourceRecord when the callback from the producer client is received." But if I'm producing to N partitions on different brokers, I believe that the producer callback is not called in any guaranteed order, so I can't advance my replication stream pointer to any single record since an older record being delivered to another partition may not have been committed. The only solution I can see so far is to maintain the replication stream positions of all the source records that I've returned from poll, and advance the replication pointer in commitRecord only when the lowest outstanding record is committed. Is there anything I've misunderstood or misinterpreted? Thanks, Mathieu
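Editor's sketch of the bookkeeping Mathieu describes in his last paragraph: remember every position handed out by poll(), and only move the upstream pointer once the lowest outstanding position has been acknowledged. OutstandingPositions and its method names are invented; positions are assumed to be unique, increasing long values, and the sketch says nothing about how Connect actually schedules poll() and commitRecord().

```java
import java.util.TreeMap;

/** Tracks source positions handed out by poll() and acknowledged via commitRecord(). */
final class OutstandingPositions {
    // position -> "acknowledged" flag, kept in increasing position order
    private final TreeMap<Long, Boolean> positions = new TreeMap<>();
    private long safePosition = -1L; // -1 means nothing is safe to acknowledge yet

    /** Call for each record returned from poll(). */
    synchronized void returned(long position) {
        positions.put(position, Boolean.FALSE);
    }

    /**
     * Call when a record is acknowledged (e.g. from commitRecord()).
     * Returns the highest position below which every record has been acknowledged.
     */
    synchronized long acknowledged(long position) {
        if (positions.containsKey(position)) {
            positions.put(position, Boolean.TRUE);
        }
        // Drain the contiguous acknowledged prefix; stop at the first gap.
        while (!positions.isEmpty() && positions.firstEntry().getValue()) {
            safePosition = positions.pollFirstEntry().getKey();
        }
        return safePosition;
    }
}
```

The methods are synchronized only as a precaution in case the two callbacks arrive on different threads, which is exactly the open question in the message above. With positions 1, 2 and 3 outstanding, acknowledging 2 first leaves the safe position unchanged, and acknowledging 1 afterwards advances it to 2.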
'swap' space for KStream app - limitations?
Are there any? My app ran for a few hours and filled a 100G partition (on 5 machines). Any settings to keep this growth in check? Perhaps to estimate how much space it's going to need?