Re: Suggestions of pulling local application logs into Kafka
On Tue, May 3, 2016 at 1:00 PM, Banias H wrote:
> I should add Flume is not an option for various reasons.
>
> On Tue, May 3, 2016 at 2:49 PM, Banias H wrote:
>> We use Kafka (0.9.x) internally in our pipeline and now we would like to ingest application logs sitting in the local file system of servers external to the Kafka cluster.
>>
>> We could write a producer program running on the application servers to push files to Kafka. However, we wonder if we can leverage Kafka Connect to pull files into Kafka. That requires a connector that can access local file systems, and I am not sure whether I can make use of existing connectors or I need to write one.
>>
>> Any thoughts or references would be helpful.
>>
>> Thanks,
>> B

Heka is great at reading arbitrary inputs, with files as a first-class input, and it can write to Kafka. The next release should see use of librdkafka, which should handle leader changeovers more smoothly than the existing Go library.
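For the Kafka Connect route: the 0.9+ distribution bundles a simple FileStream source connector that can tail a single file into a topic. A minimal standalone-mode sketch (the connector name, file path, and topic below are placeholders; check your version's docs for exact property names):

```shell
# Sketch of a Kafka Connect standalone source config for tailing one log file
# into a topic. File path and topic are placeholders; FileStreamSource is the
# simple connector bundled with the Kafka 0.9+ distribution.
cat > local-file-source.properties <<'EOF'
name=local-log-source
connector.class=FileStreamSource
tasks.max=1
file=/var/log/myapp/app.log
topic=app-logs
EOF
# Then, on the application server, run it against your worker config:
#   bin/connect-standalone.sh config/connect-standalone.properties local-file-source.properties
echo "wrote local-file-source.properties"
```

Note the bundled connector handles one file per connector instance and does not track log rotation, which is part of why heavier log shippers (Heka, Flume, etc.) come up in this thread.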
Re: no space left error
I'm keen to hear how to work one's way out of a filled partition, since I've run into this many times after having tuned retention bytes or retention time incorrectly. The proper path to resolving this isn't obvious from my many harried searches through the documentation. I often end up stopping the affected broker, picking an unlucky topic/partition and deleting it, lowering retention bytes on any topics that consumed too much space, and restarting.

On Tue, Jan 6, 2015 at 12:02 PM, Sa Li sal...@gmail.com wrote:
> Continuing this issue: when I restart the server with
>
>   bin/kafka-server-start.sh config/server.properties
>
> it fails to start, like:
>
> [2015-01-06 20:00:55,441] FATAL Fatal error during KafkaServerStable startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
> java.lang.InternalError: a fault occurred in a recent unsafe memory access operation in compiled Java code
>     at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>     at java.nio.ByteBuffer.allocate(ByteBuffer.java:331)
>     at kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:188)
>     at kafka.log.FileMessageSet$$anon$1.makeNext(FileMessageSet.scala:165)
>     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
>     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
>     at kafka.log.LogSegment.recover(LogSegment.scala:165)
>     at kafka.log.Log.recoverLog(Log.scala:179)
>     at kafka.log.Log.loadSegments(Log.scala:155)
>     at kafka.log.Log.<init>(Log.scala:64)
>     at kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$4.apply(LogManager.scala:118)
>     at kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$4.apply(LogManager.scala:113)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:105)
>     at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:113)
>     at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:105)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>     at kafka.log.LogManager.loadLogs(LogManager.scala:105)
>     at kafka.log.LogManager.<init>(LogManager.scala:57)
>     at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:275)
>     at kafka.server.KafkaServer.startup(KafkaServer.scala:72)
>     at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
>     at kafka.Kafka$.main(Kafka.scala:46)
>     at kafka.Kafka.main(Kafka.scala)
> [2015-01-06 20:00:55,443] INFO [Kafka Server 100], shutting down (kafka.server.KafkaServer)
> [2015-01-06 20:00:55,444] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
> [2015-01-06 20:00:55,446] INFO Session: 0x684a5ed9da3a1a0f closed (org.apache.zookeeper.ZooKeeper)
> [2015-01-06 20:00:55,446] INFO EventThread shut down (org.apache.zookeeper.ClientCnxn)
> [2015-01-06 20:00:55,447] INFO [Kafka Server 100], shut down completed (kafka.server.KafkaServer)
> [2015-01-06 20:00:55,447] INFO [Kafka Server 100], shutting down (kafka.server.KafkaServer)
>
> Any ideas?
>
> On Tue, Jan 6, 2015 at 12:00 PM, Sa Li sal...@gmail.com wrote:
>> The complete error message:
>>
>> -su: cannot create temp file for here-document: No space left on device
>> OpenJDK 64-Bit Server VM warning: Insufficient space for shared memory file: /tmp/hsperfdata_root/19721
>> Try using the -Djava.io.tmpdir= option to select an alternate temp location.
>> [2015-01-06 19:50:49,244] FATAL  (kafka.Kafka$)
>> java.io.FileNotFoundException: conf (No such file or directory)
>>     at java.io.FileInputStream.open(Native Method)
>>     at java.io.FileInputStream.<init>(FileInputStream.java:146)
>>     at java.io.FileInputStream.<init>(FileInputStream.java:101)
>>     at kafka.utils.Utils$.loadProps(Utils.scala:144)
>>     at kafka.Kafka$.main(Kafka.scala:34)
>>     at kafka.Kafka.main(Kafka.scala)
>>
>> On Tue, Jan 6, 2015 at 11:58 AM, Sa Li sal...@gmail.com wrote:
>>> Hi all,
>>>
>>> I am doing a performance test on our new Kafka production server. After sending some messages (even fake messages, using bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance), it hits connection errors and shuts down the brokers. After that, I see such errors:
>>>
>>> conf-su: cannot create temp file for here-document: No space left on device
>>>
>>> How can I fix it? I am concerned this will happen when we start to publish real messages to Kafka. Should I create a cron job to regularly clean certain directories?
>>>
>>> Thanks,
>>> -- Alec Li
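Lowering retention is the usual lever here, and since retention.bytes is enforced per partition, it helps to work backwards from the disk budget. A back-of-the-envelope sketch (all numbers are assumptions, and the commented alter command is 0.8.2-era syntax; verify against your version's docs):

```shell
# Hypothetical sizing: derive a per-partition retention.bytes from a disk
# budget. retention.bytes applies per partition, so divide by the partition
# count the broker hosts and leave headroom for open segments and recovery.
DISK_BYTES=$((500 * 1024 * 1024 * 1024))   # 500 GiB data volume (assumed)
PARTITIONS_PER_BROKER=100                  # assumed partitions on this broker
HEADROOM_PCT=20                            # keep 20% free
USABLE=$((DISK_BYTES * (100 - HEADROOM_PCT) / 100))
RETENTION_BYTES=$((USABLE / PARTITIONS_PER_BROKER))
echo "retention.bytes=$RETENTION_BYTES"
# Apply per topic (0.8.2-era syntax; newer releases moved this to kafka-configs.sh):
#   bin/kafka-topics.sh --zookeeper localhost:2181 --alter \
#     --topic my-topic --config retention.bytes=$RETENTION_BYTES
```

Note that the broker's retention checker only runs periodically, so space is not reclaimed the instant the config changes.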
Re: Updated Kafka Roadmap?
I too could benefit from an updated roadmap. We're in a similar situation where some components in our stream processing stack could use an overhaul, but I'm waiting for the offset API to be fully realized before doing any meaningful planning.

On Fri, Aug 1, 2014 at 11:52 AM, Jonathan Weeks jonathanbwe...@gmail.com wrote:
> Howdy,
>
> I was wondering if it would be possible to update the release plan:
> https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan
> aligned with the feature roadmap:
> https://cwiki.apache.org/confluence/display/KAFKA/Index
>
> We have several projects actively using and planning to use Kafka, and any current guidance on the new releases related to ZK dependence and producer/consumer API/client timing would be very helpful. For example, is 0.8.2 possible in August, or is September more likely?
>
> Also, any chance something like:
> https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer
> ...might make it into 0.9?
>
> Thanks!
reduce replication factor
Is there a way to reduce the replication count? I'm trying to spread existing partitions across more brokers, but it's hard to decomm a broker. Reducing repl count would suffice for now. Any tips? I'm running a mix of 0.8.1.1 and 0.8.1 (I'm upgrading now.)
Re: reduce replication factor
I did that, and now the topic has 4 replicas for a repl count of 3, but only the 'new' replicas exist in the ISR. The old broker that I want to clear disk space on and generally free up has fully synced a topic that I want to disassociate from it. Is there a way to do this?

On Wed, May 21, 2014 at 7:48 AM, Jun Rao jun...@gmail.com wrote:
> We don't have an exact tool for doing this. You may be able to do that through
> http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
> by specifying fewer replicas.
>
> Thanks,
> Jun
>
> On Wed, May 21, 2014 at 1:23 AM, David Birdsong david.birds...@gmail.com wrote:
>> Is there a way to reduce the replication count? I'm trying to spread existing partitions across more brokers, but it's hard to decomm a broker. Reducing repl count would suffice for now. Any tips? I'm running a mix of 0.8.1.1 and 0.8.1 (I'm upgrading now.)
Re: reduce replication factor
Here's the reassignment json and current describe output:
https://gist.github.com/davidbirdsong/32cd0c4f49496a6a32e5

In my re-assignment json, I tried to re-assign to 2 replicas when the repl count is set to 3. Once I noticed that the completely new node 133 had appeared in the ISR, I tried stopping 224, wiping kafka completely, and then brought 224 back up as 224 again. It promptly replicated the topic, but never joined the ISR.

How does one move a replica? This is exactly what I'm trying to do. My pattern is a common one: I started with a set of 3 kafka brokers, and the load and space are overwhelming them. I'm trying to add new brokers and spread the partitions to new nodes while removing some of the partitions on the old nodes so as to make room. It's the latter that I don't get how to do.

I've conflated two issues here, mostly due to needing to get this cluster stable again:
- reduce replication
- remove a partition from a broker, i.e. remove the replica

They're very distinct actions, but both would help me in the moment.

On Wed, May 21, 2014 at 8:56 AM, Jun Rao jun...@gmail.com wrote:
> During the re-assignment, did you move the replica off the old broker?
>
> Thanks,
> Jun
>
> On Wed, May 21, 2014 at 8:21 AM, David Birdsong david.birds...@gmail.com wrote:
>> I did that, and now the topic has 4 replicas for a repl count of 3, but only the 'new' replicas exist in the ISR. The old broker that I want to clear disk space on and generally free up has fully synced a topic that I want to disassociate from it. Is there a way to do this?
>>
>> On Wed, May 21, 2014 at 7:48 AM, Jun Rao jun...@gmail.com wrote:
>>> We don't have an exact tool for doing this. You may be able to do that through
>>> http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
>>> by specifying fewer replicas.
>>>
>>> On Wed, May 21, 2014 at 1:23 AM, David Birdsong david.birds...@gmail.com wrote:
>>>> Is there a way to reduce the replication count? I'm trying to spread existing partitions across more brokers, but it's hard to decomm a broker. Reducing repl count would suffice for now. Any tips? I'm running a mix of 0.8.1.1 and 0.8.1 (I'm upgrading now.)
Re: reduce replication factor
On Wed, May 21, 2014 at 9:11 AM, David Birdsong david.birds...@gmail.com wrote:
> Here's the reassignment json and current describe output:
> https://gist.github.com/davidbirdsong/32cd0c4f49496a6a32e5
>
> In my re-assignment json, I tried to re-assign to 2 when the repl is set to 3. Once I noticed that the completely new node 133 had appeared in the ISR, I tried stopping 224, wiping kafka completely, and then brought 224 back up as 224 again. It promptly replicated the topic, but never joined the ISR.

To be clear, I don't want it to join the ISR. I'm curious how to make 224 forget about the partitions for that topic, since I have other plans for it.

> How does one move a replica? This is exactly what I'm trying to do. My pattern is a common one: I started with a set of 3 kafka brokers, and the load and space are overwhelming them. I'm trying to add new brokers and spread the partitions to new nodes while removing some of the partitions on the old nodes so as to make room. It's the latter that I don't get how to do.
>
> I've conflated two issues here, mostly due to needing to get this cluster stable again: reduce replication, and remove a partition from a broker (i.e. remove the replica). They're very distinct actions, but both would help me in the moment.
>
> On Wed, May 21, 2014 at 8:56 AM, Jun Rao jun...@gmail.com wrote:
>> During the re-assignment, did you move the replica off the old broker?
>>
>> Thanks,
>> Jun
>>
>> On Wed, May 21, 2014 at 8:21 AM, David Birdsong david.birds...@gmail.com wrote:
>>> I did that, and now the topic has 4 replicas for a repl count of 3, but only the 'new' replicas exist in the ISR. The old broker that I want to clear disk space on and generally free up has fully synced a topic that I want to disassociate from it. Is there a way to do this?
>>>
>>> On Wed, May 21, 2014 at 7:48 AM, Jun Rao jun...@gmail.com wrote:
>>>> We don't have an exact tool for doing this. You may be able to do that through
>>>> http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
>>>> by specifying fewer replicas.
>>>>
>>>> On Wed, May 21, 2014 at 1:23 AM, David Birdsong david.birds...@gmail.com wrote:
>>>>> Is there a way to reduce the replication count? I'm trying to spread existing partitions across more brokers, but it's hard to decomm a broker. Reducing repl count would suffice for now. Any tips? I'm running a mix of 0.8.1.1 and 0.8.1 (I'm upgrading now.)
Re: reduce replication factor
On Wed, May 21, 2014 at 9:06 PM, Jun Rao jun...@gmail.com wrote:
> Expansion can be done by following
> http://kafka.apache.org/documentation.html#basic_ops_cluster_expansion
>
> If you just want to free up a server, you can stop the broker there and start a broker using the same broker id on a new server. Data should be automatically replicated to the new server.

The thing is, I don't want to fully free up the server; I just want it to stop hosting all the topic,partition sets it originally did. Here's the flow:

- broker_1 hosts topic,{1,2,3,4}
- over time, load for the partitions overwhelms broker_1
- spin up broker_2
- migrate topic,{3,4} to broker_2 successfully
- broker_1 is only useful if partitions topic,{3,4} are dropped by broker_1

How does one get broker_1 to disassociate, drop, forget, evict... (not sure of the verb) topic,{3,4} and let broker_2 own them?

> Thanks,
> Jun
>
> On Wed, May 21, 2014 at 3:23 PM, David Birdsong david.birds...@gmail.com wrote:
>> Any suggestions? I'm kind of in a bind in that I don't understand how to grow the cluster when more capacity is needed--which happens to be right now for me. The only thing I can think of that might work is to create new brokers, cherry-pick topic/partition pairs and move them, then turn off the old brokers and forever retire their IDs, freeing up my old hardware to come back online under a new kafka broker ID.
>>
>> On Wed, May 21, 2014 at 9:16 AM, David Birdsong david.birds...@gmail.com wrote:
>>> On Wed, May 21, 2014 at 9:11 AM, David Birdsong wrote:
>>>> Here's the reassignment json and current describe output:
>>>> https://gist.github.com/davidbirdsong/32cd0c4f49496a6a32e5
>>>>
>>>> In my re-assignment json, I tried to re-assign to 2 when the repl is set to 3. Once I noticed that the completely new node 133 had appeared in the ISR, I tried stopping 224, wiping kafka completely, and then brought 224 back up as 224 again. It promptly replicated the topic, but never joined the ISR.
>>>
>>> To be clear, I don't want it to join the ISR. I'm curious how to make 224 forget about the partitions for that topic, since I have other plans for it.
>>>
>>>> How does one move a replica? This is exactly what I'm trying to do. My pattern is a common one: I started with a set of 3 kafka brokers, and the load and space are overwhelming them. I'm trying to add new brokers and spread the partitions to new nodes while removing some of the partitions on the old nodes so as to make room. It's the latter that I don't get how to do.
>>>>
>>>> I've conflated two issues here, mostly due to needing to get this cluster stable again: reduce replication, and remove a partition from a broker (i.e. remove the replica). They're very distinct actions, but both would help me in the moment.
>>>>
>>>> On Wed, May 21, 2014 at 8:56 AM, Jun Rao wrote:
>>>>> During the re-assignment, did you move the replica off the old broker?
>>>>>
>>>>> On Wed, May 21, 2014 at 8:21 AM, David Birdsong wrote:
>>>>>> I did that, and now the topic has 4 replicas for a repl count of 3, but only the 'new' replicas exist in the ISR. The old broker that I want to clear disk space on and generally free up has fully synced a topic that I want to disassociate from it. Is there a way to do this?
>>>>>>
>>>>>> On Wed, May 21, 2014 at 7:48 AM, Jun Rao wrote:
>>>>>>> We don't have an exact tool for doing this. You may be able to do that through
>>>>>>> http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
>>>>>>> by specifying fewer replicas.
>>>>>>>
>>>>>>> On Wed, May 21, 2014 at 1:23 AM, David Birdsong wrote:
>>>>>>>> Is there a way to reduce the replication count? I'm trying to spread existing partitions across more brokers, but it's hard to decomm a broker. Reducing repl count would suffice for now. Any tips? I'm running a mix of 0.8.1.1 and 0.8.1 (I'm upgrading now.)
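Jun's suggestion can be made concrete. The reassignment file below lists a smaller replica set per partition (topic, partition, and broker ids are placeholders standing in for the cluster in this thread); feeding it to kafka-reassign-partitions.sh is, at least in later releases, the supported way both to shrink the replica count and to move a replica off a specific broker, since the controller deletes replicas not named in the list. As the thread shows, this was unreliable on 0.8.1.

```shell
# Sketch of a reassignment that drops broker 224 from each partition's
# replica list while keeping the newer brokers (all ids are placeholders).
cat > shrink-replicas.json <<'EOF'
{"version": 1,
 "partitions": [
   {"topic": "mytopic", "partition": 0, "replicas": [133, 225]},
   {"topic": "mytopic", "partition": 1, "replicas": [225, 133]}
 ]}
EOF
# bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
#   --reassignment-json-file shrink-replicas.json --execute
# bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
#   --reassignment-json-file shrink-replicas.json --verify
echo "wrote shrink-replicas.json"
```

Running --verify after --execute is worth the habit: it reports when the old replica has actually been removed rather than merely unassigned.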
Re: Reg Partition
On Sun, Mar 9, 2014 at 6:09 PM, Balasubramanian Jayaraman (Contingent) balasubramanian.jayara...@autodesk.com wrote:
> Thanks Martin. We are still in the design phase. I wanted to clarify my doubt about the relation between parallelism and partitions.

Kafka is a distributed, ordered commit log. There are underlying resources consumed by Kafka--in most cases a disk spindle--and the partition is just an abstraction of that underlying resource. Administrators need to know about it so they can deploy and monitor Kafka correctly, and developers need to know about it to take full advantage of those resources.

> -----Original Message-----
> From: Martin Kleppmann [mailto:mkleppm...@linkedin.com]
> Sent: Thursday, March 06, 2014 8:29 PM
> To: users@kafka.apache.org
> Subject: Re: Reg Partition
>
> You can certainly have several consumers consuming from the same partition: just give each a different consumer group ID, and then all the messages from the partition will be delivered to all of the consumers. If you want each message to be processed by only one of the consumers, you can drop those that you don't want: for example, consumer 1 ignores all messages with an even-numbered offset, and consumer 2 ignores all messages with an odd-numbered offset.
>
> However, I don't understand why you want to have multiple consumers on the same partition in the first place. Can't you simply configure your topic to have enough partitions that you can achieve the required parallelism? That's what partitions are for.
>
> Martin
>
> On 6 Mar 2014, at 01:19, Balasubramanian Jayaraman (Contingent) balasubramanian.jayara...@autodesk.com wrote:
>> Thanks Martin. I got it. The design is considered for performance improvement. Will there be any harm if I have some consumers consuming from the same partitions, if I can tolerate slowness/performance degradation?
>>
>> Regards,
>> Bala
>>
>> -----Original Message-----
>> From: Martin Kleppmann [mailto:mkleppm...@linkedin.com]
>> Sent: Wednesday, March 05, 2014 7:52 PM
>> To: users@kafka.apache.org
>> Subject: Re: Reg Partition
>>
>> Hi Bala,
>>
>> The way Kafka works, each partition is a sequence of messages in the order that they were produced, and each message has a position (offset) in this sequence. Kafka brokers don't keep track of which consumer has seen which messages. Instead, each consumer keeps track of the latest offset it has seen: because messages are consumed in sequential order, all messages with a smaller offset have been consumed, and all messages with a greater offset have not yet been consumed. Explained in detail here: http://kafka.apache.org/documentation.html#theconsumer
>>
>> If you wanted to have several consumers consume from the same partition, they would have to keep communicating in order to know which one has processed which messages (otherwise they'd end up processing the same message twice). This would be extremely inefficient. It's much easier and much more performant to assign each partition to only one consumer, so each consumer only needs to keep track of its own partition offsets. A consequence of that design is that you cannot have more consumers than partitions.
>>
>> Martin
>>
>> On 5 Mar 2014, at 10:13, Balasubramanian Jayaraman (Contingent) balasubramanian.jayara...@autodesk.com wrote:
>>> Hi,
>>>
>>> I have a doubt about parallelism. Why is the number of parallel consumers consuming messages from a topic restricted to the number of partitions configured for the topic? Why should this be the case? Why should the partition count affect the number of parallel consumers?
>>>
>>> Thanks,
>>> Bala
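Martin's point about one consumer per partition can be illustrated with a toy assignment: with P partitions and C consumers in the same group, each partition maps to exactly one consumer, and any consumer beyond the partition count sits idle. This is a simplified sketch with made-up counts; the real rebalancing algorithm differs.

```shell
# Toy partition-to-consumer assignment for one consumer group: each partition
# gets exactly one owner, so at most PARTITIONS consumers do useful work.
PARTITIONS=6
CONSUMERS=4
p=0
while [ "$p" -lt "$PARTITIONS" ]; do
  echo "partition-$p -> consumer-$((p % CONSUMERS))"
  p=$((p + 1))
done
```

With CONSUMERS greater than PARTITIONS, the modulo never reaches the extra consumer ids, which is the "you cannot have more consumers than partitions" consequence Martin describes.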
Re: Producer fails when old brokers are replaced by new
On Tue, Mar 4, 2014 at 10:33 AM, Guozhang Wang wangg...@gmail.com wrote:
> Hi David,
>
> When you (re)start the producer/consumer, the broker list is used for bootstrap, so it should be guaranteed that some of the brokers listed are alive when the client starts. When you migrate from 1,2,3 to 4,5,6 (in fact, in this case it is better to just keep the broker ids and replace the host/port), and then bounce the clients, it is better to also change the bootstrap broker list, since clients are memoryless. Does this make sense?

Sure. When we were saying broker {1,2,3} and {4,5,6}, I made the mistake of assuming we were referring to names/IPs and ports symbolically. That list will have to be updated and correct between process restarts, however it's implemented.

> I agree that using ZK the clients can get rid of the bootstrap broker list, but that would add a ZooKeeper dependency to the client code base. So we decided to remove the ZK dependency from the client instead; if people do want to use ZK for bootstrapping, they can always add a simple script which reads the broker list from ZK and gives that to the broker config.

This is effectively what I'm talking about: a 'wrapper' that discovers the broker id list from zookeeper and then retrieves the ip:ports of those brokers from zookeeper--where the wrapper is just more code, not really a script. I should mention that I'm not using the scala/jvm client libs. I'm on my own in golang land, and I've had to implement a high-level consumer on my own, which has been an adventure.

> Guozhang
>
> On Mon, Mar 3, 2014 at 4:16 PM, David Birdsong david.birds...@gmail.com wrote:
>> On Mon, Mar 3, 2014 at 4:00 PM, Guozhang Wang wangg...@gmail.com wrote:
>>> Hi Chris,
>>>
>>> In 0.9 we will have just one broker list, i.e. the list of brokers read from the config file will be updated during bootstrapping and all future metadata refresh operations. This feature should lift the limit you are describing: for example, if your broker list in the config is {1,2,3} and later on you extend the cluster to {1,2,3,4,5,6}, then you can shut down 1,2,3 all at once.
>>
>> But if your producer or consumer ever restarts and only knows about {1,2,3}, the problem still exists, no? This is why I bootstrap off of zk and expect to have to maintain an accurate list of zk nodes for all processes.
>>
>>> Guozhang
>>>
>>> On Mon, Mar 3, 2014 at 1:35 PM, Christofer Hedbrandh christo...@knewton.com wrote:
>>>> Thanks again Guozhang. There are still some details here that are unclear to me, but if what I am describing is not a bug, do you think it is reasonable to file this as a feature request? We agree that it is not ideal to have to keep at least one broker in the list alive when replacing a cluster, i.e. migrating from one set of brokers to another?
>>>>
>>>> Christofer
>>>>
>>>> On Wed, Feb 26, 2014 at 9:16 PM, Guozhang Wang wangg...@gmail.com wrote:
>>>>> kafka-preferred-replica-election.sh is only used to move leaders between brokers; as long as a broker in broker.metadata.list (the second broker list I mentioned in my previous email) is still alive, the producer can learn the leader change from it.
>>>>>
>>>>> In terms of broker discovery, I think it depends on how you define the future. For example, if originally there are 3 brokers 1,2,3 and you start the producer with metadata list = {1,2,3}, and later on another three brokers 4,5,6 are added, the producer can still find these newly added brokers. It is just that if all the brokers in the metadata list, i.e. 1,2,3, are gone, then the producer will not be able to refresh its metadata.
>>>>>
>>>>> Guozhang
>>>>>
>>>>> On Wed, Feb 26, 2014 at 11:04 AM, Christofer Hedbrandh christo...@knewton.com wrote:
>>>>>> Thanks for your response Guozhang. I did make sure that new metadata is fetched before taking out the old broker. I set topic.metadata.refresh.interval.ms to something very low, and I confirmed in the producer log that new metadata is actually fetched after the new broker is brought up and before the old broker is taken down. Does this not mean that the dynamic current-brokers list would hold the new broker at this point? If you are saying that the dynamic current-brokers list is never used for fetching metadata, this does not explain how the producer does NOT fail when kafka-preferred-replica-election.sh makes the new broker become the leader. Lastly, if broker discovery is not a producer feature in the 0.8.0 release, and I have to make sure at least one broker in the list is alive during the rolling bounce, is this a feature you are considering for future versions?
Re: Producer fails when old brokers are replaced by new
On Mon, Mar 3, 2014 at 4:00 PM, Guozhang Wang wangg...@gmail.com wrote:
> Hi Chris,
>
> In 0.9 we will have just one broker list, i.e. the list of brokers read from the config file will be updated during bootstrapping and all future metadata refresh operations. This feature should lift the limit you are describing: for example, if your broker list in the config is {1,2,3} and later on you extend the cluster to {1,2,3,4,5,6}, then you can shut down 1,2,3 all at once.

But if your producer or consumer ever restarts and only knows about {1,2,3}, the problem still exists, no? This is why I bootstrap off of zk and expect to have to maintain an accurate list of zk nodes for all processes.

> Guozhang
>
> On Mon, Mar 3, 2014 at 1:35 PM, Christofer Hedbrandh christo...@knewton.com wrote:
>> Thanks again Guozhang. There are still some details here that are unclear to me, but if what I am describing is not a bug, do you think it is reasonable to file this as a feature request? We agree that it is not ideal to have to keep at least one broker in the list alive when replacing a cluster, i.e. migrating from one set of brokers to another?
>>
>> Christofer
>>
>> On Wed, Feb 26, 2014 at 9:16 PM, Guozhang Wang wangg...@gmail.com wrote:
>>> kafka-preferred-replica-election.sh is only used to move leaders between brokers; as long as a broker in broker.metadata.list (the second broker list I mentioned in my previous email) is still alive, the producer can learn the leader change from it.
>>>
>>> In terms of broker discovery, I think it depends on how you define the future. For example, if originally there are 3 brokers 1,2,3 and you start the producer with metadata list = {1,2,3}, and later on another three brokers 4,5,6 are added, the producer can still find these newly added brokers. It is just that if all the brokers in the metadata list, i.e. 1,2,3, are gone, then the producer will not be able to refresh its metadata.
>>>
>>> Guozhang
>>>
>>> On Wed, Feb 26, 2014 at 11:04 AM, Christofer Hedbrandh christo...@knewton.com wrote:
>>>> Thanks for your response Guozhang. I did make sure that new metadata is fetched before taking out the old broker. I set topic.metadata.refresh.interval.ms to something very low, and I confirmed in the producer log that new metadata is actually fetched after the new broker is brought up and before the old broker is taken down. Does this not mean that the dynamic current-brokers list would hold the new broker at this point? If you are saying that the dynamic current-brokers list is never used for fetching metadata, this does not explain how the producer does NOT fail when kafka-preferred-replica-election.sh makes the new broker become the leader. Lastly, if broker discovery is not a producer feature in the 0.8.0 release, and I have to make sure at least one broker in the list is alive during the rolling bounce, is this a feature you are considering for future versions?
>>>>
>>>> On Wed, Feb 26, 2014 at 12:04 PM, Guozhang Wang wangg...@gmail.com wrote:
>>>>> Hello Chris,
>>>>>
>>>>> The broker.metadata.list, once read in at startup time, will not be changed. In other words, during the lifetime of a producer it has two lists of brokers:
>>>>>
>>>>> 1. The current brokers in the cluster, returned in the metadata request response; this list is dynamic.
>>>>> 2. The broker list used for bootstrapping; this is read from broker.metadata.list and is fixed. This list could, for example, be a VIP, with a hardware load balancer behind it distributing the metadata requests to the brokers.
>>>>>
>>>>> So in your case, the metadata list only has broker B, and once it is taken out and the producer fails to send messages to it and hence tries to refresh its metadata, it has no broker to go to. Therefore, when you are doing a rolling bounce of the cluster, for example to do an in-place upgrade, you need to make sure at least one broker in the list is alive during the rolling bounce. Hope this helps.
>>>>>
>>>>> Guozhang
>>>>>
>>>>> On Wed, Feb 26, 2014 at 8:19 AM, Christofer Hedbrandh christo...@knewton.com wrote:
>>>>>> Hi all,
>>>>>>
>>>>>> I ran into a problem with the Kafka producer when attempting to replace all the nodes in a 0.8.0 Beta1 release Kafka cluster with 0.8.0 release nodes. I started a producer/consumer test program to measure the cluster's performance during the process, I added new brokers, I ran kafka-reassign-partitions.sh, and I removed the old brokers. When I removed the old brokers, my producer failed. The simplest scenario that I could come up with where I still see this behavior is this. Using version 0.8.0
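The "simple script which reads the broker list from ZK" that Guozhang mentions might look like the sketch below. The JSON shape matches what 0.8.x brokers register under /brokers/ids/<id>; the zookeeper-shell call is commented out since it needs a live quorum, so a sample registration stands in for its output (hostname and port are placeholders).

```shell
# Hypothetical bootstrap wrapper: turn a broker registration in ZooKeeper into
# the host:port entry a producer's broker list config expects.
# With a live quorum you would fetch the znode, e.g.:
#   REG=$(bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/1 2>/dev/null | tail -1)
REG='{"host":"broker1.example.com","port":9092,"jmx_port":-1,"version":1}'  # sample registration
echo "$REG" | python3 -c 'import json,sys; d=json.load(sys.stdin); print("%s:%s" % (d["host"], d["port"]))'
```

In practice you would loop over the children of /brokers/ids to build the full comma-separated list before handing it to the producer at startup.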
Re: Reg Exception in Kafka
Yeah, I've only skimmed this, but I think I might have something. All non-vpc type ec2 nodes come with an external IP address and an internal IP address. The external IP address is what grants the node access to the internet--makes it publicly routable. The mechanism by which the external IP address isn't fully disclosed, but one can infer that there exists some NAT device off the instance that maps any inbound traffic to the external IP to the EC2 instances internal IP. When deploying distributed systems that employ service registry(zk) in EC2, I advise choosing between connecting only on the internal network or only on the external network. The internal network is usually my preference for simple privacy reasons. ..anyway, this means all *internal* name resolution needs to be figured out by using something like /etc/hosts or using all the internal names and the name services provide by AWS(or for the intrepid, set up internal DNS servers.) If you go the internal route, then put all of your nodes in the same security group. The issue of timing out network requests smacks of a kafka node (be it a broker or producer, I didn't parse which one) either picking up an external IP address from zookeeper or the security groups not allowing access on the ports needed. The default behavior of AWS security groups is to DROP which can be confusing when you're trying to figure something out Lastly, the mention of needing to connect to kafka(in ec2) from your office desktop makes me think you need to step back and detangle this deployment. Running distributed services in the cloud doesn't magically solve the connectivity issue--especially for distributed systems that register themselves in zookeeper. - are you running zookeeper *and* kafka across public ip addresses? if no, then do you have a vpn to connect your 'private' network in ec2? - are you running zookeeper *and* kafka across private ip addresses in ec2? if so, can they all connect with no timeouts? 
if so, try running a consumer on an ec2 node. I'll throw out a scenario that I expect you might be in: - zk quorum in ec2, all connected to each other across private ip addresses - kafka cluster registered into zk, broker id's list either ip addreses or 'private' names that ec2 provides - you punched a hole zookeeper connections in your security group in ec2 - your desktop connects to zookeeper over a public ip - assuming that works - your desktop finds private names or ip addresses in zk for kafka brokers - your desktop can't resolve those internal names, if it could it wouldn't be able to route to the ip addresses Hope this helps, good luck. On Sun, Feb 23, 2014 at 9:16 PM, Jun Rao jun...@gmail.com wrote: Hmm, then I am not sure what happened. Anyone with EC2 experience can help? Thanks, Jun On Sun, Feb 23, 2014 at 6:00 PM, Balasubramanian Jayaraman (Contingent) balasubramanian.jayara...@autodesk.com wrote: The ports are already added in the security group. I am able to telnet from the same machine I am running the producer/consume test. Is there any configuration I missed? Thanks Bala -Original Message- From: Jun Rao [mailto:jun...@gmail.com] Sent: Saturday, February 22, 2014 12:10 AM To: users@kafka.apache.org Subject: Re: Reg Exception in Kafka Maybe you need to add the port to the right security group? Thanks, Jun On Thu, Feb 20, 2014 at 9:58 PM, Balasubramanian Jayaraman (Contingent) balasubramanian.jayara...@autodesk.com wrote: One point to note is that, I am trying to access the Kafka broker (located in Amazon Cloud, EC2 instance) from the Eclipse (located in my office). I am using Kafka from the trunk . Thanks Bala -Original Message- From: Jun Rao [mailto:jun...@gmail.com] Sent: Friday, 21 February, 2014 1:51 PM To: users@kafka.apache.org Subject: Re: Reg Exception in Kafka That explains why your producer hits connection timeout. Not sure why the controller to broker connection also times out though, if you can manually establish the connection. 
Thanks, Jun On Thu, Feb 20, 2014 at 7:37 PM, Balasubramanian Jayaraman (Contingent) balasubramanian.jayara...@autodesk.com wrote: All the brokers reside on the same server and are listening on ports 10092, 10093, and 10094. From the same machine I can connect to the zookeeper and the brokers. But when I tried to connect from an external machine (from Eclipse), I got an exception as communicated earlier. I was not able to connect to any of the brokers, and I get the same exception while connecting to each of them. Regards Bala -Original Message- From: Jun Rao [mailto:jun...@gmail.com] Sent: Thursday, 20 February, 2014 12:05 AM To: users@kafka.apache.org Subject: Re: Reg Exception in Kafka Can you make the connection from the controller host to the other broker? Also, what's the
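For the scenario described above, the thing worth checking is what address each broker registers in zookeeper, since that is what clients (including a desktop outside EC2) will try to connect to. A minimal sketch of the relevant broker settings -- hostnames here are made-up placeholders, and note that advertised.host.name only landed around 0.8.1; on plain 0.8.0 host.name controls both the bind address and what gets registered in zk:

```properties
# Interface the broker binds to (internal EC2 address).
host.name=ip-10-0-0-12.ec2.internal

# Address the broker registers in zookeeper and hands to clients.
# If this only resolves/routes inside EC2, external clients will
# discover it via zk and then time out, as described above.
advertised.host.name=ip-10-0-0-12.ec2.internal
advertised.port=10092
```

If clients outside EC2 must connect directly, the advertised name has to be one they can both resolve and route to, which is exactly the detangling decision (all-internal vs. all-external) argued for above.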
high-level consumer design
Hey All, I've been cobbling together a high-level consumer for golang building on top of Shopify's Sarama package and wanted to run the basic design by the list and get some feedback or pointers on things I've missed or will eventually encounter on my own. I'm using zookeeper to coordinate topic-partition owners for consumer members in each consumer group. I followed the znode layout that's apparent from watching the console consumer: consumer_root/consumer_group_name/{offsets,owners,ids}. The consumer uses an outer loop to discover the partition list for a given topic, attempts to grab a zookeeper lock on each (topic, partition) tuple, and then, for each (topic, partition) it successfully locks, launches a thread (goroutine) to read that partition's stream. The outer loop continues to watch for children events on either of: consumer_root/consumer_group/owners/topic_name or kafka_root/brokers/topics/topic_name/partitions ...any watch event that fires causes all offset data and consumer handles to be flushed and closed, and the goroutines watching topic-partitions exit. The loop is then restarted. Another thread reads topic-partition-offset data and flushes the offset to: consumer_root/consumer_group/offsets/topic_name/partition_number Have I oversimplified or missed any critical steps?
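The claim step of the outer loop can be sketched in Go. This is a simplified illustration, not the poster's actual code: the ephemeral znodes under consumer_root/<group>/owners/<topic> are stood in for by an in-memory map so the shape of the loop is visible without a zookeeper connection. It also happens to demonstrate the failure mode discussed in the reply below: whichever consumer races first can win every lock.

```go
package main

import (
	"fmt"
	"sync"
)

// zkLocks stands in for the ephemeral-node locks under
// consumer_root/<group>/owners/<topic>. In the real consumer each
// claim would be a zookeeper ephemeral znode create, which fails
// if another session already holds the node.
type zkLocks struct {
	mu     sync.Mutex
	owners map[int]string // partition -> consumer id
}

// tryLock attempts to claim one partition for a consumer, mirroring
// the create-ephemeral-znode race in the design above.
func (z *zkLocks) tryLock(partition int, consumerID string) bool {
	z.mu.Lock()
	defer z.mu.Unlock()
	if _, taken := z.owners[partition]; taken {
		return false
	}
	z.owners[partition] = consumerID
	return true
}

// claimPartitions is the body of the outer loop: walk the partition
// list, grab whatever locks we can, and return the set we now own.
// The caller would then launch one goroutine per owned partition.
func claimPartitions(z *zkLocks, partitions []int, consumerID string) []int {
	var owned []int
	for _, p := range partitions {
		if z.tryLock(p, consumerID) {
			owned = append(owned, p)
		}
	}
	return owned
}

func main() {
	locks := &zkLocks{owners: map[int]string{}}
	parts := []int{0, 1, 2, 3}
	a := claimPartitions(locks, parts, "consumer-a")
	b := claimPartitions(locks, parts, "consumer-b")
	fmt.Println(a, b) // consumer-a wins every lock; consumer-b gets none
}
```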
Re: high-level consumer design
On Mon, Jan 27, 2014 at 4:19 PM, Guozhang Wang wangg...@gmail.com wrote: Hello David, One thing about using ZK locks to own a partition is load balancing. If you are unlucky, some consumer may get all the locks and some may get none, and hence have no partitions to consume. I've considered this and even encountered it in testing. At our current load levels it won't hurt us, but if there's a good solution, I'd rather codify smooth consumer balance. Got any suggestions? My thinking thus far is to establish some sort of identity on the consumer and derive an evenness or oddness or some modulo value that induces a small delay when encountering particular partition numbers. It's a hacky idea, but it's pretty simple and might be good enough for smoothing consumers. Also you may need some synchronization between the consumer thread and the offset thread. For example, when an event is fired and the consumers need to re-try grabbing the locks, they need to first stop the current fetchers, commit offsets, and then start owning new partitions. This is the current design and what I have implemented so far. The last thread to exit is the offset thread, and it has a direct communication channel to the consumer threads, so it waits for those channels to be closed before its last flush and exit. Guozhang Thanks for the input! On Mon, Jan 27, 2014 at 3:03 PM, David Birdsong david.birds...@gmail.com wrote: Hey All, I've been cobbling together a high-level consumer for golang building on top of Shopify's Sarama package and wanted to run the basic design by the list and get some feedback or pointers on things I've missed or will eventually encounter on my own. I'm using zookeeper to coordinate topic-partition owners for consumer members in each consumer group. I followed the znode layout that's apparent from watching the console consumer: consumer_root/consumer_group_name/{offsets,owners,ids}. 
The consumer uses an outer loop to discover the partition list for a given topic, attempts to grab a zookeeper lock on each (topic, partition) tuple, and then, for each (topic, partition) it successfully locks, launches a thread (goroutine) to read that partition's stream. The outer loop continues to watch for children events on either of: consumer_root/consumer_group/owners/topic_name or kafka_root/brokers/topics/topic_name/partitions ...any watch event that fires causes all offset data and consumer handles to be flushed and closed, and the goroutines watching topic-partitions exit. The loop is then restarted. Another thread reads topic-partition-offset data and flushes the offset to: consumer_root/consumer_group/offsets/topic_name/partition_number Have I oversimplified or missed any critical steps? -- -- Guozhang
Re: cluster expansion
On Fri, Dec 13, 2013 at 11:21 AM, Neha Narkhede neha.narkh...@gmail.com wrote: Partition movement is not an automatic operation in Kafka yet. You need to use the partition reassignment tool - https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-6.ReassignPartitionsTool . Also, that feature is stable in 0.8.1. What's the best way to try out 0.8.1? I have a git clone from git://git.apache.org/kafka.git but none of the branches or tags imply 0.8.1. Is it just HEAD from trunk? Thanks, Neha On Fri, Dec 13, 2013 at 6:48 AM, Robert Turner rob...@bigfoot.com wrote: No, the 6 partitions for each topic will remain on the original brokers. You could either reassign some partitions from all topics to the new brokers, or you could add partitions on the new brokers for each topic. In 0.8.0 there is now an add-partitions tool. Cheers Rob Turner. On 13 December 2013 14:42, Yu, Libo libo...@citi.com wrote: Hi folks, There are three brokers running 0.8-beta1 in our cluster currently. Assume all the topics have six partitions. I am going to add another three brokers to the cluster and upgrade all of them to 0.8. My question is: after the cluster is up, will the partitions be evenly distributed to all brokers? Thanks. Regards, Libo -- Cheers Rob.
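For anyone following along, a typical invocation of the reassignment tool linked above looks roughly like this -- the topic name and broker ids are placeholders, and the flag names are from the 0.8.1-era tool, so check your build's --help:

```shell
# 1. List the topics to move
cat > topics-to-move.json <<'EOF'
{"topics": [{"topic": "mytopic"}], "version": 1}
EOF

# 2. Have the tool propose an assignment onto the new brokers (ids 3,4,5)
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --topics-to-move-json-file topics-to-move.json \
  --broker-list "3,4,5" --generate

# 3. Save the proposed assignment JSON it prints to a file, then execute it
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file expand-cluster-reassignment.json --execute
```

The generate step only prints a candidate plan; nothing moves until --execute is run against the saved JSON.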
partition reassignment stuck on 0.8
I was running a 2-node kafka cluster off github trunk at: eedbea6526986783257ad0e025c451a8ee3d9095 ...for a few weeks with no issues. I recently downloaded the 0.8 stable version and configured/started two new brokers with 0.8. I successfully reassigned all but 1 partition from the older pair to the newer pair, but have 1 partition seemingly stuck on the old leader. The replicas, ISR, and leader are all the same--no extra nodes are replicating this last partition--and this was true before any changes. I came across this thread: http://mail-archives.apache.org/mod_mbox/kafka-users/201312.mbox/%3ccacnty1ddbjse1bxrj1ertrxi+zbz3wawyvjdevvjpootnyo...@mail.gmail.com%3E ..and unlike the poster, I'm free to play fast and loose, so I built off of trunk at: dd58d753ce3ffb41776a6fa6322cb822f500 I first upgraded one of the desired target ISRs and, after a few minutes, upgraded the existing leader and bounced it, temporarily breaking that partition--no luck. I'm at a loss as to how to recover this partition's data, or, short of the data being recovered, how to even regain use of the partition. The data's not critical; this was just an exercise in gaining operational familiarity w/ kafka. I can't find any docs on how to get out of this situation.
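For anyone debugging something similar, the stuck partition's view of the world can be inspected directly in zookeeper. A sketch using the 0.8 znode layout (topic name, partition number, and zk address are placeholders):

```shell
# Leader, ISR, and controller epoch for one partition
bin/zkCli.sh -server localhost:2181 \
  get /brokers/topics/mytopic/partitions/0/state

# Any reassignment the controller still considers in-flight
bin/zkCli.sh -server localhost:2181 \
  get /admin/reassign_partitions
```

A reassignment that never clears /admin/reassign_partitions is a strong hint the controller is wedged on that partition.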
Re: partition reassignment stuck on 0.8
On Thu, Dec 12, 2013 at 9:28 PM, Jun Rao jun...@gmail.com wrote: Could you try starting from scratch again? The recent fix that we had may not be able to recover a cluster already in an inconsistent state. Thanks, Jun How does one start from scratch? Wipe ZK, or is there some state file? I have other topics that aren't problematic. Is there no topic-level emergency button to push? On Thu, Dec 12, 2013 at 8:45 PM, David Birdsong david.birds...@gmail.com wrote: I was running a 2-node kafka cluster off github trunk at: eedbea6526986783257ad0e025c451a8ee3d9095 ...for a few weeks with no issues. I recently downloaded the 0.8 stable version and configured/started two new brokers with 0.8. I successfully reassigned all but 1 partition from the older pair to the newer pair, but have 1 partition seemingly stuck on the old leader. The replicas, ISR, and leader are all the same--no extra nodes are replicating this last partition--and this was true before any changes. I came across this thread: http://mail-archives.apache.org/mod_mbox/kafka-users/201312.mbox/%3ccacnty1ddbjse1bxrj1ertrxi+zbz3wawyvjdevvjpootnyo...@mail.gmail.com%3E ..and unlike the poster, I'm free to play fast and loose, so I built off of trunk at: dd58d753ce3ffb41776a6fa6322cb822f500 I first upgraded one of the desired target ISRs and, after a few minutes, upgraded the existing leader and bounced it, temporarily breaking that partition--no luck. I'm at a loss as to how to recover this partition's data, or, short of the data being recovered, how to even regain use of the partition. The data's not critical; this was just an exercise in gaining operational familiarity w/ kafka. I can't find any docs on how to get out of this situation.
Re: partition reassignment stuck on 0.8
On Thu, Dec 12, 2013 at 9:28 PM, Guozhang Wang wangg...@gmail.com wrote: David, Could you try to see if this is due to https://issues.apache.org/jira/browse/KAFKA-1178? Guozhang Which node do I look for this on? Leader? ISR-candidate? Controller? On Thu, Dec 12, 2013 at 8:45 PM, David Birdsong david.birds...@gmail.com wrote: I was running a 2-node kafka cluster off github trunk at: eedbea6526986783257ad0e025c451a8ee3d9095 ...for a few weeks with no issues. I recently downloaded the 0.8 stable version and configured/started two new brokers with 0.8. I successfully reassigned all but 1 partition from the older pair to the newer pair, but have 1 partition seemingly stuck on the old leader. The replicas, ISR, and leader are all the same--no extra nodes are replicating this last partition--and this was true before any changes. I came across this thread: http://mail-archives.apache.org/mod_mbox/kafka-users/201312.mbox/%3ccacnty1ddbjse1bxrj1ertrxi+zbz3wawyvjdevvjpootnyo...@mail.gmail.com%3E ..and unlike the poster, I'm free to play fast and loose, so I built off of trunk at: dd58d753ce3ffb41776a6fa6322cb822f500 I first upgraded one of the desired target ISRs and, after a few minutes, upgraded the existing leader and bounced it, temporarily breaking that partition--no luck. I'm at a loss as to how to recover this partition's data, or, short of the data being recovered, how to even regain use of the partition. The data's not critical; this was just an exercise in gaining operational familiarity w/ kafka. I can't find any docs on how to get out of this situation. -- -- Guozhang
Re: partition reassignment stuck on 0.8
On Thu, Dec 12, 2013 at 9:46 PM, Jun Rao jun...@gmail.com wrote: Since we don't support deleting topics yet, you would have to wipe out all ZK and kafka logs. Thanks, Jun Got it and done. So it sounds like I should run a number of disparate clusters to spread risk across topics, since a partition is an SPOF. On Thu, Dec 12, 2013 at 9:32 PM, David Birdsong david.birds...@gmail.com wrote: On Thu, Dec 12, 2013 at 9:28 PM, Jun Rao jun...@gmail.com wrote: Could you try starting from scratch again? The recent fix that we had may not be able to recover a cluster already in an inconsistent state. Thanks, Jun How does one start from scratch? Wipe ZK, or is there some state file? I have other topics that aren't problematic. Is there no topic-level emergency button to push? On Thu, Dec 12, 2013 at 8:45 PM, David Birdsong david.birds...@gmail.com wrote: I was running a 2-node kafka cluster off github trunk at: eedbea6526986783257ad0e025c451a8ee3d9095 ...for a few weeks with no issues. I recently downloaded the 0.8 stable version and configured/started two new brokers with 0.8. I successfully reassigned all but 1 partition from the older pair to the newer pair, but have 1 partition seemingly stuck on the old leader. The replicas, ISR, and leader are all the same--no extra nodes are replicating this last partition--and this was true before any changes. I came across this thread: http://mail-archives.apache.org/mod_mbox/kafka-users/201312.mbox/%3ccacnty1ddbjse1bxrj1ertrxi+zbz3wawyvjdevvjpootnyo...@mail.gmail.com%3E ..and unlike the poster, I'm free to play fast and loose, so I built off of trunk at: dd58d753ce3ffb41776a6fa6322cb822f500 I first upgraded one of the desired target ISRs and, after a few minutes, upgraded the existing leader and bounced it, temporarily breaking that partition--no luck. I'm at a loss as to how to recover this partition's data, or, short of the data being recovered, how to even regain use of the partition. 
The data's not critical; this was just an exercise in gaining operational familiarity w/ kafka. I can't find any docs on how to get out of this situation.
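Jun's "wipe out all ZK and kafka logs" advice, spelled out as a rough sketch. This is destructive and the paths are assumptions -- match the log.dirs value in server.properties and any zookeeper chroot you configured, and stop every broker first:

```shell
# Stop all brokers before touching state
bin/kafka-server-stop.sh

# Remove each broker's on-disk log directory (value of log.dirs)
rm -rf /tmp/kafka-logs

# Remove kafka's znodes (zkCli accepts a single command after -server;
# 'rmr' is the recursive delete in older zookeeper CLIs)
bin/zkCli.sh -server localhost:2181 rmr /brokers
bin/zkCli.sh -server localhost:2181 rmr /admin
bin/zkCli.sh -server localhost:2181 rmr /controller
bin/zkCli.sh -server localhost:2181 rmr /consumers
```

After that, restarting the brokers registers a fresh cluster in zookeeper with no topics.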