[jira] [Commented] (KAFKA-246) log configuration values used
[ https://issues.apache.org/jira/browse/KAFKA-246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13252909#comment-13252909 ] Jay Kreps commented on KAFKA-246: Thanks for the patch! I wonder if, instead of adding a logging statement for each value, which is a little repetitive, we could instead do the following: Create a Props object that wraps Properties and has helpers like getInt, getString, etc. Something like this: https://github.com/voldemort/voldemort/blob/master/src/java/voldemort/utils/Props.java Then add something internal to this helper that records whether a given property is used or not (i.e. a property is used if the get method is called for that key). This class could be used to log all configuration in a single place and could also log WARN messages for any properties that are not used (since that is likely a typo). log configuration values used Key: KAFKA-246 URL: https://issues.apache.org/jira/browse/KAFKA-246 Project: Kafka Issue Type: Improvement Affects Versions: 0.8 Reporter: Jun Rao Labels: newbie Attachments: KAFKA-246-broker-0.8.patch Currently, it's hard to figure out which configuration value is being used and whether a new configuration is being picked up. Logging all configuration values during startup can address this issue. We should cover broker, producer and consumer. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
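A minimal sketch of the suggested helper (class and method names are modeled on Voldemort's Props, not taken from any actual Kafka patch): typed getters record which keys were read, so keys present in the file but never consulted — likely typos — can be reported at startup.

```java
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

public class Props {
    private final Properties props;
    private final Set<String> usedKeys = new HashSet<>();

    public Props(Properties props) { this.props = props; }

    public String getString(String key, String defaultValue) {
        usedKeys.add(key);                       // record the access
        return props.getProperty(key, defaultValue);
    }

    public int getInt(String key, int defaultValue) {
        usedKeys.add(key);
        String v = props.getProperty(key);
        return v == null ? defaultValue : Integer.parseInt(v.trim());
    }

    /** Keys present in the config but never read -- probably misspelled. */
    public Set<String> unusedKeys() {
        Set<String> unused = new HashSet<>(props.stringPropertyNames());
        unused.removeAll(usedKeys);
        return unused;
    }
}
```

At the end of startup the broker would log every key it read in one place and emit a WARN for each entry of `unusedKeys()`.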
[jira] [Commented] (KAFKA-48) Implement optional long poll support in fetch request
[ https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13250878#comment-13250878 ] Jay Kreps commented on KAFKA-48: Good point Jun, now it is if(w == null) Seq.empty else w.collectSatisfiedRequests(request) I will wait for more feedback before making a new patch since this is a pretty trivial change. Implement optional long poll support in fetch request --- Key: KAFKA-48 URL: https://issues.apache.org/jira/browse/KAFKA-48 Project: Kafka Issue Type: Bug Reporter: Jun Rao Assignee: Jay Kreps Attachments: KAFKA-48-v2.patch, KAFKA-48-v3.patch, KAFKA-48-v4.patch, KAFKA-48.patch, kafka-48-v3-to-v4-changes.diff Currently, the fetch request is non-blocking. If there is nothing on the broker for the consumer to retrieve, the broker simply returns an empty set to the consumer. This can be inefficient, if you want to ensure low-latency because you keep polling over and over. We should make a blocking version of the fetch request so that the fetch request is not returned until the broker has at least one message for the fetcher or some timeout passes. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-48) Implement optional long poll support in fetch request
[ https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13250982#comment-13250982 ] Jay Kreps commented on KAFKA-48: Hey Neha, yes, my hope is to get the patch evaluated as is, and then take another pass at cleaning up the way we handle the satisfaction action as Jun and you requested and try out other approaches to the purgatory data structure asynchronously. That should take these cleanup/polishing items out of the critical path. I like your idea of the dual priority queues, but I need to work through it more to fully understand it. Implement optional long poll support in fetch request --- Key: KAFKA-48 URL: https://issues.apache.org/jira/browse/KAFKA-48 Project: Kafka Issue Type: Bug Reporter: Jun Rao Assignee: Jay Kreps Attachments: KAFKA-48-v2.patch, KAFKA-48-v3.patch, KAFKA-48-v4.patch, KAFKA-48.patch, kafka-48-v3-to-v4-changes.diff Currently, the fetch request is non-blocking. If there is nothing on the broker for the consumer to retrieve, the broker simply returns an empty set to the consumer. This can be inefficient, if you want to ensure low-latency because you keep polling over and over. We should make a blocking version of the fetch request so that the fetch request is not returned until the broker has at least one message for the fetcher or some timeout passes. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-266) Kafka web console design
[ https://issues.apache.org/jira/browse/KAFKA-266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13229629#comment-13229629 ] Jay Kreps commented on KAFKA-266: My take is that, as Joe says, most serious folks already have some system they use for monitoring across their stack (Ganglia or whatever). To really work operationally you can't try to replace this or add a new system. For this reason, I kind of prefer the web app to be an optional stand-alone thing since it may not be of use to everyone, though I think that (potentially) complicates its design. I think the advantage of the web app is custom display of very Kafka-specific things (the audit, cluster status, etc.). Kafka web console design Key: KAFKA-266 URL: https://issues.apache.org/jira/browse/KAFKA-266 Project: Kafka Issue Type: New Feature Components: contrib Reporter: Evan Chan Original Estimate: 672h Remaining Estimate: 672h This issue is created to track a community-contributed Kafka Web UI. Here is an initial list of goals: - Be able to easily see which brokers are up - Be able to see lists of topics, connected producers, consumer groups, connected consumers - Be able to see, for each consumer/partition, its offset, and more importantly, # of bytes unconsumed (== largest offset for partition - current offset) - (Wish list) have a graphical view of the offsets - (Wish list) be able to clean up consumer state, such as stale claimed partitions List of challenges/questions: - Which framework? Play! for Scala? - Is all the data available from JMX and ZK? Hopefully, watching the files on the filesystem can be avoided - How to handle large numbers of topics, partitions, consumers, etc. efficiently
[jira] [Commented] (KAFKA-45) Broker startup, leader election, becoming a leader/follower for intra-cluster replication
[ https://issues.apache.org/jira/browse/KAFKA-45?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13226523#comment-13226523 ] Jay Kreps commented on KAFKA-45: I think we are overthinking this. Currently cleanup is not a precise SLA, it is just a guarantee of the form we will never delete anything younger than X OR we will always maintain at least Y bytes of messages. Trying to maintain this in synchronous form across nodes is overkill I think. It is fine if every node acts independently as long as each of them respects the SLA. I think this should be much simpler and more likely to work. Broker startup, leader election, becoming a leader/follower for intra-cluster replication - Key: KAFKA-45 URL: https://issues.apache.org/jira/browse/KAFKA-45 Project: Kafka Issue Type: Bug Reporter: Jun Rao We need to implement the logic for starting a broker with replicated partitions, the leader election logic and how to become a leader and a follower. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
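The per-node retention rule described above can be sketched as a simple predicate each broker evaluates independently (names are illustrative, not from any Kafka patch); no cross-node synchronization is needed because every replica applies the same SLA to its own copy of the log:

```java
public class RetentionPolicy {
    private final long retentionMs;   // "never delete anything younger than X"
    private final long minBytes;      // "always maintain at least Y bytes"

    public RetentionPolicy(long retentionMs, long minBytes) {
        this.retentionMs = retentionMs;
        this.minBytes = minBytes;
    }

    /** True if a segment of the given age may be deleted without violating the SLA. */
    public boolean mayDelete(long segmentAgeMs, long bytesRemainingAfterDelete) {
        return segmentAgeMs > retentionMs && bytesRemainingAfterDelete >= minBytes;
    }
}
```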
[jira] [Commented] (KAFKA-280) change on-disk log layout to {log.dir}/topicname/partitionid
[ https://issues.apache.org/jira/browse/KAFKA-280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13226529#comment-13226529 ] Jay Kreps commented on KAFKA-280: FWIW, I think this new proposal is aesthetically nicer too. change on-disk log layout to {log.dir}/topicname/partitionid Key: KAFKA-280 URL: https://issues.apache.org/jira/browse/KAFKA-280 Project: Kafka Issue Type: Sub-task Components: core Affects Versions: 0.8 Reporter: Jun Rao Currently, the on-disk layout is {log.dir}/topicname-partitionid. The problem is that there is no appropriate place to store topic-level information such as topic version. An alternative layout is {log.dir}/topicname/partitionid. Then, we can store topic-level metadata under {log.dir}/topicname.
[jira] [Commented] (KAFKA-281) support multiple root log directories
[ https://issues.apache.org/jira/browse/KAFKA-281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13226535#comment-13226535 ] Jay Kreps commented on KAFKA-281: - Is this to work around the max subdirectory limits some filesystems have (e.g. I think ext4 has a limit of 64k subdirectories per directory)? The other advantage of this is that you can actually get rid of RAID and just run with JBOD using a separate mount point for each drive and having a data directory per drive (a la Hadoop). We wouldn't do this now, but if we had replication this would be a big win. The overhead of RAID is usually like a 20-30% perf hit, plus the additional disk space it takes up. In this setup you would be depending on replication for disk failures. The trade-off is that a single drive failure would kill a machine. In practice due to raid resync perf hit we seem to have this problem already. support multiple root log directories - Key: KAFKA-281 URL: https://issues.apache.org/jira/browse/KAFKA-281 Project: Kafka Issue Type: Improvement Components: core Reporter: Jun Rao Currently, the log layout is {log.dir}/topicname-partitionid and one can only specify 1 {log.dir}. This limits the # of topics we can have per broker. We can potentially support multiple directories for {log.dir} and just assign topics using hashing or round-robin. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
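The round-robin assignment mentioned in the description could look like this (a hypothetical sketch, one data directory per drive as in the JBOD setup Jay describes):

```java
import java.io.File;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class LogDirAllocator {
    private final List<File> logDirs;
    private final AtomicLong counter = new AtomicLong();

    public LogDirAllocator(List<File> logDirs) { this.logDirs = logDirs; }

    /** Pick the root directory for a new partition's log, cycling through the roots. */
    public File nextDir() {
        int i = (int) (counter.getAndIncrement() % logDirs.size());
        return logDirs.get(i);
    }
}
```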
[jira] [Commented] (KAFKA-266) Kafka web console design
[ https://issues.apache.org/jira/browse/KAFKA-266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13226543#comment-13226543 ] Jay Kreps commented on KAFKA-266: Yeah, I will post a patch. Kafka web console design Key: KAFKA-266 URL: https://issues.apache.org/jira/browse/KAFKA-266 Project: Kafka Issue Type: New Feature Components: contrib Reporter: Evan Chan Original Estimate: 672h Remaining Estimate: 672h This issue is created to track a community-contributed Kafka Web UI. Here is an initial list of goals: - Be able to easily see which brokers are up - Be able to see lists of topics, connected producers, consumer groups, connected consumers - Be able to see, for each consumer/partition, its offset, and more importantly, # of bytes unconsumed (== largest offset for partition - current offset) - (Wish list) have a graphical view of the offsets - (Wish list) be able to clean up consumer state, such as stale claimed partitions List of challenges/questions: - Which framework? Play! for Scala? - Is all the data available from JMX and ZK? Hopefully, watching the files on the filesystem can be avoided - How to handle large numbers of topics, partitions, consumers, etc. efficiently
[jira] [Commented] (KAFKA-293) Allow to configure all broker ids at once similar to how zookeeper handles server ids
[ https://issues.apache.org/jira/browse/KAFKA-293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13226547#comment-13226547 ] Jay Kreps commented on KAFKA-293: - The current approach is, admittedly, hacky; but the zookeeper approach is equally hacky if you ask me. I don't think it is good to put any host information in your server config. If we want to fix this issue I think the right way to go would be to create a zk sequence of ids. Servers would store the broker id in their data directory (maybe we could add a ${data.dir}/meta.properties file to hold this kind of stuff). If a server starts and has no meta.properties file then it would increment the zk id counter and take a new id and store it persistently with its logs. On startup the broker would take its id from this file. The reason this approach is better is because the node.id is not really configuration. That is if you swap the node.ids for two brokers you effectively corrupt the logs in the eyes of consumers. So the node.id is pinned to a particular set of logs. This would make any manual borking of your node id impossible and takes the node id out of the things the user has to configure. Thoughts? Allow to configure all broker ids at once similar to how zookeeper handles server ids - Key: KAFKA-293 URL: https://issues.apache.org/jira/browse/KAFKA-293 Project: Kafka Issue Type: Improvement Affects Versions: 0.7 Reporter: Thomas Dudziak Zookeeper allows to specify all server ids in the same configuration (https://zookeeper.apache.org/doc/r3.3.3/zookeeperAdmin.html#sc_configuration) which has the benefit that the configuration file is the same for all zookeeper instances. A similar approach for Kafka would be quite useful, e.g. brokerid.1=host 1 brokerid.2=host 2 etc. It'd still require per-instance configuration (myid file in the zookeeper case) but that can be created separately (e.g. by the deployment tool used). -- This message is automatically generated by JIRA. 
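Jay's proposed scheme above can be sketched concretely. The `meta.properties` file name comes from the comment itself; the id allocator is a stand-in for the ZK sequence counter: on first start the broker obtains a fresh id and persists it next to its logs, and on every later start it reads the stored id back, so the id stays pinned to that set of logs.

```java
import java.io.*;
import java.util.Properties;
import java.util.function.LongSupplier;

public class BrokerIdManager {
    public static long getOrCreateId(File dataDir, LongSupplier idAllocator) throws IOException {
        File meta = new File(dataDir, "meta.properties");
        Properties props = new Properties();
        if (meta.exists()) {
            // Subsequent starts: the id is read from the data directory, never from config.
            try (InputStream in = new FileInputStream(meta)) { props.load(in); }
            return Long.parseLong(props.getProperty("broker.id"));
        }
        long id = idAllocator.getAsLong();          // e.g. increment a ZK sequence
        props.setProperty("broker.id", Long.toString(id));
        try (OutputStream out = new FileOutputStream(meta)) {
            props.store(out, "generated broker metadata");
        }
        return id;
    }
}
```

Because the id lives with the logs, accidentally swapping configurations between two brokers can no longer swap their identities.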
[jira] [Commented] (KAFKA-240) implement new producer and consumer request format
[ https://issues.apache.org/jira/browse/KAFKA-240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13202586#comment-13202586 ] Jay Kreps commented on KAFKA-240: - Hey Prashanth, this is great. Question do we need a builder class for FetchRequest versus just using named arguments? It doesn't make a big difference but it would probably be good for us to agree on a pattern and carry it forward for all the various requests. With named arguments presumably it would be something like: new FetchRequest(correlationId = 2345454, versionId = 2, ...) implement new producer and consumer request format -- Key: KAFKA-240 URL: https://issues.apache.org/jira/browse/KAFKA-240 Project: Kafka Issue Type: Sub-task Components: core Affects Versions: 0.8 Reporter: Jun Rao Labels: fetch, replication, wireprotocol Fix For: 0.8 Attachments: KAFKA-240-FetchRequest-v1.patch We want to change the producer/consumer request/response format according to the discussion in the following wiki: https://cwiki.apache.org/confluence/display/KAFKA/New+Wire+Format+Proposal -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-240) implement new producer and consumer request format
[ https://issues.apache.org/jira/browse/KAFKA-240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13200128#comment-13200128 ] Jay Kreps commented on KAFKA-240: - Cool, I have a draft of long-poll support in KAFKA-48. I am hard coding the values that are in the new fetch request (min_size, max_wait). We should discuss which goes in first since this will change the server side KafkaRequestHandler/KafkaApis code a fair amount. If you are refactoring to remove the old requests I expect that will be the bigger patch and it would be better for me to rebase to you than vice versa. But if you guys are a ways out I can go first too. implement new producer and consumer request format -- Key: KAFKA-240 URL: https://issues.apache.org/jira/browse/KAFKA-240 Project: Kafka Issue Type: Sub-task Components: core Reporter: Jun Rao Fix For: 0.8 We want to change the producer/consumer request/response format according to the discussion in the following wiki: https://cwiki.apache.org/confluence/display/KAFKA/New+Wire+Format+Proposal -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-48) Implement optional long poll support in fetch request
[ https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13200159#comment-13200159 ] Jay Kreps commented on KAFKA-48: Two other issues with this patch that I forgot to mention: - There is a race condition between checking the available bytes and adding the watchers for the topics. I *think* this is okay since min_bytes is a minimum, not a maximum, so in the rare case that a produce comes in before the watchers are added we will just wait slightly longer than we should have. I think this is probably better than properly synchronizing and locking out all produces on that partition. - The other issue is that the delay queue is only emptied right now when the delay expires. If the request is fulfilled before the delay expires, the request is marked completed, but it remains in the delay queue until it expires. This is a problem and needs to be fixed: if the client sets a low min_bytes and a high max_wait, these requests may accumulate. Currently we would have to do an O(N) walk of the waiting requests to fix this. I am going to try to come up with an improved set of data structures to fix this without requiring that. Implement optional long poll support in fetch request Key: KAFKA-48 URL: https://issues.apache.org/jira/browse/KAFKA-48 Project: Kafka Issue Type: Bug Reporter: Alan Cabrera Assignee: Jay Kreps Attachments: KAFKA-48.patch Currently, the fetch request is non-blocking. If there is nothing on the broker for the consumer to retrieve, the broker simply returns an empty set to the consumer. This can be inefficient if you want to ensure low latency, because you keep polling over and over. We should make a blocking version of the fetch request so that the fetch request is not returned until the broker has at least one message for the fetcher or some timeout passes. -- This message is automatically generated by JIRA.
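The accumulation problem described above can be illustrated with a minimal delayed-request element (class and field names are hypothetical, not from the patch). A request satisfied early is only flagged; it still occupies the DelayQueue until its delay runs out, so expiry processing must skip entries whose satisfied flag is already set, and memory is only reclaimed at expiry:

```java
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class DelayedFetch implements Delayed {
    final long expirationNanos;
    final AtomicBoolean satisfied = new AtomicBoolean(false);

    public DelayedFetch(long maxWaitMs) {
        this.expirationNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMs);
    }

    @Override public long getDelay(TimeUnit unit) {
        return unit.convert(expirationNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }

    /** Called when enough data arrives; returns true only for the first caller. */
    public boolean trySatisfy() { return satisfied.compareAndSet(false, true); }
}
```

When the expiry thread eventually takes the element, a `trySatisfy()` returning false means the request was already answered and the timeout action is skipped — correct, but exactly the low-min_bytes/high-max_wait buildup the comment worries about.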
[jira] [Commented] (KAFKA-259) Give better error message when trying to run shell scripts without having built/downloaded the jars yet
[ https://issues.apache.org/jira/browse/KAFKA-259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13197972#comment-13197972 ] Jay Kreps commented on KAFKA-259: - Yeah, I didn't mean that in a snotty way, just that if we version control the jars the java people get all sulky and complain that we aren't using maven to download them, but if we do that then the non-maven people are unhappy because nothing works. Give better error message when trying to run shell scripts without having built/downloaded the jars yet --- Key: KAFKA-259 URL: https://issues.apache.org/jira/browse/KAFKA-259 Project: Kafka Issue Type: Bug Affects Versions: 0.6 Environment: Mac OSX Lion Reporter: Ross Crawford-d'Heureuse Priority: Minor Labels: newbie Hi there, I've cloned from the kafka github repo and tried to run the start server script: ./bin/kafka-server-start.sh config/server.properties Which results in: Exception in thread main java.lang.NoClassDefFoundError: kafka/Kafka Caused by: java.lang.ClassNotFoundException: kafka.Kafka at java.net.URLClassLoader$1.run(URLClassLoader.java:202) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:190) at java.lang.ClassLoader.loadClass(ClassLoader.java:306) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301) at java.lang.ClassLoader.loadClass(ClassLoader.java:247) It seems that Im missing a build step? what have I forgotten to do? Thanks in advance and I look forward to using kafka. regards rcdh -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-261) Corrupted request shuts down the broker
[ https://issues.apache.org/jira/browse/KAFKA-261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13198501#comment-13198501 ] Jay Kreps commented on KAFKA-261: - +1 Corrupted request shuts down the broker --- Key: KAFKA-261 URL: https://issues.apache.org/jira/browse/KAFKA-261 Project: Kafka Issue Type: Bug Affects Versions: 0.7 Reporter: Jun Rao Assignee: Jun Rao Attachments: kafka-261.patch Currently, a corrupted produce request brings down the broker. Instead, we should just log it and let it go. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-259) Exception in thread main java.lang.NoClassDefFoundError: kafka/Kafka Caused by: java.lang.ClassNotFoundException: kafka.Kafka
[ https://issues.apache.org/jira/browse/KAFKA-259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13197344#comment-13197344 ] Jay Kreps commented on KAFKA-259: - This is covered in the README and the releases come with packaged jars. The only thing I think we could do better is error out if there are no jars in dist, let's change this bug to be about doing that. Exception in thread main java.lang.NoClassDefFoundError: kafka/Kafka Caused by: java.lang.ClassNotFoundException: kafka.Kafka --- Key: KAFKA-259 URL: https://issues.apache.org/jira/browse/KAFKA-259 Project: Kafka Issue Type: Bug Affects Versions: 0.6 Environment: Mac OSX Lion Reporter: Ross Crawford-d'Heureuse Priority: Minor Hi there, I've cloned from the kafka github repo and tried to run the start server script: ./bin/kafka-server-start.sh config/server.properties Which results in: Exception in thread main java.lang.NoClassDefFoundError: kafka/Kafka Caused by: java.lang.ClassNotFoundException: kafka.Kafka at java.net.URLClassLoader$1.run(URLClassLoader.java:202) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:190) at java.lang.ClassLoader.loadClass(ClassLoader.java:306) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301) at java.lang.ClassLoader.loadClass(ClassLoader.java:247) It seems that Im missing a build step? what have I forgotten to do? Thanks in advance and I look forward to using kafka. regards rcdh -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-202) Make the request processing in kafka asynchronous
[ https://issues.apache.org/jira/browse/KAFKA-202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13185842#comment-13185842 ] Jay Kreps commented on KAFKA-202: - Hey Neha, 0.8 doesn't build for me and is missing a few files in the patch. Did those get missed in the checkin? [info] == core-kafka / compile == [info] Source analysis: 134 new/modified, 0 indirectly invalidated, 0 removed. [info] Compiling main sources... [error] /Users/jkreps/work/kafka-git/core/src/main/scala/kafka/network/SocketServer.scala:45: not found: type RequestChannel [error] val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests) [error]^ [error] /Users/jkreps/work/kafka-git/core/src/main/scala/kafka/network/SocketServer.scala:190: not found: type RequestChannel [error]val requestChannel: RequestChannel, [error]^ [error] /Users/jkreps/work/kafka-git/core/src/main/scala/kafka/network/SocketServer.scala:301: not found: value RequestChannel [error] val req = RequestChannel.Request(processor = id, requestKey = key, request = request, start = time.nanoseconds) [error] ^ [error] /Users/jkreps/work/kafka-git/core/src/main/scala/kafka/server/KafkaServer.scala:26: RequestChannel is not a member of kafka.network [error] import kafka.network.{SocketServerStats, SocketServer, RequestChannel} [error]^ [error] /Users/jkreps/work/kafka-git/core/src/main/scala/kafka/server/KafkaServer.scala:40: not found: type KafkaRequestHandlerPool [error] var requestHandlerPool: KafkaRequestHandlerPool = null [error] ^ [error] /Users/jkreps/work/kafka-git/core/src/main/scala/kafka/server/KafkaServer.scala:69: not found: type KafkaRequestHandlerPool [error] requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel, new KafkaApis(logManager).handle, config.numIoThreads) [error] ^ [error] 6 errors found [info] == core-kafka / compile == [error] Error running compile: Compilation failed [info] [info] Total time: 21 s, completed Jan 13, 2012 
12:46:07 PM [info] [info] Total session time: 24 s, completed Jan 13, 2012 12:46:07 PM [error] Error during build Make the request processing in kafka asynchronous Key: KAFKA-202 URL: https://issues.apache.org/jira/browse/KAFKA-202 Project: Kafka Issue Type: New Feature Reporter: Jay Kreps Assignee: Jay Kreps Fix For: 0.8 Attachments: KAFKA-202-v2.patch, KAFKA-202-v3.patch, KAFKA-202-v4.patch, KAFKA-202-v5.patch, KAFKA-202-v6.patch, KAFKA-48-socket-server-refactor-draft.patch We need to handle long-lived requests to support replication. To make this work we need to make the processing mechanism asynchronous from the network threads. To accomplish this we will retain the existing pool of network threads but add a new pool of request-handling threads. These will do all the disk I/O. There will be a queue mechanism to transfer requests to and from this secondary pool.
[jira] [Commented] (KAFKA-238) add a getTopicMetaData method in broker and expose it to producer
[ https://issues.apache.org/jira/browse/KAFKA-238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13180736#comment-13180736 ] Jay Kreps commented on KAFKA-238: - One general recommendation I have is that we think about making this as general as possible. The way we are doing APIs is fairly high overhead, and the general difficulty of doing client upgrades means we really want to have fewer, more general apis. Questions: Shouldn't we also give back the host/port info for replicas? Might there be a case where it is better to read from replicas? Should this be per-topic or should we just get all metadata? Also, should we consider other metadata? For example the other metadata we have on a per-partition basis is node id, total data size, number of segments, beginning offset and date for each segment. This would make it possible to get rid of the getOffsetsBefore api as that would be a special case of this more general metadata api. Related thought: currently we maintain a custom object for each request. This is very good for fetch and produce which need efficiency for the message sets. It is kind of a hassle for this case. An alternative would be to just send a single string field containing the metadata as json, which would make evolution easier for this kind of performance-insensitive api. On the other hand since it is just one api object maybe the cost of a JSON parser jar and non-uniformity with the other apis isn't worth it. add a getTopicMetaData method in broker and expose it to producer -- Key: KAFKA-238 URL: https://issues.apache.org/jira/browse/KAFKA-238 Project: Kafka Issue Type: Sub-task Components: core Reporter: Jun Rao We need a way to propagate the leader and the partition information to the producer so that it can do load balancing and semantic partitioning. One way to do that is to have the producer get the information from ZK directly. 
This means that the producer needs to maintain a ZK session and has to subscribe to watchers, which can be complicated. An alternative approach is to have the following api on the broker:

TopicMetaData getTopicMetaData(String: topic)

TopicMetaData {
  Array[PartitionMetaData]: partitionsMetaData
}

PartitionMetaData {
  Int: partitionId
  String: leaderHostname
  Int: leaderPort
}

Using this api, the producer can get the metadata about a topic during initial startup or leadership change of a partition.
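The response structures from the description, transliterated into plain Java value classes purely for illustration (this is not the eventual wire format, and it omits the replica host/port info Jay suggests adding):

```java
import java.util.List;

class TopicMetaData {
    final List<PartitionMetaData> partitionsMetaData;

    TopicMetaData(List<PartitionMetaData> partitionsMetaData) {
        this.partitionsMetaData = partitionsMetaData;
    }
}

class PartitionMetaData {
    final int partitionId;
    final String leaderHostname; // where the producer should send for this partition
    final int leaderPort;

    PartitionMetaData(int partitionId, String leaderHostname, int leaderPort) {
        this.partitionId = partitionId;
        this.leaderHostname = leaderHostname;
        this.leaderPort = leaderPort;
    }
}
```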
[jira] [Commented] (KAFKA-229) SimpleConsumer is not logging exceptions correctly so detailed stack trace is not coming in the logs
[ https://issues.apache.org/jira/browse/KAFKA-229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13178537#comment-13178537 ] Jay Kreps commented on KAFKA-229: - +1 committed. SimpleConsumer is not logging exceptions correctly so detailed stack trace is not coming in the logs Key: KAFKA-229 URL: https://issues.apache.org/jira/browse/KAFKA-229 Project: Kafka Issue Type: Bug Reporter: Joe Stein Fix For: 0.8 Attachments: KAFKA-229.patch -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-200) Support configurable send / receive socket buffer size in server
[ https://issues.apache.org/jira/browse/KAFKA-200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13168589#comment-13168589 ] Jay Kreps commented on KAFKA-200: - Interesting. Can we validate the setting directly by checking getReceiveBufferSize()? I think the reason it may be working is that linux defaults tcp window scaling on...I think the current approach would not work on Solaris because it defaults tcp window scaling off. Or maybe I am wrong. But I think we would be better off doing it the way they recommend and setting the buffer size in the way the documentation encourages. Support configurable send / receive socket buffer size in server Key: KAFKA-200 URL: https://issues.apache.org/jira/browse/KAFKA-200 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.7 Reporter: John Fung Fix For: 0.8 Attachments: KAFKA-200.patch * Make the send / receive socket buffer size configurable in server. * KafkaConfig.scala already has the following existing variables to support send / receive buffer: socketSendBuffer socketReceiveBuffer * The patch attached to this ticket will read the following existing settings in kafka/config/server.properties and set the corresponding socket buffers . . . # The send buffer (SO_SNDBUF) used by the socket server socket.send.buffer=1048576 # The receive buffer (SO_RCVBUF) used by the socket server socket.receive.buffer=1048576 -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
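The validation Jay suggests, reading back getReceiveBufferSize() after setting it, can be sketched as below. This is a standalone illustration, not the patch; note the OS is free to round, double, or cap the requested value (Linux in particular reports a doubled figure and caps at net.core.rmem_max), which is exactly why checking the effective value matters. The buffer must be requested before the socket is bound/connected for TCP window scaling to apply:

```java
import java.net.ServerSocket;

public class SocketBufferCheck {
    // Requests SO_RCVBUF on an unbound socket and returns what the OS granted.
    public static int effectiveReceiveBuffer(int requested) throws Exception {
        try (ServerSocket ss = new ServerSocket()) {
            // Set before bind(): required for the window-scaling negotiation.
            ss.setReceiveBufferSize(requested);
            return ss.getReceiveBufferSize();
        }
    }

    public static void main(String[] args) throws Exception {
        int granted = effectiveReceiveBuffer(1 << 20);
        System.out.println("requested 1048576 bytes, OS granted " + granted);
    }
}
```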
[jira] [Commented] (KAFKA-200) Support configurable send / receive socket buffer size in server
[ https://issues.apache.org/jira/browse/KAFKA-200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13168590#comment-13168590 ] Jay Kreps commented on KAFKA-200: - But yes, the way TCP is supposed to work is that it takes the minimum of the size the server or client can support. Support configurable send / receive socket buffer size in server Key: KAFKA-200 URL: https://issues.apache.org/jira/browse/KAFKA-200 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 0.7 Reporter: John Fung Fix For: 0.8 Attachments: KAFKA-200.patch * Make the send / receive socket buffer size configurable in server. * KafkaConfig.scala already has the following existing variables to support send / receive buffer: socketSendBuffer socketReceiveBuffer * The patch attached to this ticket will read the following existing settings in kafka/config/server.properties and set the corresponding socket buffers . . . # The send buffer (SO_SNDBUF) used by the socket server socket.send.buffer=1048576 # The receive buffer (SO_RCVBUF) used by the socket server socket.receive.buffer=1048576 -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-197) Embedded consumer doesn't shut down if the server can't start
[ https://issues.apache.org/jira/browse/KAFKA-197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13162587#comment-13162587 ] Jay Kreps commented on KAFKA-197: - I don't think we want to call halt(), that is like kill -9 the process. I think we want the logs to flush and shutdown gracefully. Can't we just do a graceful shutdown on both the server and the embedded consumer? Embedded consumer doesn't shut down if the server can't start - Key: KAFKA-197 URL: https://issues.apache.org/jira/browse/KAFKA-197 Project: Kafka Issue Type: Bug Affects Versions: 0.7 Reporter: Jun Rao Fix For: 0.7.1 Attachments: KAFKA-197.patch If a broker embeds a consumer and the broker itself doesn't start (e.g., conflicting broker id in ZK), the embedded consumer is still running. In this case, we should probably shut down the embedded consumer too. To do this, we need to either throw an exception or return an error in KafkaServer.startup and act accordingly in KafkaServerStartable.startup. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-222) Mavenize contrib
[ https://issues.apache.org/jira/browse/KAFKA-222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13162613#comment-13162613 ] Jay Kreps commented on KAFKA-222: - Don't we need to do the same thing now on Josh's patch? The longer we hold off on that patch the more it will drift. Mavenize contrib Key: KAFKA-222 URL: https://issues.apache.org/jira/browse/KAFKA-222 Project: Kafka Issue Type: Task Components: contrib Reporter: Neha Narkhede Assignee: Neha Narkhede Priority: Blocker Fix For: 0.7 Attachments: kafka-222.patch To reduce the overhead of maintaining the NOTICE and LICENSE files, we need to mavenize most checked-in jars. Out of the current checked-in jars, the following ones cannot be mavenized - 1. lib/sbt-launch.jar: This is required to fire up the build system of Kafka (SBT) 2. core/lib/zkclient-20110412.jar: Kafka uses a patched zkclient jar 3. contrib/hadoop-consumer/lib/piggybank.jar: This is not available through Maven 4. contrib/hadoop-consumer/lib/pig-0.8.0-core.jar: This is also not available through Maven -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-197) Embedded consumer doesn't shut down if the server can't start
[ https://issues.apache.org/jira/browse/KAFKA-197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13162616#comment-13162616 ] Jay Kreps commented on KAFKA-197: - A less invasive way would just be to have the embedded consumer register a shutdown hook and use System.exit. I am a little concerned about this whole embedded consumer thing, though. The original approach where we wrote to the local log in process was pretty fool proof. I think sending to a remote broker is actually riddled with issues. The producer send buffer is vulnerable to quite a large loss on any unclean shutdown or indeed any shutdown bugs. And also any condition that leads to a broker being unable to take requests but still registered in zk will lead to unbounded data loss. I wonder if this issue isn't just a special case of many many bad things that could happen. With the current approach I actually don't see any benefits at all to bundling the replication process with the kafka broker. It would actually be better to have that run independently it seems to me. Embedded consumer doesn't shut down if the server can't start - Key: KAFKA-197 URL: https://issues.apache.org/jira/browse/KAFKA-197 Project: Kafka Issue Type: Bug Affects Versions: 0.7 Reporter: Jun Rao Fix For: 0.7.1 Attachments: KAFKA-197.patch If a broker embeds a consumer and the broker itself doesn't start (e.g., conflicting broker id in ZK), the embedded consumer is still running. In this case, we should probably shut down the embedded consumer too. To do this, we need to either throw an exception or return an error in KafkaServer.startup and act accordingly in KafkaServerStartable.startup. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
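The shutdown-hook approach mentioned here might look like the sketch below (names are illustrative). The relevant JVM behavior: hooks registered this way run on System.exit and on SIGTERM, but are skipped entirely by Runtime.halt, which is the halt()-vs-graceful distinction raised earlier in the thread:

```java
public class GracefulShutdown {
    // Registers a JVM shutdown hook. Hooks run on System.exit(status) and on
    // normal signals, but Runtime.getRuntime().halt(status) bypasses them,
    // so logs would not flush and the consumer would not close cleanly.
    public static Thread registerHook(Runnable task) {
        Thread hook = new Thread(task, "embedded-consumer-shutdown");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) {
        registerHook(() -> System.out.println("closing embedded consumer, flushing logs"));
        // System.exit(0) here would run the hook; halt(0) would not.
    }
}
```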
[jira] [Commented] (KAFKA-212) IllegalThreadStateException in topic watcher for Kafka mirroring
[ https://issues.apache.org/jira/browse/KAFKA-212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13158699#comment-13158699 ] Jay Kreps commented on KAFKA-212: - I don't think that answers my question, though, which is how do we know if we are leaking threads? I guess the patch doesn't make it better or worse, since we definitely don't want to keep them in the list, but can you assess what happens if shutdown fails? Can that happen? Do we log it? Or is there a guarantee that the thread must shutdown in some bounded period of time? IllegalThreadStateException in topic watcher for Kafka mirroring Key: KAFKA-212 URL: https://issues.apache.org/jira/browse/KAFKA-212 Project: Kafka Issue Type: Bug Reporter: Neha Narkhede Assignee: Neha Narkhede Fix For: 0.7.1 Attachments: KAFKA-212.patch If the kafka mirroring embedded consumer receives a new topic watcher notification, it runs into the following exception [2011-11-23 02:49:15,612] FATAL java.lang.IllegalThreadStateException (kafka.consumer.ZookeeperTopicEventWatcher) [2011-11-23 02:49:15,612] FATAL java.lang.IllegalThreadStateException at java.lang.Thread.start(Thread.java:595) at kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$3.apply(KafkaServerStartable.scala:142) at kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$3.apply(KafkaServerStartable.scala:142) at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61) at scala.collection.immutable.List.foreach(List.scala:45) at kafka.server.EmbeddedConsumer.startNewConsumerThreads(KafkaServerStartable.scala:142) at kafka.server.EmbeddedConsumer.handleTopicEvent(KafkaServerStartable.scala:109) at kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.liftedTree2$1(ZookeeperTopicEventWatcher.scala:83) at kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.handleChildChange(ZookeeperTopicEventWatcher.scala:78) at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568) at 
org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) (kafka.consumer.ZookeeperTopicEventWatcher) This happens since it tries to start a thread which has finished executing -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
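The root cause described at the end of the stack trace is a Java threading rule: a Thread object whose run() has completed can never be start()ed again, so a fresh Thread must be constructed for each restart. A minimal demonstration of the failure mode:

```java
public class ThreadRestart {
    // Starts a thread, waits for it to finish, then tries to start it again.
    // The second start() always throws IllegalThreadStateException; the fix
    // (as in the attached patch's direction) is to create a new Thread object.
    public static boolean restartThrows() throws InterruptedException {
        Thread t = new Thread(() -> {});
        t.start();
        t.join(); // thread has finished executing
        try {
            t.start();
            return false; // never reached
        } catch (IllegalThreadStateException e) {
            return true;
        }
    }
}
```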
[jira] [Commented] (KAFKA-134) Upgrade Kafka to sbt 0.10.1
[ https://issues.apache.org/jira/browse/KAFKA-134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13155953#comment-13155953 ] Jay Kreps commented on KAFKA-134: - So Neha, can you review this patch. I would love to get this applied so we can get the eclipse support working again. Upgrade Kafka to sbt 0.10.1 --- Key: KAFKA-134 URL: https://issues.apache.org/jira/browse/KAFKA-134 Project: Kafka Issue Type: Improvement Components: packaging Reporter: Joshua Hartman Attachments: kafka_patch.txt Upgrading to sbt 0.10.1 is a nice to have as sbt moves forward. Plus, it's a requirement for me to help publish Kafka to maven :) -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-134) Upgrade Kafka to sbt 0.10.1
[ https://issues.apache.org/jira/browse/KAFKA-134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13155989#comment-13155989 ] Jay Kreps commented on KAFKA-134: - Well either way. The problem is I am totally lost in SBT with any version, but I feel the longer this sits the more it gets out of sync with all the various build file changes we are making. I tried to review this but basically failed after about 45 minutes of trying to grok the weirdness of SBT, so I am hoping someone more experienced can take a shot. Upgrade Kafka to sbt 0.10.1 --- Key: KAFKA-134 URL: https://issues.apache.org/jira/browse/KAFKA-134 Project: Kafka Issue Type: Improvement Components: packaging Reporter: Joshua Hartman Attachments: kafka_patch.txt Upgrading to sbt 0.10.1 is a nice to have as sbt moves forward. Plus, it's a requirement for me to help publish Kafka to maven :) -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] [Commented] (KAFKA-212) IllegalThreadStateException in topic watcher for Kafka mirroring
[ https://issues.apache.org/jira/browse/KAFKA-212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13156276#comment-13156276 ] Jay Kreps commented on KAFKA-212: - Any thread that doesn't shut down cleanly will leak, is that a problem? Can that happen? IllegalThreadStateException in topic watcher for Kafka mirroring Key: KAFKA-212 URL: https://issues.apache.org/jira/browse/KAFKA-212 Project: Kafka Issue Type: Bug Reporter: Neha Narkhede Assignee: Neha Narkhede Fix For: 0.7.1 Attachments: KAFKA-212.patch If the kafka mirroring embedded consumer receives a new topic watcher notification, it runs into the following exception [2011-11-23 02:49:15,612] FATAL java.lang.IllegalThreadStateException (kafka.consumer.ZookeeperTopicEventWatcher) [2011-11-23 02:49:15,612] FATAL java.lang.IllegalThreadStateException at java.lang.Thread.start(Thread.java:595) at kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$3.apply(KafkaServerStartable.scala:142) at kafka.server.EmbeddedConsumer$$anonfun$startNewConsumerThreads$3.apply(KafkaServerStartable.scala:142) at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61) at scala.collection.immutable.List.foreach(List.scala:45) at kafka.server.EmbeddedConsumer.startNewConsumerThreads(KafkaServerStartable.scala:142) at kafka.server.EmbeddedConsumer.handleTopicEvent(KafkaServerStartable.scala:109) at kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.liftedTree2$1(ZookeeperTopicEventWatcher.scala:83) at kafka.consumer.ZookeeperTopicEventWatcher$ZkTopicEventListener.handleChildChange(ZookeeperTopicEventWatcher.scala:78) at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568) at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) (kafka.consumer.ZookeeperTopicEventWatcher) This happens since it tries to start a thread which has finished executing -- This message is automatically generated by JIRA. 
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
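One way to answer "did the thread shut down in bounded time?" is a join with a timeout followed by an isAlive check, which gives a place to log a WARN if a thread is leaking. A sketch, not the mirroring code:

```java
public class LeakCheck {
    // Returns true if the thread terminated within timeoutMs. A false result
    // indicates a potential leak that should at least be logged.
    public static boolean shutDownCleanly(Thread t, long timeoutMs) throws InterruptedException {
        t.join(timeoutMs);
        return !t.isAlive();
    }
}
```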
[jira] [Commented] (KAFKA-49) Add acknowledgement to the produce request.
[ https://issues.apache.org/jira/browse/KAFKA-49?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13155331#comment-13155331 ] Jay Kreps commented on KAFKA-49: Hi Taylor, as you say the request id was meant to be the version. However in retrospect I do think I like the idea of separating the request and the version of the request. I agree it would be nice to split this. I think the open question here is whether we should try to maintain backwards compatibility for the next major release. It would probably be very convenient for us at code-writing time not to, but is more painful at rollout time. Add acknowledgement to the produce request. --- Key: KAFKA-49 URL: https://issues.apache.org/jira/browse/KAFKA-49 Project: Kafka Issue Type: Bug Reporter: Jun Rao Currently, the produce request doesn't get acknowledged. We need to have a broker send a response to the producer and have the producer wait for the response before sending the next request. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
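Splitting the request id from the request version, as discussed above, could look like the following wire-header sketch. Field names and sizes are illustrative assumptions, not the Kafka 0.7 format:

```java
import java.nio.ByteBuffer;

// Hypothetical header separating "which request" from "which revision of it".
public class RequestHeader {
    public final short apiKey;     // identifies the request type (produce, fetch, ...)
    public final short apiVersion; // identifies the wire-format revision of that request

    public RequestHeader(short apiKey, short apiVersion) {
        this.apiKey = apiKey;
        this.apiVersion = apiVersion;
    }

    public ByteBuffer encode() {
        ByteBuffer buf = ByteBuffer.allocate(4);
        buf.putShort(apiKey).putShort(apiVersion);
        buf.flip();
        return buf;
    }

    public static RequestHeader decode(ByteBuffer buf) {
        return new RequestHeader(buf.getShort(), buf.getShort());
    }
}
```

With this split, a broker can keep handlers for old versions of one request type while evolving it, which is what makes rollout-time backwards compatibility tractable.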
[jira] [Commented] (KAFKA-176) Fix existing perf tools
[ https://issues.apache.org/jira/browse/KAFKA-176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13155674#comment-13155674 ] Jay Kreps commented on KAFKA-176: - +1 Fix existing perf tools --- Key: KAFKA-176 URL: https://issues.apache.org/jira/browse/KAFKA-176 Project: Kafka Issue Type: Sub-task Reporter: Neha Narkhede Assignee: Neha Narkhede Fix For: 0.8 Attachments: KAFKA-176-v2.patch, kafka-176.patch The existing perf tools - ProducerPerformance.scala, ConsumerPerformance.scala and SimpleConsumerPerformance.scala are slightly buggy. It will be good to - 1. move them to a perf directory from the existing kafka/tools location 2. fix the bugs, so that they measure throughput correctly -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-208) Efficient polling mechanism for many topics
[ https://issues.apache.org/jira/browse/KAFKA-208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13152122#comment-13152122 ] Jay Kreps commented on KAFKA-208: - Ah, I see, now. So if I understand your need you actually have a very large number of topics (say a thousand) so the reason for splitting the register and poll calls is to avoid sending the topic names each time. I think the problem with a separate register call is how to manage that metadata on the server. I would be concerned about something that had to be manually deallocated because it is possible for the client to leak server resources. Presumably with enough support it could somehow be tied to the life of the socket that created it...so maybe each socket could only register at most one interest set and that would be deallocated if that socket closed. Efficient polling mechanism for many topics --- Key: KAFKA-208 URL: https://issues.apache.org/jira/browse/KAFKA-208 Project: Kafka Issue Type: New Feature Reporter: Taylor Gautier Currently, the way to poll many topics is to submit a request for each one in turn, and read the responses. Especially if there are few messages delivered on many topics, the network overhead to implement this scheme can far outweigh the bandwidth of the actual messages delivered. Effectively, for topics A, B, C the request/response scheme is the following: - Request A offset a - Request B offset b - Request C offset c - no messages - 1 message offset b - no messages - Request A offset a - Request B offset b' - Request C offset c - no messages - no messages - no messages etc. I propose a more efficient mechanism which works a bit like epoll in that the client can register interest in a particular topic. 
There are many options for the implementation, but the one I suggest goes like so: - Register interest A offset a - Register interest B offset b - Register interest C offset c - Next message (null) - messages for B (1 message) - Next message (topic B offset b') - no messages - Unregister Interest C ... It is possible to implement the Next Message request as either blocking or non-blocking. I suggest that the request format include space for the timeout, which if set to 0 will be a nonblocking response, and if set to anything other than 0, will block for at most timeout ms. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
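The "interest set tied to the socket" idea from the comment might be modeled with a hypothetical broker-side structure like this: at most one set per connection, cleared when the owning socket closes, so a client cannot leak server resources:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical per-connection state: topic -> next offset of interest.
public class InterestSet {
    private final Map<String, Long> offsets = new LinkedHashMap<>();
    private boolean closed;

    public void register(String topic, long offset) { ensureOpen(); offsets.put(topic, offset); }
    public void unregister(String topic) { ensureOpen(); offsets.remove(topic); }
    public Map<String, Long> interests() { return offsets; }

    // Invoked by the socket server when the owning connection closes, so the
    // registration is deallocated automatically rather than manually.
    public void close() { closed = true; offsets.clear(); }

    private void ensureOpen() {
        if (closed) throw new IllegalStateException("interest set's socket is closed");
    }
}
```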
[jira] [Commented] (KAFKA-208) Efficient polling mechanism for many topics
[ https://issues.apache.org/jira/browse/KAFKA-208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13151760#comment-13151760 ] Jay Kreps commented on KAFKA-208: - Hey Taylor how does this relate to KAFKA-48 and KAFKA-170? I think you are proposing something slightly different but I am not sure the exact relationship. Efficient polling mechanism for many topics --- Key: KAFKA-208 URL: https://issues.apache.org/jira/browse/KAFKA-208 Project: Kafka Issue Type: New Feature Reporter: Taylor Gautier Currently, the way to poll many topics is to submit a request for each one in turn, and read the responses. Especially if there are few messages delivered on the many topics, the network overhead to implement this scheme can far outweigh the bandwidth of the actual messages delivered. Effectively, for topics A, B, C the request/response scheme is the following: - Request A offset a - Request B offset b - Request C offset c - no messages - 1 message offset b - no messages - Request A offset a - Request B offset b' - Request C offset c - no messages - no messages - no messages etc. I propose a more efficient mechanism which works a bit like epoll in that the client can register interest in a particular topic. There are many options for the implementation, but the one I suggest goes like so: - Register interest A offset a - Register interest B offset b - Register interest C offset c - Next message (null) - messages for B (1 message) - Next message (topic B offset b') - no messages - Unregister Interest C ... It is possible to implement the Next Message request as either blocking or non-blocking. I suggest that the request format include space for the timeout, which if set to 0 will be a nonblocking response, and if set to anything other than 0, will block for at most timeout ms. If a message is ready it would act -- This message is automatically generated by JIRA. 
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-204) BoundedByteBufferReceive hides OutOfMemoryError
[ https://issues.apache.org/jira/browse/KAFKA-204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13150590#comment-13150590 ] Jay Kreps commented on KAFKA-204: - Uh...does anyone know what the motivation of this was originally? Catching OutOfMemoryError is a bit unorthodox... BoundedByteBufferReceive hides OutOfMemoryError --- Key: KAFKA-204 URL: https://issues.apache.org/jira/browse/KAFKA-204 Project: Kafka Issue Type: Bug Affects Versions: 0.7 Reporter: Chris Burroughs Priority: Critical private def byteBufferAllocate(size: Int): ByteBuffer = { var buffer: ByteBuffer = null try { buffer = ByteBuffer.allocate(size) } catch { case e: OutOfMemoryError => throw new RuntimeException("OOME with size " + size, e) case e2 => throw e2 } buffer } This hides the fact that an Error occurred, and will likely result in some log handler printing a message, instead of exiting with non-zero status. Knowing how large the allocation was that caused an OOM is really nice, so I'd suggest logging in byteBufferAllocate and then re-throwing OutOfMemoryError -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
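Chris's suggested fix, log the failed allocation size but rethrow the original Error instead of wrapping it in a RuntimeException, might look like this (a Java sketch rather than the project's Scala, with illustrative names):

```java
import java.nio.ByteBuffer;

public class BufferAlloc {
    // Records how large the failed allocation was, then rethrows the original
    // OutOfMemoryError so it still propagates as an Error and the process can
    // exit with non-zero status instead of a handler swallowing it as a message.
    public static ByteBuffer allocate(int size) {
        try {
            return ByteBuffer.allocate(size);
        } catch (OutOfMemoryError e) {
            System.err.println("OOME allocating buffer of size " + size);
            throw e; // do not wrap: an Error must remain an Error
        }
    }
}
```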
[jira] [Commented] (KAFKA-49) Add acknowledgement to the produce request.
[ https://issues.apache.org/jira/browse/KAFKA-49?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13151039#comment-13151039 ] Jay Kreps commented on KAFKA-49: If there are any other producer-related changes coming we should try to batch these together at the same time to avoid having to update clients too often. Add acknowledgement to the produce request. --- Key: KAFKA-49 URL: https://issues.apache.org/jira/browse/KAFKA-49 Project: Kafka Issue Type: Bug Reporter: Jun Rao Currently, the produce request doesn't get acknowledged. We need to have a broker send a response to the producer and have the producer wait for the response before sending the next request. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-174) Add performance suite for Kafka
[ https://issues.apache.org/jira/browse/KAFKA-174?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13149812#comment-13149812 ] Jay Kreps commented on KAFKA-174: - Chris, following up on this, I need to redo the JMX for the network server for KAFKA-202, let's move the discussion on internal metrics and JMX to KAFKA-203. Add performance suite for Kafka --- Key: KAFKA-174 URL: https://issues.apache.org/jira/browse/KAFKA-174 Project: Kafka Issue Type: New Feature Reporter: Neha Narkhede Fix For: 0.8 This is a placeholder JIRA for adding a perf suite to Kafka. The high level proposal is here - https://cwiki.apache.org/confluence/display/KAFKA/Performance+testing There will be more JIRAs covering smaller tasks to fully implement this. They will be linked to this JIRA. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-193) use by name parameter helper for logging and trait to include lazy logging and refactor code to use the new LogHelper
[ https://issues.apache.org/jira/browse/KAFKA-193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13149120#comment-13149120 ] Jay Kreps commented on KAFKA-193: - Yeah so my recommendations. 1. Let's have the same variants for each level for consistency. Sounds like that is info(m: => String), info(m: => String, t: Throwable) and info(t: Throwable). 2. I am cool with either String or Object as the message type (e.g. info(m: => String) or info(m: => Object)). I don't see a great benefit of the Object type in log4j. 3. Let's definitely make the form that takes only a throwable print the stack trace. I think t.getStackTrace() won't quite do it because the return value is just an array. We can probably have that do info("", t), which I think would do what we want (?) or we have a utility function which I think formats the stack trace. use by name parameter helper for logging and trait to include lazy logging and refactor code to use the new LogHelper - Key: KAFKA-193 URL: https://issues.apache.org/jira/browse/KAFKA-193 Project: Kafka Issue Type: Improvement Affects Versions: 0.7 Reporter: Joe Stein Fix For: 0.8 Attachments: kafka-193.patch 1) New trait to include logging and helper methods, so if (log.isDebugEnabled()) is not required because it is in the helper, and log parameters are passed by name so they are not evaluated unnecessarily, to tidy up the code 2) refactor all occurrences of logging to use the log helper 3/4 (possibly to be handled in two tickets) the lint effect from this for changes patched but not on trunk and new patches moving forward until this is baked in -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
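For readers less familiar with Scala, the by-name parameter info(m: => String) defers building the message string until the level check inside the helper passes. The closest Java analogue is a Supplier, shown here as a hypothetical illustration of the same laziness (names are not the Kafka LogHelper's):

```java
import java.util.function.Supplier;

public class LazyLog {
    public static boolean debugEnabled = false;
    public static int evaluations = 0; // instrumentation for the demo only

    // The supplier is only invoked when the level is enabled, so an expensive
    // message (string concatenation, toString of a large object) costs nothing
    // when debug logging is off -- the same effect as Scala's m: => String.
    public static void debug(Supplier<String> m) {
        if (debugEnabled) System.out.println(m.get());
    }

    public static String expensive() {
        evaluations++;
        return "costly message";
    }
}
```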
[jira] [Commented] (KAFKA-173) Support encoding for non ascii characters
[ https://issues.apache.org/jira/browse/KAFKA-173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13149159#comment-13149159 ] Jay Kreps commented on KAFKA-173: - Anyone who can review this? Support encoding for non ascii characters - Key: KAFKA-173 URL: https://issues.apache.org/jira/browse/KAFKA-173 Project: Kafka Issue Type: Bug Components: clients Reporter: Alejandro Priority: Minor Attachments: 0001-Several-fixes-for-Ruby-client.patch See attached patch. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-163) Ruby client needs to support new compression byte
[ https://issues.apache.org/jira/browse/KAFKA-163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13149160#comment-13149160 ] Jay Kreps commented on KAFKA-163: - Hi Aaron, I think there is missing licensing info. Can you update that and trying running the rat tool under bin/ to verify it is all apache compliant? Ruby client needs to support new compression byte - Key: KAFKA-163 URL: https://issues.apache.org/jira/browse/KAFKA-163 Project: Kafka Issue Type: New Feature Components: clients Reporter: AaronR Priority: Minor Attachments: KAFKA-158.patch, KAFKA-163.patch, rat.out Ruby client updates -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-193) use by name parameter helper for logging and trait to include lazy logging and refactor code to use the new LogHelper
[ https://issues.apache.org/jira/browse/KAFKA-193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13148974#comment-13148974 ] Jay Kreps commented on KAFKA-193: - This looks good to me. One thing I notice is that the type of the argument is, for example, debug(m: => String) but the type in log4j is void debug(Object m). Is this intentional? There are two impacts from this: Good: people will no longer do error(e). The problem with this is that I think it uses e.toString, which doesn't print the stack trace but just prints the message. Bad(?): I can no longer print a non-string without calling toString, e.g. debug(kafkaRequestObj) use by name parameter helper for logging and trait to include lazy logging and refactor code to use the new LogHelper - Key: KAFKA-193 URL: https://issues.apache.org/jira/browse/KAFKA-193 Project: Kafka Issue Type: Improvement Affects Versions: 0.7 Reporter: Joe Stein Fix For: 0.8 Attachments: kafka-193.patch 1) New trait to include logging and helper methods, so if (log.isDebugEnabled()) is not required because it is in the helper, and log parameters are passed by name so they are not evaluated unnecessarily, to tidy up the code 2) refactor all occurrences of logging to use the log helper 3/4 (possibly to be handled in two tickets) the lint effect from this for changes patched but not on trunk and new patches moving forward until this is baked in -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-199) Add a how-to-mirror document
[ https://issues.apache.org/jira/browse/KAFKA-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13148256#comment-13148256 ] Jay Kreps commented on KAFKA-199: - This is great! Personally I really prefer the site html to the wiki for reading, though the wiki is easier to edit. Add a how-to-mirror document Key: KAFKA-199 URL: https://issues.apache.org/jira/browse/KAFKA-199 Project: Kafka Issue Type: Improvement Components: website Reporter: Joel Koshy Attachments: KAFKA-199.patch, mirroring.png I owe kafka-dev a mirroring document, and someone on the user mailing list had questions on mirroring which reminded me to work on this. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-199) Add a how-to-mirror document
[ https://issues.apache.org/jira/browse/KAFKA-199?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13148258#comment-13148258 ] Jay Kreps commented on KAFKA-199: - Also, one comment: the bit about using mirroring for scalability doesn't quite make sense if you think about it. A single cluster of N machines should be able to handle more reads than two clusters of N/2 (even if you don't count the reads the mirror needs to do). It does help isolation for different SLAs (i.e. ad hoc usage), and multi data center stuff. Add a how-to-mirror document Key: KAFKA-199 URL: https://issues.apache.org/jira/browse/KAFKA-199 Project: Kafka Issue Type: Improvement Components: website Reporter: Joel Koshy Attachments: KAFKA-199.patch, mirroring.png I owe kafka-dev a mirroring document, and someone on the user mailing list had questions on mirroring which reminded me to work on this.
[jira] [Commented] (KAFKA-191) Investigate removing the synchronization in Log.flush
[ https://issues.apache.org/jira/browse/KAFKA-191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13144898#comment-13144898 ] Jay Kreps commented on KAFKA-191: - Neha--Sounds good. I have a patch (since it is just deleting and reordering, the code change itself is trivial), I will attach. Here are my thoughts. I think we can just remove the synchronization and re-order things so that the unflushed counter and lastFlushTime both remain valid lower bounds. It is possible that the time we set could get overwritten by another thread but it is unlikely to make any practical difference. See if you agree with that logic, I am not 100% positive. I am not at all sure that this will actually help performance though for two reasons. First, I think it is possible that the file itself may be synchronized. Either at the java level or the OS level. So I am not sure if one can write to the file while a flush is occurring in another thread. This may take some research to understand. Second, if it is possible to do parallel write and flush, I think this still may not be ideal (though maybe a good short term hack). My reasoning is that this patch only fixes the blocking behavior for the time-based flush, but my question is why would I ever want to block? I really see two use cases: 1. I want every message I write immediately flushed to disk in a blocking fashion before the append() is considered completed. This corresponds to flush.interval=1 in the current system. 2. I want to periodically flush data, which could be based on the number of messages, or time, (or theoretically based on unflushed bytes, though we haven't implemented that). So what I am thinking is that case (1) clearly needs to be blocking to make sense. But for any periodic flush I don't see a reason to block appends. It is true that this makes the intervals inexact, but I think that is probably fine.. 
For example, even if I set flush.interval=5, it is unlikely that I could actually care that it is exactly 5, I just want to flush often, say ~5 messages. (Even if I did want it exact, since we always write the full messageset, I still might not get that). So I am thinking a better long-term approach might be to have a central threadpool that handles all flushing and have that always be asynchronous. So if I set flush.interval=5, then that means the background thread is triggered every 5 messages BUT no one blocks on this. In addition to this we add an immediate.commit=true/false option to force data to be flushed in a blocking way as part of the append. Obviously the above only works if a parallel append and flush are possible. Investigate removing the synchronization in Log.flush - Key: KAFKA-191 URL: https://issues.apache.org/jira/browse/KAFKA-191 Project: Kafka Issue Type: New Feature Components: core Affects Versions: 0.8 Reporter: Jay Kreps Assignee: Neha Narkhede Attachments: KAFKA-191.patch Currently we have the following synchronization in Log.scala: 1. append, roll, and flush all share a write lock 2. read is non-blocking Unfortunately this means that flush time latency is added to appends (even if the flush is done by a background thread). To fix this we should investigate a scheme to make append and flush not block each other. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
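The proposal above, a central pool that flushes asynchronously every flush.interval messages so appends never block, can be sketched as follows. AsyncFlushLog, flushInterval, and doFlush are hypothetical names for illustration, not Kafka's actual Log API:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: appends hand flush work to a shared executor instead of blocking.
// Every flushInterval appends, one flush task is submitted; append() returns
// immediately, so the interval is approximate, as discussed above.
class AsyncFlushLog {
    private final int flushInterval;
    private final ExecutorService flusher = Executors.newSingleThreadExecutor();
    private final AtomicInteger unflushed = new AtomicInteger();
    final AtomicInteger flushes = new AtomicInteger(); // for demonstration

    AsyncFlushLog(int flushInterval) { this.flushInterval = flushInterval; }

    void append(String message) {
        // ... write the message to the segment file (omitted) ...
        if (unflushed.incrementAndGet() >= flushInterval) {
            unflushed.set(0);
            flusher.submit(this::doFlush); // non-blocking hand-off
        }
    }

    private void doFlush() {
        // ... fsync the underlying file (omitted) ...
        flushes.incrementAndGet();
    }

    void close() {
        flusher.shutdown();
        try {
            flusher.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The proposed immediate.commit=true mode would instead call doFlush() synchronously inside append(), which is the only case where blocking is actually wanted.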
[jira] [Commented] (KAFKA-181) Log errors for unrecognized config options
[ https://issues.apache.org/jira/browse/KAFKA-181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13141517#comment-13141517 ] Jay Kreps commented on KAFKA-181: - Yes, please yes. I recommend we create a Config object that wraps java.util.Properties. It should include all the random Utils helpers we have for parsing ints and stuff. Whenever a get() is called for a property string we should record that property in a set. We can add a method that intersects the requested properties with the provided properties to get unused properties. This config can be used in KafkaConfig and other configs. As a side note, there are many places where we need to be able to let the user provide plugins that implement an interface. Examples are the EventHandler and Serializer interfaces in the producer, and you could imagine us making other things such as offset storage pluggable. One requirement to make this work is that it needs to be possible for the user to set properties for their plugin. For example to create an AvroSerializer you need to be able to pass in a schema.registry.url parameter which needs to get passed through unmolested to the AvroSerializerImpl to use. To enable this, config objects like KafkaConfig that parse out their options should retain the original Config instance. The general contract for plugins should be that they must provide a constructor that takes a Config so that these configs can be passed through. Log errors for unrecognized config options -- Key: KAFKA-181 URL: https://issues.apache.org/jira/browse/KAFKA-181 Project: Kafka Issue Type: Improvement Components: core Reporter: Joel Koshy Fix For: 0.8 Currently, unrecognized config options are silently ignored. Notably, if a config has a typo or if a deprecated config is used, then there is no warning issued and defaults are assumed.
One can argue that the broker or a consumer or a producer with an unrecognized config option should not even be allowed to start up especially if defaults are silently assumed, but it would be good to at least log an error.
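The Config-wrapper idea above, record every key that get() is asked for, then report the supplied-but-never-requested keys as likely typos, can be sketched like this. TrackingConfig and its method names are illustrative, not Kafka's actual config class:

```java
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

// Sketch: a Properties wrapper that remembers which keys were requested,
// so unused (probably misspelled) properties can be logged at startup.
class TrackingConfig {
    private final Properties props;
    private final Set<String> used = new HashSet<>();

    TrackingConfig(Properties props) { this.props = props; }

    String getString(String key, String dflt) {
        used.add(key);
        return props.getProperty(key, dflt);
    }

    int getInt(String key, int dflt) {
        used.add(key);
        String v = props.getProperty(key);
        return v == null ? dflt : Integer.parseInt(v.trim());
    }

    // Supplied properties that no code ever asked for: candidates for a WARN.
    Set<String> unused() {
        Set<String> names = new HashSet<>(props.stringPropertyNames());
        names.removeAll(used);
        return names;
    }
}
```

At startup the broker would log every used key/value in one place and emit a warning for each entry of unused(), which catches both typos and deprecated options.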
[jira] [Commented] (KAFKA-48) Implement optional long poll support in fetch request
[ https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13140419#comment-13140419 ] Jay Kreps commented on KAFKA-48: Yes, these are all good points. The work I have done so far just splits request processing into a separate thread pool and enables asynchronous handling. This is a fairly general thing we need for a few different use cases. Perhaps this should be broken into a separate JIRA. I have thought a little bit about how to do long poll, though. Logically what I want to do is make it possible to give a minimum byte size for the response and a maximum delay in ms; then have the server delay the response until we have at least min_bytes of messages in the response OR we hit the maximum delay time. The goal is to improve latency (by avoiding waiting in between poll requests), to reduce load on the server (by not polling), and to make it possible to improve throughput. If you set min_bytes = 0 or max_delay_ms = 0 you effectively get the current behavior. The throughput improvement comes if you set min_bytes > 1; this would give a way to artificially increase the response size for requests to the topic (i.e. avoid fetching only a few messages at a time) while still giving hard latency guarantees. We have seen that request size is one of the most important factors for network throughput. As you say, the only case to really consider is the multi-fetch case. The single topic fetch can just be seen as a special case of this. I think your first proposal is closer to what I had in mind. Having the response contain an empty message set for the topics that have no data has very little overhead since it is just positionally indexed, so it is like 4 bytes or something. A poll()-style interface that just returns ready topics doesn't seem very useful to me because the only logical thing you can do is then initiate a fetch on those topics, right?
So might as well just send back the data and have a single request type to worry about? One of the tricky questions for multifetch is what does the minimum byte size pertain to? A straight-forward implementation in the current system would be to add the min_bytes and timeout to the fetch request which would effectively bundle it up N times in the multi-fetch (currently multi-fetch is just N fetches glued together). This doesn't really make sense, though. Which of these minimum sizes would cause the single response to be sent? Would it be when all conditions were satisfied or when one was satisfied? I think the only thing that makes sense is to set these things at the request level. Ideally what I would like to do is remove the fetch request entirely because it is redundant and fix multi-fetch to have the following: [(topic1, partitions1), (topic2, partitions2),...], max_total_size, max_wait_ms This also fixes the weird thing in multifetch now where you have to specify the topic with each partition, so a request for 10 partitions on the same topic repeats the topic name 10 times. This is an invasive change, though, since it means request format changes. I am also not 100% sure how to implement the min_bytes parameter efficiently for multi-fetch. For the single fetch case it is pretty easy, the implementation would be to keep a sort of hybrid priority queue by timeout time (e.g. the unix timestamp at which we owe a response). When a fetch request came in we would try to service it immediately, and if we could meet its requirements we would immediately send a response. If we can't meet its min_bytes requirement then we would calculate the offset for that topic/partition at which the request would be unblocked (e.g. if the current offset is X and the min_bytes is M then the target size is X+M). We would insert new requests into this watchers list maintaining a sort by increasing target size. 
Each time a produce request is handled we would respond to all the watching requests whose target size is at or below the new offset; this would just require walking the list until we see a request with a target size greater than the current offset. All the newly unblocked requests would be added to the response queue. So this means the only work added to a produce request is the work of transferring newly unblocked requests to the response queue, and we only need to examine at most one still-blocked request. The timeout could be implemented by keeping a priority queue of requests based on the unix timestamp of the latest allowable response (i.e. the ts the request came in, plus the max_wait_ms). We could add a background thread to remove items from this as their timeout occurs, and add them to the response queue with an empty response. For the multifetch case, things are harder to do efficiently. The timeouts can still work the same way. However the min_bytes is now over all the topics the request
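The single-fetch watcher scheme described above can be sketched as a queue ordered by target offset: a blocked fetch is parked at target = current offset + min_bytes, and each produce releases every watcher whose target has been reached. FetchWatchers, watch, and unblock are hypothetical names for illustration:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Sketch of the long-poll watcher list: watchers are kept ordered by the
// offset at which they become satisfiable, so a produce only examines the
// queue head until it finds a watcher whose target has not been reached.
class FetchWatchers {
    static final class Watcher {
        final long targetOffset;
        Watcher(long targetOffset) { this.targetOffset = targetOffset; }
    }

    private final PriorityQueue<Watcher> byTarget =
        new PriorityQueue<>(Comparator.comparingLong((Watcher w) -> w.targetOffset));

    // Park a fetch that needs minBytes more data than currently exists.
    void watch(long currentOffset, long minBytes) {
        byTarget.add(new Watcher(currentOffset + minBytes));
    }

    // After a produce advances the log end to newOffset, drain every watcher
    // whose target is now reached; at most one unsatisfied head is examined.
    List<Watcher> unblock(long newOffset) {
        List<Watcher> ready = new ArrayList<>();
        while (!byTarget.isEmpty() && byTarget.peek().targetOffset <= newOffset)
            ready.add(byTarget.poll());
        return ready;
    }
}
```

The separate timeout queue described in the comment would poll the same watchers by deadline and complete any leftovers with an empty response.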
[jira] [Commented] (KAFKA-48) Implement optional long poll support in fetch request
[ https://issues.apache.org/jira/browse/KAFKA-48?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13140467#comment-13140467 ] Jay Kreps commented on KAFKA-48: Hi Taylor, Could you give a little more detail on your use case for ordering the fetches? I think you have a use case I haven't thought of, but I don't know if I understand it. Is your motivation some kind of quality of service over the topics? As you say, this would definitely be a new request type for compatibility, and we would probably try to deprecate the old format over the next few releases as we can get clients updated. Your point about complexity is valid. I think for our usage since we use kafka very heavily the pain of grandfathering in new APIs is the hardest part, and the socket server refactoring is next, so I was thinking the difficulty of implementing a few internal data structures is not too bad. I suppose it depends on if I work out a concrete plan there or not. If the best we can do is iterate over the full set of watchers it may not be worth it. Implement optional long poll support in fetch request --- Key: KAFKA-48 URL: https://issues.apache.org/jira/browse/KAFKA-48 Project: Kafka Issue Type: Bug Reporter: Alan Cabrera Assignee: Jay Kreps Attachments: KAFKA-48-socket-server-refactor-draft.patch Currently, the fetch request is non-blocking. If there is nothing on the broker for the consumer to retrieve, the broker simply returns an empty set to the consumer. This can be inefficient, if you want to ensure low-latency because you keep polling over and over. We should make a blocking version of the fetch request so that the fetch request is not returned until the broker has at least one message for the fetcher or some timeout passes. -- This message is automatically generated by JIRA. 
[jira] [Commented] (KAFKA-176) Fix existing perf tools
[ https://issues.apache.org/jira/browse/KAFKA-176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13140895#comment-13140895 ] Jay Kreps commented on KAFKA-176: - tryCleanupZookeeper looks cut-and-pasted from place to place. It shows up in ConsoleConsumer.scala, ReplayLogProducer.scala, and ConsumerPerformance.scala. We should not do that. Can we make some kind of utility function for that? Also there is a PerfConfig class, which is a great idea as a way to normalize some of the config options we are using between all the tools. But it looks like the class is just duplicated between the tools. Can this be shared? I would like to clean up the scripts used to run these things so that we get rid of all the silly ancient ones (*simple-perf-test.sh and *shell.sh), but I think I will open a separate ticket for that since it is unrelated to your changes. Fix existing perf tools --- Key: KAFKA-176 URL: https://issues.apache.org/jira/browse/KAFKA-176 Project: Kafka Issue Type: Sub-task Reporter: Neha Narkhede Assignee: Neha Narkhede Fix For: 0.8 Attachments: kafka-176.patch The existing perf tools - ProducerPerformance.scala, ConsumerPerformance.scala and SimpleConsumerPerformance.scala are slightly buggy. It will be good to - 1. move them to a perf directory from the existing kafka/tools location 2. fix the bugs, so that they measure throughput correctly
[jira] [Commented] (KAFKA-168) Support locality in consumer partition assignment algorithm
[ https://issues.apache.org/jira/browse/KAFKA-168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13138614#comment-13138614 ] Jay Kreps commented on KAFKA-168: - The broker must have an abstract id not tied to host:port, to make it possible to change the hostname or port or move the data. The consumer id could contain host (there is no port), though I don't know if this is a good idea. What I had in mind was just adding this metadata. E.g. programmatically getting the hostname and adding a rack config parameter and adding this in zk as metadata. Support locality in consumer partition assignment algorithm --- Key: KAFKA-168 URL: https://issues.apache.org/jira/browse/KAFKA-168 Project: Kafka Issue Type: New Feature Components: core Reporter: Jay Kreps There are some use-cases where it makes sense to co-locate brokers and consumer processes. In this case it would be nice to optimize the assignment of partitions to consumers so that the consumer preferentially consumes from the broker with which it is co-located. If we are going to do KAFKA-167, moving the assignment to the broker, it would make sense to do that first so we only have to change the logic in one place. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-171) Kafka producer should do a single write to send message sets
[ https://issues.apache.org/jira/browse/KAFKA-171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13136193#comment-13136193 ] Jay Kreps commented on KAFKA-171: - Attached is a draft patch which turns the request into a single write. This is just a draft if this actually improves performance we should change Receive to use ScatteringByteChannel for consistency and also clean up a few more files with the same trick. On my mac laptop I do see a change in tcpdump which seems to eliminate the 4 byte send. However I don't see any positive result in performance for synchronous single-threaded sends of 10 byte messages (which should be the worst case for this). I think this may just be because I am testing over localhost. Here are the details on the results I have: TRUNK: jkreps-mn:kafka-git jkreps$ sudo tcpdump -i lo0 port 9093 tcpdump: verbose output suppressed, use -v or -vv for full protocol decode listening on lo0, link-type NULL (BSD loopback), capture size 96 bytes 10:32:30.128938 IP jkreps-mn.linkedin.biz.56953 jkreps-mn.linkedin.biz.9093: S 323648854:323648854(0) win 65535 mss 16344,nop,wscale 3,nop,nop,timestamp 377871870 0,sackOK,eol 10:32:30.129004 IP jkreps-mn.linkedin.biz.9093 jkreps-mn.linkedin.biz.56953: S 526915069:526915069(0) ack 323648855 win 65535 mss 16344,nop,wscale 3,nop,nop,timestamp 377871870 377871870,sackOK,eol 10:32:30.129013 IP jkreps-mn.linkedin.biz.56953 jkreps-mn.linkedin.biz.9093: . ack 1 win 65535 nop,nop,timestamp 377871870 377871870 10:32:30.129022 IP jkreps-mn.linkedin.biz.9093 jkreps-mn.linkedin.biz.56953: . ack 1 win 65535 nop,nop,timestamp 377871870 377871870 10:32:30.129306 IP jkreps-mn.linkedin.biz.56953 jkreps-mn.linkedin.biz.9093: P 1:5(4) ack 1 win 65535 nop,nop,timestamp 377871870 377871870 10:32:30.129319 IP jkreps-mn.linkedin.biz.9093 jkreps-mn.linkedin.biz.56953: . 
ack 5 win 65535 nop,nop,timestamp 377871870 377871870 10:32:30.129339 IP jkreps-mn.linkedin.biz.56953 jkreps-mn.linkedin.biz.9093: P 5:41(36) ack 1 win 65535 nop,nop,timestamp 377871870 377871870 10:32:30.129350 IP jkreps-mn.linkedin.biz.9093 jkreps-mn.linkedin.biz.56953: . ack 41 win 65535 nop,nop,timestamp 377871870 377871870 10:32:30.151892 IP jkreps-mn.linkedin.biz.56953 jkreps-mn.linkedin.biz.9093: F 41:41(0) ack 1 win 65535 nop,nop,timestamp 377871870 377871870 10:32:30.151938 IP jkreps-mn.linkedin.biz.9093 jkreps-mn.linkedin.biz.56953: . ack 42 win 65535 nop,nop,timestamp 377871870 377871870 10:32:30.151946 IP jkreps-mn.linkedin.biz.56953 jkreps-mn.linkedin.biz.9093: . ack 1 win 65535 nop,nop,timestamp 377871870 377871870 10:32:30.152554 IP jkreps-mn.linkedin.biz.9093 jkreps-mn.linkedin.biz.56953: F 1:1(0) ack 42 win 65535 nop,nop,timestamp 377871870 377871870 10:32:30.152571 IP jkreps-mn.linkedin.biz.56953 jkreps-mn.linkedin.biz.9093: . ack 2 win 65535 nop,nop,timestamp 377871870 377871870 PATCHED: jkreps-mn:kafka-git jkreps$ sudo tcpdump -i lo0 port 9093 tcpdump: verbose output suppressed, use -v or -vv for full protocol decode listening on lo0, link-type NULL (BSD loopback), capture size 96 bytes 10:35:40.637220 IP jkreps-mn.linkedin.biz.56993 jkreps-mn.linkedin.biz.9093: S 1456363353:1456363353(0) win 65535 mss 16344,nop,wscale 3,nop,nop,timestamp 377873772 0,sackOK,eol 10:35:40.637287 IP jkreps-mn.linkedin.biz.9093 jkreps-mn.linkedin.biz.56993: S 1260172914:1260172914(0) ack 1456363354 win 65535 mss 16344,nop,wscale 3,nop,nop,timestamp 377873772 377873772,sackOK,eol 10:35:40.637296 IP jkreps-mn.linkedin.biz.56993 jkreps-mn.linkedin.biz.9093: . ack 1 win 65535 nop,nop,timestamp 377873772 377873772 10:35:40.637306 IP jkreps-mn.linkedin.biz.9093 jkreps-mn.linkedin.biz.56993: . 
ack 1 win 65535 nop,nop,timestamp 377873772 377873772 10:35:40.657848 IP jkreps-mn.linkedin.biz.56993 jkreps-mn.linkedin.biz.9093: P 1:41(40) ack 1 win 65535 nop,nop,timestamp 377873773 377873772 10:35:40.657886 IP jkreps-mn.linkedin.biz.9093 jkreps-mn.linkedin.biz.56993: . ack 41 win 65535 nop,nop,timestamp 377873773 377873773 10:35:40.711399 IP jkreps-mn.linkedin.biz.56993 jkreps-mn.linkedin.biz.9093: F 41:41(0) ack 1 win 65535 nop,nop,timestamp 377873773 377873773 10:35:40.711430 IP jkreps-mn.linkedin.biz.9093 jkreps-mn.linkedin.biz.56993: . ack 42 win 65535 nop,nop,timestamp 377873773 377873773 10:35:40.711437 IP jkreps-mn.linkedin.biz.56993 jkreps-mn.linkedin.biz.9093: . ack 1 win 65535 nop,nop,timestamp 377873773 377873773 10:35:40.762640 IP jkreps-mn.linkedin.biz.9093 jkreps-mn.linkedin.biz.56993: F 1:1(0) ack 42 win 65535 nop,nop,timestamp 377873774 377873773 10:35:40.762678 IP jkreps-mn.linkedin.biz.56993 jkreps-mn.linkedin.biz.9093: . ack 2 win 65535 nop,nop,timestamp 377873774 377873774 TRUNK: bin/kafka-producer-perf-test.sh --topic test --brokerinfo zk.connect=localhost:2181 --messages 30 --message-size 10 --batch-size 1 --threads 1 ... [2011-10-26
[jira] [Commented] (KAFKA-171) Kafka producer should do a single write to send message sets
[ https://issues.apache.org/jira/browse/KAFKA-171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13136223#comment-13136223 ] Jay Kreps commented on KAFKA-171: - Moving off localhost between my mac laptop and dev workstation (linux) I see similar results: TRUNK: jkreps-mn:kafka-git jkreps$ bin/kafka-producer-perf-test.sh --topic test --brokerinfo zk.connect=jkreps-ld:2181 --messages 50 --message-size 10 --batch-size 1 --threads 1 [2011-10-26 11:59:51,795] INFO Total Num Messages: 50 bytes: 500 in 13.046 secs (kafka.tools.ProducerPerformance$) [2011-10-26 11:59:51,795] INFO Messages/sec: 38325.9237 (kafka.tools.ProducerPerformance$) [2011-10-26 11:59:51,795] INFO MB/sec: 0.3655 (kafka.tools.ProducerPerformance$) PATCHED: jkreps-mn:kafka-git jkreps$ bin/kafka-producer-perf-test.sh --topic test --brokerinfo zk.connect=jkreps-ld:2181 --messages 50 --message-size 10 --batch-size 1 --threads 1 [2011-10-26 11:58:42,335] INFO Total Num Messages: 50 bytes: 500 in 13.125 secs (kafka.tools.ProducerPerformance$) [2011-10-26 11:58:42,335] INFO Messages/sec: 38095.2381 (kafka.tools.ProducerPerformance$) [2011-10-26 11:58:42,335] INFO MB/sec: 0.3633 (kafka.tools.ProducerPerformance$) Kafka producer should do a single write to send message sets Key: KAFKA-171 URL: https://issues.apache.org/jira/browse/KAFKA-171 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.7, 0.8 Reporter: Jay Kreps Assignee: Jay Kreps Fix For: 0.8 Attachments: KAFKA-171-draft.patch From email thread: http://mail-archives.apache.org/mod_mbox/incubator-kafka-dev/201110.mbox/%3ccafbh0q1pyuj32thbayq29e6j4wt_mrg5suusfdegwj6rmex...@mail.gmail.com%3e Before sending an actual message, kafka producer do send a (control) message of 4 bytes to the server. Kafka producer always does this action before send some message to the server. 
I think this is because in BoundedByteBufferSend.scala we essentially do channel.write(sizeBuffer) followed by channel.write(dataBuffer). The correct solution is to use vectored I/O and instead do channel.write(Array(sizeBuffer, dataBuffer)).
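The gathering-write fix can be illustrated in Java with a GatheringByteChannel (FileChannel is used here only so the sketch is self-contained; in Kafka the channel would be a socket). The GatheringSend class and send helper are hypothetical:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;

// Sketch of the vectored-I/O fix: the 4-byte size prefix and the payload go
// out in ONE write call, so they can leave in a single packet instead of two.
class GatheringSend {
    static long send(GatheringByteChannel channel, byte[] payload) throws IOException {
        ByteBuffer size = ByteBuffer.allocate(4).putInt(payload.length);
        size.flip();
        ByteBuffer data = ByteBuffer.wrap(payload);
        // Equivalent to the Scala fix: channel.write(Array(sizeBuffer, dataBuffer))
        long written = channel.write(new ByteBuffer[] { size, data });
        // A real sender would loop here until both buffers are fully drained,
        // since a gathering write may be short.
        return written;
    }
}
```

On a non-blocking socket the caller must still handle short writes, but the kernel now sees both buffers at once and will not emit a separate 4-byte segment.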
[jira] [Commented] (KAFKA-166) Create a tool to dump JMX data to a csv file to help build out performance tests
[ https://issues.apache.org/jira/browse/KAFKA-166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13134258#comment-13134258 ] Jay Kreps commented on KAFKA-166: - Yeah, I am not opposed to moving to that metrics package if it doesn't bring in a bunch of dependencies. I think that is a separate issue, though. Right now I am just trying to support scripting up performance tests using our existing jmx metrics. The goal is to be able to dump out perf statistics in CSV for some post analysis and graphs. I wrote a strawman wiki on the goal here: https://cwiki.apache.org/confluence/display/KAFKA/Performance+testing The goal is to be able to do nightly performance and integration runs. Neha had done the majority of the work so far, and I was just helping to gather more stats and do the R graphs. Create a tool to dump JMX data to a csv file to help build out performance tests Key: KAFKA-166 URL: https://issues.apache.org/jira/browse/KAFKA-166 Project: Kafka Issue Type: New Feature Components: core Affects Versions: 0.8 Reporter: Jay Kreps Assignee: Jay Kreps Attachments: KAFKA-166.patch In order to get sane performance stats we need to be able to integrate the values we keep in JMX. To enable this it would be nice to have a generic tool that dumped JMX stats to a csv file. We could use this against the producer, consumer, and broker to collect kafka metrics while the tests were running.
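The core of such a dumper is small: query an MBean attribute and emit a timestamped CSV row. A minimal sketch, polling a built-in platform MBean rather than a Kafka one (JmxCsvDumper and csvRow are illustrative names, not the actual tool):

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Sketch: read one JMX attribute and format it as a CSV row of
// timestamp,attribute,value. A real tool would loop over a list of
// object-name/attribute pairs on a timer and append rows to a file.
class JmxCsvDumper {
    static String csvRow(String objectName, String attribute) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        Object value = server.getAttribute(new ObjectName(objectName), attribute);
        return System.currentTimeMillis() + "," + attribute + "," + value;
    }
}
```

Against a running broker the same call would target Kafka's own MBeans (connecting remotely via JMXConnectorFactory instead of the local platform server).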
[jira] [Commented] (KAFKA-168) Support locality in consumer partition assignment algorithm
[ https://issues.apache.org/jira/browse/KAFKA-168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13134270#comment-13134270 ] Jay Kreps commented on KAFKA-168: - Ideally we would also support rack locality in the partition assignment so that the brokers would have both a hostname and a rack name in their config. We would prefer partitions from an identical host, then from the same rack, then any partition. Support locality in consumer partition assignment algorithm --- Key: KAFKA-168 URL: https://issues.apache.org/jira/browse/KAFKA-168 Project: Kafka Issue Type: New Feature Components: core Reporter: Jay Kreps There are some use-cases where it makes sense to co-locate brokers and consumer processes. In this case it would be nice to optimize the assignment of partitions to consumers so that the consumer preferentially consumes from the broker with which it is co-located. If we are going to do KAFKA-167, moving the assignment to the broker, it would make sense to do that first so we only have to change the logic in one place. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
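The preference order described above, same host first, then same rack, then anything, amounts to ranking candidate brokers by locality. A small illustrative sketch (the Broker holder and rank/pick helpers are hypothetical, not Kafka code):

```java
import java.util.Comparator;
import java.util.List;

// Sketch of rack-aware assignment: rank each candidate broker by locality
// relative to the consumer, lower rank wins.
class LocalityPreference {
    static final class Broker {
        final String host, rack;
        Broker(String host, String rack) { this.host = host; this.rack = rack; }
    }

    static int rank(Broker b, String consumerHost, String consumerRack) {
        if (b.host.equals(consumerHost)) return 0;  // co-located on the same host
        if (b.rack.equals(consumerRack)) return 1;  // same rack
        return 2;                                   // anywhere else
    }

    static Broker pick(List<Broker> brokers, String host, String rack) {
        return brokers.stream()
                      .min(Comparator.comparingInt((Broker b) -> rank(b, host, rack)))
                      .orElseThrow(IllegalStateException::new);
    }
}
```

The host and rack values would come from the metadata registered in ZooKeeper as described in the earlier comment (hostname detected programmatically, rack from a config parameter).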
[jira] [Commented] (KAFKA-164) Config should default to a higher-throughput configuration for log.flush.interval
[ https://issues.apache.org/jira/browse/KAFKA-164?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13133787#comment-13133787 ] Jay Kreps commented on KAFKA-164: - Err, ignore the random Jmx file in the patch...left over from another commit. Config should default to a higher-throughput configuration for log.flush.interval - Key: KAFKA-164 URL: https://issues.apache.org/jira/browse/KAFKA-164 Project: Kafka Issue Type: Improvement Components: config Affects Versions: 0.7 Reporter: Jay Kreps Assignee: Jay Kreps Fix For: 0.7 Attachments: KAFKA-164-v2.patch, KAFKA-164.patch Currently we default the flush interval to log.flush.interval=1. This is very slow as it immediately flushes each message. I recommend we change this to 2 and drop the time-based flush to 1 second. This should be a good default trade-off between latency and throughput. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Commented] (KAFKA-50) kafka intra-cluster replication support
[ https://issues.apache.org/jira/browse/KAFKA-50?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13130749#comment-13130749 ] Jay Kreps commented on KAFKA-50: Hey Sharad, your comments are all correct. I think using HDFS would certainly require the least implementation effort and contains a mature replication system tested at large scale. The downside is that HDFS is fairly complex in its own right, and has a number of drawbacks for high-availability, low-latency cases (spof is one but not the only one). Also many use cases do not need replication, but supporting hdfs and local fs efficiently probably means two pretty different implementations. We felt that this kind of multi-subscriber log is a really important abstraction in its own right for systems of all kinds and so our thought was to just kind of suck it up and do the full implementation since we thought the end result would be better. kafka intra-cluster replication support --- Key: KAFKA-50 URL: https://issues.apache.org/jira/browse/KAFKA-50 Project: Kafka Issue Type: New Feature Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8 Attachments: kafka_replication_highlevel_design.pdf, kafka_replication_lowlevel_design.pdf Currently, Kafka doesn't have replication. Each log segment is stored in a single broker. This limits both the availability and the durability of Kafka. If a broker goes down, all log segments stored on that broker become unavailable to consumers. If a broker dies permanently (e.g., disk failure), all unconsumed data on that node is lost forever. Our goal is to replicate every log segment to multiple broker nodes to improve both the availability and the durability. We'd like to support the following in Kafka replication: 1. Configurable synchronous and asynchronous replication 2. Small unavailable window (e.g., less than 5 seconds) during broker failures 3. Auto recovery when a failed broker rejoins 4. 
Balanced load when a broker fails (i.e., the load on the failed broker is evenly spread among multiple surviving brokers)
[jira] [Commented] (KAFKA-160) ZK consumer gets into infinite loop if a message is larger than fetch size
[ https://issues.apache.org/jira/browse/KAFKA-160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13130044#comment-13130044 ] Jay Kreps commented on KAFKA-160: - Hey did I break this? ZK consumer gets into infinite loop if a message is larger than fetch size -- Key: KAFKA-160 URL: https://issues.apache.org/jira/browse/KAFKA-160 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.7 Reporter: Jun Rao Assignee: Jun Rao Attachments: KAFKA-160.patch
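The failure mode in this ticket can be sketched as a progress check in the consumer's fetch loop (a hypothetical illustration, not the actual consumer code: `checkFetchProgress` and `MessageTooLargeException` are invented names): if a fetch returned bytes but no complete message could be decoded within the fetch size, the consumer must fail loudly rather than re-fetch from the same offset forever.

```scala
// Hypothetical guard that would break the infinite loop: when a fetch returned
// data but no complete message was decoded, the next message must be larger
// than the fetch size, so fail instead of silently refetching the same offset.
final class MessageTooLargeException(msg: String) extends RuntimeException(msg)

def checkFetchProgress(bytesFetched: Int, messagesDecoded: Int, fetchSize: Int): Unit =
  if (bytesFetched > 0 && messagesDecoded == 0)
    throw new MessageTooLargeException(
      s"Fetched $bytesFetched bytes but decoded no complete message; " +
      s"increase the fetch size (currently $fetchSize bytes) to consume this message.")
```

A consumer iterator calling such a check after every fetch would surface the oversized message to the application immediately instead of spinning.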
[jira] [Commented] (KAFKA-156) Messages should not be dropped when brokers are unavailable
[ https://issues.apache.org/jira/browse/KAFKA-156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13129265#comment-13129265 ] Jay Kreps commented on KAFKA-156: - This would be a good feature to have. I think we will probably not use it at LinkedIn, because managing disk space on all producer machines is something we would like to avoid. Likely we will just use replication in cases where we want to guarantee delivery. However, many people have asked for this. There are a lot of implementation details to work out. I think the sanest approach would be to use the Log.scala class to create a single log for the client. In the case of the async producer, it should log the event to this log immediately, and a background thread should pull from this log instead of from an in-memory blocking queue. This would guarantee delivery (assuming nothing happens to the filesystem of the sender machine). The sync producer could probably just log events that fail? That changes the semantics somewhat, though. It would be good to see a concrete proposal of how all this would plug together, what guarantees it would give, etc. Messages should not be dropped when brokers are unavailable --- Key: KAFKA-156 URL: https://issues.apache.org/jira/browse/KAFKA-156 Project: Kafka Issue Type: Improvement Reporter: Sharad Agarwal Fix For: 0.8 When none of the brokers are available, the producer should spool messages to disk and keep retrying until brokers come back. This will also enable broker upgrades/maintenance without message loss.
[jira] [Commented] (KAFKA-156) Messages should not be dropped when brokers are unavailable
[ https://issues.apache.org/jira/browse/KAFKA-156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13129296#comment-13129296 ] Jay Kreps commented on KAFKA-156: - To clarify what I was saying, the single log would likely have the message content as well as the key (if any), and the topic...essentially all the metadata about the send. I think there is no reason to keep more than one log since this is effectively just the unsent messages log.
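The unsent-messages log described above can be sketched as a minimal append-only spool (a hedged illustration under stated assumptions: `SpoolLog` is an invented name, not Kafka's actual Log.scala, and the length-prefixed record layout is only one plausible encoding): the producer appends topic, key, and payload for each send, and a background thread polls records that have not yet been sent.

```scala
import java.io.{File, RandomAccessFile}
import java.nio.charset.StandardCharsets

// Hypothetical single spool log holding all metadata about each unsent send:
// topic, optional key, and payload, each written as a length-prefixed field.
class SpoolLog(path: File) {
  private val file = new RandomAccessFile(path, "rw")
  private var readOffset = 0L // position of the next unsent record

  // Producer side: append one record at the end of the file.
  def append(topic: String, key: Option[String], payload: Array[Byte]): Unit = synchronized {
    file.seek(file.length())
    writeField(topic.getBytes(StandardCharsets.UTF_8))
    writeField(key.getOrElse("").getBytes(StandardCharsets.UTF_8))
    writeField(payload)
  }

  // Background-thread side: pull the next unsent record, or None if caught up.
  def poll(): Option[(String, String, Array[Byte])] = synchronized {
    if (readOffset >= file.length()) None
    else {
      file.seek(readOffset)
      val topic   = new String(readField(), StandardCharsets.UTF_8)
      val key     = new String(readField(), StandardCharsets.UTF_8)
      val payload = readField()
      readOffset = file.getFilePointer
      Some((topic, key, payload))
    }
  }

  private def writeField(bytes: Array[Byte]): Unit = {
    file.writeInt(bytes.length)
    file.write(bytes)
  }

  private def readField(): Array[Byte] = {
    val len = file.readInt()
    val buf = new Array[Byte](len)
    file.readFully(buf)
    buf
  }
}
```

In the async-producer design from the comment, `append` would replace the enqueue into the in-memory blocking queue, and the sender thread would loop on `poll`, advancing past a record only after the broker acknowledged it.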
[jira] [Commented] (KAFKA-130) Provide a default producer for receiving messages from STDIN
[ https://issues.apache.org/jira/browse/KAFKA-130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13119131#comment-13119131 ] Jay Kreps commented on KAFKA-130: - I will add some docs; can I get a +1 on this? Provide a default producer for receiving messages from STDIN Key: KAFKA-130 URL: https://issues.apache.org/jira/browse/KAFKA-130 Project: Kafka Issue Type: New Feature Affects Versions: 0.6 Reporter: Felix GV Assignee: Jay Kreps Priority: Minor Attachments: kafka-console-producer.patch It would be useful to provide a default producer we can fire up that reads from STDIN and sends one message per line to the broker. The most obvious use case for this is to pipe a tail -f command into it, to tail log files as they're generated. Making it depend on STDIN seems more flexible than a producer that just tails files, though.
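The one-message-per-line loop described above can be sketched like this (`produceLines` and the `send` callback are illustrative names, not the attached patch's actual API): read lines from an iterator, which in the real tool would be STDIN, and hand each non-empty line to a send function.

```scala
// Illustrative console-producer loop: one message per non-empty input line.
// `send` stands in for the real producer call; here it is just a callback.
def produceLines(lines: Iterator[String], send: String => Unit): Int = {
  var sent = 0
  for (line <- lines if line.nonEmpty) {
    send(line)
    sent += 1
  }
  sent
}

// In the real tool this would be wired to STDIN, along the lines of:
//   produceLines(scala.io.Source.stdin.getLines(), line => producer.send(topic, line))
// so that `tail -f some.log | kafka-console-producer ...` streams each new line.
```

Taking an `Iterator[String]` rather than reading STDIN directly keeps the loop testable, and it is also why a STDIN-based tool composes with `tail -f` or any other pipe.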