[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state
[ https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14481521#comment-14481521 ] Sriharsha Chintalapani commented on KAFKA-2082:
---
[~eapache] [~junrao] These are my observations so far:
1) The producer test connects to all 5 brokers and sends messages. Each broker is connected to its own zookeeper, i.e. broker1 connects to localhost:2181 and broker2 connects to localhost:2182.
2) The producer test disables the zookeeper connections for 2181 and 2183 (zookeeper1, zookeeper3). broker1 and broker3 are deregistered from /brokers/ids.
3) But broker1 and broker3 are still listening on their respective ports and sending responses. Since they no longer have active connections to zookeeper, they return stale data and still consider the old controller to be the controller.
4) Hence the producer gets wrong leaders for the topic partitions from broker1.

Kafka Replication ends up in a bad state
----------------------------------------
Key: KAFKA-2082
URL: https://issues.apache.org/jira/browse/KAFKA-2082
Project: Kafka
Issue Type: Bug
Components: replication
Affects Versions: 0.8.2.1
Reporter: Evan Huus
Assignee: Sriharsha Chintalapani
Priority: Critical
Attachments: KAFKA-2082.patch

While running integration tests for Sarama (the Go client) we came across a pattern of connection losses that reliably puts kafka into a bad state: several of the brokers start spinning, chewing ~30% CPU and spamming the logs with hundreds of thousands of lines like:
{noformat}
[2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on partition [many_partition,1] failed due to Leader not local for partition [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
[2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on partition [many_partition,6] failed due to Leader not local for partition [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
[2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,21] failed due to Leader not local for partition [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
[2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,26] failed due to Leader not local for partition [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
[2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,1] failed due to Leader not local for partition [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
[2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,6] failed due to Leader not local for partition [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
[2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on partition [many_partition,21] failed due to Leader not local for partition [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
[2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on partition [many_partition,26] failed due to Leader not local for partition [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
{noformat}
This can be easily and reliably reproduced using the {{toxiproxy-final}} branch of https://github.com/Shopify/sarama which includes a vagrant script for provisioning the appropriate cluster:
- {{git clone https://github.com/Shopify/sarama.git}}
- {{git checkout test-jira-kafka-2082}}
- {{vagrant up}}
- {{TEST_SEED=1427917826425719059 DEBUG=true go test -v}}
After the test finishes (it fails because the cluster ends up in a bad state), you can log into the cluster machine with {{vagrant ssh}} and inspect the bad nodes. The vagrant script provisions five zookeepers and five brokers in {{/opt/kafka-9091/}} through {{/opt/kafka-9095/}}.
Additional context: the test produces continually to the cluster while randomly cutting and restoring zookeeper connections (all connections to zookeeper are run through a simple proxy on the same vm to make this easy). The majority of
[jira] [Created] (KAFKA-2097) Implement request delays for quota violations
Aditya Auradkar created KAFKA-2097: -- Summary: Implement request delays for quota violations Key: KAFKA-2097 URL: https://issues.apache.org/jira/browse/KAFKA-2097 Project: Kafka Issue Type: Sub-task Reporter: Aditya Auradkar Assignee: Aditya Auradkar As defined in the KIP, implement delays on a per-request basis for both producer and consumer. This involves either modifying the existing purgatory or adding a new delay queue. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
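To make the delay-queue option mentioned above concrete, here is a minimal Scala sketch; the names (ThrottledRequest, ThrottleExample) and the overall design are assumptions for illustration, not part of the actual patch:
{code}
import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}

// Hypothetical sketch of the delay-queue option for quota violations.
class ThrottledRequest(val clientId: String, delayMs: Long) extends Delayed {
  private val dueNs = System.nanoTime + TimeUnit.MILLISECONDS.toNanos(delayMs)

  // Remaining delay; the queue only hands the element out once this is <= 0.
  override def getDelay(unit: TimeUnit): Long =
    unit.convert(dueNs - System.nanoTime, TimeUnit.NANOSECONDS)

  override def compareTo(other: Delayed): Int =
    java.lang.Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS))
}

object ThrottleExample extends App {
  val throttled = new DelayQueue[ThrottledRequest]()

  // On a quota violation, park the response for the computed delay.
  throttled.put(new ThrottledRequest("client-1", 500))

  // A background sender thread would block here until the delay elapses,
  // then send the response back to the client.
  val ready = throttled.take()
  println(s"releasing response for ${ready.clientId}")
}
{code}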
[jira] [Updated] (KAFKA-2029) Improving controlled shutdown for rolling updates
[ https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dmitry Bugaychenko updated KAFKA-2029:
---
Attachment: KAFKA-2029.patch

Improving controlled shutdown for rolling updates
-------------------------------------------------
Key: KAFKA-2029
URL: https://issues.apache.org/jira/browse/KAFKA-2029
Project: Kafka
Issue Type: Improvement
Components: controller
Affects Versions: 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Neha Narkhede
Priority: Critical
Attachments: KAFKA-2029.patch, KAFKA-2029.patch

Controlled shutdown as implemented currently can cause numerous problems: deadlocks, local and global data loss, partitions without a leader, etc. In some cases the only way to restore the cluster is to stop it completely using kill -9 and start it again.
Note 1: I'm aware of KAFKA-1305, but the proposed workaround to increase the queue size makes things much worse (see the discussion there).
Note 2: The problems described here can occur in any setup, but they are extremely painful in a setup with large brokers (36 disks, 1000+ assigned partitions per broker in our case).
Note 3: These improvements are actually workarounds and it is worth considering a global refactoring of the controller (make it single-threaded, or even get rid of it in favour of ZK leader elections for partitions).
The problems and improvements are:
# Controlled shutdown takes a long time (10+ minutes): the broker sends multiple shutdown requests and finally considers the shutdown failed and proceeds to an unclean shutdown, while the controller gets stuck for a while (holding a lock while waiting for free space in the controller-to-broker queue). After the broker comes back up it receives a followers request and erases its highwatermarks (with a message that the replica does not exist - the controller had not yet sent a request with the replica assignment); then, when the controller starts replicas on the broker, it deletes all local data (due to the missing highwatermarks). Furthermore, the controller starts processing the pending shutdown request and stops replicas on the broker, leaving it in a non-functional state. A solution might be to increase the time the broker waits for the controller's response to the shutdown request, but this timeout is taken from controller.socket.timeout.ms, which is global for all broker-controller communication, and setting it to 30 minutes is dangerous. *Proposed solution: introduce a dedicated config parameter for this timeout with a high default*.
# If a broker goes down during controlled shutdown and does not come back, the controller gets stuck in a deadlock (one thread owns the lock and tries to add a message to the dead broker's queue, the send thread is in an infinite loop trying to retry the message to the dead broker, and the broker failure handler is waiting for the lock). There are numerous partitions without a leader and the only way out is to kill -9 the controller. *Proposed solution: add a timeout for adding a message to the broker's queue*. ControllerChannelManager.sendRequest:
{code}
def sendRequest(brokerId: Int, request: RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
  brokerLock synchronized {
    val stateInfoOpt = brokerStateInfo.get(brokerId)
    stateInfoOpt match {
      case Some(stateInfo) =>
        // ODKL Patch: prevent infinite hang on trying to send message to a dead broker.
        // TODO: Move timeout to config
        if (!stateInfo.messageQueue.offer((request, callback), 10, TimeUnit.SECONDS)) {
          error("Timed out trying to send message to broker " + brokerId.toString)
          // Do not throw, as it brings the controller into a completely non-functional state
          // ("Controller to broker state change requests batch is not empty while creating a new one")
          //throw new IllegalStateException("Timed out trying to send message to broker " + brokerId.toString)
        }
      case None =>
        warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
    }
  }
}
{code}
# When the broker that is the controller starts to shut down while auto leader rebalance is running, it deadlocks in the end (the shutdown thread owns the lock and waits for the rebalance thread to exit, and the rebalance thread waits for the lock). *Proposed solution: use a bounded wait in the rebalance thread*. KafkaController.scala:
{code}
// ODKL Patch to prevent deadlocks in shutdown.
/**
 * Execute the given function inside the lock
 */
def inLockIfRunning[T](lock: ReentrantLock)(fun: => T): T = {
  if (isRunning || lock.isHeldByCurrentThread) {
    // TODO: Configure timeout.
    if (!lock.tryLock(10, TimeUnit.SECONDS)) {
      throw new IllegalStateException("Failed to acquire controller lock in 10 seconds.")
    }
{code}
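The {{inLockIfRunning}} snippet above is cut off in the digest. A plausible complete shape, assuming it mirrors the usual {{inLock}} helper from kafka.utils (the else branch and the wrapping class are assumptions for this sketch, not the actual attached patch):
{code}
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

class ControllerLockSketch {
  @volatile var isRunning = true
  private val controllerLock = new ReentrantLock()

  def inLockIfRunning[T](lock: ReentrantLock)(fun: => T): T = {
    // If we already hold the lock, tryLock succeeds immediately (reentrant).
    if (isRunning || lock.isHeldByCurrentThread) {
      if (!lock.tryLock(10, TimeUnit.SECONDS)) {
        // Bounded wait: give up instead of blocking forever on a lock the
        // shutdown thread will never release.
        throw new IllegalStateException("Failed to acquire controller lock in 10 seconds.")
      }
      try {
        fun // evaluate the by-name body only while holding the lock
      } finally {
        lock.unlock()
      }
    } else {
      // Assumed fallback: refuse new work once shutdown has begun.
      throw new IllegalStateException("Controller is not running.")
    }
  }
}
{code}
The bounded {{tryLock}} is what breaks the shutdown deadlock: the rebalance thread gives up after 10 seconds instead of blocking forever on a lock held by the shutdown thread.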
Review Request 32890: Patch for controlled shutdown rebased on trunk
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32890/
---

Review request for kafka.

Bugs: KAFKA-2029
    https://issues.apache.org/jira/browse/KAFKA-2029

Repository: kafka

Description
---

Controlled shutdown patch

Diffs
---

  core/src/main/scala/kafka/controller/ControllerChannelManager.scala 97acdb23f6e95554c3e0357aa112eddfc875efbc
  core/src/main/scala/kafka/controller/KafkaController.scala 3a09377611b48198c4c3cd1a118fc12eda0543d4
  core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala 3b15ab4eef22c6f50a7483e99a6af40fb55aca9f
  core/src/main/scala/kafka/server/ReplicaManager.scala 144a15e3a53d47fa972bef277c335da484842cf9

Diff: https://reviews.apache.org/r/32890/diff/

Testing
---

Thanks,

Dmitry Bugaychenko
[jira] [Commented] (KAFKA-2029) Improving controlled shutdown for rolling updates
[ https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14481465#comment-14481465 ] Dmitry Bugaychenko commented on KAFKA-2029:
---
Created reviewboard https://reviews.apache.org/r/32890/diff/ against branch origin/trunk

Improving controlled shutdown for rolling updates
-------------------------------------------------
Key: KAFKA-2029
URL: https://issues.apache.org/jira/browse/KAFKA-2029
RE: [KIP-DISCUSSION] KIP-13 Quotas
Hi Jay,

2. At this time, the proposed response format changes are only for monitoring/informing clients. As Jun mentioned, we get instance-level monitoring in this case since each instance that got throttled will have a metric confirming the same. Without client-level monitoring for this, it's hard for application developers to find out if they are being throttled, since they would also have to be aware of all the brokers in the cluster. This is quite problematic for large clusters. It seems nice for app developers to not have to think about kafka internal metrics and only focus on the metrics exposed on their instances. Analogous to having client-side request latency metrics. Basically, we want an easy way for clients to be aware if they are being throttled.

4. For purgatory vs. delay queue, I think we are on the same page. I feel it is nicer to use the purgatory, but I'm happy to use a DelayQueue if there are performance implications. I don't know enough about the current and Yasuhiro's new implementation to be sure one way or the other.

Stepping back, I think these two things are the only remaining points of discussion within the current proposal. Any concerns if I started a voting thread on the proposal after the KIP discussion tomorrow? (assuming we reach consensus on these items)

Thanks,
Aditya

From: Jay Kreps [jay.kr...@gmail.com]
Sent: Saturday, April 04, 2015 1:36 PM
To: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Hey Aditya,

2. For the return flag I'm not terribly particular. If we want to add it, let's fully think through how it will be used. The only concern I have is adding to the protocol without really thinking through the use cases. So let's work out the APIs we want to add to the Java consumer and producer and the use cases for how clients will make use of these. For my part I actually don't see much use other than monitoring, since it isn't an error condition to be at your quota. And if it is just monitoring, I don't see a big enough difference between having the monitoring on the server side versus in the clients to justify putting it in the protocol. But I think you guys may have other use cases in mind of how a client would make some use of this? Let's work that out. I also don't feel strongly about it--it wouldn't be *bad* to have the monitoring available on the client, it just doesn't seem that much better.

4. For the purgatory vs. delay queue, I think it is arguably nicer to reuse the purgatory; we just have to be ultra-conscious of efficiency. I think our goal is to turn quotas on across the board, so at LinkedIn that would mean potentially every request will need a small delay. I haven't worked out the efficiency implications of this choice, so as long as we do that I'm happy.

-Jay

On Fri, Apr 3, 2015 at 1:10 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote:

Some responses to Jay's points.
1. Using commas - Cool.
2. Adding return flag - I'm inclined to agree with Joel that this is good to have in the initial implementation.
3. Config - +1. I'll remove it from the KIP. We can discuss this in parallel.
4. Purgatory vs delay queue - I feel that it is simpler to reuse the existing purgatories for both delayed produce and fetch requests. IIUC, all we need for quotas is a minWait parameter for DelayedOperation (or something equivalent) since there is already a max wait. The completion criteria can check if minWait time has elapsed before declaring the operation complete. For this to impact performance, a significant number of clients would need to exceed their quota at the same time, and even then I'm not very clear on the scope of the impact. Two layers of delays might add complexity to the implementation, which I'm hoping to avoid.

Aditya

From: Joel Koshy [jjkosh...@gmail.com]
Sent: Friday, April 03, 2015 12:48 PM
To: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Aditya, thanks for the updated KIP and Jay/Jun thanks for the comments. Couple of comments in-line:

2. I would advocate for adding the return flag when we next bump the request format version, just to avoid proliferation. I agree this is a good thing to know about, but at the moment I don't think we have a very well fleshed out idea of how the client would actually make use of this info. I'm somewhat inclined to having something appropriate off the bat - mainly because (i) clients really should know that they have been throttled and (ii) a smart producer/consumer implementation would want to know how much to back off. So perhaps this and config-management should be moved to a separate discussion, but it would be good to have this discussion going and incorporated into the first quota implementation.

3. Config--I think we need to generalize the topic stuff so we can override at multiple levels. We have topic and client, but I suspect user and
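To make the minWait idea from the thread concrete, here is a minimal sketch against a simplified stand-in for DelayedOperation (the real class in kafka.server differs; the names and structure here are illustrative only):
{code}
// Simplified stand-in for kafka.server.DelayedOperation; illustrative only.
abstract class DelayedOperation(maxWaitMs: Long, minWaitMs: Long = 0) {
  private val createdMs = System.currentTimeMillis

  // Subclasses define when the operation's own condition is satisfied
  // (e.g. enough data for a fetch, enough acks for a produce).
  def conditionSatisfied(): Boolean

  // The quota twist: even a satisfied operation is held until minWait elapses,
  // which is how a throttled request would be delayed inside the purgatory.
  def tryComplete(): Boolean = {
    val elapsedMs = System.currentTimeMillis - createdMs
    elapsedMs >= minWaitMs && conditionSatisfied()
  }

  // Forced completion when maxWait expires, regardless of the condition.
  def isExpired: Boolean = System.currentTimeMillis - createdMs >= maxWaitMs
}
{code}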
[jira] [Commented] (KAFKA-2029) Improving controlled shutdown for rolling updates
[ https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14481477#comment-14481477 ] Dmitry Bugaychenko commented on KAFKA-2029:
---
Rebased on trunk.

Improving controlled shutdown for rolling updates
-------------------------------------------------
Key: KAFKA-2029
URL: https://issues.apache.org/jira/browse/KAFKA-2029
[jira] [Commented] (KAFKA-1621) Standardize --messages option in perf scripts
[ https://issues.apache.org/jira/browse/KAFKA-1621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14481865#comment-14481865 ] Rekha Joshi commented on KAFKA-1621:
---
Hi. Missed updating earlier. Everything else was set up as in https://cwiki.apache.org/confluence/display/KAFKA/Patch+submission+and+review, but the jira client could not be installed (Mac 10.9.5, Python 2.7.5, pip 1.5.6), hence I could not update the review board. Suggestion: why not review directly on git, as the rest of the Apache projects do?
{noformat}
python kafka-patch-review.py --help
Traceback (most recent call last):
  File "kafka-patch-review.py", line 10, in <module>
    from jira.client import JIRA
ImportError: No module named jira.client

pip install jira-python
Downloading/unpacking jira-python
  Could not find any downloads that satisfy the requirement jira-python
Cleaning up...
No distributions at all found for jira-python
Storing debug log for failure in /usr/local/.pip/pip.log

sudo easy_install jira-python
Searching for jira-python
Reading http://pypi.python.org/simple/jira-python/
Couldn't find index page for 'jira-python' (maybe misspelled?)
Scanning index of all packages (this may take a while)
Reading http://pypi.python.org/simple/
No local packages or download links found for jira-python
error: Could not find suitable distribution for Requirement.parse('jira-python')
{noformat}
I think it could be related to security enabled on pip? But I would prefer not to downgrade pip, as it is used for my other projects as well.

Standardize --messages option in perf scripts
---------------------------------------------
Key: KAFKA-1621
URL: https://issues.apache.org/jira/browse/KAFKA-1621
Project: Kafka
Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Jay Kreps
Labels: newbie

This option is specified in PerfConfig and is used by the producer, consumer and simple consumer perf commands. The docstring on the argument does not list it as required, but the producer performance test requires it--the others don't. We should standardize this so that either all the commands require the option and it is marked as required in the docstring, or none of them list it as required.
--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2098) Gradle Wrapper Jar gone missing in 0.8.2.1
Rekha Joshi created KAFKA-2098:
---
Summary: Gradle Wrapper Jar gone missing in 0.8.2.1
Key: KAFKA-2098
URL: https://issues.apache.org/jira/browse/KAFKA-2098
Project: Kafka
Issue Type: Bug
Components: build
Affects Versions: 0.8.2.1
Reporter: Rekha Joshi

./gradlew idea
Error: Could not find or load main class org.gradle.wrapper.GradleWrapperMain

This was working in 0.8.2. Attaching patch. Thanks.
--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state
[ https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14481843#comment-14481843 ] Jun Rao commented on KAFKA-2082:
---
After step 2), which broker is the new controller? The new controller should be able to detect that brokers 1 and 3 are dead and move the leaders to other brokers. So, was the issue that the new controller didn't detect the failure of brokers 1 and 3 properly, or that it didn't propagate the new metadata properly?

Kafka Replication ends up in a bad state
----------------------------------------
Key: KAFKA-2082
URL: https://issues.apache.org/jira/browse/KAFKA-2082

Additional context: the test produces continually to the cluster while randomly cutting and restoring zookeeper connections (all connections to zookeeper are run through a simple proxy on the same vm to make this easy). The majority of the time this works very well and does a good job exercising our producer's retry and failover code. However, under certain patterns of connection loss (the {{TEST_SEED}} in the instructions is important), kafka gets confused. The test never cuts more than two connections at a time, so zookeeper should always have quorum, and the topic (with three replicas) should always be writable. Completely restarting the cluster
[jira] [Updated] (KAFKA-2043) CompressionType is passed in each RecordAccumulator append
[ https://issues.apache.org/jira/browse/KAFKA-2043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-2043:
---
Resolution: Fixed
Fix Version/s: 0.8.3
Status: Resolved (was: Patch Available)

Thanks for the patch. +1 and committed to trunk.

CompressionType is passed in each RecordAccumulator append
-----------------------------------------------------------
Key: KAFKA-2043
URL: https://issues.apache.org/jira/browse/KAFKA-2043
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 0.8.2.0
Reporter: Grant Henke
Assignee: Grant Henke
Priority: Minor
Fix For: 0.8.3
Attachments: KAFKA-2043.patch, KAFKA-2043_2015-03-25_13:28:52.patch

Currently the org.apache.kafka.clients.producer.internals.RecordAccumulator append method accepts the compressionType on a per-record basis. It looks like the code would only work on a per-batch basis, because the CompressionType is only used when creating a new RecordBatch. My understanding is this should only support setting it per batch at most.
{code}
public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] value, CompressionType compression, Callback callback) throws InterruptedException;
{code}
The compression type is a producer-level config. Instead of passing it in for each append, we probably should just pass it in once during the creation of the RecordAccumulator.
--
This message was sent by Atlassian JIRA (v6.3.4#6332)
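To illustrate the direction of the fix (the actual change is in the Java client; the stub types below are assumptions for this sketch, not the real producer internals):
{code}
// Stub types standing in for the Java client's classes; illustrative only.
case class TopicPartition(topic: String, partition: Int)
sealed trait CompressionType
case object NoCompression extends CompressionType
case object Gzip extends CompressionType

class RecordBatch(val compression: CompressionType) {
  def tryAppend(key: Array[Byte], value: Array[Byte]): Boolean = true // stub
}

// Compression is now fixed once at construction, matching the producer-level
// config, instead of being passed on every append call.
class RecordAccumulator(compression: CompressionType) {
  private var batches = Map.empty[TopicPartition, RecordBatch]

  def append(tp: TopicPartition, key: Array[Byte], value: Array[Byte]): RecordBatch = {
    // The compression type is consulted only here, when a new batch is created.
    val batch = batches.getOrElse(tp, new RecordBatch(compression))
    batches += tp -> batch
    batch.tryAppend(key, value)
    batch
  }
}
{code}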
[jira] [Commented] (KAFKA-2078) Getting Selector [WARN] Error in I/O with host java.io.EOFException
[ https://issues.apache.org/jira/browse/KAFKA-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14481617#comment-14481617 ] Aravind commented on KAFKA-2078:
---
Yes, I could reproduce this with just 1 broker as well, as I have created a topic with a single partition and no replication. Thanks!

Getting Selector [WARN] Error in I/O with host java.io.EOFException
-------------------------------------------------------------------
Key: KAFKA-2078
URL: https://issues.apache.org/jira/browse/KAFKA-2078
Project: Kafka
Issue Type: Bug
Components: producer
Affects Versions: 0.8.2.0
Environment: OS Version: 2.6.39-400.209.1.el5uek and Hardware: 8 x Intel(R) Xeon(R) CPU X5660 @ 2.80GHz/44GB
Reporter: Aravind
Assignee: Jun Rao

When trying to produce 1000 (10 MB) messages, I get the error below somewhere between the 997th and 1000th message. There is no pattern, but it is reproducible.
{noformat}
[PDT] 2015-03-31 13:53:50 Selector [WARN] Error in I/O with our host
java.io.EOFException
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:248)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
    at java.lang.Thread.run(Thread.java:724)
{noformat}
I get this error sometimes at the 997th message or the 999th message. There is no pattern, but it is reproducible.
--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2078) Getting Selector [WARN] Error in I/O with host java.io.EOFException
[ https://issues.apache.org/jira/browse/KAFKA-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14481861#comment-14481861 ] Jun Rao commented on KAFKA-2078:
---
What kind of network do you have? Do you have any load balancers or firewalls? Could you reproduce the issue on localhost?

Getting Selector [WARN] Error in I/O with host java.io.EOFException
-------------------------------------------------------------------
Key: KAFKA-2078
URL: https://issues.apache.org/jira/browse/KAFKA-2078
--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2096) Enable keepalive socket option for broker to prevent socket leak
[ https://issues.apache.org/jira/browse/KAFKA-2096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14481890#comment-14481890 ] Jun Rao commented on KAFKA-2096:
---
[~allenxwang], that seems to be a good fix. Do you want to submit a patch?

Enable keepalive socket option for broker to prevent socket leak
----------------------------------------------------------------
Key: KAFKA-2096
URL: https://issues.apache.org/jira/browse/KAFKA-2096
Project: Kafka
Issue Type: Improvement
Components: network
Affects Versions: 0.8.2.1
Reporter: Allen Wang
Assignee: Jun Rao
Priority: Critical

We run a Kafka 0.8.2.1 cluster in AWS with a large number of producers. The number of producer instances also scales up and down significantly on a daily basis. The issue we found is that after 10 days, the open file descriptor count approaches the limit of 32K. An investigation of these open file descriptors shows that a significant portion of them are from client instances that were terminated during scale-down. Somehow they still show as ESTABLISHED in netstat. We suspect that the AWS firewall between the client and broker causes this issue.
We attempted to use the keepalive socket option to reduce this socket leak on the broker, and it appears to be working. Specifically, we added this line to kafka.network.Acceptor.accept():
{code}
socketChannel.socket().setKeepAlive(true)
{code}
It is confirmed during our experiment with this change that netstat entries for terminated client instances were probed as configured in the operating system. After the configured number of probes, the OS determined that the peer was no longer alive and the entry was removed, possibly after Kafka got an error reading from the channel and closed it. Also, our experiment shows that after a few days the instance was able to keep a stable low point of open file descriptor count, compared with other instances where the low point keeps increasing day to day.
--
This message was sent by Atlassian JIRA (v6.3.4#6332)
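For context, a minimal sketch of where that one-line change sits in an accept loop (simplified; not the actual kafka.network.Acceptor code, which also registers channels with a selector and hands them to processor threads):
{code}
import java.net.InetSocketAddress
import java.nio.channels.ServerSocketChannel

object AcceptorSketch extends App {
  val serverChannel = ServerSocketChannel.open()
  serverChannel.socket().bind(new InetSocketAddress(9092))

  val socketChannel = serverChannel.accept() // blocks until a client connects
  socketChannel.configureBlocking(false)
  // The proposed fix: with TCP keepalive on, the OS periodically probes idle
  // peers, so connections whose remote end silently vanished (e.g. terminated
  // instances behind a firewall) eventually error out and can be closed.
  socketChannel.socket().setKeepAlive(true)
}
{code}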
Build failed in Jenkins: KafkaPreCommit #59
See https://builds.apache.org/job/KafkaPreCommit/59/changes

Changes:
[junrao] kafka-2043; CompressionType is passed in each RecordAccumulator append; patched by Grant Henke; reviewed by Jun Rao

--
[...truncated 2129 lines...]
[jira] [Comment Edited] (KAFKA-2082) Kafka Replication ends up in a bad state
[ https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14482042#comment-14482042 ] Sriharsha Chintalapani edited comment on KAFKA-2082 at 4/6/15 10:21 PM:
---
[~eapache] I think the problem here is how the test is operating. In general, users will configure each broker with a zookeeper connection that specifies multiple hosts, like localhost:2181,localhost:2182,localhost:2183. If there is a connection loss, which might be due to some issue in the broker (JVM garbage collection, for example), a new controller will be elected, and when this broker comes back up it goes through onControllerResignation. It gets an UpdateMetadataRequest to update its metadata cache so that requests get the latest metadata. Another case would be one of the zookeeper nodes going down, which is fine, as there are other zookeeper nodes to serve the kafka brokers.
But in this particular test each broker is connected to an individual zookeeper host, and those hosts are in a quorum. When you disable zk1, broker 1 disconnects, which causes /brokers/ids/1 to get deleted, since it is an ephemeral node. A new controller will be elected, and it goes through KafkaController.onControllerFailover: sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition)). But liveOrShuttingDownBrokerIds won't have all the brokers in the list in this specific case: as the /brokers/ids entries are ephemeral nodes, they get deleted if the client disconnects. But the producer is still connecting to broker1, which never got an UpdateMetadataRequest, so it is still serving stale data.

was (Author: sriharsha):
[~eapache] I think the problem here is how the test is operating. In general, users will configure each broker with a zookeeper connection that specifies multiple hosts, like localhost:2181,localhost:2182,localhost:2183. If there is a connection loss, which might be due to some issue in the broker (JVM garbage collection, for example), a new controller will be elected, and when this broker comes back up it goes through onControllerResignation. It gets an UpdateMetadataRequest to update its metadata cache so that requests get the latest metadata. Another case would be one of the zookeeper nodes going down, which is fine, as there are other zookeeper nodes to serve the kafka brokers.
But in this particular test each broker is connected to an individual zookeeper host, and those hosts are in a quorum. When you disable zk1, broker 1 disconnects, which causes /brokers/ids/1 to get deleted, since it is an ephemeral node. A new controller will be elected, and it goes through KafkaController.onPartitionReassignment: //12. After electing leader, the replicas and isr information changes, so resend the update metadata request to every broker: sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition)). But liveOrShuttingDownBrokerIds won't have all the brokers in the list in this specific case: as the /brokers/ids entries are ephemeral nodes, they get deleted if the client disconnects. But the producer is still connecting to broker1, which never got an UpdateMetadataRequest, so it is still serving stale data.
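To make the ephemeral-node mechanics above concrete, here is a minimal sketch of a broker-style registration using the plain ZooKeeper client (illustrative only; Kafka actually registers through its own ZK utilities with a richer JSON payload):
{code}
import org.apache.zookeeper.{CreateMode, ZooDefs, ZooKeeper}

object EphemeralRegistrationSketch extends App {
  // Session-scoped registration: the znode exists only while this client's
  // ZooKeeper session is alive.
  val zk = new ZooKeeper("localhost:2181", 6000, null)
  zk.create("/brokers/ids/1", """{"host":"localhost","port":9091}""".getBytes("UTF-8"),
    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)

  // If the connection to ZooKeeper is cut (as the test does via its proxy),
  // the session eventually expires and ZooKeeper deletes /brokers/ids/1 on
  // its own; the broker process itself keeps running and serving stale metadata.
}
{code}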
[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state
[ https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14481931#comment-14481931 ] Sriharsha Chintalapani commented on KAFKA-2082:
---
[~junrao] After step 2, I noticed the controller changed the leaders for 1 and 3, and now all the partitions have leader 2, 4, or 5. The new controller did detect the failure of brokers 1 and 3. But the client (producer) is still connecting to the older brokers, in this case 1 and 3, and getting stale metadata responses.

Kafka Replication ends up in a bad state
----------------------------------------
Key: KAFKA-2082
URL: https://issues.apache.org/jira/browse/KAFKA-2082
Re: Few questions regarding KAFKA-1928 (reusing common network code in core)
1. The controller - broker communication doesn't have to be blocking. It just needs to call a callback on completion of a request. We can leave this out for now. It just means that we can't run the inter-broker communication through a non-plaintext port until we port this code to the common network code. Similarly, the replica fetcher code currently uses SimpleConsumer. We need to change that code too to support non-plaintext ports across brokers. I haven't thought about this much, so I'm not sure whether it's better to replace that code with the new consumer client or with some special code over the common network client.

2. Yes, it does seem that Receive needs to expose some kind of API to get the ByteBuffer, as we do in the Receive in Scala. Jay can probably comment on this better.

Thanks,
Jun

On Mon, Apr 6, 2015 at 2:55 PM, Gwen Shapira gshap...@cloudera.com wrote:

Hi,

I started work on converting core to use the common network code and I ran into two questions that hopefully someone can help me with:

1. BlockingChannel - It looks like the inter-broker communication is mostly through BlockingChannel. There's no client equivalent (i.e., the client code is all non-blocking). I was planning to leave this as is, but let me know if the intention was to make all this communication non-blocking.

2. Receive & ByteBufferReceive - It looks like ByteBufferReceive is not used anywhere. What was it planned for? NetworkReceive implements a payload() method that returns the backing buffer. It is used all over the place (to create requests, responses, etc). Because ByteBufferReceive doesn't implement payload() (and it's not clear how it could), payload() is not part of the Receive interface, although I'm pretty sure it should be there. Some explanation about the Receive interface and what ByteBufferReceive is for would help.

Gwen
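As an illustration of the interface question, here is a Scala sketch of what pulling payload() up into Receive would look like (the actual classes live in org.apache.kafka.common.network and are Java; the names mirror them but this sketch is simplified):
{code}
import java.nio.ByteBuffer

// Simplified sketch; the real interface has more methods (readFrom, source, ...).
trait Receive {
  def complete(): Boolean
  def payload(): ByteBuffer // the method currently missing from the interface
}

// Easy for NetworkReceive: it is backed by a single buffer.
class NetworkReceive(size: Int) extends Receive {
  private val buffer = ByteBuffer.allocate(size)
  def complete(): Boolean = !buffer.hasRemaining
  def payload(): ByteBuffer = buffer
}

// A receive backed by several buffers has no single backing buffer to hand
// back, which is why promoting payload() to the interface is awkward for
// ByteBufferReceive.
{code}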
[jira] [Commented] (KAFKA-2059) ZookeeperConsumerConnectorTest.testBasic transient failure
[ https://issues.apache.org/jira/browse/KAFKA-2059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14482085#comment-14482085 ] Guozhang Wang commented on KAFKA-2059:
---
I saw this error once out of 100 runs, and on my machine each run takes about 12 - 15 minutes.

ZookeeperConsumerConnectorTest.testBasic transient failure
----------------------------------------------------------
Key: KAFKA-2059
URL: https://issues.apache.org/jira/browse/KAFKA-2059
Project: Kafka
Issue Type: Sub-task
Reporter: Guozhang Wang
Labels: newbie

{code}
kafka.javaapi.consumer.ZookeeperConsumerConnectorTest testBasic FAILED
    kafka.common.InconsistentBrokerIdException: Configured brokerId 1 doesn't match stored brokerId 0 in meta.properties
        at kafka.server.KafkaServer.getBrokerId(KafkaServer.scala:443)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:116)
        at kafka.utils.TestUtils$.createServer(TestUtils.scala:126)
        at kafka.integration.KafkaServerTestHarness$$anonfun$setUp$1.apply(KafkaServerTestHarness.scala:57)
        at kafka.integration.KafkaServerTestHarness$$anonfun$setUp$1.apply(KafkaServerTestHarness.scala:57)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at kafka.integration.KafkaServerTestHarness$class.setUp(KafkaServerTestHarness.scala:57)
        at kafka.javaapi.consumer.ZookeeperConsumerConnectorTest.setUp(ZookeeperConsumerConnectorTest.scala:41)
{code}
--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-2086) testRoundRobinPartitionAssignor transient failure
[ https://issues.apache.org/jira/browse/KAFKA-2086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-2086. Resolution: Duplicate Yes. Closing this one. testRoundRobinPartitionAssignor transient failure - Key: KAFKA-2086 URL: https://issues.apache.org/jira/browse/KAFKA-2086 Project: Kafka Issue Type: Sub-task Components: core Affects Versions: 0.8.3 Reporter: Jun Rao Saw the following transient unit failure. unit.kafka.consumer.PartitionAssignorTest testRoundRobinPartitionAssignor FAILED java.lang.NullPointerException at unit.kafka.consumer.PartitionAssignorTest$$anonfun$unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify$1.apply(PartitionAssignorTest.scala:173) at unit.kafka.consumer.PartitionAssignorTest$$anonfun$unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify$1.apply(PartitionAssignorTest.scala:172) at scala.collection.immutable.List.foreach(List.scala:381) at unit.kafka.consumer.PartitionAssignorTest$.unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify(PartitionAssignorTest.scala:172) at unit.kafka.consumer.PartitionAssignorTest$$anonfun$testRoundRobinPartitionAssignor$1.apply$mcVI$sp(PartitionAssignorTest.scala:54) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:166) at unit.kafka.consumer.PartitionAssignorTest.testRoundRobinPartitionAssignor(PartitionAssignorTest.scala:39) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (KAFKA-2086) testRoundRobinPartitionAssignor transient failure
[ https://issues.apache.org/jira/browse/KAFKA-2086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao closed KAFKA-2086. -- testRoundRobinPartitionAssignor transient failure - Key: KAFKA-2086 URL: https://issues.apache.org/jira/browse/KAFKA-2086 Project: Kafka Issue Type: Sub-task Components: core Affects Versions: 0.8.3 Reporter: Jun Rao Saw the following transient unit failure. unit.kafka.consumer.PartitionAssignorTest testRoundRobinPartitionAssignor FAILED java.lang.NullPointerException at unit.kafka.consumer.PartitionAssignorTest$$anonfun$unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify$1.apply(PartitionAssignorTest.scala:173) at unit.kafka.consumer.PartitionAssignorTest$$anonfun$unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify$1.apply(PartitionAssignorTest.scala:172) at scala.collection.immutable.List.foreach(List.scala:381) at unit.kafka.consumer.PartitionAssignorTest$.unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify(PartitionAssignorTest.scala:172) at unit.kafka.consumer.PartitionAssignorTest$$anonfun$testRoundRobinPartitionAssignor$1.apply$mcVI$sp(PartitionAssignorTest.scala:54) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:166) at unit.kafka.consumer.PartitionAssignorTest.testRoundRobinPartitionAssignor(PartitionAssignorTest.scala:39) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSSION] Keep docs updated per jira
It seems that everyone is in favor of keeping the docs updated in each jira. As for the next step, does anyone want to help figure out if it's possible to move the website to git? Thanks, Jun On Tue, Mar 31, 2015 at 10:03 AM, Gwen Shapira gshap...@cloudera.com wrote: +1 On Tue, Mar 31, 2015 at 9:23 AM, Jay Kreps jay.kr...@gmail.com wrote: Yeah the protocol should probably move off the wiki and into the release-versioned docs. -Jay On Mon, Mar 30, 2015 at 5:17 PM, Joel Koshy jjkosh...@gmail.com wrote: Also for the wikis - those should probably correspond to the latest released version right? So, e.g., if we add or modify the protocol on trunk we can add it to the wiki but mark it with some highlight or similar just to make it clear that it is a change on trunk only. Thanks, Joel On Thu, Mar 26, 2015 at 06:27:25PM -0700, Jun Rao wrote: Hi, Everyone, Quite a few jiras these days require documentation changes (e.g., wire protocol, ZK layout, configs, jmx, etc). Historically, we have been updating the documentation just before we do a release. The issue is that some of the changes will be missed since they were done a while back. Another way to do it is to keep the docs updated as we complete each jira. Currently, our documentation is in the following places. wire protocol: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol ZK layout: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper configs/jmx: https://svn.apache.org/repos/asf/kafka/site/083 We probably don't need to update configs already ported to ConfigDef since they can be generated automatically. However, for the rest of the doc-related changes, keeping them updated per jira seems a better approach. What do people think? Thanks, Jun
[GitHub] kafka pull request: KAFKA-2098: gradle files
GitHub user rekhajoshm opened a pull request: https://github.com/apache/kafka/pull/54 KAFKA-2098: gradle files gradle files, tiny footprint. Let's have it in. Thanks. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rekhajoshm/kafka KAFKA-2098 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/54.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #54 commit be7dc1933d2c266d15c46a06d057686c8d48e520 Author: Joshi rekhajo...@gmail.com Date: 2015-04-06T21:02:13Z KAFKA-2098: gradle files --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (KAFKA-2098) Gradle Wrapper Jar gone missing in 0.8.2.1
[ https://issues.apache.org/jira/browse/KAFKA-2098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14481936#comment-14481936 ] ASF GitHub Bot commented on KAFKA-2098: --- GitHub user rekhajoshm opened a pull request: https://github.com/apache/kafka/pull/54 KAFKA-2098: gradle files gradle files, tiny footprint. Let's have it in. Thanks. You can merge this pull request into a Git repository by running: $ git pull https://github.com/rekhajoshm/kafka KAFKA-2098 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/54.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #54 commit be7dc1933d2c266d15c46a06d057686c8d48e520 Author: Joshi rekhajo...@gmail.com Date: 2015-04-06T21:02:13Z KAFKA-2098: gradle files Gradle Wrapper Jar gone missing in 0.8.2.1 -- Key: KAFKA-2098 URL: https://issues.apache.org/jira/browse/KAFKA-2098 Project: Kafka Issue Type: Bug Components: build Affects Versions: 0.8.2.1 Reporter: Rekha Joshi ./gradlew idea Error: Could not find or load main class org.gradle.wrapper.GradleWrapperMain This was working in 0.8.2. Attaching patch. Thanks -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state
[ https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482058#comment-14482058 ] Evan Huus commented on KAFKA-2082: -- What if (in a normally set up cluster) a broker becomes completely isolated from all zk nodes and all other brokers? If I understand correctly, effectively the same bug will occur, as the isolated broker will serve stale metadata. So I think that regardless of how the test is set up, a broker which does not have a zookeeper connection should refuse to serve metadata requests. Kafka Replication ends up in a bad state Key: KAFKA-2082 URL: https://issues.apache.org/jira/browse/KAFKA-2082 Project: Kafka Issue Type: Bug Components: replication Affects Versions: 0.8.2.1 Reporter: Evan Huus Assignee: Sriharsha Chintalapani Priority: Critical Attachments: KAFKA-2082.patch While running integration tests for Sarama (the go client) we came across a pattern of connection losses that reliably puts kafka into a bad state: several of the brokers start spinning, chewing ~30% CPU and spamming the logs with hundreds of thousands of lines like: {noformat} [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on partition [many_partition,1] failed due to Leader not local for partition [many_partition,1] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on partition [many_partition,6] failed due to Leader not local for partition [many_partition,6] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,21] failed due to Leader not local for partition [many_partition,21] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,26] failed due to Leader not local for partition [many_partition,26] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,1] failed due to Leader not local for partition [many_partition,1] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,6] failed due to Leader not local for partition [many_partition,6] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on partition [many_partition,21] failed due to Leader not local for partition [many_partition,21] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on partition [many_partition,26] failed due to Leader not local for partition [many_partition,26] on broker 9093 (kafka.server.ReplicaManager) {noformat} This can be easily and reliably reproduced using the {{toxiproxy-final}} branch of 
https://github.com/Shopify/sarama which includes a vagrant script for provisioning the appropriate cluster: - {{git clone https://github.com/Shopify/sarama.git}} - {{git checkout test-jira-kafka-2082}} - {{vagrant up}} - {{TEST_SEED=1427917826425719059 DEBUG=true go test -v}} After the test finishes (it fails because the cluster ends up in a bad state), you can log into the cluster machine with {{vagrant ssh}} and inspect the bad nodes. The vagrant script provisions five zookeepers and five brokers in {{/opt/kafka-9091/}} through {{/opt/kafka-9095/}}. Additional context: the test produces continually to the cluster while randomly cutting and restoring zookeeper connections (all connections to zookeeper are run through a simple proxy on the same vm to make this easy). The majority of the time this works very well and does a good job exercising our producer's retry and failover code. However, under certain patterns of connection loss (the {{TEST_SEED}} in the instructions is important), kafka gets confused. The test never cuts more than two connections at a time, so zookeeper should always have quorum, and the topic (with
Re: Review Request 31568: Patch for KAFKA-1989
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31568/#review78668 --- Thanks for the new patch. Now that I understand the logic better, I think this is a really smart implementation. A few more comments below. core/src/main/scala/kafka/server/DelayedOperation.scala https://reviews.apache.org/r/31568/#comment128118 We probably should call forceComplete() first and, only if it returns true, run onExpiration(). core/src/main/scala/kafka/server/DelayedOperation.scala https://reviews.apache.org/r/31568/#comment128145 The Java executor service by default eats all unhandled exceptions. For easier debugging, we will need to add an UncaughtExceptionHandler and log an error with the exception. See KafkaScheduler.startup() and Utils.newThread(). core/src/main/scala/kafka/server/DelayedOperation.scala https://reviews.apache.org/r/31568/#comment128129 Perhaps we should trigger a purge if the number of total purgeable operations (instead of the number of unique purgeable operations) is more than the purgeInterval. This can be estimated as watched/estimatedTotalOperations.get * (estimatedTotalOperations.get - delayed). core/src/main/scala/kafka/utils/timer/Timer.scala https://reviews.apache.org/r/31568/#comment128139 Is Timer too general a name? Should we rename it to something like DelayedOperationTimer? core/src/main/scala/kafka/utils/timer/TimingWheel.scala https://reviews.apache.org/r/31568/#comment128109 For the hierarchical part, let's say that u is 1 and n is 2. If the current time is c, then the buckets at the different levels are:
level buckets
1     [c,c] [c+1,c+1]
2     [c,c+1] [c+2,c+3]
3     [c,c+3] [c+4,c+7]
So, at any given point in time, [c,c+1] at level 2 and [c,c+3] at level 3 will never be used, since those buckets are already covered at the lower level. This seems a bit wasteful. To remove that waste, we could choose to start the 2nd level at c+2 and the 3rd level at c+6, etc. Do we choose to use the same current time as the start time at all levels for simplicity? If so, this is probably fine, since the larger the n, the less the waste. However, it's probably worth documenting that the buckets at different levels can overlap. core/src/main/scala/kafka/utils/timer/TimingWheel.scala https://reviews.apache.org/r/31568/#comment127579 A overflow = An overflow; in a overflow = in an overflow core/src/main/scala/kafka/utils/timer/TimingWheel.scala https://reviews.apache.org/r/31568/#comment127580 moved the finer = moved to the finer core/src/main/scala/kafka/utils/timer/TimingWheel.scala https://reviews.apache.org/r/31568/#comment128112 It would be useful to add that the timing wheel implementation is used to optimize the common case, when operations are completed before they time out. core/src/main/scala/kafka/utils/timer/TimingWheel.scala https://reviews.apache.org/r/31568/#comment128117 I was initially puzzled about how we synchronize between add() and advanceClock(). Then I realized that the synchronization is actually done in the caller, in Timer. I was thinking of how to make this part clearer. One way is to add the documentation here. Another way is to move the read/write lock from Timer to here, and pass in the needed data structures like delayQueue. 
core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala https://reviews.apache.org/r/31568/#comment128146 should have 5 = should have 6 core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala https://reviews.apache.org/r/31568/#comment128148 Actually, where is the logic to not increment the counter on reinserting existing tasks? TimerTaskList.add() seems to always increment the counter. - Jun Rao On April 1, 2015, 8:50 p.m., Yasuhiro Matsuda wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31568/ --- (Updated April 1, 2015, 8:50 p.m.) Review request for kafka. Bugs: KAFKA-1989 https://issues.apache.org/jira/browse/KAFKA-1989 Repository: kafka Description --- new purgatory implementation Diffs - core/src/main/scala/kafka/server/DelayedOperation.scala e317676b4dd5bb5ad9770930e694cd7282d5b6d5 core/src/main/scala/kafka/server/ReplicaManager.scala 44f0026e6676d8d764dd59dbcc6bb7bb727a3ba6 core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION
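As an aside, the bucket layout from the review comment above can be reproduced with a few lines of Scala. This is only an illustration of the arithmetic for u = 1 and n = 2, not the patch's TimingWheel code:
{code}
// Prints the bucket ranges per level of a hierarchical timing wheel with
// tick size u = 1 and wheel size n = 2, relative to the current time c.
object TimingWheelBuckets extends App {
  val (u, n) = (1L, 2L)
  for (level <- 1 to 3) {
    val tick = u * math.pow(n, level - 1).toLong // tick size grows by a factor of n per level
    val buckets = (0L until n).map { i =>
      s"[c+${i * tick},c+${(i + 1) * tick - 1}]" // inclusive range covered by one bucket
    }
    println(s"level $level: " + buckets.mkString(" "))
  }
}
// level 1: [c+0,c+0] [c+1,c+1]
// level 2: [c+0,c+1] [c+2,c+3]
// level 3: [c+0,c+3] [c+4,c+7]
{code}
The overlap Jun points out is visible directly: [c,c+1] at level 2 and [c,c+3] at level 3 duplicate ranges that the lower levels already cover.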
Re: Chicken and egg problem when initializing SocketServer
Gwen, That sounds about right to me. I've found that when ports are bound dynamically, an RAII style greatly simplifies things, since if you've bound them as early as possible you'll always be able to get at the port even if nothing is accept()ing yet. -Ewen On Sat, Apr 4, 2015 at 8:27 PM, Gwen Shapira gshap...@cloudera.com wrote: Thanks. I think I found a solution that doesn't take new locks or buffers: 1. Change the portProtocol map to a ConcurrentHashMap (which makes it both thread-safe and mutable). Initialize it as an empty map. 2. Initialize both processors and acceptors with the hashmap, and also tell each acceptor which protocol it is accepting. 3. After the acceptors bind() but before run(), register them with the hashmap. Anyone see flaws here? On Sat, Apr 4, 2015 at 7:21 PM, Ashish Singh asi...@cloudera.com wrote: Hi Gwen, Can we buffer the requests that arrive before the acceptors are up? Once we have the acceptors, the buffer can be cleared. However, this adds to the complexity. On Sunday, April 5, 2015, Gwen Shapira gshap...@cloudera.com wrote: Hi, I'm trying to rebase KAFKA-1809 after KAFKA-1501 was committed, and I'm running into a chicken and egg problem. In my design, SocketServer creates a map of port -> SecurityProtocol. This map is passed to Processor when it is initialized, and Processor uses it to route requests to the correct channel. However, after KAFKA-1501, we may not know the ports until after Acceptors are started, and we want to start Acceptors after Processors are already running... One solution is that if Processor finds that all keys in the map are 0, it will route all requests to the PLAINTEXT channel. However, this will prevent us from using the new port-binding test code when testing TLS and SASL, bringing the whole port randomization problem back. Another option is a new flag that will allow us to bind Acceptors before starting Processors but not accept anything until the flag is set, after the Processors have started? This seems a bit tricky though. Anyone have better ideas? Gwen -- Regards, Ashish -- Thanks, Ewen
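For readers following along, here is a minimal sketch of the initialization order Gwen proposes. The names (Acceptor, bind) are placeholders, not the actual SocketServer API:
{code}
import java.util.concurrent.ConcurrentHashMap

object SocketServerInitSketch extends App {
  // 1. A thread-safe, initially empty port -> protocol map, handed to both
  //    processors and acceptors at construction time.
  val portToProtocol = new ConcurrentHashMap[Int, String]()

  final case class Acceptor(protocol: String, requestedPort: Int) {
    // Stand-in for binding a ServerSocketChannel; a real acceptor would
    // return the actual bound port, which matters when port 0 (ephemeral)
    // is requested.
    def bind(): Int = requestedPort
  }

  val acceptors = Seq(Acceptor("PLAINTEXT", 9092), Acceptor("SSL", 9093))

  // 2. After bind() but before run(): each acceptor registers its real port,
  //    so processors can route any connection by its local port.
  acceptors.foreach(a => portToProtocol.put(a.bind(), a.protocol))

  println(portToProtocol) // e.g. {9092=PLAINTEXT, 9093=SSL}
}
{code}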
[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state
[ https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14481981#comment-14481981 ] Evan Huus commented on KAFKA-2082: -- So I guess the producer should not be connecting to a broker that isn't the controller? I don't know how a non-jvm client is supposed to detect that case, as it doesn't appear to be exposed in the protocol. Or should a broker which is completely disconnected from zookeeper refuse to serve metadata requests? Kafka Replication ends up in a bad state Key: KAFKA-2082 URL: https://issues.apache.org/jira/browse/KAFKA-2082 Project: Kafka Issue Type: Bug Components: replication Affects Versions: 0.8.2.1 Reporter: Evan Huus Assignee: Sriharsha Chintalapani Priority: Critical Attachments: KAFKA-2082.patch While running integration tests for Sarama (the go client) we came across a pattern of connection losses that reliably puts kafka into a bad state: several of the brokers start spinning, chewing ~30% CPU and spamming the logs with hundreds of thousands of lines like: {noformat} [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on partition [many_partition,1] failed due to Leader not local for partition [many_partition,1] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on partition [many_partition,6] failed due to Leader not local for partition [many_partition,6] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,21] failed due to Leader not local for partition [many_partition,21] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,26] failed due to Leader not local for partition [many_partition,26] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,1] failed due to Leader not local for partition [many_partition,1] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,6] failed due to Leader not local for partition [many_partition,6] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on partition [many_partition,21] failed due to Leader not local for partition [many_partition,21] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on partition [many_partition,26] failed due to Leader not local for partition [many_partition,26] on broker 9093 (kafka.server.ReplicaManager) {noformat} This can be easily and reliably reproduced using the {{toxiproxy-final}} branch of https://github.com/Shopify/sarama which includes a vagrant script for 
provisioning the appropriate cluster: - {{git clone https://github.com/Shopify/sarama.git}} - {{git checkout test-jira-kafka-2082}} - {{vagrant up}} - {{TEST_SEED=1427917826425719059 DEBUG=true go test -v}} After the test finishes (it fails because the cluster ends up in a bad state), you can log into the cluster machine with {{vagrant ssh}} and inspect the bad nodes. The vagrant script provisions five zookeepers and five brokers in {{/opt/kafka-9091/}} through {{/opt/kafka-9095/}}. Additional context: the test produces continually to the cluster while randomly cutting and restoring zookeeper connections (all connections to zookeeper are run through a simple proxy on the same vm to make this easy). The majority of the time this works very well and does a good job exercising our producer's retry and failover code. However, under certain patterns of connection loss (the {{TEST_SEED}} in the instructions is important), kafka gets confused. The test never cuts more than two connections at a time, so zookeeper should always have quorum, and the topic (with three replicas) should always be writable. Completely restarting the
Few questions regarding KAFKA-1928 (reusing common network code in core)
Hi, I started work on converting core to use the common network code and I ran into two questions that hopefully someone can help me with: 1. BlockingChannel - It looks like the inter-broker communication is mostly through a blocking channel. There's no client equivalent (i.e., client code is all non-blocking). I was planning to leave this as is, but let me know if the intention was to make all this communication non-blocking. 2. Receive / ByteBufferReceive - It looks like ByteBufferReceive is not used anywhere. What was it planned for? NetworkReceive implements a payload() method that returns the backing buffer. It is used all over the place (to create requests, responses, etc.). Because ByteBufferReceive doesn't implement payload() (and it's not clear how it could), payload() is not part of the Receive interface, although I'm pretty sure it should be there. Some explanation of the Receive interface and what ByteBufferReceive is for would help. Gwen
[jira] [Commented] (KAFKA-1910) Refactor KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-1910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482084#comment-14482084 ] Guozhang Wang commented on KAFKA-1910: -- [~jjkoshy]: 1. No, the error code change was not necessary; I originally just added it for OffsetCommitTest, hence it only checks the NoError code for the case where no offsets are fetchable. 2. This change was reverted (by accident while doing a rebase) in KAFKA-1634; I can submit a follow-up patch to delete the exception class and the error code from the mapping. Refactor KafkaConsumer -- Key: KAFKA-1910 URL: https://issues.apache.org/jira/browse/KAFKA-1910 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Guozhang Wang Assignee: Guozhang Wang Fix For: 0.8.3 Attachments: KAFKA-1910.patch, KAFKA-1910.patch, KAFKA-1910_2015-03-05_14:55:33.patch KafkaConsumer now contains all the logic on the consumer side, making it a very large class file; it would be better to refactor it into multiple layers on top of KafkaClient. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1809) Refactor brokers to allow listening on multiple ports and IPs
[ https://issues.apache.org/jira/browse/KAFKA-1809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482207#comment-14482207 ] Gwen Shapira commented on KAFKA-1809: - Sorry for missing that [~harsha_ch]. I agree that having file, object and method names match will make life easier :) Opened a JIRA for that. Refactor brokers to allow listening on multiple ports and IPs -- Key: KAFKA-1809 URL: https://issues.apache.org/jira/browse/KAFKA-1809 Project: Kafka Issue Type: Sub-task Components: security Reporter: Gwen Shapira Assignee: Gwen Shapira Attachments: KAFKA-1809-DOCs.V1.patch, KAFKA-1809.patch, KAFKA-1809.v1.patch, KAFKA-1809.v2.patch, KAFKA-1809.v3.patch, KAFKA-1809_2014-12-25_11:04:17.patch, KAFKA-1809_2014-12-27_12:03:35.patch, KAFKA-1809_2014-12-27_12:22:56.patch, KAFKA-1809_2014-12-29_12:11:36.patch, KAFKA-1809_2014-12-30_11:25:29.patch, KAFKA-1809_2014-12-30_18:48:46.patch, KAFKA-1809_2015-01-05_14:23:57.patch, KAFKA-1809_2015-01-05_20:25:15.patch, KAFKA-1809_2015-01-05_21:40:14.patch, KAFKA-1809_2015-01-06_11:46:22.patch, KAFKA-1809_2015-01-13_18:16:21.patch, KAFKA-1809_2015-01-28_10:26:22.patch, KAFKA-1809_2015-02-02_11:55:16.patch, KAFKA-1809_2015-02-03_10:45:31.patch, KAFKA-1809_2015-02-03_10:46:42.patch, KAFKA-1809_2015-02-03_10:52:36.patch, KAFKA-1809_2015-03-16_09:02:18.patch, KAFKA-1809_2015-03-16_09:40:49.patch, KAFKA-1809_2015-03-27_15:04:10.patch, KAFKA-1809_2015-04-02_15:12:30.patch, KAFKA-1809_2015-04-02_19:03:58.patch, KAFKA-1809_2015-04-04_22:00:13.patch The goal is to eventually support different security mechanisms on different ports. Currently brokers are defined as host+port pair, and this definition exists throughout the code-base, therefore some refactoring is needed to support multiple ports for a single broker. The detailed design is here: https://cwiki.apache.org/confluence/display/KAFKA/Multiple+Listeners+for+Kafka+Brokers -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2099) BrokerEndPoint file, methods and object names should match
Gwen Shapira created KAFKA-2099: --- Summary: BrokerEndPoint file, methods and object names should match Key: KAFKA-2099 URL: https://issues.apache.org/jira/browse/KAFKA-2099 Project: Kafka Issue Type: Bug Reporter: Gwen Shapira Assignee: Gwen Shapira [~harsha_ch] commented on KAFKA-1809: We've got BrokerEndPoint.scala, but internally the object is called BrokerEndpoint (with a lowercase 'p') and the method is createBrokerEndPoint. If possible, can we get these consistent in another JIRA? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2086) testRoundRobinPartitionAssignor transient failure
[ https://issues.apache.org/jira/browse/KAFKA-2086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14481962#comment-14481962 ] Guozhang Wang commented on KAFKA-2086: -- Is this a duplicate of KAFKA-2056? testRoundRobinPartitionAssignor transient failure - Key: KAFKA-2086 URL: https://issues.apache.org/jira/browse/KAFKA-2086 Project: Kafka Issue Type: Sub-task Components: core Affects Versions: 0.8.3 Reporter: Jun Rao Saw the following transient unit failure. unit.kafka.consumer.PartitionAssignorTest testRoundRobinPartitionAssignor FAILED java.lang.NullPointerException at unit.kafka.consumer.PartitionAssignorTest$$anonfun$unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify$1.apply(PartitionAssignorTest.scala:173) at unit.kafka.consumer.PartitionAssignorTest$$anonfun$unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify$1.apply(PartitionAssignorTest.scala:172) at scala.collection.immutable.List.foreach(List.scala:381) at unit.kafka.consumer.PartitionAssignorTest$.unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify(PartitionAssignorTest.scala:172) at unit.kafka.consumer.PartitionAssignorTest$$anonfun$testRoundRobinPartitionAssignor$1.apply$mcVI$sp(PartitionAssignorTest.scala:54) at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:166) at unit.kafka.consumer.PartitionAssignorTest.testRoundRobinPartitionAssignor(PartitionAssignorTest.scala:39) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state
[ https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482042#comment-14482042 ] Sriharsha Chintalapani commented on KAFKA-2082: --- [~eapache] I think the problem here is how the test is operating. In general, users will configure each broker with a zookeeper connection string that specifies multiple hosts, like localhost:2181,localhost:2182,localhost:2183. If there is a connection loss, which might be due to some issue in the broker (JVM garbage collection, for example), a new controller will be elected, and when this broker comes back up it goes through onControllerResignation. It gets an UpdateMetadataRequest to update its metadata cache so that requests get the latest metadata. Another case would be one of the zookeeper nodes going down, which is fine as there are other zookeeper nodes to serve the kafka brokers. But in this particular test each broker connects to an individual zookeeper host, and those hosts form a quorum. When you disable zk1, broker 1 disconnects, which causes /brokers/ids/1 to get deleted, as it's an ephemeral node. A new controller will be elected and it goes through KafkaController.onPartitionReassignment //12. After electing leader, the replicas and isr information changes, so resend the update metadata request to every broker sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition)) but liveOrShuttingDownBrokerIds won't have all the brokers in the list in this specific case: as the /brokers/ids entries are ephemeral nodes, they get deleted if the client disconnects. But the producer is still connecting to broker 1, which never got an UpdateMetadataRequest, so it is still serving stale data. Kafka Replication ends up in a bad state Key: KAFKA-2082 URL: https://issues.apache.org/jira/browse/KAFKA-2082 Project: Kafka Issue Type: Bug Components: replication Affects Versions: 0.8.2.1 Reporter: Evan Huus Assignee: Sriharsha Chintalapani Priority: Critical Attachments: KAFKA-2082.patch While running integration tests for Sarama (the go client) we came across a pattern of connection losses that reliably puts kafka into a bad state: several of the brokers start spinning, chewing ~30% CPU and spamming the logs with hundreds of thousands of lines like: {noformat} [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on partition [many_partition,1] failed due to Leader not local for partition [many_partition,1] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on partition [many_partition,6] failed due to Leader not local for partition [many_partition,6] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,21] failed due to Leader not local for partition [many_partition,21] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,26] failed due to Leader not local for partition [many_partition,26] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 
111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,1] failed due to Leader not local for partition [many_partition,1] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,6] failed due to Leader not local for partition [many_partition,6] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on partition [many_partition,21] failed due to Leader not local for partition [many_partition,21] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on partition [many_partition,26] failed due to Leader not local for partition [many_partition,26] on broker 9093 (kafka.server.ReplicaManager) {noformat} This can be easily and reliably reproduced using the
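The ephemeral-node mechanics driving this scenario can be seen with the plain ZooKeeper client API. A minimal sketch (the registration payload is illustrative, not Kafka's exact format):
{code}
import org.apache.zookeeper.{CreateMode, ZooDefs, ZooKeeper}

object EphemeralRegistrationSketch extends App {
  // Brokers register under /brokers/ids as EPHEMERAL znodes, so the ZooKeeper
  // ensemble deletes the registration automatically when the broker's session
  // is lost; that is exactly what happens when zk1 is disabled in the test.
  val zk = new ZooKeeper("localhost:2181", 6000, null)
  zk.create(
    "/brokers/ids/1",
    """{"host":"localhost","port":9092}""".getBytes("UTF-8"),
    ZooDefs.Ids.OPEN_ACL_UNSAFE,
    CreateMode.EPHEMERAL) // removed by ZK as soon as this session expires
}
{code}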
Re: Review Request 31366: Patch for KAFKA-1461
On March 24, 2015, 10:46 p.m., Guozhang Wang wrote: core/src/main/scala/kafka/server/AbstractFetcherThread.scala, lines 81-86 https://reviews.apache.org/r/31366/diff/2/?file=898415#file898415line81 Jun has a comment about the case when all partitions get inactive, which is common when the fetched broker has just gone through a leader migration. We can move the foreach statement before the if statement, and after the foreach check whether any partitions got added; if not, just back off for fetchBackoffMs. Sriharsha Chintalapani wrote: Thanks for the review. Are you looking at something like this? This wouldn't handle the case where partitionMap is populated but all of its partitions are inactive. partitionMap.foreach { case ((topicAndPartition, partitionFetchState)) => if (partitionFetchState.isActive) fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition, partitionFetchState.offset, fetchSize) } if (partitionMap.isEmpty) partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS) Or do we want to check whether all the current partitions are inactive and then back off? It would be expensive to check whether all the partitions are active or not in doWork(). What I think is a bit different and maybe simpler: for FetchRequestBuilder, in the build() call its requestMap will be cleared after the fetch request is created, so we can just add another function to FetchRequestBuilder returning a boolean indicating whether its request map is empty. With this we can get rid of the allPartitionsInactive flag. - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31366/#review77674 --- On April 4, 2015, 3:48 a.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31366/ --- (Updated April 4, 2015, 3:48 a.m.) Review request for kafka. Bugs: KAFKA-1461 https://issues.apache.org/jira/browse/KAFKA-1461 Repository: kafka Description --- KAFKA-1461. Replica fetcher thread does not implement any back-off behavior. KAFKA-1461. Replica fetcher thread does not implement any back-off behavior. KAFKA-1461. Replica fetcher thread does not implement any back-off behavior. Diffs - core/src/main/scala/kafka/server/AbstractFetcherThread.scala 5d5cf5897cc37b3595f14bfe9d7cde43456bcc4b core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 96faa7b4ed7c9ba8a3f6f9f114bd94e19b3a7ac0 Diff: https://reviews.apache.org/r/31366/diff/ Testing --- Thanks, Sriharsha Chintalapani
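A sketch of the simplification Guozhang suggests. The member names below (requestMap, hasFetches) are assumptions for illustration, not the actual FetchRequestBuilder API:
{code}
import scala.collection.mutable

// Minimal model of a fetch request builder that can report whether any
// active partition was added this round, so the fetcher thread can back
// off without keeping a separate allPartitionsInactive flag.
class FetchRequestBuilderSketch {
  private val requestMap = mutable.Map[(String, Int), (Long, Int)]()

  def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int): this.type = {
    requestMap.put((topic, partition), (offset, fetchSize))
    this
  }

  // Proposed helper: false when nothing was added, i.e. no active partitions.
  def hasFetches: Boolean = requestMap.nonEmpty

  def build(): Map[(String, Int), (Long, Int)] = {
    val request = requestMap.toMap
    requestMap.clear() // cleared after each build, as noted in the review
    request
  }
}
{code}
The fetcher loop would then wait on its condition variable for fetchBackOffMs whenever hasFetches is false, instead of tracking partition inactivity itself.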
Re: Review Request 31568: Patch for KAFKA-1989
On April 1, 2015, 6:46 p.m., Jun Rao wrote: core/src/main/scala/kafka/utils/timer/Timer.scala, lines 44-45 https://reviews.apache.org/r/31568/diff/2/?file=901427#file901427line44 Could we use inLock() where applicable? Yasuhiro Matsuda wrote: I don't like having the closure overhead here. A Scala closure creates a new instance of the closure object every time. How much is the closure overhead? We use inLock() in quite a few other places. It would be good to keep the usage consistent. If it's too much overhead, perhaps we can remove all inLock() usage in a separate jira? - Jun --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31568/#review78444 --- On April 1, 2015, 8:50 p.m., Yasuhiro Matsuda wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31568/ --- (Updated April 1, 2015, 8:50 p.m.) Review request for kafka. Bugs: KAFKA-1989 https://issues.apache.org/jira/browse/KAFKA-1989 Repository: kafka Description --- new purgatory implementation Diffs - core/src/main/scala/kafka/server/DelayedOperation.scala e317676b4dd5bb5ad9770930e694cd7282d5b6d5 core/src/main/scala/kafka/server/ReplicaManager.scala 44f0026e6676d8d764dd59dbcc6bb7bb727a3ba6 core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 7a37617395b9e4226853913b8989f42e7301de7c core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala PRE-CREATION core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION Diff: https://reviews.apache.org/r/31568/diff/ Testing --- Thanks, Yasuhiro Matsuda
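For reference, inLock() is roughly the following by-name helper; the allocation Yasuhiro wants to avoid is the Function0 instance created for the body on every call:
{code}
import java.util.concurrent.locks.{Lock, ReentrantLock}

object InLockSketch extends App {
  // Roughly the shape of the inLock() helper in kafka.utils: the by-name
  // parameter `fun` compiles to a Function0 allocated on each call.
  def inLock[T](lock: Lock)(fun: => T): T = {
    lock.lock()
    try fun
    finally lock.unlock()
  }

  val lock = new ReentrantLock()
  val a = inLock(lock) { 1 + 1 } // concise, but allocates a closure object

  // Hand-written equivalent: no allocation, more boilerplate.
  lock.lock()
  val b = try 1 + 1 finally lock.unlock()

  println(a + b)
}
{code}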
[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state
[ https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482134#comment-14482134 ] Guozhang Wang commented on KAFKA-2082: -- [~eapache] the problem you described is valid: if a broker is isolated from ZK and the other brokers, its metadata will become stale. For the replica manager, since it currently does not back off when getting the not-leader exception, its logs will get swamped; KAFKA-1461 will fix this issue. As for clients, they will periodically refresh their metadata from a random broker, and when that metadata is fetched from other brokers, the clients will produce / fetch to / from the new leader. Kafka Replication ends up in a bad state Key: KAFKA-2082 URL: https://issues.apache.org/jira/browse/KAFKA-2082 Project: Kafka Issue Type: Bug Components: replication Affects Versions: 0.8.2.1 Reporter: Evan Huus Assignee: Sriharsha Chintalapani Priority: Critical Attachments: KAFKA-2082.patch While running integration tests for Sarama (the go client) we came across a pattern of connection losses that reliably puts kafka into a bad state: several of the brokers start spinning, chewing ~30% CPU and spamming the logs with hundreds of thousands of lines like: {noformat} [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on partition [many_partition,1] failed due to Leader not local for partition [many_partition,1] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on partition [many_partition,6] failed due to Leader not local for partition [many_partition,6] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,21] failed due to Leader not local for partition [many_partition,21] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,26] failed due to Leader not local for partition [many_partition,26] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,1] failed due to Leader not local for partition [many_partition,1] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,6] failed due to Leader not local for partition [many_partition,6] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on partition [many_partition,21] failed due to Leader not local for partition [many_partition,21] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on partition [many_partition,26] failed due to Leader not local for partition [many_partition,26] on broker 9093 
(kafka.server.ReplicaManager) {noformat} This can be easily and reliably reproduced using the {{toxiproxy-final}} branch of https://github.com/Shopify/sarama which includes a vagrant script for provisioning the appropriate cluster: - {{git clone https://github.com/Shopify/sarama.git}} - {{git checkout test-jira-kafka-2082}} - {{vagrant up}} - {{TEST_SEED=1427917826425719059 DEBUG=true go test -v}} After the test finishes (it fails because the cluster ends up in a bad state), you can log into the cluster machine with {{vagrant ssh}} and inspect the bad nodes. The vagrant script provisions five zookeepers and five brokers in {{/opt/kafka-9091/}} through {{/opt/kafka-9095/}}. Additional context: the test produces continually to the cluster while randomly cutting and restoring zookeeper connections (all connections to zookeeper are run through a simple proxy on the same vm to make this easy). The majority of the time this works very well and does a good job exercising our producer's retry and failover code. However, under certain patterns of connection loss (the {{TEST_SEED}} in the instructions is important), kafka gets confused.
Re: [DISCUSS] New partitioning for better load balancing
Gianmarco, I browsed through your paper (congrats on the ICDE publication BTW!), and here are some questions / comments on the algorithm: 1. One motivation for enabling key-based partitioning in Kafka is to achieve per-key ordering, i.e., with all messages with the same key sent to the same partition, their ordering is preserved. However, with key-splitting that guarantee seems to break, and now messages with the same key may be sent to 2 (or, generally speaking, many) partitions. 2. As for the local load estimation, there is a second mapping from partitions (workers in your paper) to broker hosts besides the mapping from keys to partitions, and not all broker hosts maintain each of the partitions. For example, there are 4 brokers, and broker-1/2 each takes one of the two partitions of topic A, while broker-3/4 each takes one of the two partitions of topic B, etc. I am wondering if those two issues can be resolved within the PKG framework? Guozhang On Sun, Apr 5, 2015 at 12:19 AM, Gianmarco De Francisci Morales g...@apache.org wrote: Hi Jay, Thanks, that sounds like a necessary step. I guess I expected something like that to be already there, at least internally. I created KAFKA-2092 to track the PKG integration. Cheers, -- Gianmarco On 4 April 2015 at 23:50, Jay Kreps jay.kr...@gmail.com wrote: Hey guys, I think the first step here would be to expose a partitioner interface for the new producer that would make it easy to plug in these different strategies. I filed a JIRA for this: https://issues.apache.org/jira/browse/KAFKA-2091 -Jay On Fri, Apr 3, 2015 at 9:36 AM, Harsha ka...@harsha.io wrote: Gianmarco, I am coming from the storm community. I think PKG is very interesting and we can provide an implementation of Partitioner for PKG. Can you open a JIRA for this? -- Harsha Sent with Airmail On April 3, 2015 at 4:49:15 AM, Gianmarco De Francisci Morales ( g...@apache.org) wrote: Hi, We have recently studied the problem of load balancing in distributed stream processing systems such as Samza [1]. In particular, we focused on what happens when the key distribution of the stream is skewed when using key grouping. We developed a new stream partitioning scheme (which we call Partial Key Grouping). It achieves better load balancing than hashing while being more scalable than round robin in terms of memory. In the paper we show a number of mining algorithms that are easy to implement with partial key grouping, and whose performance can benefit from it. We think that it might also be useful for a larger class of algorithms. PKG has already been integrated in Storm [2], and I would like to be able to use it in Samza as well. As far as I understand, Kafka producers are the ones that decide how to partition the stream (or Kafka topic). Even after doing a bit of reading, I am still not sure if I should be writing this email here or on the Samza dev list. Anyway, my first guess is Kafka. I do not have experience with Kafka; however, partial key grouping is very easy to implement: it requires just a few lines of code in Java when implemented as a custom grouping in Storm [3] (see the sketch after the references below). I believe it should be very easy to integrate. For all these reasons, I believe it will be a nice addition to Kafka/Samza. If the community thinks it's a good idea, I will be happy to offer support in the porting. 
References: [1] https://melmeric.files.wordpress.com/2014/11/the-power-of-both-choices-practical-load-balancing-for-distributed-stream-processing-engines.pdf [2] https://issues.apache.org/jira/browse/STORM-632 [3] https://github.com/gdfm/partial-key-grouping -- Gianmarco -- -- Guozhang
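Since the thread notes that PKG takes only a few lines to implement, here is a hedged Scala sketch of the two-choice idea from the paper. The hash seeds and the local load counter are illustrative, and, per Guozhang's point 1 above, a key may land on either of its two candidate partitions, trading away per-key ordering:
{code}
import java.lang.Math.floorMod
import scala.util.hashing.MurmurHash3

// Partial Key Grouping sketch: hash the key with two independent hash
// functions and route to whichever candidate partition this producer has
// sent less to so far (a local load estimate, as in the paper).
class PartialKeyGroupingSketch(numPartitions: Int) {
  private val sent = Array.fill(numPartitions)(0L)

  private def candidate(key: Array[Byte], seed: Int): Int =
    floorMod(MurmurHash3.bytesHash(key, seed), numPartitions)

  def partition(key: Array[Byte]): Int = {
    val p1 = candidate(key, seed = 17)
    val p2 = candidate(key, seed = 31)
    val chosen = if (sent(p1) <= sent(p2)) p1 else p2 // pick the less-loaded candidate
    sent(chosen) += 1
    chosen
  }
}
{code}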
[jira] [Resolved] (KAFKA-2098) Gradle Wrapper Jar gone missing in 0.8.2.1
[ https://issues.apache.org/jira/browse/KAFKA-2098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gwen Shapira resolved KAFKA-2098. - Resolution: Won't Fix Gradle Wrapper Jar gone missing in 0.8.2.1 -- Key: KAFKA-2098 URL: https://issues.apache.org/jira/browse/KAFKA-2098 Project: Kafka Issue Type: Bug Components: build Affects Versions: 0.8.2.1 Reporter: Rekha Joshi ./gradlew idea Error: Could not find or load main class org.gradle.wrapper.GradleWrapperMain This was working in 0.8.2. Attaching patch. Thanks -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2098) Gradle Wrapper Jar gone missing in 0.8.2.1
[ https://issues.apache.org/jira/browse/KAFKA-2098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14481979#comment-14481979 ] Gwen Shapira commented on KAFKA-2098: - I believe it was deliberately removed, since we can't have binaries in our source distribution. The solution is to install and run Gradle first. Gradle Wrapper Jar gone missing in 0.8.2.1 -- Key: KAFKA-2098 URL: https://issues.apache.org/jira/browse/KAFKA-2098 Project: Kafka Issue Type: Bug Components: build Affects Versions: 0.8.2.1 Reporter: Rekha Joshi ./gradlew idea Error: Could not find or load main class org.gradle.wrapper.GradleWrapperMain This was working in 0.8.2. Attaching patch. Thanks -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1809) Refactor brokers to allow listening on multiple ports and IPs
[ https://issues.apache.org/jira/browse/KAFKA-1809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482198#comment-14482198 ] Sriharsha Chintalapani commented on KAFKA-1809: --- [~gwenshap] I apologize for not commenting on the RB earlier, but the following naming is causing confusion for me. We've got BrokerEndPoint.scala, but internally the object is called BrokerEndpoint (with a lowercase 'p') and the method is createBrokerEndPoint. If possible, can we get these consistent in another JIRA? Refactor brokers to allow listening on multiple ports and IPs -- Key: KAFKA-1809 URL: https://issues.apache.org/jira/browse/KAFKA-1809 Project: Kafka Issue Type: Sub-task Components: security Reporter: Gwen Shapira Assignee: Gwen Shapira Attachments: KAFKA-1809-DOCs.V1.patch, KAFKA-1809.patch, KAFKA-1809.v1.patch, KAFKA-1809.v2.patch, KAFKA-1809.v3.patch, KAFKA-1809_2014-12-25_11:04:17.patch, KAFKA-1809_2014-12-27_12:03:35.patch, KAFKA-1809_2014-12-27_12:22:56.patch, KAFKA-1809_2014-12-29_12:11:36.patch, KAFKA-1809_2014-12-30_11:25:29.patch, KAFKA-1809_2014-12-30_18:48:46.patch, KAFKA-1809_2015-01-05_14:23:57.patch, KAFKA-1809_2015-01-05_20:25:15.patch, KAFKA-1809_2015-01-05_21:40:14.patch, KAFKA-1809_2015-01-06_11:46:22.patch, KAFKA-1809_2015-01-13_18:16:21.patch, KAFKA-1809_2015-01-28_10:26:22.patch, KAFKA-1809_2015-02-02_11:55:16.patch, KAFKA-1809_2015-02-03_10:45:31.patch, KAFKA-1809_2015-02-03_10:46:42.patch, KAFKA-1809_2015-02-03_10:52:36.patch, KAFKA-1809_2015-03-16_09:02:18.patch, KAFKA-1809_2015-03-16_09:40:49.patch, KAFKA-1809_2015-03-27_15:04:10.patch, KAFKA-1809_2015-04-02_15:12:30.patch, KAFKA-1809_2015-04-02_19:03:58.patch, KAFKA-1809_2015-04-04_22:00:13.patch The goal is to eventually support different security mechanisms on different ports. Currently brokers are defined as host+port pair, and this definition exists throughout the code-base, therefore some refactoring is needed to support multiple ports for a single broker. The detailed design is here: https://cwiki.apache.org/confluence/display/KAFKA/Multiple+Listeners+for+Kafka+Brokers -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [KIP-DISCUSSION] KIP-13 Quotas
As for 4, if we are going to reuse the purgatory class, are we going to just use the produce / fetch purgatory objects or are we going to create a new throttle purgatory object? If we go with the first option, then I think Jun's concern is valid: some produce / fetch requests will have many keys, and hence calling watch() will end up adding the request to each one of the watch lists; we have seen issues with this scenario before. Guozhang On Mon, Apr 6, 2015 at 10:36 AM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: Hi Jay, 2. At this time, the proposed response format changes are only for monitoring/informing clients. As Jun mentioned, we get instance-level monitoring in this case since each instance that got throttled will have a metric confirming the same. Without client-level monitoring for this, it's hard for application developers to tell whether they are being throttled, since they would also have to be aware of all the brokers in the cluster. This is quite problematic for large clusters. It seems nice for app developers to not have to think about kafka internal metrics and only focus on the metrics exposed on their instances. Analogous to having client-side request latency metrics. Basically, we want an easy way for clients to be aware if they are being throttled. 4. For purgatory vs. delay queue, I think we are on the same page. I feel it is nicer to use the purgatory, but I'm happy to use a DelayQueue if there are performance implications. I don't know enough about the current and Yasuhiro's new implementation to be sure one way or the other. Stepping back, I think these two things are the only remaining points of discussion within the current proposal. Any concerns if I started a voting thread on the proposal after the KIP discussion tomorrow? (assuming we reach consensus on these items) Thanks, Aditya From: Jay Kreps [jay.kr...@gmail.com] Sent: Saturday, April 04, 2015 1:36 PM To: dev@kafka.apache.org Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas Hey Aditya, 2. For the return flag I'm not terribly particular. If we want to add it, let's fully think through how it will be used. The only concern I have is adding to the protocol without really thinking through the use cases. So let's work out the APIs we want to add to the Java consumer and producer and the use cases for how clients will make use of these. For my part I actually don't see much use other than monitoring, since it isn't an error condition to be at your quota. And if it is just monitoring, I don't see a big enough difference between having the monitoring on the server side versus in the clients to justify putting it in the protocol. But I think you guys may have other use cases in mind of how a client would make some use of this? Let's work that out. I also don't feel strongly about it--it wouldn't be *bad* to have the monitoring available on the client, it just doesn't seem that much better. 4. For the purgatory vs. delay queue, I think it is arguably nicer to reuse the purgatory; we just have to be ultra-conscious of efficiency. I think our goal is to turn quotas on across the board, so at LinkedIn that would mean potentially every request will need a small delay. I haven't worked out the efficiency implications of this choice, so as long as we do that I'm happy. -Jay On Fri, Apr 3, 2015 at 1:10 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: Some responses to Jay's points. 1. Using commas - Cool. 2. Adding return flag - I'm inclined to agree with Joel that this is good to have in the initial implementation. 
3. Config - +1. I'll remove it from the KIP. We can discuss this in parallel. 4. Purgatory vs delay queue - I feel that it is simpler to reuse the existing purgatories for both delayed produce and fetch requests. IIUC, all we need for quotas is a minWait parameter for DelayedOperation (or something equivalent), since there is already a max wait. The completion criteria can check whether the minWait time has elapsed before declaring the operation complete. For this to impact performance, a significant number of clients would need to exceed their quota at the same time, and even then I'm not very clear on the scope of the impact. Two layers of delays might add complexity to the implementation, which I'm hoping to avoid. Aditya From: Joel Koshy [jjkosh...@gmail.com] Sent: Friday, April 03, 2015 12:48 PM To: dev@kafka.apache.org Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas Aditya, thanks for the updated KIP, and Jay/Jun, thanks for the comments. A couple of comments in-line: 2. I would advocate for adding the return flag when we next bump the request format version, just to avoid proliferation. I agree this is a good thing to know about, but at the moment I don't think we have a very well fleshed-out idea of how the
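A sketch of the minWait completion criterion Aditya describes, using a hypothetical DelayedOperation shape rather than the real purgatory classes:
{code}
// Illustrative only: a delayed response that refuses to complete until the
// quota delay (minWaitMs) has elapsed, while the purgatory's existing
// expiry handles the max wait as usual.
abstract class DelayedOperationSketch(val maxWaitMs: Long) {
  def tryComplete(): Boolean // called by the purgatory on potential completion
  def onExpiration(): Unit   // called when maxWaitMs elapses
}

class ThrottledResponse(minWaitMs: Long, maxWaitMs: Long, send: () => Unit)
    extends DelayedOperationSketch(maxWaitMs) {
  private val createdMs = System.currentTimeMillis()

  override def tryComplete(): Boolean = {
    val elapsed = System.currentTimeMillis() - createdMs
    // The data condition may already be satisfied, but completion is held
    // back until the quota delay has passed.
    if (elapsed >= minWaitMs) { send(); true } else false
  }

  override def onExpiration(): Unit = send() // max wait hit: respond anyway
}
{code}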
Re: Review Request 32866: Patch for KAFKA-1954
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32866/#review79084 --- Thanks for the patch. A few comments below. Also, could you rebase? core/src/test/scala/unit/kafka/admin/AdminTest.scala https://reviews.apache.org/r/32866/#comment128180 Could we do foreach instead of map? core/src/test/scala/unit/kafka/admin/AdminTest.scala https://reviews.apache.org/r/32866/#comment128179 Could we do foreach instead of map? There are a few other places like this. core/src/test/scala/unit/kafka/producer/ProducerTest.scala https://reviews.apache.org/r/32866/#comment128181 indentation core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala https://reviews.apache.org/r/32866/#comment128186 indentation core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala https://reviews.apache.org/r/32866/#comment128189 Is there a particular reason to remove the test on verifyNonDaemonThreadsStatus? core/src/test/scala/unit/kafka/utils/TestUtils.scala https://reviews.apache.org/r/32866/#comment128191 Is there a reason to change the default? In the most common case when we are just starting a single broker in a cluster, we want to enable controlled shutdown so that we know that it won't block. - Jun Rao On April 5, 2015, 8:54 p.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32866/ --- (Updated April 5, 2015, 8:54 p.m.) Review request for kafka. Bugs: KAFKA-1954 https://issues.apache.org/jira/browse/KAFKA-1954 Repository: kafka Description --- KAFKA-1954. Speed up unit tests. Diffs - core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala 99ac9230f63105a2942bec8fe2febde8a7e48b2e core/src/test/scala/unit/kafka/admin/AdminTest.scala 0305f704b66a257d1ae732550793dbb2536e7ac1 core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 61cc6028dd7c9a2eec2d9cbe6947764655801eee core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala 28e3122b71ca26c2fdf81649b0586ebc94e105fe core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala 130b20533d35499e0d4e0403d521b246610bd325 core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala 56b1b8c004b1719c45b1e7bc9580e3638e3438ac core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala 7c87b81b12366eb04144ec4005a5fda0d0260eea core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala 18361c13fd0b8cd60f45249507b76d53e6001c12 core/src/test/scala/unit/kafka/producer/ProducerTest.scala a7ca14227edd8ca1a65122350fe4740d455f0c5c core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala b0112402e7813cf4f5247e90af87482c340bd000 core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala 3d4258f8b31c1f2c7e0351fb6df9a96234e11d1d core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 496bf0ddfc2e7872dc187ad85de5e845c6258604 core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 92e49df6964230753762aa359191a6ede337d3ac core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala a67cc37d542de0df0df24224b9d8db5472ecaf27 core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala 2bfaeb307094b0fba67a1976b7f87591ca36a45c core/src/test/scala/unit/kafka/utils/TestUtils.scala f4518255a2bcf3ae3ccb19b9e2ff0b70c613966a Diff: https://reviews.apache.org/r/32866/diff/ Testing --- Thanks, Sriharsha Chintalapani
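On the foreach-versus-map comments in the review above: the distinction is small but easy to demonstrate:
{code}
object ForeachVsMap extends App {
  val servers = Seq("broker-0", "broker-1", "broker-2")
  // map builds and returns a new collection, which is wasted work when the
  // body runs only for its side effects; foreach returns Unit.
  servers.map(println)     // allocates a Seq[Unit] that is immediately discarded
  servers.foreach(println) // side effect only, nothing allocated
}
{code}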
[jira] [Updated] (KAFKA-2099) BrokerEndPoint file, methods and object names should match
[ https://issues.apache.org/jira/browse/KAFKA-2099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-2099: --- Resolution: Fixed Fix Version/s: 0.8.3 Status: Resolved (was: Patch Available) Thanks for the patch. +1 and committed to trunk. BrokerEndPoint file, methods and object names should match -- Key: KAFKA-2099 URL: https://issues.apache.org/jira/browse/KAFKA-2099 Project: Kafka Issue Type: Bug Reporter: Gwen Shapira Assignee: Gwen Shapira Fix For: 0.8.3 Attachments: KAFKA-2099.patch [~harsha_ch] commented on KAFKA-1809: We have BrokerEndPoint.scala, but internally the object is called BrokerEndpoint (with a lowercase 'p') and the method is createBrokerEndPoint. If possible, can we make these consistent in another JIRA? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
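A schematic Scala sketch of the naming drift this ticket fixes; the shapes below are illustrative, not the actual file contents of the patch.
{code}
// Before KAFKA-2099 (schematic): file, object, and factory casing disagree.
// file: core/src/main/scala/kafka/cluster/BrokerEndPoint.scala
case class BrokerEndPoint(id: Int, host: String, port: Int)
object BrokerEndpoint {                                              // lowercase 'p'
  def createBrokerEndPoint(id: Int, s: String): BrokerEndPoint = ??? // mixed casing
}
// After: file BrokerEndPoint.scala, object BrokerEndPoint, consistent method names.
{code}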
[jira] [Resolved] (KAFKA-2094) Kafka does not create topic automatically after deleting the topic.
[ https://issues.apache.org/jira/browse/KAFKA-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon resolved KAFKA-2094. - Resolution: Not A Problem Fix Version/s: 0.8.2.0 This issue was caused by not setting the offset for the consumers. I apologise for my mistake. Kafka does not create topic automatically after deleting the topic. --- Key: KAFKA-2094 URL: https://issues.apache.org/jira/browse/KAFKA-2094 Project: Kafka Issue Type: Bug Components: admin Affects Versions: 0.8.2.0 Environment: Ubuntu 14.04 LTS Reporter: Hyukjin Kwon Priority: Critical Fix For: 0.8.2.0 After I create a topic and then remove it (and wait long enough for it to eventually be deleted), the topic is not re-created automatically, and errors are emitted even though the auto-create topic option is true. It works okay when I manually create the topic after deleting it. Here are the commands I run. ./bin/kafka-topics.sh --list --zookeeper 192.168.0.190:2181 test ./bin/kafka-topics.sh --delete --zookeeper 192.168.0.190:2181 --topic test bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test [2015-04-06 20:51:44,542] WARN Property topic is not valid (kafka.utils.VerifiableProperties) test [2015-04-06 20:51:49,896] WARN Error while fetching metadata [{TopicMetadata for topic test - No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo) [2015-04-06 20:51:49,904] WARN Error while fetching metadata [{TopicMetadata for topic test - No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo) [2015-04-06 20:51:49,905] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler) ^Csnowdrop@master:~/workspace/kafka/kafka_2.10-0.8.2.0$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test [2015-04-06 20:52:08,733] WARN Property topic is not valid (kafka.utils.VerifiableProperties) Server side, I got this error: [2015-04-06 21:41:08,491] ERROR [Replica Manager on Broker 0]: Error when processing fetch request for partition [test,0] offset 1687 from consumer with correlation id 5. Possible cause: Request for offset 1687 but we only have log segments in the range 0 to 107. (kafka.server.ReplicaManager) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [KIP-DISCUSSION] KIP-13 Quotas
Hey Aditya, 2. I kind of buy it, but I'd really like to understand the details of the use case before we make protocol changes. What changes are you proposing in the clients for monitoring and how would that be used? -Jay On Mon, Apr 6, 2015 at 10:36 AM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: Hi Jay, 2. At this time, the proposed response format changes are only for monitoring/informing clients. As Jun mentioned, we get instance level monitoring in this case since each instance that got throttled will have a metric confirming the same. Without client level monitoring for this, it's hard for application developers to find out if they are being throttled since they would also have to be aware of all the brokers in the cluster. This is quite problematic for large clusters. It seems nice for app developers to not have to think about kafka internal metrics and only focus on the metrics exposed on their instances. Analogous to having client-side request latency metrics. Basically, we want an easy way for clients to be aware if they are being throttled. 4. For purgatory v delay queue, I think we are on the same page. I feel it is nicer to use the purgatory but I'm happy to use a DelayQueue if there are performance implications. I don't know enough about the current and Yasuhiro's new implementation to be sure one way or the other. Stepping back, I think these two things are the only remaining points of discussion within the current proposal. Any concerns if I started a voting thread on the proposal after the KIP discussion tomorrow? (assuming we reach consensus on these items) Thanks, Aditya From: Jay Kreps [jay.kr...@gmail.com] Sent: Saturday, April 04, 2015 1:36 PM To: dev@kafka.apache.org Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas Hey Aditya, 2. For the return flag I'm not terribly particular. If we want to add it let's fully think through how it will be used. The only concern I have is adding to the protocol without really thinking through the use cases. So let's work out the APIs we want to add to the Java consumer and producer and the use cases for how clients will make use of these. For my part I actually don't see much use other than monitoring since it isn't an error condition to be at your quota. And if it is just monitoring I don't see a big enough difference between having the monitoring on the server-side versus in the clients to justify putting it in the protocol. But I think you guys may have other use cases in mind of how a client would make some use of this? Let's work that out. I also don't feel strongly about it--it wouldn't be *bad* to have the monitoring available on the client, just doesn't seem that much better. 4. For the purgatory vs delay queue I think it is arguably nicer to reuse the purgatory; we just have to be ultra-conscious of efficiency. I think our goal is to turn quotas on across the board, so at LinkedIn that would mean potentially every request will need a small delay. I haven't worked out the efficiency implications of this choice, so as long as we do that I'm happy. -Jay On Fri, Apr 3, 2015 at 1:10 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: Some responses to Jay's points. 1. Using commas - Cool. 2. Adding return flag - I'm inclined to agree with Joel that this is good to have in the initial implementation. 3. Config - +1. I'll remove it from the KIP. We can discuss this in parallel. 4. Purgatory vs Delay queue - I feel that it is simpler to reuse the existing purgatories for both delayed produce and fetch requests.
IIUC, all we need for quotas is a minWait parameter for DelayedOperation (or something equivalent) since there is already a max wait. The completion criteria can check if minWait time has elapsed before declaring the operation complete. For this to impact performance, a significant number of clients may need to exceed their quota at the same time and even then I'm not very clear on the scope of the impact. Two layers of delays might add complexity to the implementation which I'm hoping to avoid. Aditya From: Joel Koshy [jjkosh...@gmail.com] Sent: Friday, April 03, 2015 12:48 PM To: dev@kafka.apache.org Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas Aditya, thanks for the updated KIP and Jay/Jun thanks for the comments. Couple of comments in-line: 2. I would advocate for adding the return flag when we next bump the request format version just to avoid proliferation. I agree this is a good thing to know about, but at the moment I don't think we have a very well fleshed-out idea of how the client would actually make use of this info. I'm somewhat inclined to have something appropriate off the bat - mainly because (i) clients really should know that they have been throttled (ii) a smart producer/consumer
Re: [KIP-DISCUSSION] KIP-13 Quotas
Here's a wild guess: An app developer included a Kafka Producer in his app, and is not happy with the throughput. He doesn't have visibility into the brokers since they are owned by a different team. Obviously the first instinct of a developer who knows that throttling exists is to blame throttling for any slowdown in the app. If he doesn't have a way to know from the responses whether or not his app is throttled, he may end up calling Aditya at 4am asking "Hey, is my app throttled?". I assume Aditya is trying to avoid this scenario. On Mon, Apr 6, 2015 at 7:47 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Aditya, 2. I kind of buy it, but I'd really like to understand the details of the use case before we make protocol changes. What changes are you proposing in the clients for monitoring and how would that be used? -Jay On Mon, Apr 6, 2015 at 10:36 AM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: Hi Jay, 2. At this time, the proposed response format changes are only for monitoring/informing clients. As Jun mentioned, we get instance level monitoring in this case since each instance that got throttled will have a metric confirming the same. Without client level monitoring for this, it's hard for application developers to find out if they are being throttled since they would also have to be aware of all the brokers in the cluster. This is quite problematic for large clusters. It seems nice for app developers to not have to think about kafka internal metrics and only focus on the metrics exposed on their instances. Analogous to having client-side request latency metrics. Basically, we want an easy way for clients to be aware if they are being throttled. 4. For purgatory v delay queue, I think we are on the same page. I feel it is nicer to use the purgatory but I'm happy to use a DelayQueue if there are performance implications. I don't know enough about the current and Yasuhiro's new implementation to be sure one way or the other. Stepping back, I think these two things are the only remaining points of discussion within the current proposal. Any concerns if I started a voting thread on the proposal after the KIP discussion tomorrow? (assuming we reach consensus on these items) Thanks, Aditya From: Jay Kreps [jay.kr...@gmail.com] Sent: Saturday, April 04, 2015 1:36 PM To: dev@kafka.apache.org Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas Hey Aditya, 2. For the return flag I'm not terribly particular. If we want to add it let's fully think through how it will be used. The only concern I have is adding to the protocol without really thinking through the use cases. So let's work out the APIs we want to add to the Java consumer and producer and the use cases for how clients will make use of these. For my part I actually don't see much use other than monitoring since it isn't an error condition to be at your quota. And if it is just monitoring I don't see a big enough difference between having the monitoring on the server-side versus in the clients to justify putting it in the protocol. But I think you guys may have other use cases in mind of how a client would make some use of this? Let's work that out. I also don't feel strongly about it--it wouldn't be *bad* to have the monitoring available on the client, just doesn't seem that much better. 4. For the purgatory vs delay queue I think it is arguably nicer to reuse the purgatory; we just have to be ultra-conscious of efficiency.
I think our goal is to turn quotas on across the board, so at LinkedIn that would mean potentially every request will need a small delay. I haven't worked out the efficiency implications of this choice, so as long as we do that I'm happy. -Jay On Fri, Apr 3, 2015 at 1:10 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: Some responses to Jay's points. 1. Using commas - Cool. 2. Adding return flag - I'm inclined to agree with Joel that this is good to have in the initial implementation. 3. Config - +1. I'll remove it from the KIP. We can discuss this in parallel. 4. Purgatory vs Delay queue - I feel that it is simpler to reuse the existing purgatories for both delayed produce and fetch requests. IIUC, all we need for quotas is a minWait parameter for DelayedOperation (or something equivalent) since there is already a max wait. The completion criteria can check if minWait time has elapsed before declaring the operation complete. For this to impact performance, a significant number of clients may need to exceed their quota at the same time and even then I'm not very clear on the scope of the impact. Two layers of delays might add complexity to the implementation which I'm hoping to avoid. Aditya From: Joel Koshy [jjkosh...@gmail.com]
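To make the minWait idea under discussion concrete, here is a rough Scala sketch assuming a DelayedOperation-style base class with a tryComplete hook; ThrottledOperation and the field names are illustrative, not the eventual implementation.
{code}
// Illustrative only: delay completion until at least minWaitMs has elapsed,
// reusing the existing purgatory machinery rather than a second delay queue.
abstract class DelayedOperation(val maxWaitMs: Long) {
  def tryComplete(): Boolean
  def onComplete(): Unit
}

class ThrottledOperation(minWaitMs: Long, maxWaitMs: Long)(action: () => Unit)
    extends DelayedOperation(maxWaitMs) {
  private val createdMs = System.currentTimeMillis()

  // Completion criterion: the quota-imposed minimum delay must have elapsed.
  override def tryComplete(): Boolean =
    System.currentTimeMillis() - createdMs >= minWaitMs

  override def onComplete(): Unit = action()
}
{code}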
[jira] [Commented] (KAFKA-2059) ZookeeperConsumerConnectorTest.testBasic transient failure
[ https://issues.apache.org/jira/browse/KAFKA-2059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482537#comment-14482537 ] Fangmin Lv commented on KAFKA-2059: --- Every run takes 30 seconds on my machine. I've run the test for a whole day and haven't seen any failure. Can you paste the log from a failed run so that we can check what happened? You can use this command to run the single testBasic test case until it fails: $ R=0; while [[ $R -eq 0 ]]; do ./gradlew core:test --tests kafka.javaapi.consumer.ZookeeperConsumerConnectorTest.testBasic --rerun-tasks -i; R=$?; done | tee ./testBasic.log ZookeeperConsumerConnectorTest.testBasic transient failure - Key: KAFKA-2059 URL: https://issues.apache.org/jira/browse/KAFKA-2059 Project: Kafka Issue Type: Sub-task Reporter: Guozhang Wang Labels: newbie {code} kafka.javaapi.consumer.ZookeeperConsumerConnectorTest testBasic FAILED kafka.common.InconsistentBrokerIdException: Configured brokerId 1 doesn't match stored brokerId 0 in meta.properties at kafka.server.KafkaServer.getBrokerId(KafkaServer.scala:443) at kafka.server.KafkaServer.startup(KafkaServer.scala:116) at kafka.utils.TestUtils$.createServer(TestUtils.scala:126) at kafka.integration.KafkaServerTestHarness$$anonfun$setUp$1.apply(KafkaServerTestHarness.scala:57) at kafka.integration.KafkaServerTestHarness$$anonfun$setUp$1.apply(KafkaServerTestHarness.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at kafka.integration.KafkaServerTestHarness$class.setUp(KafkaServerTestHarness.scala:57) at kafka.javaapi.consumer.ZookeeperConsumerConnectorTest.setUp(ZookeeperConsumerConnectorTest.scala:41) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2099) BrokerEndPoint file, methods and object names should match
[ https://issues.apache.org/jira/browse/KAFKA-2099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gwen Shapira updated KAFKA-2099: Status: Patch Available (was: Open) BrokerEndPoint file, methods and object names should match -- Key: KAFKA-2099 URL: https://issues.apache.org/jira/browse/KAFKA-2099 Project: Kafka Issue Type: Bug Reporter: Gwen Shapira Assignee: Gwen Shapira Attachments: KAFKA-2099.patch [~harsha_ch] commented on KAFKA-1809: We have BrokerEndPoint.scala, but internally the object is called BrokerEndpoint (with a lowercase 'p') and the method is createBrokerEndPoint. If possible, can we make these consistent in another JIRA? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2099) BrokerEndPoint file, methods and object names should match
[ https://issues.apache.org/jira/browse/KAFKA-2099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gwen Shapira updated KAFKA-2099: Attachment: KAFKA-2099.patch BrokerEndPoint file, methods and object names should match -- Key: KAFKA-2099 URL: https://issues.apache.org/jira/browse/KAFKA-2099 Project: Kafka Issue Type: Bug Reporter: Gwen Shapira Assignee: Gwen Shapira Attachments: KAFKA-2099.patch [~harsha_ch] commented on KAFKA-1809: We have BrokerEndPoint.scala, but internally the object is called BrokerEndpoint (with a lowercase 'p') and the method is createBrokerEndPoint. If possible, can we make these consistent in another JIRA? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 32913: Patch for KAFKA-2099
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32913/ --- Review request for kafka. Bugs: KAFKA-2099 https://issues.apache.org/jira/browse/KAFKA-2099 Repository: kafka Description --- Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-2099 Diffs - core/src/main/scala/kafka/admin/AdminUtils.scala 0d3332e0e0bbb396128a69e179ee34087c221958 core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala d2a3d43c0ee7b0c502b652571f8d2d7181012f42 core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala bf93632edb02d8b4ef089b12d62e584fd83425ae core/src/main/scala/kafka/api/TopicMetadata.scala 6de447d0d4d6e77c283a3504c171b6a97eacecf2 core/src/main/scala/kafka/api/TopicMetadataResponse.scala 776b604c63e64d75a854481e923f91be06ce1d31 core/src/main/scala/kafka/api/UpdateMetadataRequest.scala a91e3a67587e42554a0b79f255225895b07470a7 core/src/main/scala/kafka/client/ClientUtils.scala f08aaf274aef0b96598080a32e3465c5215f747f core/src/main/scala/kafka/cluster/Broker.scala 8e603b604fb13184331d30a319421c0d393d1728 core/src/main/scala/kafka/cluster/BrokerEndPoint.scala 22dba18a89cebb5501345bc3a84160e054afbdc6 core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala 6bb0d569db8b16eddd299bbb8a903d77cb6bdd10 core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala cde4481c40a74cae1a2103d734df6df659ec7271 core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala 9c14428045740530cbf2d1457f0276a51fa402bd core/src/main/scala/kafka/javaapi/TopicMetadata.scala ebbd589f9a842f6ab77963ac014905f77059a656 core/src/main/scala/kafka/producer/ProducerPool.scala 07feb05460e8f0acc2eda722d840dc566a4c2ffd core/src/main/scala/kafka/server/AbstractFetcherManager.scala f8f93317531b08d5b13019820db0c34834a3d3c8 core/src/main/scala/kafka/server/AbstractFetcherThread.scala 1e26de25797f91fe94ef391e74ed2c04b4c10112 core/src/main/scala/kafka/server/MetadataCache.scala 4460b42eb7beb857d4d9dd40b68d940335200cdc core/src/main/scala/kafka/server/ReplicaFetcherManager.scala f0a2a5bb18a0dcd0496b54f07f6db04b05972b06 core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b2196c81bcc2c4090c8b4069456fc4d1a8a4634d core/src/main/scala/kafka/server/ReplicaManager.scala 144a15e3a53d47fa972bef277c335da484842cf9 core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala d1050b475161e64160c47f84d903da2075c2d401 core/src/main/scala/kafka/tools/SimpleConsumerShell.scala 9a6804c69c0d6591cba7e2fd75a98d7fa313cca2 core/src/main/scala/kafka/utils/ZkUtils.scala 82b0e3355eec21d9fb962c6a0b73acd3f1d125b4 core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 83910f3870985deaf165de52aee16695bd3a52a5 core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala ad58eedfa94caeda4cadff396ab83286c865764c core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala 603cf76afd0f9cf7448d467ba948839652c873e4 core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala 2169a5ccd9ebf5da9400cde97c352e8dfb5a9b21 Diff: https://reviews.apache.org/r/32913/diff/ Testing --- Thanks, Gwen Shapira
[jira] [Commented] (KAFKA-2099) BrokerEndPoint file, methods and object names should match
[ https://issues.apache.org/jira/browse/KAFKA-2099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482296#comment-14482296 ] Gwen Shapira commented on KAFKA-2099: - Created reviewboard https://reviews.apache.org/r/32913/diff/ against branch trunk BrokerEndPoint file, methods and object names should match -- Key: KAFKA-2099 URL: https://issues.apache.org/jira/browse/KAFKA-2099 Project: Kafka Issue Type: Bug Reporter: Gwen Shapira Assignee: Gwen Shapira Attachments: KAFKA-2099.patch [~harsha_ch] commented on KAFKA-1809: We have BrokerEndPoint.scala, but internally the object is called BrokerEndpoint (with a lowercase 'p') and the method is createBrokerEndPoint. If possible, can we make these consistent in another JIRA? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Kafka confusion
This is a great survey for understanding which parts still need improvement, thanks Gwen! On Mon, Mar 30, 2015 at 12:27 PM, Gwen Shapira gshap...@cloudera.com wrote: I was planning on doing a re-poll about a month after every release :) Maybe it can be part of the release activity. Gwen On Mon, Mar 30, 2015 at 10:26 AM, Jay Kreps j...@confluent.io wrote: Gwen had a really good blog post on confusing things about Kafka: http://ingest.tips/2015/03/26/what-is-confusing-about-kafka/ A lot of these are in-flight now (e.g. consumer) or finished (e.g. delete topic, kind of, and non-sticky producer partitioning). Do we have bugs for the rest? It would be great to make this an ongoing thing we do to assess confusing configs, etc. Not sure of the best way to do this, but maybe just maintaining a special JIRA tag and periodically polling people again? -Jay -- -- Guozhang
[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state
[ https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482301#comment-14482301 ] Evan Huus commented on KAFKA-2082: -- OK, that all makes sense. Some follow-up questions: 1. When the cluster heals (at the end of the test) the replication continues to fail. Based on your explanation, I would have thought that the cluster healing would have triggered onControllerResignation, which would update the metadata on that node and fix the replication by pointing it to the new leader, but that does not appear to be the case? 2. The go client currently tries brokers in the order specified by the user on startup, not randomly. I can see the argument for a random choice (it's on our todo list) but even that only works around the problem - an unlucky producer could choose the bad broker enough times in a row to run out of retries and start dropping messages. I really think that an isolated broker should refuse to serve metadata requests - it knows that its information is likely stale, and forcing clients to try another broker is the only way for them to reliably get fresh metadata. Just like documentation: the only metadata worse than no metadata is incorrect metadata. Kafka Replication ends up in a bad state Key: KAFKA-2082 URL: https://issues.apache.org/jira/browse/KAFKA-2082 Project: Kafka Issue Type: Bug Components: replication Affects Versions: 0.8.2.1 Reporter: Evan Huus Assignee: Sriharsha Chintalapani Priority: Critical Attachments: KAFKA-2082.patch While running integration tests for Sarama (the go client) we came across a pattern of connection losses that reliably puts kafka into a bad state: several of the brokers start spinning, chewing ~30% CPU and spamming the logs with hundreds of thousands of lines like: {noformat} [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on partition [many_partition,1] failed due to Leader not local for partition [many_partition,1] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on partition [many_partition,6] failed due to Leader not local for partition [many_partition,6] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,21] failed due to Leader not local for partition [many_partition,21] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,26] failed due to Leader not local for partition [many_partition,26] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,1] failed due to Leader not local for partition [many_partition,1] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,6] failed due to Leader not local for partition [many_partition,6] on broker 9093
(kafka.server.ReplicaManager) [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on partition [many_partition,21] failed due to Leader not local for partition [many_partition,21] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on partition [many_partition,26] failed due to Leader not local for partition [many_partition,26] on broker 9093 (kafka.server.ReplicaManager) {noformat} This can be easily and reliably reproduced using the {{toxiproxy-final}} branch of https://github.com/Shopify/sarama which includes a vagrant script for provisioning the appropriate cluster: - {{git clone https://github.com/Shopify/sarama.git}} - {{git checkout test-jira-kafka-2082}} - {{vagrant up}} - {{TEST_SEED=1427917826425719059 DEBUG=true go test -v}} After the test finishes (it fails because the cluster ends up in a bad state), you can log into the cluster machine with {{vagrant ssh}} and inspect the bad nodes. The vagrant script provisions
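A small Scala sketch of the client-side mitigation Evan mentions (trying brokers in random order per metadata refresh); the function and types are hypothetical stand-ins, not Sarama's actual code.
{code}
import scala.util.Random

// Illustrative only: shuffling the bootstrap list per attempt makes it
// unlikely that every retry lands on the same isolated, stale broker.
def firstFreshMetadata[M](brokers: Seq[String], retries: Int)
                         (fetch: String => Option[M]): Option[M] =
  Iterator.fill(retries)(Random.shuffle(brokers.toList))
    .flatMap(bs => bs)                        // one shuffled pass per retry
    .map(fetch)                               // ask each broker in turn
    .collectFirst { case Some(meta) => meta } // stop at the first success
{code}
As the comment notes, this only reduces the odds of hitting a stale broker; it does not solve the underlying problem of an isolated broker serving stale metadata.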
[jira] [Commented] (KAFKA-2099) BrokerEndPoint file, methods and object names should match
[ https://issues.apache.org/jira/browse/KAFKA-2099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482363#comment-14482363 ] Gwen Shapira commented on KAFKA-2099: - Thanks for the quick review, [~junrao] and [~harsha_ch] BrokerEndPoint file, methods and object names should match -- Key: KAFKA-2099 URL: https://issues.apache.org/jira/browse/KAFKA-2099 Project: Kafka Issue Type: Bug Reporter: Gwen Shapira Assignee: Gwen Shapira Fix For: 0.8.3 Attachments: KAFKA-2099.patch [~harsha_ch] commented on KAFKA-1809: We have BrokerEndPoint.scala, but internally the object is called BrokerEndpoint (with a lowercase 'p') and the method is createBrokerEndPoint. If possible, can we make these consistent in another JIRA? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state
[ https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482361#comment-14482361 ] Sriharsha Chintalapani commented on KAFKA-2082: --- [~guozhang] 1. This case is working fine. The test disables zk1 and zk3 and later enables zk3. In this case Broker3 has the latest metadata. 2. In this case, should we stop the socket server, or is there another way to reject requests? Kafka Replication ends up in a bad state Key: KAFKA-2082 URL: https://issues.apache.org/jira/browse/KAFKA-2082 Project: Kafka Issue Type: Bug Components: replication Affects Versions: 0.8.2.1 Reporter: Evan Huus Assignee: Sriharsha Chintalapani Priority: Critical Attachments: KAFKA-2082.patch While running integration tests for Sarama (the go client) we came across a pattern of connection losses that reliably puts kafka into a bad state: several of the brokers start spinning, chewing ~30% CPU and spamming the logs with hundreds of thousands of lines like: {noformat} [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on partition [many_partition,1] failed due to Leader not local for partition [many_partition,1] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on partition [many_partition,6] failed due to Leader not local for partition [many_partition,6] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,21] failed due to Leader not local for partition [many_partition,21] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,26] failed due to Leader not local for partition [many_partition,26] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,1] failed due to Leader not local for partition [many_partition,1] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,6] failed due to Leader not local for partition [many_partition,6] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on partition [many_partition,21] failed due to Leader not local for partition [many_partition,21] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on partition [many_partition,26] failed due to Leader not local for partition [many_partition,26] on broker 9093 (kafka.server.ReplicaManager) {noformat} This can be easily and reliably reproduced using the {{toxiproxy-final}} branch of https://github.com/Shopify/sarama which includes a vagrant script for provisioning the appropriate cluster: - {{git clone
https://github.com/Shopify/sarama.git}} - {{git checkout test-jira-kafka-2082}} - {{vagrant up}} - {{TEST_SEED=1427917826425719059 DEBUG=true go test -v}} After the test finishes (it fails because the cluster ends up in a bad state), you can log into the cluster machine with {{vagrant ssh}} and inspect the bad nodes. The vagrant script provisions five zookeepers and five brokers in {{/opt/kafka-9091/}} through {{/opt/kafka-9095/}}. Additional context: the test produces continually to the cluster while randomly cutting and restoring zookeeper connections (all connections to zookeeper are run through a simple proxy on the same vm to make this easy). The majority of the time this works very well and does a good job exercising our producer's retry and failover code. However, under certain patterns of connection loss (the {{TEST_SEED}} in the instructions is important), kafka gets confused. The test never cuts more than two connections at a time, so zookeeper should always have quorum, and the topic (with three replicas) should always be writable. Completely restarting the cluster via {{vagrant reload}} seems to put it back
[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state
[ https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482421#comment-14482421 ] Evan Huus commented on KAFKA-2082: -- 1. Then why does broker3 continue to spam logs? (I understand the *spam* is due to the lack of backoff, but if it gets the latest metadata at that point why does it continue to fail at all?) Kafka Replication ends up in a bad state Key: KAFKA-2082 URL: https://issues.apache.org/jira/browse/KAFKA-2082 Project: Kafka Issue Type: Bug Components: replication Affects Versions: 0.8.2.1 Reporter: Evan Huus Assignee: Sriharsha Chintalapani Priority: Critical Attachments: KAFKA-2082.patch While running integration tests for Sarama (the go client) we came across a pattern of connection losses that reliably puts kafka into a bad state: several of the brokers start spinning, chewing ~30% CPU and spamming the logs with hundreds of thousands of lines like: {noformat} [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on partition [many_partition,1] failed due to Leader not local for partition [many_partition,1] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on partition [many_partition,6] failed due to Leader not local for partition [many_partition,6] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,21] failed due to Leader not local for partition [many_partition,21] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,26] failed due to Leader not local for partition [many_partition,26] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,1] failed due to Leader not local for partition [many_partition,1] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,6] failed due to Leader not local for partition [many_partition,6] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on partition [many_partition,21] failed due to Leader not local for partition [many_partition,21] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on partition [many_partition,26] failed due to Leader not local for partition [many_partition,26] on broker 9093 (kafka.server.ReplicaManager) {noformat} This can be easily and reliably reproduced using the {{toxiproxy-final}} branch of https://github.com/Shopify/sarama which includes a vagrant script for provisioning the appropriate cluster: - {{git clone https://github.com/Shopify/sarama.git}} - {{git checkout 
test-jira-kafka-2082}} - {{vagrant up}} - {{TEST_SEED=1427917826425719059 DEBUG=true go test -v}} After the test finishes (it fails because the cluster ends up in a bad state), you can log into the cluster machine with {{vagrant ssh}} and inspect the bad nodes. The vagrant script provisions five zookeepers and five brokers in {{/opt/kafka-9091/}} through {{/opt/kafka-9095/}}. Additional context: the test produces continually to the cluster while randomly cutting and restoring zookeeper connections (all connections to zookeeper are run through a simple proxy on the same vm to make this easy). The majority of the time this works very well and does a good job exercising our producer's retry and failover code. However, under certain patterns of connection loss (the {{TEST_SEED}} in the instructions is important), kafka gets confused. The test never cuts more than two connections at a time, so zookeeper should always have quorum, and the topic (with three replicas) should always be writable. Completely restarting the cluster via {{vagrant reload}} seems to put it back into a sane state. -- This message was sent by Atlassian JIRA
[jira] [Commented] (KAFKA-2099) BrokerEndPoint file, methods and object names should match
[ https://issues.apache.org/jira/browse/KAFKA-2099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482316#comment-14482316 ] Sriharsha Chintalapani commented on KAFKA-2099: --- Thanks [~gwenshap] BrokerEndPoint file, methods and object names should match -- Key: KAFKA-2099 URL: https://issues.apache.org/jira/browse/KAFKA-2099 Project: Kafka Issue Type: Bug Reporter: Gwen Shapira Assignee: Gwen Shapira Attachments: KAFKA-2099.patch [~harsha_ch] commented on KAFKA-1809: We have BrokerEndPoint.scala, but internally the object is called BrokerEndpoint (with a lowercase 'p') and the method is createBrokerEndPoint. If possible, can we make these consistent in another JIRA? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 31850: Patch for KAFKA-1660
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31850/#review79063 --- Thanks for the patch. clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java https://reviews.apache.org/r/31850/#comment128121 [NOTE] is not a standard javadoc highlight, is it? (I don't know.) If not, can you just use the standard strong for emphasis? clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java https://reviews.apache.org/r/31850/#comment128123 We are doing this because - We do this because or This is done because clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java https://reviews.apache.org/r/31850/#comment128125 previous - outstanding clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java https://reviews.apache.org/r/31850/#comment128126 was not able - is unable before timeout - before the specified timeout clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java https://reviews.apache.org/r/31850/#comment128134 If timeout > 0, this method blocks as it tries to join the sender thread within the specified timeout. clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java https://reviews.apache.org/r/31850/#comment128135 == Also, is this completely true? It seems we may join (albeit without trying to send anything further) if called from a non-sender thread. clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java https://reviews.apache.org/r/31850/#comment128138 We do this because the sender thread would otherwise try to join itself and block forever. clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java https://reviews.apache.org/r/31850/#comment128142 When - If an That said, this doc is a bit weird - if that is what the user is supposed to do then why can't this method take care of it (i.e., let the interrupted exception go)? It seems the right thing to do would be to just propagate and let the caller decide what to do. clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java https://reviews.apache.org/r/31850/#comment128144 with timeout = {} ms clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java https://reviews.apache.org/r/31850/#comment128141 elegantly - gracefully clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java https://reviews.apache.org/r/31850/#comment128147 ```log.warn(Overriding close timeout {} ms to 0 ms in order to prevent deadlock due to self-join. This means you have incorrectly invoked close with a non-zero timeout from the producer call-back., timeout)``` clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java https://reviews.apache.org/r/31850/#comment128154 May want to log that we are proceeding to a force close clients/src/main/java/org/apache/kafka/clients/producer/Producer.java https://reviews.apache.org/r/31850/#comment128155 until timeout is expired - within the specified timeout. If the close does not complete within the timeout, discard any pending messages and force close the producer. clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java https://reviews.apache.org/r/31850/#comment128166 As you explained offline, the sender does not have access to record batches while requests are in flight, but it would be super if we can figure out a way to avoid leaking details of batch completion (which is currently exclusively in sender) into the RecordAccumulator.
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java https://reviews.apache.org/r/31850/#comment128200 I don't think we should overload InterruptException for this. InterruptException is a wrapper around InterruptedException. i.e., after an InterruptException the thread should in fact have been interrupted - i.e., the interrupt status of the thread should be true (which is not the case here). clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java https://reviews.apache.org/r/31850/#comment128167 See comment above. clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java https://reviews.apache.org/r/31850/#comment128169 Can you revert this? i.e., I think the previous version with locally declared accums is cleaner. core/src/test/scala/integration/kafka/api/ProducerSendTest.scala https://reviews.apache.org/r/31850/#comment128171 Can you also add a test for calling close with a non-zero timeout in the callback? - Joel Koshy On March 27, 2015, 11:35 p.m., Jiangjie Qin wrote: --- This is
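For context on the self-join discussion in this review (close() invoked from the producer's own sender thread, e.g. inside a callback), a rough Scala sketch of the guard; ioThread and forceClose are illustrative stand-ins, not the actual KafkaProducer fields.
{code}
// Illustrative only: guard against a callback calling close() on the sender's
// own thread, which would otherwise join itself and block forever.
def close(timeoutMs: Long, ioThread: Thread, forceClose: () => Unit): Unit = {
  val selfJoin = Thread.currentThread() eq ioThread
  val effective =
    if (selfJoin && timeoutMs > 0) {
      println(s"Overriding close timeout $timeoutMs ms to 0 ms to prevent deadlock due to self-join.")
      0L
    } else timeoutMs
  if (effective > 0) ioThread.join(effective) // wait for outstanding sends to drain
  if (ioThread.isAlive) forceClose()          // discard pending messages and shut down
}
{code}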
Re: Review Request 31850: Patch for KAFKA-1660
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31850/#review79107 --- clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java https://reviews.apache.org/r/31850/#comment128201 send - previously sent - Guozhang Wang On March 27, 2015, 11:35 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31850/ --- (Updated March 27, 2015, 11:35 p.m.) Review request for kafka. Bugs: KAFKA-1660 https://issues.apache.org/jira/browse/KAFKA-1660 Repository: kafka Description --- A minor fix. Incorporated Guozhang's comments. Modify according to the latest conclusion. Patch for the finally passed KIP-15git status Diffs - clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ab263423ff1d33170effb71acdef3fc501fa072a clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 88b4e4fbf3bf6fb6d2f90551a792b95d4cd51c40 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java e379ac89c9a2fbfe750d6b0dec693b7eabb76204 core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 3df450784592b894008e7507b2737f9bb07f7bd2 Diff: https://reviews.apache.org/r/31850/diff/ Testing --- Unit tests passed. Thanks, Jiangjie Qin
Re: Review Request 32913: Patch for KAFKA-2099
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32913/#review79099 --- Ship it! Ship It! - Sriharsha Chintalapani On April 7, 2015, 12:35 a.m., Gwen Shapira wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/32913/ --- (Updated April 7, 2015, 12:35 a.m.) Review request for kafka. Bugs: KAFKA-2099 https://issues.apache.org/jira/browse/KAFKA-2099 Repository: kafka Description --- Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-2099 Diffs - core/src/main/scala/kafka/admin/AdminUtils.scala 0d3332e0e0bbb396128a69e179ee34087c221958 core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala d2a3d43c0ee7b0c502b652571f8d2d7181012f42 core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala bf93632edb02d8b4ef089b12d62e584fd83425ae core/src/main/scala/kafka/api/TopicMetadata.scala 6de447d0d4d6e77c283a3504c171b6a97eacecf2 core/src/main/scala/kafka/api/TopicMetadataResponse.scala 776b604c63e64d75a854481e923f91be06ce1d31 core/src/main/scala/kafka/api/UpdateMetadataRequest.scala a91e3a67587e42554a0b79f255225895b07470a7 core/src/main/scala/kafka/client/ClientUtils.scala f08aaf274aef0b96598080a32e3465c5215f747f core/src/main/scala/kafka/cluster/Broker.scala 8e603b604fb13184331d30a319421c0d393d1728 core/src/main/scala/kafka/cluster/BrokerEndPoint.scala 22dba18a89cebb5501345bc3a84160e054afbdc6 core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala 6bb0d569db8b16eddd299bbb8a903d77cb6bdd10 core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala cde4481c40a74cae1a2103d734df6df659ec7271 core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala 9c14428045740530cbf2d1457f0276a51fa402bd core/src/main/scala/kafka/javaapi/TopicMetadata.scala ebbd589f9a842f6ab77963ac014905f77059a656 core/src/main/scala/kafka/producer/ProducerPool.scala 07feb05460e8f0acc2eda722d840dc566a4c2ffd core/src/main/scala/kafka/server/AbstractFetcherManager.scala f8f93317531b08d5b13019820db0c34834a3d3c8 core/src/main/scala/kafka/server/AbstractFetcherThread.scala 1e26de25797f91fe94ef391e74ed2c04b4c10112 core/src/main/scala/kafka/server/MetadataCache.scala 4460b42eb7beb857d4d9dd40b68d940335200cdc core/src/main/scala/kafka/server/ReplicaFetcherManager.scala f0a2a5bb18a0dcd0496b54f07f6db04b05972b06 core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b2196c81bcc2c4090c8b4069456fc4d1a8a4634d core/src/main/scala/kafka/server/ReplicaManager.scala 144a15e3a53d47fa972bef277c335da484842cf9 core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala d1050b475161e64160c47f84d903da2075c2d401 core/src/main/scala/kafka/tools/SimpleConsumerShell.scala 9a6804c69c0d6591cba7e2fd75a98d7fa313cca2 core/src/main/scala/kafka/utils/ZkUtils.scala 82b0e3355eec21d9fb962c6a0b73acd3f1d125b4 core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 83910f3870985deaf165de52aee16695bd3a52a5 core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala ad58eedfa94caeda4cadff396ab83286c865764c core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala 603cf76afd0f9cf7448d467ba948839652c873e4 core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala 2169a5ccd9ebf5da9400cde97c352e8dfb5a9b21 Diff: https://reviews.apache.org/r/32913/diff/ Testing --- Thanks, Gwen Shapira
[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state
[ https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482343#comment-14482343 ] Guozhang Wang commented on KAFKA-2082: -- 1. When broker-1's registry re-appears on ZK, the new controller should be notified about this new broker and send the latest metadata to it. Could you check to see if that has ever happened? 2. I think I agree with you that when the broker loses its zk session it should stop handling client requests, so that clients refresh their metadata sooner. We can probably do that in KafkaHealthcheck's session expiration listener (handleStateChanged, which is currently a no-op). Kafka Replication ends up in a bad state Key: KAFKA-2082 URL: https://issues.apache.org/jira/browse/KAFKA-2082 Project: Kafka Issue Type: Bug Components: replication Affects Versions: 0.8.2.1 Reporter: Evan Huus Assignee: Sriharsha Chintalapani Priority: Critical Attachments: KAFKA-2082.patch While running integration tests for Sarama (the go client) we came across a pattern of connection losses that reliably puts kafka into a bad state: several of the brokers start spinning, chewing ~30% CPU and spamming the logs with hundreds of thousands of lines like: {noformat} [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on partition [many_partition,1] failed due to Leader not local for partition [many_partition,1] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on partition [many_partition,6] failed due to Leader not local for partition [many_partition,6] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,21] failed due to Leader not local for partition [many_partition,21] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,26] failed due to Leader not local for partition [many_partition,26] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,1] failed due to Leader not local for partition [many_partition,1] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on partition [many_partition,6] failed due to Leader not local for partition [many_partition,6] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on partition [many_partition,21] failed due to Leader not local for partition [many_partition,21] on broker 9093 (kafka.server.ReplicaManager) [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on partition [many_partition,26] failed due to Leader not local for partition [many_partition,26] on broker 9093
(kafka.server.ReplicaManager) {noformat} This can be easily and reliably reproduced using the {{toxiproxy-final}} branch of https://github.com/Shopify/sarama which includes a vagrant script for provisioning the appropriate cluster: - {{git clone https://github.com/Shopify/sarama.git}} - {{git checkout test-jira-kafka-2082}} - {{vagrant up}} - {{TEST_SEED=1427917826425719059 DEBUG=true go test -v}} After the test finishes (it fails because the cluster ends up in a bad state), you can log into the cluster machine with {{vagrant ssh}} and inspect the bad nodes. The vagrant script provisions five zookeepers and five brokers in {{/opt/kafka-9091/}} through {{/opt/kafka-9095/}}. Additional context: the test produces continually to the cluster while randomly cutting and restoring zookeeper connections (all connections to zookeeper are run through a simple proxy on the same vm to make this easy). The majority of the time this works very well and does a good job exercising our producer's retry and failover code. However, under certain patterns of connection loss (the {{TEST_SEED}} in the instructions is important), kafka gets confused. The test never
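A hedged Scala sketch of the idea in Guozhang's point 2 (reacting to ZK session loss so an isolated broker stops serving stale metadata); the pause/resume/register hooks are hypothetical, since KafkaHealthcheck's real listener only re-registers the broker znode.
{code}
import org.I0Itec.zkclient.IZkStateListener
import org.apache.zookeeper.Watcher.Event.KeeperState

// Illustrative only: stop answering client requests while disconnected from
// ZK, and resume once the session (and /brokers/ids registration) is back.
class SessionAwareListener(pause: () => Unit, resume: () => Unit, register: () => Unit)
    extends IZkStateListener {
  override def handleStateChanged(state: KeeperState): Unit = state match {
    case KeeperState.Disconnected | KeeperState.Expired =>
      pause()   // likely-stale metadata: force clients to try another broker
    case KeeperState.SyncConnected =>
      resume()  // connected again: safe to serve requests
    case _ =>
  }
  override def handleNewSession(): Unit = register() // re-create /brokers/ids/[id]
}
{code}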
Jenkins build is back to normal : KafkaPreCommit #60
See https://builds.apache.org/job/KafkaPreCommit/60/changes
Is there a complete Kafka 0.8.* replication design document
Hi, these days I have been focused on the Kafka 0.8 replication design and found three replication design proposals on the wiki (according to the document, the V3 version is used in the Kafka 0.8 release). But the V3 proposal is incomplete and inconsistent with the release. Is there a complete Kafka 0.8 replication design document? Here are some of my questions about the Kafka 0.8 replication design.
#1 According to V3, /brokers/topics/[topic]/[partition_id]/leaderAndISR stores the leader and ISR of a partition. However, in the 0.8.2 release there is no such znode; instead, it uses /brokers/topics/[topic]/partitions/[partition_id]/state to store the leader and ISR of a partition.
#2 From /brokers/topics/[topic] we can get the ISR for all partitions of a topic, so why do we need /brokers/topics/[topic]/partitions/[partition_id]/state?
#3 I didn't find /admin/partitions_add/[topic]/[partition_id] and /admin/partitions_remove/[topic]/[partition_id] while adding and removing partitions with bin/kafka-topics.sh. Is this deprecated in the 0.8 release?
#4 I found that two znodes under /admin are automatically removed after the action completes: /admin/reassign_partitions/ and /admin/preferred_replica_election/. Why is /admin/delete_topic/ not removed automatically?
#5 What is the LeaderAndISRCommand in Scenario A of V3? Is it the same as LeaderAndISRRequest?
#6 In Scenario D, when a broker becomes controller, it sends a LeaderAndISRRequest to brokers with a special INIT flag. In Scenario C, when a broker receives a LeaderAndISRRequest with the INIT flag, it deletes all local partitions not in set_p. Why do we need to delete all local partitions on a controller change?
#7 In Scenario E (broker startup), the first step is to read the replica assignment. Doesn't the broker need to add its id to /brokers/ids first?
#8 Scenario H covers adding/removing partitions to an existing topic. In my test, I didn't find any such znodes for the PartitionRemove path / PartitionAdd path in ZooKeeper. Is this approach to partition adding/deleting deprecated? In fact, I didn't observe any znode change while adding/deleting partitions, so what is the actual process for Kafka partition adding/deleting?
#9 Scenario G seems inconsistent with the released implementation.
Regards, Jason
[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state
[ https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482494#comment-14482494 ] Sriharsha Chintalapani commented on KAFKA-2082: --- [~eapache] I was testing mostly with trunk. I just re-tried with Kafka 0.8.2.1 and, as you said, broker 3 never recovers. But with the latest trunk I can see this in the broker 3 logs:
[2015-04-07 02:56:13,485] INFO done re-registering broker (kafka.server.KafkaHealthcheck)
[2015-04-07 02:56:13,486] INFO Subscribing to /brokers/topics path to watch for new topics (kafka.server.KafkaHealthcheck)
[2015-04-07 02:56:13,500] INFO New leader is 9095 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2015-04-07 02:56:13,561] INFO [KafkaApi-9093] current updateMetadataRequest Name:UpdateMetadataRequest;Version:1;Controller:9095;ControllerEpoch:2;CorrelationId:7;ClientId:id_9095-host_localhost-port_29095;
Can you also give it a try with the latest trunk? Make sure you add reserved.broker.max.id=3 to server.properties when running trunk.
Re: Review Request 31850: Patch for KAFKA-1660
On April 7, 2015, 1:28 a.m., Joel Koshy wrote: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java, line 362 https://reviews.apache.org/r/31850/diff/5/?file=908540#file908540line362 As you explained offline, the sender does not have access to record batches while requests are in flight, but it would be super if we could figure out a way to avoid leaking details of batch completion (which currently live exclusively in the sender) into the RecordAccumulator.
Actually, since the incomplete-batches list was introduced when we added the flush() call, we were already sort of leaking it into the accumulator before this patch, and I feel it is not that bad to keep this list in the accumulator. - Guozhang
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31850/#review79063 --- On March 27, 2015, 11:35 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/31850/ --- (Updated March 27, 2015, 11:35 p.m.) Review request for kafka. Bugs: KAFKA-1660 https://issues.apache.org/jira/browse/KAFKA-1660 Repository: kafka Description --- A minor fix. Incorporated Guozhang's comments. Modified according to the latest conclusion. Patch for the finally passed KIP-15. Diffs - clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ab263423ff1d33170effb71acdef3fc501fa072a clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 88b4e4fbf3bf6fb6d2f90551a792b95d4cd51c40 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java e379ac89c9a2fbfe750d6b0dec693b7eabb76204 core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 3df450784592b894008e7507b2737f9bb07f7bd2 Diff: https://reviews.apache.org/r/31850/diff/ Testing --- Unit tests passed. Thanks, Jiangjie Qin
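For readers following along, the object under discussion is easier to see in miniature. This is not the patch's code, just a minimal Scala sketch of the idea: the accumulator keeps a thread-safe registry of batches that have been drained but not yet completed, which is what flush() waits on and what the sender must update on completion:

```scala
import scala.collection.mutable

// Sketch only: a registry of in-flight (drained but not yet completed)
// batches owned by the accumulator; flush() can block until it empties.
class IncompleteBatches[B] {
  private val incomplete = mutable.Set.empty[B]

  def add(batch: B): Unit = synchronized { incomplete += batch }

  // the sender calls this when a batch's produce request completes,
  // which is the "leak" of completion details being debated above
  def remove(batch: B): Unit = synchronized { incomplete -= batch }

  def isEmpty: Boolean = synchronized { incomplete.isEmpty }
}
```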
[jira] [Updated] (KAFKA-2094) Kafka does not create topic automatically after deleting the topic.
[ https://issues.apache.org/jira/browse/KAFKA-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon updated KAFKA-2094: Description: After I create a topic and then delete it (waiting long enough for the deletion to actually complete), Kafka does not re-create the topic automatically; it emits errors instead, even though the auto-create topic option is true. It works fine when I manually re-create the topic after deleting it. Here are the commands I run.
./bin/kafka-topics.sh --list --zookeeper 192.168.0.190:2181 test
./bin/kafka-topics.sh --delete --zookeeper 192.168.0.190:2181 --topic test
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[2015-04-06 20:51:44,542] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
test
[2015-04-06 20:51:49,896] WARN Error while fetching metadata [{TopicMetadata for topic test - No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2015-04-06 20:51:49,904] WARN Error while fetching metadata [{TopicMetadata for topic test - No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2015-04-06 20:51:49,905] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler)
^Csnowdrop@master:~/workspace/kafka/kafka_2.10-0.8.2.0$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[2015-04-06 20:52:08,733] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
Server side, I got this error:
[2015-04-06 21:41:08,491] ERROR [Replica Manager on Broker 0]: Error when processing fetch request for partition [lavender_0_1,0] offset 1687 from consumer with correlation id 5. Possible cause: Request for offset 1687 but we only have log segments in the range 0 to 107. (kafka.server.ReplicaManager)
[jira] [Created] (KAFKA-2094) Kafka does not create topic automatically after deleting the topic.
Hyukjin Kwon created KAFKA-2094: --- Summary: Kafka does not create topic automatically after deleting the topic. Key: KAFKA-2094 URL: https://issues.apache.org/jira/browse/KAFKA-2094 Project: Kafka Issue Type: Bug Components: admin Affects Versions: 0.8.2.0 Environment: Ubuntu 14.04 LTS Reporter: Hyukjin Kwon Priority: Critical After I create a topic and then delete it (waiting long enough for the deletion to actually complete), Kafka does not re-create the topic automatically; it emits errors instead, even though the auto-create topic option is true. It works fine when I manually re-create the topic after deleting it. Here are the commands I run.
```
./bin/kafka-topics.sh --list --zookeeper 192.168.0.190:2181 test
```
```
./bin/kafka-topics.sh --delete --zookeeper 192.168.0.190:2181 --topic test
```
```
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[2015-04-06 20:51:44,542] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
test
[2015-04-06 20:51:49,896] WARN Error while fetching metadata [{TopicMetadata for topic test - No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2015-04-06 20:51:49,904] WARN Error while fetching metadata [{TopicMetadata for topic test - No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2015-04-06 20:51:49,905] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler)
^Csnowdrop@master:~/workspace/kafka/kafka_2.10-0.8.2.0$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[2015-04-06 20:52:08,733] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
```
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
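One thing worth ruling out when reproducing this report on 0.8.2: topic deletion and topic auto-creation are each gated by a broker setting. A server.properties fragment with the relevant 0.8.2 option names follows; the values shown are only what the report implies, not a confirmed diagnosis of this issue:

```
# reporter states the auto-create topic option is on
auto.create.topics.enable=true
# required in 0.8.2; otherwise --delete only marks the topic for deletion
delete.topic.enable=true
```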
[jira] [Updated] (KAFKA-2094) Kafka does not create topic automatically after deleting the topic.
[ https://issues.apache.org/jira/browse/KAFKA-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon updated KAFKA-2094: Description: After I create a topic and then delete it (waiting long enough for the deletion to actually complete), Kafka does not re-create the topic automatically; it emits errors instead, even though the auto-create topic option is true. It works fine when I manually re-create the topic after deleting it. Here are the commands I run.
./bin/kafka-topics.sh --list --zookeeper 192.168.0.190:2181 test
./bin/kafka-topics.sh --delete --zookeeper 192.168.0.190:2181 --topic test
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[2015-04-06 20:51:44,542] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
test
[2015-04-06 20:51:49,896] WARN Error while fetching metadata [{TopicMetadata for topic test - No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2015-04-06 20:51:49,904] WARN Error while fetching metadata [{TopicMetadata for topic test - No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2015-04-06 20:51:49,905] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler)
^Csnowdrop@master:~/workspace/kafka/kafka_2.10-0.8.2.0$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[2015-04-06 20:52:08,733] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
[jira] [Updated] (KAFKA-2094) Kafka does not create topic automatically after deleting the topic.
[ https://issues.apache.org/jira/browse/KAFKA-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hyukjin Kwon updated KAFKA-2094: Description: After I create a topic and then delete it (waiting long enough for the deletion to actually complete), Kafka does not re-create the topic automatically; it emits errors instead, even though the auto-create topic option is true. It works fine when I manually re-create the topic after deleting it. Here are the commands I run.
./bin/kafka-topics.sh --list --zookeeper 192.168.0.190:2181 test
./bin/kafka-topics.sh --delete --zookeeper 192.168.0.190:2181 --topic test
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[2015-04-06 20:51:44,542] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
test
[2015-04-06 20:51:49,896] WARN Error while fetching metadata [{TopicMetadata for topic test - No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2015-04-06 20:51:49,904] WARN Error while fetching metadata [{TopicMetadata for topic test - No partition metadata for topic test due to kafka.common.LeaderNotAvailableException}] for topic [test]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
[2015-04-06 20:51:49,905] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler)
^Csnowdrop@master:~/workspace/kafka/kafka_2.10-0.8.2.0$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[2015-04-06 20:52:08,733] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
Server side, I got this error:
[2015-04-06 21:41:08,491] ERROR [Replica Manager on Broker 0]: Error when processing fetch request for partition [test,0] offset 1687 from consumer with correlation id 5. Possible cause: Request for offset 1687 but we only have log segments in the range 0 to 107. (kafka.server.ReplicaManager)
Re: [jira] [Updated] (KAFKA-1926) Replace kafka.utils.Utils with o.a.k.common.utils.Utils
Jun Rao, Thanks for reviewing it again. What commands do you run to show unused imports? When I run gradle --daemon test, it did not show these (there were some warnings, but not unused imports). I thought that would have shown these things. Tong Li OpenStack Kafka Community Development Building 501/B205 liton...@us.ibm.com From: Jun Rao (JIRA) j...@apache.org To: dev@kafka.apache.org Date: 04/06/2015 12:48 AM Subject: [jira] [Updated] (KAFKA-1926) Replace kafka.utils.Utils with o.a.k.common.utils.Utils [ https://issues.apache.org/jira/browse/KAFKA-1926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao updated KAFKA-1926: --- Resolution: Fixed Fix Version/s: 0.8.3 Status: Resolved (was: Patch Available) Thanks for rebasing the latest patch. +1. Committed to trunk after removing unused imports in a few files.
[jira] [Commented] (KAFKA-1926) Replace kafka.utils.Utils with o.a.k.common.utils.Utils
[ https://issues.apache.org/jira/browse/KAFKA-1926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14481262#comment-14481262 ] Jun Rao commented on KAFKA-1926: Typically, an IDE (e.g., IntelliJ) shows unused imports. Replace kafka.utils.Utils with o.a.k.common.utils.Utils --- Key: KAFKA-1926 URL: https://issues.apache.org/jira/browse/KAFKA-1926 Project: Kafka Issue Type: Improvement Affects Versions: 0.8.2.0 Reporter: Jay Kreps Assignee: Tong Li Labels: newbie, patch Fix For: 0.8.3 Attachments: KAFKA-1926.patch, KAFKA-1926.patch, KAFKA-1926.patch, KAFKA-1926.patch, KAFKA-1926_2015-04-01_22:16:46.patch, KAFKA-1926_2015-04-05_23:45:13.patch There is currently a lot of duplication between the Utils class in common and the one in core. Our plan has been to deprecate the duplicate code in the server and replace it with the new common code. As such, we should evaluate each method in the Scala Utils and do one of the following: 1. Migrate it to o.a.k.common.utils.Utils if it is a sensible general-purpose utility in active use that is not Kafka-specific. If we migrate it, we should really think about the API and make sure there is some test coverage. A few things in there are kind of funky and we shouldn't just blindly copy them over. 2. Create a new class ServerUtils or ScalaUtils in kafka.utils that will hold any utilities that really need Scala features to be convenient. 3. Delete it if it is unused or has a bad API. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Is there a complete Kafka 0.8.* replication design document
Yes, the wiki is a bit old. You can find out more about replication in the following links. http://kafka.apache.org/documentation.html#replication http://www.slideshare.net/junrao/kafka-replication-apachecon2013
#1, #2, #8. See the ZK layout in https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper
#3. Adding partitions is now done by updating /brokers/topics/[topic] directly.
#4. For deleting a topic, the ZK path /admin/delete_topics/[topic_to_be_deleted] is created and then removed after the deletion completes.
#5. LeaderAndISRCommand should be the same as LeaderAndISRRequest.
#6. This is to take care of partitions that were deleted while the broker was down. The implementation doesn't rely on the special INIT flag; instead, it expects the very first LeaderAndISRRequest to include all valid partitions, and local partitions not in that list are deleted.
#7. Only the controller needs to read the replica assignment, and the controller can be started before the broker registers itself. This is handled through ZK watchers.
#9. The high-level algorithm described there is still valid. For the implementation, you can take a look at ReplicaManager.
Thanks, Jun
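As a concrete companion to #1/#2, here is a small Scala sketch for inspecting the partition state znode, using the zkclient library and kafka.utils.ZKStringSerializer that the 0.8.x broker itself uses; the topic name, partition number, and the example JSON in the comment are illustrative (the authoritative layout is the wiki page above):

```scala
import org.I0Itec.zkclient.ZkClient
import kafka.utils.ZKStringSerializer

object ZkLayoutPeek {
  def main(args: Array[String]): Unit = {
    // connect the way the 0.8.x broker does (string-serialized znodes)
    val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
    // leader and ISR live under .../partitions/[partition_id]/state, e.g.
    // {"controller_epoch":1,"leader":1,"version":1,"leader_epoch":0,"isr":[1,2,3]}
    val state = zkClient.readData[String]("/brokers/topics/test/partitions/0/state")
    println(state)
    zkClient.close()
  }
}
```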
[jira] [Created] (KAFKA-2095) ZKConfig uses hard coded value zookeeper.session.timeout.ms
Sriharsha Chintalapani created KAFKA-2095: - Summary: ZKConfig uses hard coded value zookeeper.session.timeout.ms Key: KAFKA-2095 URL: https://issues.apache.org/jira/browse/KAFKA-2095 Project: Kafka Issue Type: Bug Reporter: Sriharsha Chintalapani Assignee: Sriharsha Chintalapani -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-2095) ZKConfig uses hard coded value zookeeper.session.timeout.ms
[ https://issues.apache.org/jira/browse/KAFKA-2095?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sriharsha Chintalapani resolved KAFKA-2095. --- Resolution: Invalid ZKConfig uses hard coded value zookeeper.session.timeout.ms --- Key: KAFKA-2095 URL: https://issues.apache.org/jira/browse/KAFKA-2095 Project: Kafka Issue Type: Bug Reporter: Sriharsha Chintalapani Assignee: Sriharsha Chintalapani -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2096) Enable keepalive socket option for broker
Allen Wang created KAFKA-2096: - Summary: Enable keepalive socket option for broker Key: KAFKA-2096 URL: https://issues.apache.org/jira/browse/KAFKA-2096 Project: Kafka Issue Type: Improvement Components: network Affects Versions: 0.8.2.1 Reporter: Allen Wang Assignee: Jun Rao Priority: Critical We run a Kafka 0.8.2.1 cluster in AWS with a large number of producers ( 1). The number of producer instances also scales up and down significantly on a daily basis. The issue we found is that after about 10 days, the open file descriptor count approaches the limit of 32K. An investigation of these open file descriptors shows that a significant portion of them belong to client instances that were terminated during scale-down, yet they still show as ESTABLISHED in netstat. We suspect that the AWS firewall between the client and broker causes this issue. We tried using the keepalive socket option to reduce this socket leak on the broker, and it appears to be working. Specifically, we added this line to kafka.network.Acceptor.accept(): socketChannel.socket().setKeepAlive(true) Our experiment with this change confirmed that netstat entries for terminated client instances were probed as configured in the operating system. After the configured number of probes, the OS determined that the peer was no longer alive and removed the entry, presumably after Kafka hit an error reading from the channel and closed it. Our experiment also shows that after a few days, the instance was able to hold a stable low point of open file descriptor count, compared with other instances where the low point keeps increasing day to day. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
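The proposed change is literally one line. For context, here is a simplified Scala sketch of where it would sit in the 0.8.x accept path; the surrounding code is abbreviated and not an exact copy of kafka.network.Acceptor, and only the setKeepAlive call is the actual proposal:

```scala
import java.nio.channels.{SelectionKey, ServerSocketChannel, SocketChannel}

object AcceptorSketch {
  // Simplified from the shape of kafka.network.Acceptor.accept();
  // buffer sizing and error handling are omitted.
  def accept(key: SelectionKey): SocketChannel = {
    val serverChannel = key.channel().asInstanceOf[ServerSocketChannel]
    val socketChannel = serverChannel.accept()
    socketChannel.configureBlocking(false)
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setKeepAlive(true) // proposed: let the OS probe and reap dead peers
    socketChannel
  }
}
```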
[jira] [Updated] (KAFKA-2096) Enable keepalive socket option for broker to prevent socket leak
[ https://issues.apache.org/jira/browse/KAFKA-2096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Allen Wang updated KAFKA-2096: -- Summary: Enable keepalive socket option for broker to prevent socket leak (was: Enable keepalive socket option for broker) Enable keepalive socket option for broker to prevent socket leak Key: KAFKA-2096 URL: https://issues.apache.org/jira/browse/KAFKA-2096 Project: Kafka Issue Type: Improvement Components: network Affects Versions: 0.8.2.1 Reporter: Allen Wang Assignee: Jun Rao Priority: Critical -- This message was sent by Atlassian JIRA (v6.3.4#6332)