Re: Kafka with Zookeeper behind AWS ELB
Luigi, I strongly urge you to consider a 5 node ZK deployment. I've always done that in the past for resiliency during maintenance. In a 3 node cluster, you can only tolerate one "failure", so if you bring one node down for maintenance and another node crashes during said maintenance, your ZK cluster is down. All the deployments I've had were 5 nodes of ZK and 5 nodes of Kafka. - Pradeep

On Thu, Jul 20, 2017 at 9:12 AM, Luigi Tagliamonte < luigi.tagliamont...@gmail.com> wrote:
> Yes Andrey, you can use an ENI without an EIP on AWS if you only want a private address.
>
> After some consideration, I think that growing the zookeeper cluster beyond 3 nodes is really unlikely, so I will attach 3 ENIs to 3 servers in autoscaling and configure Kafka to use these 3 IPs. This way I can get rid of the additional ELB/HAProxy layer; if I ever need to grow the zk ensemble, I will re-engineer the solution.
>
> I'm wondering if reusing an old IP on a brand new zk node will create issues in the ensemble. Is anybody here aware of possible drawbacks?
>
> On Wed, Jul 19, 2017 at 11:58 PM, Andrey Dyachkov < andrey.dyach...@gmail.com> wrote:
> > The problem with an EIP is that it is a public IP. Another option is to have a secondary interface attached to the instance on start (or a bit later) with a private static IP, but we are still investigating that possibility.
> >
> > On Wed 19. Jul 2017 at 23:38, Luigi Tagliamonte < luigi.tagliamont...@gmail.com> wrote:
> > > Hello Andrey,
> > > I see that the ELB is not going to help directly with the bug, but it introduces a nice layer that makes zookeeper DNS management easier. With an ELB I don't have to deal with keeping DNS in sync for all the servers in the zk ensemble. For the moment I can use an HAProxy with an EIP, and when the bug is solved I can move to an ELB. What do you think about it?
> > > Regards
> > > L.
> > > On Wed, Jul 19, 2017 at 2:16 PM, Andrey Dyachkov < andrey.dyach...@gmail.com> wrote:
> > > > Hi,
> > > > I have just posted almost the same question in the dev list. The Zookeeper client resolves addresses only once, on start, so introducing an ELB won't really help here (ELBs can be replaced, which involves an IP change), but I am eager to know if there is a solution for that.
> > > >
> > > > On Wed, 19 Jul 2017 at 23:08 Luigi Tagliamonte < luigi.tagliamont...@gmail.com> wrote:
> > > > > Hello, Users!
> > > > > I'm designing a Kafka deployment on AWS. It's my first time working with Kafka and Zookeeper, so I've collected a lot of info so far, but also some questions that I would love to submit to a much more expert audience like you.
> > > > >
> > > > > I have been experimenting with exhibitor and zookeeper in an autoscaling group, and the exhibitor orchestration seems to work so far.
> > > > >
> > > > > I was trying to find a way to configure the zookeeper servers in the Kafka conf without having to reconfigure them in case a zookeeper node needs to be replaced/dies, so I thought of course of using DNS, but then I read that the zkclient library used by Kafka has this bug: https://issues.apache.org/jira/browse/ZOOKEEPER-2184.
> > > > >
> > > > > So I'm now thinking about using an ELB in front of the zookeeper cluster. Theoretically, given how the zookeeper client should work, there should be no problem, but I'm wondering if any of you have used that and what the outcome was.
> > > >
> > > > --
> > > > With great enthusiasm,
> > > > Andrey
> >
> > --
> > With great enthusiasm,
> > Andrey
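For reference, pinning Kafka to three static ENI addresses as Luigi describes comes down to listing them in `zookeeper.connect`; the IPs below are placeholders, not values from the thread:

```properties
# server.properties -- reach the ZK ensemble via three static ENI private IPs
# (placeholder addresses; substitute the actual private IPs of the ENIs)
zookeeper.connect=10.0.1.10:2181,10.0.2.10:2181,10.0.3.10:2181
zookeeper.connection.timeout.ms=6000
```

Since the zkclient library only resolves these entries at startup (the ZOOKEEPER-2184 issue above), static IPs avoid the stale-DNS problem that an ELB or round-robin DNS would otherwise have to paper over.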
Re: Scaling up kafka consumers
A single partition can be consumed by at most a single consumer within a consumer group. Consumers compete to take ownership of a partition, so in order to gain parallelism you need to add more partitions. There is a library that allows multiple consumers to consume from a single partition, https://github.com/gerritjvv/kafka-fast, but I've never used it.

On Fri, Feb 24, 2017 at 7:30 AM, Jakub Stransky wrote:
> Hello everyone,
>
> I was reading/checking the kafka documentation regarding the point-to-point and publish-subscribe communication patterns in kafka, and I am wondering how to scale up the consumer side in the point-to-point scenario when consuming from a single kafka topic.
>
> Let's say I have a single topic with a single partition, and I have one node where the kafka consumer is running. If I want to scale up my service, I add another node which has the same configuration as the first one (topic, partition and consumer group id). Those two nodes start competing for messages from the kafka topic.
>
> What I am not sure about in this scenario, and is actually the subject of my question, is "*whether each node gets unique messages, or there is still a possibility that some messages will be consumed by both nodes*". I can see a scenario where both nodes are started at the same time, get the same topic offset from zookeeper, and start consuming messages from that offset. Or am I thinking in a wrong direction?
>
> Thanks
> Jakub
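The "at most one consumer per partition" rule above can be illustrated with a toy version of a range-style assignor. This is a sketch for intuition, not Kafka's actual assignment code:

```python
def assign(partitions, consumers):
    """Toy range assignor: each partition is owned by exactly one consumer.

    With fewer partitions than consumers, the surplus consumers sit idle --
    which is why adding nodes beyond the partition count buys no parallelism.
    """
    consumers = sorted(consumers)
    n, k = len(partitions), len(consumers)
    per, extra = divmod(n, k)
    assignment, i = {}, 0
    for idx, c in enumerate(consumers):
        count = per + (1 if idx < extra else 0)
        assignment[c] = partitions[i:i + count]
        i += count
    return assignment

# Jakub's scenario: one partition, two nodes in the same group.
print(assign([0], ["node-a", "node-b"]))  # {'node-a': [0], 'node-b': []}
```

So in the single-partition case the second node never receives messages; each message is delivered to exactly one member of the group, not to both.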
Re: How does one deploy to consumers without causing re-balancing for real time use case?
I believe if you're calling the .close() method on shutdown, then the LeaveGroupRequest will be made. If you're doing a kill -9, I'm not sure that request will be made.

On Fri, Feb 10, 2017 at 8:47 AM, Praveen <praveev...@gmail.com> wrote:
> @Pradeep - I just read your thread; the 1hr pause was when all the consumers were shut down simultaneously. I'm testing out a rolling restart to get the actual numbers. The initial numbers are promising.
>
> `STOP (1) (1min later kicks off) -> REBALANCE -> START (1) -> REBALANCE (takes 1min to get a partition)`
>
> In your thread, Ewen says -
>
> "The LeaveGroupRequest is only sent on a graceful shutdown. If a consumer knows it is going to shutdown, it is good to proactively make sure the group knows it needs to rebalance work because some of the partitions that were handled by the consumer need to be handled by some other group members."
>
> So does this mean that the consumer should inform the group ahead of time before it goes down? Currently, I just shut down the process.
>
> On Fri, Feb 10, 2017 at 8:35 AM, Pradeep Gollakota <pradeep...@gmail.com> wrote:
> > I asked a similar question a while ago. There doesn't appear to be a way to avoid triggering the rebalance. But I'm not sure why it would be taking > 1hr in your case. For us it was pretty fast.
> >
> > https://www.mail-archive.com/users@kafka.apache.org/msg23925.html
> >
> > On Fri, Feb 10, 2017 at 4:28 AM, Krzysztof Lesniewski, Nexiot AG < krzysztof.lesniew...@nexiot.ch> wrote:
> > > Would be great to get some input on it.
> > >
> > > - Krzysztof Lesniewski
> > >
> > > On 06.02.2017 08:27, Praveen wrote:
> > >> I have a 16 broker kafka cluster. There is a topic with 32 partitions containing real time data, and on the other side I have 32 boxes w/ 1 consumer each reading from these partitions.
> > >> Today our deployment strategy is to stop, deploy and start the processes on all 32 consumers. This triggers re-balancing and takes a long period of time (> 1hr). Such a long pause isn't good for real time processing.
> > >>
> > >> I was thinking of a rolling deploy, but I think that will still cause re-balancing b/c we will still have consumers go down and come up.
> > >>
> > >> How do you deploy to consumers without triggering re-balancing (or triggering one that doesn't affect your SLA) when doing real time processing?
> > >>
> > >> Thanks,
> > >> Praveen
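To make sure the LeaveGroupRequest is sent, the consumer process has to intercept the termination signal and call close() instead of dying abruptly (kill -9 can never be caught). A minimal sketch of that shape -- the `Consumer` class here is a stand-in, since with a real Kafka client the handler body would simply be `consumer.close()`:

```python
import signal

class Consumer:
    """Stand-in for a real Kafka consumer client (hypothetical)."""
    def __init__(self):
        self.closed = False

    def close(self):
        # A real client sends a LeaveGroupRequest here, so the coordinator
        # can rebalance immediately instead of waiting for session timeout.
        self.closed = True

consumer = Consumer()

def shutdown(signum, frame):
    # Graceful exit path: leave the group proactively.
    consumer.close()

# `kill -TERM <pid>` now triggers a graceful leave; `kill -9` bypasses this.
signal.signal(signal.SIGTERM, shutdown)
```

Deploy tooling that sends SIGTERM and waits for the process to exit (rather than SIGKILL) therefore gets the fast-rebalance behavior Ewen describes.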
Re: How does one deploy to consumers without causing re-balancing for real time use case?
I asked a similar question a while ago. There doesn't appear to be a way to avoid triggering the rebalance. But I'm not sure why it would be taking > 1hr in your case. For us it was pretty fast.

https://www.mail-archive.com/users@kafka.apache.org/msg23925.html

On Fri, Feb 10, 2017 at 4:28 AM, Krzysztof Lesniewski, Nexiot AG < krzysztof.lesniew...@nexiot.ch> wrote:
> Would be great to get some input on it.
>
> - Krzysztof Lesniewski
>
> On 06.02.2017 08:27, Praveen wrote:
>> I have a 16 broker kafka cluster. There is a topic with 32 partitions containing real time data, and on the other side I have 32 boxes w/ 1 consumer each reading from these partitions.
>>
>> Today our deployment strategy is to stop, deploy and start the processes on all 32 consumers. This triggers re-balancing and takes a long period of time (> 1hr). Such a long pause isn't good for real time processing.
>>
>> I was thinking of a rolling deploy, but I think that will still cause re-balancing b/c we will still have consumers go down and come up.
>>
>> How do you deploy to consumers without triggering re-balancing (or triggering one that doesn't affect your SLA) when doing real time processing?
>>
>> Thanks,
>> Praveen
Re: Consumer Rebalancing Question
What I mean by "flapping" in this context is unnecessary rebalancing. The example I would give is what a Hadoop Datanode does on shutdown: by default, it will wait 10 minutes before replicating the blocks owned by the Datanode, so routine maintenance doesn't cause unnecessary shuffling of blocks.

In this context, if I'm performing a rolling restart, as soon as worker 1 shuts down, its work is picked up by other workers. But worker 1 comes back 3 seconds (or whatever) later and requests the work back. Then worker 2 goes down and its work is assigned to other workers for 3 seconds before yet another rebalance. So, in theory, the order of operations will look something like this:

STOP (1) -> REBALANCE -> START (1) -> REBALANCE -> STOP (2) -> REBALANCE -> START (2) -> REBALANCE -> ...

From what I understand, there's currently no way to prevent this type of shuffling of partitions from worker to worker while the consumers are under maintenance. I'm also not sure whether this is an issue I even need to worry about.

- Pradeep

On Thu, Jan 5, 2017 at 8:29 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:
> Not sure I understand your question about flapping. The LeaveGroupRequest is only sent on a graceful shutdown. If a consumer knows it is going to shutdown, it is good to proactively make sure the group knows it needs to rebalance work because some of the partitions that were handled by the consumer need to be handled by some other group members.
>
> There's no "flapping" in the sense that the leave group requests should just inform the other members that they need to take over some of the work. I would normally think of "flapping" as meaning that things start/stop unnecessarily. In this case, *someone* needs to deal with the rebalance and pick up the work being dropped by the worker.
> There's no flapping because it's a one-time event -- one worker is shutting down, decides to drop the work, and a rebalance sorts it out and reassigns it to another member of the group. This happens once and then the "issue" is resolved without any additional interruptions.
>
> -Ewen
>
> On Thu, Jan 5, 2017 at 3:01 PM, Pradeep Gollakota <pradeep...@gmail.com> wrote:
> > I see... doesn't that cause flapping though?
> >
> > On Wed, Jan 4, 2017 at 8:22 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:
> > > The coordinator will immediately move the group into a rebalance if it needs it. The reason LeaveGroupRequest was added was to avoid having to wait for the session timeout before completing a rebalance. So aside from the latency of cleanup/committing offsets/rejoining after a heartbeat, rolling bounces should be fast for consumer groups.
> > >
> > > -Ewen
> > >
> > > On Wed, Jan 4, 2017 at 5:19 PM, Pradeep Gollakota <pradeep...@gmail.com> wrote:
> > > > Hi Kafka folks!
> > > >
> > > > When a consumer is closed, it will issue a LeaveGroupRequest. Does anyone know how long the coordinator waits before reassigning the partitions that were assigned to the leaving consumer to a new consumer? I ask because I'm trying to understand the behavior of consumers if you're doing a rolling restart.
> > > >
> > > > Thanks!
> > > > Pradeep
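The rolling-restart cost Pradeep describes can be counted out explicitly: with a LeaveGroupRequest on every stop and a JoinGroup on every start, rolling N workers triggers 2N rebalances. A back-of-the-envelope sketch (a simplification, not client behavior verbatim -- real coordinators may coalesce overlapping joins):

```python
def rolling_restart_rebalances(workers):
    """Count rebalances for a one-at-a-time rolling restart,
    assuming each STOP and each START triggers exactly one rebalance."""
    events = []
    for w in workers:
        events.append(("STOP", w))
        events.append(("REBALANCE",))
        events.append(("START", w))
        events.append(("REBALANCE",))
    return sum(1 for e in events if e[0] == "REBALANCE")

print(rolling_restart_rebalances(["w1", "w2"]))  # 4
```

This is the "shuffling" in the thread above: the partition ownership churns 2N times even though the group membership ends up exactly where it started.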
Consumer Rebalancing Question
Hi Kafka folks!

When a consumer is closed, it will issue a LeaveGroupRequest. Does anyone know how long the coordinator waits before reassigning the partitions that were assigned to the leaving consumer to a new consumer? I ask because I'm trying to understand the behavior of consumers if you're doing a rolling restart.

Thanks!
Pradeep
Re: kafka + autoscaling groups fuckery
Just out of curiosity, if you guys are in AWS for everything, why not use Kinesis?

On Tue, Jun 28, 2016 at 3:49 PM, Charity Majors wrote:
> Hi there,
>
> I just finished implementing kafka + autoscaling groups in a way that made sense to me. I have a _lot_ of experience with ASGs and various storage types, but I'm a kafka noob (about 4-5 months of using it in development, staging and pre-launch production).
>
> It seems to be working fine from the Kafka POV but causing troubling side effects elsewhere that I don't understand. I don't know enough about Kafka to know if my implementation is just fundamentally flawed for some reason, or if so how and why.
>
> My process is basically this:
>
> - *Terminate a node*, or increment the size of the ASG by one. (I'm not doing any graceful shutdowns because I don't want to rely on graceful shutdowns, and I'm not attempting to act upon more than one node at a time. Planning on doing a ZK lock or something later to enforce one process at a time, if I can work the major kinks out.)
>
> - *Firstboot script*, which runs on all hosts from rc.init. (We run ASGs for *everything*.) It infers things like the chef role, environment, cluster name, etc, registers DNS, bootstraps and runs chef-client, etc. For storage nodes, it formats and mounts a PIOPS volume under the right mount point, or just remounts the volume if it already contains data. Etc.
>
> - *Run a balancing script from firstboot* on kafka nodes. It checks to see how many brokers there are and what their ids are, and checks for any underbalanced partitions with less than 3 ISRs. Then we generate a new assignment file for rebalancing partitions, and execute it. We watch on the host for all the partitions to finish rebalancing, then complete.
>
> *- So far so good*. I have repeatedly killed kafka nodes and had them come up, rebalance the cluster, and everything on the kafka side looks healthy.
> All the partitions have the correct number of ISRs, etc.
>
> But after doing this, we have repeatedly gotten into a state where consumers that are pulling off the kafka partitions enter a weird state where their last known offset is *ahead* of the last known offset for that partition, and we can't recover from it.
>
> *An example.* Last night I terminated ... I think it was broker 1002 or 1005, and it came back up as broker 1009. It rebalanced on boot, and everything looked good from the kafka side. This morning we noticed that the storage node that maps to partition 5 has been broken for something like 22 hours; it thinks the next offset is too far ahead / out of bounds, so it stopped consuming. This happened shortly after broker 1009 came online and the consumer caught up.
>
> From the storage node log:
>
> time="2016-06-28T21:51:48.286035635Z" level=info msg="Serving at 0.0.0.0:8089..."
> time="2016-06-28T21:51:48.293946529Z" level=error msg="Error creating consumer" error="kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition."
> time="2016-06-28T21:51:48.294532365Z" level=error msg="Failed to start services: kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition."
> time="2016-06-28T21:51:48.29461156Z" level=info msg="Shutting down..."
>
> From the mysql mapping of partitions to storage nodes/statuses:
>
> PRODUCTION ubuntu@retriever-112c6d8d:/srv/hound/retriever/log$ hound-kennel
>
> Listing by default.
> Use -action <setstate|addslot|removeslot|removenode> for other actions
>
> Part  Status  Last Updated                Hostname
> 0     live    2016-06-28 22:29:10 + UTC   retriever-772045ec
> 1     live    2016-06-28 22:29:29 + UTC   retriever-75e0e4f2
> 2     live    2016-06-28 22:29:25 + UTC   retriever-78804480
> 3     live    2016-06-28 22:30:01 + UTC   retriever-c0da5f85
> 4     live    2016-06-28 22:29:42 + UTC   retriever-122c6d8e
> 5             2016-06-28 21:53:48 + UTC
>
> PRODUCTION ubuntu@retriever-112c6d8d:/srv/hound/retriever/log$ hound-kennel -partition 5 -action nextoffset
>
> Next offset for partition 5: 12040353
>
> Interestingly, the primary for partition 5 is 1004, and its follower is the new node 1009. (Partition 2 has 1009 as its leader and 1004 as its follower, and seems just fine.)
>
> I've attached all the kafka logs for the broker 1009 node since it launched yesterday.
>
> I guess my main question is: *Is there something I am fundamentally missing about the kafka model that makes it not play well with autoscaling?* I see a couple of other people on the internet talking about using ASGs with kafka, but always in the context of maintaining a list of broker ids and reusing them.
>
> *I don't want to do that. I want the path
Re: Datacenter to datacenter over the open internet
At Lithium, we have multiple datacenters and we distcp our data across our Hadoop clusters. We have 2 DCs in NA and 1 in EU, with a non-redundant direct connect from our EU cluster to one of our NA DCs. If and when this fails, we have automatic failover to a VPN that goes over the internet. The amount of data that's moving across the clusters is not much, so we can get away with this. We don't have Kafka replication set up yet, but we will be setting it up using Mirror Maker, and the same constraints apply.

Of course, opening up your Kafka cluster to be reachable from the internet would work too, but IMHO a VPN is more secure and reduces the surface area of your infrastructure that could come under attack. It sucks that you can't get your executives on board for a p2p direct connect, as that is the best solution.

On Tue, Oct 6, 2015 at 5:48 PM, Gwen Shapira wrote:
> You can configure "advertised.host.name" for each broker, which is the name external consumers and producers will use to refer to the brokers.
>
> On Tue, Oct 6, 2015 at 3:31 PM, Tom Brown wrote:
> > Hello,
> >
> > How do you consume a kafka topic from a remote location without a dedicated connection? How do you protect the server?
> >
> > The setup: data streams into our datacenter. We process it, and publish it to a kafka cluster. The consumer is located in a different datacenter with no direct connection. The most efficient scenario would be to set up a point-to-point link, but that idea has no traction with our executives. We can set up a VPN; while functional, our IT department assures us that it won't be able to scale.
> >
> > What we're currently planning is to expose the kafka cluster IP addresses to the internet, and only allow access via firewall. Each message will be encrypted with a shared private key, so we're not worried about messages being intercepted.
> > What we are worried about is this: how brokers refer to each other -- when a broker directs the consumer to the server that is in charge of a particular region, does it use the host name (which could be externally mapped to the public IP), or does it use the detected/private IP address?
> >
> > What solution would you use to consume a remote cluster?
> >
> > --Tom
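To Gwen's point: the broker hands clients whatever name it advertises, so that name must be externally resolvable. A server.properties fragment for the 0.8.x-era brokers in this thread (the hostname is a placeholder; on newer brokers the equivalent setting is advertised.listeners):

```properties
# The name brokers hand to clients in metadata responses. Remote consumers
# will connect to this, so it must resolve to the public IP from outside
# and be reachable (or split-horizon resolved) from inside the DC as well.
advertised.host.name=kafka1.example.com
advertised.port=9092
```

If the brokers instead advertise their detected private IPs, remote clients will get the bootstrap connection through the firewall but fail as soon as they are redirected to a partition leader.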
Re: Dealing with large messages
Thanks for the replies! I was rather hoping not to have to implement a side channel solution. :/ If we have to do this, we may use an HBase table with a TTL the same as our topic so the large objects are "gc'ed"... thoughts?

On Tue, Oct 6, 2015 at 8:45 AM, Gwen Shapira <g...@confluent.io> wrote:
> Storing large blobs in S3 or HDFS and placing URIs in Kafka is the most common solution I've seen in use.
>
> On Tue, Oct 6, 2015 at 8:32 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
> > The best practice I think is to just put large objects in a blob store and have messages embed references to those blobs. Interestingly, we ended up having to implement large-message support at LinkedIn, but for various reasons were forced to put messages inline (i.e., against the above recommendation). So we ended up having to break up large messages into smaller chunks. This obviously adds considerable complexity to the consumer since the checkpointing can become pretty complicated. There are other nuances as well - we can probably do a short talk on this at an upcoming meetup.
> >
> > Joel
> >
> > On Mon, Oct 5, 2015 at 9:31 PM, Rahul Jain <rahul...@gmail.com> wrote:
> > > In addition to the config changes mentioned in that post, you may also have to change producer config if you are using the new producer.
> > >
> > > Specifically, *max.request.size* and *request.timeout.ms* have to be increased to allow the producer to send large messages.
> > >
> > > On 6 Oct 2015 02:02, "James Cheng" <jch...@tivo.com> wrote:
> > >> Here's an article that Gwen wrote earlier this year on handling large messages in Kafka.
> > >> http://ingest.tips/2015/01/21/handling-large-messages-kafka/
> > >>
> > >> -James
> > >>
> > >> > On Oct 5, 2015, at 11:20 AM, Pradeep Gollakota <pradeep...@gmail.com> wrote:
> > >> >
> > >> > Fellow Kafkaers,
> > >> >
> > >> > We have a pretty heavyweight legacy event logging system for batch processing. We're now sending the events into Kafka for realtime analytics. But we have some pretty large messages (> 40 MB).
> > >> >
> > >> > I'm wondering if any of you have use cases where you have to send large messages to Kafka and how you're dealing with them.
> > >> >
> > >> > Thanks,
> > >> > Pradeep
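The "reference in Kafka, blob in a store" pattern Gwen and Joel recommend is often called claim-check. A minimal sketch of the shape -- the blob store and the "topic" here are in-memory stand-ins, not a real S3/HBase/Kafka client:

```python
import hashlib
import json

blob_store = {}  # stand-in for S3 / HDFS / an HBase table with a TTL
topic = []       # stand-in for the Kafka topic

def send_large(payload: bytes, meta: dict) -> None:
    """Write the large body to the blob store; publish only a small reference."""
    key = hashlib.sha256(payload).hexdigest()
    blob_store[key] = payload                        # 40 MB body -> store
    topic.append(json.dumps({"blob": key, **meta}))  # tiny message -> Kafka

def receive(msg: str) -> bytes:
    """Consumer side: dereference the claim to fetch the body."""
    ref = json.loads(msg)
    return blob_store[ref["blob"]]

send_large(b"x" * 1000, {"type": "event"})
print(receive(topic[0]) == b"x" * 1000)  # True
```

Pradeep's HBase-with-TTL idea fits this directly: as long as the blob store's TTL is at least the topic's retention, no reference in Kafka can outlive its blob.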
Dealing with large messages
Fellow Kafkaers,

We have a pretty heavyweight legacy event logging system for batch processing. We're now sending the events into Kafka for realtime analytics. But we have some pretty large messages (> 40 MB).

I'm wondering if any of you have use cases where you have to send large messages to Kafka and how you're dealing with them.

Thanks,
Pradeep
Re: number of topics given many consumers and groups within the data
To add a little more context to Shaun's question, we have around 400 customers. Each customer has a stream of events. Some customers generate a lot of data while others don't. We need to ensure that each customer's data is sorted globally by timestamp.

We have two use cases around consumption:
1. A user may consume an individual customer's data
2. A user may consume data for all customers

Given these two use cases, I think the better strategy is to have a separate topic per customer, as Todd suggested.

On Wed, Sep 30, 2015 at 9:26 AM, Todd Palino wrote:
> So I disagree with the idea to use custom partitioning, depending on your requirements. Having a consumer consume from a single partition is not (currently) that easy. If you don't care which consumer gets which partition (group), then it's not that bad. You have 20 partitions, you have 20 consumers, and you use custom partitioning as noted. The consumers use the high level consumer with a single group, each one will get one partition, and it's pretty straightforward. If a consumer crashes, you will end up with two partitions on one of the remaining consumers. If this is OK, this is a decent solution.
>
> If, however, you require that each consumer always have the same group of data, and you need to know what that group is beforehand, it's more difficult. You need to use the simple consumer to do it, which means you need to implement a lot of logic for error and status code handling yourself, and do it right. In this case, I think your idea of using 400 separate topics is sound. This way you can still use the high level consumer, which takes care of the error handling for you, and your data is separated out by topic.
>
> Provided it is not an issue to implement it in your producer, I would go with the separate topics.
> Alternately, if you're not sure you always want separate topics, you could go with something similar to your second idea, but have a consumer read the single topic and split the data out into 400 separate topics in Kafka (no need for Cassandra or Redis or anything else). Then your real consumers can all consume their separate topics. Reading and writing the data one extra time is much better than rereading all of it 400 times and throwing most of it away.
>
> -Todd
>
> On Wed, Sep 30, 2015 at 9:06 AM, Ben Stopford wrote:
> > Hi Shaun
> >
> > You might consider using a custom partition assignment strategy to push your different "groups" to different partitions. This would allow you to walk the middle ground between "all consumers consume everything" and "one topic per consumer" as you vary the number of partitions in the topic, albeit at the cost of a little extra complexity.
> >
> > Also, not sure if you've seen it, but there is quite a good section in the FAQ here <https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowmanytopicscanIhave?> on topic and partition sizing.
> >
> > B
> >
> > > On 29 Sep 2015, at 18:48, Shaun Senecal wrote:
> > >
> > > Hi
> > >
> > > I have read Jay Kreps' post regarding the number of topics that can be handled by a broker (https://www.quora.com/How-many-topics-can-be-created-in-Apache-Kafka), and it has left me with more questions that I don't see answered anywhere else.
> > >
> > > We have a data stream which will be consumed by many consumers (~400). We also have many "groups" within our data. A group in the data corresponds 1:1 with what the consumers would consume, so consumer A only ever sees group A messages, consumer B only consumes group B messages, etc.
> > >
> > > The downstream consumers will be consuming via a websocket API, so the API server will be the thing consuming from kafka.
> > > If I use a single topic with, say, 20 partitions, the consumers in the API server would need to re-read the same messages over and over for each consumer, which seems like a waste of network and a potential bottleneck.
> > >
> > > Alternatively, I could use a single topic with 20 partitions and have a single consumer in the API put the messages into cassandra/redis (as suggested by Jay), and serve out the downstream consumer streams that way. However, that requires using secondary sorted storage, which seems like a waste (and added complexity) given that Kafka already has the data exactly as I need it. Especially if cassandra/redis are required to maintain a long TTL on the stream.
> > >
> > > Finally, I could use 1 topic per group, each with a single partition. This would result in 400 topics on the broker, but would allow the API server to simply serve the stream for each consumer directly from kafka and wouldn't require additional machinery to serve out the requests.
> > >
> > > The 400 topic solution makes the
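With the topic-per-customer layout the thread converges on, the producer side reduces to a topic-naming convention: a single-partition topic per customer keeps that customer's stream globally ordered, and the API server can subscribe narrowly or broadly. A sketch (the topic prefix is made up, not from the thread):

```python
TOPIC_PREFIX = "events.customer"  # hypothetical naming convention

def topic_for(customer_id: str) -> str:
    """Route each customer's events to their own single-partition topic.

    One partition per topic preserves global timestamp order within a
    customer, which multi-partition topics cannot guarantee.
    """
    return f"{TOPIC_PREFIX}.{customer_id}"

# Use case 1 (one customer): subscribe to exactly topic_for("acme").
# Use case 2 (all customers): subscribe to the pattern "events.customer.*",
# which most high-level consumers support via regex subscription.
print(topic_for("acme"))  # events.customer.acme
```

The split-out fan-in consumer Todd suggests would read the legacy single topic and produce each record to `topic_for(record.customer_id)`, paying one extra read/write instead of 400 redundant reads.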
Re: integrate Camus and Hive?
If I understood your question correctly, you want to be able to read the output of Camus in Hive and know the partition values. If my understanding is right, you can do so as follows. Hive provides the ability to specify custom patterns for partitions. You can use this in combination with MSCK REPAIR TABLE to automatically detect and load the partitions into the metastore. Take a look at this SO answer: http://stackoverflow.com/questions/24289571/hive-0-13-external-table-dynamic-partitioning-custom-pattern

Does that help?

On Mon, Mar 9, 2015 at 1:42 PM, Yang tedd...@gmail.com wrote:

I believe many users like us would export the output from camus as a hive external table. But the dir structure of camus is like /YYYY/MM/DD/xx while hive generally expects /year=YYYY/month=MM/day=DD/xx if you define that table to be partitioned by (year, month, day). Otherwise you'd have to add the partitions created by camus through a separate command. But in the latter case, would a camus job create the partitions? How would we find out the YYYY/MM/DD values from outside? Well, you could always do something with hadoop dfs -ls and then grep the output, but that's kind of not clean.

Thanks
Yang
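If the custom partition pattern route doesn't work out, the layout mismatch can also be closed with a small rename/copy job that translates Camus-style date paths into Hive's key=value convention. A sketch of the path translation (the directory roots are made up for illustration):

```python
def hive_partition_path(camus_path: str) -> str:
    """Translate a Camus-style .../YYYY/MM/DD suffix into Hive's
    .../year=YYYY/month=MM/day=DD partition convention."""
    *prefix, y, m, d = camus_path.rstrip("/").split("/")
    return "/".join(prefix + [f"year={y}", f"month={m}", f"day={d}"])

print(hive_partition_path("/data/topic/2015/03/09"))
# /data/topic/year=2015/month=03/day=09
```

After renaming (or symlinking) the directories this way, a plain MSCK REPAIR TABLE picks the partitions up with no custom pattern needed; the trade-off is an extra filesystem pass after each Camus run.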
Re: [kafka-clients] Re: [VOTE] 0.8.2.0 Candidate 3
Lithium Technologies would love to host you guys for a release party in SF if you guys want. :)

On Tue, Feb 3, 2015 at 11:04 AM, Gwen Shapira gshap...@cloudera.com wrote:

When's the party? :)

On Mon, Feb 2, 2015 at 8:13 PM, Jay Kreps jay.kr...@gmail.com wrote:

Yay! -Jay

On Mon, Feb 2, 2015 at 2:23 PM, Neha Narkhede n...@confluent.io wrote:

Great! Thanks Jun for helping with the release, and everyone involved for your contributions.

On Mon, Feb 2, 2015 at 1:32 PM, Joe Stein joe.st...@stealth.ly wrote:

Huzzah! Thanks Jun for preparing the release candidates and getting this out to the community. - Joe Stein

On Mon, Feb 2, 2015 at 2:27 PM, Jun Rao j...@confluent.io wrote:

The following are the results of the votes.

+1 binding = 3 votes
+1 non-binding = 1 vote
-1 = 0 votes
0 = 0 votes

The vote passes. I will release artifacts to maven central, update the dist svn and download site. Will send out an announce after that. Thanks everyone that contributed to the work in 0.8.2.0!

Jun

On Wed, Jan 28, 2015 at 9:22 PM, Jun Rao j...@confluent.io wrote:

This is the third candidate for release of Apache Kafka 0.8.2.0.

Release Notes for the 0.8.2.0 release: https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/RELEASE_NOTES.html

*** Please download, test and vote by Saturday, Jan 31, 11:30pm PT

Kafka's KEYS file containing the PGP keys we use to sign the release: http://kafka.apache.org/KEYS, in addition to the md5, sha1 and sha2 (SHA256) checksums.
* Release artifacts to be voted upon (source and binary): https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/

* Maven artifacts to be voted upon prior to release: https://repository.apache.org/content/groups/staging/

* scala-doc: https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/scaladoc/

* java-doc: https://people.apache.org/~junrao/kafka-0.8.2.0-candidate3/javadoc/

* The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0 tag: https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=223ac42a7a2a0dab378cc411f4938a9cea1eb7ea (commit 7130da90a9ee9e6fb4beb2a2a6ab05c06c9bfac4)

Thanks,
Jun

--
You received this message because you are subscribed to the Google Groups kafka-clients group. To unsubscribe from this group and stop receiving emails from it, send an email to kafka-clients+unsubscr...@googlegroups.com. To post to this group, send email to kafka-clie...@googlegroups.com. Visit this group at http://groups.google.com/group/kafka-clients. For more options, visit https://groups.google.com/d/optout.
Re: New Producer - ONLY sync mode?
This is a great question Otis. Like Gwen said, you can accomplish sync mode by setting the batch size to 1. But this does highlight a shortcoming of the new producer API. I really like the design of the new API and it has really great properties and I'm enjoying working with it. However, one API that I think we're lacking is a batch API. Currently, I have to iterate over a batch and call .send() on each record, which returns n callbacks instead of 1 callback for the whole batch. This significantly complicates recovery logic where we need to commit a batch as opposed to 1 record at a time. Do you guys have any plans to add better semantics around batches?

On Mon, Feb 2, 2015 at 1:34 PM, Gwen Shapira gshap...@cloudera.com wrote: If I understood the code and Jay correctly - if you wait for the future it will be a similar delay to that of the old sync producer. Put another way, if you test it out and see longer delays than the sync producer had, we need to find out why and fix it. Gwen

On Mon, Feb 2, 2015 at 1:27 PM, Otis Gospodnetic otis.gospodne...@gmail.com wrote: Hi, Nope, unfortunately it can't do that. X is a remote app, doesn't listen to anything external, calls Y via HTTPS. So X has to decide what to do with its data based on Y's synchronous response. It has to block until Y responds. And it wouldn't be pretty, I think, because nobody wants to run apps that talk to remote servers and hang on to connections more than they have to. But perhaps that is the only way? Or maybe the answer to I'm guessing the delay would be more or less the same as if the Producer was using SYNC mode? is YES, in which case the connection from X to Y would be open for just as long as with a SYNC producer running in Y? Thanks, Otis -- Monitoring * Alerting * Anomaly Detection * Centralized Log Management Solr Elasticsearch Support * http://sematext.com/

On Mon, Feb 2, 2015 at 4:03 PM, Gwen Shapira gshap...@cloudera.com wrote: Can Y have a callback that will handle the notification to X?
In this case, perhaps Y can be async and X can buffer the data until the callback triggers and says all good (or resend if the callback indicates an error)

On Mon, Feb 2, 2015 at 12:56 PM, Otis Gospodnetic otis.gospodne...@gmail.com wrote: Hi, Thanks for the info. Here's the use case. We have something upstream sending data, say a log shipper called X. It sends it to some remote component Y. Y is the Kafka Producer and it puts data into Kafka. But Y needs to send a reply to X and tell it whether it successfully put all its data into Kafka. If it did not, Y wants to tell X to buffer data locally and resend it later. If the producer is ONLY async, Y can't easily do that. Or maybe Y would just need to wait for the Future to come back and only then send the response back to X? If so, I'm guessing the delay would be more or less the same as if the Producer was using SYNC mode? Thanks, Otis -- Monitoring * Alerting * Anomaly Detection * Centralized Log Management Solr Elasticsearch Support * http://sematext.com/

On Mon, Feb 2, 2015 at 3:13 PM, Jay Kreps jay.kr...@gmail.com wrote: Yeah as Gwen says there is no sync/async mode anymore. There is a new configuration which does a lot of what async did in terms of allowing batching:

batch.size - This is the target amount of data per partition the server will attempt to batch together.
linger.ms - This is the time the producer will wait for more data to be sent to better batch up writes. The default is 0 (send immediately). So if you set this to 50 ms the client will send immediately if it has already filled up its batch, otherwise it will wait to accumulate the number of bytes given by batch.size.

To send asynchronously you do producer.send(record) whereas to block on a response you do producer.send(record).get(); which will wait for acknowledgement from the server. One advantage of this model is that the client will do its best to batch under the covers even if linger.ms=0.
It will do this by batching any data that arrives while another send is in progress into a single request--giving a kind of group commit effect. The hope is that this will be both simpler to understand (a single API that always works the same) and more powerful (you always get a response with error and offset information whether or not you choose to use it). -Jay

On Mon, Feb 2, 2015 at 11:15 AM, Gwen Shapira gshap...@cloudera.com wrote: If you want to emulate the old sync producer behavior, you need to set the batch size to 1 (in producer config) and wait on the future you get from send (i.e. future.get). I can't think of good reasons to do so, though. Gwen

On Mon, Feb 2, 2015 at 11:08 AM, Otis Gospodnetic
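Jay's description boils down to one API with two usage patterns. A minimal sketch against the new producer client (it assumes a kafka-clients jar on the classpath, a broker at localhost:9092, and a topic named "test", so it is illustrative rather than something that runs standalone):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SendModes {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("batch.size", "16384"); // target bytes per partition batch
        props.put("linger.ms", "50");     // wait up to 50 ms to fill a batch

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("test", "key", "value");

            // "Async": send() returns immediately; the outcome arrives via the callback.
            producer.send(record, (metadata, exception) -> {
                if (exception != null) exception.printStackTrace();
            });

            // "Sync": same API, but block on the returned future for the broker's ack.
            RecordMetadata md = producer.send(record).get();
            System.out.println(md.topic() + "-" + md.partition() + "@" + md.offset());
        }
    }
}
```

Both calls go through the same batching machinery; only whether you block on the future differs.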
Re: New Producer - ONLY sync mode?
I looked at the newly added batch API to Kinesis for inspiration. The response on the batch put is a list of message-ids and their status (offset if success, else a failure code). Ideally, I think the server should fail the entire batch or succeed the entire batch (i.e. no duplicates), but this is pretty hard to implement. Given that, what Kinesis did is probably a good compromise (perhaps while we wait for exactly-once semantics :))

In addition, perhaps adding a flush() method to the producer to allow for control over when flushes happen might be another good starting point. With the addition of a flush, it's easier to implement a SyncProducer by doing something like flush() -> n x send() -> flush(). This doesn't guarantee that a particular batch isn't broken into two, but with sane batch sizes and sane record sizes, we can assume the guarantee.

On Mon, Feb 2, 2015 at 1:48 PM, Gwen Shapira gshap...@cloudera.com wrote: I've been thinking about that too, since both Flume and Sqoop rely on the send(List) API of the old API. I'd like to see this API come back, but I'm debating how we'd handle errors. IIRC, the old API would fail an entire batch on a single error, which can lead to duplicates. Having N callbacks lets me retry / save / whatever just the messages that had issues. If messages had identifiers from the producer side, we could have the API call the callback with a list of message-ids and their status. But they don't :) Any thoughts on how you'd like it to work? Gwen

On Mon, Feb 2, 2015 at 1:38 PM, Pradeep Gollakota pradeep...@gmail.com wrote: This is a great question Otis. Like Gwen said, you can accomplish sync mode by setting the batch size to 1. But this does highlight a shortcoming of the new producer API. I really like the design of the new API and it has really great properties and I'm enjoying working with it. However, one API that I think we're lacking is a batch API.
Currently, I have to iterate over a batch and call .send() on each record, which returns n callbacks instead of 1 callback for the whole batch. This significantly complicates recovery logic where we need to commit a batch as opposed to 1 record at a time. Do you guys have any plans to add better semantics around batches?

On Mon, Feb 2, 2015 at 1:34 PM, Gwen Shapira gshap...@cloudera.com wrote: If I understood the code and Jay correctly - if you wait for the future it will be a similar delay to that of the old sync producer. Put another way, if you test it out and see longer delays than the sync producer had, we need to find out why and fix it. Gwen

On Mon, Feb 2, 2015 at 1:27 PM, Otis Gospodnetic otis.gospodne...@gmail.com wrote: Hi, Nope, unfortunately it can't do that. X is a remote app, doesn't listen to anything external, calls Y via HTTPS. So X has to decide what to do with its data based on Y's synchronous response. It has to block until Y responds. And it wouldn't be pretty, I think, because nobody wants to run apps that talk to remote servers and hang on to connections more than they have to. But perhaps that is the only way? Or maybe the answer to I'm guessing the delay would be more or less the same as if the Producer was using SYNC mode? is YES, in which case the connection from X to Y would be open for just as long as with a SYNC producer running in Y? Thanks, Otis -- Monitoring * Alerting * Anomaly Detection * Centralized Log Management Solr Elasticsearch Support * http://sematext.com/

On Mon, Feb 2, 2015 at 4:03 PM, Gwen Shapira gshap...@cloudera.com wrote: Can Y have a callback that will handle the notification to X? In this case, perhaps Y can be async and X can buffer the data until the callback triggers and says all good (or resend if the callback indicates an error)

On Mon, Feb 2, 2015 at 12:56 PM, Otis Gospodnetic otis.gospodne...@gmail.com wrote: Hi, Thanks for the info. Here's the use case.
We have something upstream sending data, say a log shipper called X. It sends it to some remote component Y. Y is the Kafka Producer and it puts data into Kafka. But Y needs to send a reply to X and tell it whether it successfully put all its data into Kafka. If it did not, Y wants to tell X to buffer data locally and resend it later. If the producer is ONLY async, Y can't easily do that. Or maybe Y would just need to wait for the Future to come back and only then send the response back to X? If so, I'm guessing the delay would be more or less the same as if the Producer was using SYNC mode? Thanks, Otis -- Monitoring * Alerting * Anomaly Detection * Centralized Log Management Solr Elasticsearch Support * http://sematext.com/

On Mon, Feb 2, 2015 at 3:13 PM, Jay Kreps jay.kr...@gmail.com wrote: Yeah as Gwen says
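The "n callbacks instead of 1" complaint above can be worked around client-side by aggregating the per-record futures into a single batch-level future. A toy sketch with a stubbed sendOne standing in for producer.send() (which would need a real broker); only the aggregation pattern is the point:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class BatchSendDemo {
    // Hypothetical stand-in for producer.send(record): one future per record.
    static CompletableFuture<Long> sendOne(String record, long offset) {
        return CompletableFuture.completedFuture(offset);
    }

    // Collapse n per-record futures into one future for the whole batch.
    // It completes with all offsets, or exceptionally if any send failed.
    static CompletableFuture<List<Long>> sendBatch(List<String> records) {
        List<CompletableFuture<Long>> futures = new ArrayList<>();
        for (int i = 0; i < records.size(); i++) {
            futures.add(sendOne(records.get(i), i));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> {
                    List<Long> offsets = new ArrayList<>();
                    for (CompletableFuture<Long> f : futures) offsets.add(f.join());
                    return offsets;
                });
    }

    public static void main(String[] args) {
        System.out.println(sendBatch(List.of("a", "b", "c")).join()); // [0, 1, 2]
    }
}
```

This gives one commit point per batch, though unlike a server-side batch API it still cannot make the batch atomic.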
Re: Kafka ETL Camus Question
Hi Bhavesh, At Lithium, we don't run Camus in our pipelines yet, though we plan to. But I just wanted to comment regarding speculative execution. We have it disabled at the cluster level and typically don't need it for most of our jobs. Especially with something like Camus, I don't see any need to run parallel copies of the same task.

On Mon, Feb 2, 2015 at 10:36 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Jun, Thanks for the info. I did not get an answer to my question there, so I thought I'd try my luck here :) Thanks, Bhavesh

On Mon, Feb 2, 2015 at 9:46 PM, Jun Rao j...@confluent.io wrote: You can probably ask the Camus mailing list. Thanks, Jun

On Thu, Jan 29, 2015 at 1:59 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com wrote: Hi Kafka Team or LinkedIn Team, I would like to know if you guys run the Camus ETL job with speculative execution true or false. Does it make sense to set this to false? Having it true creates additional load on brokers for each map task (it creates a map task to pull the same partition twice). Is there any advantage to having it on vs off? mapred.map.tasks.speculative.execution Thanks, Bhavesh
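For reference, the property Bhavesh mentions is a standard Hadoop setting and can be disabled per job or cluster-wide, as described above. A sketch using the classic MR1 property name from the thread (newer Hadoop versions renamed it to mapreduce.map.speculative):

```xml
<!-- mapred-site.xml, or the per-job configuration Camus is launched with.
     Disables duplicate (speculative) map attempts, so only one fetch is
     opened per Kafka partition. -->
<property>
  <name>mapred.map.tasks.speculative.execution</name>
  <value>false</value>
</property>
```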
Re: Max. storage for Kafka and impact
@Joe, Achanta is using Indian English numerals, which is why it's a little confusing. http://en.wikipedia.org/wiki/Indian_English#Numbering_system 1,00,000 [1 lakh] (Indian English) == 100,000 [1 hundred thousand] (the rest of the world :P)

On Fri Dec 19 2014 at 9:40:29 AM Achanta Vamsi Subhash achanta.va...@flipkart.com wrote: Joe, - Correction, it's 1,00,000 partitions - We can have at max only 1 consumer/partition. Not 50 per 1 partition. Yes, we have a hashing mechanism to support future partition increase as well. We override the Default Partitioner. - We use both Simple and HighLevel consumers depending on the consumption use-case. - I clearly mentioned that it's 200 TB/week and not a day. - We have separate producers and consumers, each operating as different processes in different machines. I was explaining why we may end up with so many partitions. I think the question about 200 TB/day got deviated. Any suggestions reg. the performance impact of the 200 TB/week?

On Fri, Dec 19, 2014 at 10:53 PM, Joe Stein joe.st...@stealth.ly wrote: Wait, how do you get 2,000 topics each with 50 partitions == 1,000,000 partitions? I think you can take what I said below and change my 250 to 25, as I went with your result (1,000,000) and not your arguments (2,000 x 50). And you should think of the processing as a separate step from the fetch, and commit your offset in batch post-processing. Then you only need more partitions to fetch batches to process in parallel. Regards, Joe Stein

On Fri, Dec 19, 2014 at 12:01 PM, Joe Stein joe.st...@stealth.ly wrote: see some comments inline

On Fri, Dec 19, 2014 at 11:30 AM, Achanta Vamsi Subhash achanta.va...@flipkart.com wrote: We require: - many topics - ordering of messages for every topic

Ordering is only on a per-partition basis, so you might have to pick a partition key that makes sense for what you are doing.

- Consumers hit different Http EndPoints which may be slow (in a push model).
In case of a Pull model, consumers may pull at the rate at which they can process. - We need parallelism to hit with as many consumers. Hence, we currently have around 50 consumers/topic = 50 partitions.

I think you might be mixing up the fetch with the processing. You can have 1 partition and still have 50 messages being processed in parallel (so a batch of messages). What language are you working in? How are you doing this processing exactly?

Currently we have: 2000 topics x 50 = 1,00,000 partitions.

If this is really the case then you are going to need at least 250 brokers (~ 4,000 partitions per broker). If you do that then you are in the 200 TB per day world, which doesn't seem to be the case. I really think you need to strategize on your processing model some more.

The incoming rate of ingestion at max is 100 MB/sec. We are planning for a big cluster with many brokers.

It is possible to handle this on just 3 brokers depending on message size; ability to batch and durability are also factors you really need to be thinking about.

We have exactly the same use cases as mentioned in this video (usage at LinkedIn): https://www.youtube.com/watch?v=19DvtEC0EbQ To handle the zookeeper scenario, as mentioned in the above video, we are planning to use SSDs and would upgrade to the new consumer (0.9+) once it's available as per the below video. https://www.youtube.com/watch?v=7TZiN521FQA

On Fri, Dec 19, 2014 at 9:06 PM, Jayesh Thakrar j_thak...@yahoo.com.invalid wrote: Technically/conceptually it is possible to have 200,000 topics, but do you really need it like that? What do you intend to do with those messages - i.e. how do you foresee them being processed downstream? And are those topics really there to segregate different kinds of processing or different ids? E.g. if you were LinkedIn, Facebook or Google, would you have one topic per user or one topic per kind of event (e.g.
login, pageview, adview, etc.)? Remember there is significant book-keeping done within Zookeeper - and this many topics will make that book-keeping substantial. As for storage, I don't think it should be an issue with sufficient spindles, servers and higher-than-default memory configuration. Jayesh

From: Achanta Vamsi Subhash achanta.va...@flipkart.com To: users@kafka.apache.org Sent: Friday, December 19, 2014 9:00 AM Subject: Re: Max. storage for Kafka and impact

Yes. We need those many max partitions as we have a central messaging service and thousands of topics.

On Friday, December 19, 2014, nitin sharma kumarsharma.ni...@gmail.com wrote: hi, Few things you have to plan for: a. Ensure that from
Re: [DISCUSS] Kafka Security Specific Features
I'm actually not convinced that encryption needs to be handled server side in Kafka. I think the best solution for encryption is to handle it producer/consumer side, just like compression. This will offload key management to the users and we'll still be able to leverage the sendfile optimization for better performance.

On Fri, Jun 6, 2014 at 10:48 AM, Rob Withers robert.w.with...@gmail.com wrote: On consideration, if we have 3 different access groups (1 for production WRITE and 2 consumers) they all need to decode the same encryption and so all need the same public/private key - certs won't work, unless you write a CertAuthority to build multiple certs with the same keys. Better seems to not use certs and wrap the encryption specification with ACL capabilities for each group of access.

On Jun 6, 2014, at 11:43 AM, Rob Withers wrote: This is quite interesting to me and it is an excellent opportunity to promote a slightly different security scheme. Object-capabilities are perfect for online security and would use ACL-style authentication to gain capabilities filtered to those allowed resources for allowed actions (READ/WRITE/DELETE/LIST/SCAN). Erights.org has the quintessential object-capabilities model and capnproto is implementing this for C++. I have a java implementation at http://github.com/pauwau/pauwau but the master is broken. 0.2 works, basically. Basically a TLS connection with no certificate server, it is peer to peer. It has some advanced features, but the linking of capabilities with authorization, so that you can only invoke correct services, is extended to the secure user.

Regarding non-repudiation, on disk, why not prepend a CRC? Regarding on-disk encryption, multiple users/groups may need access, with different capabilities. Sounds like zookeeper needs to store a cert for each class of access so that a group member can access the decrypted data from disk. Use cert-based async decryption. The only issue is storing the private key in zookeeper.
Perhaps some hash magic could be used. Thanks for kafka, Rob

On Jun 5, 2014, at 3:01 PM, Jay Kreps wrote: Hey Joe, I don't really understand the sections you added to the wiki. Can you clarify them? Is non-repudiation what SASL would call integrity checks? If so, don't SSL and many of the SASL schemes already support this as well as on-the-wire encryption? Or are you proposing an on-disk encryption scheme? Is this actually needed? Isn't on-the-wire encryption, when combined with mutual authentication and permissions, sufficient for most uses? On-disk encryption seems unnecessary because if an attacker can get root on the kafka boxes they can potentially modify Kafka to do anything they want with the data. So this seems to break any security model. I understand the problem of a large organization not really having a trusted network and wanting to secure data transfer and limit and audit data access. The uses for these other things I don't totally understand. Also it would be worth understanding the state of other messaging and storage systems (Hadoop, dbs, etc). What features do they support? I think there is a sense in which you don't have to run faster than the bear, but only faster than your friends. :-) -Jay

On Wed, Jun 4, 2014 at 5:57 PM, Joe Stein joe.st...@stealth.ly wrote: I like the idea of working on the spec and prioritizing. I will update the wiki. - Joe Stein

On Wed, Jun 4, 2014 at 1:11 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Joe, Thanks for kicking this discussion off! I totally agree that for something that acts as a central message broker, security is a critical feature. I think a number of people have been interested in this topic and several people have put effort into special-purpose security efforts. Since most of the LinkedIn folks are working on the consumer right now, I think this would be a great project for any other interested people to take on. There are some challenges in doing these things distributed but it can also be a lot of fun.
I think a good first step would be to get a written plan we can all agree on for how things should work. Then we can break things down into chunks that can be done independently while still aiming at a good end state. I had tried to write up some notes that summarized at least the thoughts I had had on security: https://cwiki.apache.org/confluence/display/KAFKA/Security What do you think of that? One assumption I had (which may be incorrect) is that although we want all the things in your list, the two most pressing would be authentication and authorization, and that was all that write up covered. You have more experience in this domain, so I wonder how you would prioritize? Those notes are really sketchy, so I think the first goal I would have would be to get to a real spec we can all agree on and discuss. A lot of the security stuff has a
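The client-side-encryption idea at the top of this thread (treat encryption like compression and keep it out of the broker) can be illustrated without any broker involvement. A minimal sketch using only the JDK's javax.crypto, showing what an encrypting producer/decrypting consumer would do around the wire format; the key handling and cipher mode here are placeholder choices, not a vetted design:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;

public class ClientSideEncryption {
    // Encrypt a record's value before handing it to the producer, and
    // decrypt after the consumer receives it. Key distribution is left
    // to the application, exactly as the proposal above suggests.
    static byte[] transform(int mode, SecretKey key, byte[] data) throws Exception {
        // Demo only: real use should prefer AES/GCM with a fresh IV per record.
        Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
        cipher.init(mode, key);
        return cipher.doFinal(data);
    }

    public static void main(String[] args) throws Exception {
        SecretKey key = KeyGenerator.getInstance("AES").generateKey();
        byte[] plain = "sensitive payload".getBytes(StandardCharsets.UTF_8);
        byte[] wire = transform(Cipher.ENCRYPT_MODE, key, plain);  // bytes written to the topic
        byte[] back = transform(Cipher.DECRYPT_MODE, key, wire);   // bytes the consumer recovers
        System.out.println(Arrays.equals(plain, back)); // true
    }
}
```

Because the broker only ever sees opaque ciphertext bytes, the sendfile/zero-copy path is untouched, which is the performance argument made above.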
Re: Remote Zookeeper
Is there a firewall that's blocking connections on port 9092? Also, the broker list should be comma separated.

On Tue, Mar 11, 2014 at 9:02 AM, A A andthereitg...@hotmail.com wrote: Sorry, one of the brokers was down. Brought it back up. Tried the following:

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 192.168.1.124:9092 --topic test
hello brokers
[2014-03-11 10:16:55,547] WARN Error while fetching metadata [{TopicMetadata for topic test - No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2014-03-11 10:16:55,576] WARN Error while fetching metadata [...] (kafka.producer.BrokerPartitionInfo)
[2014-03-11 10:16:55,578] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler)
[... the same WARN/WARN/ERROR cycle repeats three more times ...]
[2014-03-11 10:16:56,057] WARN Error while fetching metadata [...] (kafka.producer.BrokerPartitionInfo)
[2014-03-11 10:16:56,059] ERROR Failed to send requests for topics test with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
[2014-03-11 10:16:56,061] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread) kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
at scala.collection.immutable.Stream.foreach(Stream.scala:254)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)

From: andthereitg...@hotmail.com To: users@kafka.apache.org Subject: RE: Remote Zookeeper Date: Tue, 11 Mar 2014 15:59:44 + Okay thanks. Just to verify my setup I tried the following on broker1 (by publishing to localhost)
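For clarity, the corrected invocation implied by the first reply (a single comma-separated argument rather than space-separated host:port pairs) would look like this, using the host addresses from the original post:

```shell
# --broker-list takes one comma-separated host:port list,
# not space-separated arguments.
$KAFKA_HOME/bin/kafka-console-producer.sh \
  --broker-list localhost:9092,192.168.1.124:9092 \
  --topic test
```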
Re: New Consumer API discussion
Hi Neha,

6. It seems like #4 can be avoided by using Map<TopicPartition, Long> or Map<TopicPartition, TopicPartitionOffset> as the argument type. How? lastCommittedOffsets() is independent of positions(). I'm not sure I understood your suggestion.

I think of subscription as you're subscribing to a Set of TopicPartitions. Because the argument to positions() is TopicPartitionOffset... it's conceivable that the method can be called with two offsets for the same TopicPartition. One way to handle this is to accept either the first or the last offset for a TopicPartition. However, if the argument type is changed to Map<TopicPartition, Long>, it precludes the possibility of getting duplicate offsets for the same TopicPartition.

7. To address #3, maybe we can return List<TopicPartitionOffset> that are invalid. I don't particularly see the advantage of returning a list of invalid partitions from position(). It seems a bit awkward to return a list to indicate what is obviously a bug. Prefer throwing an error since the user should just fix that logic.

I'm not sure if an Exception is needed or desirable here. I don't see this as a catastrophic failure or a non-recoverable failure. Even if we just write the bad offsets to a log file and call it a day, I'm ok with that. But my main goal is to communicate to the API users somehow that they've provided bad offsets which are simply being ignored.

Hi Jay,

I would also like to shorten the name TopicOffsetPosition. Offset and Position are duplicative of each other. So perhaps we could call it a PartitionOffset or a TopicPosition or something like that. In general, class names that are just a concatenation of the fields (e.g. TopicAndPartitionAndOffset) seem kind of lazy to me, since the name doesn't really describe, it just enumerates. But that is more of a nit pick.

1. Did you mean to say TopicPartitionOffset instead of TopicOffsetPosition? 2. +1 on PartitionOffset

The lastCommittedPosition is particularly bothersome because: 1.
The name is weird and long. 2. It returns a list of results. But how can you use the list? The only way to use the list is to make a map of tp=>offset and then look up results in this map (or do a for loop over the list for the partition you want). This is sort of what I was talking about in my previous email. My suggestion was to change the return type to Map<TopicPartition, Long>.

What if we made it:

long position(TopicPartition tp)
void seek(TopicOffsetPosition p)
long committed(TopicPartition tp)
void commit(TopicOffsetPosition...);

1. Absolutely love the idea of position(TopicPartition tp). 2. I think we also need to provide a method for accessing all positions - positions() - which maybe returns a Map<TopicPartition, Long>? 3. What is the difference between position(TopicPartition tp) and committed(TopicPartition tp)? 4. +1 on commit(PartitionOffset...) 5. +1 on seek(PartitionOffset p) 6. We should also provide a seek(PartitionOffset... offsets)

Finally, in all the methods where we're using varargs, we should use an appropriate Collection data structure. For example, for the subscribe(TopicPartition... partitions) method, I think a more accurate API would be subscribe(Set<TopicPartition> partitions). This allows for the code to be self-documenting.
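The signatures being debated above can be summarized in one sketch. This is a hypothetical interface reflecting the suggestions in this thread (Map-based arguments and returns, Set-based subscribe), not the API that eventually shipped; TopicPartition is stubbed so the sketch is self-contained:

```java
import java.util.Map;
import java.util.Set;

// Minimal stub of the TopicPartition class from the proposal.
final class TopicPartition {
    final String topic;
    final int partition;
    TopicPartition(String topic, int partition) { this.topic = topic; this.partition = partition; }
}

// Sketch of the position/commit surface with the collection-based
// signatures suggested in this thread, instead of varargs and lists.
interface ConsumerPositions {
    void subscribe(Set<TopicPartition> partitions);    // Set instead of varargs: no duplicates
    long position(TopicPartition tp);                  // current fetch position of one partition
    Map<TopicPartition, Long> positions();             // all current fetch positions
    long committed(TopicPartition tp);                 // last committed offset of one partition
    void commit(Map<TopicPartition, Long> offsets);    // Map precludes duplicate offsets per partition
    void seek(Map<TopicPartition, Long> offsets);      // rewind/advance fetch positions
}
```

The Map argument types make the "two offsets for the same TopicPartition" case discussed above unrepresentable by construction.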
Re: New Consumer API discussion
Hi Jay, I apologize for derailing the conversation about the consumer API. We should start a new discussion about hierarchical topics if we want to keep talking about it. My final thought on the matter is that hierarchical topics is still an important feature to have in Kafka, because it gives us flexibility to do namespace-level access controls.

Getting back to the topic of the Consumer API: 1. Any thoughts on consistency for method arguments and return types? 2. The lastCommittedOffsets() method returns a List<TopicPartitionOffset> whereas the confluence page suggested a Map<TopicPartition, Long>. I would think that a Map is the more appropriate return type.

On Tue, Feb 11, 2014 at 8:04 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Pradeep, That wiki is fairly old and it predated more flexible subscription mechanisms. In the high-level consumer you currently have wildcard subscription, and in the new proposed interface you can actually subscribe based on any logic you want to create a union of streams. Personally I think this gives you everything you would want with a hierarchy and more actual flexibility (since you can define groupings however you want). What do you think? -Jay

On Mon, Feb 10, 2014 at 3:37 PM, Pradeep Gollakota pradeep...@gmail.com wrote: WRT hierarchical topics, I'm referring to KAFKA-1175 (https://issues.apache.org/jira/browse/KAFKA-1175). I would just like to think through the implications for the Consumer API if and when we do implement hierarchical topics. For example, in the proposal (https://cwiki.apache.org/confluence/display/KAFKA/Hierarchical+Topics#) written by Jay, he says that initially wildcard subscriptions are not going to be supported. But does that mean that they will be supported in v2? If that's the case, that would change the semantics of the Consumer API.

As to having classes for Topic, PartitionId, etc., it looks like I was referring to the TopicPartition and TopicPartitionOffset classes (I didn't realize these were already there).
I was only looking at the confluence page, which shows List[(String, Int, Long)] instead of List[TopicPartitionOffset] (as is shown in the javadoc). However, I did notice that we're not being consistent in the Java version. E.g. we have commit(TopicPartitionOffset... offsets) and lastCommittedOffsets(TopicPartition... partitions) on the one hand. On the other hand we have subscribe(String topic, int... partitions). I agree that creating a class for TopicId today would probably not make too much sense. But with hierarchical topics, I may change my mind. This is exactly what was done in the HBase API in 0.96 when namespaces were added: the 0.96 HBase API introduced a class called 'TableName' to represent the namespace and table name.

On Mon, Feb 10, 2014 at 3:08 PM, Neha Narkhede neha.narkh...@gmail.com wrote: Thanks for the feedback.

Mattijs - - Constructors link to http://kafka.apache.org/documentation.html#consumerconfigs for valid configurations, which lists zookeeper.connect rather than metadata.broker.list, the value for BROKER_LIST_CONFIG in ConsumerConfig. Fixed it to just point to ConsumerConfig for now until we finalize the new configs. - Docs for poll(long) mention consumer.commit(true), which I can't find in the Consumer docs. For a simple consumer setup, that call is something that would make a lot of sense. Missed changing the examples to use consumer.commit(true, offsets). The suggestions by Jay would change it to commit(offsets) and commitAsync(offsets), which will hopefully make it easier to understand those commit APIs. - Love the addition of MockConsumer, awesome for unit testing :) I'm not quite satisfied with what it does as of right now, but we will surely improve it as we start writing the consumer.

Jay - 1. ConsumerRebalanceCallback a. Makes sense. Renamed to onPartitionsRevoked. b. Ya, it will be good to make it forward compatible with Java 8 capabilities.
We can change it to PartitionsAssignedCallback and PartitionsRevokedCallback or RebalanceBeginCallback and RebalanceEndCallback? c. Ya, I thought about that but then didn't name it just RebalanceCallback since there could be a conflict with a controller side rebalance callback if/when we have one. However, you can argue that at that time we can name it ControllerRebalanceCallback instead of polluting a user facing API. So agree with you here. 2. Ya, that is a good idea. Changed to subscribe(String topic, int...partitions). 3. lastCommittedOffset() is not necessarily a local access since the consumer can potentially ask for the last committed offsets of partitions that the consumer does not consume and maintain the offsets for. That's the reason it is batched right now. 4. Yes, look at http://people.apache.org
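For illustration, here is one way the split callback interfaces discussed above could look. This is only a sketch: the interface names follow the candidates floated in the thread (PartitionsAssignedCallback / PartitionsRevokedCallback), and the TopicPartition class here is a stand-in, not the real client class.

```java
import java.util.Arrays;
import java.util.List;

public class RebalanceCallbackSketch {
    // Stand-in for the client's TopicPartition class, not the real one.
    static class TopicPartition {
        final String topic;
        final int partition;
        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Splitting the callback into two single-method interfaces makes each
    // one usable as a lambda under Java 8, which was part of the motivation
    // discussed in this thread for not keeping a single two-method interface.
    interface PartitionsAssignedCallback {
        void onPartitionsAssigned(List<TopicPartition> partitions);
    }

    interface PartitionsRevokedCallback {
        void onPartitionsRevoked(List<TopicPartition> partitions);
    }

    public static void main(String[] args) {
        // Each callback can be supplied as a lambda.
        PartitionsAssignedCallback assigned =
            parts -> System.out.println("assigned " + parts.size() + " partitions");
        PartitionsRevokedCallback revoked =
            parts -> System.out.println("revoked " + parts.size() + " partitions");

        List<TopicPartition> parts = Arrays.asList(
            new TopicPartition("events", 0), new TopicPartition("events", 1));
        assigned.onPartitionsAssigned(parts);
        revoked.onPartitionsRevoked(parts);
    }
}
```

The trade-off mentioned above still applies: two interfaces are lambda-friendly, while a single interface keeps assignment and revocation handling in one place.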
Re: Building a producer/consumer supporting exactly-once messaging
Have you read this part of the documentation? http://kafka.apache.org/documentation.html#semantics Just wondering if that solves your use case.

On Mon, Feb 10, 2014 at 9:11 AM, Garry Turkington g.turking...@improvedigital.com wrote: Hi, I've been doing some prototyping on Kafka for a few months now and like what I see. It's a good fit for some of my use cases in the areas of data distribution but also for processing - liking a lot of what I see in Samza. I'm now working through some of the operational issues and have a question for the community. I have several data sources that I want to push into Kafka, but some of the most important are arriving as a stream of files being dropped either into an SFTP location or S3. Conceptually the data is really a stream, but it's being chunked and made more batch-like by the deployment model of the operational servers. So pulling the data into Kafka and seeing it more as a stream again is a big plus. But I really don't want duplicate messages. I know Kafka provides at-least-once semantics and that's fine; I'm happy to have the de-dupe logic external to Kafka. And if I look at my producer I can build up a protocol around adding record metadata and using Zookeeper to give me pretty high confidence that my clients will know if they are reading from a file that was fully published into Kafka or not. I had assumed that this wouldn't be a unique use case, but on doing a bunch of searches I really don't find much in terms of either tools that help or even just best-practice patterns for handling this type of need to support exactly-once message processing. So now I'm thinking that either I just need better web search skills or that actually this isn't something many others are doing, and if so then there's likely a reason for that. Any thoughts? Thanks Garry
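The external de-dupe approach Garry describes can be sketched in plain Java: derive a key for each record (a file name plus record offset is one option) and drop redelivered records whose key has already been seen. The class and key scheme here are hypothetical, just to show the shape of at-least-once delivery plus idempotent consumption; a production version would persist the seen-key state rather than hold it in memory.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class Deduplicator {
    // Keys of records already processed. In production this would be a
    // persistent store (or bounded by a retention window), not an
    // unbounded in-memory set.
    private final Set<String> seen = new HashSet<>();
    private final List<String> output = new ArrayList<>();

    /** Process one record; replays of the same (file, offset) are dropped. */
    public boolean process(String sourceFile, long recordOffset, String payload) {
        String key = sourceFile + ":" + recordOffset;
        if (!seen.add(key)) {
            return false; // duplicate delivery, skip
        }
        output.add(payload);
        return true;
    }

    public List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        Deduplicator d = new Deduplicator();
        d.process("batch-2014-02-10.log", 0, "first");
        d.process("batch-2014-02-10.log", 0, "first"); // replayed delivery
        System.out.println("unique records: " + d.output().size());
    }
}
```

With this in place, replaying a file after a partial publish simply no-ops for the records already keyed, which is the property Garry's consumer-side protocol needs.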
Re: New Consumer API discussion
Couple of very quick thoughts. 1. +1 about renaming commit(...) and commitAsync(...) 2. I'd also like to extend the above for the poll() method as well. poll() and pollWithTimeout(long, TimeUnit)? 3. Have you guys given any thought around how this API would be used with hierarchical topics? 4. Would it make sense to add classes such as TopicId, PartitionId, etc? Seems like it would be easier to read code with these classes as opposed to strings and longs. - Pradeep

On Mon, Feb 10, 2014 at 12:20 PM, Jay Kreps jay.kr...@gmail.com wrote: A few items: 1. ConsumerRebalanceCallback a. onPartitionsRevoked would be a better name. b. We should discuss the possibility of splitting this into two interfaces. The motivation would be that in Java 8 single-method interfaces can directly take methods, which might be more intuitive. c. If we stick with a single interface I would prefer the name RebalanceCallback as it's more concise 2. Should subscribe(String topic, int partition) be subscribe(String topic, int... partition)? 3. Is the lastCommittedOffset call just a local access? If so it would be more convenient not to batch it. 4. How are we going to handle the earliest/latest starting position functionality we currently have? Does that remain a config? 5. Do we need to expose the general ability to get known positions from the log? E.g. the functionality in the OffsetRequest...? That would make the ability to change position a little easier. 6. Should poll(java.lang.Long timeout) be poll(long timeout, TimeUnit unit)? Is it Long because it allows null? If so should we just add a poll() that polls indefinitely? 7. I recommend we remove the boolean parameter from commit as it is really hard to read code that has boolean parameters without named arguments. Can we make it something like commit(...) and commitAsync(...)? 8. What about the common case where you just want to commit the current position for all partitions? 9. How do you unsubscribe? 10.
You say in a few places that positions() only impacts the starting position, but surely that isn't the case, right? Surely it controls the fetch position for that partition and can be called at any time? Otherwise it is a pretty weird api, right? 11. How do I get my current position? Not the committed position but the offset of the next message that will be given to me? One thing that I really found helpful for the API design was writing out actual code for different scenarios against the API. I think it might be good to do that for this too--i.e. enumerate the various use cases and code that use case up to see how it looks. I'm not sure if it would be useful to collect these kinds of scenarios from people. I know they have sporadically popped up on the mailing list. -Jay On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede neha.narkh...@gmail.com wrote: As mentioned in previous emails, we are also working on a re-implementation of the consumer. I would like to use this email thread to discuss the details of the public API. I would also like us to be picky about this public api now so it is as good as possible and we don't need to break it in the future. The best way to get a feel for the API is actually to take a look at the javadoc http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html , the hope is to get the api docs good enough so that it is self-explanatory. You can also take a look at the configs here http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html Some background info on implementation: At a high level the primary difference in this consumer is that it removes the distinction between the high-level and low-level consumer. The new consumer API is non blocking and instead of returning a blocking iterator, the consumer provides a poll() API that returns a list of records. 
We think this is better compared to the blocking iterators since it effectively decouples the threading strategy used for processing messages from the consumer. It is worth noting that the consumer is entirely single threaded and runs in the user thread. The advantage is that it can be easily rewritten in less multi-threading-friendly languages. The consumer batches data and multiplexes I/O over TCP connections to each of the brokers it communicates with, for high throughput. The consumer also allows long polling to reduce the end-to-end message latency for low-throughput data. The consumer provides a group management facility that supports the concept of a group with multiple consumer instances (just like the current consumer). This is done through a custom heartbeat and group management protocol transparent to the user. At the same time, it allows users the option to subscribe to a fixed set of partitions and not use group management at all.
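The poll()-based style described above leaves the loop, and therefore the threading strategy, with the application. A minimal sketch of that shape, against a hypothetical consumer interface (not the real KafkaConsumer), where the mock "consumer" just hands back canned batches:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class PollLoopSketch {
    // Hypothetical stand-in for the proposed non-blocking consumer API.
    interface Consumer {
        /** Returns the next batch of records, or an empty list if none arrived in time. */
        List<String> poll(long timeoutMs);
    }

    /**
     * Drain the consumer until it returns an empty batch. Processing happens
     * in the caller's thread, so the app decides how (or whether) to fan out.
     */
    static int drain(Consumer consumer) {
        int processed = 0;
        while (true) {
            List<String> records = consumer.poll(100);
            if (records.isEmpty()) {
                break;
            }
            for (String record : records) {
                processed++; // application-level processing goes here
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        // Mock consumer: one batch of three records, then nothing further.
        List<List<String>> batches = new ArrayList<>(Arrays.asList(
            Arrays.asList("r1", "r2", "r3"), Collections.<String>emptyList()));
        Consumer mock = timeoutMs -> batches.isEmpty()
            ? Collections.emptyList() : batches.remove(0);
        System.out.println("processed " + drain(mock));
    }
}
```

Contrast this with the blocking-iterator style, where the client library's threads dictate how messages reach the application; here the same loop could just as easily hand records to a pool, a queue, or process them inline.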
Re: Config for new clients (and server)
+1 Jun.

On Mon, Feb 10, 2014 at 2:17 PM, Sriram Subramanian srsubraman...@linkedin.com wrote: +1 on Jun's suggestion.

On 2/10/14 2:01 PM, Jun Rao jun...@gmail.com wrote: I actually prefer to see those at INFO level. The reason is that the config system in an application can be complex. Some configs can be overridden in different layers and it may not be easy to determine what the final binding value is. The logging in Kafka will serve as the source of truth. For reference, the ZK client logs all overridden values during initialization. It's a one-time thing during startup, so it shouldn't add much noise. It's very useful for debugging subtle config issues. Exposing final configs programmatically is potentially useful. If we don't want to log overridden values out of the box, an app can achieve the same thing using the programming api. The only missing thing is that we won't know those unused property keys, which is probably less important than seeing the overridden values. Thanks, Jun

On Mon, Feb 10, 2014 at 10:15 AM, Jay Kreps jay.kr...@gmail.com wrote: Hey Jun, I think that is reasonable, but would you object to having it be debug logging? I think logging out a bunch of noise during normal operation in a client library is pretty ugly. Also, is there value in exposing the final configs programmatically? -Jay

On Sun, Feb 9, 2014 at 9:23 PM, Jun Rao jun...@gmail.com wrote: +1 on the new config. Just one comment. Currently, when instantiating a config (e.g. ProducerConfig), we log those overridden property values and unused property keys (likely due to mis-spelling). This has been very useful for config verification. It would be good to add similar support in the new config. Thanks, Jun

On Tue, Feb 4, 2014 at 9:34 AM, Jay Kreps jay.kr...@gmail.com wrote: We touched on this a bit in previous discussions, but I wanted to draw out the approach to config specifically as an item of discussion.
The new producer and consumer use a similar key-value config approach as the existing scala clients but have different implementation code to help define these configs. The plan is to use the same approach on the server, once the new clients are complete; so if we agree on this approach it will be the new default across the board. Let me split this into two parts. First I will try to motivate the use of key-value pairs as a configuration api. Then let me discuss the mechanics of specifying and parsing these. If we agree on the public api then the implementation details are interesting, as this will be shared across producer, consumer, and broker and potentially some tools; but if we disagree about the api then there is no point in discussing the implementation. Let me explain the rationale for this. In a sense a key-value map of configs is the worst possible API to the programmer using the clients. Let me contrast the pros and cons versus a POJO and motivate why I think it is still superior overall. Pro: An application can externalize the configuration of its kafka clients into its own configuration. Whatever config management system the client application is using will likely support key-value pairs, so the client should be able to directly pull whatever configurations are present and use them in its client. This means that any configuration the client supports can be added to any application at runtime. With the pojo approach the client application has to expose each pojo getter as some config parameter. The result of many applications doing this is that the config is different for each, and it is very hard to have a standard client config shared across them. Moving config into config files allows the usual tooling (version control, review, audit, config deployments separate from code pushes, etc.). Pro: Backwards and forwards compatibility. Provided we stick to our java api, many internals can evolve and expose new configs.
The application can support both the new and old client by just specifying a config that will be unused in the older version (and of course the reverse--we can remove obsolete configs). Pro: We can use a similar mechanism for both the client and the server. Since most people run the server as a stand-alone process, it needs a config file. Pro: Systems like Samza that need to ship configs across the network can easily do so, as configs have a natural serialized form. This can be done with pojos using java serialization, but it is ugly and has bizarre failure cases. Con: The IDE gives nice auto-completion for pojos. Con: There are some advantages to javadoc as a documentation mechanism for java people.
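The key-value approach, together with Jun's point about surfacing overridden values and unused (likely mis-spelled) keys, can be sketched with plain java.util.Properties. The defined keys and defaults below are invented for illustration, not the actual client config names:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class ConfigSketch {
    // Keys this hypothetical client defines, with their defaults.
    static final Map<String, String> DEFAULTS = new HashMap<>();
    static {
        DEFAULTS.put("metadata.broker.list", "localhost:9092");
        DEFAULTS.put("fetch.size", "1048576");
    }

    final Map<String, String> values = new HashMap<>();  // final bound values
    final Set<String> overridden = new HashSet<>();      // worth logging, per Jun
    final Set<String> unused = new HashSet<>();          // likely mis-spelled keys

    ConfigSketch(Properties props) {
        values.putAll(DEFAULTS);
        for (String key : props.stringPropertyNames()) {
            if (DEFAULTS.containsKey(key)) {
                values.put(key, props.getProperty(key));
                overridden.add(key);
            } else {
                unused.add(key);
            }
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("fetch.size", "65536");
        props.setProperty("fetchsize", "1"); // mis-spelled, ends up in unused
        ConfigSketch config = new ConfigSketch(props);
        System.out.println("overridden=" + config.overridden
            + " unused=" + config.unused);
    }
}
```

This shows the trade-off in the Pro/Con list above: an application can hand its whole external config straight through, and unknown keys surface at runtime as `unused` rather than as compile errors, which is what a POJO would give instead.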
Re: New Consumer API discussion
Pradeep - 2. Changed to poll(long, TimeUnit); a negative value for the timeout would block in the poll forever until there is new data. 3. We don't have hierarchical topics support. Would you mind explaining what you meant? 4. I'm not so sure that we need a class to express a topic, which is a string, and a separate class for just the partition id. We do have a class for TopicPartition which uniquely identifies a partition of a topic. Thanks, Neha

On Mon, Feb 10, 2014 at 12:36 PM, Pradeep Gollakota pradeep...@gmail.com wrote: Couple of very quick thoughts. 1. +1 about renaming commit(...) and commitAsync(...) 2. I'd also like to extend the above for the poll() method as well. poll() and pollWithTimeout(long, TimeUnit)? 3. Have you guys given any thought around how this API would be used with hierarchical topics? 4. Would it make sense to add classes such as TopicId, PartitionId, etc? Seems like it would be easier to read code with these classes as opposed to strings and longs. - Pradeep

On Mon, Feb 10, 2014 at 12:20 PM, Jay Kreps jay.kr...@gmail.com wrote: A few items: 1. ConsumerRebalanceCallback a. onPartitionsRevoked would be a better name. b. We should discuss the possibility of splitting this into two interfaces. The motivation would be that in Java 8 single-method interfaces can directly take methods, which might be more intuitive. c. If we stick with a single interface I would prefer the name RebalanceCallback as it's more concise 2. Should subscribe(String topic, int partition) be subscribe(String topic, int... partition)? 3. Is the lastCommittedOffset call just a local access? If so it would be more convenient not to batch it. 4. How are we going to handle the earliest/latest starting position functionality we currently have? Does that remain a config? 5. Do we need to expose the general ability to get known positions from the log? E.g. the functionality in the OffsetRequest...? That would make the ability to change position a little easier. 6.
Should poll(java.lang.Long timeout) be poll(long timeout, TimeUnit unit)? Is it Long because it allows null? If so should we just add a poll() that polls indefinitely? 7. I recommend we remove the boolean parameter from commit as it is really hard to read code that has boolean parameters without named arguments. Can we make it something like commit(...) and commitAsync(...)? 8. What about the common case where you just want to commit the current position for all partitions? 9. How do you unsubscribe? 10. You say in a few places that positions() only impacts the starting position, but surely that isn't the case, right? Surely it controls the fetch position for that partition and can be called at any time? Otherwise it is a pretty weird api, right? 11. How do I get my current position? Not the committed position but the offset of the next message that will be given to me? One thing that I really found helpful for the API design was writing out actual code for different scenarios against the API. I think it might be good to do that for this too--i.e. enumerate the various use cases and code that use case up to see how it looks. I'm not sure if it would be useful to collect these kinds of scenarios from people. I know they have sporadically popped up on the mailing list. -Jay On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede neha.narkh...@gmail.com wrote: As mentioned in previous emails, we are also working on a re-implementation of the consumer. I would like to use this email thread to discuss the details of the public API. I would also like us to be picky about this public api now so it is as good as possible and we don't need to break it in the future. The best way to get a feel for the API is actually to take a look at the javadoc http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html , the hope is to get the api docs good enough so that it is self-explanatory. 
You can also take a look at the configs here http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html Some background info on implementation: At a high level the primary difference in this consumer is that it removes the distinction between the high-level and low-level consumer. The new consumer API is non blocking and instead of returning a blocking iterator, the consumer provides a poll() API that returns a list of records. We think this is better compared to the blocking iterators since it effectively decouples the threading strategy used for processing messages from the consumer. It is worth noting that the consumer is entirely single