[jira] [Commented] (KAFKA-1539) Due to OS caching Kafka might loose offset files which causes full reset of data
[ https://issues.apache.org/jira/browse/KAFKA-1539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14061724#comment-14061724 ]

Dmitry Bugaychenko commented on KAFKA-1539:
---

It looks like even after flush, data are not necessarily written to the HDD. In XFS, data can by default be cached for up to 30 seconds, and it can also be cached by a disk controller, etc. Writing to a temp file is a good idea, but it is better to keep the previous file untouched (do not replace it with the temp one). On a 20-HDD server with XFS this is pretty easy to reproduce: after a power failure we got corrupted offset files on 4-5 disks.

Due to OS caching Kafka might loose offset files which causes full reset of data
---

Key: KAFKA-1539
URL: https://issues.apache.org/jira/browse/KAFKA-1539
Project: Kafka
Issue Type: Bug
Components: log
Affects Versions: 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps

Seen this while testing power failures and disk failures. Due to caching at the OS level (e.g. XFS can cache data for 30 seconds), after a failure we got offset files of zero length. This dramatically slows down broker startup (it has to re-check all segments), and if the high watermark offsets are lost it simply erases all data and starts recovering from other brokers (which looks funny: first spending 2-3 hours re-checking logs and then deleting them all due to the missing high watermark).

Proposal: introduce offset file rotation. Keep two versions of the offset file, write to the oldest, and read from the newest valid one. This way we could configure the offset checkpoint time so that at least one file is always flushed and valid.

--
This message was sent by Atlassian JIRA (v6.2#6252)
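The temp-file/rotation discussion above boils down to the standard write-fsync-rename pattern. A minimal sketch of that pattern, assuming a single checkpoint file and POSIX rename semantics (the helper name and signature are hypothetical, not Kafka code):

```java
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Hypothetical helper illustrating the crash-safe checkpoint write discussed
// above: write to a temp file, force it to disk, then atomically rename it
// over the target, so a valid previous copy always survives a power failure.
public class CheckpointWriter {
    public static void writeAtomically(File target, String contents) throws IOException {
        File tmp = new File(target.getAbsolutePath() + ".tmp");
        try (FileChannel channel = FileChannel.open(tmp.toPath(),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            channel.write(ByteBuffer.wrap(contents.getBytes(StandardCharsets.UTF_8)));
            channel.force(true); // fsync: defeat OS/XFS write-back caching
        }
        // Atomic rename: readers see either the old file or the new one, never a torn write
        Files.move(tmp.toPath(), target.toPath(),
                StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
    }
}
```

The `force(true)` call is what closes the 30-second write-back window mentioned in the comment; the atomic rename is what keeps the previous file untouched until the new one is durable.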
[jira] [Commented] (KAFKA-1535) return all live brokers in TopicMetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14061766#comment-14061766 ]

Nicolae Marasoiu commented on KAFKA-1535:
---

Which branch should this be done on? trunk?

return all live brokers in TopicMetadataResponse
---

Key: KAFKA-1535
URL: https://issues.apache.org/jira/browse/KAFKA-1535
Project: Kafka
Issue Type: Improvement
Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
Labels: newbie

Currently, we only return the brokers that have assigned replicas for a topic in TopicMetadataResponse. The new producer will use those brokers for refreshing metadata. Now suppose that we stop all those brokers, copy all local data to some new hosts and then restart those hosts (with the original broker id). There is no way for the new producer to automatically get the information about the new brokers since all old brokers are gone.
[jira] [Comment Edited] (KAFKA-1535) return all live brokers in TopicMetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14061766#comment-14061766 ]

Nicolae Marasoiu edited comment on KAFKA-1535 at 7/15/14 6:46 AM:
---

Which branch should this be done on? Seems trunk == 0.9.0. Should this be done in the default branch 0.8.1?

was (Author: nmarasoiu): Which branch should this be done on? trunk?

return all live brokers in TopicMetadataResponse
---

Key: KAFKA-1535
URL: https://issues.apache.org/jira/browse/KAFKA-1535
Project: Kafka
Issue Type: Improvement
Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
Labels: newbie

Currently, we only return the brokers that have assigned replicas for a topic in TopicMetadataResponse. The new producer will use those brokers for refreshing metadata. Now suppose that we stop all those brokers, copy all local data to some new hosts and then restart those hosts (with the original broker id). There is no way for the new producer to automatically get the information about the new brokers since all old brokers are gone.
[jira] [Comment Edited] (KAFKA-1535) return all live brokers in TopicMetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14061766#comment-14061766 ]

Nicolae Marasoiu edited comment on KAFKA-1535 at 7/15/14 6:48 AM:
---

Should this be done in the default branch 0.8.1?

was (Author: nmarasoiu): Which branch should this be done on? Seems trunk == 0.9.0. Should this be done in the default branch 0.8.1?

return all live brokers in TopicMetadataResponse
---

Key: KAFKA-1535
URL: https://issues.apache.org/jira/browse/KAFKA-1535
Project: Kafka
Issue Type: Improvement
Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
Labels: newbie

Currently, we only return the brokers that have assigned replicas for a topic in TopicMetadataResponse. The new producer will use those brokers for refreshing metadata. Now suppose that we stop all those brokers, copy all local data to some new hosts and then restart those hosts (with the original broker id). There is no way for the new producer to automatically get the information about the new brokers since all old brokers are gone.
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14061957#comment-14061957 ]

Alexey Ozeritskiy commented on KAFKA-1414:
---

Anton, you forgot to use the new config option (log.recovery.threads).

Speedup broker startup after hard reset
---

Key: KAFKA-1414
URL: https://issues.apache.org/jira/browse/KAFKA-1414
Project: Kafka
Issue Type: Improvement
Components: log
Affects Versions: 0.8.2, 0.9.0, 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps
Attachments: parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-fixed-threadpool.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch

After a hard reset due to power failure, the broker takes way too much time recovering unflushed segments in a single thread. This could be easily improved by launching multiple threads (one per data directory, assuming that typically each data directory is on a dedicated drive). Locally we tried this simple patch to LogManager.loadLogs and it seems to work; however, I'm too new to Scala, so do not take it literally:

{code}
/**
 * Recover and load all logs in the given data directories
 */
private def loadLogs(dirs: Seq[File]) {
  val threads: Array[Thread] = new Array[Thread](dirs.size)
  var i: Int = 0
  val me = this
  for (dir <- dirs) {
    val thread = new Thread(new Runnable {
      def run() {
        val recoveryPoints = me.recoveryPointCheckpoints(dir).read
        /* load the logs */
        val subDirs = dir.listFiles()
        if (subDirs != null) {
          val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
          if (cleanShutDownFile.exists())
            info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
          for (dir <- subDirs) {
            if (dir.isDirectory) {
              info("Loading log '" + dir.getName + "'")
              val topicPartition = Log.parseTopicPartitionName(dir.getName)
              val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
              val log = new Log(dir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time)
              val previous = addLogWithLock(topicPartition, log)
              if (previous != null)
                throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
            }
          }
          cleanShutDownFile.delete()
        }
      }
    })
    thread.start()
    threads(i) = thread
    i = i + 1
  }
  for (thread <- threads) {
    thread.join()
  }
}

def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
  logCreationOrDeletionLock synchronized {
    this.logs.put(topicPartition, log)
  }
}
{code}
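The fixed-threadpool patch attachments suggest bounding the number of recovery threads rather than spawning one unbounded thread per data directory, as the patch above does. A hedged sketch of that idea (the `ParallelLoader` name and `loadOne` callback are illustrative, not the patch's actual code):

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch of the fixed-thread-pool variant (cf. the
// parallel-dir-loading-trunk-fixed-threadpool.patch attachment and the
// proposed log.recovery.threads option): a bounded pool works through all
// data directories instead of one unbounded thread per directory.
public class ParallelLoader {
    public static void loadAll(List<File> dirs, int numThreads, Consumer<File> loadOne)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        try {
            List<Callable<Void>> tasks = new ArrayList<>();
            for (File dir : dirs) {
                tasks.add(() -> { loadOne.accept(dir); return null; });
            }
            // invokeAll blocks until every directory has been processed,
            // replacing the manual thread.join() loop in the patch above
            pool.invokeAll(tasks);
        } finally {
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
        }
    }
}
```

A fixed pool also degrades gracefully when a broker has more data directories than spindles, which the one-thread-per-directory approach does not.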
[jira] [Updated] (KAFKA-1325) Fix inconsistent per topic log configs
[ https://issues.apache.org/jira/browse/KAFKA-1325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neha Narkhede updated KAFKA-1325:
---

Labels: newbie usability (was: usability)

Fix inconsistent per topic log configs
---

Key: KAFKA-1325
URL: https://issues.apache.org/jira/browse/KAFKA-1325
Project: Kafka
Issue Type: Improvement
Components: log
Affects Versions: 0.8.1
Reporter: Neha Narkhede
Assignee: Manikumar Reddy
Labels: newbie, usability
Fix For: 0.8.2
Attachments: KAFKA-1325.patch, KAFKA-1325.patch

Related thread from the user mailing list - http://mail-archives.apache.org/mod_mbox/kafka-users/201403.mbox/%3Cd8f6f3857b82c4ccd8725aba6fd68cb8%40cweb01.nmdf.nhnsystem.com%3E

Our documentation is a little confusing on the log configs. The per-topic property retention.ms is in millis, but the server default it maps to is in minutes. The same is true for segment.ms as well. We could either improve the docs or change the per-topic configs to be consistent with the server defaults.
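For illustration, the unit mismatch described above looks like this (the values are an assumed example; both settings mean seven days):

```properties
# Broker-wide default, expressed in minutes (7 days)
log.retention.minutes=10080

# Per-topic override, expressed in milliseconds (also 7 days)
retention.ms=604800000
```

A user who copies the broker value into the per-topic config without converting units would get a retention of about 10 seconds instead of 7 days, which is why the inconsistency is worth fixing or at least documenting.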
[jira] [Commented] (KAFKA-1535) return all live brokers in TopicMetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14062214#comment-14062214 ]

Jay Kreps commented on KAFKA-1535:
---

Yeah, we do all development on trunk and just cut branches as a stable point for critical point fixes needed after the release.

return all live brokers in TopicMetadataResponse
---

Key: KAFKA-1535
URL: https://issues.apache.org/jira/browse/KAFKA-1535
Project: Kafka
Issue Type: Improvement
Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
Labels: newbie

Currently, we only return the brokers that have assigned replicas for a topic in TopicMetadataResponse. The new producer will use those brokers for refreshing metadata. Now suppose that we stop all those brokers, copy all local data to some new hosts and then restart those hosts (with the original broker id). There is no way for the new producer to automatically get the information about the new brokers since all old brokers are gone.
[jira] [Updated] (KAFKA-1519) Console consumer: expose configuration option to enable/disable writing the line separator
[ https://issues.apache.org/jira/browse/KAFKA-1519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neha Narkhede updated KAFKA-1519:
---

Labels: newbie (was: )

Console consumer: expose configuration option to enable/disable writing the line separator
---

Key: KAFKA-1519
URL: https://issues.apache.org/jira/browse/KAFKA-1519
Project: Kafka
Issue Type: Improvement
Components: consumer
Affects Versions: 0.8.1.1
Reporter: Michael Noll
Assignee: Gwen Shapira
Priority: Minor
Labels: newbie
Attachments: KAFKA-1519.patch

The current console consumer includes a {{DefaultMessageFormatter}}, which exposes a few user-configurable options that can be set on the command line via --property, e.g. --property line.separator=XYZ. Unfortunately, the current implementation does not allow the user to completely disable writing any such line separator. However, this functionality would be helpful to let users capture data as-is from a Kafka topic to a snapshot file. Capturing data as-is -- without an artificial line separator -- is particularly nice for data in a binary format (including Avro).

*No workaround*

A potential workaround would be to pass an empty string as the property value of line.separator, but this doesn't work in the current implementation. The following variants throw an "Invalid parser arguments" exception:

{code}
--property line.separator=     # nothing
--property line.separator=""   # double quotes
--property line.separator=''   # single quotes
{code}

Escape tricks via a backslash don't work either. If there actually is a workaround, please let me know.

*How to fix*

We can introduce a print.line option to enable/disable writing line.separator, similar to how the code already uses print.key to enable/disable writing key.separator. This change is trivial. To preserve backwards compatibility, the print.line option would be set to true by default (unlike the print.key option, which defaults to false).

*Alternatives*

Apart from modifying the built-in {{DefaultMessageFormatter}}, users could of course implement their own custom {{MessageFormatter}}. But given that it's a) a trivial change to the {{DefaultMessageFormatter}} and b) a nice user feature, I'd say changing the built-in {{DefaultMessageFormatter}} would be the better approach. This way, Kafka would support writing data as-is to a file out of the box.
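The proposed print.line option can be sketched as a toy formatter mirroring the existing print.key pattern (the class below is illustrative; the real DefaultMessageFormatter is Scala and has additional options):

```java
import java.io.PrintStream;
import java.util.Properties;

// Hedged sketch of the proposed fix: a print.line property gating the line
// separator, exactly how print.key already gates the key separator. Field and
// property names follow the JIRA discussion, not the actual Kafka source.
public class SketchFormatter {
    private boolean printKey = false;
    private boolean printLine = true; // default true for backwards compatibility
    private byte[] keySeparator = "\t".getBytes();
    private byte[] lineSeparator = "\n".getBytes();

    public void init(Properties props) {
        if (props.containsKey("print.key"))
            printKey = Boolean.parseBoolean(props.getProperty("print.key").trim());
        if (props.containsKey("print.line"))
            printLine = Boolean.parseBoolean(props.getProperty("print.line").trim());
    }

    public void writeTo(byte[] key, byte[] value, PrintStream output) throws java.io.IOException {
        if (printKey) { output.write(key); output.write(keySeparator); }
        output.write(value);
        if (printLine) output.write(lineSeparator); // skipped when print.line=false
    }
}
```

With --property print.line=false, consecutive messages would be written back-to-back with no artificial bytes in between, which is the as-is capture behavior the report asks for.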
[jira] [Resolved] (KAFKA-1521) Producer Graceful Shutdown issue in Container (Kafka version 0.8.x.x)
[ https://issues.apache.org/jira/browse/KAFKA-1521?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neha Narkhede resolved KAFKA-1521.
---

Resolution: Won't Fix

Closing as per [~junrao]'s comment. [~Bmis13] Please reopen if you have concerns.

Producer Graceful Shutdown issue in Container (Kafka version 0.8.x.x)
---

Key: KAFKA-1521
URL: https://issues.apache.org/jira/browse/KAFKA-1521
Project: Kafka
Issue Type: Bug
Components: producer
Affects Versions: 0.8.0, 0.8.1.1
Environment: Tomcat Container or any other J2EE container
Reporter: Bravesh Mistry
Assignee: Jun Rao
Priority: Minor

Hi Kafka Team,

We are running multiple webapps in a Tomcat container, and we have producers which are managed by a ServletContextListener (lifecycle). Upon contextInitialized we create the producer, and on contextDestroyed we call producer.close(), but the underlying metrics lib does not shut down, so we have a thread leak due to this issue. I had to call Metrics.defaultRegistry().shutdown() to resolve it. Is this a known issue? I know the metrics lib has a JVM shutdown hook, but it will not be invoked since the container thread is un-deploying the web app: the class loader goes away, and the leaking thread cannot find the underlying Kafka class. Because of this, Tomcat does not shut down gracefully. Are you planning to un-register the metrics when Producer close is called, or to shut down the metrics pool for the client.id?

Here are the logs:

SEVERE: The web application [ ] appears to have started a thread named [metrics-meter-tick-thread-1] but has failed to stop it. This is very likely to create a memory leak.
SEVERE: The web application [] appears to have started a thread named [metrics-meter-tick-thread-2] but has failed to stop it. This is very likely to create a memory leak.

Thanks,
Bhavesh
[jira] [Updated] (KAFKA-1517) Messages is a required argument to Producer Performance Test
[ https://issues.apache.org/jira/browse/KAFKA-1517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neha Narkhede updated KAFKA-1517:
---

Labels: newbie (was: )

Messages is a required argument to Producer Performance Test
---

Key: KAFKA-1517
URL: https://issues.apache.org/jira/browse/KAFKA-1517
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Daniel Compton
Priority: Trivial
Labels: newbie

When running the producer performance test without providing a messages argument, you get an error:

{noformat}
$ bin/kafka-producer-perf-test.sh --topics mirrormirror --broker-list kafka-dc21:9092
Missing required argument [messages]
Option                      Description
------                      -----------
..
--messages <Long: count>    The number of messages to send or consume
                            (default: 9223372036854775807)
{noformat}

However, the [shell command documentation|https://github.com/apache/kafka/blob/c66e408b244de52f1c5c5bbd7627aa1f028f9a87/perf/src/main/scala/kafka/perf/PerfConfig.scala#L25] doesn't say that this is required, and implies that [2^63-1|http://en.wikipedia.org/wiki/9223372036854775807] (Long.MaxValue) messages will be sent. It should probably look like the [ConsoleProducer|https://github.com/apache/kafka/blob/c66e408b244de52f1c5c5bbd7627aa1f028f9a87/core/src/main/scala/kafka/producer/ConsoleProducer.scala#L32] and prefix the documentation with REQUIRED. Or we could make this a non-required argument and set the default value to something sane like 100,000 messages. Which option is preferable?
[jira] [Commented] (KAFKA-1019) kafka-preferred-replica-election.sh will fail without clear error message if /brokers/topics/[topic]/partitions does not exist
[ https://issues.apache.org/jira/browse/KAFKA-1019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14062323#comment-14062323 ]

Neha Narkhede commented on KAFKA-1019:
---

[~draiwn] Were you able to confirm the issue with ZooKeeper 3.4.6? In any case, we can at least fix the error message for when the path doesn't exist.

kafka-preferred-replica-election.sh will fail without clear error message if /brokers/topics/[topic]/partitions does not exist
---

Key: KAFKA-1019
URL: https://issues.apache.org/jira/browse/KAFKA-1019
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8.1
Reporter: Guozhang Wang
Labels: newbie
Fix For: 0.8.2

From Libo Yu: I tried to run kafka-preferred-replica-election.sh on our Kafka cluster, but I got this exception:

Failed to start preferred replica election
org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/uattoqaaa.default/partitions

I checked ZooKeeper and there is no /brokers/topics/uattoqaaa.default/partitions. All I found is /brokers/topics/uattoqaaa.default.
[jira] [Updated] (KAFKA-1534) transient unit test failure in testBasicPreferredReplicaElection
[ https://issues.apache.org/jira/browse/KAFKA-1534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neha Narkhede updated KAFKA-1534:
---

Labels: newbie (was: )

transient unit test failure in testBasicPreferredReplicaElection
---

Key: KAFKA-1534
URL: https://issues.apache.org/jira/browse/KAFKA-1534
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
Labels: newbie

Saw the following transient failure.

kafka.admin.AdminTest > testBasicPreferredReplicaElection FAILED
    junit.framework.AssertionFailedError: Timing out after 5000 ms since leader is not elected or changed for partition [test,1]
        at junit.framework.Assert.fail(Assert.java:47)
        at kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:542)
        at kafka.admin.AdminTest.testBasicPreferredReplicaElection(AdminTest.scala:310)
Re: Review Request 23474: Patch for KAFKA-1483
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23474/#review47785
---

core/src/main/scala/kafka/server/ReplicaManager.scala
https://reviews.apache.org/r/23474/#comment83980

I am wondering if this function will introduce more lock contention on leaderIsrUpdateLock? Every leaderReplicaIfLocal call will need to hold its read lock while other requests at the same time may try to modify the leader option.

- Guozhang Wang

On July 15, 2014, 1:18 a.m., Sriharsha Chintalapani wrote:

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23474/
---

(Updated July 15, 2014, 1:18 a.m.)

Review request for kafka.

Bugs: KAFKA-1483
https://issues.apache.org/jira/browse/KAFKA-1483

Repository: kafka

Description
---
KAFKA-1483. Split Brain about Leader Partitions.

Diffs
---
core/src/main/scala/kafka/server/ReplicaManager.scala 6a56a772c134dbf1e70c1bfe067223009bfdbac8

Diff: https://reviews.apache.org/r/23474/diff/

Testing
---

Thanks,
Sriharsha Chintalapani
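The contention concern raised in the review is the classic read-write lock trade-off. A generic illustration of the pattern under discussion (not the actual ReplicaManager code; names are illustrative): many readers share the lock, while a leader update takes it exclusively and blocks them all.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

// Generic sketch of the locking pattern discussed above: frequent
// leaderReplicaIfLocal-style lookups share the read lock, while ISR/leader
// updates take the write lock and exclude all readers for their duration.
public class LeaderState<T> {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private T leader;

    public LeaderState(T initial) { this.leader = initial; }

    // Cheap when uncontended, but every caller still touches the shared lock,
    // which is the contention source the review comment worries about.
    public <R> R read(Function<T, R> f) {
        lock.readLock().lock();
        try { return f.apply(leader); } finally { lock.readLock().unlock(); }
    }

    public void update(T newLeader) {
        lock.writeLock().lock();
        try { this.leader = newLeader; } finally { lock.writeLock().unlock(); }
    }
}
```

Readers never block each other, so the cost shows up only when writers are frequent or when the read path is hot enough that the lock's shared state itself becomes a bottleneck.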
Review Request 23516: Patch for KAFKA-1462
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23516/ --- Review request for kafka. Bugs: KAFKA-1462 https://issues.apache.org/jira/browse/KAFKA-1462 Repository: kafka Description --- remove partition from all PartitionData since it's redundant minor fixes initial patch Diffs - clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java a016269512b6d6d6e0fd3fab997e9c8265024eb4 clients/src/main/java/org/apache/kafka/common/Cluster.java c62707ab3aba26771fc4b993df28bf8c44f32309 clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java 6fe7573973832615976defa37fe0dfbb8f911939 clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 044b03061802ee5e8ea4f1995fb0988e1a70e9a7 clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 8cecba50bf067713184208552af36469962cd628 clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/GenericStruct.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java f35bd87cf0c52a30ed779b25d60bbe64a60b9502 
clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java 2652c32f123b3bc4b0456d4bc9fbba52c051724c clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 6036f6af1c55c1b0a15471e79b229b17f50ce31c clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 6cf4fb714916f1a318d788cde8fc0aad9dfb83ca clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java 66cc2fea6443968e525419a203dbc4227e0b1cdf clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java 257b8287757e40349ea041ed7a651993007a55a8 clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 2f98192b064d1ce7c0779e901293edb8c3801915 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java PRE-CREATION core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala dfad6e6534dd9b00099d110804899080e8d832ab core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala c72ca14708a3625cb89d5fb92630138d2afa2bf0 core/src/main/scala/kafka/api/ControlledShutdownRequest.scala 7dacb2023788064b736df8b775aaf12281d545b5 core/src/main/scala/kafka/api/ControlledShutdownResponse.scala 46ec3db28f88bbf9e0b0de2133807dc552bcae13 core/src/main/scala/kafka/api/FetchRequest.scala a8b73acd1a813284744359e8434cb52d22063c99 core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala PRE-CREATION core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala PRE-CREATION core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala PRE-CREATION core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala PRE-CREATION 
core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala PRE-CREATION core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 3e408174dcc7e8dd9097bae41277ee4f7160afb3 core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala f6368bb5a1d560f79427284ccbac9d46b789 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 630768ab57afb579049bcbc5d44ee6823b0e7cc2 core/src/main/scala/kafka/api/OffsetCommitResponse.scala 4946e9729ecbf3da35bdab5c832d26977c107e9e core/src/main/scala/kafka/api/OffsetFetchRequest.scala a32f8588ff02f5fb3c99fb8e5508f462923e8edc core/src/main/scala/kafka/api/OffsetFetchResponse.scala c1222f422ddb6413bbb2e5da2980903ee70b9156 core/src/main/scala/kafka/api/OffsetRequest.scala
Re: Review Request 23516: Patch for KAFKA-1462
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23516/
---

(Updated July 15, 2014, 6:36 p.m.)

Review request for kafka.

Bugs: KAFKA-1462
https://issues.apache.org/jira/browse/KAFKA-1462

Repository: kafka

Description (updated)
---

1. I kept the request objects in the server separate from those in the client. This is because (1) some of the existing request objects are part of the old client api (FetchRequest, OffsetCommitRequest, etc.) and we can't remove them until the old clients are removed, and (2) changing existing request objects on the server side requires significant refactoring.

2. On the client side, I refactored the existing request objects a bit. Now, every request/response object extends from a GenericStruct. GenericStruct provides a standard way of doing serialization and toString so that we don't have to do that in every request. Each request/response can be constructed in two ways: (1) by providing request-specific fields; (2) by providing a struct. The latter is used for getting a request/response from its serialized format.

3. On the server side, what I did is keep the existing requests more or less untouched. For new types of requests, I create a thin wrapper on the server side so that it can leverage the request objects created on the client side. This way the server-side object will share the serialization and toString logic with the client-side object. In order to do this, I removed correlationId from the RequestOrResponse object. There is only one place where correlationId is directly referenced, and it is not necessary.

4. Multi-version support. We now need to support two versions of OffsetCommitRequest since for the new consumer work we added two extra fields to the request. For simplicity, both versions are converted to the same request object. Since the old version doesn't have the new fields, defaults will be used.

5.
The new requests/responses are based on the format described in https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design#Kafka0.9ConsumerRewriteDesign-Requestformats. I made some minor changes to the wiki so that the new requests follow the current standard. 6. Added some missing util functions and added unit test for testing the serialization/deserialization logic. Diffs - clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java a016269512b6d6d6e0fd3fab997e9c8265024eb4 clients/src/main/java/org/apache/kafka/common/Cluster.java c62707ab3aba26771fc4b993df28bf8c44f32309 clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java 6fe7573973832615976defa37fe0dfbb8f911939 clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 044b03061802ee5e8ea4f1995fb0988e1a70e9a7 clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 8cecba50bf067713184208552af36469962cd628 clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/GenericStruct.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java PRE-CREATION 
clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java f35bd87cf0c52a30ed779b25d60bbe64a60b9502 clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java 2652c32f123b3bc4b0456d4bc9fbba52c051724c clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 6036f6af1c55c1b0a15471e79b229b17f50ce31c clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 6cf4fb714916f1a318d788cde8fc0aad9dfb83ca
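The GenericStruct idea described in the review (one shared serialization/toString path, with each request constructible either from its own fields or from an already-parsed struct) can be sketched generically. The class names and the toy wire format below are illustrative assumptions, not the real Kafka classes:

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the pattern described above: a common base class
// owns serialization and toString for every request/response, so concrete
// requests only declare their fields. Names are illustrative, not Kafka's.
public class GenericStructSketch {
    protected final Map<String, Object> fields;

    public GenericStructSketch(Map<String, Object> fields) { this.fields = fields; }

    // One serialization path shared by all subclasses (toy textual format,
    // standing in for the real schema-driven binary encoding).
    public byte[] serialize() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Object> e : fields.entrySet()) {
            if (sb.length() > 0) sb.append(';');
            sb.append(e.getKey()).append('=').append(e.getValue());
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    @Override public String toString() { return fields.toString(); }
}

class HeartbeatRequestSketch extends GenericStructSketch {
    // Constructed from request-specific fields...
    public HeartbeatRequestSketch(String groupId, int generationId) {
        super(toFields(groupId, generationId));
    }

    // ...or from an already-parsed field map (the "from a struct" path).
    public HeartbeatRequestSketch(Map<String, Object> fields) { super(fields); }

    private static Map<String, Object> toFields(String groupId, int generationId) {
        Map<String, Object> m = new LinkedHashMap<>();
        m.put("group_id", groupId);
        m.put("generation_id", generationId);
        return m;
    }
}
```

The design benefit the review describes falls out directly: serialization and toString live in one place, and a server-side wrapper can reuse them by delegating to the client-side object.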
[jira] [Updated] (KAFKA-1462) Add new request and response formats for the new consumer and coordinator communication
[ https://issues.apache.org/jira/browse/KAFKA-1462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao updated KAFKA-1462:
---

Attachment: KAFKA-1462.patch

Add new request and response formats for the new consumer and coordinator communication
---

Key: KAFKA-1462
URL: https://issues.apache.org/jira/browse/KAFKA-1462
Project: Kafka
Issue Type: Sub-task
Components: consumer
Reporter: Guozhang Wang
Assignee: Jun Rao
Fix For: 0.9.0
Attachments: KAFKA-1462.patch

We need to add the request / response formats according to the new format protocol once their design is final: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
Review Request 23521: Fix KAFKA-1533
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23521/
---

Review request for kafka.

Bugs: KAFKA-1533
https://issues.apache.org/jira/browse/KAFKA-1533

Repository: kafka

Description
---
1. Add the metadataRefreshAttemptMS in NetworkClient for backing off;
2. Refactor Producer API tests using KafkaTestHarness;
3. Change default backoff time to 100ms for test utils.

Diffs
---
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java d8f9ce663ee24d2b0852c974136741280c39f8f8
clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java 4aa5b01d611631db72df47d50bbe30edb8c478db
core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 15fd5bcbaf175a0f7d7cf0b142e63f705ca9b6ae
core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 34a7db4b4ea2b720476c2b1f22a623a997faffbc
core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala 194dd70919a5f301d3131c56594e40a0ebb27311
core/src/test/scala/unit/kafka/utils/TestUtils.scala 3faa884f8eb83c7c00baab416d0acfb488dc39c1

Diff: https://reviews.apache.org/r/23521/diff/

Testing
---

Thanks,
Guozhang Wang
[jira] [Commented] (KAFKA-1533) transient unit test failure in ProducerFailureHandlingTest
[ https://issues.apache.org/jira/browse/KAFKA-1533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14062614#comment-14062614 ]

Guozhang Wang commented on KAFKA-1533:
---

Created reviewboard https://reviews.apache.org/r/23521/ against branch origin/trunk

transient unit test failure in ProducerFailureHandlingTest
---

Key: KAFKA-1533
URL: https://issues.apache.org/jira/browse/KAFKA-1533
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
Attachments: KAFKA-1533.patch

Occasionally, saw the test hang on tear down. The following is the stack trace.

"Test worker" prio=5 tid=7f9246956000 nid=0x10e078000 in Object.wait() [10e075000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <7f4e69578> (a org.apache.zookeeper.ClientCnxn$Packet)
    at java.lang.Object.wait(Object.java:485)
    at org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1344)
    - locked <7f4e69578> (a org.apache.zookeeper.ClientCnxn$Packet)
    at org.apache.zookeeper.ZooKeeper.delete(ZooKeeper.java:732)
    at org.I0Itec.zkclient.ZkConnection.delete(ZkConnection.java:91)
    at org.I0Itec.zkclient.ZkClient$8.call(ZkClient.java:720)
    at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
    at org.I0Itec.zkclient.ZkClient.delete(ZkClient.java:716)
    at kafka.utils.ZkUtils$.deletePath(ZkUtils.scala:416)
    at kafka.utils.ZkUtils$.deregisterBrokerInZk(ZkUtils.scala:184)
    at kafka.server.KafkaHealthcheck.shutdown(KafkaHealthcheck.scala:50)
    at kafka.server.KafkaServer$$anonfun$shutdown$2.apply$mcV$sp(KafkaServer.scala:243)
    at kafka.utils.Utils$.swallow(Utils.scala:172)
    at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
    at kafka.utils.Utils$.swallowWarn(Utils.scala:45)
    at kafka.utils.Logging$class.swallow(Logging.scala:94)
    at kafka.utils.Utils$.swallow(Utils.scala:45)
    at kafka.server.KafkaServer.shutdown(KafkaServer.scala:243)
    at kafka.api.ProducerFailureHandlingTest.tearDown(ProducerFailureHandlingTest.scala:90)
[jira] [Updated] (KAFKA-1533) transient unit test failure in ProducerFailureHandlingTest
[ https://issues.apache.org/jira/browse/KAFKA-1533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-1533: Attachment: KAFKA-1533.patch -- This message was sent by Atlassian JIRA (v6.2#6252)
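The hang reported above comes from a ZooKeeper delete that can block indefinitely inside tearDown. A generic way to keep such a teardown step from hanging a test run forever is to execute the blocking call on a helper thread and bound the wait. This is only an illustrative pattern, not the actual Kafka fix; all names here are hypothetical:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedShutdown {
    // Run a potentially-blocking shutdown step with a time limit.
    // Returns the step's result, or null if it did not finish in time.
    static <T> T runWithTimeout(Callable<T> step, long timeoutMs) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<T> f = pool.submit(step);
            try {
                return f.get(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                f.cancel(true);  // interrupt the stuck step
                return null;
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        Integer ok = runWithTimeout(() -> 42, 1000);
        Integer stuck = runWithTimeout(() -> { Thread.sleep(60_000); return 1; }, 100);
        System.out.println(ok + " " + stuck);
    }
}
```

With this wrapper a stuck ZooKeeper call would surface as a timed-out teardown instead of a hung test worker.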
[GitHub] kafka pull request: KAFKA-1414: Speedup broker startup after hard ...
GitHub user ataraxer opened a pull request: https://github.com/apache/kafka/pull/26 KAFKA-1414: Speedup broker startup after hard reset and shutdown. This patch speeds up both hard-reset recovery and shutdown by introducing the `log.recovery.threads` and `log.shutdown.threads` properties, which allow the required work to be performed in parallel, partitioned by log directory. Best performance is achieved by setting the thread count to the number of log directories, provided each directory is on a dedicated drive; the option should still be used with caution because of the possibility of a native JVM out-of-memory error. The patch combines changes proposed by Jay Kreps, Alexey Ozeritskiy, and Dmitry Bugaychenko, compiled by Anton Karamanov. All tests are passing. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ataraxer/kafka kafka-1414 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/26.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #26 commit e4a86709d07030c44f077ab20d4329ddb84c4aec Author: Anton Karamanov atara...@gmail.com Date: 2014-07-15T17:42:15Z KAFKA-1414 Speedup broker startup after hard reset and shutdown; patched by Jay Kreps, Alexey Ozeritskiy, Dmitry Bugaychenko and Anton Karamanov --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
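The per-directory parallelism described in the pull request can be sketched with a fixed-size executor submitting one task per log directory. This is a minimal illustration of the idea behind `log.recovery.threads`; `recoverDir` is a hypothetical stand-in for the real recovery work, not Kafka's LogManager API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelRecovery {
    // Hypothetical stand-in for recovering one data directory;
    // here it just returns a fake "segments checked" count.
    static int recoverDir(String dir) {
        return dir.length();
    }

    // One task per directory, bounded by a configurable thread count,
    // mirroring the log.recovery.threads idea from the pull request.
    static List<Integer> recoverAll(List<String> dirs, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (String d : dirs) tasks.add(() -> recoverDir(d));
            List<Integer> results = new ArrayList<>();
            // invokeAll blocks until every directory has been processed,
            // which preserves the original "wait for all dirs" semantics.
            for (Future<Integer> f : pool.invokeAll(tasks)) results.add(f.get());
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(recoverAll(List.of("/data/d1", "/data/d2"), 2));
    }
}
```

Sizing the pool to the directory count gives one thread per drive, which is the configuration the pull request recommends when each directory sits on a dedicated disk.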
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14062680#comment-14062680 ] ASF GitHub Bot commented on KAFKA-1414: GitHub user ataraxer opened a pull request: https://github.com/apache/kafka/pull/26 KAFKA-1414: Speedup broker startup after hard reset and shutdown.

Speedup broker startup after hard reset
Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.2, 0.9.0, 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps Attachments: parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-fixed-threadpool.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch

After a hard reset due to power failure, the broker takes far too much time recovering unflushed segments in a single thread. This could easily be improved by launching multiple threads (one per data directory, assuming that typically each data directory is on a dedicated drive). Locally we tried this simple patch to LogManager.loadLogs and it seems to work; however, I'm too new to Scala, so do not take it literally:

{code}
/**
 * Recover and load all logs in the given data directories.
 */
private def loadLogs(dirs: Seq[File]) {
  val threads: Array[Thread] = new Array[Thread](dirs.size)
  var i: Int = 0
  val me = this
  for (dir <- dirs) {
    val thread = new Thread(new Runnable {
      def run() {
        val recoveryPoints = me.recoveryPointCheckpoints(dir).read
        /* load the logs */
        val subDirs = dir.listFiles()
        if (subDirs != null) {
          val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
          if (cleanShutDownFile.exists())
            info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
          for (dir <- subDirs) {
            if (dir.isDirectory) {
              info("Loading log '" + dir.getName + "'")
              val topicPartition = Log.parseTopicPartitionName(dir.getName)
              val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
              val log = new Log(dir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time)
              val previous = addLogWithLock(topicPartition, log)
              if (previous != null)
                throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
            }
          }
          cleanShutDownFile.delete()
        }
      }
    })
    thread.start()
    threads(i) = thread
    i = i + 1
  }
  for (thread <- threads) {
    thread.join()
  }
}

def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
  logCreationOrDeletionLock synchronized {
    this.logs.put(topicPartition, log)
  }
}
{code}

-- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14062686#comment-14062686 ] Anton Karamanov commented on KAFKA-1414: Indeed, the patch was malformed. I've fixed it, added shutdown parallelization, cleaned it up a bit, and created a [pull request|https://github.com/apache/kafka/pull/26] on the Kafka GitHub mirror for review of the changes. [https://github.com/apache/kafka/pull/26] -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14062695#comment-14062695 ] Jun Rao commented on KAFKA-1414: Anton, thanks for the patch. Could you add the patch as an attachment to this jira? This will take care of the Apache copyright stuff. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Karamanov updated KAFKA-1414: Attachment: 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch Sure. [^0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch] -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Resolved] (KAFKA-1529) transient unit test failure in testAutoCreateAfterDeleteTopic
[ https://issues.apache.org/jira/browse/KAFKA-1529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-1529. Resolution: Fixed Fix Version/s: 0.8.2 Thanks for the reviews. Committed to trunk.

transient unit test failure in testAutoCreateAfterDeleteTopic
Key: KAFKA-1529 URL: https://issues.apache.org/jira/browse/KAFKA-1529 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: Jun Rao Fix For: 0.8.2 Attachments: KAFKA-1529.patch

Saw the following transient failure.

kafka.admin.DeleteTopicTest testAutoCreateAfterDeleteTopic FAILED
    org.scalatest.junit.JUnitTestFailedError: Topic should have been auto created
        at org.scalatest.junit.AssertionsForJUnit$class.newAssertionFailedException(AssertionsForJUnit.scala:101)
        at org.scalatest.junit.JUnit3Suite.newAssertionFailedException(JUnit3Suite.scala:149)
        at org.scalatest.Assertions$class.fail(Assertions.scala:711)
        at org.scalatest.junit.JUnit3Suite.fail(JUnit3Suite.scala:149)
        at kafka.admin.DeleteTopicTest.testAutoCreateAfterDeleteTopic(DeleteTopicTest.scala:222)

-- This message was sent by Atlassian JIRA (v6.2#6252)
Re: Review Request 23208: Patch for KAFKA-1512
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23208/#review47841 --- Ship it! Ship It! - Jun Rao

On July 14, 2014, 8:28 p.m., Jay Kreps wrote:
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23208/ --- (Updated July 14, 2014, 8:28 p.m.) Review request for kafka. Bugs: KAFKA-1512 https://issues.apache.org/jira/browse/KAFKA-1512 Repository: kafka

Description
---
KAFKA-1512 Add per-ip connection limits.

Diffs
---
  core/src/main/scala/kafka/network/SocketServer.scala 4976d9c3a66bc965f5870a0736e21c7b32650bab
  core/src/main/scala/kafka/server/KafkaConfig.scala bb2e654b9bd63daa88a4de14131859b75c00e607
  core/src/main/scala/kafka/server/KafkaServer.scala 5a56f57e36c4eab849a0b0e66f20f94690283af2
  core/src/test/scala/unit/kafka/network/SocketServerTest.scala 1c492de8fde6582ca2342842a551739575d1f46c

Diff: https://reviews.apache.org/r/23208/diff/

Testing
---

Thanks, Jay Kreps
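The per-IP connection limits that KAFKA-1512 adds can be illustrated with a counter keyed by remote address: the accept path admits a connection only while the address is under its cap, and the disconnect path releases the slot. This is a sketch only; the class and method names are hypothetical, not the actual SocketServer API:

```java
import java.util.HashMap;
import java.util.Map;

public class ConnectionQuotas {
    private final int maxPerIp;
    private final Map<String, Integer> counts = new HashMap<>();

    public ConnectionQuotas(int maxPerIp) {
        this.maxPerIp = maxPerIp;
    }

    // Called on accept: admit the connection only if the address
    // is still under its limit.
    public synchronized boolean tryAccept(String ip) {
        int current = counts.getOrDefault(ip, 0);
        if (current >= maxPerIp) return false;
        counts.put(ip, current + 1);
        return true;
    }

    // Called on close: release one slot for the address.
    public synchronized void release(String ip) {
        int current = counts.getOrDefault(ip, 0);
        if (current <= 1) counts.remove(ip);
        else counts.put(ip, current - 1);
    }
}
```

A server using this would close a freshly accepted socket immediately whenever `tryAccept` returns false, which bounds how many sockets any single misbehaving client can hold open.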
[jira] [Commented] (KAFKA-1539) Due to OS caching Kafka might loose offset files which causes full reset of data
[ https://issues.apache.org/jira/browse/KAFKA-1539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14062973#comment-14062973 ] Jun Rao commented on KAFKA-1539: If flush is not guaranteed, will keeping two versions of the file help? At some point, we will have flushed both versions and neither one is guaranteed to persist.

Due to OS caching Kafka might loose offset files which causes full reset of data
Key: KAFKA-1539 URL: https://issues.apache.org/jira/browse/KAFKA-1539 Project: Kafka Issue Type: Bug Components: log Affects Versions: 0.8.1.1 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps

Seen this while testing power failures and disk failures. Due to caching at the OS level (e.g. XFS can cache data for 30 seconds), after a failure we got offset files of zero length. This dramatically slows down broker startup (the broker has to re-check all segments), and if the high-watermark offsets are lost it simply erases all data and starts recovering from other brokers (which looks funny: first spending 2-3 hours re-checking logs, then deleting them all due to the missing high watermark). Proposal: introduce offset file rotation. Keep two versions of the offset file; write to the oldest, read from the newest valid one. That way the offset checkpoint interval could be configured so that at least one file is always flushed and valid.

-- This message was sent by Atlassian JIRA (v6.2#6252)
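The durability question raised here is commonly addressed with the write-temp, fsync, atomic-rename pattern: the previous checkpoint stays untouched until the new contents are physically on disk, so a crash at any point leaves at least one valid file. A minimal sketch of that pattern (illustrative only, not Kafka's actual checkpoint code):

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

public class CheckpointWriter {
    // Write the checkpoint to a temp file, force it out of the OS page
    // cache with fsync, then atomically rename it over the old file.
    // A crash at any point leaves either the old or the new file intact.
    public static void write(File target, String contents) throws IOException {
        File tmp = new File(target.getAbsolutePath() + ".tmp");
        try (FileOutputStream out = new FileOutputStream(tmp)) {
            out.write(contents.getBytes(StandardCharsets.UTF_8));
            out.flush();
            out.getFD().sync();  // defeat OS-level caching (e.g. the 30s XFS window)
        }
        Files.move(tmp.toPath(), target.toPath(),
                StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
    }
}
```

Strictly, full durability of the rename itself also requires fsyncing the parent directory, which this sketch omits. It also illustrates Jun Rao's point: the fsync, not the number of file versions, is what guarantees persistence; rotation alone cannot.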