Re: Review Request 29091: Patch for KAFKA-1646
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29091/ ---

(Updated Dec. 16, 2014, 8:13 a.m.)

Review request for kafka.

Bugs: KAFKA-1646
    https://issues.apache.org/jira/browse/KAFKA-1646

Repository: kafka

Description
---

truncate off trailing zeros on broker restart if broker is gracefully stopped.

Diffs
---

  core/src/main/scala/kafka/log/FileMessageSet.scala e1f8b979c3e6f62ea235bd47bc1587a1291443f9
  core/src/main/scala/kafka/log/Log.scala 46df8d99d977a3b010a9b9f4698187fa9bfb2498
  core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8
  core/src/main/scala/kafka/utils/Utils.scala a89b0463685e6224d263bc9177075e1bb6b93d04

Diff: https://reviews.apache.org/r/29091/diff/

Testing
---

Thanks,
Qianlin Xia
Re: Review Request 29091: Patch for KAFKA-1646
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29091/ ---

(Updated Dec. 16, 2014, 8:18 a.m.)

Review request for kafka.

Bugs: KAFKA-1646
    https://issues.apache.org/jira/browse/KAFKA-1646

Repository: kafka

Description
---

truncate off trailing zeros on broker restart if broker is gracefully stopped.

Diffs (updated)
---

  core/src/main/scala/kafka/log/FileMessageSet.scala e1f8b979c3e6f62ea235bd47bc1587a1291443f9
  core/src/main/scala/kafka/log/Log.scala 46df8d99d977a3b010a9b9f4698187fa9bfb2498
  core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8
  core/src/main/scala/kafka/utils/Utils.scala a89b0463685e6224d263bc9177075e1bb6b93d04

Diff: https://reviews.apache.org/r/29091/diff/

Testing
---

Thanks,
Qianlin Xia
Re: Review Request 29091: Patch for KAFKA-1646
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29091/ ---

(Updated Dec. 16, 2014, 8:22 a.m.)

Review request for kafka.

Bugs: KAFKA-1646
    https://issues.apache.org/jira/browse/KAFKA-1646

Repository: kafka

Description
---

truncate off trailing zeros on broker restart if broker is gracefully stopped.

Diffs (updated)
---

  core/src/main/scala/kafka/log/FileMessageSet.scala e1f8b979c3e6f62ea235bd47bc1587a1291443f9
  core/src/main/scala/kafka/log/Log.scala 46df8d99d977a3b010a9b9f4698187fa9bfb2498
  core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8
  core/src/main/scala/kafka/utils/Utils.scala a89b0463685e6224d263bc9177075e1bb6b93d04

Diff: https://reviews.apache.org/r/29091/diff/

Testing
---

Thanks,
Qianlin Xia
Re: Review Request 29091: Patch for KAFKA-1646
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29091/ ---

(Updated Dec. 16, 2014, 8:30 a.m.)

Review request for kafka.

Bugs: KAFKA-1646
    https://issues.apache.org/jira/browse/KAFKA-1646

Repository: kafka

Description
---

truncate off trailing zeros on broker restart if broker is gracefully stopped.

Diffs (updated)
---

  core/src/main/scala/kafka/log/FileMessageSet.scala e1f8b979c3e6f62ea235bd47bc1587a1291443f9
  core/src/main/scala/kafka/log/Log.scala 46df8d99d977a3b010a9b9f4698187fa9bfb2498
  core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8
  core/src/main/scala/kafka/utils/Utils.scala a89b0463685e6224d263bc9177075e1bb6b93d04

Diff: https://reviews.apache.org/r/29091/diff/

Testing
---

Thanks,
Qianlin Xia
[jira] [Updated] (KAFKA-1646) Improve consumer read performance for Windows
[ https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Qianlin Xia updated KAFKA-1646:
Attachment: KAFKA-1646_20141216_163008.patch

Improve consumer read performance for Windows

Key: KAFKA-1646
URL: https://issues.apache.org/jira/browse/KAFKA-1646
Project: Kafka
Issue Type: Improvement
Components: log
Affects Versions: 0.8.1.1
Environment: Windows
Reporter: xueqiang wang
Labels: newbie, patch
Attachments: Improve consumer read performance for Windows.patch, KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, KAFKA-1646_20141216_163008.patch

This patch is for the Windows platform only. On Windows, if more than one replica is writing to disk, the segment log files will not be laid out contiguously on disk, and consumer read performance drops greatly. This fix allocates more disk space when rolling a new segment, which improves consumer read performance on the NTFS file system. The patch doesn't affect file allocation on other filesystems, since it only adds code guarded by checks like 'if (Os.isWindows)' or methods used only on Windows.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
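The preallocation idea the description refers to can be sketched as follows. This is a hedged illustration, not the patch's actual code: `SegmentPreallocator` is a hypothetical name, and `RandomAccessFile.setLength` is used here as one way to reserve the full segment size up front so NTFS can keep the file contiguous; the patch's real mechanism may differ.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

// Hypothetical sketch: when rolling a new segment on Windows, grow the
// file to the full segment size immediately instead of letting it grow
// append by append, so the filesystem allocates contiguous space.
class SegmentPreallocator {
    static void preallocate(File segmentFile, long segmentBytes) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(segmentFile, "rw")) {
            raf.setLength(segmentBytes); // reserves the space up front
        }
    }
}
```

A broker applying this would then track the real log end offset separately from the physical file size, which is why the companion patch above truncates trailing zeros on a graceful restart.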
[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows
[ https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14247936#comment-14247936 ]

Qianlin Xia commented on KAFKA-1646:
Updated reviewboard https://reviews.apache.org/r/29091/diff/ against branch origin/0.8.1

Improve consumer read performance for Windows

Key: KAFKA-1646
URL: https://issues.apache.org/jira/browse/KAFKA-1646
Project: Kafka
Issue Type: Improvement
Components: log
Affects Versions: 0.8.1.1
Environment: Windows
Reporter: xueqiang wang
Labels: newbie, patch
Attachments: Improve consumer read performance for Windows.patch, KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, KAFKA-1646_20141216_163008.patch

This patch is for the Windows platform only. On Windows, if more than one replica is writing to disk, the segment log files will not be laid out contiguously on disk, and consumer read performance drops greatly. This fix allocates more disk space when rolling a new segment, which improves consumer read performance on the NTFS file system. The patch doesn't affect file allocation on other filesystems, since it only adds code guarded by checks like 'if (Os.isWindows)' or methods used only on Windows.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
Optimistic locking
I'm trying to design a system that uses Kafka as its primary data store by persisting immutable events into a topic and keeping a secondary index in another data store. The secondary index would store the entities. Each event would pertain to some entity, e.g. a user, and those entities are stored in an easily queryable way.

Kafka seems well suited for this, but there's one thing I'm having problems with: I cannot guarantee that only one process writes events about an entity, which makes the design vulnerable to integrity issues. For example, say that a user can have multiple email addresses assigned, and the EmailAddressRemoved event is published when the user removes one. There's an integrity constraint, though: every user MUST have at least one email address. As far as I can see, there's no way to stop two separate processes from looking up a user entity, seeing that there are two email addresses assigned, and each publishing an event. The end result would violate the constraint. If I'm wrong in saying that this isn't possible, I'd love some feedback!

My current thinking is that Kafka could relatively easily support this kind of application with a small additional API. Kafka already has the abstract notion of entities through its key-based retention policy. If the produce API were modified to allow an integer OffsetConstraint, the following algorithm could determine whether the request should proceed:

1. For every key seen, keep track of the offset of the latest message referencing the key.
2. When an OffsetConstraint is specified in the produce API call, compare that value with the latest offset for the message key.
   2.1. If they're identical, allow the operation to continue.
   2.2. If they're not identical, fail with some OptimisticLockingFailure.

Would such a feature be completely out of scope for Kafka?

Best regards,
Daniel Schierbeck
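The proposed check can be sketched as follows. All names here (`OffsetConstrainedLog`, `tryAppend`) are hypothetical; this models only the per-key bookkeeping the poster describes, not any existing Kafka API. An expected offset of -1 stands for "no prior message for this key".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a per-key offset constraint check for a
// conditional (compare-and-set style) produce request.
class OffsetConstrainedLog {
    private final Map<String, Long> latestOffsetByKey = new HashMap<>();
    private long nextOffset = 0;

    // Appends only if the caller's expected offset matches the latest
    // offset recorded for the key; returns the assigned offset on
    // success, or -1 to signal an optimistic-locking failure.
    synchronized long tryAppend(String key, long expectedOffset) {
        long latest = latestOffsetByKey.getOrDefault(key, -1L);
        if (latest != expectedOffset) {
            return -1; // another producer won the race
        }
        long assigned = nextOffset++;
        latestOffsetByKey.put(key, assigned);
        return assigned;
    }
}
```

Under this scheme, two processes that both read the entity at the same offset would both pass the same expected offset; only the first append succeeds, and the second receives the failure and must re-read before retrying.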
[jira] [Commented] (KAFKA-1650) Mirror Maker could lose data on unclean shutdown.
[ https://issues.apache.org/jira/browse/KAFKA-1650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14248407#comment-14248407 ]

Jiangjie Qin commented on KAFKA-1650:
Updated reviewboard https://reviews.apache.org/r/25995/diff/ against branch origin/trunk

Mirror Maker could lose data on unclean shutdown.

Key: KAFKA-1650
URL: https://issues.apache.org/jira/browse/KAFKA-1650
Project: Kafka
Issue Type: Improvement
Reporter: Jiangjie Qin
Assignee: Jiangjie Qin
Attachments: KAFKA-1650.patch, KAFKA-1650_2014-10-06_10:17:46.patch, KAFKA-1650_2014-11-12_09:51:30.patch, KAFKA-1650_2014-11-17_18:44:37.patch, KAFKA-1650_2014-11-20_12:00:16.patch, KAFKA-1650_2014-11-24_08:15:17.patch, KAFKA-1650_2014-12-03_15:02:31.patch, KAFKA-1650_2014-12-03_19:02:13.patch, KAFKA-1650_2014-12-04_11:59:07.patch, KAFKA-1650_2014-12-06_18:58:57.patch, KAFKA-1650_2014-12-08_01:36:01.patch, KAFKA-1650_2014-12-16_08:03:45.patch

Currently, if the mirror maker is shut down uncleanly, the data in the data channel and buffer could potentially be lost. With the new producer's callback, this issue can be solved.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ ---

(Updated Dec. 16, 2014, 4:03 p.m.)

Review request for kafka.

Bugs: KAFKA-1650
    https://issues.apache.org/jira/browse/KAFKA-1650

Repository: kafka

Description (updated)
---

Addressed Guozhang's comments.
Addressed Guozhang's comments
commit before switch to trunk
commit before rebase
Rebased on trunk, Addressed Guozhang's comments.
Addressed Guozhang's comments on MaxInFlightRequests
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
Incorporated Guozhang's comments
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
Merged KAFKA-345 into this patch. Incorporated Joel and Jun's comments.
Added consumer rebalance listener to mirror maker, will test it later.
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
Conflicts:
    core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
added custom config for consumer rebalance listener
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
Add configurable consumer rebalance listener
Incorporated Guozhang's comments
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign
Incorporated Guozhang's comments.
Addressed Guozhang's comment. numMessageUnacked should be decremented no matter the send was successful or not.
Addressed Jun's comments.
Incorporated Jun's comments
Incorporated Jun's comments and rebased on trunk
Rebased on current trunk

Diffs (updated)
---

  core/src/main/scala/kafka/consumer/ConsumerConnector.scala 62c0686e816d2888772d5a911becf625eedee397
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala e991d2187d03241f639eeaf6769fb59c8c99664c
  core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 9baad34a9793e5067d11289ece2154ba87b388af
  core/src/main/scala/kafka/tools/MirrorMaker.scala 77d951d13b8d8ad00af40257fe51623cc2caa61a

Diff: https://reviews.apache.org/r/25995/diff/

Testing
---

Thanks,
Jiangjie Qin
[jira] [Updated] (KAFKA-1499) Broker-side compression configuration
[ https://issues.apache.org/jira/browse/KAFKA-1499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Manikumar Reddy updated KAFKA-1499:
Attachment: KAFKA-1499_2014-12-16_22:39:10.patch

Broker-side compression configuration

Key: KAFKA-1499
URL: https://issues.apache.org/jira/browse/KAFKA-1499
Project: Kafka
Issue Type: New Feature
Reporter: Joel Koshy
Assignee: Manikumar Reddy
Labels: newbie++
Fix For: 0.8.3
Attachments: KAFKA-1499.patch, KAFKA-1499.patch, KAFKA-1499_2014-08-15_14:20:27.patch, KAFKA-1499_2014-08-21_21:44:27.patch, KAFKA-1499_2014-09-21_15:57:23.patch, KAFKA-1499_2014-09-23_14:45:38.patch, KAFKA-1499_2014-09-24_14:20:33.patch, KAFKA-1499_2014-09-24_14:24:54.patch, KAFKA-1499_2014-09-25_11:05:57.patch, KAFKA-1499_2014-10-27_13:13:55.patch, KAFKA-1499_2014-12-16_22:39:10.patch
Original Estimate: 72h
Remaining Estimate: 72h

A given topic can have messages in mixed compression codecs, i.e., it can also have a mix of uncompressed and compressed messages. It would be useful to support a broker-side configuration to recompress messages to a specific compression codec, i.e., all messages (for all topics) on the broker would be compressed to this codec. We could have per-topic overrides as well.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
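For orientation, the feature described here is driven purely by broker (and optionally per-topic) configuration. A hedged sketch of what such a configuration might look like follows; the exact property name and accepted values belong to the patch under review and may differ from this guess:

```
# Broker-wide default: recompress all incoming messages to gzip,
# regardless of the codec the producer used.
compression.type=gzip
```

A per-topic override, as suggested at the end of the description, would carry the same setting at the topic level so individual topics can opt out of the broker-wide codec.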
Re: Review Request 24704: Patch for KAFKA-1499
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24704/ ---

(Updated Dec. 16, 2014, 5:10 p.m.)

Review request for kafka.

Bugs: KAFKA-1499
    https://issues.apache.org/jira/browse/KAFKA-1499

Repository: kafka

Description (updated)
---

Support given for Broker-side compression

Diffs (updated)
---

  core/src/main/scala/kafka/log/Log.scala 4fae2f0d339b256832baa62ca4995d10546716b4
  core/src/main/scala/kafka/log/LogConfig.scala ca7a99e99f641b2694848b88bf4ae94657071d03
  core/src/main/scala/kafka/message/BrokerCompression.scala PRE-CREATION
  core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 788c7864bc881b935975ab4a4e877b690e65f1f1
  core/src/main/scala/kafka/server/KafkaConfig.scala 6e26c5436feb4629d17f199011f3ebb674aa767f
  core/src/main/scala/kafka/server/KafkaServer.scala 1bf7d10cef23a77e71eb16bf6d0e68bc4ebe
  core/src/test/scala/kafka/log/LogConfigTest.scala 99b0df7b69c5e0a1b6251c54592c6ef63b1800fe
  core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala PRE-CREATION
  core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala 4e45d965bc423192ac704883ee75e9727006f89b
  core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 2377abe4933e065d037828a214c3a87e1773a8ef

Diff: https://reviews.apache.org/r/24704/diff/

Testing
---

Thanks,
Manikumar Reddy O
[jira] [Commented] (KAFKA-1499) Broker-side compression configuration
[ https://issues.apache.org/jira/browse/KAFKA-1499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14248520#comment-14248520 ]

Manikumar Reddy commented on KAFKA-1499:
[~jjkoshy] can you review this patch?

Broker-side compression configuration

Key: KAFKA-1499
URL: https://issues.apache.org/jira/browse/KAFKA-1499
Project: Kafka
Issue Type: New Feature
Reporter: Joel Koshy
Assignee: Manikumar Reddy
Labels: newbie++
Fix For: 0.8.3
Attachments: KAFKA-1499.patch, KAFKA-1499.patch, KAFKA-1499_2014-08-15_14:20:27.patch, KAFKA-1499_2014-08-21_21:44:27.patch, KAFKA-1499_2014-09-21_15:57:23.patch, KAFKA-1499_2014-09-23_14:45:38.patch, KAFKA-1499_2014-09-24_14:20:33.patch, KAFKA-1499_2014-09-24_14:24:54.patch, KAFKA-1499_2014-09-25_11:05:57.patch, KAFKA-1499_2014-10-27_13:13:55.patch, KAFKA-1499_2014-12-16_22:39:10.patch
Original Estimate: 72h
Remaining Estimate: 72h

A given topic can have messages in mixed compression codecs, i.e., it can also have a mix of uncompressed and compressed messages. It would be useful to support a broker-side configuration to recompress messages to a specific compression codec, i.e., all messages (for all topics) on the broker would be compressed to this codec. We could have per-topic overrides as well.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
API Annotations
Hi,

Kafka has public APIs in Java and Scala, intended for use by external developers. In addition, Kafka also exposes many public methods that are intended for use within Kafka but are not meant to be called by external developers. Also, some of the external APIs are less stable than others (the new producer, for example).

In Hadoop we have a similar situation, and to avoid misunderstandings or miscommunication about which APIs are external and which are stable, we use annotations to communicate this information. We find it very useful in preventing our customers from accidentally getting into trouble by using internal methods or unstable APIs.

Here are the annotations Hadoop uses:
https://hadoop.apache.org/docs/current/api/src-html/org/apache/hadoop/classification/InterfaceStability.html
https://hadoop.apache.org/docs/current/api/src-html/org/apache/hadoop/classification/InterfaceAudience.html

I'm wondering what others think about using something similar in Kafka.

Gwen
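As a concrete illustration, a minimal Kafka-side analogue of Hadoop's `InterfaceStability` might look like the sketch below. These annotation names are hypothetical, modeled on the Hadoop classes linked above; they are not existing Kafka annotations.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical stability markers, modeled on Hadoop's
// org.apache.hadoop.classification.InterfaceStability.
class InterfaceStability {
    // API is frozen; only compatible changes are allowed.
    @Documented
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD})
    public @interface Stable {}

    // API may change incompatibly between minor releases
    // (e.g. the new producer while it settles).
    @Documented
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD})
    public @interface Unstable {}
}
```

With RUNTIME retention the markers are visible in published javadoc and can also be checked by tooling, which is how Hadoop enforces audience rules in its build.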
Re: Review Request 29091: Patch for KAFKA-1646
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29091/#review65216 ---

Ship it!

Overall it looks good to me.

- Sriharsha Chintalapani

On Dec. 16, 2014, 8:30 a.m., Qianlin Xia wrote:
> (Updated Dec. 16, 2014, 8:30 a.m.)
>
> Review request for kafka.
>
> Bugs: KAFKA-1646
>     https://issues.apache.org/jira/browse/KAFKA-1646
>
> Repository: kafka
>
> Description
> ---
> truncate off trailing zeros on broker restart if broker is gracefully stopped.
>
> Diffs
> ---
>   core/src/main/scala/kafka/log/FileMessageSet.scala e1f8b979c3e6f62ea235bd47bc1587a1291443f9
>   core/src/main/scala/kafka/log/Log.scala 46df8d99d977a3b010a9b9f4698187fa9bfb2498
>   core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8
>   core/src/main/scala/kafka/utils/Utils.scala a89b0463685e6224d263bc9177075e1bb6b93d04
>
> Diff: https://reviews.apache.org/r/29091/diff/
>
> Testing
> ---
>
> Thanks,
> Qianlin Xia
[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows
[ https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14248597#comment-14248597 ]

Sriharsha Chintalapani commented on KAFKA-1646:
Thanks [~qixia] for the reviewboard patch. [~jkreps] [~junrao], could you please give your feedback on the updated patch? Thanks.

Improve consumer read performance for Windows

Key: KAFKA-1646
URL: https://issues.apache.org/jira/browse/KAFKA-1646
Project: Kafka
Issue Type: Improvement
Components: log
Affects Versions: 0.8.1.1
Environment: Windows
Reporter: xueqiang wang
Labels: newbie, patch
Attachments: Improve consumer read performance for Windows.patch, KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, KAFKA-1646_20141216_163008.patch

This patch is for the Windows platform only. On Windows, if more than one replica is writing to disk, the segment log files will not be laid out contiguously on disk, and consumer read performance drops greatly. This fix allocates more disk space when rolling a new segment, which improves consumer read performance on the NTFS file system. The patch doesn't affect file allocation on other filesystems, since it only adds code guarded by checks like 'if (Os.isWindows)' or methods used only on Windows.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1070) Auto-assign node id
[ https://issues.apache.org/jira/browse/KAFKA-1070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14248599#comment-14248599 ]

Sriharsha Chintalapani commented on KAFKA-1070:
[~nehanarkhede] Can you please take a look at the patch and also reply to the comments. Thanks.

Auto-assign node id

Key: KAFKA-1070
URL: https://issues.apache.org/jira/browse/KAFKA-1070
Project: Kafka
Issue Type: Bug
Reporter: Jay Kreps
Assignee: Sriharsha Chintalapani
Labels: usability
Fix For: 0.8.3
Attachments: KAFKA-1070.patch, KAFKA-1070_2014-07-19_16:06:13.patch, KAFKA-1070_2014-07-22_11:34:18.patch, KAFKA-1070_2014-07-24_20:58:17.patch, KAFKA-1070_2014-07-24_21:05:33.patch, KAFKA-1070_2014-08-21_10:26:20.patch, KAFKA-1070_2014-11-20_10:50:04.patch, KAFKA-1070_2014-11-25_20:29:37.patch

It would be nice to have Kafka brokers auto-assign node ids rather than having that be a configuration. Having a configuration is irritating because (1) you have to generate a custom config for each broker and (2) even though it is in configuration, changing the node id can cause all kinds of bad things to happen.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1819) Cleaner gets confused about deleted and re-created topics
[ https://issues.apache.org/jira/browse/KAFKA-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14248602#comment-14248602 ]

Gwen Shapira commented on KAFKA-1819:
Thanks for pointing out the similar issue, [~jjkoshy]. The log is indeed removed from the pool in LogManager.deleteLog, and we could remove the checkpoints in doneCleaning. However, I think we want to be able to force cleaning as part of the topic delete. If we don't, the checkpoint file will only get updated some time later, when doneCleaning is called. That can be more challenging to troubleshoot, and it also may not happen before Gian creates a new topic with the same name.

Cleaner gets confused about deleted and re-created topics

Key: KAFKA-1819
URL: https://issues.apache.org/jira/browse/KAFKA-1819
Project: Kafka
Issue Type: Bug
Reporter: Gian Merlino
Priority: Blocker
Fix For: 0.8.2

I get an error like this after deleting a compacted topic and re-creating it. I think it's because the brokers don't remove cleaning checkpoints from the cleaner-offset-checkpoint file. This is from a build based off commit bd212b7.

java.lang.IllegalArgumentException: requirement failed: Last clean offset is 587607 but segment base offset is 0 for log foo-6.
    at scala.Predef$.require(Predef.scala:233)
    at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:502)
    at kafka.log.Cleaner.clean(LogCleaner.scala:300)
    at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:214)
    at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:192)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: API Annotations
This is definitely an area we can improve upon; thanks for bringing it up, Gwen.

We should also decide on a structure for how we publish. That hasn't been something I have been consistent about for each release, and it should be standard. We can publish to SVN, but I always feel like drift happens there. In 0.8.2-beta I pushed them so they are in dist now (https://archive.apache.org/dist/kafka/0.8.2-beta/) as java-doc and scala-doc; maybe that is OK for the final release? The content should be as helpful as it can be to folks, for sure.

Gwen, do you want to create a JIRA and work on a patch for this? I feel it is something important for 0.8.2 that wouldn't impact code (and if it does, it is for good reason, to best stabilize). Not sure what other folks think of that, though? For 0.8.3, a definite +1.

/***
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/

On Tue, Dec 16, 2014 at 1:04 PM, Gwen Shapira gshap...@cloudera.com wrote:
> Hi,
>
> Kafka has public APIs in Java and Scala, intended for use by external developers. In addition, Kafka also exposes many public methods that are intended for use within Kafka but are not meant to be called by external developers. Also, some of the external APIs are less stable than others (the new producer, for example).
>
> In Hadoop we have a similar situation, and to avoid misunderstandings or miscommunication about which APIs are external and which are stable, we use annotations to communicate this information. We find it very useful in preventing our customers from accidentally getting into trouble by using internal methods or unstable APIs.
>
> Here are the annotations Hadoop uses:
> https://hadoop.apache.org/docs/current/api/src-html/org/apache/hadoop/classification/InterfaceStability.html
> https://hadoop.apache.org/docs/current/api/src-html/org/apache/hadoop/classification/InterfaceAudience.html
>
> I'm wondering what others think about using something similar in Kafka.
>
> Gwen
Re: Review Request 29030: KAFKA-742: Existing directories under the Kafka data directory without any data cause process to not start
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29030/ ---

(Updated Dec. 16, 2014, 6:27 p.m.)

Review request for kafka.

Changes
---

Have the exception print the whole path of the directory causing the issue, rather than just the directory name.

Bugs: KAFKA-742
    https://issues.apache.org/jira/browse/KAFKA-742

Repository: kafka

Description
---

KAFKA-742: Existing directories under the Kafka data directory without any data cause process to not start

Diffs (updated)
---

  core/src/main/scala/kafka/log/Log.scala 4fae2f0d339b256832baa62ca4995d10546716b4
  core/src/main/scala/kafka/log/LogManager.scala 4d2924d04bc4bd62413edb0ee2d4aaf3c0052867
  core/src/test/scala/unit/kafka/log/LogTest.scala d670ba76acd54e3e88855c56c152c7cc36dddfdc

Diff: https://reviews.apache.org/r/29030/diff/

Testing
---

Thanks,
Ashish Singh
[jira] [Updated] (KAFKA-742) Existing directories under the Kafka data directory without any data cause process to not start
[ https://issues.apache.org/jira/browse/KAFKA-742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ashish Kumar Singh updated KAFKA-742:
Attachment: KAFKA-742.1.patch

Makes sense. Updated patch.

Existing directories under the Kafka data directory without any data cause process to not start

Key: KAFKA-742
URL: https://issues.apache.org/jira/browse/KAFKA-742
Project: Kafka
Issue Type: Bug
Components: config
Affects Versions: 0.8.0
Reporter: Chris Curtin
Assignee: Ashish Kumar Singh
Fix For: 0.8.3
Attachments: KAFKA-742.1.patch, KAFKA-742.patch

I incorrectly set up the configuration file to have the metrics go to /var/kafka/metrics while the logs were in /var/kafka. On startup I received the following error, then the daemon exited:

30 [main] INFO kafka.log.LogManager - [Log Manager on Broker 0] Loading log 'metrics'
32 [main] FATAL kafka.server.KafkaServerStartable - Fatal error during KafkaServerStable startup. Prepare to shutdown
java.lang.StringIndexOutOfBoundsException: String index out of range: -1
    at java.lang.String.substring(String.java:1937)
    at kafka.log.LogManager.kafka$log$LogManager$$parseTopicPartitionName(LogManager.scala:335)
    at kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$3.apply(LogManager.scala:112)
    at kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$3.apply(LogManager.scala:109)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
    at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
    at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:109)
    at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:101)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:32)
    at kafka.log.LogManager.loadLogs(LogManager.scala:101)
    at kafka.log.LogManager.init(LogManager.scala:62)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:59)
    at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
    at kafka.Kafka$.main(Kafka.scala:46)
    at kafka.Kafka.main(Kafka.scala)
34 [main] INFO kafka.server.KafkaServer - [Kafka Server 0], shutting down

This was on a brand new cluster, so no data or metrics logs existed yet. Moving the metrics to their own directory (not a child of the logs) allowed the daemon to start. It took a few minutes to figure out what was wrong.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
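The stack trace points at `parseTopicPartitionName` calling `substring` with index -1, presumably because the lookup for the `<topic>-<partition>` separator returns -1 for a directory like 'metrics' that contains no '-'. A hedged sketch of a defensive parse follows; the class and method names are hypothetical and this is not the actual patch:

```java
// Hypothetical sketch of the KAFKA-742 failure mode: a log-directory
// name is expected to look like "<topic>-<partition>". A name without
// a '-' makes lastIndexOf return -1, and substring then throws
// StringIndexOutOfBoundsException, aborting broker startup. Returning
// null instead lets the caller skip (or report) the stray directory.
class TopicPartitionName {
    // Returns {topic, partition} or null if the name is not parseable.
    static String[] parse(String dirName) {
        int sep = dirName.lastIndexOf('-');
        if (sep < 1 || sep == dirName.length() - 1)
            return null; // no separator, or empty topic/partition part
        String partition = dirName.substring(sep + 1);
        if (!partition.chars().allMatch(Character::isDigit))
            return null; // partition suffix is not a number
        return new String[] { dirName.substring(0, sep), partition };
    }
}
```

Note that `lastIndexOf` (rather than `indexOf`) matters because topic names may themselves contain '-', as in "my-topic-12".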
Re: Review Request 24704: Patch for KAFKA-1499
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24704/#review65221 ---

core/src/main/scala/kafka/log/Log.scala
https://reviews.apache.org/r/24704/#comment108254

    I wouldn't block on this, since there are lots of other cases of this that aren't new, but if you happen to do another patch, remove this whitespace error.

- Jonathan Creasy

On Dec. 16, 2014, 5:10 p.m., Manikumar Reddy O wrote:
> (Updated Dec. 16, 2014, 5:10 p.m.)
>
> Review request for kafka.
>
> Bugs: KAFKA-1499
>     https://issues.apache.org/jira/browse/KAFKA-1499
>
> Repository: kafka
>
> Description
> ---
> Support given for Broker-side compression
>
> Diffs
> ---
>   core/src/main/scala/kafka/log/Log.scala 4fae2f0d339b256832baa62ca4995d10546716b4
>   core/src/main/scala/kafka/log/LogConfig.scala ca7a99e99f641b2694848b88bf4ae94657071d03
>   core/src/main/scala/kafka/message/BrokerCompression.scala PRE-CREATION
>   core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 788c7864bc881b935975ab4a4e877b690e65f1f1
>   core/src/main/scala/kafka/server/KafkaConfig.scala 6e26c5436feb4629d17f199011f3ebb674aa767f
>   core/src/main/scala/kafka/server/KafkaServer.scala 1bf7d10cef23a77e71eb16bf6d0e68bc4ebe
>   core/src/test/scala/kafka/log/LogConfigTest.scala 99b0df7b69c5e0a1b6251c54592c6ef63b1800fe
>   core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala PRE-CREATION
>   core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala 4e45d965bc423192ac704883ee75e9727006f89b
>   core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 2377abe4933e065d037828a214c3a87e1773a8ef
>
> Diff: https://reviews.apache.org/r/24704/diff/
>
> Testing
> ---
>
> Thanks,
> Manikumar Reddy O
Re: Review Request 24704: Patch for KAFKA-1499
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24704/#review65223 ---

Ship it!

Ship It!

- Jonathan Creasy

On Dec. 16, 2014, 5:10 p.m., Manikumar Reddy O wrote:
> (Updated Dec. 16, 2014, 5:10 p.m.)
>
> Review request for kafka.
>
> Bugs: KAFKA-1499
>     https://issues.apache.org/jira/browse/KAFKA-1499
>
> Repository: kafka
>
> Description
> ---
> Support given for Broker-side compression
>
> Diffs
> ---
>   core/src/main/scala/kafka/log/Log.scala 4fae2f0d339b256832baa62ca4995d10546716b4
>   core/src/main/scala/kafka/log/LogConfig.scala ca7a99e99f641b2694848b88bf4ae94657071d03
>   core/src/main/scala/kafka/message/BrokerCompression.scala PRE-CREATION
>   core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 788c7864bc881b935975ab4a4e877b690e65f1f1
>   core/src/main/scala/kafka/server/KafkaConfig.scala 6e26c5436feb4629d17f199011f3ebb674aa767f
>   core/src/main/scala/kafka/server/KafkaServer.scala 1bf7d10cef23a77e71eb16bf6d0e68bc4ebe
>   core/src/test/scala/kafka/log/LogConfigTest.scala 99b0df7b69c5e0a1b6251c54592c6ef63b1800fe
>   core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala PRE-CREATION
>   core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala 4e45d965bc423192ac704883ee75e9727006f89b
>   core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 2377abe4933e065d037828a214c3a87e1773a8ef
>
> Diff: https://reviews.apache.org/r/24704/diff/
>
> Testing
> ---
>
> Thanks,
> Manikumar Reddy O
Re: [DISCUSSION] adding the serializer api back to the new java producer
Joel, With a byte array interface, of course there is nothing that one can't do. However, the real question is that whether we want to encourage people to use it this way or not. Being able to flow just bytes is definitely easier to get started. That's why many early adopters choose to do it that way. However, it's often the case that they start feeling the pain later when some producers change the data format. Their Hive/Pig queries start to break and it's a painful process to have the issue fixed. So, the purpose of this api change is really to encourage people to standardize on a single serializer/deserializer that supports things like data validation and schema evolution upstream in the producer. Now, suppose there is an Avro serializer/deserializer implementation. How do we make it easy for people to adopt? If the serializer is part of the api, we can just say, wire in the Avro serializer for key and/or value in the config and then you can start sending Avro records to the producer. If the serializer is not part of the api, we have to say, first instantiate a key and/or value serializer this way, send the key to the key serializer to get the key bytes, send the value to the value serializer to get the value bytes, and finally send the bytes to the producer. The former will be simpler and likely makes the adoption easier. Thanks, Jun On Mon, Dec 15, 2014 at 7:20 PM, Joel Koshy jjkosh...@gmail.com wrote: Documentation is inevitable even if the serializer/deserializer is part of the API - since the user has to set it up in the configs. So again, you can only encourage people to use it through documentation. The simpler byte-oriented API seems clearer to me because anyone who needs to send (or receive) a specific data type will _be forced to_ (or actually, _intuitively_) select a serializer (or deserializer) and will definitely pick an already available implementation if a good one already exists. 
Sorry, I still don't get it, and this is really the only sticking point for me, albeit a minor one (which is why I have been +0 all along on the change). I (and I think many others) would appreciate it if someone could help me understand this better. So I will repeat the question: what usage pattern cannot easily be supported by the simpler API without adding burden on the user?

Thanks,

Joel

On Mon, Dec 15, 2014 at 11:59:34AM -0800, Jun Rao wrote:

Joel,

It's just that if the serializer/deserializer is not part of the API, you can only encourage people to use it through documentation. However, not everyone will read the documentation if it's not directly used in the API.

Thanks,

Jun

On Mon, Dec 15, 2014 at 2:11 AM, Joel Koshy jjkosh...@gmail.com wrote:

(Sorry about the late follow-up - I'm traveling most of this month.)

I'm likely missing something obvious, but I find the following to be a somewhat vague point that has been mentioned more than once in this thread without a clear explanation: why is it hard to share a serializer/deserializer implementation and just have the clients call it before a send/receive? What usage pattern cannot be supported by the simpler API?

1. Can we keep the serialization semantics outside the Producer interface and have simple bytes in / bytes out for the interface (this is what we have today)? The point for this is to keep the interface simple and usage easy to understand. The point against this is that it gets hard to share common usage patterns around serialization/message validation in the future.

On Tue, Dec 09, 2014 at 03:51:08AM +0000, Sriram Subramanian wrote:

Thank you, Jay. I agree with the issue that you point out w.r.t. paired serializers. I also think having mixed serialization types is rare. To get the current behavior, one can simply use a ByteArraySerializer. This is best understood by talking with many customers, and you seem to have done that. I am convinced about the change.
For the rest who gave -1 or 0 for this proposal, do the answers for the three points (updated) below seem reasonable? Are these explanations convincing?

1. Can we keep the serialization semantics outside the Producer interface and have simple bytes in / bytes out for the interface (this is what we have today)? The point for this is to keep the interface simple and usage easy to understand. The point against this is that it gets hard to share common usage patterns around serialization/message validation in the future.

2. Can we create a wrapper producer that does the serialization and have different variants of it for different data formats? The point for this is again to keep the main API clean. The point against this is that it duplicates the API, increases the surface area, and creates redundancy for a minor addition.

3. Do we need to
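The contrast in point 1 can be sketched with a hypothetical, minimal `Serializer` interface (illustrative names only; this is not the actual Kafka client API):

```java
import java.nio.charset.StandardCharsets;

public class SerializerSketch {
    // Hypothetical minimal stand-in for the serializer interface under discussion.
    interface Serializer<T> {
        byte[] serialize(T value);
    }

    static class StringSerializer implements Serializer<String> {
        public byte[] serialize(String value) {
            return value.getBytes(StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        // Style 1 (bytes in / bytes out): the caller serializes explicitly
        // before every send, and the producer only ever sees byte arrays.
        Serializer<String> valueSerializer = new StringSerializer();
        byte[] valueBytes = valueSerializer.serialize("hello");

        // Style 2 (serializer in the API): the serializer class is wired in
        // once via config, and the caller hands the producer typed records,
        // e.g. producer.send(new ProducerRecord<>("topic", "hello")).
        System.out.println(valueBytes.length);
    }
}
```

In style 2, every application using the same config gets the same serialization (and any validation/schema checks it performs) for free, which is the standardization argument made above.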
Re: API Annotations
Hey Gwen,

We discussed this a bit when starting on the new clients. We were super sloppy about this in initial Kafka development--single jar, no real differentiation between public and private APIs. The plan was something like the following:

1. Start to consider this with the new clients.
2. Do the public/private designation at the package level. The public packages are o.a.k.common, o.a.k.errors, o.a.k.producer, o.a.k.consumer, o.a.k.tools.

This makes javadoc and things like that easier, and it makes it easy to see at a glance all the public classes. It would be even better to enforce this in the build if that is possible (i.e., no class from a non-public package is leaked), but we haven't done this. This approach obviously wasn't possible in Hadoop since they started without a clear delineation, as we did in the original Scala code.

Thoughts?

-Jay

On Tue, Dec 16, 2014 at 10:04 AM, Gwen Shapira gshap...@cloudera.com wrote:

Hi,

Kafka has public APIs in Java and Scala, intended for use by external developers. In addition, Kafka also exposes many public methods that are intended for use within Kafka but are not intended to be called by external developers. Also, some of the external APIs are less stable than others (the new producer, for example). In Hadoop we have a similar situation, and to avoid misunderstandings or miscommunication about which APIs are external and which are stable, we use annotations to communicate this information. We find it very useful in preventing our customers from accidentally getting into trouble by using internal methods or unstable APIs.

Here are the annotations Hadoop uses:
https://hadoop.apache.org/docs/current/api/src-html/org/apache/hadoop/classification/InterfaceStability.html
https://hadoop.apache.org/docs/current/api/src-html/org/apache/hadoop/classification/InterfaceAudience.html

I'm wondering what others think about using something similar in Kafka.

Gwen
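For readers unfamiliar with the Hadoop approach Gwen references, the mechanism is just ordinary Java annotations that tools and readers can inspect. A rough sketch of what such an annotation could look like (the names here are illustrative, not an actual Kafka or Hadoop class):

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class ApiAnnotations {
    // Hypothetical stability marker, modeled loosely on Hadoop's
    // InterfaceStability.Unstable.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    public @interface Unstable {}

    // A class flagged as usable but subject to incompatible change.
    @Unstable
    static class NewProducerApi {}

    public static void main(String[] args) {
        // Build tooling or doc generators can discover the marker at runtime.
        System.out.println(NewProducerApi.class.isAnnotationPresent(Unstable.class));
    }
}
```

The package-level convention Jay describes and an annotation like this are complementary: packages delineate the public surface, while annotations can express per-class stability within it.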
[jira] [Commented] (KAFKA-1806) broker can still expose uncommitted data to a consumer
[ https://issues.apache.org/jira/browse/KAFKA-1806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14248764#comment-14248764 ]

Evan Huus commented on KAFKA-1806:
----------------------------------

Sarama client maintainer here (via https://github.com/Shopify/sarama/issues/226); this looks like a Kafka bug to me since the error in the log message is from a ReplicaFetcherThread, but I'm happy to provide extra information on the behaviour of the client if you think it's relevant.

broker can still expose uncommitted data to a consumer
------------------------------------------------------

Key: KAFKA-1806
URL: https://issues.apache.org/jira/browse/KAFKA-1806
Project: Kafka
Issue Type: Bug
Components: consumer
Affects Versions: 0.8.1.1
Reporter: lokesh Birla
Assignee: Neha Narkhede

Although the following issue, https://issues.apache.org/jira/browse/KAFKA-727, is marked fixed, I still see this issue in 0.8.1.1. I am able to reproduce the issue consistently.

[2014-08-18 06:43:58,356] ERROR [KafkaApi-1] Error when processing fetch request for partition [mmetopic4,2] offset 1940029 from consumer with correlation id 21 (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Attempt to read with a maximum offset (1818353) less than the start offset (1940029).
at kafka.log.LogSegment.read(LogSegment.scala:136)
at kafka.log.Log.read(Log.scala:386)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:233)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:119)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:233)
at scala.collection.immutable.Map$Map1.map(Map.scala:107)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:783)
at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:765)
at kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:216)
at java.lang.Thread.run(Thread.java:745)

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1806) broker can still expose uncommitted data to a consumer
[ https://issues.apache.org/jira/browse/KAFKA-1806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14248993#comment-14248993 ]

lokesh Birla commented on KAFKA-1806:
-------------------------------------

This problem occurs multiple times in server.log. Currently I am using:

#added replica fetchers
num.replica.fetchers=4

[2014-08-30 04:00:58,419] ERROR [ReplicaFetcherThread-1-2], Current offset 7343326909 for partition [mmetopic1,0] out of range; reset offset to 7351079341 (kafka.server.ReplicaFetcherThread)
[2014-08-30 04:01:58,351] ERROR [ReplicaFetcherThread-1-2], Current offset 7352830699 for partition [mmetopic1,0] out of range; reset offset to 7360600212 (kafka.server.ReplicaFetcherThread)
[2014-08-30 04:01:58,398] ERROR [ReplicaFetcherThread-2-2], Current offset 7362122784 for partition [mmetopic1,1] out of range; reset offset to 7369788902 (kafka.server.ReplicaFetcherThread)
[2014-08-30 04:01:58,428] ERROR [ReplicaFetcherThread-3-2], Current offset 7349217662 for partition [mmetopic1,2] out of range; reset offset to 7356979468 (kafka.server.ReplicaFetcherThread)
[2014-08-30 04:02:58,380] ERROR [ReplicaFetcherThread-3-2], Current offset 7358748697 for partition [mmetopic1,2] out of range; reset offset to 7366511359 (kafka.server.ReplicaFetcherThread)
[2014-08-30 04:02:58,431] ERROR [ReplicaFetcherThread-2-2], Current offset 7371546217 for partition [mmetopic1,1] out of range; reset offset to 7379322019 (kafka.server.ReplicaFetcherThread)
[2014-08-30 04:02:58,491] ERROR [ReplicaFetcherThread-1-2], Current offset 7362381355 for partition [mmetopic1,0] out of range; reset offset to 7370131818 (kafka.server.ReplicaFetcherThread)
[2014-08-30 04:03:58,553] ERROR [ReplicaFetcherThread-3-2], Current offset 7368280588 for partition [mmetopic1,2] out of range; reset offset to 7376042337 (kafka.server.ReplicaFetcherThread)
[2014-08-30 04:03:58,606] ERROR [ReplicaFetcherThread-1-2], Current offset 7371895090 for partition [mmetopic1,0] out of range; reset offset to 7379659373 (kafka.server.ReplicaFetcherThread)
[2014-08-30 04:03:58,745] ERROR [ReplicaFetcherThread-2-2], Current offset 7381073377 for partition [mmetopic1,1] out of range; reset offset to 7388856060 (kafka.server.ReplicaFetcherThread)
[2014-08-30 04:04:58,377] ERROR [ReplicaFetcherThread-2-2], Current offset 7390601461 for partition [mmetopic1,1] out of range; reset offset to 7398383811 (kafka.server.ReplicaFetcherThread)
[2014-08-30 04:04:58,378] ERROR [ReplicaFetcherThread-1-2], Current offset 7381410731 for partition [mmetopic1,0] out of range; reset offset to 7389193402 (kafka.server.ReplicaFetcherThread)
[2014-08-30 04:04:58,462] ERROR [ReplicaFetcherThread-3-2], Current offset 7377936663 for partition [mmetopic1,2] out of range; reset offset to 7385573885 (kafka.server.ReplicaFetcherThread)
[2014-08-30 04:05:58,440] ERROR [ReplicaFetcherThread-2-2], Current offset 7400170911 for partition [mmetopic1,1] out of range; reset offset to 7407915357 (kafka.server.ReplicaFetcherThread)
[2014-08-30 04:05:58,441] ERROR [ReplicaFetcherThread-1-2], Current offset 7390968588 for partition [mmetopic1,0] out of range; reset offset to 7398725995 (kafka.server.ReplicaFetcherThread)
[2014-08-30 04:05:58,442] ERROR [ReplicaFetcherThread-3-2], Current offset 7387325243 for partition [mmetopic1,2] out of range; reset offset to 7395096361 (kafka.server.ReplicaFetcherThread)
[2014-08-30 04:06:58,326] ERROR [ReplicaFetcherThread-1-2], Current offset 7400572665 for partition [mmetopic1,0] out of range; reset offset to 7411422730 (kafka.server.ReplicaFetcherThread)
[2014-08-30 04:06:58,346] ERROR [ReplicaFetcherThread-2-2], Current offset 7409827554 for partition [mmetopic1,1] out of range; reset offset to 7417436416 (kafka.server.ReplicaFetcherThread)
[2014-08-30 04:06:58,511] ERROR [ReplicaFetcherThread-3-2], Current offset 7396889418 for partition [mmetopic1,2] out of range; reset offset to 7404620618 (kafka.server.ReplicaFetcherThread)
[2014-08-30 04:07:58,328] ERROR [ReplicaFetcherThread-2-2], Current offset 7419467753 for partition [mmetopic1,1] out of range; reset offset to 7420615385 (kafka.server.ReplicaFetcherThread)
[2014-08-30 04:07:58,362] ERROR [ReplicaFetcherThread-3-2], Current offset 7406461331 for partition [mmetopic1,2] out of range; reset offset to 7410977640 (kafka.server.ReplicaFetcherThread)
[2014-08-30 04:07:58,588] ERROR [ReplicaFetcherThread-1-2], Current offset 7413376626 for partition [mmetopic1,0] out of range; reset offset to 7414599975 (kafka.server.ReplicaFetcherThread)
[jira] [Commented] (KAFKA-1479) Logs filling up while Kafka ReplicaFetcherThread tries to retrieve partition info for deleted topics
[ https://issues.apache.org/jira/browse/KAFKA-1479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14249071#comment-14249071 ]

Arup Malakar commented on KAFKA-1479:
-------------------------------------

For people who may stumble upon this JIRA, the steps mentioned by [~manasi] in https://issues.apache.org/jira/browse/KAFKA-1479?focusedCommentId=14017044&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14017044 worked for me as well.

Logs filling up while Kafka ReplicaFetcherThread tries to retrieve partition info for deleted topics
----------------------------------------------------------------------------------------------------

Key: KAFKA-1479
URL: https://issues.apache.org/jira/browse/KAFKA-1479
Project: Kafka
Issue Type: Bug
Components: log
Affects Versions: 0.8.1
Environment: CentOS
Reporter: Manasi Manasi

Started noticing that logs are filling up fast with lines like this:

{quote}
[2014-06-01 15:18:08,218] WARN [KafkaApi-2] Fetch request with correlation id 10049 from client ReplicaFetcherThread-0-2 on partition [sams_2014-05-27,26] failed due to Topic sams_2014-05-27 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis)
[2014-06-01 15:18:08,218] WARN [KafkaApi-2] Fetch request with correlation id 10049 from client ReplicaFetcherThread-0-2 on partition [sams_2014-05-28,38] failed due to Topic sams_2014-05-28 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis)
[2014-06-01 15:18:08,219] WARN [KafkaApi-2] Fetch request with correlation id 10049 from client ReplicaFetcherThread-0-2 on partition [sams_2014-05-30,20] failed due to Topic sams_2014-05-30 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis)
[2014-06-01 15:18:08,219] WARN [KafkaApi-2] Fetch request with correlation id 10049 from client ReplicaFetcherThread-0-2 on partition [sams_2014-05-22,46] failed due to Topic sams_2014-05-22 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis)
[2014-06-01 15:18:08,219] WARN [KafkaApi-2] Fetch request with correlation id 10049 from client ReplicaFetcherThread-0-2 on partition [sams_2014-05-27,8] failed due to Topic sams_2014-05-27 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis)
{quote}

The above is from kafkaServer.out. Also seeing errors in server.log:

{quote}
[2014-06-01 15:23:52,788] ERROR [ReplicaFetcherThread-0-0], Error for partition [sams_2014-05-26,19] to broker 0:class kafka.common.UnknownTopicOrPartitionException (kafka.server.ReplicaFetcherThread)
[2014-06-01 15:23:52,788] WARN [KafkaApi-2] Fetch request with correlation id 10887 from client ReplicaFetcherThread-0-2 on partition [sams_2014-05-30,4] failed due to Topic sams_2014-05-30 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis)
[2014-06-01 15:23:52,788] ERROR [ReplicaFetcherThread-0-0], Error for partition [sams_2014-05-24,34] to broker 0:class kafka.common.UnknownTopicOrPartitionException (kafka.server.ReplicaFetcherThread)
[2014-06-01 15:23:52,788] ERROR [ReplicaFetcherThread-0-0], Error for partition [sams_2014-05-26,41] to broker 0:class kafka.common.UnknownTopicOrPartitionException (kafka.server.ReplicaFetcherThread)
[2014-06-01 15:23:52,788] WARN [KafkaApi-2] Fetch request with correlation id 10887 from client ReplicaFetcherThread-0-2 on partition [2014-05-21,0] failed due to Topic 2014-05-21 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis)
[2014-06-01 15:23:52,788] ERROR [ReplicaFetcherThread-0-0], Error for partition [sams_2014-05-28,42] to broker 0:class kafka.common.UnknownTopicOrPartitionException (kafka.server.ReplicaFetcherThread)
[2014-06-01 15:23:52,788] ERROR [ReplicaFetcherThread-0-0], Error for partition [sams_2014-05-22,21] to broker 0:class kafka.common.UnknownTopicOrPartitionException (kafka.server.ReplicaFetcherThread)
[2014-06-01 15:23:52,788] WARN [KafkaApi-2] Fetch request with correlation id 10887 from client ReplicaFetcherThread-0-2 on partition [sams_2014-05-20,26] failed due to Topic sams_2014-05-20 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis)
{quote}

All these partitions belong to deleted topics. Nothing changed on our end when we started noticing these logs filling up. Any ideas what is going on?

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14249131#comment-14249131 ]

Ewen Cheslack-Postava commented on KAFKA-1788:
----------------------------------------------

[~bpot] that sounds right; I'm pretty sure metadata never gets cleared if all brokers become unavailable -- it's only updated when the producer starts and when it gets a metadataResponse message.

You can actually get into the state you're talking about for a long time without losing all the brokers. Metadata update requests use NetworkClient.leastLoadedNode to select which node to send the request to, which means requests may repeatedly go to the same node even if its connection isn't getting any data through but the TCP connection hasn't timed out yet. That can result in waiting for many minutes even though the metadata might be retrievable from a different node. But I'm not sure it's really a distinct problem, just another variant -- the batch stays in the RecordAccumulator eating up bufferpool space until there's a network error or a response to the request that included the batch. This means any failure to make progress sending data would trigger the same issue.

I think a proper fix for this bug would add a timeout for messages as soon as send() is called, and would need to be able to remove them from any point in the pipeline after that timeout, cleaning up any resources they use.

The metadata issue is another interesting problem. If you reset the metadata, the current implementation will block on any subsequent send() calls, since the first thing send() does is waitOnMetadata(). Arguably, given the interface of send(), I'm not sure that blocking that way should ever be allowed, although at least now it's restricted to the initial send() call and probably simplifies a bunch of code. Resetting the metadata could also be counterproductive since the set of bootstrap nodes could be smaller than the subset of the cluster you had metadata for.

One alternative idea: change the use of leastLoadedNode and, after a certain amount of time, allow it to start considering the bootstrap nodes as well as the set currently in the metadata.

producer record can stay in RecordAccumulator forever if leader is no available
-------------------------------------------------------------------------------

Key: KAFKA-1788
URL: https://issues.apache.org/jira/browse/KAFKA-1788
Project: Kafka
Issue Type: Bug
Components: core, producer
Affects Versions: 0.8.2
Reporter: Jun Rao
Assignee: Jun Rao
Labels: newbie++
Fix For: 0.8.3

In the new producer, when a partition has no leader for a long time (e.g., all replicas are down), the records for that partition will stay in the RecordAccumulator until the leader is available. This may cause the bufferpool to be full and the callback for the produced message to block for a long time.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
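The timeout-based fix sketched in this thread -- expiring batches from the accumulator once they have waited too long, and releasing their resources -- could look roughly like the following. This is an illustrative sketch with made-up names, not the actual RecordAccumulator code:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class AccumulatorSketch {
    // Minimal stand-in for a record batch; only the creation time matters here.
    static final class Batch {
        final long createdMs;
        Batch(long createdMs) { this.createdMs = createdMs; }
    }

    // Drop batches older than timeoutMs from the head of a partition's queue.
    // A real producer would also fail the batch's callbacks and return its
    // memory to the bufferpool, so blocked senders can make progress.
    static int expireOld(Deque<Batch> queue, long nowMs, long timeoutMs) {
        int expired = 0;
        while (!queue.isEmpty() && nowMs - queue.peekFirst().createdMs > timeoutMs) {
            queue.pollFirst();
            expired++;
        }
        return expired;
    }

    public static void main(String[] args) {
        Deque<Batch> queue = new ArrayDeque<>();
        queue.add(new Batch(0));
        queue.add(new Batch(5_000));
        System.out.println(expireOld(queue, 10_000, 6_000));
    }
}
```

The subtlety Ewen points out is that batches must be removable from *any* point in the pipeline after the timeout, not just the head of the accumulator queue as in this simplification.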
[jira] [Created] (KAFKA-1822) Add echo request
Gwen Shapira created KAFKA-1822: --- Summary: Add echo request Key: KAFKA-1822 URL: https://issues.apache.org/jira/browse/KAFKA-1822 Project: Kafka Issue Type: Improvement Components: core Reporter: Gwen Shapira Assignee: Gwen Shapira Currently there is no simple way to generate a request and validate we receive a response without adding a lot of dependencies for the test. Kafka request classes have quite a few dependencies, so they are not really usable when testing infrastructure components or clients. Generating a byte-array with meaningless request key id as it is done in SocketServerTest results in unknown request exception that must be handled. I suggest adding an EchoRequest, EchoResponse and EchoHandler. The Request will be the usual header and a bytearray. The Response will be a response header and the same bytearray echoed back. Should be useful for client developers and when testing infrastructure changes. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
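The core of the proposed echo round-trip is trivial by design: the response payload is the request payload, unchanged. A minimal sketch of the handler side (class and method names here are illustrative, not the proposed Kafka classes):

```java
import java.util.Arrays;

public class EchoSketch {
    // Echo back the request's byte payload unchanged. A real EchoHandler
    // would also copy the correlation id from the request header into the
    // response header so the client can match them up.
    static byte[] handleEcho(byte[] requestPayload) {
        return Arrays.copyOf(requestPayload, requestPayload.length);
    }

    public static void main(String[] args) {
        byte[] payload = {1, 2, 3};
        System.out.println(Arrays.equals(payload, handleEcho(payload)));
    }
}
```

A test client can then assert byte-for-byte equality of what it sent and what came back, which is exactly the "generate a request and validate we receive a response" check the ticket asks for.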
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14249255#comment-14249255 ]

Jay Kreps commented on KAFKA-1788:
----------------------------------

Currently the producer supports either blocking or dropping when it cannot send to the cluster as fast as data is arriving. This could occur because the cluster is down, or just because it isn't fast enough to keep up. Kafka provides high availability for partitions, so the case where a partition is permanently unavailable should be rare.

Timing out requests might be nice, but it's not 100% clear that it is better than the current strategy. The current strategy is just to buffer as long as possible and then either block or drop data when the buffer is exhausted. Arguably, dropping when you are out of space is better than dropping after a fixed time (since in any case you have to drop when you are out of space).

As Ewen says, we can't reset the metadata because the bootstrap servers may no longer exist, and if they do, they are by definition a subset of the current cluster metadata. I think Ewen's solution of just making sure leastLoadedNode eventually tries all nodes is the right way to go. We'll have to be careful, though, as that method is pretty constrained.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
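The "make leastLoadedNode eventually try all nodes" idea endorsed above could be sketched as a fallback that widens the candidate set to include the bootstrap nodes once no metadata progress has been made for a while. All names and the selection policy below are illustrative assumptions, not the actual NetworkClient logic:

```java
import java.util.ArrayList;
import java.util.List;

public class NodeSelectionSketch {
    // Pick a node for a metadata request. Normally only nodes from the
    // current metadata are candidates; after fallbackAfterMs without
    // progress, the bootstrap nodes are considered again as well.
    static String leastLoadedNode(List<String> metadataNodes, List<String> bootstrapNodes,
                                  long nowMs, long lastProgressMs, long fallbackAfterMs) {
        List<String> candidates = new ArrayList<>(metadataNodes);
        if (nowMs - lastProgressMs >= fallbackAfterMs) {
            for (String n : bootstrapNodes)
                if (!candidates.contains(n)) candidates.add(n);
        }
        // Stand-in for the real "least loaded" heuristic: take the first candidate.
        return candidates.isEmpty() ? null : candidates.get(0);
    }

    public static void main(String[] args) {
        // Metadata nodes are all gone; after 30s without progress we fall
        // back to a bootstrap broker instead of spinning forever.
        List<String> metadata = new ArrayList<>();
        List<String> bootstrap = List.of("broker0:9092");
        System.out.println(leastLoadedNode(metadata, bootstrap, 60_000, 0, 30_000));
    }
}
```

The caution above ("that method is pretty constrained") is the real design work: the actual method must also respect connection states and in-flight request counts when ranking candidates.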
[jira] [Commented] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14249266#comment-14249266 ]

Bhavesh Mistry commented on KAFKA-1788:
---------------------------------------

[~jkreps], can we just take a quick look at the NodeConnectionState? If all registered nodes are down, then exit quickly or attempt to connect? This would give an accurate status of all registered nodes (maybe we can do a TCP ping for all nodes). I am not sure, if the producer key is fixed to only one broker, whether it still has the status of all nodes. Here is the reference code:

https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java

I did this in an experimental patch for KAFKA-1642 (but used a hard-coded timeout for the join method).

Thanks,

Bhavesh

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-1788) producer record can stay in RecordAccumulator forever if leader is no available
[ https://issues.apache.org/jira/browse/KAFKA-1788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14249266#comment-14249266 ]

Bhavesh Mistry edited comment on KAFKA-1788 at 12/17/14 1:26 AM:
-----------------------------------------------------------------

[~jkreps], can we just take a quick look at the NodeConnectionState? If all registered nodes are down, then exit quickly or attempt to connect? This would give an accurate status of all registered nodes (maybe we can do a TCP ping for all nodes). I am not sure, if the producer key is fixed to only one broker, whether it still has the status of all nodes. Here is the reference code:

https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java

I did this in an experimental patch for KAFKA-1642 (but used a hard-coded timeout for the join method for the I/O thread, and interrupted it if it does not get killed).

Thanks,

Bhavesh

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-1819) Cleaner gets confused about deleted and re-created topics
[ https://issues.apache.org/jira/browse/KAFKA-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14249443#comment-14249443 ]

Neha Narkhede edited comment on KAFKA-1819 at 12/17/14 4:08 AM:
----------------------------------------------------------------

[~jjkoshy] Shouldn't we remove the deleted topic from all files that maintain either a cleaner or recovery checkpoint before delete topic is considered completed?

Cleaner gets confused about deleted and re-created topics
---------------------------------------------------------

Key: KAFKA-1819
URL: https://issues.apache.org/jira/browse/KAFKA-1819
Project: Kafka
Issue Type: Bug
Reporter: Gian Merlino
Priority: Blocker
Fix For: 0.8.2

I get an error like this after deleting a compacted topic and re-creating it. I think it's because the brokers don't remove cleaning checkpoints from the cleaner-offset-checkpoint file. This is from a build based off commit bd212b7.

java.lang.IllegalArgumentException: requirement failed: Last clean offset is 587607 but segment base offset is 0 for log foo-6.
at scala.Predef$.require(Predef.scala:233)
at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:502)
at kafka.log.Cleaner.clean(LogCleaner.scala:300)
at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:214)
at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:192)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1819) Cleaner gets confused about deleted and re-created topics
[ https://issues.apache.org/jira/browse/KAFKA-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14249443#comment-14249443 ]

Neha Narkhede commented on KAFKA-1819:
--------------------------------------

[~jjkoshy] Shouldn't we remove the deleted topic from all files that maintain either a cleaner or recovery checkpoint?

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
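The fix direction suggested here -- purging a deleted topic's entries from the cleaner's offset checkpoint so a re-created topic starts clean -- can be sketched over a simple partition-to-offset map. This is an illustrative sketch with hypothetical names, not the actual LogCleaner code:

```java
import java.util.HashMap;
import java.util.Map;

public class CheckpointSketch {
    // Drop every checkpoint entry belonging to the deleted topic. The real
    // checkpoint keys are TopicAndPartition values; plain "topic-partition"
    // strings stand in for them here (so this naive prefix match would need
    // proper parsing in real code to avoid matching e.g. "foo-bar-0").
    static void removeTopic(Map<String, Long> checkpoints, String topic) {
        checkpoints.keySet().removeIf(tp -> tp.startsWith(topic + "-"));
    }

    public static void main(String[] args) {
        Map<String, Long> checkpoints = new HashMap<>();
        checkpoints.put("foo-6", 587607L);  // the stale offset from the bug report
        checkpoints.put("bar-0", 42L);
        removeTopic(checkpoints, "foo");
        // A re-created "foo" now has no checkpoint, so "Last clean offset is
        // 587607 but segment base offset is 0" can no longer occur.
        System.out.println(checkpoints.keySet());
    }
}
```

Doing this removal before delete-topic is considered complete (as suggested above) also closes the window where a re-created topic races with a stale checkpoint.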
Re: Review Request 29030: KAFKA-742: Existing directories under the Kafka data directory without any data cause process to not start
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29030/#review65287 --- Ship it! Ship It! - Neha Narkhede On Dec. 16, 2014, 6:27 p.m., Ashish Singh wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29030/ --- (Updated Dec. 16, 2014, 6:27 p.m.) Review request for kafka. Bugs: KAFKA-742 https://issues.apache.org/jira/browse/KAFKA-742 Repository: kafka Description --- KAFKA-742: Existing directories under the Kafka data directory without any data cause process to not start Diffs - core/src/main/scala/kafka/log/Log.scala 4fae2f0d339b256832baa62ca4995d10546716b4 core/src/main/scala/kafka/log/LogManager.scala 4d2924d04bc4bd62413edb0ee2d4aaf3c0052867 core/src/test/scala/unit/kafka/log/LogTest.scala d670ba76acd54e3e88855c56c152c7cc36dddfdc Diff: https://reviews.apache.org/r/29030/diff/ Testing --- Thanks, Ashish Singh
[jira] [Commented] (KAFKA-1819) Cleaner gets confused about deleted and re-created topics
[ https://issues.apache.org/jira/browse/KAFKA-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14249451#comment-14249451 ]

Gwen Shapira commented on KAFKA-1819:
-------------------------------------

The recovery checkpoints are currently handled correctly (by LogManager, I think?); the only issue is with the cleaner file.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-742) Existing directories under the Kafka data directory without any data cause process to not start
[ https://issues.apache.org/jira/browse/KAFKA-742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-742: Resolution: Fixed Status: Resolved (was: Patch Available) Thanks for the patch, [~singhashish]. Pushed to trunk Existing directories under the Kafka data directory without any data cause process to not start --- Key: KAFKA-742 URL: https://issues.apache.org/jira/browse/KAFKA-742 Project: Kafka Issue Type: Bug Components: config Affects Versions: 0.8.0 Reporter: Chris Curtin Assignee: Ashish Kumar Singh Fix For: 0.8.3 Attachments: KAFKA-742.1.patch, KAFKA-742.patch I incorrectly setup the configuration file to have the metrics go to /var/kafka/metrics while the logs were in /var/kafka. On startup I received the following error then the daemon exited: 30 [main] INFO kafka.log.LogManager - [Log Manager on Broker 0] Loading log 'metrics' 32 [main] FATAL kafka.server.KafkaServerStartable - Fatal error during KafkaServerStable startup. Prepare to shutdown java.lang.StringIndexOutOfBoundsException: String index out of range: -1 at java.lang.String.substring(String.java:1937) at kafka.log.LogManager.kafka$log$LogManager$$parseTopicPartitionName(LogManager.scala:335) at kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$3.apply(LogManager.scala:112) at kafka.log.LogManager$$anonfun$loadLogs$1$$anonfun$apply$3.apply(LogManager.scala:109) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34) at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34) at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:109) at kafka.log.LogManager$$anonfun$loadLogs$1.apply(LogManager.scala:101) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:32) at kafka.log.LogManager.loadLogs(LogManager.scala:101) at kafka.log.LogManager.init(LogManager.scala:62) at 
kafka.server.KafkaServer.startup(KafkaServer.scala:59) at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34) at kafka.Kafka$.main(Kafka.scala:46) at kafka.Kafka.main(Kafka.scala) 34 [main] INFO kafka.server.KafkaServer - [Kafka Server 0], shutting down This was on a brand new cluster so no data or metrics logs existed yet. Moving the metrics to their own directory (not a child of the logs) allowed the daemon to start. Took a few minutes to figure out what was wrong. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
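The crash above comes from parsing each directory name under the data dir as `topic-partition` using the last dash: a directory like `metrics` has no dash, so `lastIndexOf('-')` returns -1 and `substring(-1)` throws StringIndexOutOfBoundsException. The sketch below is a hypothetical reconstruction of that parse with a guard added, not the actual `LogManager.parseTopicPartitionName` code:

```java
public class TopicPartitionName {
    // Split "foo-6" into topic "foo" and partition "6". A name with no dash
    // (e.g. a stray "metrics" directory) is rejected with a clear error instead
    // of an out-of-range substring call.
    public static String[] parse(String dirName) {
        int idx = dirName.lastIndexOf('-');          // -1 when there is no dash at all
        if (idx <= 0 || idx == dirName.length() - 1)
            throw new IllegalArgumentException("Not a valid log directory name: " + dirName);
        return new String[] { dirName.substring(0, idx), dirName.substring(idx + 1) };
    }

    public static void main(String[] args) {
        String[] p = parse("foo-6");
        System.out.println(p[0] + " / " + p[1]);
    }
}
```

A fix could alternatively skip non-log directories with a warning instead of aborting startup; the guard above only illustrates where the original parse went wrong.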
Re: Review Request 23702: Patch for KAFKA-1070
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23702/#review65289 --- Overall, this looks good. How about adding some unit tests before checking it in? - Neha Narkhede On Nov. 26, 2014, 4:29 a.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23702/ --- (Updated Nov. 26, 2014, 4:29 a.m.) Review request for kafka. Bugs: KAFKA-1070 https://issues.apache.org/jira/browse/KAFKA-1070 Repository: kafka Description --- KAFKA-1070. Auto-assign node id. Diffs - core/src/main/scala/kafka/common/GenerateBrokerIdException.scala PRE-CREATION core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala PRE-CREATION core/src/main/scala/kafka/server/BrokerMetadataFileHandler.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaConfig.scala 6e26c5436feb4629d17f199011f3ebb674aa767f core/src/main/scala/kafka/server/KafkaServer.scala 1bf7d10cef23a77e71eb16bf6d0e68bc4ebe core/src/main/scala/kafka/utils/ZkUtils.scala 56e3e88e0cc6d917b0ffd1254e173295c1c4aabd core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala PRE-CREATION core/src/test/scala/unit/kafka/utils/TestUtils.scala 0da774d0ed015bdc0461b854e3540ee6e48d1838 Diff: https://reviews.apache.org/r/23702/diff/ Testing --- Thanks, Sriharsha Chintalapani
Re: Review Request 23702: Patch for KAFKA-1070
On Dec. 17, 2014, 4:58 a.m., Neha Narkhede wrote: Overall, this looks good. How about adding some unit tests before checking it in? Please ignore this. I was looking at only part of the patch. - Neha --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23702/#review65289 --- On Nov. 26, 2014, 4:29 a.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23702/ --- (Updated Nov. 26, 2014, 4:29 a.m.) Review request for kafka. Bugs: KAFKA-1070 https://issues.apache.org/jira/browse/KAFKA-1070 Repository: kafka Description --- KAFKA-1070. Auto-assign node id. Diffs - core/src/main/scala/kafka/common/GenerateBrokerIdException.scala PRE-CREATION core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala PRE-CREATION core/src/main/scala/kafka/server/BrokerMetadataFileHandler.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaConfig.scala 6e26c5436feb4629d17f199011f3ebb674aa767f core/src/main/scala/kafka/server/KafkaServer.scala 1bf7d10cef23a77e71eb16bf6d0e68bc4ebe core/src/main/scala/kafka/utils/ZkUtils.scala 56e3e88e0cc6d917b0ffd1254e173295c1c4aabd core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala PRE-CREATION core/src/test/scala/unit/kafka/utils/TestUtils.scala 0da774d0ed015bdc0461b854e3540ee6e48d1838 Diff: https://reviews.apache.org/r/23702/diff/ Testing --- Thanks, Sriharsha Chintalapani
Re: Review Request 23702: Patch for KAFKA-1070
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23702/#review65293 --- core/src/main/scala/kafka/common/GenerateBrokerIdException.scala https://reviews.apache.org/r/23702/#comment108372 Can you fix the doc? core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala https://reviews.apache.org/r/23702/#comment108373 typo core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala https://reviews.apache.org/r/23702/#comment108379 Please include more constructors in both exception classes that allow passing in (message, cause), just message, just cause or nothing. core/src/main/scala/kafka/server/BrokerMetadataFileHandler.scala https://reviews.apache.org/r/23702/#comment108382 version shouldn't be passed in. It should just live in this file. core/src/main/scala/kafka/server/BrokerMetadataFileHandler.scala https://reviews.apache.org/r/23702/#comment108376 If this crashes before syncing the data, it might lead to a corrupted meta.properties file. We should probably write to a tmp file and atomically swap it in, similar to how we handle writes in other checkpoint files (eg. OffsetCheckpoint). Also, how about naming this BrokerMetadata? core/src/main/scala/kafka/server/KafkaServer.scala https://reviews.apache.org/r/23702/#comment108377 Can you document the broker ids that don't match? Configured brokerId %d doesn't match stored brokerId %d in meta.properties core/src/main/scala/kafka/server/KafkaServer.scala https://reviews.apache.org/r/23702/#comment108378 Can you use error(..., e)? core/src/main/scala/kafka/server/KafkaServer.scala https://reviews.apache.org/r/23702/#comment108380 Please include the cause in the exception constructor. throw new GenerateBrokerIdException(Failed..., e); core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala https://reviews.apache.org/r/23702/#comment108381 offset checkpoint? - Neha Narkhede On Nov. 
26, 2014, 4:29 a.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23702/ --- (Updated Nov. 26, 2014, 4:29 a.m.) Review request for kafka. Bugs: KAFKA-1070 https://issues.apache.org/jira/browse/KAFKA-1070 Repository: kafka Description --- KAFKA-1070. Auto-assign node id. Diffs - core/src/main/scala/kafka/common/GenerateBrokerIdException.scala PRE-CREATION core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala PRE-CREATION core/src/main/scala/kafka/server/BrokerMetadataFileHandler.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaConfig.scala 6e26c5436feb4629d17f199011f3ebb674aa767f core/src/main/scala/kafka/server/KafkaServer.scala 1bf7d10cef23a77e71eb16bf6d0e68bc4ebe core/src/main/scala/kafka/utils/ZkUtils.scala 56e3e88e0cc6d917b0ffd1254e173295c1c4aabd core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala PRE-CREATION core/src/test/scala/unit/kafka/utils/TestUtils.scala 0da774d0ed015bdc0461b854e3540ee6e48d1838 Diff: https://reviews.apache.org/r/23702/diff/ Testing --- Thanks, Sriharsha Chintalapani
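The review's concern about a corrupted meta.properties can be addressed with the usual checkpoint pattern it references: write to a temporary file, sync it to disk, then atomically rename it over the target. This is an illustrative sketch under those assumptions; the class and method names are hypothetical, not the patch's actual `BrokerMetadata` code:

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Properties;

public class BrokerMetadataCheckpoint {
    public static void write(File target, int brokerId) throws IOException {
        Properties props = new Properties();
        props.setProperty("version", "0");               // the version lives in this file
        props.setProperty("broker.id", Integer.toString(brokerId));
        File tmp = new File(target.getAbsolutePath() + ".tmp");
        try (FileOutputStream out = new FileOutputStream(tmp)) {
            props.store(out, "broker metadata");
            out.getFD().sync();                          // flush to disk before the swap
        }
        // Atomic rename: a crash leaves either the old file or the new one,
        // never a partially written meta.properties.
        try {
            Files.move(tmp.toPath(), target.toPath(), StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            Files.move(tmp.toPath(), target.toPath(), StandardCopyOption.REPLACE_EXISTING);
        }
    }
}
```

Readers of the file therefore never observe a half-written state, which is the same guarantee the existing OffsetCheckpoint writes provide.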
Build failed in Jenkins: Kafka-trunk #355
See https://builds.apache.org/job/Kafka-trunk/355/changes Changes: [neha.narkhede] KAFKA-742: Existing directories under the Kafka data directory without any data cause process to not start; reviewed by Neha Narkhede -- [...truncated 1516 lines...] at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52) at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95) at kafka.zk.EmbeddedZookeeper.init(EmbeddedZookeeper.scala:33) at kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33) at kafka.integration.PrimitiveApiTest.kafka$integration$KafkaServerTestHarness$$super$setUp(PrimitiveApiTest.scala:39) at kafka.integration.KafkaServerTestHarness$class.setUp(KafkaServerTestHarness.scala:36) at kafka.integration.PrimitiveApiTest.kafka$integration$ProducerConsumerTestHarness$$super$setUp(PrimitiveApiTest.scala:39) at kafka.integration.ProducerConsumerTestHarness$class.setUp(ProducerConsumerTestHarness.scala:33) at kafka.integration.PrimitiveApiTest.setUp(PrimitiveApiTest.scala:39) kafka.integration.PrimitiveApiTest testConsumerEmptyTopic FAILED java.net.BindException: Address already in use at sun.nio.ch.Net.bind(Native Method) at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52) at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95) at kafka.zk.EmbeddedZookeeper.init(EmbeddedZookeeper.scala:33) at kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33) at kafka.integration.PrimitiveApiTest.kafka$integration$KafkaServerTestHarness$$super$setUp(PrimitiveApiTest.scala:39) at kafka.integration.KafkaServerTestHarness$class.setUp(KafkaServerTestHarness.scala:36) at 
kafka.integration.PrimitiveApiTest.kafka$integration$ProducerConsumerTestHarness$$super$setUp(PrimitiveApiTest.scala:39) at kafka.integration.ProducerConsumerTestHarness$class.setUp(ProducerConsumerTestHarness.scala:33) at kafka.integration.PrimitiveApiTest.setUp(PrimitiveApiTest.scala:39) kafka.integration.PrimitiveApiTest testPipelinedProduceRequests FAILED java.net.BindException: Address already in use at sun.nio.ch.Net.bind(Native Method) at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52) at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95) at kafka.zk.EmbeddedZookeeper.init(EmbeddedZookeeper.scala:33) at kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33) at kafka.integration.PrimitiveApiTest.kafka$integration$KafkaServerTestHarness$$super$setUp(PrimitiveApiTest.scala:39) at kafka.integration.KafkaServerTestHarness$class.setUp(KafkaServerTestHarness.scala:36) at kafka.integration.PrimitiveApiTest.kafka$integration$ProducerConsumerTestHarness$$super$setUp(PrimitiveApiTest.scala:39) at kafka.integration.ProducerConsumerTestHarness$class.setUp(ProducerConsumerTestHarness.scala:33) at kafka.integration.PrimitiveApiTest.setUp(PrimitiveApiTest.scala:39) kafka.integration.AutoOffsetResetTest testResetToEarliestWhenOffsetTooHigh FAILED java.net.BindException: Address already in use at sun.nio.ch.Net.bind(Native Method) at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59) at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52) at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:95) at kafka.zk.EmbeddedZookeeper.init(EmbeddedZookeeper.scala:33) at 
kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33) at kafka.integration.AutoOffsetResetTest.kafka$integration$KafkaServerTestHarness$$super$setUp(AutoOffsetResetTest.scala:32) at kafka.integration.KafkaServerTestHarness$class.setUp(KafkaServerTestHarness.scala:36) at kafka.integration.AutoOffsetResetTest.setUp(AutoOffsetResetTest.scala:46) kafka.integration.AutoOffsetResetTest testResetToEarliestWhenOffsetTooLow FAILED java.net.BindException: Address already in use at sun.nio.ch.Net.bind(Native Method) at
[jira] [Commented] (KAFKA-1070) Auto-assign node id
[ https://issues.apache.org/jira/browse/KAFKA-1070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14249492#comment-14249492 ] Neha Narkhede commented on KAFKA-1070: -- Thanks for the updated patch [~sriharsha]. Almost there. Left a few more cleanup comments. Auto-assign node id --- Key: KAFKA-1070 URL: https://issues.apache.org/jira/browse/KAFKA-1070 Project: Kafka Issue Type: Bug Reporter: Jay Kreps Assignee: Sriharsha Chintalapani Labels: usability Fix For: 0.8.3 Attachments: KAFKA-1070.patch, KAFKA-1070_2014-07-19_16:06:13.patch, KAFKA-1070_2014-07-22_11:34:18.patch, KAFKA-1070_2014-07-24_20:58:17.patch, KAFKA-1070_2014-07-24_21:05:33.patch, KAFKA-1070_2014-08-21_10:26:20.patch, KAFKA-1070_2014-11-20_10:50:04.patch, KAFKA-1070_2014-11-25_20:29:37.patch It would be nice to have Kafka brokers auto-assign node ids rather than having that be a configuration. Having a configuration is irritating because (1) you have to generate a custom config for each broker and (2) even though it is in configuration, changing the node id can cause all kinds of bad things to happen. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
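The flow under review can be summarized as: use the configured broker.id if present, cross-check it against the id persisted in meta.properties, and only generate a new id when neither exists. A simplified sketch of that decision, with hypothetical names rather than the patch's actual classes (the generator stands in for the ZooKeeper-backed sequence used for auto-assignment):

```java
import java.util.function.IntSupplier;

public class BrokerIdResolver {
    // configuredId / storedId are null when absent.
    public static int resolve(Integer configuredId, Integer storedId, IntSupplier generator) {
        if (configuredId != null && storedId != null && !configuredId.equals(storedId))
            throw new IllegalStateException("Configured brokerId " + configuredId
                    + " doesn't match stored brokerId " + storedId + " in meta.properties");
        if (configuredId != null) return configuredId;
        if (storedId != null) return storedId;   // reuse the persisted auto-assigned id
        return generator.getAsInt();             // first start: generate, then persist
    }
}
```

The mismatch case is the InconsistentBrokerIdException path the review comments ask to document; failing fast there prevents a broker from silently taking over another broker's identity.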
Re: Review Request 29072: rebased for 0.8.2 branch
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29072/#review65296 --- I found the same review comments I had on trunk. Can you update this patch after addressing the review comments on trunk? clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java https://reviews.apache.org/r/29072/#comment108383 minor nit pick to be addressed on checkin: do nothing clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java https://reviews.apache.org/r/29072/#comment108384 Same as the review on trunk. Can you also make sure the examples are updated? Actually, maybe the review comments from trunk all apply here. clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java https://reviews.apache.org/r/29072/#comment108385 ditto clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java https://reviews.apache.org/r/29072/#comment108386 same as review on trunk - Neha Narkhede On Dec. 16, 2014, 12:49 a.m., Jun Rao wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/29072/ --- (Updated Dec. 16, 2014, 12:49 a.m.) Review request for kafka. 
Bugs: kafka-1797 https://issues.apache.org/jira/browse/kafka-1797 Repository: kafka Description --- fix imports Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/ByteArrayDeserializer.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 227f5646ee708af1b861c15237eda2140cfd4900 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 46efc0c8483acacf42b2984ac3f3b9e0a4566187 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java 436d8a479166eda29f2672b50fc99f288bbe3fa9 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 2ecfc8aaea90a7353bd0dabc4c0ebcc6fd9535ec clients/src/main/java/org/apache/kafka/clients/consumer/Deserializer.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java fe93afa24fc20b03830f1d190a276041d15bd3b9 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java c3aad3b4d6b677f759583f309061193f2f109250 clients/src/main/java/org/apache/kafka/clients/producer/ByteArraySerializer.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 32f444ebbd27892275af7a0947b86a6b8317a374 clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 36e8398416036cab84faad1f07159e5adefd8086 clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 9095caf0db1e41a4acb4216fb197626fbd85b806 clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java c3181b368b6cf15e7134b04e8ff5655a9321ee0b clients/src/main/java/org/apache/kafka/clients/producer/Serializer.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java 40e8234f8771098b097bf757a86d5ac98604df5e clients/src/main/java/org/apache/kafka/common/errors/DeserializationException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/SerializationException.java PRE-CREATION 
core/src/main/scala/kafka/producer/BaseProducer.scala b0207930dd0543f2c51f0b35002e13bf104340ff core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala 4b5b823b85477394cd50eb2a66877a3b8b35b57f core/src/main/scala/kafka/tools/MirrorMaker.scala f399105087588946987bbc84e3759935d9498b6a core/src/main/scala/kafka/tools/ReplayLogProducer.scala 3393a3dd574ac45a27bf7eda646b737146c55038 core/src/main/scala/kafka/tools/TestEndToEndLatency.scala 67196f30af1cfcd40ded20ca970082b78504f6af core/src/main/scala/kafka/tools/TestLogCleaning.scala 1d4ea93f2ba8d4d4d47a307cd47f54a15d3d30dd core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala 6379f2b60af797b084981c94fd84b3d7740aa8a5 core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62 core/src/test/scala/integration/kafka/api/ProducerSendTest.scala d407af9144ef6930d737a6dcf23591c1f6342f87 core/src/test/scala/unit/kafka/utils/TestUtils.scala 0da774d0ed015bdc0461b854e3540ee6e48d1838 Diff: https://reviews.apache.org/r/29072/diff/ Testing --- Thanks, Jun Rao
[jira] [Commented] (KAFKA-1819) Cleaner gets confused about deleted and re-created topics
[ https://issues.apache.org/jira/browse/KAFKA-1819?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14249530#comment-14249530 ] Joel Koshy commented on KAFKA-1819: --- [~gwenshap] that's right - that's what I meant by force write the checkpoints if cleaning was not in progress. As you said, it needs to happen proactively when deleting a log, but we probably don't need to force a cleaning for that since we just need to update the cleaner checkpoint file. So I was thinking we could refactor the code a tiny bit to have a helper write out the checkpoint file and call that from both doneCleaning as well as when deleting logs. Cleaner gets confused about deleted and re-created topics - Key: KAFKA-1819 URL: https://issues.apache.org/jira/browse/KAFKA-1819 Project: Kafka Issue Type: Bug Reporter: Gian Merlino Priority: Blocker Fix For: 0.8.2 I get an error like this after deleting a compacted topic and re-creating it. I think it's because the brokers don't remove cleaning checkpoints from the cleaner-offset-checkpoint file. This is from a build based off commit bd212b7. java.lang.IllegalArgumentException: requirement failed: Last clean offset is 587607 but segment base offset is 0 for log foo-6. at scala.Predef$.require(Predef.scala:233) at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:502) at kafka.log.Cleaner.clean(LogCleaner.scala:300) at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:214) at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:192) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
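The refactor Joel describes, one helper that rewrites the checkpoint state, called both when a cleaning pass finishes and when a log is deleted, can be sketched schematically. The names below are illustrative, not Kafka's actual cleaner classes; the real implementation would persist the map through the cleaner-offset-checkpoint file:

```java
import java.util.HashMap;
import java.util.Map;

public class CleanerCheckpoints {
    private final Map<String, Long> lastCleanOffset = new HashMap<>();

    // Called by the cleaner thread after a successful pass over a log.
    public synchronized void doneCleaning(String topicPartition, long endOffset) {
        lastCleanOffset.put(topicPartition, endOffset);
        writeCheckpointFile();
    }

    // Called proactively when a log is deleted, so a re-created topic cannot
    // inherit a stale "last clean offset" (e.g. 587607) larger than the base
    // offset (0) of its fresh segments.
    public synchronized void handleLogDeleted(String topicPartition) {
        lastCleanOffset.remove(topicPartition);
        writeCheckpointFile();   // force the checkpoint out immediately
    }

    public synchronized Map<String, Long> snapshot() {
        return new HashMap<>(lastCleanOffset);
    }

    private void writeCheckpointFile() {
        // In the real broker this would atomically rewrite cleaner-offset-checkpoint.
    }
}
```

The key point is that no cleaning pass needs to be forced on deletion; dropping the entry and rewriting the checkpoint is enough to avoid the `requirement failed` crash in the bug report.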
[jira] [Commented] (KAFKA-1499) Broker-side compression configuration
[ https://issues.apache.org/jira/browse/KAFKA-1499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14249536#comment-14249536 ] Joel Koshy commented on KAFKA-1499: --- Thank you for the ping, and sorry about the delayed review. I should be able to get to this within a day. Broker-side compression configuration - Key: KAFKA-1499 URL: https://issues.apache.org/jira/browse/KAFKA-1499 Project: Kafka Issue Type: New Feature Reporter: Joel Koshy Assignee: Manikumar Reddy Labels: newbie++ Fix For: 0.8.3 Attachments: KAFKA-1499.patch, KAFKA-1499.patch, KAFKA-1499_2014-08-15_14:20:27.patch, KAFKA-1499_2014-08-21_21:44:27.patch, KAFKA-1499_2014-09-21_15:57:23.patch, KAFKA-1499_2014-09-23_14:45:38.patch, KAFKA-1499_2014-09-24_14:20:33.patch, KAFKA-1499_2014-09-24_14:24:54.patch, KAFKA-1499_2014-09-25_11:05:57.patch, KAFKA-1499_2014-10-27_13:13:55.patch, KAFKA-1499_2014-12-16_22:39:10.patch Original Estimate: 72h Remaining Estimate: 72h A given topic can have messages in mixed compression codecs. i.e., it can also have a mix of uncompressed/compressed messages. It will be useful to support a broker-side configuration to recompress messages to a specific compression codec. i.e., all messages (for all topics) on the broker will be compressed to this codec. We could have per-topic overrides as well. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
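The feature described reduces to a codec lookup: a per-topic setting, when present, overrides a broker-wide default. A minimal sketch of that resolution; the class and codec strings here are assumptions for illustration, not the configuration keys the patch actually introduces:

```java
import java.util.HashMap;
import java.util.Map;

public class CompressionConfig {
    private final String brokerDefault;              // e.g. "gzip" or "snappy"
    private final Map<String, String> topicOverrides = new HashMap<>();

    public CompressionConfig(String brokerDefault) {
        this.brokerDefault = brokerDefault;
    }

    public void setTopicOverride(String topic, String codec) {
        topicOverrides.put(topic, codec);
    }

    // A topic-level setting wins; otherwise fall back to the broker-wide default,
    // which is what all incoming messages get recompressed to.
    public String codecFor(String topic) {
        return topicOverrides.getOrDefault(topic, brokerDefault);
    }
}
```

With this lookup in place, the broker applies `codecFor(topic)` when rewriting an incoming message set, so a topic ends up with a single codec even if producers sent mixed or uncompressed data.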
Re: [DISCUSSION] adding the serializer api back to the new java producer
Jun, Thanks for summarizing this - it helps confirm for me that I did not misunderstand anything in this thread so far; and that I disagree with the premise that the steps in using the current byte-oriented API are cumbersome or inflexible. It involves instantiating the K-V serializers in code (as opposed to config) and an extra (but explicit - i.e., making it very clear to the user) but simple call to serialize before sending. The point about downstream queries breaking can happen just as well with the implicit serializers/deserializers - since ultimately people have to instantiate the specific type in their code and if they want to send it they will. I think adoption is also equivalent since people will just instantiate whatever serializer/deserializer they want in one line. Plugging in a new serializer implementation does require a code change, but that can also be avoided via a config driven factory. So I'm still +0 on the change but I'm definitely not against moving forward with the changes. i.e., unless there is any strong -1 on the proposal from anyone else. Thanks, Joel With a byte array interface, of course there is nothing that one can't do. However, the real question is whether we want to encourage people to use it this way or not. Being able to flow just bytes is definitely easier to get started. That's why many early adopters choose to do it that way. However, it's often the case that they start feeling the pain later when some producers change the data format. Their Hive/Pig queries start to break and it's a painful process to have the issue fixed. So, the purpose of this api change is really to encourage people to standardize on a single serializer/deserializer that supports things like data validation and schema evolution upstream in the producer. Now, suppose there is an Avro serializer/deserializer implementation. How do we make it easy for people to adopt?
If the serializer is part of the api, we can just say, wire in the Avro serializer for key and/or value in the config and then you can start sending Avro records to the producer. If the serializer is not part of the api, we have to say, first instantiate a key and/or value serializer this way, send the key to the key serializer to get the key bytes, send the value to the value serializer to get the value bytes, and finally send the bytes to the producer. The former will be simpler and likely makes the adoption easier. Thanks, Jun On Mon, Dec 15, 2014 at 7:20 PM, Joel Koshy jjkosh...@gmail.com wrote: Documentation is inevitable even if the serializer/deserializer is part of the API - since the user has to set it up in the configs. So again, you can only encourage people to use it through documentation. The simpler byte-oriented API seems clearer to me because anyone who needs to send (or receive) a specific data type will _be forced to_ (or actually, _intuitively_) select a serializer (or deserializer) and will definitely pick an already available implementation if a good one already exists. Sorry I still don't get it and this is really the only sticking point for me, albeit a minor one (which is why I have been +0 all along on the change). I (and I think many others) would appreciate it if someone can help me understand this better. So I will repeat the question: What usage pattern cannot be supported easily by the simpler API without adding burden on the user? Thanks, Joel On Mon, Dec 15, 2014 at 11:59:34AM -0800, Jun Rao wrote: Joel, It's just that if the serializer/deserializer is not part of the API, you can only encourage people to use it through documentation. However, not everyone will read the documentation if it's not directly used in the API.
Thanks, Jun On Mon, Dec 15, 2014 at 2:11 AM, Joel Koshy jjkosh...@gmail.com wrote: (sorry about the late follow-up - I'm traveling most of this month) I'm likely missing something obvious, but I find the following to be a somewhat vague point that has been mentioned more than once in this thread without a clear explanation. i.e., why is it hard to share a serializer/deserializer implementation and just have the clients call it before a send/receive? What usage pattern cannot be supported by the simpler API? 1. Can we keep the serialization semantics outside the Producer interface and have simple bytes in / bytes out for the interface (This is what we have today). The points for this are to keep the interface simple and usage easy to understand. The points against this are that it gets hard to share common usage patterns around serialization/message validations for the future. On Tue, Dec 09, 2014 at 03:51:08AM +0000, Sriram Subramanian wrote: Thank you Jay. I agree with the issue that you point w.r.t paired serializers. I also think having mix serialization
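The two API shapes being debated in this thread can be put side by side: with the byte-oriented API the caller instantiates a serializer and calls it explicitly before send; with the typed API the producer owns a serializer wired in from config and accepts typed records. The `Serializer` interface and producer stand-in below are simplified sketches for comparison, not the actual client classes:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class SerializerStyles {
    interface Serializer<T> {
        byte[] serialize(T value);
    }

    static class StringSerializer implements Serializer<String> {
        public byte[] serialize(String value) {
            return value.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Byte-oriented style: the caller serializes explicitly, then would hand
    // the bytes to a bytes-in/bytes-out producer.
    static byte[] explicitStyle(String value) {
        Serializer<String> ser = new StringSerializer();   // instantiated in user code
        return ser.serialize(value);
    }

    // Config-driven style: the producer is constructed with a serializer
    // (standing in for "wire it in via config") and never exposes the bytes.
    static class TypedProducer<V> {
        private final Serializer<V> ser;
        TypedProducer(Serializer<V> ser) { this.ser = ser; }
        byte[] send(V value) { return ser.serialize(value); }
    }

    public static void main(String[] args) {
        byte[] a = explicitStyle("record");
        byte[] b = new TypedProducer<>(new StringSerializer()).send("record");
        System.out.println(Arrays.equals(a, b));   // both paths yield the same bytes
    }
}
```

Both paths produce identical bytes; the disagreement in the thread is only about which party, the caller or the producer, is made responsible for choosing and invoking the serializer.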