Re: Gradle hanging on unit test run against ProducerFailureHandlingTest-testBrokerFailure
Hey Guozhang, Yes, I spotted that commit and had updated my working copy to it, but the test is still hanging. If it's any help, the test looks like it's doing _something_, as the CPU usage ramps up significantly and stays there until I kill the process. /Dave

On Thu, Jul 24, 2014 at 4:10 PM, Guozhang Wang wangg...@gmail.com wrote: Hi Dave, KAFKA-1533 has just been committed targeting this issue. Did your update on trunk include this commit?

commit ff05e9b3616a222e29a42f6e8fdf41945a417f41
Author: Guozhang Wang guw...@linkedin.com
Date: Tue Jul 22 14:14:19 2014 -0700

    kafka-1533; transient unit test failure in ProducerFailureHandlingTest; patched by Guozhang Wang; reviewed by Jun Rao

On Thu, Jul 24, 2014 at 5:52 AM, David Corley davidcor...@gmail.com wrote: Hey all, I'm trying my hand at writing some patches for open issues, but I'm running into issues with running gradlew test. It hangs every time when trying to run testBrokerFailure in the ProducerFailureHandlingTest suite. It was working for a time, but I updated to trunk HEAD and it's no longer working. I'm running on OS X with JDK 1.6.0_65. I tried increasing the heap size for the test target, but hit the same issue. Running against 3f1a9c4cee778d089d3ec3167555c2b89cdc48bb. Would appreciate any help. Regards, Dave

--
Guozhang
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074119#comment-14074119 ] Dmitry Bugaychenko commented on KAFKA-1414:
---

I'm not a Scala expert, but the threading scheme (at least as I understood it) looks fine.

Speedup broker startup after hard reset
---
Key: KAFKA-1414
URL: https://issues.apache.org/jira/browse/KAFKA-1414
Project: Kafka
Issue Type: Improvement
Components: log
Affects Versions: 0.8.1.1, 0.8.2, 0.9.0
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps
Fix For: 0.8.2
Attachments: 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, KAFKA-1414-rev3-interleaving.patch, KAFKA-1414-rev3.patch, KAFKA-1414-rev4.patch, freebie.patch, parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-fixed-threadpool.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch

After a hard reset due to power failure, the broker takes way too much time recovering unflushed segments in a single thread. This could easily be improved by launching multiple threads (one per data directory, assuming that typically each data directory is on a dedicated drive). Locally we tried this simple patch to LogManager.loadLogs and it seems to work; however, I'm too new to Scala, so do not take it literally:

{code}
/**
 * Recover and load all logs in the given data directories
 */
private def loadLogs(dirs: Seq[File]) {
  val threads: Array[Thread] = new Array[Thread](dirs.size)
  var i: Int = 0
  val me = this
  for (dir <- dirs) {
    val thread = new Thread(new Runnable {
      def run() {
        val recoveryPoints = me.recoveryPointCheckpoints(dir).read
        /* load the logs */
        val subDirs = dir.listFiles()
        if (subDirs != null) {
          val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
          if (cleanShutDownFile.exists())
            info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
          for (dir <- subDirs) {
            if (dir.isDirectory) {
              info("Loading log '" + dir.getName + "'")
              val topicPartition = Log.parseTopicPartitionName(dir.getName)
              val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
              val log = new Log(dir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time)
              val previous = addLogWithLock(topicPartition, log)
              if (previous != null)
                throw new IllegalArgumentException(
                  "Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
            }
          }
          cleanShutDownFile.delete()
        }
      }
    })
    thread.start()
    threads(i) = thread
    i = i + 1
  }
  for (thread <- threads) {
    thread.join()
  }
}

def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
  logCreationOrDeletionLock synchronized {
    this.logs.put(topicPartition, log)
  }
}
{code}

-- This message was sent by Atlassian JIRA (v6.2#6252)
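The attachment names above (parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk-fixed-threadpool.patch) suggest the same idea was later reworked around a fixed thread pool; a hedged sketch of that shape, with a hypothetical per-directory loader, not the committed code:

{code}
import java.io.File
import java.util.concurrent.{Callable, Executors}

// Hypothetical per-directory loader standing in for the recovery logic
// in the raw-thread sketch above.
def loadLogsInDir(dir: File): Unit = ???

// The same per-data-directory parallelism expressed with a fixed thread pool.
def loadLogsParallel(dirs: Seq[File]): Unit = {
  val pool = Executors.newFixedThreadPool(dirs.size)
  try {
    val futures = dirs.map { dir =>
      pool.submit(new Callable[Unit] {
        override def call(): Unit = loadLogsInDir(dir)
      })
    }
    futures.foreach(_.get()) // wait for all; rethrows the first failure
  } finally {
    pool.shutdown()
  }
}
{code}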
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074122#comment-14074122 ] Joe Stein commented on KAFKA-1555:
---

The simplification and guarantee provided with ack=-1 works properly if the client implements it in a way that works best for them. Kafka guarantees durability on R-1 failures (with replication factor R) without message loss if you use ack=-1... so that is the foundation we build upon.

When we (the client application, a.k.a. the producer) do not want message loss, then set up at least 3 data centers (since you need a majority of data centers for a ZooKeeper ensemble to work, which requires at least three). Then you write to two topics, each with ack=-1. The broker leaders for the partitions of each of these topics that you are writing to MUST be in different data centers. After synchronously writing to them, join their results (I say join since your specific implementation may use another function name like subscribe or onComplete or something); then you get a durable write from the client perspective, without losing data after it has had a successful transaction. If that doesn't work for you, then you must accept data loss on the failure of at least one data center (which has nothing to do with Kafka). This can be (and should be) further extended to make sure that at least two racks in each data center get the data. If you are not concerned with being able to sustain the loss of 1 data center plus the loss of 1 rack in another available data center (at the same time), then this is not a solution for you.

Now, all of what I just said is manual (making sure replicas and partitions are in different data centers and racks) and static, but it works with Kafka tools out of the box and some (lots) of software engineering power (scripting effort with a couple of late nights burning the midnight oil). As far as producing this to multiple topics, there are lots of ways to do this on the client side running in parallel without much (if any) latency cost (with a little bit more software engineering). You can use Akka or anything else where you can get a future after sending off multiple events and then subscribe to them onComplete before deciding (returning to your caller or fulfilling the promise) that the message has been written.

Hopefully this makes sense, and I appreciate that not all (or even most) use cases need this multi-data-center plus multi-rack type of sustainability, but it works with Kafka if you go by what Kafka guarantees without trying to change it unnecessarily. If there are defects we should fix them, but going up and down this thread I am getting a bit lost in what we should be doing (if anything) to the current code now.

provide strong consistency with reasonable availability
---
Key: KAFKA-1555
URL: https://issues.apache.org/jira/browse/KAFKA-1555
Project: Kafka
Issue Type: Improvement
Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Neha Narkhede

In a mission-critical application, we expect a Kafka cluster with 3 brokers to satisfy two requirements:
1. When 1 broker is down, no message loss or service blocking happens.
2. In worse cases, such as two brokers being down, service can be blocked, but no message loss happens.

We found that the current Kafka version (0.8.1.1) cannot achieve these requirements due to three behaviors:
1. When choosing a new leader from 2 followers in ISR, the one with fewer messages may be chosen as the leader.
2. Even when replica.lag.max.messages=0, a follower can stay in ISR when it has fewer messages than the leader.
3. ISR can contain only 1 broker, therefore acknowledged messages may be stored in only 1 broker.

The following is an analytical proof. We consider a cluster with 3 brokers and a topic with 3 replicas, and assume that at the beginning all 3 replicas (leader A, followers B and C) are in sync, i.e., they have the same messages and are all in ISR. According to the value of request.required.acks (acks for short), there are the following cases:
1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this time, although C hasn't received m, C is still in ISR. If A is killed, C can be elected as the new leader, and consumers will miss m.
3. acks=-1. B and C restart and are removed from ISR. Producer sends a message m to A, and receives an acknowledgement. Disk failure happens in A before B and C replicate m. Message m is lost.

In summary, no existing configuration can satisfy the requirements.

-- This message was sent by Atlassian JIRA (v6.2#6252)
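A hedged sketch of the client-side pattern Joe describes: write the same message to two topics whose partition leaders are pinned to different data centers, each with ack=-1, and acknowledge the caller only once both writes complete. The sendSync wrapper and topic names are hypothetical; real producer calls will differ:

{code}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Hypothetical blocking send against a producer configured with
// request.required.acks=-1: returns only once the write is committed.
def sendSync(topic: String, message: Array[Byte]): Unit = ???

// Acknowledge the caller only after BOTH writes have completed; the two
// topics are assumed to have their partition leaders in different data
// centers, as described above.
def durableWrite(message: Array[Byte]): Unit = {
  val writes = Future.sequence(Seq(
    Future(sendSync("events-dc1", message)), // leader in data center 1
    Future(sendSync("events-dc2", message))  // leader in data center 2
  ))
  Await.result(writes, 30.seconds) // propagates a failure from either write
}
{code}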
[jira] [Comment Edited] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074122#comment-14074122 ] Joe Stein edited comment on KAFKA-1555 at 7/25/14 6:11 AM.
[jira] [Updated] (KAFKA-1542) normal IOException in the new producer is logged as ERROR
[ https://issues.apache.org/jira/browse/KAFKA-1542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Corley updated KAFKA-1542:
---
Attachment: KAFKA-1542.patch

Attaching patch.

normal IOException in the new producer is logged as ERROR
---
Key: KAFKA-1542
URL: https://issues.apache.org/jira/browse/KAFKA-1542
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8.2
Reporter: Jun Rao
Labels: newbie
Attachments: KAFKA-1542.patch

Saw the following error in the log. It seems this can happen if the broker is down. So, this should probably be logged as WARN instead of ERROR.

2014/07/16 00:12:51.799 [Selector] Error in I/O:
java.io.IOException: Connection timed out
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:197)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:60)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:241)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:171)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:174)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:114)
    at java.lang.Thread.run(Thread.java:744)

-- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1542) normal IOException in the new producer is logged as ERROR
[ https://issues.apache.org/jira/browse/KAFKA-1542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Corley updated KAFKA-1542:
---
Status: Patch Available (was: Open)

normal IOException in the new producer is logged as ERROR
---
Key: KAFKA-1542
URL: https://issues.apache.org/jira/browse/KAFKA-1542

-- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1502) source jar is empty
[ https://issues.apache.org/jira/browse/KAFKA-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074158#comment-14074158 ] David Corley commented on KAFKA-1502:
---

Can't reproduce this on trunk or on the 0.8.1 branch. Seems to be resolved.

source jar is empty
---
Key: KAFKA-1502
URL: https://issues.apache.org/jira/browse/KAFKA-1502
Project: Kafka
Issue Type: Bug
Components: build
Affects Versions: 0.8.2
Reporter: Jun Rao
Assignee: Joel Koshy
Labels: newbie

When doing a local publish, kafka_2.8.0-0.8.1.1-sources.jar only contains the following files:
META-INF/
META-INF/MANIFEST.MF
LICENSE
NOTICE

-- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-960) Upgrade Metrics to 3.x
[ https://issues.apache.org/jira/browse/KAFKA-960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074275#comment-14074275 ] Daniel Compton commented on KAFKA-960:
---

What are the plans for Metrics 3.0? I was thinking of taking a crack at this, but it looks like there might be a private fork where this has been done already? Also, because it will change the names of the MBeans to stop wrapping them in quotes, would you want this to be in 0.9, or is a breaking change like this OK in 0.8.2?

Upgrade Metrics to 3.x
---
Key: KAFKA-960
URL: https://issues.apache.org/jira/browse/KAFKA-960
Project: Kafka
Issue Type: Improvement
Affects Versions: 0.8.1
Reporter: Cosmin Lehene

Now that Metrics 3.0 has been released (http://metrics.codahale.com/about/release-notes/) we can upgrade back.

-- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074343#comment-14074343 ] Jiang Wu commented on KAFKA-1555:
---

Jun, We may need to tolerate more than R-1 failures that are spread over a long time period, for example, all brokers being restarted in turn within 20 minutes. Our concern with acks=-1 is that it can happen that only 1 broker is in ISR, so a published message may have only one copy; losing that copy loses the message permanently.

For acks=2, I'm considering two approaches to avoid a lagging replica being elected as the new leader:
1. Set replica.lag.max.messages=0. When new leader election happens, first update the ISR list. A replica without the latest message will be removed from ISR and has no chance to become the new leader.
2. When new leader election happens, choose the replica with more messages as the new leader.

Could you comment on the approaches? Regards, Jiang

provide strong consistency with reasonable availability
---
Key: KAFKA-1555
URL: https://issues.apache.org/jira/browse/KAFKA-1555

-- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074460#comment-14074460 ] Jun Rao commented on KAFKA-1555:
---

Jiang, If you use controlled shutdown, we will make sure that there is at least one other replica in ISR before shutting down the broker. Two comments on your proposal.

1. In some corner cases, the replica with the most messages shouldn't be the leader. Consider the following example. Suppose that we have 2 replicas A and B. At some point the messages in the logs look like the following.

offset  A   B
0       m1  m1
1       m2
2       m3

Suppose that m1 is committed and m2 and m3 are not. At this point, A dies. B takes over as the new leader. B then accepts and commits message m4. The logs will now look like the following.

offset  A   B
0       m1  m1
1       m2  m4
2       m3

At this point, let's say both A and B go down and come up again. Now, you can't pick replica A as the new leader even though it has more messages. Otherwise, you will lose the committed message m4. Note that it's OK to lose messages m2 and m3 since they were never committed. The correct way to pick the longest log is a bit more involved and would require knowing the leader generation id of each message. For details, see KAFKA-1211.

2. A second thing that you need to worry about is coordination. Suppose that we have the same scenario as above, where both A and B die. If B comes up first, we can actually elect B as the new leader without waiting for A. However, if you need to compare the lengths of the logs, you need to wait for A to come back, since A could have the longest log. The question then is: if A never comes back, will you block writes forever, or do you risk losing committed messages? ISR is sort of our way of remembering the new leader candidates. Therefore, we can easily figure out who can be the new leader without coordination among replicas.

provide strong consistency with reasonable availability
---
Key: KAFKA-1555
URL: https://issues.apache.org/jira/browse/KAFKA-1555

-- This message was sent by Atlassian JIRA (v6.2#6252)
Re: Gradle hanging on unit test run against ProducerFailureHandlingTest-testBrokerFailure
Could you get a thread dump when it hangs?

On Thu, Jul 24, 2014 at 11:01 PM, David Corley davidcor...@gmail.com wrote: Hey Guozhang, Yes, I spotted that commit and had updated my working copy to it, but the test is still hanging. If it's any help, the test looks like it's doing _something_, as the CPU usage ramps up significantly and stays there until I kill the process. /Dave

--
Guozhang
[jira] [Commented] (KAFKA-960) Upgrade Metrics to 3.x
[ https://issues.apache.org/jira/browse/KAFKA-960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074465#comment-14074465 ] Jun Rao commented on KAFKA-960:
---

This will probably affect many existing clients. The new clients will no longer depend on the metrics library, so perhaps we can defer the upgrade until then. What are the main benefits of upgrading to Metrics 3.0?

Upgrade Metrics to 3.x
---
Key: KAFKA-960
URL: https://issues.apache.org/jira/browse/KAFKA-960

-- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1542) normal IOException in the new producer is logged as ERROR
[ https://issues.apache.org/jira/browse/KAFKA-1542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074474#comment-14074474 ] Guozhang Wang commented on KAFKA-1542:
---

Thanks for the patch. LGTM.

normal IOException in the new producer is logged as ERROR
---
Key: KAFKA-1542
URL: https://issues.apache.org/jira/browse/KAFKA-1542

-- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1477) add authentication layer and initial JKS x509 implementation for brokers, producers and consumer for network communication
[ https://issues.apache.org/jira/browse/KAFKA-1477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074513#comment-14074513 ] Joe Stein commented on KAFKA-1477:
---

[~junrao] While I think it makes sense not to add too much to the existing producer and consumer, they are being used in the wild quite heavily right now. There is a real need for this feature (besides Salesforce, others are doing security with Kafka outside of the upstream project and starting to do things differently enough that the solution is becoming inconsistent), and a lot of this is really about the changes to the broker. Once the patch is committed, any client (Go, Python, etc.) can implement SSL with the broker. The existing producer/consumer can also be used out of the box for folks that want to flip the feature on now. Waiting for the new producer won't work, IMHO, from a product perspective, because then we would only have delivered half the solution (the other half being consuming over SSL). We can get the feature in now for something that folks are using already (this is an upgrade of Raja's original work to the latest trunk).

I understand this may extend the life of the existing producer/consumer a little bit until the new producer and consumer support the feature too. I think the reality, though, is that the existing producer/consumer will be around, as folks will be slow to upgrade, though I do expect them to eventually move to the shiny new one once it becomes less new or they have a real need for the change (some do not). If there are no objections (-1s) to the patch Ivan posted, he should have it finalized within the next 2-3 weeks (minor things). Once done with review and testing, I expect to +1 and commit the patch. I also appreciate the extent of the change, and appreciate any effort that can be made to help vet it, to make sure Kafka keeps doing everything that everyone expects Kafka to be doing when the feature is not on.

add authentication layer and initial JKS x509 implementation for brokers, producers and consumer for network communication
---
Key: KAFKA-1477
URL: https://issues.apache.org/jira/browse/KAFKA-1477
Project: Kafka
Issue Type: New Feature
Reporter: Joe Stein
Assignee: Ivan Lyutov
Fix For: 0.8.2
Attachments: KAFKA-1477-binary.patch, KAFKA-1477.patch, KAFKA-1477_2014-06-02_16:59:40.patch, KAFKA-1477_2014-06-02_17:24:26.patch, KAFKA-1477_2014-06-03_13:46:17.patch

-- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074522#comment-14074522 ] Jiang Wu commented on KAFKA-1555:
---

Jun, In your example, what are the replication and acks values? When you say m1 is committed, does that mean the producer's send(m1) returned successfully? Could you also comment on approach 1: set replica.lag.max.messages=0; when new leader election happens, first update the ISR list, so that a replica without the latest message is removed from ISR and has no chance to become the new leader. Thanks, Jiang

provide strong consistency with reasonable availability
---
Key: KAFKA-1555
URL: https://issues.apache.org/jira/browse/KAFKA-1555

-- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074532#comment-14074532 ] Jun Rao commented on KAFKA-1555:
---

In our current design, a message is considered committed if it's written to all replicas in ISR. If ack=-1 is used, the producer will only be acked once the message is committed. Approach 1 also seems to need coordination among multiple replicas during leader election, so both of my previous comments apply.

provide strong consistency with reasonable availability
---
Key: KAFKA-1555
URL: https://issues.apache.org/jira/browse/KAFKA-1555

-- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1123) Broker IPv6 addresses parsed incorrectly
[ https://issues.apache.org/jira/browse/KAFKA-1123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1123:
---
Priority: Major (was: Minor)
Affects Version/s: 0.8.2 (was: 0.8.0)
Labels: newbie (was: )

We should probably add IPv6 support ASAP. It seems that for a given host:port string, we can just look for the last ':' to separate the host and the port. This would cover both IPv4 and IPv6. We will need to do that for both the old and the new producer.

Broker IPv6 addresses parsed incorrectly
---
Key: KAFKA-1123
URL: https://issues.apache.org/jira/browse/KAFKA-1123
Project: Kafka
Issue Type: Bug
Components: producer
Affects Versions: 0.8.2
Reporter: Andrew Otto
Assignee: Jun Rao
Labels: newbie

It seems that broker addresses are parsed incorrectly when IPv6 addresses are supplied. IPv6 addresses have colons in them, and Kafka seems to be interpreting the first ':' as the address:port separator. I have only tried this with the console-producer --broker-list option, so I don't know if this affects anything deeper than the CLI.

-- This message was sent by Atlassian JIRA (v6.2#6252)
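A minimal sketch of the split-on-last-colon idea Jun suggests (illustrative, not the committed fix):

{code}
// Split on the LAST ':' so IPv6 literals parse correctly. Bracketed forms
// such as "[::1]:9092" would still need extra handling.
def parseHostPort(hostPort: String): (String, Int) = {
  val idx = hostPort.lastIndexOf(':')
  require(idx > 0, "no port found in '%s'".format(hostPort))
  (hostPort.substring(0, idx), hostPort.substring(idx + 1).toInt)
}

parseHostPort("127.0.0.1:9092")        // ("127.0.0.1", 9092)
parseHostPort("fe80::dead:beef:9092")  // ("fe80::dead:beef", 9092)
{code}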
Re: Review Request 23767: Fix KAFKA-1430
On July 22, 2014, 5:02 p.m., Jun Rao wrote:
core/src/main/scala/kafka/cluster/Partition.scala, lines 120-130
https://reviews.apache.org/r/23767/diff/2/?file=637561#file637561line120
We may not be able to remove the read lock here. The issue is that this method accesses not only leaderReplicaIdOpt but other internal data structures like assignedReplicaMap. Without the lock, the read from the map could fail if it's being concurrently modified. In general, we can get away without the lock only if we want to read a single internal value. Perhaps we can introduce another function isLeaderLocal() that returns a boolean. This method will only need to access leaderReplicaIdOpt. Then all callers will first call leaderReplicaIfLocal and hold onto the leader replica. They can then use isLeaderLocal to see if the leader has changed subsequently.

Would this be the same as what we do now? In getReplica(localBrokerId), if the replica map has changed and the id is no longer in the map, it will return None; if the replica map has changed and the id is no longer the leader, it is just the same as when we called leaderReplicaIfLocal() to get the leader and the leader then immediately changed.

On July 22, 2014, 5:02 p.m., Jun Rao wrote:
core/src/main/scala/kafka/cluster/Partition.scala, lines 268-269
https://reviews.apache.org/r/23767/diff/2/?file=637561#file637561line268
Instead of using negation, could we do leaderHW.precedes(replica.logEndOffset)? Also, could we move this to the previous line?

We cannot simply use leaderHW.precedes(replica.logEndOffset), since we also need to consider equality; will use messagesDiff instead.

On July 22, 2014, 5:02 p.m., Jun Rao wrote:
core/src/main/scala/kafka/cluster/Replica.scala, lines 33-36
https://reviews.apache.org/r/23767/diff/2/?file=637562#file637562line33
Should we rename highWatermarkValue and logEndOffsetValue to highWatermarkMetadata and logEndOffsetMetadata?

Fixed.

On July 22, 2014, 5:02 p.m., Jun Rao wrote:
core/src/main/scala/kafka/cluster/Partition.scala, lines 372-373
https://reviews.apache.org/r/23767/diff/2/?file=637561#file637561line372
Move to previous line?

Fixed.

On July 22, 2014, 5:02 p.m., Jun Rao wrote:
core/src/main/scala/kafka/cluster/Partition.scala, lines 324-325
https://reviews.apache.org/r/23767/diff/2/?file=637561#file637561line324
Perhaps we could create TopicPartionRequestKey just once?

Fixed.

On July 22, 2014, 5:02 p.m., Jun Rao wrote:
core/src/main/scala/kafka/cluster/Partition.scala, lines 296-297
https://reviews.apache.org/r/23767/diff/2/?file=637561#file637561line296
This seems to be an existing problem. If ack=-1, a safer check is HW >= requiredOffset. This way, we can be sure that if the ISR expands, the acked message is guaranteed to be in the replicas newly added to the ISR. The following example shows the issue with the existing check. Suppose that all replicas in ISR are at offset 10, but HW is still at 8, and we call checkEnoughReplicasReachOffset on offset 9. The check will be satisfied and the message considered committed. We will be updating HW to 10 pretty soon. However, before that happens, another replica whose LEO is at 8 can be added to ISR. This replica won't have message 9, which was acked as committed.

Fixed.

On July 22, 2014, 5:02 p.m., Jun Rao wrote:
core/src/main/scala/kafka/cluster/Partition.scala, lines 194-195
https://reviews.apache.org/r/23767/diff/2/?file=637561#file637561line194
It's possible that we get an UnknownOffsetMetadata during the conversion. In this case, we should probably set HW to logEndOffset.

Fixed.
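A hedged sketch of the safer acks=-1 check from the lines 296-297 comment above (illustrative names, not the committed Partition code): the produce is considered committed only once the high watermark itself reaches the required offset, so any replica added to the ISR later, which must have caught up to the HW, is guaranteed to hold the acked message.

{code}
// requiredOffset: the leader's log end offset right after the producer's
// messages were appended. Illustrative sketch only.
def checkEnoughReplicasReachOffset(requiredOffset: Long, highWatermark: Long): Boolean =
  highWatermark >= requiredOffset
{code}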
On July 22, 2014, 5:02 p.m., Jun Rao wrote:
core/src/main/scala/kafka/log/Log.scala, lines 173-174
https://reviews.apache.org/r/23767/diff/2/?file=637565#file637565line173
It would be a bit confusing to reason about the consistency between nextOffset and nextOffsetMetadata since they are not updated atomically. Could we just keep nextOffsetMetadata?

Good point. Fixed.

On July 22, 2014, 5:02 p.m., Jun Rao wrote:
core/src/main/scala/kafka/log/Log.scala, lines 429-431
https://reviews.apache.org/r/23767/diff/2/?file=637565#file637565line429
Perhaps we can add a bit more detail to the comment. We are in the situation where the startOffset is in range, but we can't find a single message whose offset is >= startOffset. One possibility seems to be that all messages after startOffset have been deleted due to compaction. Is that the only case? Let's describe all situations in which this can happen.

Great catch. I was originally thinking this can only happen when the regular consumer's fetch request is issued with a max offset set to the HW, and all messages beyond the start offset are also beyond the max offset. But later I realized in this case LogSegment.read() will not actually return
Re: Review Request 23767: Fix KAFKA-1430
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/23767/
---

(Updated July 25, 2014, 4:52 p.m.)

Review request for kafka.

Bugs: KAFKA-1430
https://issues.apache.org/jira/browse/KAFKA-1430

Repository: kafka

Description (updated)
---

Address Jun's comments:
1. LogSegment.read() will also return fetch info, even if the corresponding message set is empty;
2. Purgatory checks satisfaction in checkAndMaybeWatch synchronously, and will only return false if this thread successfully set the satisfied bit to true;
3. Removed the read lock on Partition's reading of leaderOpt and epoch, making them volatile instead, since these two functions are just single reads;
4. Fixed some minor issues in TestEndToEndLatency;
5. Other minor fixes.

Diffs (updated)
---

core/src/main/scala/kafka/api/FetchRequest.scala 55a5982c5234f3ac10d4b4ea9fd7c4aa11d34154
core/src/main/scala/kafka/api/FetchResponse.scala d117f10f724b09d6deef0df3a138d28fc91aa13a
core/src/main/scala/kafka/cluster/Partition.scala f2ca8562f833f09d96ec4bd37efcacf69cd84b2e
core/src/main/scala/kafka/cluster/Replica.scala 5e659b4a5c0256431aecc200a6b914472da9ecf3
core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala f8c1b4e674f7515c377c6c30d212130f1ff022dd
core/src/main/scala/kafka/consumer/SimpleConsumer.scala 0e64632210385ef63c2ad3445b55ac4f37a63df2
core/src/main/scala/kafka/log/Log.scala b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12
core/src/main/scala/kafka/log/LogCleaner.scala afbeffc72e7d7706b44961aecf8519c5c5a3b4b1
core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8
core/src/main/scala/kafka/server/AbstractFetcherThread.scala 3b15254f32252cf824d7a292889ac7662d73ada1
core/src/main/scala/kafka/server/KafkaApis.scala fd5f12ee31e78cdcda8e24a0ab3e1740b3928884
core/src/main/scala/kafka/server/OffsetManager.scala 0e22897cd1c7e45c58a61c3c468883611b19116d
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 75ae1e161769a020a102009df416009bd6710f4a
core/src/main/scala/kafka/server/ReplicaManager.scala 897783cb756de548a8b634876f729b63ffe9925e
core/src/main/scala/kafka/server/RequestPurgatory.scala 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b
core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala af4783646803e58714770c21f8c3352370f26854
core/src/main/scala/kafka/tools/TestEndToEndLatency.scala 5f8f6bc46ae6cbbe15cec596ac99d0b377d1d7ef
core/src/test/scala/other/kafka/StressTestLog.scala 8fcd068b248688c40e73117dc119fa84cceb95b3
core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343
core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 9f04bd38be639cde3e7f402845dbe6ae92e87dc2
core/src/test/scala/unit/kafka/log/LogManagerTest.scala d03d4c4ee5c28fb1fbab2af0b84003ec0fac36e3
core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 6b7603728ae5217565d68b92dd5349e7c6508f31
core/src/test/scala/unit/kafka/log/LogTest.scala 1da1393983d4b20330e7c7f374424edd1b26f2a3
core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala 6db245c956d2172cde916defdb0749081bf891fd
core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 558a5d6765a2b2c2fd33dc75ed31873a133d12c9
core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18
core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 0ec120a4a953114e88c575dd6b583874371a09e3
core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 4f61f8469df99e02d6ce7aad897d10e158cca8fd
core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b

Diff: https://reviews.apache.org/r/23767/diff/

Testing
---

Thanks,
Guozhang Wang
[jira] [Updated] (KAFKA-1430) Purgatory redesign
[ https://issues.apache.org/jira/browse/KAFKA-1430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-1430:
---
Attachment: KAFKA-1430_2014-07-25_09:52:43.patch

Purgatory redesign
---
Key: KAFKA-1430
URL: https://issues.apache.org/jira/browse/KAFKA-1430
Project: Kafka
Issue Type: Improvement
Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
Assignee: Guozhang Wang
Attachments: KAFKA-1430.patch, KAFKA-1430.patch, KAFKA-1430.patch, KAFKA-1430.patch, KAFKA-1430.patch, KAFKA-1430_2014-06-05_17:07:37.patch, KAFKA-1430_2014-06-05_17:13:13.patch, KAFKA-1430_2014-06-05_17:40:53.patch, KAFKA-1430_2014-06-10_11:22:06.patch, KAFKA-1430_2014-06-10_11:26:02.patch, KAFKA-1430_2014-07-11_10:59:13.patch, KAFKA-1430_2014-07-21_12:53:39.patch, KAFKA-1430_2014-07-25_09:52:43.patch

We have seen 2 main issues with the Purgatory.

1. There is no atomic checkAndWatch functionality. A client typically first checks whether a request is satisfied or not and then registers the watcher. However, by the time the watcher is registered, the registered item could already be satisfied. This item won't be satisfied until the next update happens or the delay expires, which means the watched item could be delayed.

2. FetchRequestPurgatory doesn't quite work. This is because the current design tries to incrementally maintain the accumulated bytes ready for fetch. However, this is difficult since the right time to check whether a fetch request (for a regular consumer) is satisfied is when the high watermark moves. At that point, it's hard to figure out how many bytes we should incrementally add to each pending fetch request. The problem has been reported in KAFKA-1150 and KAFKA-703.

-- This message was sent by Atlassian JIRA (v6.2#6252)
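A hedged sketch of the atomic check-and-watch shape that issue 1 above calls for (illustrative names, not the committed RequestPurgatory code): flipping the satisfied bit with compareAndSet ensures exactly one of the checking thread and a concurrent completer wins, and re-checking after registering the watcher closes the race described above.

{code}
import java.util.concurrent.atomic.AtomicBoolean

abstract class DelayedOperation {
  private val satisfied = new AtomicBoolean(false)

  def isSatisfied: Boolean // re-evaluate the completion criterion
  def onComplete(): Unit   // respond to the client

  // True only if THIS call transitioned the operation to satisfied.
  def tryComplete(): Boolean =
    isSatisfied && satisfied.compareAndSet(false, true) && { onComplete(); true }
}

// Check once, register the watcher, then re-check, so a completion that
// races with registration can never be missed.
def checkAndMaybeWatch(op: DelayedOperation,
                       watchers: java.util.Queue[DelayedOperation]): Boolean = {
  if (op.tryComplete()) return true
  watchers.add(op)
  op.tryComplete()
}
{code}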
[jira] [Commented] (KAFKA-1430) Purgatory redesign
[ https://issues.apache.org/jira/browse/KAFKA-1430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074567#comment-14074567 ] Guozhang Wang commented on KAFKA-1430:
---

Updated reviewboard https://reviews.apache.org/r/23767/ against branch origin/trunk.

Purgatory redesign
---
Key: KAFKA-1430
URL: https://issues.apache.org/jira/browse/KAFKA-1430

-- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1542) normal IOException in the new producer is logged as ERROR
[ https://issues.apache.org/jira/browse/KAFKA-1542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074579#comment-14074579 ] Jun Rao commented on KAFKA-1542:
---

Thanks for the patch. Perhaps we can just log the InetAddress; getting the hostname on the client may not always be possible.

normal IOException in the new producer is logged as ERROR
---
Key: KAFKA-1542
URL: https://issues.apache.org/jira/browse/KAFKA-1542

-- This message was sent by Atlassian JIRA (v6.2#6252)
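A hedged sketch of the resulting logging change (illustrative, not the actual Selector code): a broker being down is a normal condition, so the log is demoted to WARN and the peer is identified by its InetAddress rather than a hostname the client may not be able to resolve.

{code}
import java.io.IOException
import java.nio.channels.SocketChannel
import org.slf4j.LoggerFactory

val log = LoggerFactory.getLogger("Selector")

def handleIoError(channel: SocketChannel, e: IOException): Unit = {
  val peer = channel.socket.getInetAddress // no reverse DNS lookup needed
  log.warn("Error in I/O with " + peer, e)  // WARN: expected when a broker is down
}
{code}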
[jira] [Commented] (KAFKA-1542) normal IOException in the new producer is logged as ERROR
[ https://issues.apache.org/jira/browse/KAFKA-1542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074582#comment-14074582 ] David Corley commented on KAFKA-1542:
---

Sure. Will revise the patch to get the address instead.

normal IOException in the new producer is logged as ERROR
---
Key: KAFKA-1542
URL: https://issues.apache.org/jira/browse/KAFKA-1542

-- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074625#comment-14074625 ] Jun Rao commented on KAFKA-1414:
---

Thanks for patch v4. Looks good. Some minor comments below.
50. LogManager.checkpointLogsInDir(dir: File): I am not sure that we need a for loop there, since we are just writing a single checkpoint file.
51. ReplicaManagerTest: unused imports.
52. KafkaConfig: Would num.recovery.threads.per.data.dir be better than log.threads.per.data.dir? In the comment, we can explain that this is only used during startup and shutdown of the log.

Speedup broker startup after hard reset
---
Key: KAFKA-1414
URL: https://issues.apache.org/jira/browse/KAFKA-1414

-- This message was sent by Atlassian JIRA (v6.2#6252)
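For illustration, the configuration name Jun suggests in item 52 might appear in server.properties like this (the value shown is arbitrary):

{code}
# Hypothetical example of the suggested rename. Threads per data directory,
# used only while loading logs at startup and flushing them at shutdown,
# not during normal operation.
num.recovery.threads.per.data.dir=4
{code}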
Re: Gradle hanging on unit test run against ProducerFailureHangingTest-testBrokerFailure
Sure. Attaching here. On Fri, Jul 25, 2014 at 4:16 PM, Guozhang Wang wangg...@gmail.com wrote: Could you get a thread dump when it hangs?
Re: Gradle hanging on unit test run against ProducerFailureHangingTest-testBrokerFailure
Did you miss the attachment? On Fri, Jul 25, 2014 at 10:44 AM, David Corley davidcor...@gmail.com wrote: Sure. Attaching here. -- -- Guozhang
[jira] [Updated] (KAFKA-1557) ISR reported by TopicMetadataResponse most of the time doesn't match the Zookeeper information (and the truth)
[ https://issues.apache.org/jira/browse/KAFKA-1557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleg Lvovitch updated KAFKA-1557: - Attachment: server2.properties server1.properties ISR reported by TopicMetadataResponse most of the time doesn't match the Zookeeper information (and the truth) -- Key: KAFKA-1557 URL: https://issues.apache.org/jira/browse/KAFKA-1557 Project: Kafka Issue Type: Bug Components: consumer, controller, core, replication Affects Versions: 0.8.0, 0.8.1 Environment: OSX 10.9.3, Linux Scientific 6.5 It actually doesn't seem to matter and appears to be OS-agnostic Reporter: Oleg Lvovitch Assignee: Neha Narkhede Fix For: 0.8.1.1, 0.8.2 Attachments: server1.properties, server2.properties -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Created] (KAFKA-1557) ISR reported by TopicMetadataResponse most of the time doesn't match the Zookeeper information (and the truth)
Oleg Lvovitch created KAFKA-1557: Summary: ISR reported by TopicMetadataResponse most of the time doesn't match the Zookeeper information (and the truth) Key: KAFKA-1557 URL: https://issues.apache.org/jira/browse/KAFKA-1557 Project: Kafka Issue Type: Bug Components: consumer, controller, core, replication Affects Versions: 0.8.1, 0.8.0 Environment: OSX 10.9.3, Linux Scientific 6.5 It actually doesn't seem to matter and appears to be OS-agnostic Reporter: Oleg Lvovitch Assignee: Neha Narkhede Fix For: 0.8.2, 0.8.1.1 Attachments: server1.properties, server2.properties -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1557) ISR reported by TopicMetadataResponse most of the time doesn't match the Zookeeper information (and the truth)
[ https://issues.apache.org/jira/browse/KAFKA-1557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleg Lvovitch updated KAFKA-1557: - Attachment: BrokenKafkaLink.scala ISR reported by TopicMetadataResponse most of the time doesn't match the Zookeeper information (and the truth) -- Key: KAFKA-1557 URL: https://issues.apache.org/jira/browse/KAFKA-1557 Project: Kafka Issue Type: Bug Components: consumer, controller, core, replication Affects Versions: 0.8.0, 0.8.1 Environment: OSX 10.9.3, Linux Scientific 6.5 It actually doesn't seem to matter and appears to be OS-agnostic Reporter: Oleg Lvovitch Assignee: Neha Narkhede Fix For: 0.8.1.1, 0.8.2 Attachments: BrokenKafkaLink.scala, server1.properties, server2.properties -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1557) ISR reported by TopicMetadataResponse most of the time doesn't match the Zookeeper information (and the truth)
[ https://issues.apache.org/jira/browse/KAFKA-1557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleg Lvovitch updated KAFKA-1557: - Description:
TL;DR - after a topic is created, and at least one broker in the ISR is restarted, the ISR reported by the TopicMetadataResponse is incorrect. Specific steps to repro:
- Download 0.8.1 Kafka
- Copy server.properties twice into server1.properties and server2.properties (attached) - basically just ports and log paths changed to allow the brokers to co-exist
- Start ZooKeeper: sh bin/zookeeper-server-start.sh config/zookeeper.properties
- Start broker1: sh bin/kafka-server-start.sh config/server1.properties
- Start broker2: sh bin/kafka-server-start.sh config/server2.properties
- Create a new topic: sh bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 2 --partitions 3
- Examine topic state: sh bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test - note that all ISRs are of length 2
- Run the attached Scala code that uses TopicMetadataRequest to examine topic state. Observe that all ISRs are of length 2 and match the information output by the script
- Shut down broker2 (simply hit Ctrl-C in the terminal), wait 5-10 seconds
- Restart broker2 using the original command
- Check the status of the topic again. Observe that the leader for all partitions is 0 (as expected), and all ISRs contain both brokers (as expected)
- Run the attached Scala snippet again.
EXPECTED: All ISRs are of length 2
ACTUAL: All ISRs contain just broker 0
NOTE: depending on how long broker 2 was down, sometimes some ISRs will contain the full list, but shutting it down for 15+ secs seems to always yield a consistent repro
Basically it appears that brokers have incorrect ISR information in the metadata cache. Our production servers exhibit the same problem - after a topic gets created everything looks fine, but as brokers get restarted, the ISR reported by the brokers is wrong, whereas the one in ZK appears to report the truth (it shrinks as brokers get shut down and grows back up after they get restarted). I'm not sure if this has wider impact on the functioning of the cluster - bad metadata information is bad - but so far there has been no evidence of that.
ISR reported by TopicMetadataResponse most of the time doesn't match the Zookeeper information (and the truth) -- Key: KAFKA-1557 URL: https://issues.apache.org/jira/browse/KAFKA-1557 Project: Kafka Issue Type: Bug Components: consumer, controller, core, replication Affects Versions: 0.8.0, 0.8.1 Environment: OSX 10.9.3, Linux Scientific 6.5 It actually doesn't seem to matter and appears to be OS-agnostic Reporter: Oleg Lvovitch Assignee: Neha Narkhede Fix For: 0.8.1.1, 0.8.2 Attachments: BrokenKafkaLink.scala, server1.properties, server2.properties
-- This message was sent by Atlassian JIRA (v6.2#6252)
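[Editorial note: the attached BrokenKafkaLink.scala is not reproduced in the archive. As a rough stand-in, here is one way to query topic metadata with the 0.8-era Scala SimpleConsumer API and print each partition's reported ISR. This is illustrative only, not the actual attachment, and it assumes broker1 listens on localhost:9092.]
{code}
import kafka.api.TopicMetadataRequest
import kafka.consumer.SimpleConsumer

object IsrCheck {
  def main(args: Array[String]) {
    // Connect to one broker from the repro setup and ask for metadata on "test".
    val consumer = new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "isr-check")
    try {
      val response = consumer.send(new TopicMetadataRequest(Seq("test"), 0))
      // Print the leader and ISR each broker currently reports per partition.
      for (topic <- response.topicsMetadata; partition <- topic.partitionsMetadata) {
        println("partition %d: leader=%s isr=%s".format(
          partition.partitionId,
          partition.leader.map(_.id).getOrElse("none"),
          partition.isr.map(_.id).mkString(",")))
      }
    } finally {
      consumer.close()
    }
  }
}
{code}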
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074699#comment-14074699 ] Anton Karamanov commented on KAFKA-1414: 50. In this case `for` is used for safe and easy extraction of values from two {{Option}}al values. The semantics are that the body will only be executed if both {{Option}}s are defined. It could be replaced with ifs or with pattern matching, although the current version is more idiomatic by Scala standards.
Speedup broker startup after hard reset --- Key: KAFKA-1414 URL: https://issues.apache.org/jira/browse/KAFKA-1414 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.1.1, 0.8.2, 0.9.0 Reporter: Dmitry Bugaychenko Assignee: Jay Kreps Fix For: 0.8.2 Attachments: 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, KAFKA-1414-rev3-interleaving.patch, KAFKA-1414-rev3.patch, KAFKA-1414-rev4.patch, freebie.patch, parallel-dir-loading-0.8.patch, parallel-dir-loading-trunk-fixed-threadpool.patch, parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch -- This message was sent by Atlassian JIRA (v6.2#6252)
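[Editorial note: a standalone illustration of the idiom Anton describes, not the patch code itself. The for-comprehension body runs only when both Options are defined, which reads more cleanly than the equivalent pattern match.]
{code}
object OptionDemo {
  def main(args: Array[String]) {
    val recoveryPoint: Option[Long] = Some(42L)
    val logDir: Option[String] = Some("/data/kafka-logs")

    // Body executes only if both Options are defined.
    for (point <- recoveryPoint; dir <- logDir) {
      println("recovering " + dir + " from offset " + point)
    }

    // Equivalent, but more verbose, pattern-matching version:
    (recoveryPoint, logDir) match {
      case (Some(point), Some(dir)) => println("recovering " + dir + " from offset " + point)
      case _ => // skipped when either value is missing
    }
  }
}
{code}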
[jira] [Comment Edited] (KAFKA-1542) normal IOException in the new producer is logged as ERROR
[ https://issues.apache.org/jira/browse/KAFKA-1542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074582#comment-14074582 ] David Corley edited comment on KAFKA-1542 at 7/25/14 6:07 PM: -- Hey Jun, the current patch returns the IP address. getHostAddress() returns the address as a string, whereas getHostName() would be used if we wanted the hostname.
was (Author: heavydawson): Sure. Will revise the patch to get the address instead.
normal IOException in the new producer is logged as ERROR - Key: KAFKA-1542 URL: https://issues.apache.org/jira/browse/KAFKA-1542 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2 Reporter: Jun Rao Labels: newbie Attachments: KAFKA-1542.patch -- This message was sent by Atlassian JIRA (v6.2#6252)
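[Editorial note: for reference, the JDK distinction being discussed - a standalone snippet, unrelated to the patch itself.]
{code}
import java.net.InetAddress

object AddressDemo {
  def main(args: Array[String]) {
    val addr = InetAddress.getLocalHost
    println(addr.getHostAddress) // numeric IP string, e.g. "10.0.0.12"
    println(addr.getHostName)    // the host name
  }
}
{code}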
[jira] [Updated] (KAFKA-1557) ISR reported by TopicMetadataResponse most of the time doesn't match the Zookeeper information (and the truth)
[ https://issues.apache.org/jira/browse/KAFKA-1557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy updated KAFKA-1557: -- Component/s: (was: consumer) Labels: newbie++ (was: ) ISR reported by TopicMetadataResponse most of the time doesn't match the Zookeeper information (and the truth) -- Key: KAFKA-1557 URL: https://issues.apache.org/jira/browse/KAFKA-1557 Project: Kafka Issue Type: Bug Components: controller, core, replication Affects Versions: 0.8.0, 0.8.1 Environment: OSX 10.9.3, Linux Scientific 6.5 It actually doesn't seem to matter and appears to be OS-agnostic Reporter: Oleg Lvovitch Assignee: Neha Narkhede Labels: newbie++ Fix For: 0.8.1.1, 0.8.2 Attachments: BrokenKafkaLink.scala, server1.properties, server2.properties -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1367) Broker topic metadata not kept in sync with ZooKeeper
[ https://issues.apache.org/jira/browse/KAFKA-1367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Neha Narkhede updated KAFKA-1367: - Labels: newbie++ (was: ) Broker topic metadata not kept in sync with ZooKeeper - Key: KAFKA-1367 URL: https://issues.apache.org/jira/browse/KAFKA-1367 Project: Kafka Issue Type: Bug Affects Versions: 0.8.0, 0.8.1 Reporter: Ryan Berdeen Labels: newbie++ Attachments: KAFKA-1367.txt When a broker is restarted, the topic metadata responses from the brokers will be incorrect (different from ZooKeeper) until a preferred replica leader election. In the metadata, it looks like leaders are correctly removed from the ISR when a broker disappears, but followers are not. Then, when a broker reappears, the ISR is never updated. I used a variation of the Vagrant setup created by Joe Stein to reproduce this with latest from the 0.8.1 branch: https://github.com/also/kafka/commit/dba36a503a5e22ea039df0f9852560b4fb1e067c -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1557) ISR reported by TopicMetadataResponse most of the time doesn't match the Zookeeper information (and the truth)
[ https://issues.apache.org/jira/browse/KAFKA-1557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074766#comment-14074766 ] Joel Koshy commented on KAFKA-1557: --- For the first question: I don't think cluster health is at risk here, since the ISR information is important for:
* leadership decisions
* ack'ing producers when required.acks = -1
Neither of these use cases relies directly on the metadata cache. For the second question: given that this is not critical enough for a hot-fix and shouldn't be too difficult to fix, it would be a good opportunity for someone new to the codebase to take a stab at it - that's why we label jiras with newbie/newbie++ tags.
ISR reported by TopicMetadataResponse most of the time doesn't match the Zookeeper information (and the truth) -- Key: KAFKA-1557 URL: https://issues.apache.org/jira/browse/KAFKA-1557 Project: Kafka Issue Type: Bug Components: controller, core, replication Affects Versions: 0.8.0, 0.8.1 Environment: OSX 10.9.3, Linux Scientific 6.5 It actually doesn't seem to matter and appears to be OS-agnostic Reporter: Oleg Lvovitch Assignee: Neha Narkhede Labels: newbie++ Fix For: 0.8.1.1, 0.8.2 Attachments: BrokenKafkaLink.scala, server1.properties, server2.properties -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Resolved] (KAFKA-1557) ISR reported by TopicMetadataResponse most of the time doesn't match the Zookeeper information (and the truth)
[ https://issues.apache.org/jira/browse/KAFKA-1557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Oleg Lvovitch resolved KAFKA-1557. -- Resolution: Duplicate ISR reported by TopicMetadataResponse most of the time doesn't match the Zookeeper information (and the truth) -- Key: KAFKA-1557 URL: https://issues.apache.org/jira/browse/KAFKA-1557 Project: Kafka Issue Type: Bug Components: controller, core, replication Affects Versions: 0.8.0, 0.8.1 Environment: OSX 10.9.3, Linux Scientific 6.5 It actually doesn't seem to matter and appears to be OS-agnostic Reporter: Oleg Lvovitch Assignee: Neha Narkhede Labels: newbie++ Fix For: 0.8.2, 0.8.1.1 Attachments: BrokenKafkaLink.scala, server1.properties, server2.properties -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1420) Replace AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK with TestUtils.createTopic in all unit tests
[ https://issues.apache.org/jira/browse/KAFKA-1420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-1420: - Description: This is a follow-up JIRA from KAFKA-1389. There are a bunch of places in the unit tests where we mis-use AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK was: This is a follow-up JIRA from KAFKA-1389. There are a bunch of places in the unit tests where we mis-use Replace AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK with TestUtils.createTopic in all unit tests -- Key: KAFKA-1420 URL: https://issues.apache.org/jira/browse/KAFKA-1420 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Labels: newbie Fix For: 0.8.2 This is a follow-up JIRA from KAFKA-1389. There are a bunch of places in the unit tests where we mis-use AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1420) Replace AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK with TestUtils.createTopic in all unit tests
[ https://issues.apache.org/jira/browse/KAFKA-1420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-1420: - Description: This is a follow-up JIRA from KAFKA-1389. There are a bunch of places in the unit tests where we mis-use was:This is a follow-up JIRA from KAFKA-1389 Replace AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK with TestUtils.createTopic in all unit tests -- Key: KAFKA-1420 URL: https://issues.apache.org/jira/browse/KAFKA-1420 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Labels: newbie Fix For: 0.8.2 This is a follow-up JIRA from KAFKA-1389. There are a bunch of places in the unit tests where we mis-use -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1420) Replace AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK with TestUtils.createTopic in unit tests
[ https://issues.apache.org/jira/browse/KAFKA-1420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-1420: - Summary: Replace AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK with TestUtils.createTopic in unit tests (was: Replace AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK with TestUtils.createTopic in all unit tests) Replace AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK with TestUtils.createTopic in unit tests -- Key: KAFKA-1420 URL: https://issues.apache.org/jira/browse/KAFKA-1420 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Labels: newbie Fix For: 0.8.2 This is a follow-up JIRA from KAFKA-1389. There are a bunch of places in the unit tests where we mis-use AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Updated] (KAFKA-1420) Replace AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK with TestUtils.createTopic in unit tests
[ https://issues.apache.org/jira/browse/KAFKA-1420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-1420: - Description: This is a follow-up JIRA from KAFKA-1389. There are a bunch of places in the unit tests where we misuse AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK to create topics, where TestUtils.createTopic needs to be used instead. was: This is a follow-up JIRA from KAFKA-1389. There are a bunch of places in the unit tests where we mis-use AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK Replace AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK with TestUtils.createTopic in unit tests -- Key: KAFKA-1420 URL: https://issues.apache.org/jira/browse/KAFKA-1420 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Labels: newbie Fix For: 0.8.2 This is a follow-up JIRA from KAFKA-1389. There are a bunch of places in the unit tests where we misuse AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK to create topics, where TestUtils.createTopic needs to be used instead. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14075038#comment-14075038 ] saurabh agarwal commented on KAFKA-1555: Jun, I agree that ack=-1 works fine for most of the use cases. Here is another suggestion that might address our use case. Can we add an additional producer property - min.isr.required (similar to dfs.replication.min) - for durability? ack=-1 ensures that every replica in the ISR will receive the message before the producer gets the ack, and min.isr.required=2 would ensure that there are at least two replicas in the ISR when a message is published; otherwise an exception is thrown saying that the number of required replicas is not in the ISR. Here is an example where this would be very useful. Take a scenario where the producer is publishing at a very high rate and we bring down two follower replicas. When we bring those replicas back up, it takes a while for them to catch up, since messages are still being published at the higher rate, so for a while the ISR contains no followers. During this time, if the disk at the leader replica fails, we will not have any replica that has those messages. It would be good to have more than one copy in the ISR at all times. This would address our use case, where we need strong consistency and high durability with reasonable availability.
provide strong consistency with reasonable availability --- Key: KAFKA-1555 URL: https://issues.apache.org/jira/browse/KAFKA-1555 Project: Kafka Issue Type: Improvement Components: controller Affects Versions: 0.8.1.1 Reporter: Jiang Wu Assignee: Neha Narkhede
In a mission-critical application, we expect a Kafka cluster with 3 brokers to satisfy two requirements:
1. When 1 broker is down, no message loss or service blocking happens.
2. In worse cases, such as when two brokers are down, service can be blocked, but no message loss happens.
We found that the current Kafka version (0.8.1.1) cannot achieve the requirements due to three of its behaviors:
1. When choosing a new leader from 2 followers in the ISR, the one with fewer messages may be chosen as the leader.
2. Even when replica.lag.max.messages=0, a follower can stay in the ISR when it has fewer messages than the leader.
3. The ISR can contain only 1 broker, therefore acknowledged messages may be stored on only 1 broker.
The following is an analytical proof. We consider a cluster with 3 brokers and a topic with 3 replicas, and assume that at the beginning all 3 replicas - leader A, followers B and C - are in sync, i.e., they have the same messages and are all in the ISR. According to the value of request.required.acks (acks for short), there are the following cases:
1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this time, although C hasn't received m, C is still in the ISR. If A is killed, C can be elected as the new leader, and consumers will miss m.
3. acks=-1. B and C restart and are removed from the ISR. Producer sends a message m to A, and receives an acknowledgement. Disk failure happens in A before B and C replicate m. Message m is lost.
In summary, no existing configuration can satisfy the requirements.
-- This message was sent by Atlassian JIRA (v6.2#6252)
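[Editorial note: a minimal sketch of the broker-side check the suggestion implies - with acks=-1, refuse the append when the current ISR is smaller than the configured minimum, so an acknowledged message is never held by a single broker. Names and the exception type are hypothetical; the idea resembles what later shipped in Kafka as the min.insync.replicas setting, which may differ in detail.]
{code}
object IsrGuard {
  class NotEnoughReplicasException(msg: String) extends RuntimeException(msg)

  // Reject an acks=-1 append when fewer than minIsrRequired replicas are in sync.
  def checkEnoughReplicas(isrSize: Int, minIsrRequired: Int, requiredAcks: Int) {
    if (requiredAcks == -1 && isrSize < minIsrRequired)
      throw new NotEnoughReplicasException(
        "Number of in-sync replicas (%d) is below the required minimum (%d)".format(isrSize, minIsrRequired))
  }
}
{code}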
[jira] [Commented] (KAFKA-960) Upgrade Metrics to 3.x
[ https://issues.apache.org/jira/browse/KAFKA-960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14075215#comment-14075215 ] Daniel Compton commented on KAFKA-960: -- The main (only?) benefit for us is fixing the issue where MBean names are wrapped in quotes. It looks like there are some feature and performance benefits as well. What do you mean by 'client'? Is that a producer or consumer library? Upgrade Metrics to 3.x -- Key: KAFKA-960 URL: https://issues.apache.org/jira/browse/KAFKA-960 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.1 Reporter: Cosmin Lehene Now that metrics 3.0 has been released (http://metrics.codahale.com/about/release-notes/) we can upgrade back -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-960) Upgrade Metrics to 3.x
[ https://issues.apache.org/jira/browse/KAFKA-960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14075244#comment-14075244 ] Jun Rao commented on KAFKA-960: --- Yes. Upgrade Metrics to 3.x -- Key: KAFKA-960 URL: https://issues.apache.org/jira/browse/KAFKA-960 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.1 Reporter: Cosmin Lehene Now that metrics 3.0 has been released (http://metrics.codahale.com/about/release-notes/) we can upgrade back -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability
[ https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14075263#comment-14075263 ] Jun Rao commented on KAFKA-1555: So, you are suggesting that we reject a message if the current number of replicas in the ISR is less than 2? However, immediately after a message is successfully published, some replicas could go down, which could bring the ISR below 2. So, I am not sure this is any better than just running things with a larger replication factor, say 4. My understanding of dfs.replication.min in HDFS is that as you are writing data to HDFS, you can actually write data to fewer replicas than that min value. For example, suppose that you are writing 100 bytes to 3 replicas in HDFS with dfs.replication.min=2. If, after the 100 bytes are written to the first replica, the other 2 replicas die, HDFS will complete the write with just 1 replica. However, in the background, HDFS will try to create new replicas to make sure the total # of replicas reaches the min value. This is something you can do with the Kafka admin tools too. We could potentially automate this somehow.
provide strong consistency with reasonable availability --- Key: KAFKA-1555 URL: https://issues.apache.org/jira/browse/KAFKA-1555 Project: Kafka Issue Type: Improvement Components: controller Affects Versions: 0.8.1.1 Reporter: Jiang Wu Assignee: Neha Narkhede -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Commented] (KAFKA-960) Upgrade Metrics to 3.x
[ https://issues.apache.org/jira/browse/KAFKA-960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14075266#comment-14075266 ] Daniel Compton commented on KAFKA-960: -- Probably worth parking for now then. Upgrade Metrics to 3.x -- Key: KAFKA-960 URL: https://issues.apache.org/jira/browse/KAFKA-960 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.1 Reporter: Cosmin Lehene Now that metrics 3.0 has been released (http://metrics.codahale.com/about/release-notes/) we can upgrade back -- This message was sent by Atlassian JIRA (v6.2#6252)