Re: Review Request 34450: Fix KAFKA-2017
On May 21, 2015, 12:16 a.m., Jun Rao wrote: core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 102-106 https://reviews.apache.org/r/34450/diff/2/?file=965426#file965426line102

Another way to do this is to only load from ZK on the becoming-leader event for an offset topic partition. Then we don't have to read from ZK during join-group, which would introduce unnecessary overhead when joining a new group.

Guozhang Wang wrote:

I thought about this while working on the patch. The reason I feel it may not be worth doing the loading upon become-leader is:

1. When we are loading from ZK, we probably still need to reject any join-group request for a group that is not yet loaded, as we did in the offset manager; this introduces two more round trips (one to rediscover the coordinator and one for another join-group; unless we introduce a separate loading-in-progress error code, in which case we can reduce it to one) compared with loading from ZK on the fly, which is just one ZK read.

2. It is likely that we only need to load from ZK once per group, upon the first join-group request received (when two join requests are received at the same time we may unnecessarily read twice), and hence the latency overhead is small compared with loading all at once. The only concern is that it will slow down all handler threads a little when a coordinator migration happens, instead of taking one thread for reading all the ZK paths, which I feel is OK.

Yes, there are trade-offs between preloading from ZK on coordinator change and lazy loading from ZK on heartbeat. For lazy reading from ZK, we need to worry about the following.

a. The group may be cached in ConsumerCoordinator, but be outdated. This can happen when the coordinator is moved to another node and then moved back (without the node being restarted). So we need a way to detect whether the cache is outdated.

b.
We need to worry about concurrent updates to the group and their ordering, since the group cached in ConsumerCoordinator can now be updated both directly and by loading from ZK. For example, suppose one API thread is handling a HB request, doesn't see the group in the cache, and is about to load it from ZK. At the same time, a separate API thread handling a new consumer's join-group request loads the group from ZK and updates the group with the new consumer. Now the thread handling the HB can overwrite the group after it completes the load from ZK.

For preloading from ZK:

1. It takes a bit of time to complete the loading from ZK. During this time, if we get a HB request, we can just return a CONSUMER_COORDINATOR_NOT_AVAILABLE error, which forces the client to rediscover the coordinator. However, this is probably unlikely to happen: the consumer by default only sends a HB every 10 secs, and we can probably complete the loading from ZK in a couple of secs even with 1000 groups.

On May 21, 2015, 12:16 a.m., Jun Rao wrote: core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 369-381 https://reviews.apache.org/r/34450/diff/2/?file=965426#file965426line369

I was thinking about whether it's worth including the leader epoch (of the corresponding offset topic partition) in the ZK value, as we did for leaderAndIsr, to prevent a zombie consumer coordinator from overwriting the value during a soft failure. I am not sure it's worth doing this immediately because: 1. When this happens, consumers can still recover after the heartbeat fails. 2. It seems that doing this right is a bit more complicated. We need to keep the leader epoch in the ZK value; however, during a leader change, we probably need to update the values in ZK with the new leader epoch as well, in order to truly prevent the zombie coordinator from overwriting the value. So I think for now we can just use the simple approach in this patch.
Guozhang Wang wrote:

I think this is handled by the generation id, which is ever increasing: a coordinator writing to ZK must have its generation id = ZK value + 1. One caveat, though, is that when a group is empty we will remove it from ZK, and when it appears again we will treat it as a new group with the generation id reset to 1. A zombie coordinator that happens to hold the right generation id after the reset may then be able to overwrite the value. For this case we can create another JIRA.

The checking in the current patch may not be enough. 1. The check and the update are not atomic, so the generation id in ZK could change after it's read from ZK and the check is done. 2. The new coordinator may not do a rebalance for a long time; the generation id in ZK would still be the old one, and the zombie coordinator could still overwrite the value in ZK.

- Jun
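The non-atomic check-then-update described above is a classic read-modify-write race. ZooKeeper itself offers a conditional write, setData(path, data, expectedVersion), which fails if the znode changed since it was read. The following is a minimal, self-contained sketch of that semantics only (the class and names are hypothetical illustrations, not Kafka or ZooKeeper code):

```java
import java.util.concurrent.atomic.AtomicReference;

public class VersionedStore {
    // Simulates a ZK znode as an immutable (version, data) pair. The real
    // fix would rely on ZooKeeper's setData(path, data, expectedVersion),
    // which throws BadVersion when another writer got in between.
    public static final class Node {
        public final int version;
        public final String data;
        public Node(int version, String data) { this.version = version; this.data = data; }
    }

    private final AtomicReference<Node> node = new AtomicReference<>(new Node(0, ""));

    public Node read() { return node.get(); }

    // Conditional update: succeeds only if nobody wrote since the caller read.
    // This makes the generation-id check and the update one atomic step.
    public boolean setData(String data, int expectedVersion) {
        Node current = node.get();
        if (current.version != expectedVersion) return false;
        return node.compareAndSet(current, new Node(expectedVersion + 1, data));
    }
}
```

A zombie coordinator that read the value before the new coordinator wrote would present a stale expected version and have its write rejected, regardless of how long the new coordinator waits before rebalancing.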
[jira] [Created] (KAFKA-2220) Improvement: Could we support rewind by time ?
Li Junjun created KAFKA-2220: Summary: Improvement: Could we support rewind by time ? Key: KAFKA-2220 URL: https://issues.apache.org/jira/browse/KAFKA-2220 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.1.1 Reporter: Li Junjun

Improvement: support rewind by time!

My scenario is as follows: a program reads records from Kafka, processes them, and writes to a dir in HDFS like /hive/year=/month=xx/day=xx/hour=10. If the program goes down, I can restart it so it reads from the last offset. But what if the program was configured with wrong params? Then I need to remove the dir hour=10, reconfigure my program, and find the offset where hour=10 starts, but right now I can't do this. There are many scenarios like this. So, can we add a time partition, so we can rewind by time?

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
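For context on what "rewind by time" would take: if the broker (or an external job) maintained a sparse, timestamp-sorted index of (timestamp, first offset) pairs per partition, finding the restart point for hour=10 would be a binary search for the first entry at or after the target time. This is a hypothetical sketch, not an existing Kafka API:

```java
import java.util.List;

public class TimeIndex {
    // One index entry: the first offset whose message timestamp is >= timestamp.
    public static final class Entry {
        public final long timestamp;
        public final long offset;
        public Entry(long timestamp, long offset) { this.timestamp = timestamp; this.offset = offset; }
    }

    // Binary search a timestamp-sorted index for the first entry at or after
    // the target time; returns -1 if every entry is older than the target.
    public static long offsetForTime(List<Entry> index, long targetTs) {
        int lo = 0, hi = index.size() - 1;
        long result = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (index.get(mid).timestamp >= targetTs) {
                result = index.get(mid).offset;  // candidate; look for an earlier one
                hi = mid - 1;
            } else {
                lo = mid + 1;
            }
        }
        return result;
    }
}
```

With such an index, "remove dir hour=10 and reprocess" becomes: look up the offset for the start of hour 10, then seek the consumer to that offset.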
[jira] [Updated] (KAFKA-2220) Improvement: Could we support rewind by time ?
[ https://issues.apache.org/jira/browse/KAFKA-2220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Li Junjun updated KAFKA-2220: Attachment: screenshot.png
[jira] [Comment Edited] (KAFKA-2222) Write Input/output error did not result in broker shutdown
[ https://issues.apache.org/jira/browse/KAFKA-2222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14560387#comment-14560387 ] Jason Rosenberg edited comment on KAFKA-2222 at 5/27/15 5:05 AM:

Interestingly, after restarting the broker, these errors went away (but we did ultimately declare the disk bad, and had it replaced). But not sure why restarting resulted in the errors subsiding.

was (Author: jbrosenb...@gmail.com): Interestingly, after restarting the broker, these errors went away (but we did ultimately declare the disk bad, and had it replaced). But not sure restarting resulted in the errors subsiding

Write Input/output error did not result in broker shutdown Key: KAFKA-2222 URL: https://issues.apache.org/jira/browse/KAFKA-2222 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.2 Reporter: Jason Rosenberg

We had a disk start failing intermittently, and began seeing errors like this in the broker. Usually, IOExceptions during a file write result in the broker shutting down immediately. This is with version 0.8.2.1.
{code}
2015-05-21 23:59:57,841 ERROR [kafka-network-thread-27330-2] network.Processor - Closing socket for /1.2.3.4 because of error
java.io.IOException: Input/output error
    at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
    at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:443)
    at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:575)
    at kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:147)
    at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:70)
    at kafka.network.MultiSend.writeTo(Transmission.scala:101)
    at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:125)
    at kafka.network.MultiSend.writeTo(Transmission.scala:101)
    at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
    at kafka.network.Processor.write(SocketServer.scala:472)
    at kafka.network.Processor.run(SocketServer.scala:342)
    at java.lang.Thread.run(Thread.java:745)
{code}

This resulted in intermittent producer failures when sending messages, etc.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2221) Log the entire cause which caused a reconnect in the SimpleConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14560354#comment-14560354 ] jaikiran pai commented on KAFKA-2221: Created reviewboard https://reviews.apache.org/r/34697/diff/ against branch origin/trunk

Log the entire cause which caused a reconnect in the SimpleConsumer --- Key: KAFKA-2221 URL: https://issues.apache.org/jira/browse/KAFKA-2221 Project: Kafka Issue Type: Improvement Reporter: jaikiran pai Assignee: jaikiran pai Priority: Minor Attachments: KAFKA-2221.patch

Currently, if the SimpleConsumer goes for a reconnect, it logs the message of the exception which caused the reconnect. However, on some occasions the message in the exception can be null, making it difficult to narrow down the cause of the reconnect. An example of this can be seen in this user mailing list thread http://mail-archives.apache.org/mod_mbox/kafka-users/201505.mbox/%3CCABME_6T%2Bt90%2B-eQUtnu6R99NqRdMpVj3tqa95Pygg8KOQSNppw%40mail.gmail.com%3E

{quote} kafka.consumer.SimpleConsumer: Reconnect due to socket error: null. {quote}

It would help narrow down the problem if the entire exception stacktrace was logged.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
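The null in the quoted log line comes from Throwable.getMessage(), which is null for exceptions constructed without a message, as the JDK socket layer sometimes does. Passing the throwable itself (or its stack trace) to the logger preserves at least the exception class. A small illustration (the class and method names are hypothetical):

```java
public class ReconnectLogging {
    // Message-only logging: loses all information when getMessage() is null.
    public static String messageOnly(Throwable t) {
        return "Reconnect due to socket error: " + t.getMessage();
    }

    // Interpolating the throwable itself keeps the exception class name; a
    // real logger call such as log.info("...", t) would also keep the full
    // stack trace, which is what the patch proposes.
    public static String fullCause(Throwable t) {
        return "Reconnect due to socket error: " + t;
    }
}
```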
[jira] [Updated] (KAFKA-2221) Log the entire cause which caused a reconnect in the SimpleConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jaikiran pai updated KAFKA-2221: Status: Patch Available (was: Open)
[jira] [Updated] (KAFKA-2221) Log the entire cause which caused a reconnect in the SimpleConsumer
[ https://issues.apache.org/jira/browse/KAFKA-2221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] jaikiran pai updated KAFKA-2221: Attachment: KAFKA-2221.patch
Review Request 34697: Patch for KAFKA-2221
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34697/ --- Review request for kafka. Bugs: KAFKA-2221 https://issues.apache.org/jira/browse/KAFKA-2221 Repository: kafka Description --- Log the real exception which triggered a reconnect Diffs - core/src/main/scala/kafka/consumer/SimpleConsumer.scala 31a2639477bf66f9a05d2b9b07794572d7ec393b Diff: https://reviews.apache.org/r/34697/diff/ Testing --- Thanks, Jaikiran Pai
[jira] [Resolved] (KAFKA-2201) Open file handle leak
[ https://issues.apache.org/jira/browse/KAFKA-2201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar Reddy resolved KAFKA-2201. Resolution: Not A Problem

Open file handle leak - Key: KAFKA-2201 URL: https://issues.apache.org/jira/browse/KAFKA-2201 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2.1 Environment: Debian Linux 7, 64 bit Oracle JDK 1.7.0u40, 64-bit Reporter: Albert Visagie

The kafka broker crashes with the following stack trace from the server.log roughly every 18 hours:

[2015-05-19 07:39:22,924] FATAL [KafkaApi-0] Halting due to unrecoverable I/O error while handling produce request: (kafka.server.KafkaApis)
kafka.common.KafkaStorageException: I/O exception in append to log 'nnn-1'
    at kafka.log.Log.append(Log.scala:266)
    at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:379)
    at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:365)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.utils.Utils$.inReadLock(Utils.scala:541)
    at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:365)
    at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:291)
    at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:282)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:282)
    at kafka.server.KafkaApis.handleProducerOrOffsetCommitRequest(KafkaApis.scala:204)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:59)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
    at java.lang.Thread.run(Thread.java:724)
Caused by: java.io.IOException: Map failed
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:888)
    at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:286)
    at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:276)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.log.OffsetIndex.resize(OffsetIndex.scala:276)
    at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:265)
    at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:265)
    at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:265)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:264)
    at kafka.log.Log.roll(Log.scala:563)
    at kafka.log.Log.maybeRoll(Log.scala:539)
    at kafka.log.Log.append(Log.scala:306)
    ... 21 more
Caused by: java.lang.OutOfMemoryError: Map failed
    at sun.nio.ch.FileChannelImpl.map0(Native Method)
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:885)
    ... 33 more

The Kafka broker's open filehandles as seen by lsof | grep pid | wc -l grows steadily as it runs. Under our load it lasts about 18 hours before crashing with the stack trace above. We were experimenting with settings under Log Retention Policy in server.properties: log.retention.hours=168 log.retention.bytes=107374182 log.segment.bytes=1073741 log.retention.check.interval.ms=3000 The result is that the broker rolls over segments quite rapidly. We don't have to run it that way of course. We are running only one broker at the moment.
lsof shows many open files with the suffix .deleted that have no size and do not appear in an ls of the log directory. This is kafka 0.8.2.1 with scala 2.10.4, as downloaded from the website last week.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
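The quoted retention settings roll segments very aggressively, which is what drives the fd count up. A quick back-of-the-envelope from the values in the report (the class is illustrative, and the "roughly two fds per segment" note assumes one .log plus one .index file per segment, as in Kafka 0.8.x):

```java
public class SegmentMath {
    // Retention settings quoted in the bug report (bytes).
    public static final long RETENTION_BYTES = 107374182L;  // log.retention.bytes
    public static final long SEGMENT_BYTES   = 1073741L;    // log.segment.bytes

    // Rough upper bound on live segment files per partition: retention size
    // divided by segment size. Each live segment also holds an open .index
    // file, so the steady-state fd count is roughly twice this figure per
    // partition, before counting the deleted-but-still-mapped files that
    // lsof shows with the .deleted suffix.
    public static long maxSegmentsPerPartition() {
        return RETENTION_BYTES / SEGMENT_BYTES;
    }
}
```

So with a ~1 MB segment size against ~107 MB of retention, each partition cycles through on the order of a hundred segment files; deleted segments stay open (the .deleted entries in lsof) until the JVM unmaps them, which only happens at GC time.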
[jira] [Commented] (KAFKA-2222) Write Input/output error did not result in broker shutdown
[ https://issues.apache.org/jira/browse/KAFKA-2222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14560417#comment-14560417 ] Jason Rosenberg commented on KAFKA-2222:

[~guozhang] I don't think so. The file system was not in read-only mode. This disk is mounted as JBOD, and we use multiple log dirs in the config, etc.
Re: Review Request 34641: Patch for KAFKA-2214
On May 26, 2015, 7:08 a.m., Michael Noll wrote: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala, line 81 https://reviews.apache.org/r/34641/diff/1/?file=971195#file971195line81

Should we also consider reassignments that are in progress as errors? The reasoning is that you'd like to do the following in (say) Ansible: trigger reassignment of partitions, wait until all are completed, and only then continue with the next action. That being said, there are other ways to achieve this in tools like Ansible. For instance, you can trigger reassignments, then repeatedly call `--verify` in a loop and capture its STDOUT, looking for "is still in progress" and "failed". However, this is arguably more error-prone because the log messages can change between Kafka versions (and oftentimes such changes are not prominently advertised, so you only notice once your deployment script breaks).

Yes, we can consider in-progress as errors. Another option could be to return a different error code. Let us wait for others' suggestions/concerns.

- Manikumar Reddy

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34641/#review85154 ---

On May 24, 2015, 11:58 a.m., Manikumar Reddy O wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34641/ --- (Updated May 24, 2015, 11:58 a.m.) Review request for kafka. Bugs: KAFKA-2214 https://issues.apache.org/jira/browse/KAFKA-2214 Repository: kafka Description --- kafka-reassign-partitions.sh: return non-zero error status on failures Diffs - core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala acaa6112db979dc775af6385c58d2e52786dfba9 Diff: https://reviews.apache.org/r/34641/diff/ Testing --- Thanks, Manikumar Reddy O
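If the outcome were reported through the exit status rather than STDOUT text, the Ansible loop would reduce to checking the return code. One possible mapping, with a distinct code per outcome so callers can tell "keep polling" from "failed"; these codes are hypothetical, not what the current kafka-reassign-partitions.sh returns:

```java
public class ReassignExitStatus {
    public enum Status { COMPLETED, IN_PROGRESS, FAILED }

    // Hypothetical exit-code mapping for a --verify run: 0 means done,
    // 2 means the caller should poll --verify again, 1 means a partition
    // ended up somewhere other than requested.
    public static int exitCode(Status s) {
        switch (s) {
            case COMPLETED:   return 0;
            case IN_PROGRESS: return 2;
            case FAILED:      return 1;
            default:          throw new IllegalArgumentException("unknown status: " + s);
        }
    }
}
```

A deployment tool would then retry while the exit code is 2 and abort on 1, with no STDOUT parsing to break across Kafka versions.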
[jira] [Commented] (KAFKA-2222) Write Input/output error did not result in broker shutdown
[ https://issues.apache.org/jira/browse/KAFKA-2222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14560387#comment-14560387 ] Jason Rosenberg commented on KAFKA-2222:

Interestingly, after restarting the broker, these errors went away (but we did ultimately declare the disk bad, and had it replaced). But not sure restarting resulted in the errors subsiding.
Re: Review Request 34641: Patch for KAFKA-2214
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34641/#review85154 ---

core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala https://reviews.apache.org/r/34641/#comment136659

Should we also consider reassignments that are in progress as errors? The reasoning is that you'd like to do the following in (say) Ansible: trigger reassignment of partitions, wait until all are completed, and only then continue with the next action. That being said, there are other ways to achieve this in tools like Ansible. For instance, you can trigger reassignments, then repeatedly call `--verify` in a loop and capture its STDOUT, looking for "is still in progress" and "failed". However, this is arguably more error-prone because the log messages can change between Kafka versions (and oftentimes such changes are not prominently advertised, so you only notice once your deployment script breaks).

- Michael Noll

On May 24, 2015, 11:58 a.m., Manikumar Reddy O wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34641/ --- (Updated May 24, 2015, 11:58 a.m.) Review request for kafka. Bugs: KAFKA-2214 https://issues.apache.org/jira/browse/KAFKA-2214 Repository: kafka Description --- kafka-reassign-partitions.sh: return non-zero error status on failures Diffs - core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala acaa6112db979dc775af6385c58d2e52786dfba9 Diff: https://reviews.apache.org/r/34641/diff/ Testing --- Thanks, Manikumar Reddy O
[jira] [Commented] (KAFKA-2201) Open file handle leak
[ https://issues.apache.org/jira/browse/KAFKA-2201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14558885#comment-14558885 ] Albert Visagie commented on KAFKA-2201:

It is indeed. It would appear that the only way to alleviate this fact of life in the JVM universe is: 1. Do not create files so rapidly relative to the broker's other work. (Create more garbage on the heap per file. Tweak segment size.) 2. Monitor the number of open files. Tweak file sizes and GC.
[jira] [Comment Edited] (KAFKA-2201) Open file handle leak
[ https://issues.apache.org/jira/browse/KAFKA-2201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14558885#comment-14558885 ] Albert Visagie edited comment on KAFKA-2201 at 5/26/15 8:53 AM: It is indeed. Therefore not a Kafka bug. It would appear that the only way to alleviate this fact of life in the JVM universe is: 1. Do not create files so rapidly relative to the broker's other work. (Create mote garbage on the heap per file. Tweak segment size.) 2. Monitor the number of open files. Tweak file sizes and GC. was (Author: avisagie): It is indeed. It would appear that the only way to alleviate this fact of life in the JVM universe is: 1. Do not create files so rapidly relative to the broker's other work. (Create mote garbage on the heap per file. Tweak segment size.) 2. Monitor the number of open files. Tweak file sizes and GC. Open file handle leak - Key: KAFKA-2201 URL: https://issues.apache.org/jira/browse/KAFKA-2201 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2.1 Environment: Debian Linux 7, 64 bit Oracle JDK 1.7.0u40, 64-bit Reporter: Albert Visagie The kafka broker crashes with the following stack trace from the server.log roughly every 18 hours: [2015-05-19 07:39:22,924] FATAL [KafkaApi-0] Halting due to unrecoverable I/O error while handling produce request: (kafka.server.KafkaApis) kafka.common.KafkaStorageException: I/O exception in append to log 'nnn-1' at kafka.log.Log.append(Log.scala:266) at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:379) at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:365) at kafka.utils.Utils$.inLock(Utils.scala:535) at kafka.utils.Utils$.inReadLock(Utils.scala:541) at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:365) at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:291) at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:282) at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:282) at kafka.server.KafkaApis.handleProducerOrOffsetCommitRequest(KafkaApis.scala:204) at kafka.server.KafkaApis.handle(KafkaApis.scala:59) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59) at java.lang.Thread.run(Thread.java:724) Caused by: java.io.IOException: Map failed at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:888) at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:286) at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:276) at kafka.utils.Utils$.inLock(Utils.scala:535) at kafka.log.OffsetIndex.resize(OffsetIndex.scala:276) at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:265) at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:265) at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:265) at kafka.utils.Utils$.inLock(Utils.scala:535) at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:264) at kafka.log.Log.roll(Log.scala:563) at kafka.log.Log.maybeRoll(Log.scala:539) at kafka.log.Log.append(Log.scala:306) ... 21 more Caused by: java.lang.OutOfMemoryError: Map failed at sun.nio.ch.FileChannelImpl.map0(Native Method) at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:885) ... 
33 more The number of open file handles held by the Kafka broker, as seen by lsof | grep pid | wc -l, grows steadily as it runs. Under our load it lasts about 18 hours before crashing with the stack trace above. We were experimenting with these settings under Log Retention Policy in server.properties: log.retention.hours=168 log.retention.bytes=107374182 log.segment.bytes=1073741 log.retention.check.interval.ms=3000 The result is
Re: Kafka server validating incoming messages
I think it makes sense to take this reasonable precaution and check that the entire message was parsed successfully at the server side. On Tue, May 26, 2015 at 4:31 AM, Grant Henke ghe...@cloudera.com wrote: Bumping this message again to get some input before opening a Jira. On Thu, May 21, 2015 at 11:31 AM, Grant Henke ghe...@cloudera.com wrote: When working on my own implementation of the wire protocol, via a copy and paste error, I accidentally sent an OffsetCommit message to the ConsumerMetadata api. This was clearly my mistake, but what surprised me is that I got a valid ConsumerMetadata response back with no error. Digging a little deeper, this is because both messages expect a string in the first position, so when the OffsetCommit (much larger) message is parsed as a ConsumerMetadata message, everything parses okay on the server side. The only issue is that there are many bytes left over in the sent message, since OffsetCommit has many more fields following the initial string. Knowing that this is client/dev error, I still expected Kafka to fail my message based on expected message size. If Kafka were strict about leftover bytes when parsing messages, it could catch this. Should this check be implemented in the Kafka server when parsing all messages? If so I can open a Jira. Thank you, Grant -- Grant Henke Solutions Consultant | Cloudera ghe...@cloudera.com | twitter.com/ghenke http://twitter.com/gchenke | linkedin.com/in/granthenke -- Grant Henke Solutions Consultant | Cloudera ghe...@cloudera.com | twitter.com/ghenke http://twitter.com/gchenke | linkedin.com/in/granthenke
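The strict check Grant proposes boils down to a leftover-bytes test after the last expected field has been read. The sketch below is purely illustrative (the class name and the single length-prefixed-string "schema" are assumptions, not Kafka's actual parsing code):

```java
import java.nio.ByteBuffer;

// Hedged sketch of a strict request parser: a ConsumerMetadata-style body is
// modeled as a single length-prefixed string; parsing succeeds only if it
// consumes the entire payload, so an OffsetCommit-sized body is rejected.
public class StrictParse {
    // Returns true only if the expected fields consumed the whole buffer.
    public static boolean parseStrict(ByteBuffer buffer) {
        if (buffer.remaining() < 2) return false;      // need the length prefix
        short groupLen = buffer.getShort();            // 2-byte string length
        if (buffer.remaining() < groupLen) return false;
        buffer.position(buffer.position() + groupLen); // skip the string bytes
        return !buffer.hasRemaining();                 // strict: no leftovers
    }
}
```

With this in place, the accidental OffsetCommit-to-ConsumerMetadata case would fail fast instead of returning a spurious success.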
[jira] [Updated] (KAFKA-2084) byte rate metrics per client ID (producer and consumer)
[ https://issues.apache.org/jira/browse/KAFKA-2084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aditya A Auradkar updated KAFKA-2084: - Attachment: KAFKA-2084_2015-05-26_11:50:50.patch byte rate metrics per client ID (producer and consumer) --- Key: KAFKA-2084 URL: https://issues.apache.org/jira/browse/KAFKA-2084 Project: Kafka Issue Type: Sub-task Reporter: Aditya Auradkar Assignee: Aditya Auradkar Attachments: KAFKA-2084.patch, KAFKA-2084_2015-04-09_18:10:56.patch, KAFKA-2084_2015-04-10_17:24:34.patch, KAFKA-2084_2015-04-21_12:21:18.patch, KAFKA-2084_2015-04-21_12:28:05.patch, KAFKA-2084_2015-05-05_15:27:35.patch, KAFKA-2084_2015-05-05_17:52:02.patch, KAFKA-2084_2015-05-11_16:16:01.patch, KAFKA-2084_2015-05-26_11:50:50.patch We need to be able to track the bytes-in/bytes-out rate on a per-client ID basis. This is necessary for quotas. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Kafka KIP hangout May 26
Below are my notes. Feel free to add/modify the content. Kafka KIP discussion (May 26, 2015) KIP-12 (sasl/ssl authentication): status check Not much discussion this time. KIP-21 (configuration management) - Aditya will make changes to KIP-4 based on last time’s discussion. - Andrei is OK with the changes suggested for KIP-4. KIP-19 (Add a request timeout to NetworkClient) - In the last KIP hangout it was suggested to just have two timeouts: blocking time during send, and how long to wait to really send the message out. However, it is difficult to map all timeouts we currently provide to users to one of the above mentioned timeout categories. - New proposal - let the user know what kind of things they have to wait for once they call send. The KIP has been updated with the details. - Use cases will be added for the different timeout configs to the KIP. - Should the timeout be specified in the api or in the configs? No strong use case for having it in APIs. For now, the buffer full timeout will be in configs. KIP-25 (system test): quick overview - An initial patch with examples on writing system tests using ducktape will be posted soon. - Roadmaps for system tests using ducktape will be added to the wiki. - Should we have all new system tests in ducktape? Probably not at this time, as ducktape is still under evaluation. However, if we eventually plan to move over to using ducktape for system tests, all old system tests will be ported to use ducktape. On Tue, May 26, 2015 at 8:50 AM, Jun Rao j...@confluent.io wrote: Hi, Everyone, We will have a KIP hangout at 11 PST on May 26. The following is the agenda. If you want to attend and are not on the invite, please let me know. Agenda: KIP-12 (sasl/ssl authentication): status check KIP-21 (configuration management) KIP-19 (Add a request timeout to NetworkClient) KIP-25 (system test): quick overview Thanks, Jun -- Regards, Ashish
[DISCUSSION] Partition Selection and Coordination By Brokers for Producers
Hi Kafka Dev Team, I am sorry, I am new to the process of discussion and/or KIPs, so I had commented on another email voting chain. Please let me know the correct process for collecting feedback and starting a discussion with the Kafka Dev Group. Here is the original message: I have had experience with both the producer and consumer side. I have a different use case on this partition selection strategy. Problem: We have a heterogeneous environment of producers (by that I mean we have Node.js, Python, new Java, and old Scala based producers sending to the same topic). I have seen that not all producers employ round-robin strategies for non-keyed messages like the new producer does. Hence, it creates non-uniform data ingestion into partitions and delay in overall message processing. How do we address a uniform distribution/message injection rate to all partitions? Proposed Solution: Let the broker cluster decide the next partition for a topic to send data to, rather than the producer itself, with more intelligence. 1) When sending data to brokers, the Kafka Protocol over the wire (ProduceResponse) sends a hint to the client about which partition to send to, based on the following logic (or it can be customizable): a. Based on overall data injection rate for the topic and the current producer injection rate b. Ability to rank partitions based on consumer rate (advanced use case, as there may be many consumers, so weighted average etc...) Ultimately, brokers will coordinate among thousands of producers and divert the data injection rate (out-of-box feature) and consumption rate (pluggable interface implementation on the brokers’ side). The goal here is to attain uniformity and/or lower delivery rate to the consumer. This is similar to consumer coordination moving to brokers. The producer side partition selection would also move to brokers. This will benefit both java and non-java clients. Please let me know your feedback on this subject matter. 
I am sure lots of you run Kafka in an enterprise environment where you may have different types of producers for the same topic (e.g. logging clients in JavaScript, PHP, Java, Python etc. sending to a log topic). I would really appreciate your feedback on this. Thanks, Bhavesh
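As a toy illustration of the broker-side hint in step 1a above (entirely hypothetical; no such API exists in Kafka today), the broker could rank a topic's partitions by their currently observed injection rates and hint the least-loaded one back to the producer:

```java
// Hypothetical sketch of the proposed broker-side partition hint: given the
// broker's view of per-partition byte-injection rates for a topic, return the
// partition index the next produce request should target. All names are
// illustrative, not part of any real Kafka interface.
public class PartitionHint {
    // Pick the partition with the lowest observed injection rate so that
    // heterogeneous producers converge toward a uniform distribution.
    public static int nextPartition(double[] perPartitionRates) {
        int best = 0;
        for (int i = 1; i < perPartitionRates.length; i++)
            if (perPartitionRates[i] < perPartitionRates[best])
                best = i;
        return best;
    }
}
```

Step 1b would replace the raw injection rate with a weighted score that also folds in consumer lag, but the selection loop stays the same.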
[jira] [Commented] (KAFKA-2217) Refactor Client Selectable Interface for Better Concurrency Options
[ https://issues.apache.org/jira/browse/KAFKA-2217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14559567#comment-14559567 ] Jay Kreps commented on KAFKA-2217: -- This adds a lot of allocation per select call. Is there a reason that we want multi-threaded access to the selector? Refactor Client Selectable Interface for Better Concurrency Options --- Key: KAFKA-2217 URL: https://issues.apache.org/jira/browse/KAFKA-2217 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Assignee: Jason Gustafson Attachments: KAFKA-2217.patch, KAFKA-2217_2015-05-25_10:45:30.patch, KAFKA-2217_2015-05-26_09:37:29.patch The current Selectable interface makes thread-safe usage without external locking unlikely. In particular, the interface requires implementations to store internal lists containing the results from an invocation of poll. This makes dealing with issues such as KAFKA-2168 more difficult since it adds state which must be synchronized. Here are the offending methods: {code:java} interface Selectable { void poll(long timeout); List<NetworkSend> completedSends(); List<NetworkReceive> completedReceives(); List<Integer> disconnected(); List<Integer> connected(); // rest excluded } {code} The user is required to invoke poll, then extract the results from the corresponding methods. In order to avoid missing events, the caller must hold an external lock while they access the results of the poll. Instead, we can return the results directly from the poll call using a container class. For example: {code:java} class PollResult { List<NetworkSend> completedSends; List<NetworkReceive> completedReceives; List<Integer> disconnected; List<Integer> connected; } interface Selectable { PollResult poll(long timeout); } {code} This should get us closer to a thread-safe NetworkClient, which would enable a more granular approach to synchronizing the KafkaConsumer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 33049: Patch for KAFKA-2084
On May 12, 2015, 7:38 p.m., Dong Lin wrote: clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java, line 117 https://reviews.apache.org/r/33049/diff/9/?file=955824#file955824line117 metric.value(timeMs), which translates to Rate.measure(config, timeMs), may return Infinity. This may introduce infinite delay. I think this bug is rooted either in class Rate or class SampledStat. In short, SampledStat.purgeObsoleteSamples will remove all Samples, such that SampledStat.oldest(now) == now. Should I open a ticket and submit a patch for it? Aditya Auradkar wrote: Hey dong, yeah you should submit a patch for it. Dong Lin wrote: Sure! I will do it. Dropping since this is being tracked in a separate ticket. - Aditya --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33049/#review83441 --- On May 26, 2015, 6:53 p.m., Aditya Auradkar wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33049/ --- (Updated May 26, 2015, 6:53 p.m.) Review request for kafka, Joel Koshy and Jun Rao. Bugs: KAFKA-2084 https://issues.apache.org/jira/browse/KAFKA-2084 Repository: kafka Description --- Updated patch for quotas. This patch does the following: 1. Add per-client metrics for both producer and consumers 2. Add configuration for quotas 3. Compute delay times in the metrics package and return the delay times in QuotaViolationException 4. Add a DelayQueue in KafkaApi's that can be used to throttle any type of request. Implemented request throttling for produce and fetch requests. 5. Added unit and integration test cases. 6. This doesn't include a system test. 
There is a separate ticket for that Diffs - clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java dfa1b0a11042ad9d127226f0e0cec8b1d42b8441 clients/src/main/java/org/apache/kafka/common/metrics/Quota.java d82bb0c055e631425bc1ebbc7d387baac76aeeaa clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java a451e5385c9eca76b38b425e8ac856b2715fcffe clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java ca823fd4639523018311b814fde69b6177e73b97 clients/src/test/java/org/apache/kafka/common/utils/MockTime.java core/src/main/scala/kafka/server/ClientQuotaMetrics.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaApis.scala 387e387998fc3a6c9cb585dab02b5f77b0381fbf core/src/main/scala/kafka/server/KafkaConfig.scala 9efa15ca5567b295ab412ee9eea7c03eb4cdc18b core/src/main/scala/kafka/server/KafkaServer.scala e66710d2368334ece66f70d55f57b3f888262620 core/src/main/scala/kafka/server/ReplicaManager.scala 59c9bc3ac3a8afc07a6f8c88c5871304db588d17 core/src/main/scala/kafka/server/ThrottledRequest.scala PRE-CREATION core/src/main/scala/kafka/utils/ShutdownableThread.scala fc226c863095b7761290292cd8755cd7ad0f155c core/src/test/scala/integration/kafka/api/QuotasTest.scala PRE-CREATION core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala PRE-CREATION core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 8014a5a6c362785539f24eb03d77278434614fe6 core/src/test/scala/unit/kafka/server/ThrottledRequestExpirationTest.scala PRE-CREATION Diff: https://reviews.apache.org/r/33049/diff/ Testing --- Thanks, Aditya Auradkar
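Item 3 in the description above (computing delay times in the metrics package and returning them in QuotaViolationException) could work roughly as follows. The formula and all names below are assumptions for illustration, not the patch's actual code:

```java
// Hedged sketch of a quota throttle-delay computation: when the observed
// byte rate exceeds the quota bound, delay the response long enough that the
// average rate over the metric window drops back to the bound.
public class QuotaDelay {
    // observedRate and quotaBound in bytes/sec; windowMs is the metric window.
    public static long throttleTimeMs(double observedRate, double quotaBound, long windowMs) {
        if (observedRate <= quotaBound) return 0;      // no violation, no delay
        double overFactor = observedRate / quotaBound - 1.0;
        return (long) (overFactor * windowMs);         // time to amortize the excess
    }
}
```

A DelayQueue of throttled requests (item 4) would then hold each response for the computed number of milliseconds before completing it.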
[jira] [Commented] (KAFKA-1944) Rename LogCleaner and related classes to LogCompactor
[ https://issues.apache.org/jira/browse/KAFKA-1944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14559488#comment-14559488 ] Ashish K Singh commented on KAFKA-1944: --- [~omkreddy] thanks for the info. It's on my todo list, will get to it soon. Rename LogCleaner and related classes to LogCompactor - Key: KAFKA-1944 URL: https://issues.apache.org/jira/browse/KAFKA-1944 Project: Kafka Issue Type: Bug Reporter: Gwen Shapira Assignee: Ashish K Singh Labels: newbie Following a mailing list discussion: the name LogCleaner is seriously misleading. Its more of a log compactor. Deleting old logs happens elsewhere from what I've seen. Note that this may require renaming related classes, objects, configs and metrics. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2084) byte rate metrics per client ID (producer and consumer)
[ https://issues.apache.org/jira/browse/KAFKA-2084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14559613#comment-14559613 ] Aditya A Auradkar commented on KAFKA-2084: -- Updated reviewboard https://reviews.apache.org/r/33049/diff/ against branch trunk byte rate metrics per client ID (producer and consumer) --- Key: KAFKA-2084 URL: https://issues.apache.org/jira/browse/KAFKA-2084 Project: Kafka Issue Type: Sub-task Reporter: Aditya Auradkar Assignee: Aditya Auradkar Attachments: KAFKA-2084.patch, KAFKA-2084_2015-04-09_18:10:56.patch, KAFKA-2084_2015-04-10_17:24:34.patch, KAFKA-2084_2015-04-21_12:21:18.patch, KAFKA-2084_2015-04-21_12:28:05.patch, KAFKA-2084_2015-05-05_15:27:35.patch, KAFKA-2084_2015-05-05_17:52:02.patch, KAFKA-2084_2015-05-11_16:16:01.patch, KAFKA-2084_2015-05-26_11:50:50.patch We need to be able to track the bytes-in/bytes-out rate on a per-client ID basis. This is necessary for quotas. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 33049: Patch for KAFKA-2084
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33049/ --- (Updated May 26, 2015, 6:50 p.m.) Review request for kafka, Joel Koshy and Jun Rao. Bugs: KAFKA-2084 https://issues.apache.org/jira/browse/KAFKA-2084 Repository: kafka Description (updated) --- This is currently not being used anywhere in the code because I haven't yet figured out how to enforce delays i.e. purgatory vs delay queue. I'll have a better idea once I look at the new purgatory implementation. Hopefully, this smaller patch is easier to review. Added more testcases Some locking changes for reading/creating the sensors WIP patch Sample usage in ReplicaManager Updated patch for quotas. This patch does the following: 1. Add per-client metrics for both producer and consumers 2. Add configuration for quotas 3. Compute delay times in the metrics package and return the delay times in QuotaViolationException 4. Add a DelayQueue in KafkaApi's that can be used to throttle any type of request. Implemented request throttling for produce and fetch requests. 5. Added unit and integration test cases. I've not yet added integration testcases testing the consumer delays.. 
will update the patch once those are ready Incorporated Jun's comments Adding javadoc KAFKA-2084 - Moved the callbacks to ClientQuotaMetrics Adding more configs Don't quota replica traffic KAFKA-2084 Diffs (updated) - clients/src/main/java/org/apache/kafka/common/metrics/MetricConfig.java dfa1b0a11042ad9d127226f0e0cec8b1d42b8441 clients/src/main/java/org/apache/kafka/common/metrics/Quota.java d82bb0c055e631425bc1ebbc7d387baac76aeeaa clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java a451e5385c9eca76b38b425e8ac856b2715fcffe clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java ca823fd4639523018311b814fde69b6177e73b97 clients/src/test/java/org/apache/kafka/common/utils/MockTime.java core/src/main/scala/kafka/server/ClientQuotaMetrics.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaApis.scala 387e387998fc3a6c9cb585dab02b5f77b0381fbf core/src/main/scala/kafka/server/KafkaConfig.scala 9efa15ca5567b295ab412ee9eea7c03eb4cdc18b core/src/main/scala/kafka/server/KafkaServer.scala e66710d2368334ece66f70d55f57b3f888262620 core/src/main/scala/kafka/server/ReplicaManager.scala 59c9bc3ac3a8afc07a6f8c88c5871304db588d17 core/src/main/scala/kafka/server/ThrottledRequest.scala PRE-CREATION core/src/main/scala/kafka/utils/ShutdownableThread.scala fc226c863095b7761290292cd8755cd7ad0f155c core/src/test/scala/integration/kafka/api/QuotasTest.scala PRE-CREATION core/src/test/scala/unit/kafka/server/ClientQuotaMetricsTest.scala PRE-CREATION core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 8014a5a6c362785539f24eb03d77278434614fe6 core/src/test/scala/unit/kafka/server/ThrottledRequestExpirationTest.scala PRE-CREATION Diff: https://reviews.apache.org/r/33049/diff/ Testing --- Thanks, Aditya Auradkar
Re: [KIP-DISCUSSION] KIP-22 Expose a Partitioner interface in the new producer
Hi All, This might be too late regarding partitioning strategy and the use cases to cover. I have had experience with both the producer and consumer side. I have a different use case on this partition selection strategy. Problem: We have a heterogeneous environment of producers (by that I mean we have Node.js, Python, new Java, and old Scala based producers sending to the same topic). I have seen that not all producers employ round-robin strategies for non-keyed messages like the new producer does. Hence, it creates non-uniform data ingestion into partitions and delay in overall message processing. How do we address a uniform distribution/message injection rate to all partitions? Proposed Solution: Let the broker cluster decide the next partition for a topic to send data to, rather than the producer itself, with more intelligence. 1) When sending data to brokers, the Kafka Protocol over the wire (ProduceResponse) sends a hint to the client about which partition to send to, based on the following logic (or it can be customizable): a. Based on overall data injection rate for the topic and the current producer injection rate b. Ability to rank partitions based on consumer rate (advanced use case, as there may be many consumers, so weighted average etc...) Ultimately, brokers will coordinate among thousands of producers and divert the data injection rate (out-of-box feature) and consumption rate (pluggable interface implementation on the brokers’ side). The goal here is to attain uniformity and/or lower delivery rate to the consumer. This is similar to consumer coordination moving to brokers. The producer side partition selection would also move to brokers. This will benefit both java and non-java clients. Please let me know feedback on this subject. Thanks, Bhavesh On Mon, May 18, 2015 at 7:25 AM, Sriharsha Chintalapani harsh...@fastmail.fm wrote: Gianmarco, I’ll send the patch soon. Thanks, Harsha On May 18, 2015 at 1:34:50 AM, Gianmarco De Francisci Morales ( g...@apache.org) wrote: Hi, If everything is in order, can we proceed to implement it? 
Cheers, -- Gianmarco On 13 May 2015 at 03:06, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Harsha, If you open this link https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals All the KIPs are child pages of this page, which you can see from the left bar. Only KIP-22 is missing. It looks like you created it as a child page of https://cwiki.apache.org/confluence/display/KAFKA/Index Thanks. Jiangjie (Becket) Qin On 5/12/15, 3:12 PM, Sriharsha Chintalapani ka...@harsha.io wrote: Hi Jiangjie, Its under https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22++Expose+a+Partitioner+interface+in+the+new+producer I checked other KIPs and they are under /KAFKA as well. Thanks, Harsha On May 12, 2015 at 2:12:30 PM, Jiangjie Qin (j...@linkedin.com.invalid) wrote: Hey Harsha, It looks like you created the KIP page at the wrong place. Can you move the page to a child page of https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals Thanks. Jiangjie (Becket) Qin On 5/6/15, 6:12 PM, Harsha ka...@harsha.io wrote: Thanks for the review Joel. I agree we don't need an init method, we can use configure. I'll update the KIP. -Harsha On Wed, May 6, 2015, at 04:45 PM, Joel Koshy wrote: +1 with a minor comment: do we need an init method given it extends Configurable? Also, can you move this wiki out of drafts and add it to the table in https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals? Thanks, Joel On Wed, May 06, 2015 at 07:46:46AM -0700, Sriharsha Chintalapani wrote: Thanks Jay. I removed partitioner.metadata from the KIP. I'll send an updated patch. -- Harsha Sent with Airmail On May 5, 2015 at 6:31:47 AM, Sriharsha Chintalapani (harsh...@fastmail.fm) wrote: Thanks for the comments everyone. Hi Jay, I do have a question regarding the Configurable interface on how to pass a Map<String, ?> of properties. I couldn't find any other classes using it. The JMX reporter overrides it but doesn't implement it. 
So with configurable partitioner how can a user pass in partitioner configuration since its getting instantiated within the producer. Thanks, Harsha On May 4, 2015 at 10:36:45 AM, Jay Kreps (jay.kr...@gmail.com) wrote: Hey Harsha, That proposal sounds good. One minor thing--I don't think we need to have the partitioner.metadata property. Our reason for using string properties is exactly to make config extensible at runtime. So a given partitioner can add whatever properties make sense using the configure() api it defines. -Jay On Sun, May 3, 2015 at 5:57 PM, Harsha ka...@harsha.io wrote: Thanks Jay Gianmarco for the comments. I picked the option A, if user
[jira] [Commented] (KAFKA-2217) Refactor Client Selectable Interface for Better Concurrency Options
[ https://issues.apache.org/jira/browse/KAFKA-2217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14559339#comment-14559339 ] Jason Gustafson commented on KAFKA-2217: Updated reviewboard https://reviews.apache.org/r/34608/diff/ against branch upstream/trunk Refactor Client Selectable Interface for Better Concurrency Options --- Key: KAFKA-2217 URL: https://issues.apache.org/jira/browse/KAFKA-2217 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Assignee: Jason Gustafson Attachments: KAFKA-2217.patch, KAFKA-2217_2015-05-25_10:45:30.patch, KAFKA-2217_2015-05-26_09:37:29.patch The current Selectable interface makes thread-safe usage without external locking unlikely. In particular, the interface requires implementations to store internal lists containing the results from an invocation of poll. This makes dealing with issues such as KAFKA-2168 more difficult since it adds state which must be synchronized. Here are the offending methods: {code:java} interface Selectable { void poll(long timeout); List<NetworkSend> completedSends(); List<NetworkReceive> completedReceives(); List<Integer> disconnected(); List<Integer> connected(); // rest excluded } {code} The user is required to invoke poll, then extract the results from the corresponding methods. In order to avoid missing events, the caller must hold an external lock while they access the results of the poll. Instead, we can return the results directly from the poll call using a container class. For example: {code:java} class PollResult { List<NetworkSend> completedSends; List<NetworkReceive> completedReceives; List<Integer> disconnected; List<Integer> connected; } interface Selectable { PollResult poll(long timeout); } {code} This should get us closer to a thread-safe NetworkClient, which would enable a more granular approach to synchronizing the KafkaConsumer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2217) Refactor Client Selectable Interface for Better Concurrency Options
[ https://issues.apache.org/jira/browse/KAFKA-2217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-2217: --- Attachment: KAFKA-2217_2015-05-26_09:37:29.patch Refactor Client Selectable Interface for Better Concurrency Options --- Key: KAFKA-2217 URL: https://issues.apache.org/jira/browse/KAFKA-2217 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Assignee: Jason Gustafson Attachments: KAFKA-2217.patch, KAFKA-2217_2015-05-25_10:45:30.patch, KAFKA-2217_2015-05-26_09:37:29.patch The current Selectable interface makes thread-safe usage without external locking unlikely. In particular, the interface requires implementations to store internal lists containing the results from an invocation of poll. This makes dealing with issues such as KAFKA-2168 more difficult since it adds state which must be synchronized. Here are the offending methods: {code:java} interface Selectable { void poll(long timeout); List<NetworkSend> completedSends(); List<NetworkReceive> completedReceives(); List<Integer> disconnected(); List<Integer> connected(); // rest excluded } {code} The user is required to invoke poll, then extract the results from the corresponding methods. In order to avoid missing events, the caller must hold an external lock while they access the results of the poll. Instead, we can return the results directly from the poll call using a container class. For example: {code:java} class PollResult { List<NetworkSend> completedSends; List<NetworkReceive> completedReceives; List<Integer> disconnected; List<Integer> connected; } interface Selectable { PollResult poll(long timeout); } {code} This should get us closer to a thread-safe NetworkClient, which would enable a more granular approach to synchronizing the KafkaConsumer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 33065: Patch for KAFKA-1928
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33065/#review85180 --- Ship it! Thanks for the latest patch. +1 other than the following minor comments. clients/src/main/java/org/apache/kafka/common/network/Selector.java https://reviews.apache.org/r/33065/#comment136709 This is actually not an IllegalStateException. core/src/main/scala/kafka/network/RequestChannel.scala https://reviews.apache.org/r/33065/#comment136703 This forces the request string to be generated independent of the logging mode. Perhaps we can define this as a function? core/src/main/scala/kafka/network/SocketServer.scala https://reviews.apache.org/r/33065/#comment136699 No need to assign this to a val. core/src/main/scala/kafka/network/SocketServer.scala https://reviews.apache.org/r/33065/#comment136693 It seems that we standardize on the dash notation in the new metrics. So, instead of SocketServer, it's better to use socket-server. core/src/main/scala/kafka/network/SocketServer.scala https://reviews.apache.org/r/33065/#comment136692 Perhaps we can just log the exception. - Jun Rao On May 24, 2015, 4:53 p.m., Gwen Shapira wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33065/ --- (Updated May 24, 2015, 4:53 p.m.) Review request for kafka. 
Bugs: 1928 and KAFKA-1928 https://issues.apache.org/jira/browse/1928 https://issues.apache.org/jira/browse/KAFKA-1928 Repository: kafka Description --- first pass on replacing Send implement maxSize and improved docs Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Conflicts: core/src/main/scala/kafka/network/RequestChannel.scala moved selector out of abstract thread mid-way through putting selector in SocketServer Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Also, SocketServer is now using Selector. Stil a bit messy - but all tests pass. Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 renamed requestKey to connectionId to reflect new use and changed type from Any to String Following Jun's comments - moved MultiSend to client. Cleaned up destinations as well removed reify and remaining from send/recieve API, per Jun. moved maybeCloseOldest() to Selector per Jay added idString to node API, changed written to int in Send API cleaning up MultiSend, added size() to Send interface fixed some issues with multisend Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 fixed metric thingies fixed response order bug error handling for illegal selector state and fix metrics bug optimized selection key lookup with identity hash fix accidental change Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 addressing Jun's comments removed connection-aging for clients fix issues with exception handling and other cleanup Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Revert removed connection-aging for clients This reverts commit 016669123a370b561b5ac78f8f1cf7bdd958e7d1. 
improving exception handling and other minor fixes Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 fixes based on Jun and Guozhang comments. Exposed idle metrics as Gauge, changed Send size to long, and fixed an existing issue where Reporters are not actually loaded Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Diffs - clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java cf32e4e7c40738fe6d8adc36ae0cfad459ac5b0b clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 936487b16e7ac566f8bdcd39a7240ceb619fd30e clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 1311f85847b022efec8cb05c450bb18231db6979 clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 435fbb5116e80302eba11ed1d3069cb577dbdcbd clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java bdff518b732105823058e6182f445248b45dc388 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d301be4709f7b112e1f3a39f3c04cfa65f00fa60
Re: Review Request 34608: Patch for KAFKA-2217
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34608/ --- (Updated May 26, 2015, 7:58 p.m.) Review request for kafka. Bugs: KAFKA-2217 https://issues.apache.org/jira/browse/KAFKA-2217 Repository: kafka Description (updated) --- KAFKA-2217; updated for review comments KAFKA-2217; add shortcut from poll when there's nothing to do KAFKA-2217; update javadoc for new usage KAFKA-2217; updated for review comments Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 435fbb5116e80302eba11ed1d3069cb577dbdcbd clients/src/main/java/org/apache/kafka/common/network/PollResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/Selectable.java b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 clients/src/main/java/org/apache/kafka/common/network/Selector.java 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java d5b306b026e788b4e5479f3419805aa49ae889f3 clients/src/test/java/org/apache/kafka/test/MockSelector.java ea89b06a4c9e5bb351201299cd3037f5226f0e6c Diff: https://reviews.apache.org/r/34608/diff/ Testing --- Thanks, Jason Gustafson
Re: Review Request 34608: Patch for KAFKA-2217
On May 24, 2015, 7:16 p.m., Ewen Cheslack-Postava wrote: clients/src/main/java/org/apache/kafka/common/network/Selector.java, line 226 https://reviews.apache.org/r/34608/diff/1/?file=969830#file969830line226 One drawback to this is that we're now constantly reallocating these instead of just reusing the same ArrayLists repeatedly. I doubt this has a substantial impact, but given that this is core code, it might be worth running a simple benchmark (e.g. ProducerPerformance) just to sanity check that we don't see a perf drop over a long run where GC might become an issue. Jason Gustafson wrote: I'll check it out. We could also reuse the same PollResult object, but that sort of defeats the point of the change. I had hoped that by making the underlying objects safer for threaded usage, some of the complexity in synchronizing the KafkaConsumer would go away, but perhaps it should start one layer up, at NetworkClient? Ewen Cheslack-Postava wrote: Actually I think this is exactly the right approach and it's likely the allocations don't matter much. They should all be very short lived so they should be cheap -- they'll never make it out of the young generation anyway. Still, it's a good idea to verify that assumption holds in practice. Jay brought this point up in the JIRA ticket as well. I've been running the ProducerPerformance benchmark against trunk and haven't noticed a difference in GC behavior. Additionally, I added an early return from the poll method which returns an empty static PollResult when there is nothing to do. - Jason --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34608/#review85101 --- On May 26, 2015, 7:58 p.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34608/ --- (Updated May 26, 2015, 7:58 p.m.) Review request for kafka. 
Bugs: KAFKA-2217 https://issues.apache.org/jira/browse/KAFKA-2217 Repository: kafka Description --- KAFKA-2217; updated for review comments KAFKA-2217; add shortcut from poll when there's nothing to do KAFKA-2217; update javadoc for new usage KAFKA-2217; updated for review comments Diffs - clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 435fbb5116e80302eba11ed1d3069cb577dbdcbd clients/src/main/java/org/apache/kafka/common/network/PollResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/Selectable.java b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 clients/src/main/java/org/apache/kafka/common/network/Selector.java 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java d5b306b026e788b4e5479f3419805aa49ae889f3 clients/src/test/java/org/apache/kafka/test/MockSelector.java ea89b06a4c9e5bb351201299cd3037f5226f0e6c Diff: https://reviews.apache.org/r/34608/diff/ Testing --- Thanks, Jason Gustafson
Re: Review Request 34608: Patch for KAFKA-2217
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34608/#review85245 --- I noticed one minor issue with a metrics sensor, but otherwise LGTM as long as it doesn't seem to affect performance. clients/src/main/java/org/apache/kafka/common/network/Selector.java https://reviews.apache.org/r/34608/#comment136767 This shortcut seems like a good idea, but skips the call to this.sensors.ioTime.record(). - Ewen Cheslack-Postava On May 26, 2015, 7:58 p.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34608/ --- (Updated May 26, 2015, 7:58 p.m.) Review request for kafka. Bugs: KAFKA-2217 https://issues.apache.org/jira/browse/KAFKA-2217 Repository: kafka Description --- KAFKA-2217; updated for review comments KAFKA-2217; add shortcut from poll when there's nothing to do KAFKA-2217; update javadoc for new usage KAFKA-2217; updated for review comments Diffs - clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 435fbb5116e80302eba11ed1d3069cb577dbdcbd clients/src/main/java/org/apache/kafka/common/network/PollResult.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/Selectable.java b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 clients/src/main/java/org/apache/kafka/common/network/Selector.java 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java d5b306b026e788b4e5479f3419805aa49ae889f3 clients/src/test/java/org/apache/kafka/test/MockSelector.java ea89b06a4c9e5bb351201299cd3037f5226f0e6c Diff: https://reviews.apache.org/r/34608/diff/ Testing --- Thanks, Jason Gustafson
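The shortcut-plus-sensor issue Ewen raises could be addressed roughly as in the following self-contained sketch. The `Sensor`, `PollResult`, and `SelectorSketch` names here are simplified stand-ins for illustration, not Kafka's actual classes:

```java
import java.util.Collections;
import java.util.List;

// Simplified stand-in for Kafka's metrics Sensor.
class Sensor {
    double total = 0;
    int count = 0;
    void record(double value) { total += value; count++; }
}

// Simplified container returned from poll(); a shared EMPTY instance
// avoids allocation on the no-work fast path.
class PollResult {
    static final PollResult EMPTY = new PollResult(Collections.emptyList());
    final List<String> completedSends;
    PollResult(List<String> completedSends) { this.completedSends = completedSends; }
}

public class SelectorSketch {
    final Sensor ioTime = new Sensor();
    private boolean hasPendingWork = false;

    PollResult poll(long timeout) {
        long start = System.nanoTime();
        if (!hasPendingWork) {
            // Shortcut: nothing to do, but still record the (near-zero)
            // io time so the sensor's rate/count metrics stay accurate.
            ioTime.record(System.nanoTime() - start);
            return PollResult.EMPTY;
        }
        // ... actual select() and I/O handling would go here ...
        ioTime.record(System.nanoTime() - start);
        return new PollResult(Collections.emptyList());
    }
}
```

The key point is that the early return still passes through the sensor-recording step, so metrics derived from `ioTime` (rates, counts) are not silently skewed by idle polls.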
[jira] [Commented] (KAFKA-2217) Refactor Client Selectable Interface for Better Concurrency Options
[ https://issues.apache.org/jira/browse/KAFKA-2217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14559739#comment-14559739 ] Jason Gustafson commented on KAFKA-2217: Updated reviewboard https://reviews.apache.org/r/34608/diff/ against branch upstream/trunk Refactor Client Selectable Interface for Better Concurrency Options --- Key: KAFKA-2217 URL: https://issues.apache.org/jira/browse/KAFKA-2217 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Assignee: Jason Gustafson Attachments: KAFKA-2217.patch, KAFKA-2217_2015-05-25_10:45:30.patch, KAFKA-2217_2015-05-26_09:37:29.patch, KAFKA-2217_2015-05-26_12:57:29.patch The current Selectable interface makes thread-safe usage without external locking unlikely. In particular, the interface requires implementations to store internal lists containing the results from an invocation of poll. This makes dealing with issues such as KAFKA-2168 more difficult since it adds state which must be synchronized. Here are the offending methods: {code:java} interface Selectable { void poll(long timeout); List<NetworkSend> completedSends(); List<NetworkReceive> completedReceives(); List<Integer> disconnected(); List<Integer> connected(); // rest excluded } {code} The user is required to invoke poll, then extract the results from the corresponding methods. In order to avoid missing events, the caller must hold an external lock while they access the results of the poll. Instead, we can return the results directly from the poll call using a container class. For example: {code:java} class PollResult { List<NetworkSend> completedSends; List<NetworkReceive> completedReceives; List<Integer> disconnected; List<Integer> connected; } interface Selectable { PollResult poll(long timeout); } {code} This should get us closer to a thread-safe NetworkClient, which would enable a more granular approach to synchronizing the KafkaConsumer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
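The thread-safety benefit of the container-class approach can be shown with a minimal, self-contained sketch (simplified element types, not Kafka's actual `NetworkSend`/`NetworkReceive` classes):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Container returned atomically from poll(): callers get an immutable
// snapshot, so no external lock is needed between poll() and reading
// its results.
class PollResult {
    final List<Integer> connected;
    final List<Integer> disconnected;
    PollResult(List<Integer> connected, List<Integer> disconnected) {
        this.connected = Collections.unmodifiableList(new ArrayList<>(connected));
        this.disconnected = Collections.unmodifiableList(new ArrayList<>(disconnected));
    }
}

interface Selectable {
    PollResult poll(long timeout);
}

public class PollResultDemo {
    // A toy Selectable whose poll() reports one newly connected node (id 1).
    static final Selectable SELECTOR = timeout -> {
        List<Integer> connected = new ArrayList<>();
        connected.add(1);
        return new PollResult(connected, new ArrayList<>());
    };

    public static void main(String[] args) {
        // One call returns everything atomically; no separate accessor call
        // can race with a concurrent poll() from another thread.
        PollResult result = SELECTOR.poll(100);
        System.out.println(result.connected); // prints [1]
    }
}
```

Contrast this with the accessor-style interface, where a second thread's `poll()` can clear or replace the internal lists between the caller's `poll()` and its `completedSends()` call, silently dropping events.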
RE: [DISCUSS] KIP-4 - Command line and centralized administrative operations (Thread 2)
Andrii, I made a few edits to this document as discussed in the KIP-21 thread. https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations With these changes, the only difference between TopicMetadataResponse_V1 and V0 is the removal of the ISR field. I've altered the KIP with the assumption that this is a good enough reason by itself to evolve the request/response protocol. Any concerns there? Thanks, Aditya From: Mayuresh Gharat [gharatmayures...@gmail.com] Sent: Thursday, May 21, 2015 8:29 PM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations (Thread 2) Hi Jun, Thanks a lot. I get it now. Point 4) will actually enable clients who don't want a topic auto-created with default partitions: if the topic does not exist, they can manually create it with their own configs (#partitions). Thanks, Mayuresh On Thu, May 21, 2015 at 6:16 PM, Jun Rao j...@confluent.io wrote: Mayuresh, The current plan is the following. 1. Add TMR v1, which still triggers auto topic creation. 2. Change the consumer client to use TMR v1. Change the producer client to use TMR v1 and on UnknownTopicException, issue TopicCreateRequest to explicitly create the topic with the default server-side partitions and replicas. 3. At some later time after the new clients are released and deployed, disable auto topic creation in TMR v1. This will make sure consumers never create new topics. 4. If needed, we can add a new config in the producer to control whether TopicCreateRequest should be issued or not on UnknownTopicException. If this is disabled and the topic doesn't exist, send will fail and the user is expected to create the topic manually. Thanks, Jun On Thu, May 21, 2015 at 5:27 PM, Mayuresh Gharat gharatmayures...@gmail.com wrote: Hi, I had a question about TopicMetadata Request. Currently the way it works is: 1) Suppose a topic T1 does not exist.
2) Client wants to produce data to T1 using producer P1. 3) Since T1 does not exist, P1 issues a TopicMetadata request to kafka. This in turn creates the default number of partitions. The number of partitions is a cluster-wide config. 4) Same goes for a consumer: if the topic does not exist, a new topic will be created when the consumer issues a TopicMetadata request. Here are 2 use cases where it might not be suitable: The auto creation flag for topics is turned ON. a) Some clients might not want to create topics with the default number of partitions but with a lower number of partitions. Currently in a multi-tenant environment this is not possible without changing the cluster-wide default config. b) Some clients might want to just check if the topic exists or not, but currently the topic gets created automatically using the default number of partitions. Here are some ideas to address this: 1) The way this can be addressed is that the TopicMetadata request should have a way to specify whether it should only check if the topic exists, or check and create a topic with a given number of partitions. If the number of partitions is not specified, use the default cluster-wide config. OR 2) We should only allow TopicMetadata Request to get the metadata explicitly and not allow it to create a new topic. We should have another Request that takes in config parameters from the user regarding how he/she wants the topic to be created. This request can be used if we get an empty TopicMetadata Response. Thanks, Mayuresh On Thu, May 14, 2015 at 10:22 AM, Jun Rao j...@confluent.io wrote: For ListTopics, we decided not to add a ListTopics request for now and just rely on passing in an empty list to TMR. We can revisit this in the future if it becomes an issue. Thanks, Jun On Wed, May 13, 2015 at 3:31 PM, Joel Koshy jjkosh...@gmail.com wrote: Just had a few minor questions before I join the vote thread. Apologies if these have been discussed: - Do we need DecreasePartitionsNotAllowed?
i.e., can we just return InvalidPartitions instead? - AdminClient.listTopics: should we allow listing all partitions? Or do you intend for the client to issue listTopics followed by describeTopics? - On returning Future<Void> for partition reassignments: do we need to return any future, especially since you have the verifyReassignPartitions method? For e.g., what happens if the controller moves? The get should fail, right? The client will then need to connect to the new controller and reissue the request but will then get ReassignPartitionsInProgress. So in that case the client anyway needs to rely on verifyReassignPartitions. - In past hangouts I think either you/Joe were mentioning the need to locate the controller (and possibly other cluster
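Jun's four-step plan above (TMR v1 without eventual auto-creation, plus an explicit TopicCreateRequest from the producer on UnknownTopicException) implies client-side logic roughly like the following sketch. `StubBroker`, `UnknownTopicException`, and `send` are hypothetical stand-ins for illustration, not the actual client code:

```java
import java.util.HashMap;
import java.util.Map;

// Stub exception and broker state, for illustration only.
class UnknownTopicException extends RuntimeException {}

class StubBroker {
    final Map<String, Integer> topics = new HashMap<>(); // topic -> partition count
    int defaultPartitions = 8;                           // cluster-wide default

    // TMR v1 semantics after step 3: metadata lookup never creates topics.
    int metadataV1(String topic) {
        Integer p = topics.get(topic);
        if (p == null) throw new UnknownTopicException();
        return p;
    }

    void createTopic(String topic, Integer partitions) {
        topics.put(topic, partitions != null ? partitions : defaultPartitions);
    }
}

public class ProducerFlowSketch {
    // Step 4: a producer config gates whether an explicit create request is
    // issued on UnknownTopicException; if disabled, the send simply fails.
    static int send(StubBroker broker, String topic, boolean autoCreate) {
        try {
            return broker.metadataV1(topic);
        } catch (UnknownTopicException e) {
            if (!autoCreate) throw e;        // user must create the topic manually
            broker.createTopic(topic, null); // server-side default partitions
            return broker.metadataV1(topic);
        }
    }
}
```

This also captures Mayuresh's use case (b): a client that only wants to check existence can call the metadata path with auto-create disabled and no topic is created as a side effect.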
Re: Review Request 34415: Patch for KAFKA-2195
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34415/#review85223 --- Thanks for the patch. Just one more comment below. clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java https://reviews.apache.org/r/34415/#comment136737 Could we remove this constructor and just keep the one with errors? - Jun Rao On May 24, 2015, 7:49 p.m., Andrii Biletskyi wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34415/ --- (Updated May 24, 2015, 7:49 p.m.) Review request for kafka. Bugs: KAFKA-2195 https://issues.apache.org/jira/browse/KAFKA-2195 Repository: kafka Description --- KAFKA-2195 - Code review Diffs - clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java 5e5308ec0e333179a9abbf4f3b750ea25ab57967 clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java 04b90bfe62456a6739fe0299f1564dbd1850fe58 clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 8686d83aa52e435c6adafbe9ff4bd1602281072a clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java 51d081fa296fd7c170a90a634d432067afcfe772 clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java 6795682258e6b329cc3caa245b950b4dbcf0cf45 clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java fd9c545c99058ad3fbe3b2c55ea8b6ea002f5a51 clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java 19267ee8aad5a2f5a84cecd6fc563f00329d5035 clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java 7e0ce159a2ddd041fc06116038bd5831bbca278b clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java 44e2ce61899889601b6aee71fa7f7ddb4a65a255 clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 8bf6cbb79a92b0878096e099ec9169d21e6d7023 clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
deec1fa480d5a5c5884a1c007b076aa64e902472 clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java fabeae3083a8ea55cdacbb9568f3847ccd85bab4 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java e3cc1967e407b64cc734548c19e30de700b64ba8 core/src/main/scala/kafka/network/RequestChannel.scala 1d0024c8f0c2ab0efa6d8cfca6455877a6ed8026 core/src/main/scala/kafka/server/KafkaApis.scala 387e387998fc3a6c9cb585dab02b5f77b0381fbf Diff: https://reviews.apache.org/r/34415/diff/ Testing --- Thanks, Andrii Biletskyi
RE: [VOTE] KIP-21 Dynamic Configuration
Hey everyone, Completed the changes to KIP-4. After today's hangout, there doesn't appear to be anything remaining to discuss on this KIP. Please vote so we can formally close this. Thanks, Aditya From: Aditya Auradkar Sent: Thursday, May 21, 2015 11:26 AM To: dev@kafka.apache.org Subject: RE: [VOTE] KIP-21 Dynamic Configuration I think we should remove the config part in TopicMetadataResponse. It's probably cleaner if Alter and Describe are the only way to view and modify configs but I don't feel very strongly about it. Re-summarizing the proposed changes to KIP-4: - Change AlterTopic to not allow setting configs. Config changes will flow through AlterConfig. CreateTopic will still allow setting configs as it is nice to be able to specify configs while creating the topic. - TopicMetadataResponse shouldn't return config for the topic. DescribeConfig is the way to go. - Change InvalidTopicConfiguration error code to InvalidEntityConfig as proposed in KIP-21. Aditya From: Jun Rao [j...@confluent.io] Sent: Thursday, May 21, 2015 10:50 AM To: dev@kafka.apache.org Subject: Re: [VOTE] KIP-21 Dynamic Configuration What about TopicMetadataResponse in KIP-4? Do we remove the config part in it? Thanks, Jun On Thu, May 21, 2015 at 10:25 AM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: Hey Jun, I've added a section on error codes on the KIP-21 wiki. Here are the proposed changes to KIP-4. I'll update the wiki shortly. - Change AlterTopic to not allow setting configs. Config changes will flow through AlterConfig. CreateTopic will still allow setting configs as it is nice to be able to specify configs while creating the topic. - Change InvalidTopicConfiguration error code to InvalidEntityConfig as proposed in KIP-21. Thanks, Aditya From: Jun Rao [j...@confluent.io] Sent: Thursday, May 21, 2015 8:41 AM To: dev@kafka.apache.org Subject: Re: [VOTE] KIP-21 Dynamic Configuration Aditya, For completeness, could you list the set of error codes in the wiki?
Also, could you summarize the changes that are needed for the requests listed in KIP-4 and update the wiki accordingly? Thanks, Jun On Tue, May 19, 2015 at 10:33 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: Thanks Andrii. I'll make the changes. I've also updated KIP-21 to include the new config requests. Take a look and vote. https://cwiki.apache.org/confluence/display/KAFKA/KIP-21+-+Dynamic+Configuration Aditya From: Andrii Biletskyi [andrii.bilets...@stealth.ly] Sent: Tuesday, May 19, 2015 2:26 PM To: dev@kafka.apache.org Subject: Re: [VOTE] KIP-21 Dynamic Configuration Hi, Sorry I wasn't able to participate. I don't have objections about removing config changes from AlterTopic (as I understand both AddedConfig and DeletedConfig) - you are welcome to update the KIP page. Thanks, Andrii Biletskyi On Tue, May 19, 2015 at 11:40 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: Updating the discussion with the latest comments. 1. We discussed adding 2 new API's (AlterConfig and DescribeConfig). I'll update KIP-21 with details on these. 2. Discussed during the KIP hangout. We are in agreement. (1) has a dependency on KIP-4 being completed. Rest of the work in the KIP can be implemented independently. Any concerns if we tackle it as two separate work items implementation wise? We also discussed changing the AlterTopic command in KIP-4 to not include config changes. Instead, all config changes will pass through the newly proposed AlterConfig. If no-one objects, I can make some changes to KIP-4 to reflect this. Aditya From: Jay Kreps [jay.kr...@gmail.com] Sent: Tuesday, May 19, 2015 10:51 AM To: dev@kafka.apache.org Subject: Re: [VOTE] KIP-21 Dynamic Configuration Hey Aditya, Two comments: 1. Yeah we need to reconcile this with the APIs in KIP-4. I think it does make sense to allow setting config during topic creation. 
I agree with your summary that having alter topic and alter config may be confusing, but there are also some non-config changes such as replication factor and partition count that alter topic can carry out. What is the final state you are proposing? 2. This is implementation related so probably can be removed from the KIP entirely, but you seem to be proposing a separate config manager for each config override type. Should we just generalize TopicConfigManager to be ConfigOverrideManager and have it handle all the override types we will have? I think I may just be unclear on what you are proposing... -Jay On Mon, May 18, 2015 at 1:34 PM, Aditya Auradkar aaurad...@linkedin.com.invalid
Re: Review Request 33620: Patch for KAFKA-1690
- Michael --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33620/#review83993 --- On May 21, 2015, 5:37 p.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33620/ --- (Updated May 21, 2015, 5:37 p.m.) Review request for kafka. Bugs: KAFKA-1690 https://issues.apache.org/jira/browse/KAFKA-1690 Repository: kafka Description --- KAFKA-1690. new java producer needs ssl support as a client. KAFKA-1690. new java producer needs ssl support as a client. KAFKA-1690. new java producer needs ssl support as a client. KAFKA-1690. new java producer needs ssl support as a client. SSLFactory tests. KAFKA-1690. new java producer needs ssl support as a client. Added PrincipalBuilder. KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews. KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews. KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews. KAFKA-1690. new java producer needs ssl support as a client. Fixed minor issues with the patch. KAFKA-1690. new java producer needs ssl support as a client. Fixed minor issues with the patch. 
Diffs - build.gradle cd2aa838fd53e8124f308979b1d70efe0c5725a6 checkstyle/import-control.xml f2e6cec267e67ce8e261341e373718e14a8e8e03 clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java bdff518b732105823058e6182f445248b45dc388 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d301be4709f7b112e1f3a39f3c04cfa65f00fa60 clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 8e336a3aa96c73f52beaeb56b931baf4b026cf21 clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 187d0004c8c46b6664ddaffecc6166d4b47351e5 clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java c4fa058692f50abb4f47bd344119d805c60123f5 clients/src/main/java/org/apache/kafka/common/config/SecurityConfigs.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/Authenticator.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/Channel.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/PlainTextChannelBuilder.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/SSLFactory.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/Selectable.java b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 clients/src/main/java/org/apache/kafka/common/network/Selector.java 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java 
PRE-CREATION clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java dab1a94dd29563688b6ecf4eeb0e180b06049d3f clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/utils/Utils.java f73eedb030987f018d8446bb1dcd98d19fa97331 clients/src/test/java/org/apache/kafka/common/network/EchoServer.java PRE-CREATION clients/src/test/java/org/apache/kafka/common/network/SSLFactoryTest.java PRE-CREATION clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java PRE-CREATION clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java d5b306b026e788b4e5479f3419805aa49ae889f3 clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java 2ebe3c21f611dc133a2dbb8c7dfb0845f8c21498 clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java PRE-CREATION Diff: https://reviews.apache.org/r/33620/diff/ Testing --- Thanks, Sriharsha Chintalapani
[jira] [Commented] (KAFKA-2217) Refactor Client Selectable Interface for Better Concurrency Options
[ https://issues.apache.org/jira/browse/KAFKA-2217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14559646#comment-14559646 ] Jason Gustafson commented on KAFKA-2217: KAFKA-2168 was the main reason. Committing offsets or getting metadata from an external thread can be blocked by long polls with the current synchronization policy in KafkaConsumer. So I was looking into finer-grained synchronization, but this is difficult at present due to all of the state in the underlying objects. This was a first attempt to try to make some of these objects safer. I can understand the concern about the extra allocations, though. I've been doing some performance testing to see if it introduces noticeable GC overhead, and so far, it hasn't. Refactor Client Selectable Interface for Better Concurrency Options --- Key: KAFKA-2217 URL: https://issues.apache.org/jira/browse/KAFKA-2217 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson Assignee: Jason Gustafson Attachments: KAFKA-2217.patch, KAFKA-2217_2015-05-25_10:45:30.patch, KAFKA-2217_2015-05-26_09:37:29.patch The current Selectable interface makes thread-safe usage without external locking unlikely. In particular, the interface requires implementations to store internal lists containing the results from an invocation of poll. This makes dealing with issues such as KAFKA-2168 more difficult since it adds state which must be synchronized. Here are the offending methods: {code:java} interface Selectable { void poll(long timeout); List<NetworkSend> completedSends(); List<NetworkReceive> completedReceives(); List<Integer> disconnected(); List<Integer> connected(); // rest excluded } {code} The user is required to invoke poll, then extract the results from the corresponding methods. In order to avoid missing events, the caller must hold an external lock while they access the results of the poll. Instead, we can return the results directly from the poll call using a container class.
For example: {code:java} class PollResult { List<NetworkSend> completedSends; List<NetworkReceive> completedReceives; List<Integer> disconnected; List<Integer> connected; } interface Selectable { PollResult poll(long timeout); } {code} This should get us closer to a thread-safe NetworkClient, which would enable a more granular approach to synchronizing the KafkaConsumer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14559650#comment-14559650 ] Ewen Cheslack-Postava commented on KAFKA-2168: -- Some reasons you might want to use the consumer from multiple threads: 1. I don't think it's necessarily addressed by this JIRA, but if processing messages is expensive, or the processing code is easier to write as synchronous calls even if it requires accessing some network resource, you might want multiple threads to be able to call poll(). This should already behave correctly. 2. Manage offset commits in a separate thread from polling. If you need to coordinate some other action with offset commit, your choices are currently to be careful in computing timeouts for poll() in order to get processing in the same thread or to try committing from another thread. The code for doing this is much simpler to write if you can just fire up a thread that does sync commit + whatever other operation you need to do, then sleeps for the next commit interval. If you do this right you can continue processing messages during the offset commit, even if it ends up delayed for some reason. 3. close() is probably the most obvious case given the feedback we've had on the producer's close() method blocking indefinitely -- you want to be able to close() from a separate thread if you keep a thread dedicated to poll()ing. For example, using a shutdown hook requires this. The feedback on the producer made it clear this is important and should also have a timeout. 4. Metrics. MetricsReporter is the right way to get metrics, but that only works if what you care about is already covered. I don't think per topic-partition position() and committed() are currently reported -- not sure what the plan is there since reporting metrics in something like mirrormaker might be too much, but some applications will want to be able to track that info in metrics. 
This is another case where just firing up a thread to periodically check the state of the consumer and report it via whatever metrics package they use is probably the easiest implementation. 5. Any time you may need to make dynamic changes to the consumer in response to external events. For example, consider a mirrormaker-like service. If you want to be able to dynamically reconfigure the consumer to add new topics to the job, subscribe() will block indefinitely if poll() has a long timeout and new data isn't flowing in to the topics you're already subscribed to. A wakeup() method isn't good enough here since you need to manage the subsequent race between the thread trying to subscribe() and the poll()ing thread. At a minimum, the current state where thread safety is guaranteed in the javadoc but we have indefinite blocking is a problem. If we want it to be a single-threaded API, then we should just leave locking up to the user (although we'd probably still at least want some sort of wakeup() method so they could interrupt long poll() calls). New consumer poll() can block other calls like position(), commit(), and close() indefinitely - Key: KAFKA-2168 URL: https://issues.apache.org/jira/browse/KAFKA-2168 Project: Kafka Issue Type: Bug Components: clients, consumer Reporter: Ewen Cheslack-Postava Assignee: Jason Gustafson The new consumer is currently using very coarse-grained synchronization. For most methods this isn't a problem since they finish quickly once the lock is acquired, but poll() might run for a long time (and commonly will since polling with long timeouts is a normal use case). This means any operations invoked from another thread may block until the poll() call completes. Some example use cases where this can be a problem: * A shutdown hook is registered to trigger shutdown and invokes close(). It gets invoked from another thread and blocks indefinitely. * User wants to manage offset commit themselves in a background thread. 
If the commit policy is not purely time based, it's not currently possible to make sure the call to commit() will be processed promptly. Two possible solutions to this: 1. Make sure a lock is not held during the actual select call. Since we have multiple layers (KafkaConsumer -> NetworkClient -> Selector -> nio Selector) this is probably hard to make work cleanly since locking is currently only performed at the KafkaConsumer level and we'd want it unlocked around a single line of code in Selector. 2. Wake up the selector before synchronizing for certain operations. This would require some additional coordination to make sure the caller of wakeup() is able to acquire the lock promptly (instead of, e.g., the poll() thread being woken up
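Solution 2 above (waking the poller before synchronizing) can be simulated with plain java.util.concurrent primitives. The class and method names here are illustrative stand-ins, not the actual KafkaConsumer implementation:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Minimal simulation: wake up a long-running poll() so that another
// thread's commit() can acquire the shared lock promptly instead of
// waiting out the full poll timeout.
public class WakeupSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final AtomicBoolean wakeup = new AtomicBoolean(false);

    public void poll(long timeoutMs) throws InterruptedException {
        lock.lock();
        try {
            long deadline = System.currentTimeMillis() + timeoutMs;
            // Stand-in for the select loop: wait until timeout or wakeup.
            while (System.currentTimeMillis() < deadline) {
                if (wakeup.getAndSet(false)) return; // interrupted by commit()
                Thread.sleep(1);
            }
        } finally {
            lock.unlock();
        }
    }

    public void commit() {
        wakeup.set(true); // 1. signal the in-progress poll to return
        lock.lock();      // 2. then contend for the lock, now released quickly
        try {
            // ... send the offset commit here ...
        } finally {
            lock.unlock();
        }
    }
}
```

Even this toy version shows the coordination problem the comment mentions: between step 1 and step 2, a third thread could grab the lock and start another long poll, so a real implementation needs something stronger than a bare flag.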
[jira] [Commented] (KAFKA-2154) MetadataResponse is Empty on a Fresh Cluster
[ https://issues.apache.org/jira/browse/KAFKA-2154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14559899#comment-14559899 ] Jason Gustafson commented on KAFKA-2154: This seems like a duplicate of KAFKA-1867, which has been resolved and is scheduled for 0.8.3. MetadataResponse is Empty on a Fresh Cluster Key: KAFKA-2154 URL: https://issues.apache.org/jira/browse/KAFKA-2154 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.1.1 Reporter: Keith Bourgoin When I start a fresh cluster using {{bin/kafka-server-start.sh}} and issue a MetadataRequest to it, the results are blank. It's correct that there are no topics, but there are also no brokers returned. I'm writing a driver for Kafka, so this makes the initial connection to the cluster difficult. To reproduce: * Start Zookeeper with {{bin/zookeeper-server-start.sh config/zookeeper.properties}} and a broker with {{bin/kafka-server-start.sh config/server.properties}}. Be sure there's nothing in {{/tmp}} from a previous run. 
* Run this {{echo -e "\x00\x00\x00\x15\x00\x03\x00\x01\x00\x00\x00\x00\x00\x07pykafka\x00\x00\x00\x00" | nc localhost 9092 | hd}} and observe the output: {noformat} 00 00 00 0c 00 00 00 00 00 00 00 00 00 00 00 00 || 0010 {noformat} * Create a topic using {{bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 2 --replication-factor 1}} * Re-run the same command and now observe the output: {noformat} kfb@parsely-dev:~/src/ct/pykafka$ echo -e "\x00\x00\x00\x15\x00\x03\x00\x01\x00\x00\x00\x00\x00\x07pykafka\x00\x00\x00\x00" | nc localhost 9092 | hd 00 00 00 61 00 00 00 00 00 00 00 01 00 00 00 00 |...a| 0010 00 0b 70 61 72 73 65 6c 79 2d 64 65 76 00 00 23 |..parsely-dev..#| 0020 84 00 00 00 01 00 00 00 04 74 65 73 74 00 00 00 |.test...| 0030 02 00 00 00 00 00 01 00 00 00 00 00 00 00 01 00 || 0040 00 00 00 00 00 00 01 00 00 00 00 00 00 00 00 00 || 0050 00 00 00 00 00 00 00 00 01 00 00 00 00 00 00 00 || 0060 01 00 00 00 00|.| 0065 {noformat} In this case, parsely-dev is the name of my work VM and the # following it is the port number. I've verified it's a correctly formatted MetadataResponse. It's the first null result that we're having a hard time dealing with. As for the bytestring, that's a correctly formatted MetadataRequest with no topics specified. Presumably if I specified a topic name it would auto-create the topic and then start returning broker information. It doesn't really change the fact that the initial state is fairly broken. Finally, it's worth noting that if I delete the test topic (after turning on {{delete.topic.enable}}) then the responses still include broker information. It's just the initial state which is causing problems. {noformat} kfb@parsely-dev:~/src/kafka$ bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test Topic test is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.
kfb@parsely-dev:~/src/ct/pykafka$ echo -e "\x00\x00\x00\x15\x00\x03\x00\x01\x00\x00\x00\x00\x00\x07pykafka\x00\x00\x00\x00" | nc localhost 9092 | hd 00 00 00 21 00 00 00 00 00 00 00 01 00 00 00 00 |...!| 0010 00 0b 70 61 72 73 65 6c 79 2d 64 65 76 00 00 23 |..parsely-dev..#| 0020 84 00 00 00 00|.| 0025 {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Kafka KIP hangout May 26
Thanks for sending out the KIP notes, Ashish! On Tue, May 26, 2015 at 11:51 AM, Ashish Singh asi...@cloudera.com wrote: Below are my notes. Feel free to add/ modify the content. Kafka KIP discussion (May 26, 2015) KIP-12 (sasl/ssl authentication): status check Not much discussion this time. KIP-21 (configuration management) - Aditya will make changes to KIP-4 based on last time’s discussion. - Andrii is OK with the changes suggested for KIP-4. KIP-19 (Add a request timeout to NetworkClient) - In the last KIP hangout it was suggested to just have two timeouts, blocking time during send and how long to wait to really send the message out. However, it is difficult to map all timeouts we currently provide to users to one of the above mentioned timeout categories. - New proposal - let the user know what kind of things they have to wait for once they call send. The KIP has been updated with the details. - Use cases will be added for different timeout configs to the KIP. - Should the timeout be specified in the api or in the configs? No strong use case for having it in the APIs. For now, buffer full timeout will be in configs. KIP-25 (system test): quick overview - Initial patch with examples on writing system tests using ducktape will be posted soon. - Roadmaps for system tests using ducktape will be added to the wiki. - Should we have all new system tests in ducktape? Probably not at this time as ducktape is still under evaluation. However, eventually if we plan to move over to using ducktape for system tests, all old system tests will be ported to use ducktape. On Tue, May 26, 2015 at 8:50 AM, Jun Rao j...@confluent.io wrote: Hi, Everyone, We will have a KIP hangout at 11 PST on May 26. The following is the agenda. If you want to attend and are not on the invite, please let me know. Agenda: KIP-12 (sasl/ssl authentication): status check KIP-21 (configuration management) KIP-19 (Add a request timeout to NetworkClient) KIP-25 (system test): quick overview Thanks, Jun -- Regards, Ashish -- Thanks, Neha
Kafka KIP hangout May 26
Hi, Everyone, We will have a KIP hangout at 11 PST on May 26. The following is the agenda. If you want to attend and are not on the invite, please let me know.

Agenda:
KIP-12 (sasl/ssl authentication): status check
KIP-21 (configuration management)
KIP-19 (Add a request timeout to NetworkClient)
KIP-25 (system test): quick overview

Thanks, Jun
[jira] [Commented] (KAFKA-1419) cross build for scala 2.11
[ https://issues.apache.org/jira/browse/KAFKA-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14559172#comment-14559172 ] Adamos Loizou commented on KAFKA-1419: -- Hello, is there any ETA on including the 2.11 fix? Thanks! cross build for scala 2.11 -- Key: KAFKA-1419 URL: https://issues.apache.org/jira/browse/KAFKA-1419 Project: Kafka Issue Type: Improvement Components: clients Affects Versions: 0.8.1 Reporter: Scott Clasen Assignee: Ivan Lyutov Priority: Blocker Fix For: 0.8.1.2, 0.8.2.0 Attachments: KAFKA-1419-scalaBinaryVersion.patch, KAFKA-1419-scalaBinaryVersion.patch, KAFKA-1419.patch, KAFKA-1419.patch, KAFKA-1419_2014-07-28_15:05:16.patch, KAFKA-1419_2014-07-29_15:13:43.patch, KAFKA-1419_2014-08-04_14:43:26.patch, KAFKA-1419_2014-08-05_12:51:16.patch, KAFKA-1419_2014-08-07_10:17:34.patch, KAFKA-1419_2014-08-07_10:52:18.patch, KAFKA-1419_cross_build_for_scala_2_11_for_0_8_1_branch.patch Please publish builds for scala 2.11, hopefully just needs a small tweak to the gradle conf? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-1737) Document required ZkSerializer for ZkClient used with AdminUtils
[ https://issues.apache.org/jira/browse/KAFKA-1737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14560014#comment-14560014 ] Vivek Madani edited comment on KAFKA-1737 at 5/26/15 10:38 PM: ---

Thanks Guozhang. Did you also get a chance to look at KAFKA-1817? Most probably, the reason for the error mentioned in that ticket is that ZkStringSerializer was not set while creating the ZkClient.

Document required ZkSerializer for ZkClient used with AdminUtils
Key: KAFKA-1737 URL: https://issues.apache.org/jira/browse/KAFKA-1737 Project: Kafka Issue Type: Improvement Components: tools Affects Versions: 0.8.1.1 Reporter: Stevo Slavic Assignee: Vivek Madani Priority: Minor Attachments: KAFKA-1737.patch, KAFKA-1737.patch

{{ZkClient}} instances passed to {{AdminUtils}} calls must have {{kafka.utils.ZKStringSerializer}} set as the {{ZkSerializer}}. Otherwise, commands executed via {{AdminUtils}} may not be seen/recognizable by the broker, producer, or consumer. E.g. a producer (with auto topic creation turned off) will not be able to send messages to a topic created via {{AdminUtils}}; it will throw {{UnknownTopicOrPartitionException}}. Please consider at least documenting this requirement in the {{AdminUtils}} scaladoc. For more info see the [related discussion on the Kafka user mailing list|http://mail-archives.apache.org/mod_mbox/kafka-users/201410.mbox/%3CCAAUywg-oihNiXuQRYeS%3D8Z3ymsmEHo6ghLs%3Dru4nbm%2BdHVz6TA%40mail.gmail.com%3E].
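The essence of the requirement above is that `kafka.utils.ZKStringSerializer` just encodes ZK values as UTF-8 strings, so brokers and `AdminUtils` agree on the format. A minimal, dependency-free sketch of that behavior follows; the `ZkSerializer` interface here is a local stand-in for the one in the zkclient library, reproduced only to keep the example self-contained.

```java
import java.nio.charset.StandardCharsets;

// Local stand-in for org.I0Itec.zkclient.serialize.ZkSerializer.
interface ZkSerializer {
    byte[] serialize(Object data);
    Object deserialize(byte[] bytes);
}

// Mirrors what kafka.utils.ZKStringSerializer does: plain UTF-8 strings.
// Without a serializer like this, ZkClient falls back to Java object
// serialization and brokers cannot read the values AdminUtils writes.
public class ZkStringSerializerSketch implements ZkSerializer {
    @Override
    public byte[] serialize(Object data) {
        return ((String) data).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public Object deserialize(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ZkSerializer s = new ZkStringSerializerSketch();
        byte[] raw = s.serialize("{\"version\":1}");
        System.out.println(s.deserialize(raw));
    }
}
```

With the real library, the fix is simply to pass `ZKStringSerializer` when constructing the `ZkClient` that is handed to `AdminUtils`.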
Re: Review Request 34608: Patch for KAFKA-2217
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34608/#review85192 ---

Thanks for the patch. Looks good; just a couple of minor comments below. Also, we can probably wait until KAFKA-1928 is checked in first to avoid the rebase overhead.

clients/src/main/java/org/apache/kafka/clients/NetworkClient.java https://reviews.apache.org/r/34608/#comment136714
Instead of passing in the complete pollResult, would it be better to just pass in the part that's needed by each of the handling methods?

clients/src/main/java/org/apache/kafka/common/network/PollResult.java https://reviews.apache.org/r/34608/#comment136715
Could we get rid of the get part in the method names? Instead of getCompletedSends(), we can just have completedSends().

- Jun Rao

On May 26, 2015, 4:38 p.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34608/ --- (Updated May 26, 2015, 4:38 p.m.) Review request for kafka. Bugs: KAFKA-2217 https://issues.apache.org/jira/browse/KAFKA-2217 Repository: kafka

Description ---
KAFKA-2217; updated for review comments
KAFKA-2217; add shortcut from poll when there's nothing to do

Diffs -
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 435fbb5116e80302eba11ed1d3069cb577dbdcbd
clients/src/main/java/org/apache/kafka/common/network/PollResult.java PRE-CREATION
clients/src/main/java/org/apache/kafka/common/network/Selectable.java b5f8d83e89f9026dc0853e5f92c00b2d7f043e22
clients/src/main/java/org/apache/kafka/common/network/Selector.java 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java d5b306b026e788b4e5479f3419805aa49ae889f3
clients/src/test/java/org/apache/kafka/test/MockSelector.java ea89b06a4c9e5bb351201299cd3037f5226f0e6c

Diff: https://reviews.apache.org/r/34608/diff/

Testing ---

Thanks, Jason Gustafson
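Jun's two comments can be sketched together: a PollResult value class whose accessors drop the "get" prefix, with the caller handing each handler only the slice it needs. PollResult is PRE-CREATION in this patch, so the fields and element type below are an assumption for illustration, not the actual class.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical shape of the proposed PollResult, with getter-less
// accessors (completedSends() rather than getCompletedSends()).
public class PollResult {
    private final List<String> completedSends;
    private final List<String> completedReceives;

    public PollResult(List<String> completedSends, List<String> completedReceives) {
        this.completedSends = completedSends;
        this.completedReceives = completedReceives;
    }

    public List<String> completedSends() { return completedSends; }
    public List<String> completedReceives() { return completedReceives; }

    public static void main(String[] args) {
        PollResult result = new PollResult(List.of("send-1"), Collections.emptyList());
        // Per the first comment: pass each handler only the part it needs,
        // rather than threading the whole PollResult through every method.
        handleCompletedSends(result.completedSends());
    }

    private static void handleCompletedSends(List<String> sends) {
        System.out.println("completed sends: " + sends.size());
    }
}
```

Passing narrow slices keeps each handler's signature honest about its inputs, which is the point of the first review comment.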
[jira] [Commented] (KAFKA-2213) Log cleaner should write compacted messages using configured compression type
[ https://issues.apache.org/jira/browse/KAFKA-2213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14559422#comment-14559422 ] Joel Koshy commented on KAFKA-2213: ---

Case A: Yes, i.e., if the broker compression.type is 'producer', which is the default.

Case B: Yes, I think it would be good to handle this and write the messages out with the configured compression type. I'm not sure about batching, but I added it to the summary since in general one would want to batch a little before compressing. We could drop that, though, if people think it's weird. If you retain one out of a hundred messages, then the size reduction from compaction would likely be good enough to counter any increase due to inadequate compression.

Log cleaner should write compacted messages using configured compression type
Key: KAFKA-2213 URL: https://issues.apache.org/jira/browse/KAFKA-2213 Project: Kafka Issue Type: Bug Reporter: Joel Koshy

In KAFKA-1374 the log cleaner was improved to handle compressed messages. There were a couple of follow-ups from that:
* We write compacted messages using the original compression type in the compressed message-set. We should instead append all retained messages with the configured broker compression type of the topic.
* While compressing messages we should ideally do some batching before compression.
* Investigate the use of the client compressor. (See the discussion in the RBs for KAFKA-1374.)
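The suggested behavior (accumulate retained messages into a batch, then write them out once with the topic's configured compression type) can be sketched as follows. GZIP from java.util.zip stands in for the configured codec here; the real cleaner would use the broker's compression.type, and the batching shown is the point Joel raises about compressing a little at a time versus per-message.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.zip.GZIPOutputStream;

public class CleanerRecompressSketch {
    // Batch all retained messages and compress them once with the
    // configured codec, instead of preserving each source message-set's
    // original compression type.
    public static byte[] recompressRetained(List<byte[]> retained) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(buf)) {
            for (byte[] message : retained) {
                gz.write(message); // accumulate the batch inside one compressed stream
            }
        }
        return buf.toByteArray(); // one compressed blob for the whole batch
    }

    public static void main(String[] args) throws IOException {
        byte[] out = recompressRetained(List.of("k1=v1".getBytes(), "k2=v2".getBytes()));
        System.out.println("compressed " + out.length + " bytes");
    }
}
```

Batching before compressing matters because compression ratios on single small messages are poor; one stream over the retained batch amortizes the codec overhead.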
[jira] [Updated] (KAFKA-2107) Script to generate waiting-for-review report
[ https://issues.apache.org/jira/browse/KAFKA-2107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar Reddy updated KAFKA-2107: --- Status: Patch Available (was: In Progress)

Script to generate waiting-for-review report
Key: KAFKA-2107 URL: https://issues.apache.org/jira/browse/KAFKA-2107 Project: Kafka Issue Type: Bug Reporter: Gwen Shapira Assignee: Manikumar Reddy Attachments: kafka-waiting-for-review.py

As [~nehanarkhede] described on the mailing list: The other suggestion I'd like to make involves writing a simple JIRA script (whaat?). Currently the weekly email we get from JIRA just tells you which JIRAs have patches, but it doesn't tell you which committers/reviewers have signed up to review those patches. That second piece of information can be really helpful. If someone has some time, they can look at my patch-review script, which uses some of JIRA's python APIs, and write a little script that does this. At Confluent, we are happy to host that script on some EC2 machine.
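The core of the requested report is just a filter over issues: keep those with a patch available and no reviewer signed up. A dependency-free sketch of that logic follows; the Issue record and its field names are hypothetical, since the real script (kafka-waiting-for-review.py) would pull these fields from JIRA's API.

```java
import java.util.List;
import java.util.stream.Collectors;

public class ReviewReportSketch {
    // Hypothetical flattened view of a JIRA issue; reviewer is null
    // when nobody has signed up to review the patch.
    record Issue(String key, String status, String reviewer) {}

    // Issues that have a patch but are still waiting for a reviewer.
    static List<String> waitingForReviewer(List<Issue> issues) {
        return issues.stream()
                .filter(i -> i.status().equals("Patch Available"))
                .filter(i -> i.reviewer() == null)
                .map(Issue::key)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Issue> issues = List.of(
                new Issue("KAFKA-2107", "Patch Available", null),
                new Issue("KAFKA-2213", "Patch Available", "junrao"),
                new Issue("KAFKA-1419", "Open", null));
        // Only KAFKA-2107 has a patch and no reviewer signed up.
        System.out.println(waitingForReviewer(issues));
    }
}
```

This is the piece the weekly JIRA email lacks: it reports patches, but not which ones still need a reviewer.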
[jira] [Work started] (KAFKA-2107) Script to generate waiting-for-review report
[ https://issues.apache.org/jira/browse/KAFKA-2107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on KAFKA-2107 started by Manikumar Reddy. -- Script to generate waiting-for-review report Key: KAFKA-2107 URL: https://issues.apache.org/jira/browse/KAFKA-2107 Project: Kafka Issue Type: Bug Reporter: Gwen Shapira Assignee: Manikumar Reddy Attachments: kafka-waiting-for-review.py
Re: [KIP-DISCUSSION] KIP-22 Expose a Partitioner interface in the new producer
Hi Bhavesh, I realized I didn’t send any vote-closed message on this thread. Although the changes you are proposing are related to partitioning, the change proposed in this KIP is a minor one and will benefit users. I would appreciate it if you could open up another thread for the discussion of your proposal. Closing this discussion/vote with 3 binding +1s and 1 non-binding +1. Thanks, Harsha

On May 26, 2015 at 9:21:46 AM, Bhavesh Mistry (mistry.p.bhav...@gmail.com) wrote:

Hi All, This might be too late to raise partitioning strategy and the use cases to cover. I have experience with both the producer and consumer side, and I have a different use case for this partition selection strategy.

Problem: We have a heterogeneous environment of producers (by that I mean we have Node.js, Python, new Java, and old Scala based producers writing to the same topic). I have seen that not all producers employ a round-robin strategy for non-keyed messages like the new producer does. Hence, it creates non-uniform data ingestion across partitions and delays overall message processing. How do we achieve a uniform distribution/message injection rate across all partitions?

Proposed solution: Let the broker cluster decide the next partition for a topic to send data to, rather than the producer itself, with more intelligence. 1) When sending data to brokers, the Kafka protocol (via ProduceResponse) sends a hint over the wire to the client about which partition to send to next, based on the following logic (or customizable): a. Based on the overall data injection rate for the topic and the current producer's injection rate. b. The ability to rank partitions based on consumer rate (an advanced use case, as there may be many consumers, so a weighted average, etc.). Ultimately, brokers will coordinate among thousands of producers and steer the data injection rate (as an out-of-box feature) and consumption rate (via a pluggable interface implementation on the brokers' side). The goal here is to attain uniformity and/or a lower delivery latency to consumers. This is similar to consumer coordination moving to the brokers.
The producer-side partition selection would also move to the brokers. This would benefit both Java and non-Java clients. Please let me know your feedback on this subject. Thanks, Bhavesh

On Mon, May 18, 2015 at 7:25 AM, Sriharsha Chintalapani harsh...@fastmail.fm wrote: Gianmarco, I’ll send the patch soon. Thanks, Harsha

On May 18, 2015 at 1:34:50 AM, Gianmarco De Francisci Morales (g...@apache.org) wrote: Hi, If everything is in order, can we proceed to implement it? Cheers, -- Gianmarco

On 13 May 2015 at 03:06, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Harsha, If you open this link https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals all the KIPs are child pages of this page, which you can see from the left bar. Only KIP-22 is missing. It looks like you created it as a child page of https://cwiki.apache.org/confluence/display/KAFKA/Index Thanks. Jiangjie (Becket) Qin

On 5/12/15, 3:12 PM, Sriharsha Chintalapani ka...@harsha.io wrote: Hi Jiangjie, It's under https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22++Expose+a+Partitioner+interface+in+the+new+producer I checked other KIPs; they are under /KAFKA as well. Thanks, Harsha

On May 12, 2015 at 2:12:30 PM, Jiangjie Qin (j...@linkedin.com.invalid) wrote: Hey Harsha, It looks like you created the KIP page in the wrong place... Can you move the page to a child page of https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals Thanks. Jiangjie (Becket) Qin

On 5/6/15, 6:12 PM, Harsha ka...@harsha.io wrote: Thanks for the review, Joel. I agree we don't need an init method; we can use configure. I'll update the KIP. -Harsha

On Wed, May 6, 2015, at 04:45 PM, Joel Koshy wrote: +1 with a minor comment: do we need an init method given it extends Configurable? Also, can you move this wiki out of drafts and add it to the table in https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals?
Thanks, Joel

On Wed, May 06, 2015 at 07:46:46AM -0700, Sriharsha Chintalapani wrote: Thanks Jay. I removed partitioner.metadata from the KIP. I’ll send an updated patch. -- Harsha Sent with Airmail

On May 5, 2015 at 6:31:47 AM, Sriharsha Chintalapani (harsh...@fastmail.fm) wrote: Thanks for the comments everyone. Hi Jay, I do have a question regarding the Configurable interface: how to pass a Map<String, ?> of properties. I couldn’t find any other classes using it. JMX reporter overrides it but doesn’t implement it. So with configurable
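The pluggable interface KIP-22 proposes, and the round-robin behavior for non-keyed messages that Bhavesh contrasts against, can be sketched with a local stand-in. The interface below is a simplified approximation for illustration, not the final committed API; the real proposal extends Configurable and takes richer cluster metadata.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified local stand-in for the Partitioner interface proposed in KIP-22.
interface Partitioner {
    int partition(String topic, Object key, int numPartitions);
}

// Round-robin for unkeyed messages, hash for keyed ones: roughly the
// new producer's default strategy that the KIP makes pluggable.
public class RoundRobinPartitioner implements Partitioner {
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public int partition(String topic, Object key, int numPartitions) {
        if (key == null) {
            // Cycle through partitions for uniform, keyless distribution.
            return counter.getAndIncrement() % numPartitions;
        }
        // Keyed messages stick to a partition derived from the key.
        return Math.abs(key.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        Partitioner p = new RoundRobinPartitioner();
        // Unkeyed messages cycle through partitions 0, 1, 2, 0, ...
        for (int i = 0; i < 4; i++) {
            System.out.println(p.partition("t", null, 3));
        }
    }
}
```

Making this interface pluggable lets each producer override the default strategy locally, which is a lighter-weight alternative to Bhavesh's broker-coordinated proposal.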