[jira] [Commented] (KAFKA-690) TopicMetadataRequest throws exception when no topics are specified
[ https://issues.apache.org/jira/browse/KAFKA-690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548436#comment-13548436 ]

Maxime Brugidou commented on KAFKA-690:
---------------------------------------
This would resolve KAFKA-653.

TopicMetadataRequest throws exception when no topics are specified
-------------------------------------------------------------------
Key: KAFKA-690
URL: https://issues.apache.org/jira/browse/KAFKA-690
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 0.8
Reporter: David Arthur
Fix For: 0.8
Attachments: KAFKA-690.patch

If no topics are sent in a TopicMetadataRequest, `readFrom` throws an exception when trying to get the head of the topic list for a debug statement.

java.util.NoSuchElementException: head of empty list
    at scala.collection.immutable.Nil$.head(List.scala:386)
    at scala.collection.immutable.Nil$.head(List.scala:383)
    at kafka.api.TopicMetadataRequest$$anonfun$readFrom$2.apply(TopicMetadataRequest.scala:43)
    at kafka.api.TopicMetadataRequest$$anonfun$readFrom$2.apply(TopicMetadataRequest.scala:43)
    at kafka.utils.Logging$class.debug(Logging.scala:51)
    at kafka.api.TopicMetadataRequest$.debug(TopicMetadataRequest.scala:25)
    at kafka.api.TopicMetadataRequest$.readFrom(TopicMetadataRequest.scala:43)
    at kafka.api.RequestKeys$$anonfun$4.apply(RequestKeys.scala:37)
    at kafka.api.RequestKeys$$anonfun$4.apply(RequestKeys.scala:37)
    at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:47)
    at kafka.network.Processor.read(SocketServer.scala:320)
    at kafka.network.Processor.run(SocketServer.scala:231)
    at java.lang.Thread.run(Thread.java:680)
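The failure is simply a debug statement calling head on a possibly empty list. A minimal, self-contained sketch of the kind of guard the fix needs (the attached KAFKA-690.patch is not reproduced here, so treat this as an illustration rather than the actual change):

    object TopicMetadataDebugSketch {
      // Stand-in for kafka.utils.Logging#debug; the real code mixes in the Logging trait.
      def debug(msg: String): Unit = println("DEBUG " + msg)

      def logRequestedTopics(topics: Seq[String]): Unit =
        if (topics.isEmpty) debug("TopicMetadataRequest received with no topics")       // avoids topics.head
        else debug("TopicMetadataRequest received for topics: " + topics.mkString(", "))

      def main(args: Array[String]): Unit = {
        logRequestedTopics(Nil)                  // previously this path threw NoSuchElementException
        logRequestedTopics(Seq("test-topic"))
      }
    }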
[jira] [Commented] (KAFKA-634) ConsoleProducer compresses messages and ignores the --compress flag
[ https://issues.apache.org/jira/browse/KAFKA-634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548441#comment-13548441 ]

Maxime Brugidou commented on KAFKA-634:
---------------------------------------
KAFKA-506 fixed this (commit f64fd3dcbaace1dba7bbd72398bb3e7d28b41d61 in the 0.8 branch), so this will be fixed in 0.8, I guess.

ConsoleProducer compresses messages and ignores the --compress flag
--------------------------------------------------------------------
Key: KAFKA-634
URL: https://issues.apache.org/jira/browse/KAFKA-634
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 0.7
Reporter: Anentropic
Labels: console, producer

I am using the kafka-producer-shell.sh script without the --compress option, but my messages seem to be gzipped. The docs say compression is off by default: http://incubator.apache.org/kafka/configuration.html

The only producer.properties file I can find is at /home/ubuntu/kafka-0.7.2-incubating-src/config/producer.properties, and it contains: compression.codec=0

My process looks like:

root 1748 1746 0 Nov19 ? 00:02:37 java -Xmx512M -server -Dlog4j.configuration=file:/usr/local/bin/kafka/../config/log4j.properties -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -cp :/usr/local/bin/kafka/../project/boot/scala-2.8.0/lib/scala-compiler.jar:/usr/local/bin/kafka/../project/boot/scala-2.8.0/lib/scala-library.jar:/usr/local/bin/kafka/../core/target/scala_2.8.0/kafka-0.7.2.jar:/usr/local/bin/kafka/../core/lib/*.jar:/usr/local/bin/kafka/../perf/target/scala_2.8.0/kafka-perf-0.7.2.jar:/usr/local/bin/kafka/../core/lib_managed/scala_2.8.0/compile/jopt-simple-3.2.jar:/usr/local/bin/kafka/../core/lib_managed/scala_2.8.0/compile/log4j-1.2.15.jar:/usr/local/bin/kafka/../core/lib_managed/scala_2.8.0/compile/snappy-java-1.0.4.1.jar:/usr/local/bin/kafka/../core/lib_managed/scala_2.8.0/compile/zkclient-0.1.jar:/usr/local/bin/kafka/../core/lib_managed/scala_2.8.0/compile/zookeeper-3.3.4.jar kafka.producer.ConsoleProducer --topic logtail --zookeeper x.x.x.x:2181

But the messages come out as gobbledegook unless I use a client that understands compressed messages, and in that client the attribute bit is identified as set to 1, gzip compression.

Jun Rao jun...@gmail.com wrote (Nov 26, to kafka-users):
This seems to be a bug in ConsoleProducer. It also compresses messages and ignores the --compress flag. Could you file a jira? Thanks, Jun
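For context, the compression behavior comes down to the producer's compression.codec property (0 = none, 1 = gzip in the 0.7/0.8 codebase). A rough sketch of what honoring the flag means (this is an illustration, not the actual ConsoleProducer code; the zk.connect bootstrap shown is the 0.7-style setting):

    import java.util.Properties

    object CompressFlagSketch {
      // Illustrative only: shows the property a --compress flag should control.
      def producerProps(zkConnect: String, compress: Boolean): Properties = {
        val props = new Properties()
        props.put("zk.connect", zkConnect)                          // 0.7-style ZooKeeper bootstrap
        props.put("compression.codec", if (compress) "1" else "0")  // honor the flag instead of forcing gzip
        props
      }
    }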
Re: kafka 0.8 producer throughput
What's the ack for? If it fails, will it try another broker? Can this be disabled, or is that a major design change?

On Wed, Jan 9, 2013 at 12:40 AM, Jun Rao jun...@gmail.com wrote:

The 50MB/s number is for 0.7. We haven't carefully measured the performance in 0.8 yet. We do expect the throughput that a single producer can drive in 0.8 to be less. This is because the 0.8 producer needs to wait for an RPC response from the broker, while in 0.7 there is no ack for the producer. Nevertheless, 2MB/s seems low. Could you try increasing the flush interval to something bigger, like 2? Thanks, Jun

On Tue, Jan 8, 2013 at 8:32 PM, Jun Guo -X (jungu - CIIC at Cisco) ju...@cisco.com wrote:

According to the official Kafka documentation, the producer throughput is about 50MB/s. But when I run some tests, the producer throughput is only about 2MB/s. The test environment matches what the documentation describes: one producer, one broker, and one ZooKeeper, each on an independent machine. Message size is 100 bytes, batch size is 200, and the flush interval is 600 messages. The test environment and configuration are the same, so why is there such a big gap between my test result and what the documentation says?
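The flush interval being discussed here is the broker-side log flush setting. For reference, a sketch of the relevant server.properties entries under the pre-KAFKA-648 key names (600 is the value from the test above; the time-based value is illustrative, not a recommendation):

    # Broker-side flush settings referenced in this thread (0.7/0.8-era names).
    log.flush.interval=600              # flush a log partition after this many messages
    log.default.flush.interval.ms=1000  # ...or after this much time, whichever comes first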
[jira] [Commented] (KAFKA-689) Can't append to a topic/partition that does not already exist
[ https://issues.apache.org/jira/browse/KAFKA-689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548613#comment-13548613 ]

Jay Kreps commented on KAFKA-689:
---------------------------------
This is pretty hacky though, no? Fetching metadata should not create topics--that is like a getter subtly changing values underneath you. I think this is more evidence for needing to expose a proper create_topic api.

Can't append to a topic/partition that does not already exist
--------------------------------------------------------------
Key: KAFKA-689
URL: https://issues.apache.org/jira/browse/KAFKA-689
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 0.8
Reporter: David Arthur
Attachments: kafka.log, produce-payload.bin

With a totally fresh Kafka (empty logs dir and empty ZK), if I send a ProduceRequest for a new topic, Kafka responds with kafka.common.UnknownTopicOrPartitionException: Topic test partition 0 doesn't exist on 0. This happens when sending a ProduceRequest over the network (from Python, in this case). If I use the console producer it works fine (the topic and partition get created). If I then send the same payload from before over the network, it works.
Re: Kafka 0.8 without replication - handle broker failure/availability
This is a good point. We have discussed this a little bit before. The key constraint is that with replication factor 1 you can choose one of the following: (1) high availability, or (2) correct semantic partitioning. That is to say, if a particular partition is unavailable you have no choice but to give up and throw an error, or else send the message elsewhere. Obviously replication fixes this by just making the partitions highly available.

It isn't really correct for us to choose one of these for the user. If they are depending on partitioning, silently sending data elsewhere may be worse than giving an error. So the user needs to somehow specify which behavior they want. Here is a JIRA where we can work out the details; I suspect this is a blocker for 0.8: https://issues.apache.org/jira/browse/KAFKA-691

As a workaround in the meantime you can probably run with replication--although it sounds like you don't really need it, it shouldn't hurt.

-Jay

On Wed, Jan 9, 2013 at 2:38 AM, Maxime Brugidou maxime.brugi...@gmail.com wrote:

Hello, I am currently testing the 0.8 branch (and it works quite well). We plan not to use the replication feature for now since we don't really need it; we can afford to lose data in case of an unrecoverable failure of a broker. However, we really don't want producers/consumers to fail if a broker is down. The ideal scenario (which worked in 0.7) is that producers would just produce to available partitions and consumers would consume from available partitions. If the broker comes back online, the consumer will catch up; if not, we can decide to throw away the data. Is this feasible with 0.8? Right now if I kill a broker it just makes everything fail...

Multiple issues come up:
- Since the partitions are now set globally and never change, the availability of a topic varies depending on where its partitions are located.
- We would need tools to make sure topics are spread enough and to rebalance them accordingly (using the DDL I heard about; I'm not sure yet how it works. I tried editing the JSON strings in ZK and it somehow works, and there's the reassignment admin command too).

That looks rather complicated, or maybe I'm missing something? The model used in 0.7 looked much easier to operate (it had drawbacks and couldn't do intra-cluster replication, but at least the availability of the cluster was much higher).

Thanks in advance for any help/clues,
Maxime
[jira] [Commented] (KAFKA-689) Can't append to a topic/partition that does not already exist
[ https://issues.apache.org/jira/browse/KAFKA-689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548646#comment-13548646 ]

Jay Kreps commented on KAFKA-689:
---------------------------------
Well, I guess what I am saying is that getting metadata is not intuitively related to creating topics at all. I had noticed this code before but hadn't really thought about it. I assume the reason for this is that to make a correct produce request you have to know the host, so the old strategy of doing auto-create on produce doesn't work in 0.8.

I think there are two sensible strategies for auto-create:
1. Auto-create on produce. This is tricky because you have to somehow ensure that the local node would hold the partitions used (and how did the client come up with those partitions anyway?)
2. Add a public api for creating topics and make the client implement auto-create client-side.

I would favor (2). There is no harm in the current scheme as long as people are warned that we intend to change it.

Can't append to a topic/partition that does not already exist
--------------------------------------------------------------
Key: KAFKA-689
URL: https://issues.apache.org/jira/browse/KAFKA-689
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 0.8
Reporter: David Arthur
Attachments: kafka.log, produce-payload.bin

With a totally fresh Kafka (empty logs dir and empty ZK), if I send a ProduceRequest for a new topic, Kafka responds with kafka.common.UnknownTopicOrPartitionException: Topic test partition 0 doesn't exist on 0. This happens when sending a ProduceRequest over the network (from Python, in this case). If I use the console producer it works fine (the topic and partition get created). If I then send the same payload from before over the network, it works.
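For concreteness, the behavior clients rely on today (and roughly what a client-side auto-create along the lines of option 2 would look like) is: request metadata for the topic first, which, subject to the broker-side auto-create flag, creates the topic and tells the client which broker leads each partition, then send the ProduceRequest to that leader. The types and method names below are placeholders, not the actual 0.8 client API:

    // Compilable sketch with placeholder types; this is NOT the real 0.8 client API.
    case class PartitionMeta(id: Int, leader: Option[Int])

    trait KafkaClient {
      def fetchMetadata(topics: Seq[String]): Map[String, Seq[PartitionMeta]]
      def produce(leaderBrokerId: Int, topic: String, partition: Int, message: Array[Byte]): Unit
    }

    object AutoCreateSketch {
      def produceWithAutoCreate(client: KafkaClient, topic: String, message: Array[Byte]): Unit = {
        // With auto-create enabled broker-side, the metadata request itself creates the topic.
        val partitions = client.fetchMetadata(Seq(topic)).getOrElse(topic, Seq.empty)
        partitions.find(_.leader.isDefined) match {
          case Some(p) => client.produce(p.leader.get, topic, p.id, message)
          case None    => throw new IllegalStateException("no available partition for topic " + topic)
        }
      }
    }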
Re: kafka 0.8 producer throughput
We haven't done a ton of performance work on 0.8 yet. Regardless, requiring the ack will certainly reduce per-producer throughput, but it is too early to say by how much. Obviously this won't impact broker throughput (so if you have many producers you may not notice). The plan to fix this is just to make the produce request non-blocking. This will allow the same kind of throughput we had before but still allow us to give you back an error response if you want it. The hope would be to make this change in 0.9.

-Jay

On Wed, Jan 9, 2013 at 8:24 AM, Jun Rao jun...@gmail.com wrote:

In 0.8, the ack is always required. The ack returns an error code that indicates the reason if a produce request fails (e.g., the request was sent to a broker that's not the leader). It also returns the offset of the produced messages. However, the producer can choose when to receive the acks (e.g., when data reaches 1 replica or all replicas). If the ack indicates an error, the client can choose to retry. The retry logic is built into our high-level producer. Thanks, Jun

On Wed, Jan 9, 2013 at 6:20 AM, S Ahmed sahmed1...@gmail.com wrote:

What's the ack for? If it fails, will it try another broker? Can this be disabled, or is that a major design change?

On Wed, Jan 9, 2013 at 12:40 AM, Jun Rao jun...@gmail.com wrote:

The 50MB/s number is for 0.7. We haven't carefully measured the performance in 0.8 yet. We do expect the throughput that a single producer can drive in 0.8 to be less. This is because the 0.8 producer needs to wait for an RPC response from the broker, while in 0.7 there is no ack for the producer. Nevertheless, 2MB/s seems low. Could you try increasing the flush interval to something bigger, like 2? Thanks, Jun

On Tue, Jan 8, 2013 at 8:32 PM, Jun Guo -X (jungu - CIIC at Cisco) ju...@cisco.com wrote:

According to the official Kafka documentation, the producer throughput is about 50MB/s. But when I run some tests, the producer throughput is only about 2MB/s. The test environment matches what the documentation describes: one producer, one broker, and one ZooKeeper, each on an independent machine. Message size is 100 bytes, batch size is 200, and the flush interval is 600 messages. The test environment and configuration are the same, so why is there such a big gap between my test result and what the documentation says?
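The ack behavior Jun describes is exposed through the 0.8 producer configuration. A sketch of the relevant properties (key names follow the 0.8 producer; the values shown are illustrative, not recommendations):

    # 0.8 producer properties related to the ack discussion above (illustrative values).
    request.required.acks=1     # 0 = don't wait for an ack, 1 = leader only, -1 = all in-sync replicas
    producer.type=async         # batch sends to amortize the per-request round trip
    batch.num.messages=200      # messages per batch when producer.type=async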
[jira] [Commented] (KAFKA-691) Fault tolerance broken with replication factor 1
[ https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548667#comment-13548667 ]

Maxime Brugidou commented on KAFKA-691:
---------------------------------------
I think the work-around is not really acceptable for me, since it will consume 3x the resources (because a replication factor of 3 is the minimum acceptable), and it will still make the cluster less available anyway (unless I have only 3 brokers). The thing is that 0.7 made the cluster 100% available (for my use case, accepting data loss) as long as a single broker was alive.

A way to handle this would be to:
1. Have a lot of partitions per topic (more than the # of brokers)
2. Have something that rebalances the partitions and makes sure each broker has at least one partition for every topic (to keep every topic available)
3. Have a setting in the consumer/producer that says "I don't care about partitioning, just produce/consume wherever you can"

Fault tolerance broken with replication factor 1
-------------------------------------------------
Key: KAFKA-691
URL: https://issues.apache.org/jira/browse/KAFKA-691
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8
Reporter: Jay Kreps

In 0.7 if a partition was down we would just send the message elsewhere. This meant that the partitioning was really more of a stickiness than a hard guarantee. This made it impossible to depend on it for partitioned, stateful processing. In 0.8, when running with replication, this should not generally be a problem, as the partitions are now highly available and fail over to other replicas. However, the case of replication factor = 1 no longer really works, as a dead broker will now give errors for requests to that broker. I am not sure of the best fix. Intuitively I think this is something that should be handled by the Partitioner interface. However, currently the partitioner has no knowledge of which nodes are available. So you could use a random partitioner, but that would keep going back to the down node.
[jira] [Resolved] (KAFKA-690) TopicMetadataRequest throws exception when no topics are specified
[ https://issues.apache.org/jira/browse/KAFKA-690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neha Narkhede resolved KAFKA-690.
---------------------------------
Resolution: Fixed
Assignee: David Arthur

TopicMetadataRequest throws exception when no topics are specified
-------------------------------------------------------------------
Key: KAFKA-690
URL: https://issues.apache.org/jira/browse/KAFKA-690
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 0.8
Reporter: David Arthur
Assignee: David Arthur
Fix For: 0.8
Attachments: KAFKA-690.patch

If no topics are sent in a TopicMetadataRequest, `readFrom` throws an exception when trying to get the head of the topic list for a debug statement.

java.util.NoSuchElementException: head of empty list
    at scala.collection.immutable.Nil$.head(List.scala:386)
    at scala.collection.immutable.Nil$.head(List.scala:383)
    at kafka.api.TopicMetadataRequest$$anonfun$readFrom$2.apply(TopicMetadataRequest.scala:43)
    at kafka.api.TopicMetadataRequest$$anonfun$readFrom$2.apply(TopicMetadataRequest.scala:43)
    at kafka.utils.Logging$class.debug(Logging.scala:51)
    at kafka.api.TopicMetadataRequest$.debug(TopicMetadataRequest.scala:25)
    at kafka.api.TopicMetadataRequest$.readFrom(TopicMetadataRequest.scala:43)
    at kafka.api.RequestKeys$$anonfun$4.apply(RequestKeys.scala:37)
    at kafka.api.RequestKeys$$anonfun$4.apply(RequestKeys.scala:37)
    at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:47)
    at kafka.network.Processor.read(SocketServer.scala:320)
    at kafka.network.Processor.run(SocketServer.scala:231)
    at java.lang.Thread.run(Thread.java:680)
[jira] [Commented] (KAFKA-662) Create testcases for unclean shut down
[ https://issues.apache.org/jira/browse/KAFKA-662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548689#comment-13548689 ]

John Fung commented on KAFKA-662:
---------------------------------
2. testcase_9072 - This testcase tests the basic behavior of unclean shutdown, where log truncation takes place.

a. Both brokers (B1 and B2) are up and running
b. Send 5 messages (m0 - m4); both B1 and B2 now hold: m0 m1 m2 m3 m4
c. Shut down B2
d. Send 10 messages (m5 - m14); only B1 receives them
e. Shut down B1 (B1 and B2 are both down)
f. Sleep 5 seconds
g. Start B2 (B1 is still down)
h. Send 5 messages (m15 - m19); only B2 receives them
i. Start B1 (both B1 and B2 are up and running); recovery begins
j. Since there is a discrepancy in the number of messages maintained by the internal index, log truncation takes place in B1's data log; both logs end up with: m0 m1 m2 m3 m4 m15 m16 m17 m18 m19

$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log --files /tmp/kafka_server_1_logs/t001-0/.log
Dumping /tmp/kafka_server_1_logs/t001-0/.log
Starting offset: 0
offset: 0 position: 0 isvalid: true payloadsize: 100 magic: 0 compresscodec: NoCompressionCodec crc: 2307843899 payload: Topic:t001:ThreadID:0:MessageID:00
offset: 1 position: 126 isvalid: true payloadsize: 100 magic: 0 compresscodec: NoCompressionCodec crc: 3770868426 payload: Topic:t001:ThreadID:0:MessageID:01
offset: 2 position: 252 isvalid: true payloadsize: 100 magic: 0 compresscodec: NoCompressionCodec crc: 1528229081 payload: Topic:t001:ThreadID:0:MessageID:02
offset: 3 position: 378 isvalid: true payloadsize: 100 magic: 0 compresscodec: NoCompressionCodec crc: 844818728 payload: Topic:t001:ThreadID:0:MessageID:03
offset: 4 position: 504 isvalid: true payloadsize: 100 magic: 0 compresscodec: NoCompressionCodec crc: 4157601470 payload: Topic:t001:ThreadID:0:MessageID:04
offset: 5 position: 630 isvalid: true payloadsize: 100 magic: 0 compresscodec: NoCompressionCodec crc: 1414561216 payload: Topic:t001:ThreadID:0:MessageID:15
offset: 6 position: 756 isvalid: true payloadsize: 100 magic: 0 compresscodec: NoCompressionCodec crc: 4018435027 payload: Topic:t001:ThreadID:0:MessageID:16
offset: 7 position: 882 isvalid: true payloadsize: 100 magic: 0 compresscodec: NoCompressionCodec crc: 2261284386 payload: Topic:t001:ThreadID:0:MessageID:17
offset: 8 position: 1008 isvalid: true payloadsize: 100 magic: 0 compresscodec: NoCompressionCodec crc: 3219081918 payload: Topic:t001:ThreadID:0:MessageID:18
offset: 9 position: 1134 isvalid: true payloadsize: 100 magic: 0 compresscodec: NoCompressionCodec crc: 3599978319 payload: Topic:t001:ThreadID:0:MessageID:19

$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log --files /tmp/kafka_server_2_logs/t001-0/.log
Dumping /tmp/kafka_server_2_logs/t001-0/.log
Starting offset: 0
offset: 0 position: 0 isvalid: true payloadsize: 100 magic: 0 compresscodec: NoCompressionCodec crc: 2307843899 payload: Topic:t001:ThreadID:0:MessageID:00
offset: 1 position: 126 isvalid: true payloadsize: 100 magic: 0 compresscodec: NoCompressionCodec crc: 3770868426 payload: Topic:t001:ThreadID:0:MessageID:01
offset: 2 position: 252 isvalid: true payloadsize: 100 magic: 0 compresscodec: NoCompressionCodec crc: 1528229081 payload: Topic:t001:ThreadID:0:MessageID:02
offset: 3 position: 378 isvalid: true payloadsize: 100 magic: 0 compresscodec: NoCompressionCodec crc: 844818728 payload: Topic:t001:ThreadID:0:MessageID:03
offset: 4 position: 504 isvalid: true payloadsize: 100 magic: 0 compresscodec: NoCompressionCodec crc: 4157601470 payload: Topic:t001:ThreadID:0:MessageID:04
offset: 5 position: 630 isvalid: true payloadsize: 100 magic: 0 compresscodec: NoCompressionCodec crc: 1414561216 payload: Topic:t001:ThreadID:0:MessageID:15
offset: 6 position: 756 isvalid: true payloadsize: 100 magic: 0 compresscodec: NoCompressionCodec crc: 4018435027 payload: Topic:t001:ThreadID:0:MessageID:16
offset: 7 position: 882 isvalid: true payloadsize: 100 magic: 0 compresscodec: NoCompressionCodec crc: 2261284386 payload: Topic:t001:ThreadID:0:MessageID:17
offset: 8 position: 1008 isvalid: true payloadsize: 100 magic: 0 compresscodec: NoCompressionCodec crc: 3219081918 payload: Topic:t001:ThreadID:0:MessageID:18
offset: 9 position: 1134 isvalid: true payloadsize: 100 magic: 0 compresscodec: NoCompressionCodec crc: 3599978319 payload: Topic:t001:ThreadID:0:MessageID:19
[jira] [Commented] (KAFKA-689) Can't append to a topic/partition that does not already exist
[ https://issues.apache.org/jira/browse/KAFKA-689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548748#comment-13548748 ]

Jun Rao commented on KAFKA-689:
-------------------------------
Currently, auto-creation is a broker-side flag. Basically, the broker controls whether a topic can be created automatically or not. This is likely useful for admins. The getMetadata API implicitly implies auto-creation, subject to the server-side config. This is probably a bit hacky, but it does save one extra RPC. We can think a bit more about whether adding a separate create-topic API is a better strategy.

Can't append to a topic/partition that does not already exist
--------------------------------------------------------------
Key: KAFKA-689
URL: https://issues.apache.org/jira/browse/KAFKA-689
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 0.8
Reporter: David Arthur
Attachments: kafka.log, produce-payload.bin

With a totally fresh Kafka (empty logs dir and empty ZK), if I send a ProduceRequest for a new topic, Kafka responds with kafka.common.UnknownTopicOrPartitionException: Topic test partition 0 doesn't exist on 0. This happens when sending a ProduceRequest over the network (from Python, in this case). If I use the console producer it works fine (the topic and partition get created). If I then send the same payload from before over the network, it works.
[jira] [Commented] (KAFKA-643) Refactor api definition layer
[ https://issues.apache.org/jira/browse/KAFKA-643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548797#comment-13548797 ]

David Arthur commented on KAFKA-643:
------------------------------------
+1 for splitting generic/specific parts of the API (this is basically what I do in my Python client).

+1 for specifying the protocol in a ~BNF form. This would require protocols to be specified as LL grammars (which they all are), which is required for efficient ByteBuffer packing/unpacking anyways. However, how would this scheme handle recursive definitions (like MessageSet)? I've always felt the depth of this should be limited to one, meaning a single Message can contain a compressed MessageSet which can only be composed of regular (uncompressed) Messages. In https://github.com/mumrah/kafka-python/blob/master/kafka/client.py#L355, I have to endlessly recurse to ensure I've fully consumed the messages - kind of a pain. If the depth was limited, I could decode it non-recursively.

+0 for not using Avro et al. I understand the performance implications of using one of these frameworks, but it sure does make client development easier. However, as long as the protocol spec is clear (and correct), implementing a client is not so bad.

What about the Java API? As far as I can tell, the purpose of these classes is to delegate to the real APIs and handle Java/Scala data type conversion. It seems like this should be able to be automatic/automagic. Although, I guess for the implicits stuff to work the Java classes must be present. I know it's very new (Scala 2.10) and experimental, but macros might help in simplifying the APIs: http://docs.scala-lang.org/overviews/macros/overview.html

Refactor api definition layer
------------------------------
Key: KAFKA-643
URL: https://issues.apache.org/jira/browse/KAFKA-643
Project: Kafka
Issue Type: Improvement
Affects Versions: 0.8.1
Reporter: Jay Kreps
Assignee: Jay Kreps

The way we are defining our protocol is really a bit embarrassing. It is full of ad hoc serialization code for each API. This code is very fiddly and opaque, and when it has errors they are hard to debug. Since it is all done one-off, it is also very easy for it to become inconsistent. This was tolerable when there were only two apis with a few fields each, but now there are a half dozen more complex apis. By my count there is now over 1000 lines of code in kafka.apis.*. One option would be to use protocol buffers or thrift or another schema-oriented code-gen RPC language. However, I think this is probably the wrong direction for a couple of reasons. One is that we want something that works well with our I/O model, both network and disk, which is very NIO-centric. So it should work directly with ByteBuffers. Second, I feel that these systems complicate the specification of the protocol. They give a schema, which is a great high-level description, but the translation of that to bytes is essentially whatever their code-gen engine chooses to do. These things are a great way to build application services, but not great for something like what we are building. Instead I think we should do what we have done: specify the protocol as a wiki. However, we should write a little helper code to make our lives easier. Here is my recommendation for how this code would work. We add two helper classes: Schema and Record.
You define message formats like this:

    import Types._
    val FetchRequestProtocol = Schema("ReplicaId" -> int32,
                                      "MaxWaitTime" -> int32,
                                      "MinBytes" -> int32,
                                      Seq("TopicName" -> utf8,
                                          Seq("Partition" -> int32,
                                              "FetchOffset" -> int64,
                                              "MaxBytes" -> int32)))

Note that this almost exactly matches the BNF for the fetch request: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol

Once defined, this schema can be used to parse messages:

    val record: Record = FetchRequestProtocol.readFrom(buffer)

A record is just a wrapper around an array. The readFrom method parses out the fields specified in the schema and populates the array. Fields in the record can be accessed by name, e.g. record("ReplicaId"). For common access this is probably good enough. However, since the position is fixed, it is also possible to get the element by a Field object, which gets rid of the hashmap lookup and goes directly to the right slot. E.g.

    val ReplicaIdField = FetchRequestProtocol("ReplicaId") // do this as a global variable
    ...
    record(ReplicaIdField)

This will be for cases where we are a bit performance conscious and don't want to do umpteen hashmap lookups to resolve string field names. Likewise the other direction, to write out a record:

    record.writeTo(buffer)

and to get
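To make the proposal concrete, here is a minimal, self-contained sketch of the Schema/Record idea described above, reduced to flat schemas and two field types. It is an illustration of the proposal, not the eventual Kafka implementation:

    import java.nio.ByteBuffer

    object SchemaSketch {
      // Field types know how to read themselves out of a ByteBuffer.
      sealed trait FieldType { def read(buf: ByteBuffer): Any }
      case object Int32 extends FieldType { def read(buf: ByteBuffer) = buf.getInt() }
      case object Int64 extends FieldType { def read(buf: ByteBuffer) = buf.getLong() }

      // A Schema is an ordered list of named fields; readFrom fills a Record whose
      // values live in an array, addressable by field name or directly by position.
      class Schema(fields: (String, FieldType)*) {
        private val pos = fields.map(_._1).zipWithIndex.toMap
        def indexOf(name: String): Int = pos(name)
        def readFrom(buf: ByteBuffer): Record =
          new Record(this, fields.map { case (_, t) => t.read(buf) }.toArray)
      }

      class Record(schema: Schema, values: Array[Any]) {
        def apply(name: String): Any = values(schema.indexOf(name)) // hashmap lookup
        def apply(i: Int): Any = values(i)                          // direct slot access
      }

      def main(args: Array[String]): Unit = {
        // A simplified slice of the fetch request header from the proposal above.
        val FetchRequestHeader = new Schema("ReplicaId" -> Int32, "MaxWaitTime" -> Int32, "MinBytes" -> Int32)
        val buf = ByteBuffer.allocate(12).putInt(-1).putInt(100).putInt(1)
        buf.flip()
        val record = FetchRequestHeader.readFrom(buf)
        println(record("ReplicaId")) // prints -1
      }
    }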
[jira] [Updated] (KAFKA-688) System Test - Update all testcase_xxxx_properties.json for properties keys uniform naming convention
[ https://issues.apache.org/jira/browse/KAFKA-688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Fung updated KAFKA-688:
----------------------------
Attachment: kafka-688-v1.patch

System Test - Update all testcase_xxxx_properties.json for properties keys uniform naming convention
-----------------------------------------------------------------------------------------------------
Key: KAFKA-688
URL: https://issues.apache.org/jira/browse/KAFKA-688
Project: Kafka
Issue Type: Task
Reporter: John Fung
Assignee: John Fung
Labels: replication-testing
Attachments: kafka-688-v1.patch

After the changes made in uniform naming convention of properties keys (KAFKA-648), all testcase_xxxx_properties.json files need to be updated on the following properties keys changes:
brokerid => broker.id
log.file.size => log.segment.size
groupid => group.id
[jira] [Commented] (KAFKA-688) System Test - Update all testcase_xxxx_properties.json for properties keys uniform naming convention
[ https://issues.apache.org/jira/browse/KAFKA-688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548886#comment-13548886 ]

John Fung commented on KAFKA-688:
---------------------------------
Uploaded kafka-688-v1.patch, which is required after checking in the patch in KAFKA-648. The changes include:

1. Updated all testcase_xxxx_properties.json files for the following properties keys:
   brokerid => broker.id
   log.file.size => log.segment.size
   groupid => group.id
2. Added the 07_client property to migration_tool_testsuite/testcase_xxxx/testcase_xxxx_properties.json files for those brokers running the 0.7 code
3. Modified system_test/utils/kafka_system_test_utils.py to add the brokerid key to the server.properties file if the broker is running under 0.7

System Test - Update all testcase_xxxx_properties.json for properties keys uniform naming convention
-----------------------------------------------------------------------------------------------------
Key: KAFKA-688
URL: https://issues.apache.org/jira/browse/KAFKA-688
Project: Kafka
Issue Type: Task
Reporter: John Fung
Assignee: John Fung
Labels: replication-testing
Attachments: kafka-688-v1.patch

After the changes made in uniform naming convention of properties keys (KAFKA-648), all testcase_xxxx_properties.json files need to be updated on the following properties keys changes:
brokerid => broker.id
log.file.size => log.segment.size
groupid => group.id
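As an illustration of what the rename looks like inside one of these JSON files, only the key renames below are taken from the issue; the surrounding values are invented for the example:

    Before (0.7-era keys):                   After (per KAFKA-648):
        "brokerid": "1",                         "broker.id": "1",
        "log.file.size": "102400",               "log.segment.size": "102400",
        "groupid": "mytestgroup"                 "group.id": "mytestgroup"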
[jira] [Updated] (KAFKA-688) System Test - Update all testcase_xxxx_properties.json for properties keys uniform naming convention
[ https://issues.apache.org/jira/browse/KAFKA-688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Fung updated KAFKA-688:
----------------------------
Status: Patch Available (was: Open)

System Test - Update all testcase_xxxx_properties.json for properties keys uniform naming convention
-----------------------------------------------------------------------------------------------------
Key: KAFKA-688
URL: https://issues.apache.org/jira/browse/KAFKA-688
Project: Kafka
Issue Type: Task
Reporter: John Fung
Assignee: John Fung
Labels: replication-testing
Attachments: kafka-688-v1.patch

After the changes made in uniform naming convention of properties keys (KAFKA-648), all testcase_xxxx_properties.json files need to be updated on the following properties keys changes:
brokerid => broker.id
log.file.size => log.segment.size
groupid => group.id
[jira] [Updated] (KAFKA-648) Use uniform convention for naming properties keys
[ https://issues.apache.org/jira/browse/KAFKA-648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sriram Subramanian updated KAFKA-648:
-------------------------------------
Attachment: configchanges-v4.patch

Reverted the change from migrationtool/consumer.properties

Use uniform convention for naming properties keys
--------------------------------------------------
Key: KAFKA-648
URL: https://issues.apache.org/jira/browse/KAFKA-648
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8
Reporter: Swapnil Ghike
Assignee: Sriram Subramanian
Priority: Blocker
Fix For: 0.8, 0.8.1
Attachments: configchanges-1.patch, configchanges-v2.patch, configchanges-v3.patch, configchanges-v4.patch

Currently, the convention that we seem to use to get a property value in *Config is as follows:

    val configVal = property.getType("config.val", ...) // dot is used to separate two words in the key and the first letter of second word is capitalized in configVal

We should use similar convention for groupId, consumerId, clientId, correlationId. This change will probably be backward non-compatible.
[jira] [Commented] (KAFKA-648) Use uniform convention for naming properties keys
[ https://issues.apache.org/jira/browse/KAFKA-648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13548989#comment-13548989 ]

John Fung commented on KAFKA-648:
---------------------------------
Tested the v4 patch together with kafka-688-v1.patch, and it works fine.

Use uniform convention for naming properties keys
--------------------------------------------------
Key: KAFKA-648
URL: https://issues.apache.org/jira/browse/KAFKA-648
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8
Reporter: Swapnil Ghike
Assignee: Sriram Subramanian
Priority: Blocker
Fix For: 0.8, 0.8.1
Attachments: configchanges-1.patch, configchanges-v2.patch, configchanges-v3.patch, configchanges-v4.patch

Currently, the convention that we seem to use to get a property value in *Config is as follows:

    val configVal = property.getType("config.val", ...) // dot is used to separate two words in the key and the first letter of second word is capitalized in configVal

We should use similar convention for groupId, consumerId, clientId, correlationId. This change will probably be backward non-compatible.
[jira] [Commented] (KAFKA-648) Use uniform convention for naming properties keys
[ https://issues.apache.org/jira/browse/KAFKA-648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13549075#comment-13549075 ]

Jun Rao commented on KAFKA-648:
-------------------------------
Thanks for patch v4. Made another pass. Should we make the following changes?

40. KafkaConfig:
    max.message.size => max.message.bytes
    socket.send.buffer => socket.send.buffer.bytes
    socket.receive.buffer => socket.receive.buffer.bytes
    log.segment.size => log.segment.bytes
    log.retention.size => log.retention.bytes
    log.flush.interval => log.flush.interval.messages
    log.default.flush.interval.ms => log.flush.interval.ms
    log.flush.intervals.ms.per.topic => log.flush.interval.ms.per.topic
    replica.socket.buffersize => replica.socket.receive.buffer.bytes
    replica.fetch.size => replica.max.fetch.bytes
    fetch.request.purgatory.purge.interval => fetch.purgatory.purge.interval.requests
    producer.request.purgatory.purge.interval => producer.purgatory.purge.interval.requests
    replica.max.lag.time.ms => max.replica.lag.time.ms
    replica.max.lag.bytes => max.replica.lag.bytes
    replica.fetch.max.wait.ms => max.replica.fetch.wait.ms
    replica.fetch.min.bytes => min.replica.fetch.bytes

41. ProducerConfig:
    producer.retry.count: in other configs we have num.network.threads. To be consistent, shouldn't we use num.producer.retries?

42. AsyncProducerConfig:
    queue.time => queue.time.ms
    buffer.size => socket.send.buffer.bytes

43. ConsumerConfig:
    socket.buffer.size => socket.receive.buffer.bytes
    fetch.size => max.fetch.bytes

Use uniform convention for naming properties keys
--------------------------------------------------
Key: KAFKA-648
URL: https://issues.apache.org/jira/browse/KAFKA-648
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8
Reporter: Swapnil Ghike
Assignee: Sriram Subramanian
Priority: Blocker
Fix For: 0.8, 0.8.1
Attachments: configchanges-1.patch, configchanges-v2.patch, configchanges-v3.patch, configchanges-v4.patch

Currently, the convention that we seem to use to get a property value in *Config is as follows:

    val configVal = property.getType("config.val", ...) // dot is used to separate two words in the key and the first letter of second word is capitalized in configVal

We should use similar convention for groupId, consumerId, clientId, correlationId. This change will probably be backward non-compatible.
[jira] [Commented] (KAFKA-691) Fault tolerance broken with replication factor 1
[ https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13549105#comment-13549105 ]

Maxime Brugidou commented on KAFKA-691:
---------------------------------------
I agree with Jun's solution; this would solve 3 (1 and 2 can be done manually already -- just send a ReassignPartition command when you add a broker). I could probably implement this very quickly. I'm just not sure how you get the availability of a partition, but I'll try to figure it out and submit a first patch tomorrow.

Fault tolerance broken with replication factor 1
-------------------------------------------------
Key: KAFKA-691
URL: https://issues.apache.org/jira/browse/KAFKA-691
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8
Reporter: Jay Kreps

In 0.7 if a partition was down we would just send the message elsewhere. This meant that the partitioning was really more of a stickiness than a hard guarantee. This made it impossible to depend on it for partitioned, stateful processing. In 0.8, when running with replication, this should not generally be a problem, as the partitions are now highly available and fail over to other replicas. However, the case of replication factor = 1 no longer really works, as a dead broker will now give errors for requests to that broker. I am not sure of the best fix. Intuitively I think this is something that should be handled by the Partitioner interface. However, currently the partitioner has no knowledge of which nodes are available. So you could use a random partitioner, but that would keep going back to the down node.
Re: kafka mavenification
ok, in progress doing that now.

On Mon, Jan 7, 2013 at 12:24 PM, Jun Rao jun...@gmail.com wrote:

Joe, 0.8 will be released from the 0.8 branch and trunk is for post-0.8. So, you will need to commit the maven changes to 0.8 and then merge them to trunk. Thanks, Jun

On Sun, Jan 6, 2013 at 10:53 AM, Joe Stein crypt...@gmail.com wrote:

Ok, I will commit this patch then just to trunk and we can have the 0.8 beta mavenized. That works!

/* Joe Stein, Chief Architect http://www.medialets.com Twitter: @allthingshadoop Mobile: 917-597-9771 */

On Jan 6, 2013, at 1:31 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

"Where do we feel we are with 0.8?" We can release a public beta by the end of the month, but we are still few weeks away from announcing a release. Thanks, Neha

On Sat, Jan 5, 2013 at 10:50 PM, Joe Stein crypt...@gmail.com wrote:

Where do we feel we are with 0.8?

-- /* Joe Stein http://www.linkedin.com/in/charmalloc Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop */
[jira] [Commented] (KAFKA-133) Publish kafka jar to a public maven repository
[ https://issues.apache.org/jira/browse/KAFKA-133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13549250#comment-13549250 ]

Joe Stein commented on KAFKA-133:
---------------------------------
Committed to the 0.8 branch also.

Publish kafka jar to a public maven repository
-----------------------------------------------
Key: KAFKA-133
URL: https://issues.apache.org/jira/browse/KAFKA-133
Project: Kafka
Issue Type: Improvement
Affects Versions: 0.6, 0.8
Reporter: Neha Narkhede
Labels: patch
Fix For: 0.8
Attachments: KAFKA-133.patch, pom.xml

The released kafka jar must be downloaded manually and then deployed to a private repository before it can be used by a developer using maven2. Similar to other Apache projects, it would be nice to have a way to publish Kafka releases to a public maven repo. In the past, we gave it a try using sbt publish to the Sonatype Nexus maven repo, but ran into some authentication problems. It would be good to revisit this and get it resolved.
[jira] [Commented] (KAFKA-691) Fault tolerance broken with replication factor 1
[ https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13549276#comment-13549276 ]

Jay Kreps commented on KAFKA-691:
---------------------------------
That would be awesome. If you don't mind, just give the proposed set of changes on the JIRA first and let's get everyone on board with how it should work, since it is a reasonably important change (or, if you don't mind revising your patch, we can start with that).

Fault tolerance broken with replication factor 1
-------------------------------------------------
Key: KAFKA-691
URL: https://issues.apache.org/jira/browse/KAFKA-691
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8
Reporter: Jay Kreps

In 0.7 if a partition was down we would just send the message elsewhere. This meant that the partitioning was really more of a stickiness than a hard guarantee. This made it impossible to depend on it for partitioned, stateful processing. In 0.8, when running with replication, this should not generally be a problem, as the partitions are now highly available and fail over to other replicas. However, the case of replication factor = 1 no longer really works, as a dead broker will now give errors for requests to that broker. I am not sure of the best fix. Intuitively I think this is something that should be handled by the Partitioner interface. However, currently the partitioner has no knowledge of which nodes are available. So you could use a random partitioner, but that would keep going back to the down node.
Re: About Kafka 0.8 producer
Hi,

That is to say: for Kafka 0.8, when you add a new broker to the cluster, you do not need to change the producer's broker setting, is that right?

Bruce
[jira] [Commented] (KAFKA-691) Fault tolerance broken with replication factor 1
[ https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13549373#comment-13549373 ]

Jun Rao commented on KAFKA-691:
-------------------------------
DefaultEventHandler.getPartitionListForTopic() returns Seq[PartitionAndLeader]. If PartitionAndLeader.leaderBrokerIdOpt is None, the partition is not available.

There is another tricky issue. If a partition is not available, when do we refresh the metadata to check whether the partition has become available again? Currently, we refresh the metadata if we fail to send the data. However, if we always route messages to available partitions, we may never fail to send. One possible solution is that if at least one partition in Seq[PartitionAndLeader] is unavailable, we refresh the metadata once a configurable amount of time has passed (e.g., 10 mins).

Fault tolerance broken with replication factor 1
-------------------------------------------------
Key: KAFKA-691
URL: https://issues.apache.org/jira/browse/KAFKA-691
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8
Reporter: Jay Kreps

In 0.7 if a partition was down we would just send the message elsewhere. This meant that the partitioning was really more of a stickiness than a hard guarantee. This made it impossible to depend on it for partitioned, stateful processing. In 0.8, when running with replication, this should not generally be a problem, as the partitions are now highly available and fail over to other replicas. However, the case of replication factor = 1 no longer really works, as a dead broker will now give errors for requests to that broker. I am not sure of the best fix. Intuitively I think this is something that should be handled by the Partitioner interface. However, currently the partitioner has no knowledge of which nodes are available. So you could use a random partitioner, but that would keep going back to the down node.
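A compact sketch of the selection-plus-refresh logic being discussed above. The PartitionAndLeader shape mirrors what Jun describes (leaderBrokerIdOpt of None means no leader), but the chooser class and the time-based refresh policy are illustrative, not the eventual patch:

    // Sketch only: mimics DefaultEventHandler's PartitionAndLeader, not the actual Kafka code.
    case class PartitionAndLeader(topic: String, partitionId: Int, leaderBrokerIdOpt: Option[Int])

    class AvailablePartitionChooser(refreshIntervalMs: Long, refreshMetadata: () => Unit) {
      private var lastRefreshMs = System.currentTimeMillis()

      def choose(partitions: Seq[PartitionAndLeader]): Option[PartitionAndLeader] = {
        val available = partitions.filter(_.leaderBrokerIdOpt.isDefined)
        // If some partitions have no leader, periodically refresh metadata so we notice when
        // they come back, even though sends to the available partitions keep succeeding.
        if (available.size < partitions.size &&
            System.currentTimeMillis() - lastRefreshMs > refreshIntervalMs) {
          refreshMetadata()
          lastRefreshMs = System.currentTimeMillis()
        }
        if (available.isEmpty) None
        else Some(available(scala.util.Random.nextInt(available.size)))  // random over available only
      }
    }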