[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Anton Karamanov updated KAFKA-1414: --- Attachment: KAFKA-1414-rev4.patch I've updated the patch. * replaced {{log.io.parallelism}} with {{log.threads.per.data.dir}}; * added separate thread pools for each directory to simplify thread control; * rebased. [Patch v4|^KAFKA-1414-rev4.patch] Speedup broker startup after hard reset --- Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.1.1, 0.8.2, 0.9.0 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps Fix For: 0.8.2 Attachments: 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, KAFKA-1414-rev3-interleaving.patch, KAFKA-1414-rev3.patch, KAFKA-1414-rev4.patch, freebie.patch, parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-fixed-threadpool.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch After a hard reset due to power failure, the broker takes way too much time recovering unflushed segments in a single thread. This could be easily improved by launching multiple threads (one per data directory, assuming that typically each data directory is on a dedicated drive).
Locally we tried this simple patch to LogManager.loadLogs and it seems to work; however, I'm too new to Scala, so do not take it literally: {code}
/**
 * Recover and load all logs in the given data directories
 */
private def loadLogs(dirs: Seq[File]) {
  val threads: Array[Thread] = new Array[Thread](dirs.size)
  var i: Int = 0
  val me = this
  for (dir <- dirs) {
    val thread = new Thread(new Runnable {
      def run() {
        val recoveryPoints = me.recoveryPointCheckpoints(dir).read
        /* load the logs */
        val subDirs = dir.listFiles()
        if (subDirs != null) {
          val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
          if (cleanShutDownFile.exists())
            info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
          for (subDir <- subDirs) {
            if (subDir.isDirectory) {
              info("Loading log '" + subDir.getName + "'")
              val topicPartition = Log.parseTopicPartitionName(subDir.getName)
              val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
              val log = new Log(subDir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time)
              val previous = addLogWithLock(topicPartition, log)
              if (previous != null)
                throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
            }
          }
          cleanShutDownFile.delete()
        }
      }
    })
    thread.start()
    threads(i) = thread
    i = i + 1
  }
  for (thread <- threads) {
    thread.join()
  }
}

def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
  logCreationOrDeletionLock synchronized {
    this.logs.put(topicPartition, log)
  }
}
{code} -- This message was sent by Atlassian JIRA (v6.2#6252)
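The rev4 patch replaces raw threads with one pool per data directory ({{log.threads.per.data.dir}}). A minimal sketch of that shape, not the actual patch code (the helper names and the `loadOne` callback are illustrative):

```scala
import java.io.File
import java.util.concurrent.Executors

// Sketch: one fixed-size pool per data directory, so a slow disk does
// not starve recovery of the logs on the other drives.
object ParallelLogLoading {
  def loadLogsWithPools(dataDirs: Seq[File], threadsPerDir: Int)(loadOne: File => Unit): Unit = {
    val pools = dataDirs.map(_ => Executors.newFixedThreadPool(threadsPerDir))
    val jobs = for ((dir, pool) <- dataDirs.zip(pools)) yield {
      // Each log lives in a subdirectory of its data directory.
      val logDirs = Option(dir.listFiles()).getOrElse(Array.empty[File]).filter(_.isDirectory)
      logDirs.map { logDir =>
        pool.submit(new Runnable { def run(): Unit = loadOne(logDir) })
      }
    }
    jobs.flatten.foreach(_.get()) // wait for all recoveries to finish
    pools.foreach(_.shutdown())
  }
}
```

Separate pools also make the thread count per drive easy to control, which is the stated motivation for the rev4 change.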
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14071573#comment-14071573 ] Anton Karamanov commented on KAFKA-1414: [~junrao], yes, caches were dropped before each run. No compression is used. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiang Wu updated KAFKA-1555: Description: In a mission-critical application, we expect a Kafka cluster with 3 brokers to satisfy two requirements:
1. When 1 broker is down, no message loss or service blocking happens.
2. In worse cases, such as when two brokers are down, service can be blocked, but no message loss happens.
We found that the current Kafka version (0.8.1.1) cannot achieve the requirements due to three behaviors:
1. When choosing a new leader from 2 followers in ISR, the one with fewer messages may be chosen as the leader.
2. Even when replica.lag.max.messages=0, a follower can stay in ISR when it has fewer messages than the leader.
3. ISR can contain only 1 broker; therefore acknowledged messages may be stored in only 1 broker.
The following is an analytical proof. We consider a cluster with 3 brokers and a topic with 3 replicas, and assume that at the beginning all 3 replicas, leader A and followers B and C, are in sync, i.e., they have the same messages and are all in ISR. According to the value of request.required.acks (acks for short), there are the following cases.
1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
2. acks=2. The producer sends a message m. It's acknowledged by A and B. At this time, although C hasn't received m, C is still in ISR. If A is killed, C can be elected as the new leader, and consumers will miss m.
3. acks=-1. B and C restart and are removed from ISR. The producer sends a message m to A and receives an acknowledgement. A disk failure happens in A before B and C replicate m. Message m is lost.
In summary, no existing configuration can satisfy the requirements.
was: In a mission-critical application, we expect a Kafka cluster with 3 brokers to satisfy two requirements:
1. When 1 broker is down, no message loss or service blocking happens.
2. In worse cases, such as when two brokers are down, service can be blocked, but no message loss happens.
We found that the current Kafka version (0.8.1.1) cannot achieve the requirements due to two behaviors:
1. When choosing a new leader from 2 followers in ISR, the one with fewer messages may be chosen as the leader.
2. Even when replica.lag.max.messages=0, a follower can stay in ISR when it has fewer messages than the leader.
The following is an analytical proof. We consider a cluster with 3 brokers and a topic with 3 replicas, and assume that at the beginning all 3 replicas, leader A and followers B and C, are in sync, i.e., they have the same messages and are all in ISR. According to the value of request.required.acks (acks for short), there are the following cases.
1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
2. acks=2. The producer sends a message m. It's acknowledged by A and B. At this time, although C hasn't received m, C is still in ISR. If A is killed, C can be elected as the new leader, and consumers will miss m.
3. acks=-1. Suppose replica.lag.max.messages=M. There are two sub-cases:
3.1 M>0. Suppose C is killed. C will be out of ISR after replica.lag.time.max.ms. Then the producer publishes M messages to A and B. C restarts. C will rejoin ISR since it is M messages behind A and B. Before C replicates all messages, A is killed and C becomes leader; then message loss happens.
3.2 M=0. In this case, when the producer publishes at a high speed, B and C will fall out of ISR; only A keeps receiving messages. Then A is killed. Either message loss or service blocking will happen, depending on whether unclean leader election is disabled (a new feature in 0.8.2; see jira KAFKA-1028).
In summary, no existing configuration can satisfy the requirements.
provide strong consistency with reasonable availability --- Key: KAFKA-1555 URL: https://issues.apache.org/jira/browse/KAFKA-1555 Project: Kafka Issue Type: Improvement Components: controller Affects Versions: 0.8.1.1 Reporter: Jiang Wu Assignee: Neha Narkhede
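Case 2 of the proof (acks=2) can be checked with a few lines of toy modeling, replicas reduced to message lists (illustrative only, not Kafka code):

```scala
// Toy model of the acks=2 loss scenario: A and B acknowledge m, C lags
// but stays in ISR, A dies, C is elected leader.
case class Replica(name: String, msgs: List[String])

val m = "m"
val a = Replica("A", List(m)) // leader, has m
val b = Replica("B", List(m)) // follower, acked m
val c = Replica("C", Nil)     // follower, lagging but still in ISR

// acks=2 is satisfied once two replicas have the message.
val acked = Seq(a, b).count(_.msgs.contains(m)) >= 2
// A is killed; C (still in ISR) is elected leader.
val newLeader = c
// The message was acknowledged to the producer but is gone.
val lost = acked && !newLeader.msgs.contains(m)
```

This is exactly why the reporter argues no acks setting in 0.8.1.1 gives both durability and availability.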
[jira] [Commented] (KAFKA-1282) Disconnect idle socket connection in Selector
[ https://issues.apache.org/jira/browse/KAFKA-1282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14071718#comment-14071718 ] Nicolae Marasoiu commented on KAFKA-1282: - [~junrao] You agree with the approach, do you? Disconnect idle socket connection in Selector - Key: KAFKA-1282 URL: https://issues.apache.org/jira/browse/KAFKA-1282 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: nicu marasoiu Labels: newbie++ Fix For: 0.9.0 To reduce # socket connections, it would be useful for the new producer to close socket connections that are idle. We can introduce a new producer config for the idle time. -- This message was sent by Atlassian JIRA (v6.2#6252)
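One common way to implement the idle disconnection being discussed, sketched here as an LRU-style map keyed by connection id (the structure and names are an assumption for illustration, not this ticket's patch):

```scala
import scala.collection.mutable

// Sketch: track last-use time per connection and report those idle
// longer than the configured max idle time. touch() moves an entry to
// the end of the LinkedHashMap, so iteration order is oldest-first.
class IdleConnectionReaper(maxIdleMs: Long, now: () => Long) {
  private val lastUsed = mutable.LinkedHashMap[String, Long]()

  def touch(id: String): Unit = { lastUsed.remove(id); lastUsed.put(id, now()) }

  /** Returns the ids that should be disconnected, oldest first. */
  def expired(): List[String] = {
    val cutoff = now() - maxIdleMs
    val dead = lastUsed.iterator.takeWhile(_._2 <= cutoff).map(_._1).toList
    dead.foreach(lastUsed.remove)
    dead
  }
}
```

The takeWhile works because entries are kept in last-used order, so the scan can stop at the first non-expired connection.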
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14071792#comment-14071792 ] Jun Rao commented on KAFKA-1555: In case 3.1, when C restarts, the protocol is that C can only join ISR if it has received all messages up to the current high watermark. For example, let's assume that M is 10. Let's say A, B, C all have messages at offset 100 and all those messages are committed (therefore the high watermark is at 100). Then C dies. After that, we commit 5 more messages with both A and B (the high watermark is at 105). Now, C is restarted. C is actually not allowed to rejoin ISR until its log end offset has passed 105. This means that C must first fetch the 5 newly committed messages before being added to ISR. -- This message was sent by Atlassian JIRA (v6.2#6252)
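Jun's rejoin rule reduces to a single comparison; a minimal sketch using his numbers (offsets 100/105; the function name is illustrative):

```scala
// Sketch of the ISR rejoin protocol described above: a restarted
// follower may re-enter ISR only once its log end offset has reached
// the leader's current high watermark.
def canRejoinIsr(followerLogEndOffset: Long, leaderHighWatermark: Long): Boolean =
  followerLogEndOffset >= leaderHighWatermark
```

So in the example, C restarted with log end offset 100 must fetch up to offset 105 before it can rejoin.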
[jira] [Commented] (KAFKA-1282) Disconnect idle socket connection in Selector
[ https://issues.apache.org/jira/browse/KAFKA-1282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14071803#comment-14071803 ] Jun Rao commented on KAFKA-1282: Yes. Thanks for picking it up. -- This message was sent by Atlassian JIRA (v6.2#6252)
Re: Review Request 23705: Addressing Jun's comments
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23705/ --- (Updated July 23, 2014, 2:55 p.m.) Review request for kafka. Bugs: KAFKA-1192 https://issues.apache.org/jira/browse/KAFKA-1192 Repository: kafka Description --- Support given for custom deserialization of messages and keys Diffs (updated) - core/src/main/scala/kafka/tools/DumpLogSegments.scala 6daf87b25a48a51aafb7dbe8d0c0371e0ea7501f Diff: https://reviews.apache.org/r/23705/diff/ Testing --- Thanks, Manikumar Reddy O
[jira] [Commented] (KAFKA-1192) Enable DumpLogSegments tool to deserialize messages
[ https://issues.apache.org/jira/browse/KAFKA-1192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14071812#comment-14071812 ] Manikumar Reddy commented on KAFKA-1192: Updated reviewboard https://reviews.apache.org/r/23705/diff/ against branch origin/trunk Enable DumpLogSegments tool to deserialize messages --- Key: KAFKA-1192 URL: https://issues.apache.org/jira/browse/KAFKA-1192 Project: Kafka Issue Type: Bug Components: tools Reporter: Guozhang Wang Assignee: Guozhang Wang Labels: newbie Attachments: KAFKA-1192_2014-07-21_20:44:08.patch, KAFKA-1192_2014-07-23_20:23:08.patch Currently the DumpLogSegments tool reads the message payloads as strings by default, which will not display the messages correctly if the messages are serialized with another class. By enabling deserialization with a customized class, we can use this tool to debug more issues where we need to read the message content. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1192) Enable DumpLogSegments tool to deserialize messages
[ https://issues.apache.org/jira/browse/KAFKA-1192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar Reddy updated KAFKA-1192: --- Attachment: KAFKA-1192_2014-07-23_20:23:08.patch -- This message was sent by Atlassian JIRA (v6.2#6252)
Re: Review Request 23705: Addressing Jun's comments
On July 21, 2014, 6:16 p.m., Guozhang Wang wrote:
> core/src/main/scala/kafka/tools/DumpLogSegments.scala, line 165
> https://reviews.apache.org/r/23705/diff/3/?file=636778#file636778line165
> Would key also be null possibly?

Manikumar Reddy O wrote:
This line is surrounded by an if(msg.hasKey) check. msg.hasKey returns true for non-zero-length keys, so we don't need any additional null check.

Guozhang Wang wrote:
I see. Instead of not printing anything if the key is null, could you still print key: null to be aligned with other keyed messages? This will also make text processing for analysis (grep/awk) easy.

We need to use if(msg.hasKey) to differentiate keyed messages from non-keyed messages. The key will be null/zero-length for all non-keyed messages, so it is not appropriate to print key: null for all non-keyed messages. If msg.hasKey is true, it is a keyed message, so its key will not be null. Are you saying that there can be a message with a non-zero key length but a null reference?

- Manikumar Reddy --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23705/#review48265 --- On July 23, 2014, 2:55 p.m., Manikumar Reddy O wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23705/ --- (Updated July 23, 2014, 2:55 p.m.) Review request for kafka. Bugs: KAFKA-1192 https://issues.apache.org/jira/browse/KAFKA-1192 Repository: kafka Description --- Support given for custom deserialization of messages and keys Diffs - core/src/main/scala/kafka/tools/DumpLogSegments.scala 6daf87b25a48a51aafb7dbe8d0c0371e0ea7501f Diff: https://reviews.apache.org/r/23705/diff/ Testing --- Thanks, Manikumar Reddy O
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14071886#comment-14071886 ] Robert Withers commented on KAFKA-1555: --- In military satellite communications, when the statically assigned bandwidth is exceeded, a dynamic block is available from which to grab some leased bandwidth. If we apply this idea to data production into Kafka, though different due to data replication, could we have replicas to the replicas: replica shadows? Say we have 10 brokers with replication 2. So partition 1 has a leader on broker 1 and a follower in ISR on both broker 2 and 3. If we have replica shadows on brokers 4, 5 and 6 not in ISR but receiving msg production opportunistically, then we could have the option to dynamically assign a new follower into ISR if an ISR follower fails. - Rob provide strong consistency with reasonable availability --- Key: KAFKA-1555 URL: https://issues.apache.org/jira/browse/KAFKA-1555 Project: Kafka Issue Type: Improvement Components: controller Affects Versions: 0.8.1.1 Reporter: Jiang Wu Assignee: Neha Narkhede In a mission critical application, we expect a kafka cluster with 3 brokers can satisfy two requirements: 1. When 1 broker is down, no message loss or service blocking happens. 2. In worse cases such as two brokers are down, service can be blocked, but no message loss happens. We found that current kafka versoin (0.8.1.1) cannot achieve the requirements due to its three behaviors: 1. when choosing a new leader from 2 followers in ISR, the one with less messages may be chosen as the leader. 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it has less messages than the leader. 3. ISR can contains only 1 broker, therefore acknowledged messages may be stored in only 1 broker. The following is an analytical proof. 
We consider a cluster with 3 brokers and a topic with 3 replicas, and assume that at the beginning, all 3 replicas, leader A, followers B and C, are in sync, i.e., they have the same messages and are all in ISR. According to the value of request.required.acks (acks for short), there are the following cases. 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement. 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this time, although C hasn't received m, C is still in ISR. If A is killed, C can be elected as the new leader, and consumers will miss m. 3. acks=-1. B and C restart and are removed from ISR. Producer sends a message m to A, and receives an acknowledgement. Disk failure happens in A before B and C replicate m. Message m is lost. In summary, any existing configuration cannot satisfy the requirements. -- This message was sent by Atlassian JIRA (v6.2#6252)
Re: Review Request 23705: Addressing Jun's comments
On July 21, 2014, 6:16 p.m., Guozhang Wang wrote:
> core/src/main/scala/kafka/tools/DumpLogSegments.scala, line 165
> https://reviews.apache.org/r/23705/diff/3/?file=636778#file636778line165
> Would key also be null possibly?

Manikumar Reddy O wrote:
This line is surrounded by an if(msg.hasKey) check. msg.hasKey returns true for non-zero-length keys, so we don't need any additional null check.

Guozhang Wang wrote:
I see. Instead of not printing anything if the key is null, could you still print key: null to be aligned with other keyed messages? This will also make text processing for analysis (grep/awk) easy.

Manikumar Reddy O wrote:
We need to use if(msg.hasKey) to differentiate keyed messages from non-keyed messages. The key will be null/zero-length for all non-keyed messages, so it is not appropriate to print key: null for all non-keyed messages. If msg.hasKey is true, it is a keyed message, so its key will not be null. Are you saying that there can be a message with a non-zero key length but a null reference?

Any keyed message's key should not be null; what I was originally suggesting is that we can still print key: null for non-keyed messages.

- Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23705/#review48265 --- On July 23, 2014, 2:55 p.m., Manikumar Reddy O wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23705/ --- (Updated July 23, 2014, 2:55 p.m.) Review request for kafka. Bugs: KAFKA-1192 https://issues.apache.org/jira/browse/KAFKA-1192 Repository: kafka Description --- Support given for custom deserialization of messages and keys Diffs - core/src/main/scala/kafka/tools/DumpLogSegments.scala 6daf87b25a48a51aafb7dbe8d0c0371e0ea7501f Diff: https://reviews.apache.org/r/23705/diff/ Testing --- Thanks, Manikumar Reddy O
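The formatting question being debated, always emitting a key field so grep/awk see a uniform record layout, could be sketched like this (an illustrative helper, not the actual DumpLogSegments code):

```scala
// Sketch: print "key: null" for non-keyed messages so every record has
// the same fields, which keeps grep/awk pipelines simple.
def formatEntry(key: Option[String], payload: String): String = {
  val keyStr = key.getOrElse("null")
  s"key: $keyStr payload: $payload"
}
```

With this layout, `awk '{print $2}'` extracts the key column for every message, keyed or not.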
[jira] [Commented] (KAFKA-1150) Fetch on a replicated topic does not return as soon as possible
[ https://issues.apache.org/jira/browse/KAFKA-1150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14071919#comment-14071919 ] Simon Cooper commented on KAFKA-1150: - Yes, looks like that patch fixes the issue. Fetch on a replicated topic does not return as soon as possible --- Key: KAFKA-1150 URL: https://issues.apache.org/jira/browse/KAFKA-1150 Project: Kafka Issue Type: Bug Components: core, replication Affects Versions: 0.8.0 Reporter: Andrey Balmin Assignee: Neha Narkhede Attachments: Test.java I see a huge performance difference between replicated and non-replicated topics. On my laptop, running two brokers, I see producer-to-consumer latency of under 1 ms for topics with one replica. However, with two replicas the same latency equals the max fetch delay. Here is a simple test I just did: one producer thread in a loop sending one message and sleeping for 2500 ms, and one consumer thread looping on the long poll with a max fetch delay of 1000 ms. Here is what happens with no replication:
Produced 1 key: key1 at time: 15:33:52.822
Consumed up to 1 at time: 15:33:52.822
Consumed up to 1 at time: 15:33:53.823
Consumed up to 1 at time: 15:33:54.825
Produced 2 key: key2 at time: 15:33:55.324
Consumed up to 2 at time: 15:33:55.324
Consumed up to 2 at time: 15:33:56.326
Consumed up to 2 at time: 15:33:57.328
Produced 3 key: key3 at time: 15:33:57.827
Consumed up to 3 at time: 15:33:57.827
There are no delays between the message being produced and consumed -- this is the behavior I expected.
Here is the same test, but for a topic with two replicas:
Consumed up to 0 at time: 15:50:29.575
Produced 1 key: key1 at time: 15:50:29.575
Consumed up to 1 at time: 15:50:30.577
Consumed up to 1 at time: 15:50:31.579
Consumed up to 1 at time: 15:50:32.078
Produced 2 key: key2 at time: 15:50:32.078
Consumed up to 2 at time: 15:50:33.081
Consumed up to 2 at time: 15:50:34.081
Consumed up to 2 at time: 15:50:34.581
Produced 3 key: key3 at time: 15:50:34.581
Consumed up to 3 at time: 15:50:35.584
Notice how the fetch always returns as soon as the produce request is issued, but without the new message, which consistently arrives ~1002 ms later. Below is the request log snippet for this part:
Produced 2 key: key2 at time: 15:50:32.078
Consumed up to 2 at time: 15:50:33.081
You can see the first FetchRequest returns at the same time as the replica FetchRequest, but this fetch response is *empty* -- the message is not committed yet, so it cannot be returned. The message is committed at 15:50:32,079. However, the next FetchRequest (that does return the message) comes in at 15:50:32,078, but completes only at 15:50:33,081. Why is it waiting for the full 1000 ms instead of returning right away?
[2013-11-25 15:50:32,077] TRACE Processor 1 received request : Name: ProducerRequest; Version: 0; CorrelationId: 5; ClientId: pro; RequiredAcks: 1; AckTimeoutMs: 20 ms; TopicAndPartition: [test_topic,0] -> 2078 (kafka.network.RequestChannel$)
[2013-11-25 15:50:32,078] TRACE Completed request:Name: FetchRequest; Version: 0; CorrelationId: 7; ClientId: con; ReplicaId: -1; MaxWait: 1000 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> PartitionFetchInfo(129,1024000) from client /0:0:0:0:0:0:0:1%0:63264;totalTime:499,queueTime:0,localTime:0,remoteTime:499,sendTime:0 (kafka.request.logger)
[2013-11-25 15:50:32,078] TRACE Completed request:Name: FetchRequest; Version: 0; CorrelationId: 3463; ClientId: ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> PartitionFetchInfo(129,1048576) from client /127.0.0.1:63056;totalTime:499,queueTime:1,localTime:0,remoteTime:498,sendTime:0 (kafka.request.logger)
[2013-11-25 15:50:32,078] TRACE Processor 1 received request : Name: FetchRequest; Version: 0; CorrelationId: 8; ClientId: con; ReplicaId: -1; MaxWait: 1000 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> PartitionFetchInfo(129,1024000) (kafka.network.RequestChannel$)
[2013-11-25 15:50:32,078] TRACE Completed request:Name: ProducerRequest; Version: 0; CorrelationId: 5; ClientId: pro; RequiredAcks: 1; AckTimeoutMs: 20 ms; TopicAndPartition: [test_topic,0] -> 2078 from client /0:0:0:0:0:0:0:1%0:63266;totalTime:1,queueTime:0,localTime:1,remoteTime:0,sendTime:0 (kafka.request.logger)
[2013-11-25 15:50:32,079] TRACE Processor 0 received request : Name: FetchRequest; Version: 0; CorrelationId: 3464; ClientId: ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> PartitionFetchInfo(130,1048576) (kafka.network.RequestChannel$)
[2013-11-25 15:50:32,581] TRACE
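The behavior the reporter expects can be stated as a small predicate: a pending consumer fetch should complete as soon as the high watermark passes its fetch offset, not only when MaxWait expires. A toy model of that check (not the actual request purgatory code):

```scala
// Toy model of when a delayed consumer fetch should be completed.
case class DelayedFetch(fetchOffset: Long, maxWaitMs: Long, createdMs: Long)

def shouldComplete(f: DelayedFetch, highWatermark: Long, nowMs: Long): Boolean =
  highWatermark > f.fetchOffset ||            // committed data is now visible
    (nowMs - f.createdMs) >= f.maxWaitMs      // or the long poll timed out
```

The symptom in the logs is consistent with the first condition never being re-checked when the high watermark advances, leaving only the timeout path.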
Review Request 23858: Patch for KAFKA-1544
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23858/ --- Review request for kafka. Bugs: KAFKA-1544 https://issues.apache.org/jira/browse/KAFKA-1544 Repository: kafka Description --- Implemented Jun's solution to decrease LogCleaner shutdown time during shutdown Diffs - core/src/main/scala/kafka/log/LogCleaner.scala 2faa196a4dc612bc634d5ff5f5f275d09073f13b Diff: https://reviews.apache.org/r/23858/diff/ Testing --- Thanks, Manikumar Reddy O
[jira] [Commented] (KAFKA-1544) LogCleaner may take a long time to shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14071937#comment-14071937 ] Manikumar Reddy commented on KAFKA-1544: Created reviewboard https://reviews.apache.org/r/23858/diff/ against branch origin/trunk LogCleaner may take a long time to shutdown --- Key: KAFKA-1544 URL: https://issues.apache.org/jira/browse/KAFKA-1544 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Labels: newbie Attachments: KAFKA-1544.patch We have the following code in LogCleaner. Since the cleaner thread is shut down w/o interrupt, it may take up to the backoff time for the cleaner thread to detect the shutdown flag. private def cleanOrSleep() { cleanerManager.grabFilthiestLog() match { case None => // there are no cleanable logs, sleep a while time.sleep(config.backOffMs) -- This message was sent by Atlassian JIRA (v6.2#6252)
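One way to make the backoff interruptible is to wait on a latch that shutdown() trips, instead of an uninterruptible sleep; a sketch of that idea (not necessarily the attached patch's exact approach):

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Sketch: the cleaner waits on a latch for its backoff. A call to
// shutdown() counts the latch down, so the wait returns immediately
// instead of running out the full backOffMs.
class BackoffWaiter(backOffMs: Long) {
  private val shutdownLatch = new CountDownLatch(1)

  /** Returns true if woken by shutdown, false if the backoff elapsed. */
  def sleepOrAbort(): Boolean = shutdownLatch.await(backOffMs, TimeUnit.MILLISECONDS)

  def shutdown(): Unit = shutdownLatch.countDown()
}
```

With this in place, shutdown latency is bounded by the current cleaning pass rather than by config.backOffMs.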
[jira] [Updated] (KAFKA-1544) LogCleaner may take a long time to shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar Reddy updated KAFKA-1544: --- Attachment: KAFKA-1544.patch -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1150) Fetch on a replicated topic does not return as soon as possible
[ https://issues.apache.org/jira/browse/KAFKA-1150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14071949#comment-14071949 ] Guozhang Wang commented on KAFKA-1150: -- Hi Simon, just wondering did you apply the patch and re-do the test ? Fetch on a replicated topic does not return as soon as possible --- Key: KAFKA-1150 URL: https://issues.apache.org/jira/browse/KAFKA-1150 Project: Kafka Issue Type: Bug Components: core, replication Affects Versions: 0.8.0 Reporter: Andrey Balmin Assignee: Neha Narkhede Attachments: Test.java I see a huge performance difference between replicated and not replicated topics. On my laptop, running two brokers, I see producer-2-consumer latency of under 1ms for topics with one replica. However, with two replicas the same latency equals to the max fetch delay. Here is a simple test I just did: one producer thread in a loop sending one message and sleeping for 2500ms, and one consumer thread looping on the long poll with max fetch delay of 1000 ms. Here is what happens with no replication: Produced 1 key: key1 at time: 15:33:52.822 Consumed up to 1 at time: 15:33:52.822 Consumed up to 1 at time: 15:33:53.823 Consumed up to 1 at time: 15:33:54.825 Produced 2 key: key2 at time: 15:33:55.324 Consumed up to 2 at time: 15:33:55.324 Consumed up to 2 at time: 15:33:56.326 Consumed up to 2 at time: 15:33:57.328 Produced 3 key: key3 at time: 15:33:57.827 Consumed up to 3 at time: 15:33:57.827 The are no delays between the message being produced and consumed -- this is the behavior I expected. 
Here is the same test, but for a topic with two replicas: Consumed up to 0 at time: 15:50:29.575 Produced 1 key: key1 at time: 15:50:29.575 Consumed up to 1 at time: 15:50:30.577 Consumed up to 1 at time: 15:50:31.579 Consumed up to 1 at time: 15:50:32.078 Produced 2 key: key2 at time: 15:50:32.078 Consumed up to 2 at time: 15:50:33.081 Consumed up to 2 at time: 15:50:34.081 Consumed up to 2 at time: 15:50:34.581 Produced 3 key: key3 at time: 15:50:34.581 Consumed up to 3 at time: 15:50:35.584 Notice how the fetch always returns as soon as the produce request is issued, but without the new message, which consistently arrives ~1002 ms later. Below is the request log snippet for this part: Produced 2 key: key2 at time: 15:50:32.078 Consumed up to 2 at time: 15:50:33.081 You can see the first FetchRequest returns at the same time as the replica FetchRequest, but this fetch response is *empty* -- the message is not committed yet, so it cannot be returned. The message is committed at 15:50:32,079. However, the next FetchRequest (that does return the message) comes in at 15:50:32,078, but completes only at 15:50:33,081. Why is it waiting for the full 1000 ms, instead of returning right away? 
[2013-11-25 15:50:32,077] TRACE Processor 1 received request : Name: ProducerRequest; Version: 0; CorrelationId: 5; ClientId: pro; RequiredAcks: 1; AckTimeoutMs: 20 ms; TopicAndPartition: [test_topic,0] -> 2078 (kafka.network.RequestChannel$) [2013-11-25 15:50:32,078] TRACE Completed request:Name: FetchRequest; Version: 0; CorrelationId: 7; ClientId: con; ReplicaId: -1; MaxWait: 1000 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> PartitionFetchInfo(129,1024000) from client /0:0:0:0:0:0:0:1%0:63264;totalTime:499,queueTime:0,localTime:0,remoteTime:499,sendTime:0 (kafka.request.logger) [2013-11-25 15:50:32,078] TRACE Completed request:Name: FetchRequest; Version: 0; CorrelationId: 3463; ClientId: ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> PartitionFetchInfo(129,1048576) from client /127.0.0.1:63056;totalTime:499,queueTime:1,localTime:0,remoteTime:498,sendTime:0 (kafka.request.logger) [2013-11-25 15:50:32,078] TRACE Processor 1 received request : Name: FetchRequest; Version: 0; CorrelationId: 8; ClientId: con; ReplicaId: -1; MaxWait: 1000 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> PartitionFetchInfo(129,1024000) (kafka.network.RequestChannel$) [2013-11-25 15:50:32,078] TRACE Completed request:Name: ProducerRequest; Version: 0; CorrelationId: 5; ClientId: pro; RequiredAcks: 1; AckTimeoutMs: 20 ms; TopicAndPartition: [test_topic,0] -> 2078 from client /0:0:0:0:0:0:0:1%0:63266;totalTime:1,queueTime:0,localTime:1,remoteTime:0,sendTime:0 (kafka.request.logger) [2013-11-25 15:50:32,079] TRACE Processor 0 received request : Name: FetchRequest; Version: 0; CorrelationId: 3464; ClientId: ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> PartitionFetchInfo(130,1048576) (kafka.network.RequestChannel$)
Re: Review Request 23858: Patch for KAFKA-1544
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23858/#review48532 --- Is this better than using a condition? I was imagining replacing sleep with shutdownCondition.await(backoffMs, TimeUnit.MILLISECONDS) and then in shutdown() we call shutdownCondition.signal(). - Jay Kreps On July 23, 2014, 4:44 p.m., Manikumar Reddy O wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23858/ --- (Updated July 23, 2014, 4:44 p.m.) Review request for kafka. Bugs: KAFKA-1544 https://issues.apache.org/jira/browse/KAFKA-1544 Repository: kafka Description --- Implemented Jun's solution to decrease LogCleaner shutdown time during shutdown Diffs - core/src/main/scala/kafka/log/LogCleaner.scala 2faa196a4dc612bc634d5ff5f5f275d09073f13b Diff: https://reviews.apache.org/r/23858/diff/ Testing --- Thanks, Manikumar Reddy O
[jira] [Assigned] (KAFKA-1544) LogCleaner may take a long time to shutdown
[ https://issues.apache.org/jira/browse/KAFKA-1544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar Reddy reassigned KAFKA-1544: -- Assignee: Manikumar Reddy LogCleaner may take a long time to shutdown --- Key: KAFKA-1544 URL: https://issues.apache.org/jira/browse/KAFKA-1544 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.8.2 Reporter: Jun Rao Assignee: Manikumar Reddy Labels: newbie Attachments: KAFKA-1544.patch We have the following code in LogCleaner. Since the cleaner thread is shut down w/o interrupt, it may take up to the backoff time for the cleaner thread to detect the shutdown flag. private def cleanOrSleep() { cleanerManager.grabFilthiestLog() match { case None => // there are no cleanable logs, sleep a while time.sleep(config.backOffMs) -- This message was sent by Atlassian JIRA (v6.2#6252)
Re: Review Request 23858: Patch for KAFKA-1544
On July 23, 2014, 4:56 p.m., Jay Kreps wrote: Is this better than using a condition? I was imagining replacing sleep with shutdownCondition.await(backoffMs, TimeUnit.MILLISECONDS) and then in shutdown() we call shutdownCondition.signal(). Actually a condition isn't quite right as there is a subtle race between signal and await. Better would be the same approach but using CountDownLatch.await(backoff, TimeUnit.MS) - Jay --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23858/#review48532 --- On July 23, 2014, 4:44 p.m., Manikumar Reddy O wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23858/ --- (Updated July 23, 2014, 4:44 p.m.) Review request for kafka. Bugs: KAFKA-1544 https://issues.apache.org/jira/browse/KAFKA-1544 Repository: kafka Description --- Implemented Jun's solution to decrease LogCleaner shutdown time during shutdown Diffs - core/src/main/scala/kafka/log/LogCleaner.scala 2faa196a4dc612bc634d5ff5f5f275d09073f13b Diff: https://reviews.apache.org/r/23858/diff/ Testing --- Thanks, Manikumar Reddy O
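The latch-based approach Jay describes can be sketched as follows. This is a minimal illustration, not the actual LogCleaner code; the class and method names here are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch of an interruptible backoff for a cleaner-style loop. Unlike a
// Condition, a CountDownLatch has no lost-signal race: if countDown()
// happens before await(), the await() still returns immediately.
class CleanerLoop {
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    private final long backOffMs;

    CleanerLoop(long backOffMs) {
        this.backOffMs = backOffMs;
    }

    /** Waits up to backOffMs; returns true if shutdown was requested. */
    boolean sleepOrAbort() {
        try {
            return shutdownLatch.await(backOffMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return true; // treat interruption as a shutdown request
        }
    }

    /** Wakes the loop immediately instead of waiting out the backoff. */
    void shutdown() {
        shutdownLatch.countDown();
    }
}
```

The win over a plain time.sleep(backOffMs) is that shutdown() wakes the loop at once instead of the caller waiting out the remainder of the backoff.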
question about message sets and produce requests/responses
Hello, I have a question about produce requests and responses. Suppose I create a produce request consisting of a single topic T which contains two message sets S1 and S2, with S1 preceding S2. In other words, the request looks like this: ( request header ( T ( S1, S2 ) ) ) Is it possible that I may get a response in which the order of the ACKs for the individual message sets is reversed? In other words, a response that looks like this: ( response header ( T ( ack for S2, ack for S1 ) ) ) I know that if a request contains multiple topics, then the topics may arrive out of order in the response. So I was wondering if ACKs for individual message sets within a topic can likewise arrive out of order in the response. Thanks, Dave
[jira] [Updated] (KAFKA-1510) Force offset commits when migrating consumer offsets from zookeeper to kafka
[ https://issues.apache.org/jira/browse/KAFKA-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy updated KAFKA-1510: -- Summary: Force offset commits when migrating consumer offsets from zookeeper to kafka (was: Force offset commits at a minimum interval when migrating consumer offsets from zookeeper to kafka) Force offset commits when migrating consumer offsets from zookeeper to kafka Key: KAFKA-1510 URL: https://issues.apache.org/jira/browse/KAFKA-1510 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2 Reporter: Joel Koshy Assignee: nicu marasoiu Labels: newbie Fix For: 0.8.2 When migrating consumer offsets from ZooKeeper to kafka, we have to turn on dual-commit (i.e., the consumers will commit offsets to both zookeeper and kafka) in addition to setting offsets.storage to kafka. However, when we commit offsets we only commit offsets if they have changed (since the last commit). For low-volume topics or for topics that receive data in bursts offsets may not move for a long period of time. Therefore we may want to force the commit (even if offsets have not changed) when migrating (i.e., when dual-commit is enabled) - we can add a minimum interval threshold (say force commit after every 10 auto-commits) as well as on rebalance and shutdown. Also, I think it is safe to switch the default for offsets.storage from zookeeper to kafka and set the default to dual-commit (for people who have not migrated yet). We have deployed this to the largest consumers at linkedin and have not seen any issues so far (except for the migration caveat that this jira will resolve). -- This message was sent by Atlassian JIRA (v6.2#6252)
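The forced-commit threshold described above can be sketched like this. A minimal illustration only; MigrationCommitPolicy and its method names are hypothetical, not the actual consumer code.

```java
// Sketch: commit when offsets changed, but when dual-commit (migration)
// is enabled also force a commit every N unchanged auto-commit ticks,
// so kafka-backed offsets catch up even on low-volume or bursty topics.
class MigrationCommitPolicy {
    private final int forceEvery; // e.g. force after every 10 auto-commits
    private int unchangedTicks = 0;

    MigrationCommitPolicy(int forceEvery) {
        this.forceEvery = forceEvery;
    }

    boolean shouldCommit(boolean offsetsChanged, boolean dualCommitEnabled) {
        if (offsetsChanged) {
            unchangedTicks = 0;
            return true;
        }
        if (!dualCommitEnabled) {
            return false; // old behavior: skip unchanged offsets entirely
        }
        if (++unchangedTicks >= forceEvery) {
            unchangedTicks = 0;
            return true; // forced commit during migration
        }
        return false;
    }
}
```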
[jira] [Commented] (KAFKA-1510) Force offset commits when migrating consumer offsets from zookeeper to kafka
[ https://issues.apache.org/jira/browse/KAFKA-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14072437#comment-14072437 ] Joel Koshy commented on KAFKA-1510: --- [~nmarasoiu] do you think you will be able to take this on in the next couple days? Force offset commits when migrating consumer offsets from zookeeper to kafka Key: KAFKA-1510 URL: https://issues.apache.org/jira/browse/KAFKA-1510 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2 Reporter: Joel Koshy Assignee: nicu marasoiu Labels: newbie Fix For: 0.8.2 When migrating consumer offsets from ZooKeeper to kafka, we have to turn on dual-commit (i.e., the consumers will commit offsets to both zookeeper and kafka) in addition to setting offsets.storage to kafka. However, when we commit offsets we only commit offsets if they have changed (since the last commit). For low-volume topics or for topics that receive data in bursts offsets may not move for a long period of time. Therefore we may want to force the commit (even if offsets have not changed) when migrating (i.e., when dual-commit is enabled) - we can add a minimum interval threshold (say force commit after every 10 auto-commits) as well as on rebalance and shutdown. Also, I think it is safe to switch the default for offsets.storage from zookeeper to kafka and set the default to dual-commit (for people who have not migrated yet). We have deployed this to the largest consumers at linkedin and have not seen any issues so far (except for the migration caveat that this jira will resolve). -- This message was sent by Atlassian JIRA (v6.2#6252)
Re: Review Request 23655: Patch for KAFKA-687
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23655/#review48454 --- core/src/main/scala/kafka/consumer/ConsumerConfig.scala https://reviews.apache.org/r/23655/#comment85261 Will do. core/src/main/scala/kafka/consumer/PartitionAllocator.scala https://reviews.apache.org/r/23655/#comment85262 Will fix. core/src/main/scala/kafka/consumer/PartitionAllocator.scala https://reviews.apache.org/r/23655/#comment85265 After thinking about it and discussing off-thread, we are thinking of: def allocate(partitions, consumerIdsPerTopic) At the same time, this code is not particularly complex so reusability is a nice-to-have - worst case we can just adapt it to use in the new consumer. core/src/main/scala/kafka/consumer/PartitionAllocator.scala https://reviews.apache.org/r/23655/#comment85268 Thanks for catching that - will fix. core/src/main/scala/kafka/consumer/PartitionAllocator.scala https://reviews.apache.org/r/23655/#comment85269 After thinking about it I think we can actually merge the two and get rid of symmetric. So we will just have range and roundrobin (or maybe name it uniform). Will see how that turns out after I update the patch. core/src/main/scala/kafka/consumer/PartitionAllocator.scala https://reviews.apache.org/r/23655/#comment85074 This uses the operator defined in StringOps - so it probably translates to compareTo under the hood. That said, I can change it to compareTo just to be sure. core/src/main/scala/kafka/consumer/PartitionAllocator.scala https://reviews.apache.org/r/23655/#comment85075 Yeah, I didn't bother describing the old allocator as it was copied/moved from the consumer connector - I will add brief comments. core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://reviews.apache.org/r/23655/#comment85271 Will do. core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://reviews.apache.org/r/23655/#comment85077 I think it is very useful. 
While the offset checker is useful, it can take a while to run if you want to (say) get a count of all partitions owned by the consumer. An mbean, on the other hand, helps continuously monitor how evenly your partitions are distributed across all your consumers. core/src/test/scala/unit/kafka/consumer/PartitionAllocatorTest.scala https://reviews.apache.org/r/23655/#comment85272 Will add it. - Joel Koshy On July 18, 2014, 10:57 p.m., Joel Koshy wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23655/ --- (Updated July 18, 2014, 10:57 p.m.) Review request for kafka. Bugs: KAFKA-687 https://issues.apache.org/jira/browse/KAFKA-687 Repository: kafka Description --- The updated diff contains the mbeans for ownership counts. The comments in the code and the summary are pretty self-explanatory. Things to think about: * Naming - do symmetric/range/roundrobin make sense? * The comments briefly summarize why we needed a separate symmetric mode but let me know if that is unclear. * Rebalance time will be slightly higher - I have not measured (will do that) Diffs - core/src/main/scala/kafka/consumer/ConsumerConfig.scala 1cf2f62ba02e4aa66bfa7575865e5d57baf82212 core/src/main/scala/kafka/consumer/PartitionAllocator.scala PRE-CREATION core/src/main/scala/kafka/consumer/TopicCount.scala c79311097c5bd6718cb6a7fc403f804a1a939353 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 65f518d47c7555c42c4bff39c211814831f4b8b6 core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala a20ab90165cc7ebb1cf44078efe23a53938c8df6 core/src/main/scala/kafka/utils/ZkUtils.scala dcdc1ce2b02c996294e19cf480736106aaf29511 core/src/test/scala/unit/kafka/consumer/PartitionAllocatorTest.scala PRE-CREATION Diff: https://reviews.apache.org/r/23655/diff/ Testing --- * I did the unit tests (including the new one) as well as the mirror maker system test suite with roundrobin. 
While this is being reviewed I will run the system tests with symmetric. Thanks, Joel Koshy
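As a rough illustration of the roundrobin mode under discussion (this is not the actual PartitionAllocator code; class and method names are hypothetical), spreading partitions evenly over consumers could look like:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: assign partitions to consumer ids round-robin so the
// ownership counts differ by at most one across consumers.
class RoundRobinAllocator {
    static Map<String, List<Integer>> allocate(List<Integer> partitions,
                                               List<String> consumers) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String c : consumers) {
            assignment.put(c, new ArrayList<>());
        }
        // Sort for a deterministic result regardless of input order.
        List<Integer> sorted = new ArrayList<>(partitions);
        Collections.sort(sorted);
        int i = 0;
        for (int p : sorted) {
            assignment.get(consumers.get(i % consumers.size())).add(p);
            i++;
        }
        return assignment;
    }
}
```

Compared with range assignment, this keeps the per-consumer counts balanced even when partition counts are not multiples of the consumer count.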
Re: question about message sets and produce requests/responses
Not sure if I followed the question correctly, but S1 and S2 would be message-sets to different partitions of T - the producer request object contains a map of topic-partition -> messageset. So each partition would get its own ack. The response object is just a map of partition -> response status, so the order of acks in the response is the response status map's order. Joel On Wed, Jul 23, 2014 at 02:09:40PM -0700, Dave Peterson wrote: Hello, I have a question about produce requests and responses. Suppose I create a produce request consisting of a single topic T which contains two message sets S1 and S2, with S1 preceding S2. In other words, the request looks like this: ( request header ( T ( S1, S2 ) ) ) Is it possible that I may get a response in which the order of the ACKs for the individual message sets is reversed? In other words, a response that looks like this: ( response header ( T ( ack for S2, ack for S1 ) ) ) I know that if a request contains multiple topics, then the topics may arrive out of order in the response. So I was wondering if ACKs for individual message sets within a topic can likewise arrive out of order in the response. Thanks, Dave
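A rough sketch of what that structure implies for clients (a hypothetical class, not the actual response object): the acks are keyed by partition, so within a topic there is no per-message-set ordering to rely on.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: a produce response is conceptually a map from partition to an
// error code, so consumers of the response should look acks up by
// partition rather than depend on any ordering.
class ProduceResponseSketch {
    private final Map<Integer, Short> errorByPartition = new HashMap<>();

    void ack(int partition, short errorCode) {
        errorByPartition.put(partition, errorCode);
    }

    /** Returns the error code for a partition, or -1 if none recorded. */
    short errorFor(int partition) {
        return errorByPartition.getOrDefault(partition, (short) -1);
    }
}
```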
Re: question about message sets and produce requests/responses
Ok, that's all I need to know. Thanks! On Wed, Jul 23, 2014 at 4:33 PM, Joel Koshy jjkosh...@gmail.com wrote: Not sure if I followed the question correctly, but S1 and S2 would be message-sets to different partitions of T - the producer request object contains a map of topic-partition -> messageset. So each partition would get its own ack. The response object is just a map of partition -> response status, so the order of acks in the response is the response status map's order. Joel On Wed, Jul 23, 2014 at 02:09:40PM -0700, Dave Peterson wrote: Hello, I have a question about produce requests and responses. Suppose I create a produce request consisting of a single topic T which contains two message sets S1 and S2, with S1 preceding S2. In other words, the request looks like this: ( request header ( T ( S1, S2 ) ) ) Is it possible that I may get a response in which the order of the ACKs for the individual message sets is reversed? In other words, a response that looks like this: ( response header ( T ( ack for S2, ack for S1 ) ) ) I know that if a request contains multiple topics, then the topics may arrive out of order in the response. So I was wondering if ACKs for individual message sets within a topic can likewise arrive out of order in the response. Thanks, Dave
Re: question about message sets and produce requests/responses
Also, to be clear, by ack I mean an error code as described here: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceResponse On Wed, Jul 23, 2014 at 04:33:29PM -0700, Joel Koshy wrote: Not sure if I followed the question correctly, but S1 and S2 would be message-sets to different partitions of T - the producer request object contains a map of topic-partition -> messageset. So each partition would get its own ack. The response object is just a map of partition -> response status, so the order of acks in the response is the response status map's order. Joel On Wed, Jul 23, 2014 at 02:09:40PM -0700, Dave Peterson wrote: Hello, I have a question about produce requests and responses. Suppose I create a produce request consisting of a single topic T which contains two message sets S1 and S2, with S1 preceding S2. In other words, the request looks like this: ( request header ( T ( S1, S2 ) ) ) Is it possible that I may get a response in which the order of the ACKs for the individual message sets is reversed? In other words, a response that looks like this: ( response header ( T ( ack for S2, ack for S1 ) ) ) I know that if a request contains multiple topics, then the topics may arrive out of order in the response. So I was wondering if ACKs for individual message sets within a topic can likewise arrive out of order in the response. Thanks, Dave
Re: Review Request 23339: Patch for KAFKA-1507
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23339/ --- (Updated July 24, 2014, 12:07 a.m.) Review request for kafka. Bugs: KAFKA-1507 https://issues.apache.org/jira/browse/KAFKA-1507 Repository: kafka Description (updated) --- KAFKA-1507. Using GetOffsetShell against non-existent topic creates the topic unintentionally. Changes as per Jun's suggestions. Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/NetworkClient.java d8f9ce663ee24d2b0852c974136741280c39f8f8 clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java 4aa5b01d611631db72df47d50bbe30edb8c478db clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 444e69e7c95d5ffad19896fff0ab15cb4f5c9b4e clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b22ca1dce65f665d84c2a980fd82f816e93d9960 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/TopicMetadataRequest.scala 7dca09ce637a40e125de05703dc42e8b611971ac core/src/main/scala/kafka/client/ClientUtils.scala ce7ede3f6d60e756e252257bd8c6fedc21f21e1c core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b9e2bea7b442a19bcebd1b350d39541a8c9dd068 core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b0b7be14d494ae8c87f4443b52db69d273c20316 core/src/main/scala/kafka/server/KafkaApis.scala fd5f12ee31e78cdcda8e24a0ab3e1740b3928884 core/src/main/scala/kafka/tools/GetOffsetShell.scala 9c6064e201eebbcd5b276a0dedd02937439edc94 core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala af4783646803e58714770c21f8c3352370f26854 core/src/main/scala/kafka/tools/SimpleConsumerShell.scala 36314f412a8281aece2789fd2b74a106b82c57d2 core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala 1bf2667f47853585bc33ffb3e28256ec5f24ae84 
core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala 35dc071b1056e775326981573c9618d8046e601d Diff: https://reviews.apache.org/r/23339/diff/ Testing --- Thanks, Sriharsha Chintalapani
[jira] [Updated] (KAFKA-1507) Using GetOffsetShell against non-existent topic creates the topic unintentionally
[ https://issues.apache.org/jira/browse/KAFKA-1507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sriharsha Chintalapani updated KAFKA-1507: -- Attachment: KAFKA-1507_2014-07-23_17:07:20.patch Using GetOffsetShell against non-existent topic creates the topic unintentionally - Key: KAFKA-1507 URL: https://issues.apache.org/jira/browse/KAFKA-1507 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Environment: centos Reporter: Luke Forehand Assignee: Sriharsha Chintalapani Priority: Minor Labels: newbie Attachments: KAFKA-1507.patch, KAFKA-1507_2014-07-22_10:27:45.patch, KAFKA-1507_2014-07-23_17:07:20.patch A typo in using the GetOffsetShell command can cause a topic to be created which cannot be deleted (because deletion is still in progress) ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka10:9092,kafka11:9092,kafka12:9092,kafka13:9092 --topic typo --time 1 ./kafka-topics.sh --zookeeper stormqa1/kafka-prod --describe --topic typo Topic:typo PartitionCount:8 ReplicationFactor:1 Configs: Topic: typo Partition: 0 Leader: 10 Replicas: 10 Isr: 10 ... -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1507) Using GetOffsetShell against non-existent topic creates the topic unintentionally
[ https://issues.apache.org/jira/browse/KAFKA-1507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14072603#comment-14072603 ] Sriharsha Chintalapani commented on KAFKA-1507: --- Updated reviewboard https://reviews.apache.org/r/23339/diff/ against branch origin/trunk Using GetOffsetShell against non-existent topic creates the topic unintentionally - Key: KAFKA-1507 URL: https://issues.apache.org/jira/browse/KAFKA-1507 Project: Kafka Issue Type: Bug Affects Versions: 0.8.1.1 Environment: centos Reporter: Luke Forehand Assignee: Sriharsha Chintalapani Priority: Minor Labels: newbie Attachments: KAFKA-1507.patch, KAFKA-1507_2014-07-22_10:27:45.patch, KAFKA-1507_2014-07-23_17:07:20.patch A typo in using the GetOffsetShell command can cause a topic to be created which cannot be deleted (because deletion is still in progress) ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka10:9092,kafka11:9092,kafka12:9092,kafka13:9092 --topic typo --time 1 ./kafka-topics.sh --zookeeper stormqa1/kafka-prod --describe --topic typo Topic:typo PartitionCount:8 ReplicationFactor:1 Configs: Topic: typo Partition: 0 Leader: 10 Replicas: 10 Isr: 10 ... -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Resolved] (KAFKA-1192) Enable DumpLogSegments tool to deserialize messages
[ https://issues.apache.org/jira/browse/KAFKA-1192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-1192. Resolution: Fixed Fix Version/s: 0.8.2 Assignee: Manikumar Reddy (was: Guozhang Wang) Thanks for the patch. +1. Committed to trunk with the following minor change. * change the property name decoder-class to value-decoder-class Enable DumpLogSegments tool to deserialize messages --- Key: KAFKA-1192 URL: https://issues.apache.org/jira/browse/KAFKA-1192 Project: Kafka Issue Type: Bug Components: tools Reporter: Guozhang Wang Assignee: Manikumar Reddy Labels: newbie Fix For: 0.8.2 Attachments: KAFKA-1192_2014-07-21_20:44:08.patch, KAFKA-1192_2014-07-23_20:23:08.patch Currently the DumpLogSegments tool reads the message payloads as strings by default, which will not display the messages correctly if the messages are deserialized with another class. By enablding deserialization with a customized class we can use this tool to debug more issues where I need to read the message content. -- This message was sent by Atlassian JIRA (v6.2#6252)
Re: Review Request 23339: Patch for KAFKA-1507
On July 22, 2014, 6:52 p.m., Jun Rao wrote: core/src/main/scala/kafka/client/ClientUtils.scala, lines 86-87 https://reviews.apache.org/r/23339/diff/2/?file=639404#file639404line86 Perhaps this method can be named better since it may not just be for the consumer. Changed it to fetchTopicMetadataForNonProducer please let me know if you have a suggestion. - Sriharsha --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23339/#review48397 --- On July 24, 2014, 12:07 a.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23339/ --- (Updated July 24, 2014, 12:07 a.m.) Review request for kafka. Bugs: KAFKA-1507 https://issues.apache.org/jira/browse/KAFKA-1507 Repository: kafka Description --- KAFKA-1507. Using GetOffsetShell against non-existent topic creates the topic unintentionally. Changes as per Jun's suggestions. Diffs - clients/src/main/java/org/apache/kafka/clients/NetworkClient.java d8f9ce663ee24d2b0852c974136741280c39f8f8 clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java 4aa5b01d611631db72df47d50bbe30edb8c478db clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 444e69e7c95d5ffad19896fff0ab15cb4f5c9b4e clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b22ca1dce65f665d84c2a980fd82f816e93d9960 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/TopicMetadataRequest.scala 7dca09ce637a40e125de05703dc42e8b611971ac core/src/main/scala/kafka/client/ClientUtils.scala ce7ede3f6d60e756e252257bd8c6fedc21f21e1c core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b9e2bea7b442a19bcebd1b350d39541a8c9dd068 core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala 
b0b7be14d494ae8c87f4443b52db69d273c20316 core/src/main/scala/kafka/server/KafkaApis.scala fd5f12ee31e78cdcda8e24a0ab3e1740b3928884 core/src/main/scala/kafka/tools/GetOffsetShell.scala 9c6064e201eebbcd5b276a0dedd02937439edc94 core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala af4783646803e58714770c21f8c3352370f26854 core/src/main/scala/kafka/tools/SimpleConsumerShell.scala 36314f412a8281aece2789fd2b74a106b82c57d2 core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala 1bf2667f47853585bc33ffb3e28256ec5f24ae84 core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala 35dc071b1056e775326981573c9618d8046e601d Diff: https://reviews.apache.org/r/23339/diff/ Testing --- Thanks, Sriharsha Chintalapani
Re: Review Request 23339: Patch for KAFKA-1507
On July 22, 2014, 6:52 p.m., Jun Rao wrote: Thanks for the patch and doing the rebasing. Some comments below. Thanks for the review. I've noticed NetworkClient.java always uses the latest version of an api. There doesn't seem to be a way to send a specific version of a request, since nextRequestHeader doesn't accept a version. Even if I create version 0 of a request, the request header will go out with the latest version of that particular request. Is there any reason to keep the version as part of the request header rather than inside the request structure, with NetworkClient expected to use the latest version? - Sriharsha --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23339/#review48397 --- On July 24, 2014, 12:07 a.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23339/ --- (Updated July 24, 2014, 12:07 a.m.) Review request for kafka. Bugs: KAFKA-1507 https://issues.apache.org/jira/browse/KAFKA-1507 Repository: kafka Description --- KAFKA-1507. Using GetOffsetShell against non-existent topic creates the topic unintentionally. Changes as per Jun's suggestions. 
Diffs - clients/src/main/java/org/apache/kafka/clients/NetworkClient.java d8f9ce663ee24d2b0852c974136741280c39f8f8 clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java 4aa5b01d611631db72df47d50bbe30edb8c478db clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 444e69e7c95d5ffad19896fff0ab15cb4f5c9b4e clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b22ca1dce65f665d84c2a980fd82f816e93d9960 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/TopicMetadataRequest.scala 7dca09ce637a40e125de05703dc42e8b611971ac core/src/main/scala/kafka/client/ClientUtils.scala ce7ede3f6d60e756e252257bd8c6fedc21f21e1c core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b9e2bea7b442a19bcebd1b350d39541a8c9dd068 core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala b0b7be14d494ae8c87f4443b52db69d273c20316 core/src/main/scala/kafka/server/KafkaApis.scala fd5f12ee31e78cdcda8e24a0ab3e1740b3928884 core/src/main/scala/kafka/tools/GetOffsetShell.scala 9c6064e201eebbcd5b276a0dedd02937439edc94 core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala af4783646803e58714770c21f8c3352370f26854 core/src/main/scala/kafka/tools/SimpleConsumerShell.scala 36314f412a8281aece2789fd2b74a106b82c57d2 core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala 1bf2667f47853585bc33ffb3e28256ec5f24ae84 core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala 35dc071b1056e775326981573c9618d8046e601d Diff: https://reviews.apache.org/r/23339/diff/ Testing --- Thanks, Sriharsha Chintalapani
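For context on the versioning question above: in the Kafka wire protocol the request header already carries the version alongside the api key (per the protocol guide), which is why whoever builds the header effectively picks the request version. A minimal sketch of those fields (hypothetical class name):

```java
// Sketch of the four request header fields in the Kafka wire protocol:
// api_key, api_version, correlation_id, client_id. Since the version
// travels in the header, a client that always builds headers with the
// latest version cannot send an older request version.
class RequestHeaderSketch {
    final short apiKey;        // e.g. 3 for the metadata api
    final short apiVersion;
    final int correlationId;
    final String clientId;

    RequestHeaderSketch(short apiKey, short apiVersion,
                        int correlationId, String clientId) {
        this.apiKey = apiKey;
        this.apiVersion = apiVersion;
        this.correlationId = correlationId;
        this.clientId = clientId;
    }
}
```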
[jira] [Updated] (KAFKA-1549) dead brokers coming in the TopicMetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-1549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1549: - Assignee: nicu marasoiu (was: Jun Rao) dead brokers coming in the TopicMetadataResponse Key: KAFKA-1549 URL: https://issues.apache.org/jira/browse/KAFKA-1549 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2 Environment: trunk Reporter: nicu marasoiu Assignee: nicu marasoiu Attachments: kafka-1549__only_last_seen_alive_brokers_to_be_responded_part_of_the_topic_metadata_refres.patch, kafka-1549__without_implicit__only_last_seen_alive_brokers_to_be_responded_part_of_the_top.patch JunRao confirming my observation that brokers are only added to the metadataCache, never removed: The way that we update liveBrokers in MetadataCache.updateCache() doesn't seem right. We only add newly received live brokers to the list. However, there could be existing brokers in that list that are now dead. Those dead brokers shouldn't be returned to the clients. We should probably just take the new live broker list and cache it. -- This message was sent by Atlassian JIRA (v6.2#6252)
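Jun's suggested fix amounts to replacing the cached live-broker list on each update instead of merging into it. A minimal sketch (hypothetical names, not the actual MetadataCache code), contrasting the two update strategies:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Sketch: cache the live broker list by replacement, not accumulation,
// so brokers that died since the last update are dropped.
class MetadataCacheSketch {
    private Map<Integer, String> liveBrokers = new HashMap<>();

    // Buggy variant being discussed: only adds brokers, never removes,
    // so dead brokers linger in the cache forever.
    void mergeUpdate(Map<Integer, String> received) {
        liveBrokers.putAll(received);
    }

    // Proposed fix: take the newly received live broker list as-is.
    void replaceUpdate(Map<Integer, String> received) {
        liveBrokers = new HashMap<>(received);
    }

    Set<Integer> brokerIds() {
        return liveBrokers.keySet();
    }
}
```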
Build failed in Jenkins: Kafka-trunk #231
See https://builds.apache.org/job/Kafka-trunk/231/changes

Changes:

[junrao] kafka-1192; Enable DumpLogSegments tool to deserialize messages; patched by Manikumar Reddy; reviewed by Guozhang Wang and Jun Rao

--
[...truncated 534 lines...]
org.apache.kafka.common.record.RecordTest testChecksum[65]-[79], testEquality[65]-[79], testFields[66]-[79] PASSED
org.apache.kafka.common.record.MemoryRecordsTest testIterator[0]-[4] PASSED
org.apache.kafka.common.config.ConfigDefTest testBasicTypes, testInvalidDefault, testNullDefault, testMissingRequired, testDefinedTwice, testBadInputs PASSED
:contrib:compileJava UP-TO-DATE
:contrib:processResources UP-TO-DATE
:contrib:classes UP-TO-DATE
:contrib:compileTestJava UP-TO-DATE
:contrib:processTestResources UP-TO-DATE
:contrib:testClasses UP-TO-DATE
:contrib:test UP-TO-DATE
:clients:jar
:core:compileJava UP-TO-DATE
:core:compileScala
https://builds.apache.org/job/Kafka-trunk/ws/core/src/main/scala/kafka/admin/AdminUtils.scala:253: non-variable type argument String in type pattern scala.collection.Map[String,_] is unchecked since it is eliminated by erasure
      case Some(map: Map[String, _]) =>
                     ^
https://builds.apache.org/job/Kafka-trunk/ws/core/src/main/scala/kafka/admin/AdminUtils.scala:256: non-variable type argument String in type pattern scala.collection.Map[String,String] is unchecked since it is eliminated by erasure
      case Some(config: Map[String, String]) =>
                        ^
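The :core:compileScala warnings above come from using a parameterized type such as `Map[String, _]` in a pattern: the type arguments are erased at runtime, so the match can only verify that the value is a `Map`, not what it contains. A standalone sketch (not taken from AdminUtils.scala) showing the warning-free wildcard form the compiler nudges toward:

```scala
object ErasureDemo {
  def describe(x: Option[Any]): String = x match {
    // Wildcards instead of concrete type arguments: the runtime check is
    // only "is a Map" anyway, so this form matches what erasure allows
    // and does not trigger the unchecked warning.
    case Some(m: Map[_, _]) => s"a map with ${m.size} entries"
    case Some(other)        => s"something else: $other"
    case None               => "nothing"
  }

  def main(args: Array[String]): Unit = {
    // A Map[Int, Int] would equally match a Map[String, _] pattern at
    // runtime, which is exactly why scalac warns about such patterns.
    println(describe(Some(Map(1 -> 2)))) // a map with 1 entries
    println(describe(None))              // nothing
  }
}
```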
Re: Review Request 23702: Patch for KAFKA-1070
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23702/#review48598
---

core/src/main/scala/kafka/server/KafkaServer.scala
https://reviews.apache.org/r/23702/#comment85309
    any reason you want to move this here from the beginning of the class?

core/src/main/scala/kafka/server/KafkaServer.scala
https://reviews.apache.org/r/23702/#comment85311
    uses it *as* the brokerId

core/src/main/scala/kafka/server/KafkaServer.scala
https://reviews.apache.org/r/23702/#comment85319
    If there is another exception (other than FileNotFound, which we should ignore) while reading the file, it will be good to log a meaningful error before rethrowing it.

core/src/main/scala/kafka/server/KafkaServer.scala
https://reviews.apache.org/r/23702/#comment85314
    we should also fsync the file to disk and close out the file output stream

core/src/main/scala/kafka/utils/ZkUtils.scala
https://reviews.apache.org/r/23702/#comment85320
    It will be good to refer to 1000 as a static variable inside KafkaConfig, e.g. KafkaConfig.MinimumBrokerSequenceId

core/src/main/scala/kafka/utils/ZkUtils.scala
https://reviews.apache.org/r/23702/#comment85322
    this logic should ideally be encapsulated under updatePersistentSequentialPath inside ZkUtils.

core/src/main/scala/kafka/utils/ZkUtils.scala
https://reviews.apache.org/r/23702/#comment85323
    Shouldn't we be using createPersistentSequential?

core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
https://reviews.apache.org/r/23702/#comment85325
    can we add more test cases that cover all possible combinations of using the config-based broker id and the zk-based one? It will also help to explicitly separate some of these test cases.

- Neha Narkhede

On July 22, 2014, 6:34 p.m., Sriharsha Chintalapani wrote:
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23702/
---
(Updated July 22, 2014, 6:34 p.m.)

Review request for kafka.

Bugs: KAFKA-1070
    https://issues.apache.org/jira/browse/KAFKA-1070

Repository: kafka

Description
---
KAFKA-1070. Auto-assign node id.

Diffs
---
core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala PRE-CREATION
core/src/main/scala/kafka/server/KafkaConfig.scala 50b09edb73af1b45f88f919ac8c46ae056878c8e
core/src/main/scala/kafka/server/KafkaServer.scala def1dc2a5818d45d9ee0881137ff989cec4eb9b1
core/src/main/scala/kafka/utils/ZkUtils.scala dcdc1ce2b02c996294e19cf480736106aaf29511
core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala PRE-CREATION
core/src/test/scala/unit/kafka/utils/TestUtils.scala 3faa884f8eb83c7c00baab416d0acfb488dc39c1

Diff: https://reviews.apache.org/r/23702/diff/

Testing
---

Thanks,
Sriharsha Chintalapani
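The fsync comment in the review above is about making sure the metadata file actually reaches stable storage before the write is considered done, and that the stream is closed on every path. A hedged sketch of that pattern in Scala (the helper name and file layout are illustrative, not the patch's actual code):

```scala
import java.io.{File, FileOutputStream}
import java.util.Properties

object BrokerMetadataWriter {
  // Illustrative helper: persist broker metadata and force it to disk.
  def writeBrokerMetadata(dir: File, brokerId: Int): Unit = {
    val file = new File(dir, "meta.properties")
    val props = new Properties()
    props.setProperty("broker.id", brokerId.toString)
    val out = new FileOutputStream(file)
    try {
      props.store(out, "broker metadata")
      out.flush()
      out.getFD.sync() // fsync: the bytes reach the disk, not just the page cache
    } finally {
      out.close()      // close the file output stream even if the write failed
    }
  }
}
```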
[jira] [Commented] (KAFKA-1421) Error: Could not find or load main class kafka.perf.SimpleConsumerPerformance
[ https://issues.apache.org/jira/browse/KAFKA-1421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14072677#comment-14072677 ] Jun Rao commented on KAFKA-1421:

You will need to include the kafka-perf jar too.

Error: Could not find or load main class kafka.perf.SimpleConsumerPerformance
---
Key: KAFKA-1421
URL: https://issues.apache.org/jira/browse/KAFKA-1421
Project: Kafka
Issue Type: Bug
Components: producer
Affects Versions: 0.8.0
Reporter: Daryl Erwin
Assignee: Jun Rao
Priority: Minor
Labels: performance
Original Estimate: 24h
Remaining Estimate: 24h

Did a base install with sbt update; sbt package. I am able to successfully run the console producer and consumer. I am trying to run the perf scripts (./kafka-simple-consumer-perf-test.sh) but it appears the jar file is not generated. What steps do I need to run to create this jar file? .. same as: Error: Could not find or load main class kafka.perf.ProducerPerformance

--
This message was sent by Atlassian JIRA (v6.2#6252)
Re: Review Request 23702: Patch for KAFKA-1070
On July 24, 2014, 1:21 a.m., Neha Narkhede wrote:
core/src/main/scala/kafka/utils/ZkUtils.scala, line 718
https://reviews.apache.org/r/23702/diff/3/?file=639483#file639483line718
    Shouldn't we be using createPersistentSequential?

If I used createPersistentSequential, it would keep creating a new node in ZooKeeper with a sequential number, leaving too many nodes behind depending on how many brokers the user runs. Instead I use a single path, updating it each time with expected version -1 and using the returned stat.getVersion as the sequence number.

- Sriharsha

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23702/#review48598
---
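The version-as-sequence trick described above can be sketched without a live ZooKeeper: a single fixed znode is rewritten unconditionally (expected version -1, as in ZooKeeper's setData), and the data version the server hands back serves as a monotonically increasing sequence. The classes below are an in-memory illustration of that scheme, not Kafka or ZooKeeper code; the 1000 base echoes the KafkaConfig.MinimumBrokerSequenceId suggestion from the review:

```scala
// In-memory stand-in for a single znode: every successful write bumps
// the data version, mirroring ZooKeeper's Stat.getVersion semantics.
class SequenceZNode {
  private var version = 0
  // expectedVersion = -1 means "unconditional write", as in ZooKeeper's setData.
  def setData(expectedVersion: Int): Int = synchronized {
    require(expectedVersion == -1 || expectedVersion == version,
      s"version mismatch: expected $expectedVersion, actual $version")
    version += 1
    version
  }
}

object BrokerIdGenerator {
  // Auto-assigned ids start above a reserved base so they cannot collide
  // with manually configured broker ids.
  val MinimumBrokerSequenceId = 1000

  // Each broker derives its id from the version returned by its own write.
  def nextBrokerId(node: SequenceZNode): Int =
    MinimumBrokerSequenceId + node.setData(-1)
}
```

Compared with createPersistentSequential, this leaves exactly one znode behind no matter how many brokers are started, which is the point Sriharsha makes above.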
Re: Review Request 23702: Patch for KAFKA-1070
On July 24, 2014, 1:21 a.m., Neha Narkhede wrote:
core/src/main/scala/kafka/server/KafkaServer.scala, line 96
https://reviews.apache.org/r/23702/diff/3/?file=639482#file639482line96
    any reason you want to move this here from the beginning of the class?

If the user didn't provide a brokerId in server.properties, I set config.brokerId = -1 in KafkaConfig; in KafkaServer I check for that, generate a new sequence id from ZooKeeper, and assign it to config.brokerId. If this line stayed at the start of the class, it would not pick up the newly generated sequence id, e.g.:
[Kafka Server -1], started (kafka.server.KafkaServer)
New leader is 1001 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[Kafka Server -1], shut down completed (kafka.server.KafkaServer)

- Sriharsha

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23702/#review48598
---
Re: Review Request 23702: Patch for KAFKA-1070
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23702/#review48608
---

core/src/main/scala/kafka/server/KafkaServer.scala
https://reviews.apache.org/r/23702/#comment85328
    Does this cover the case for metaBrokerIdSet.size == 1 and brokerId 0? In this case, we should use the brokerId in the metadata file.

core/src/main/scala/kafka/server/KafkaServer.scala
https://reviews.apache.org/r/23702/#comment85327
    This probably only needs to be done after line 365?

- Jun Rao
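Jun's first comment above concerns reconciling a configured broker id with ids already recorded in meta.properties files across the log dirs. A hypothetical sketch of those consistency checks (none of these names are from the patch, and it deliberately ignores the brokerId-0-as-default edge case Jun raises, which needs extra handling):

```scala
object BrokerIdResolver {
  case class InconsistentBrokerIdException(msg: String) extends RuntimeException(msg)

  // configuredId < 0 means "not set in server.properties";
  // metaIds are the broker ids found in meta.properties across log dirs.
  def resolve(configuredId: Int, metaIds: Set[Int], generate: () => Int): Int = {
    if (metaIds.size > 1)
      throw InconsistentBrokerIdException(s"conflicting ids in metadata files: $metaIds")
    (metaIds.headOption, configuredId) match {
      case (Some(metaId), cfg) if cfg >= 0 && cfg != metaId =>
        // A configured id must agree with what was recorded on a prior start.
        throw InconsistentBrokerIdException(s"configured $cfg != recorded $metaId")
      case (Some(metaId), _)       => metaId     // trust the recorded id
      case (None, cfg) if cfg >= 0 => cfg        // first start with an explicit config
      case (None, _)               => generate() // auto-assign, e.g. from ZooKeeper
    }
  }
}
```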
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14072690#comment-14072690 ] Jun Rao commented on KAFKA-1555:

Rob, is that any different from just running with a higher replication factor?

provide strong consistency with reasonable availability
---
Key: KAFKA-1555
URL: https://issues.apache.org/jira/browse/KAFKA-1555
Project: Kafka
Issue Type: Improvement
Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Neha Narkhede

In a mission-critical application, we expect a Kafka cluster with 3 brokers to satisfy two requirements:
1. When 1 broker is down, no message loss or service blocking happens.
2. In worse cases, such as when two brokers are down, service can be blocked, but no message loss happens.

We found that the current Kafka version (0.8.1.1) cannot achieve these requirements due to three of its behaviors:
1. When choosing a new leader from 2 followers in ISR, the one with fewer messages may be chosen as the leader.
2. Even when replica.lag.max.messages=0, a follower can stay in ISR when it has fewer messages than the leader.
3. ISR can contain only 1 broker, therefore acknowledged messages may be stored in only 1 broker.

The following is an analytical proof. We consider a cluster with 3 brokers and a topic with 3 replicas, and assume that at the beginning all 3 replicas, leader A and followers B and C, are in sync, i.e., they have the same messages and are all in ISR. According to the value of request.required.acks (acks for short), there are the following cases:
1. acks=0, 1, 3. Obviously these settings do not satisfy the requirements.
2. acks=2. The producer sends a message m. It is acknowledged by A and B. At this time, although C hasn't received m, C is still in ISR. If A is killed, C can be elected as the new leader, and consumers will miss m.
3. acks=-1. B and C restart and are removed from ISR. The producer sends a message m to A and receives an acknowledgement. Disk failure happens in A before B and C replicate m. Message m is lost.

In summary, no existing configuration can satisfy the requirements.

--
This message was sent by Atlassian JIRA (v6.2#6252)
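The acks=2 scenario in the proof above can be replayed as a toy model (an illustration of the argument, not Kafka code): a message counts as acknowledged once two of the three replicas hold it, the leader then dies, and the lagging replica, still nominally in ISR, becomes leader without the message.

```scala
// Toy replication model of the acks=2 failure mode.
object AcksDemo {
  case class Replica(name: String, var messages: Vector[String] = Vector.empty)

  // Returns (message was acknowledged, message lost after failover).
  def run(): (Boolean, Boolean) = {
    val a = Replica("A"); val b = Replica("B"); val c = Replica("C")
    // Producer sends m; leader A and follower B replicate it, satisfying acks=2.
    // C lags behind but is still considered in ISR under replica.lag.max.messages.
    a.messages :+= "m"; b.messages :+= "m"
    val acked = Seq(a, b, c).count(_.messages.contains("m")) >= 2
    // A is killed; C is a legal leader choice because it is in ISR.
    val newLeader = c
    val lost = acked && !newLeader.messages.contains("m")
    (acked, lost)
  }

  def main(args: Array[String]): Unit = {
    val (acked, lost) = run()
    println(s"acked=$acked, lost after failover=$lost") // both true: an acknowledged m is gone
  }
}
```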
Re: Review Request 23702: Patch for KAFKA-1070
On July 24, 2014, 1:41 a.m., Jun Rao wrote:
core/src/main/scala/kafka/server/KafkaServer.scala, lines 370-371
https://reviews.apache.org/r/23702/diff/3/?file=639482#file639482line370
    This probably only needs to be done after line 365?

I was thinking of the case where a user has one logDir initially and we store the brokerId in that dir, and later the user adds a few dirs to the config; to make sure we copy the meta.properties into all of those new dirs, I added it at the bottom. I am not sure if that case exists, please let me know.

- Sriharsha

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23702/#review48608
---
Re: [jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
Hi Jun,

Yes, that's what I am thinking. It allows maintaining a pool of offline but current and consistent replica shadows, ready to be flipped into ISR. Being out of ISR prevents them from being counted in the quorum, yet they are ready to go, so there is no impact to the producers. Looking at it through algebra sunglasses, we would establish a secondary space of replication but with a different dimensional projection into the parent meta space, which is the current ISR replication space, itself projected into consumers' meta space as the leader partition. I am thinking it adds another layer of depth, to shore the defenses.

- Rob

On Jul 23, 2014, at 7:46 PM, Jun Rao (JIRA) j...@apache.org wrote:
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14072690#comment-14072690 ] Jun Rao commented on KAFKA-1555:
Rob, is that any different from just running with a higher replication factor?
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14072700#comment-14072700 ] Robert Withers commented on KAFKA-1555:

Hi Jun, yes, that's what I am thinking. It allows maintaining a pool of offline but current and consistent replica shadows, ready to be flipped into ISR. Being out of ISR prevents them from being counted in the quorum, yet they are ready to go, so there is no impact to the producers. Looking at it through algebra sunglasses, we would establish a secondary space of replication but with a different dimensional projection into the parent meta space, which is the current ISR replication space, itself projected into consumers' meta space as the leader partition. I am thinking it adds another layer of depth, to shore the defenses.

- Rob

--
This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14072756#comment-14072756 ] Jun Rao commented on KAFKA-1555:

Rob, in order for those shadow replicas to be ready to be flipped into ISR, they have to be very in-sync with the leader. If that's the case, even if you count them in acking the producer, it won't delay the produce request much. So, I still don't see a clear benefit of this approach vs just having a larger replication factor. I am also not sure how easy the implementation will be.

--
This message was sent by Atlassian JIRA (v6.2#6252)
Re: Review Request 23474: Patch for KAFKA-1483
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23474/#review48624
---

Ship it!

- Neha Narkhede

On July 16, 2014, 6:07 p.m., Sriharsha Chintalapani wrote:
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23474/
---
(Updated July 16, 2014, 6:07 p.m.)

Review request for kafka.

Bugs: KAFKA-1483
    https://issues.apache.org/jira/browse/KAFKA-1483

Repository: kafka

Description
---
KAFKA-1483. Split Brain about Leader Partitions.

Diffs
---
core/src/main/scala/kafka/server/ReplicaManager.scala 6a56a772c134dbf1e70c1bfe067223009bfdbac8

Diff: https://reviews.apache.org/r/23474/diff/

Testing
---

Thanks,
Sriharsha Chintalapani