[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state

2015-04-06 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14481521#comment-14481521
 ] 

Sriharsha Chintalapani commented on KAFKA-2082:
---

[~eapache] [~junrao] These are my observations so far:
1) The producer test connects to all 5 brokers and sends messages. Each broker 
is connected to its own zookeeper, i.e. broker1 connects to localhost:2181 and 
broker2 connects to localhost:2182.
2) The producer test disables the zookeeper connections for 2181 and 2183 
(zookeeper1, zookeeper3). broker1 and broker3 are deregistered from /brokers/ids 
(a quick way to check this is sketched below).
3) But broker1 and broker3 are still listening on their respective ports and 
sending responses. Since they no longer have an active connection to zookeeper, 
they return stale data and still report the old controller as the controller.
4) Hence the producer gets the wrong leaders for the topic partitions from broker1.
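
One way to observe the deregistration (a minimal sketch, assuming the zkclient 
library Kafka 0.8.x already depends on; the connect string is just an example) 
is to list the ephemeral /brokers/ids znodes - the entries for broker1 and 
broker3 disappear as soon as their zookeeper sessions are lost:
{code}
import org.I0Itec.zkclient.ZkClient
import scala.collection.JavaConverters._

object BrokerRegistrationCheck {
  def main(args: Array[String]): Unit = {
    // connect to any zookeeper node that is still reachable (example address)
    val zk = new ZkClient("localhost:2182", 30000, 30000)
    // /brokers/ids/<id> znodes are ephemeral: they vanish when a broker's session dies
    val registered = zk.getChildren("/brokers/ids").asScala.sorted
    println("registered broker ids: " + registered.mkString(", "))
    zk.close()
  }
}
{code}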

 Kafka Replication ends up in a bad state
 

 Key: KAFKA-2082
 URL: https://issues.apache.org/jira/browse/KAFKA-2082
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 0.8.2.1
Reporter: Evan Huus
Assignee: Sriharsha Chintalapani
Priority: Critical
 Attachments: KAFKA-2082.patch


 While running integration tests for Sarama (the go client) we came across a 
 pattern of connection losses that reliably puts kafka into a bad state: 
 several of the brokers start spinning, chewing ~30% CPU and spamming the logs 
 with hundreds of thousands of lines like:
 {noformat}
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,1] failed due to Leader not local for partition 
 [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,6] failed due to Leader not local for partition 
 [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,21] failed due to Leader not local for partition 
 [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,26] failed due to Leader not local for partition 
 [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,1] failed due to Leader not local for partition 
 [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,6] failed due to Leader not local for partition 
 [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,21] failed due to Leader not local for partition 
 [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,26] failed due to Leader not local for partition 
 [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
 {noformat}
 This can be easily and reliably reproduced using the {{toxiproxy-final}} 
 branch of https://github.com/Shopify/sarama which includes a vagrant script 
 for provisioning the appropriate cluster: 
 - {{git clone https://github.com/Shopify/sarama.git}}
 - {{git checkout test-jira-kafka-2082}}
 - {{vagrant up}}
 - {{TEST_SEED=1427917826425719059 DEBUG=true go test -v}}
 After the test finishes (it fails because the cluster ends up in a bad 
 state), you can log into the cluster machine with {{vagrant ssh}} and inspect 
 the bad nodes. The vagrant script provisions five zookeepers and five brokers 
 in {{/opt/kafka-9091/}} through {{/opt/kafka-9095/}}.
 Additional context: the test produces continually to the cluster while 
 randomly cutting and restoring zookeeper connections (all connections to 
 zookeeper are run through a simple proxy on the same vm to make this easy). 
 The majority of 

[jira] [Created] (KAFKA-2097) Implement request delays for quota violations

2015-04-06 Thread Aditya Auradkar (JIRA)
Aditya Auradkar created KAFKA-2097:
--

 Summary: Implement request delays for quota violations
 Key: KAFKA-2097
 URL: https://issues.apache.org/jira/browse/KAFKA-2097
 Project: Kafka
  Issue Type: Sub-task
Reporter: Aditya Auradkar
Assignee: Aditya Auradkar


As defined in the KIP, implement delays on a per-request basis for both 
producer and consumer. This involves either modifying the existing purgatory or 
adding a new delay queue.
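
As a rough illustration of the delay-queue option (a minimal standalone sketch 
with hypothetical names, not the actual Kafka purgatory or quota code), a 
throttled response can be parked in a java.util.concurrent.DelayQueue and only 
completed once its delay has elapsed:
{code}
import java.util.concurrent.{DelayQueue, Delayed, TimeUnit}

// One parked response per quota violation; becomes available once delayMs has passed.
class ThrottledResponse(val clientId: String, delayMs: Long) extends Delayed {
  private val releaseAtNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs)
  override def getDelay(unit: TimeUnit): Long =
    unit.convert(releaseAtNs - System.nanoTime(), TimeUnit.NANOSECONDS)
  override def compareTo(other: Delayed): Int =
    java.lang.Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS))
}

object QuotaDelayExample {
  def main(args: Array[String]): Unit = {
    val delayed = new DelayQueue[ThrottledResponse]()
    delayed.put(new ThrottledResponse("client-a", 200))
    // take() blocks until the delay expires; a sender thread would then complete the response
    val ready = delayed.take()
    println("releasing throttled response for " + ready.clientId)
  }
}
{code}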



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2029) Improving controlled shutdown for rolling updates

2015-04-06 Thread Dmitry Bugaychenko (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dmitry Bugaychenko updated KAFKA-2029:
--
Attachment: KAFKA-2029.patch

 Improving controlled shutdown for rolling updates
 -

 Key: KAFKA-2029
 URL: https://issues.apache.org/jira/browse/KAFKA-2029
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Neha Narkhede
Priority: Critical
 Attachments: KAFKA-2029.patch, KAFKA-2029.patch


 Controlled shutdown as currently implemented can cause numerous problems: 
 deadlocks, local and global data loss, partitions without a leader, etc. In 
 some cases the only way to restore the cluster is to stop it completely with 
 kill -9 and start it again.
 Note 1: I'm aware of KAFKA-1305, but the proposed workaround to increase the 
 queue size makes things much worse (see the discussion there).
 Note 2: The problems described here can occur in any setup, but they are 
 extremely painful in a setup with large brokers (36 disks, 1000+ assigned 
 partitions per broker in our case).
 Note 3: These improvements are actually workarounds, and it is worth 
 considering a global refactoring of the controller (make it single-threaded, 
 or even get rid of it in favour of ZK leader elections for partitions).
 The problems and improvements are:
 # Controlled shutdown takes a long time (10+ minutes); the broker sends 
 multiple shutdown requests, finally considers the shutdown failed and proceeds 
 to an unclean shutdown, while the controller gets stuck for a while (holding a 
 lock while waiting for free space in the controller-to-broker queue). After 
 the broker starts back up, it receives follower requests and erases its high 
 watermarks (with a message that the replica does not exist - the controller 
 had not yet sent a request with the replica assignment); then, when the 
 controller starts replicas on the broker, the broker deletes all local data 
 (due to the missing high watermarks). Furthermore, the controller then 
 processes the pending shutdown request and stops replicas on the broker, 
 leaving it in a non-functional state. A solution might be to increase the time 
 the broker waits for the controller's response to the shutdown request, but 
 this timeout is taken from controller.socket.timeout.ms, which is global for 
 all broker-controller communication, and setting it to 30 minutes is 
 dangerous. *Proposed solution: introduce a dedicated config parameter for this 
 timeout with a high default*.
 # If a broker goes down during controlled shutdown and does not come back, the 
 controller gets stuck in a deadlock (one thread owns the lock and tries to add 
 a message to the dead broker's queue, the send thread is in an infinite loop 
 retrying the message to the dead broker, and the broker failure handler is 
 waiting for the lock). There are numerous partitions without a leader and the 
 only way out is to kill -9 the controller. *Proposed solution: add a timeout 
 for adding a message to a broker's queue*. ControllerChannelManager.sendRequest:
 {code}
   def sendRequest(brokerId : Int, request : RequestOrResponse, callback: 
 (RequestOrResponse) = Unit = null) {
 brokerLock synchronized {
   val stateInfoOpt = brokerStateInfo.get(brokerId)
   stateInfoOpt match {
 case Some(stateInfo) =
   // ODKL Patch: prevent infinite hang on trying to send message to a 
 dead broker.
   // TODO: Move timeout to config
   if (!stateInfo.messageQueue.offer((request, callback), 10, 
 TimeUnit.SECONDS)) {
 error(Timed out trying to send message to broker  + 
 brokerId.toString)
 // Do not throw, as it brings controller into completely 
 non-functional state
 // Controller to broker state change requests batch is not empty 
 while creating a new one
 //throw new IllegalStateException(Timed out trying to send 
 message to broker  + brokerId.toString)
   }
 case None =
   warn(Not sending request %s to broker %d, since it is 
 offline..format(request, brokerId))
   }
 }
   }
 {code}
 # When the broker that is the controller starts shutting down while auto 
 leader rebalance is running, it ends up deadlocking (the shutdown thread owns 
 the lock and waits for the rebalance thread to exit, while the rebalance 
 thread waits for the lock). *Proposed solution: use a bounded wait in the 
 rebalance thread*. KafkaController.scala:
 {code}
   // ODKL Patch to prevent deadlocks in shutdown.
   /**
    * Execute the given function inside the lock
    */
   def inLockIfRunning[T](lock: ReentrantLock)(fun: => T): T = {
     if (isRunning || lock.isHeldByCurrentThread) {
       // TODO: Configure timeout.
       if (!lock.tryLock(10, TimeUnit.SECONDS)) {
         throw new IllegalStateException("Failed to acquire controller lock in 10 seconds.")
       }

Review Request 32890: Patch for controlled shutdown rebased on trunk

2015-04-06 Thread Dmitry Bugaychenko

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32890/
---

Review request for kafka.


Bugs: KAFKA-2029
https://issues.apache.org/jira/browse/KAFKA-2029


Repository: kafka


Description
---

Controlled shutdown patch


Diffs
-

  core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
97acdb23f6e95554c3e0357aa112eddfc875efbc 
  core/src/main/scala/kafka/controller/KafkaController.scala 
3a09377611b48198c4c3cd1a118fc12eda0543d4 
  core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala 
3b15ab4eef22c6f50a7483e99a6af40fb55aca9f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
144a15e3a53d47fa972bef277c335da484842cf9 

Diff: https://reviews.apache.org/r/32890/diff/


Testing
---


Thanks,

Dmitry Bugaychenko



[jira] [Commented] (KAFKA-2029) Improving controlled shutdown for rolling updates

2015-04-06 Thread Dmitry Bugaychenko (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14481465#comment-14481465
 ] 

Dmitry Bugaychenko commented on KAFKA-2029:
---

Created reviewboard https://reviews.apache.org/r/32890/diff/
 against branch origin/trunk

 Improving controlled shutdown for rolling updates
 -

 Key: KAFKA-2029
 URL: https://issues.apache.org/jira/browse/KAFKA-2029
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Neha Narkhede
Priority: Critical
 Attachments: KAFKA-2029.patch, KAFKA-2029.patch


 Controlled shutdown as currently implemented can cause numerous problems: 
 deadlocks, local and global data loss, partitions without a leader, etc. In 
 some cases the only way to restore the cluster is to stop it completely with 
 kill -9 and start it again.
 Note 1: I'm aware of KAFKA-1305, but the proposed workaround to increase the 
 queue size makes things much worse (see the discussion there).
 Note 2: The problems described here can occur in any setup, but they are 
 extremely painful in a setup with large brokers (36 disks, 1000+ assigned 
 partitions per broker in our case).
 Note 3: These improvements are actually workarounds, and it is worth 
 considering a global refactoring of the controller (make it single-threaded, 
 or even get rid of it in favour of ZK leader elections for partitions).
 The problems and improvements are:
 # Controlled shutdown takes a long time (10+ minutes); the broker sends 
 multiple shutdown requests, finally considers the shutdown failed and proceeds 
 to an unclean shutdown, while the controller gets stuck for a while (holding a 
 lock while waiting for free space in the controller-to-broker queue). After 
 the broker starts back up, it receives follower requests and erases its high 
 watermarks (with a message that the replica does not exist - the controller 
 had not yet sent a request with the replica assignment); then, when the 
 controller starts replicas on the broker, the broker deletes all local data 
 (due to the missing high watermarks). Furthermore, the controller then 
 processes the pending shutdown request and stops replicas on the broker, 
 leaving it in a non-functional state. A solution might be to increase the time 
 the broker waits for the controller's response to the shutdown request, but 
 this timeout is taken from controller.socket.timeout.ms, which is global for 
 all broker-controller communication, and setting it to 30 minutes is 
 dangerous. *Proposed solution: introduce a dedicated config parameter for this 
 timeout with a high default*.
 # If a broker goes down during controlled shutdown and does not come back, the 
 controller gets stuck in a deadlock (one thread owns the lock and tries to add 
 a message to the dead broker's queue, the send thread is in an infinite loop 
 retrying the message to the dead broker, and the broker failure handler is 
 waiting for the lock). There are numerous partitions without a leader and the 
 only way out is to kill -9 the controller. *Proposed solution: add a timeout 
 for adding a message to a broker's queue*. ControllerChannelManager.sendRequest:
 {code}
   def sendRequest(brokerId: Int, request: RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
     brokerLock synchronized {
       val stateInfoOpt = brokerStateInfo.get(brokerId)
       stateInfoOpt match {
         case Some(stateInfo) =>
           // ODKL Patch: prevent infinite hang on trying to send message to a dead broker.
           // TODO: Move timeout to config
           if (!stateInfo.messageQueue.offer((request, callback), 10, TimeUnit.SECONDS)) {
             error("Timed out trying to send message to broker " + brokerId.toString)
             // Do not throw, as it brings the controller into a completely non-functional state
             // ("Controller to broker state change requests batch is not empty while creating a new one")
             //throw new IllegalStateException("Timed out trying to send message to broker " + brokerId.toString)
           }
         case None =>
           warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
       }
     }
   }
 {code}
 # When the broker that is the controller starts shutting down while auto 
 leader rebalance is running, it ends up deadlocking (the shutdown thread owns 
 the lock and waits for the rebalance thread to exit, while the rebalance 
 thread waits for the lock). *Proposed solution: use a bounded wait in the 
 rebalance thread*. KafkaController.scala:
 {code}
   // ODKL Patch to prevent deadlocks in shutdown.
   /**
    * Execute the given function inside the lock
    */
   def inLockIfRunning[T](lock: ReentrantLock)(fun: => T): T = {
     if (isRunning || lock.isHeldByCurrentThread) {
       // TODO: Configure timeout.
       if (!lock.tryLock(10, 

RE: [KIP-DISCUSSION] KIP-13 Quotas

2015-04-06 Thread Aditya Auradkar
Hi Jay,

2. At this time, the proposed response format changes are only for 
monitoring/informing clients. As Jun mentioned, we get instance level 
monitoring in this case since each instance that got throttled will have a 
metric confirming the same. Without client level monitoring for this, it's hard 
for application developers to find if they are being throttled since they will 
also have to be aware of all the brokers in the cluster. This is quite 
problematic for large clusters.

It seems nice for app developers to not have to think about kafka internal 
metrics and only focus on the metrics exposed on their instances, analogous to 
having client-side request latency metrics. Basically, we want an easy way for 
clients to be aware if they are being throttled.

4. For purgatory v delay queue, I think we are on the same page. I feel it is 
nicer to use the purgatory but I'm happy to use a DelayQueue if there are 
performance implications. I don't know enough about the current and Yasuhiro's 
new implementation to be sure one way or the other.

Stepping back, I think these two things are the only remaining points of 
discussion within the current proposal. Any concerns if I started a voting 
thread on the proposal after the KIP discussion tomorrow? (assuming we reach 
consensus on these items)

Thanks,
Aditya

From: Jay Kreps [jay.kr...@gmail.com]
Sent: Saturday, April 04, 2015 1:36 PM
To: dev@kafka.apache.org
Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

Hey Aditya,

2. For the return flag I'm not terribly particular. If we want to add it
let's fully think through how it will be used. The only concern I have is
adding to the protocol without really thinking through the use cases. So
let's work out the APIs we want to add to the Java consumer and producer
and the use cases for how clients will make use of these. For my part I
actually don't see much use other than monitoring since it isn't an error
condition to be at your quota. And if it is just monitoring I don't see a
big enough difference between having the monitoring on the server-side
versus in the clients to justify putting it in the protocol. But I think
you guys may have other use cases in mind of how a client would make some
use of this? Let's work that out. I also don't feel strongly about it--it
wouldn't be *bad* to have the monitoring available on the client, just
doesn't seem that much better.

4. For the purgatory vs delay queue, I think it is arguably nicer to reuse the
purgatory; we just have to be ultra-conscious of efficiency. I think our
goal is to turn quotas on across the board, so at LinkedIn that would mean
potentially every request will need a small delay. I haven't worked out the
efficiency implications of this choice, so as long as we do that I'm happy.

-Jay

On Fri, Apr 3, 2015 at 1:10 PM, Aditya Auradkar 
aaurad...@linkedin.com.invalid wrote:

 Some responses to Jay's points.

 1. Using commas - Cool.

 2. Adding return flag - I'm inclined to agree with Joel that this is good
 to have in the initial implementation.

 3. Config - +1. I'll remove it from the KIP. We can discuss this in
 parallel.

 4. Purgatory vs Delay queue - I feel that it is simpler to reuse the
 existing purgatories for both delayed produce and fetch requests. IIUC, all
 we need for quotas is a minWait parameter for DelayedOperation (or
 something equivalent) since there is already a max wait. The completion
 criteria can check if minWait time has elapsed before declaring the
 operation complete. For this to impact performance, a significant number of
 clients may need to exceed their quota at the same time and even then I'm
 not very clear on the scope of the impact. Two layers of delays might add
 complexity to the implementation which I'm hoping to avoid.
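
 As a rough sketch of that idea (hypothetical names, not the actual
 DelayedOperation API), the completion check would simply also require that
 the minimum wait has elapsed before the operation may complete:

   // standalone sketch: complete only after minWait AND the data criteria,
   // or unconditionally once maxWait has expired
   class MinWaitOperation(minWaitMs: Long, maxWaitMs: Long) {
     private val createdMs = System.currentTimeMillis()

     // operation-specific readiness (e.g. enough bytes accumulated for a fetch)
     def dataReady(): Boolean = true

     def tryComplete(): Boolean = {
       val elapsedMs = System.currentTimeMillis() - createdMs
       (elapsedMs >= minWaitMs && dataReady()) || elapsedMs >= maxWaitMs
     }
   }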

 Aditya

 
 From: Joel Koshy [jjkosh...@gmail.com]
 Sent: Friday, April 03, 2015 12:48 PM
 To: dev@kafka.apache.org
 Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

 Aditya, thanks for the updated KIP and Jay/Jun thanks for the
 comments. Couple of comments in-line:

  2. I would advocate for adding the return flag when we next bump the
  request format version just to avoid proliferation. I agree this is a
 good
  thing to know about, but at the moment I don't think we have a very well
  flushed out idea of how the client would actually make use of this info.
 I

 I'm somewhat inclined to having something appropriate off the bat -
 mainly because (i) clients really should know that they have been
 throttled (ii) a smart producer/consumer implementation would want to
 know how much to back off. So perhaps this and config-management
 should be moved to a separate discussion, but it would be good to have
 this discussion going and incorporated into the first quota
 implementation.

  3. Config--I think we need to generalize the topic stuff so we can
 override
  at multiple levels. We have topic and client, but I suspect user and
  

[jira] [Commented] (KAFKA-2029) Improving controlled shutdown for rolling updates

2015-04-06 Thread Dmitry Bugaychenko (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14481477#comment-14481477
 ] 

Dmitry Bugaychenko commented on KAFKA-2029:
---

Rebased on trunk.

 Improving controlled shutdown for rolling updates
 -

 Key: KAFKA-2029
 URL: https://issues.apache.org/jira/browse/KAFKA-2029
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Neha Narkhede
Priority: Critical
 Attachments: KAFKA-2029.patch, KAFKA-2029.patch


 Controlled shutdown as currently implemented can cause numerous problems: 
 deadlocks, local and global data loss, partitions without a leader, etc. In 
 some cases the only way to restore the cluster is to stop it completely with 
 kill -9 and start it again.
 Note 1: I'm aware of KAFKA-1305, but the proposed workaround to increase the 
 queue size makes things much worse (see the discussion there).
 Note 2: The problems described here can occur in any setup, but they are 
 extremely painful in a setup with large brokers (36 disks, 1000+ assigned 
 partitions per broker in our case).
 Note 3: These improvements are actually workarounds, and it is worth 
 considering a global refactoring of the controller (make it single-threaded, 
 or even get rid of it in favour of ZK leader elections for partitions).
 The problems and improvements are:
 # Controlled shutdown takes a long time (10+ minutes); the broker sends 
 multiple shutdown requests, finally considers the shutdown failed and proceeds 
 to an unclean shutdown, while the controller gets stuck for a while (holding a 
 lock while waiting for free space in the controller-to-broker queue). After 
 the broker starts back up, it receives follower requests and erases its high 
 watermarks (with a message that the replica does not exist - the controller 
 had not yet sent a request with the replica assignment); then, when the 
 controller starts replicas on the broker, the broker deletes all local data 
 (due to the missing high watermarks). Furthermore, the controller then 
 processes the pending shutdown request and stops replicas on the broker, 
 leaving it in a non-functional state. A solution might be to increase the time 
 the broker waits for the controller's response to the shutdown request, but 
 this timeout is taken from controller.socket.timeout.ms, which is global for 
 all broker-controller communication, and setting it to 30 minutes is 
 dangerous. *Proposed solution: introduce a dedicated config parameter for this 
 timeout with a high default*.
 # If a broker goes down during controlled shutdown and does not come back, the 
 controller gets stuck in a deadlock (one thread owns the lock and tries to add 
 a message to the dead broker's queue, the send thread is in an infinite loop 
 retrying the message to the dead broker, and the broker failure handler is 
 waiting for the lock). There are numerous partitions without a leader and the 
 only way out is to kill -9 the controller. *Proposed solution: add a timeout 
 for adding a message to a broker's queue*. ControllerChannelManager.sendRequest:
 {code}
   def sendRequest(brokerId: Int, request: RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
     brokerLock synchronized {
       val stateInfoOpt = brokerStateInfo.get(brokerId)
       stateInfoOpt match {
         case Some(stateInfo) =>
           // ODKL Patch: prevent infinite hang on trying to send message to a dead broker.
           // TODO: Move timeout to config
           if (!stateInfo.messageQueue.offer((request, callback), 10, TimeUnit.SECONDS)) {
             error("Timed out trying to send message to broker " + brokerId.toString)
             // Do not throw, as it brings the controller into a completely non-functional state
             // ("Controller to broker state change requests batch is not empty while creating a new one")
             //throw new IllegalStateException("Timed out trying to send message to broker " + brokerId.toString)
           }
         case None =>
           warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
       }
     }
   }
 {code}
 # When the broker that is the controller starts shutting down while auto 
 leader rebalance is running, it ends up deadlocking (the shutdown thread owns 
 the lock and waits for the rebalance thread to exit, while the rebalance 
 thread waits for the lock). *Proposed solution: use a bounded wait in the 
 rebalance thread*. KafkaController.scala:
 {code}
   // ODKL Patch to prevent deadlocks in shutdown.
   /**
    * Execute the given function inside the lock
    */
   def inLockIfRunning[T](lock: ReentrantLock)(fun: => T): T = {
     if (isRunning || lock.isHeldByCurrentThread) {
       // TODO: Configure timeout.
       if (!lock.tryLock(10, TimeUnit.SECONDS)) {
         throw new IllegalStateException("Failed to acquire 

[jira] [Commented] (KAFKA-1621) Standardize --messages option in perf scripts

2015-04-06 Thread Rekha Joshi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14481865#comment-14481865
 ] 

Rekha Joshi commented on KAFKA-1621:


Hi, I missed updating earlier. Everything else was set up as described at 
https://cwiki.apache.org/confluence/display/KAFKA/Patch+submission+and+review 
but the jira client could not be installed (Mac 10.9.5, Python 2.7.5, pip 
1.5.6), hence I could not update the review board.
Suggestion: why not review directly on git like the rest of the Apache projects?

python kafka-patch-review.py --help
Traceback (most recent call last):
  File "kafka-patch-review.py", line 10, in <module>
    from jira.client import JIRA
ImportError: No module named jira.client

pip install jira-python
Downloading/unpacking jira-python
  Could not find any downloads that satisfy the requirement jira-python
Cleaning up...
No distributions at all found for jira-python
Storing debug log for failure in /usr/local/.pip/pip.log

sudo easy_install jira-python
Searching for jira-python
Reading http://pypi.python.org/simple/jira-python/
Couldn't find index page for 'jira-python' (maybe misspelled?)
Scanning index of all packages (this may take a while)
Reading http://pypi.python.org/simple/
No local packages or download links found for jira-python
error: Could not find suitable distribution for Requirement.parse('jira-python')

I think it could be related to security settings in pip, but I would prefer not 
to downgrade pip as it is used for my other projects as well.

 Standardize --messages option in perf scripts
 -

 Key: KAFKA-1621
 URL: https://issues.apache.org/jira/browse/KAFKA-1621
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Jay Kreps
  Labels: newbie

 This option is specified in PerfConfig and is used by the producer, consumer 
 and simple consumer perf commands. The docstring on the argument does not 
 list it as required but the producer performance test requires it--others 
 don't.
 We should standardize this so that either all the commands require the option 
 and it is marked as required in the docstring or none of them list it as 
 required.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2098) Gradle Wrapper Jar gone missing in 0.8.2.1

2015-04-06 Thread Rekha Joshi (JIRA)
Rekha Joshi created KAFKA-2098:
--

 Summary: Gradle Wrapper Jar gone missing in 0.8.2.1
 Key: KAFKA-2098
 URL: https://issues.apache.org/jira/browse/KAFKA-2098
 Project: Kafka
  Issue Type: Bug
  Components: build
Affects Versions: 0.8.2.1
Reporter: Rekha Joshi


./gradlew idea
Error: Could not find or load main class org.gradle.wrapper.GradleWrapperMain

This was working in 0.8.2. Attaching a patch. Thanks



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state

2015-04-06 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14481843#comment-14481843
 ] 

Jun Rao commented on KAFKA-2082:


After step 2), which broker is the new controller? The new controller should be 
able to detect that brokers 1 and 3 are dead and move the leaders to other 
brokers. So, was the issue that the new controller didn't detect the failure of 
brokers 1 and 3 properly, or that it didn't propagate the new metadata properly?

 Kafka Replication ends up in a bad state
 

 Key: KAFKA-2082
 URL: https://issues.apache.org/jira/browse/KAFKA-2082
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 0.8.2.1
Reporter: Evan Huus
Assignee: Sriharsha Chintalapani
Priority: Critical
 Attachments: KAFKA-2082.patch


 While running integration tests for Sarama (the go client) we came across a 
 pattern of connection losses that reliably puts kafka into a bad state: 
 several of the brokers start spinning, chewing ~30% CPU and spamming the logs 
 with hundreds of thousands of lines like:
 {noformat}
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,1] failed due to Leader not local for partition 
 [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,6] failed due to Leader not local for partition 
 [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,21] failed due to Leader not local for partition 
 [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,26] failed due to Leader not local for partition 
 [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,1] failed due to Leader not local for partition 
 [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,6] failed due to Leader not local for partition 
 [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,21] failed due to Leader not local for partition 
 [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,26] failed due to Leader not local for partition 
 [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
 {noformat}
 This can be easily and reliably reproduced using the {{toxiproxy-final}} 
 branch of https://github.com/Shopify/sarama which includes a vagrant script 
 for provisioning the appropriate cluster: 
 - {{git clone https://github.com/Shopify/sarama.git}}
 - {{git checkout test-jira-kafka-2082}}
 - {{vagrant up}}
 - {{TEST_SEED=1427917826425719059 DEBUG=true go test -v}}
 After the test finishes (it fails because the cluster ends up in a bad 
 state), you can log into the cluster machine with {{vagrant ssh}} and inspect 
 the bad nodes. The vagrant script provisions five zookeepers and five brokers 
 in {{/opt/kafka-9091/}} through {{/opt/kafka-9095/}}.
 Additional context: the test produces continually to the cluster while 
 randomly cutting and restoring zookeeper connections (all connections to 
 zookeeper are run through a simple proxy on the same vm to make this easy). 
 The majority of the time this works very well and does a good job exercising 
 our producer's retry and failover code. However, under certain patterns of 
 connection loss (the {{TEST_SEED}} in the instructions is important), kafka 
 gets confused. The test never cuts more than two connections at a time, so 
 zookeeper should always have quorum, and the topic (with three replicas) 
 should always be writable.
 Completely restarting the cluster 

[jira] [Updated] (KAFKA-2043) CompressionType is passed in each RecordAccumulator append

2015-04-06 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-2043:
---
   Resolution: Fixed
Fix Version/s: 0.8.3
   Status: Resolved  (was: Patch Available)

Thanks for the patch. +1 and committed to trunk.

 CompressionType is passed in each RecordAccumulator append
 --

 Key: KAFKA-2043
 URL: https://issues.apache.org/jira/browse/KAFKA-2043
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.8.2.0
Reporter: Grant Henke
Assignee: Grant Henke
Priority: Minor
 Fix For: 0.8.3

 Attachments: KAFKA-2043.patch, KAFKA-2043_2015-03-25_13:28:52.patch


 Currently, the org.apache.kafka.clients.producer.internals.RecordAccumulator 
 append method accepts the compressionType on a per-record basis. It looks 
 like the code would only work on a per-batch basis because the 
 CompressionType is only used when creating a new RecordBatch. My 
 understanding is this should only support setting it per batch at most. 
 public RecordAppendResult append(TopicPartition tp, byte[] key, byte[] 
 value, CompressionType compression, Callback callback) throws 
 InterruptedException;
 The compression type is a producer
 level config. Instead of passing it in for each append, we probably should
 just pass it in once during the creation RecordAccumulator.
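 A simplified sketch of the proposed change (hypothetical class, not the real 
 RecordAccumulator signature): the compression type moves from the append 
 arguments to the constructor, since it is fixed per producer:
 {code}
 // compression is supplied once, when the accumulator is created
 class AccumulatorSketch(compression: String) {
   def append(topicPartition: String, key: Array[Byte], value: Array[Byte]): Unit = {
     // any new batch created here uses the producer-level `compression` setting
     println(s"appending to $topicPartition with compression=$compression")
   }
 }
 {code}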



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2078) Getting Selector [WARN] Error in I/O with host java.io.EOFException

2015-04-06 Thread Aravind (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14481617#comment-14481617
 ] 

Aravind commented on KAFKA-2078:


Yes, I could reproduce this with just 1 broker as well, as I have created a 
topic with a single partition and no replication.

Thanks!

 Getting Selector [WARN] Error in I/O with host java.io.EOFException
 ---

 Key: KAFKA-2078
 URL: https://issues.apache.org/jira/browse/KAFKA-2078
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2.0
 Environment: OS Version: 2.6.39-400.209.1.el5uek and Hardware: 8 x 
 Intel(R) Xeon(R) CPU X5660  @ 2.80GHz/44GB
Reporter: Aravind
Assignee: Jun Rao

 When trying to produce 1000 (10 MB) messages, we get the below error somewhere 
 between the 997th and 1000th message. There is no pattern, but it is 
 reproducible.
 [PDT] 2015-03-31 13:53:50 Selector [WARN] Error in I/O with our host 
 java.io.EOFException at 
 org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
  at org.apache.kafka.common.network.Selector.poll(Selector.java:248) at 
 org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) at 
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) at 
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) at 
 java.lang.Thread.run(Thread.java:724)
 I am getting this error sometimes at the 997th or 999th message. There is no 
 pattern, but it is reproducible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2078) Getting Selector [WARN] Error in I/O with host java.io.EOFException

2015-04-06 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14481861#comment-14481861
 ] 

Jun Rao commented on KAFKA-2078:


What kind of network do you have? Do you have any load balancers or firewalls? 
Could you reproduce the issue on localhost?

 Getting Selector [WARN] Error in I/O with host java.io.EOFException
 ---

 Key: KAFKA-2078
 URL: https://issues.apache.org/jira/browse/KAFKA-2078
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2.0
 Environment: OS Version: 2.6.39-400.209.1.el5uek and Hardware: 8 x 
 Intel(R) Xeon(R) CPU X5660  @ 2.80GHz/44GB
Reporter: Aravind
Assignee: Jun Rao

 When trying to produce 1000 (10 MB) messages, we get the below error somewhere 
 between the 997th and 1000th message. There is no pattern, but it is 
 reproducible.
 [PDT] 2015-03-31 13:53:50 Selector [WARN] Error in I/O with our host 
 java.io.EOFException at 
 org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
  at org.apache.kafka.common.network.Selector.poll(Selector.java:248) at 
 org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) at 
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) at 
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) at 
 java.lang.Thread.run(Thread.java:724)
 I am getting this error sometimes at the 997th or 999th message. There is no 
 pattern, but it is reproducible.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2096) Enable keepalive socket option for broker to prevent socket leak

2015-04-06 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14481890#comment-14481890
 ] 

Jun Rao commented on KAFKA-2096:


[~allenxwang], that seems to be a good fix. Do you want to submit a patch?

 Enable keepalive socket option for broker to prevent socket leak
 

 Key: KAFKA-2096
 URL: https://issues.apache.org/jira/browse/KAFKA-2096
 Project: Kafka
  Issue Type: Improvement
  Components: network
Affects Versions: 0.8.2.1
Reporter: Allen Wang
Assignee: Jun Rao
Priority: Critical

 We run a Kafka 0.8.2.1 cluster in AWS with a large number of producers ( 
 1). The number of producer instances also scales up and down significantly 
 on a daily basis.
 The issue we found is that after 10 days, the open file descriptor count will 
 approach the limit of 32K. An investigation of these open file descriptors 
 shows that a significant portion of them are from client instances that were 
 terminated during scale-down. Somehow they still show as ESTABLISHED in 
 netstat. We suspect that the AWS firewall between the client and broker 
 causes this issue.
 We attempted to use the keepalive socket option to reduce this socket leak on 
 the broker and it appears to be working. Specifically, we added this line to 
 kafka.network.Acceptor.accept():
   socketChannel.socket().setKeepAlive(true)
 During our experiment with this change we confirmed that netstat entries 
 whose client instance had been terminated were probed as configured in the 
 operating system. After the configured number of probes, the OS determined 
 that the peer was no longer alive and the entry was removed, possibly after 
 Kafka hit an error reading from the channel and closed it. Our experiment 
 also shows that after a few days the instance was able to keep a stable low 
 point of open file descriptor count, compared with other instances where the 
 low point keeps increasing day to day.
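 For context, a minimal standalone sketch (hypothetical, not the actual 
 kafka.network.Acceptor code) of where that one-line change sits in an NIO 
 accept path:
 {code}
 import java.nio.channels.{ServerSocketChannel, SocketChannel}

 object KeepAliveAcceptSketch {
   def accept(serverChannel: ServerSocketChannel): SocketChannel = {
     val socketChannel = serverChannel.accept()
     socketChannel.configureBlocking(false)
     // proposed change: enable TCP keepalive so the OS probes idle connections
     // and eventually closes those whose peer has silently disappeared
     socketChannel.socket().setKeepAlive(true)
     socketChannel
   }
 }
 {code}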



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: KafkaPreCommit #59

2015-04-06 Thread Apache Jenkins Server
See https://builds.apache.org/job/KafkaPreCommit/59/changes

Changes:

[junrao] kafka-2043; CompressionType is passed in each RecordAccumulator 
append; patched by Grant Henke; reviewed by Jun Rao

--
[...truncated 2129 lines...]
kafka.api.test.ProducerCompressionTest  testCompression[1] PASSED

kafka.api.test.ProducerCompressionTest  testCompression[2] PASSED

kafka.api.test.ProducerCompressionTest  testCompression[3] PASSED

kafka.network.SocketServerTest  simpleRequest PASSED

kafka.network.SocketServerTest  tooBigRequestIsRejected PASSED

kafka.network.SocketServerTest  testNullResponse PASSED

kafka.network.SocketServerTest  testSocketsCloseOnShutdown PASSED

kafka.network.SocketServerTest  testMaxConnectionsPerIp PASSED

kafka.network.SocketServerTest  testMaxConnectionsPerIPOverrides PASSED

kafka.log.OffsetIndexTest  truncate PASSED

kafka.log.OffsetIndexTest  randomLookupTest PASSED

kafka.log.OffsetIndexTest  lookupExtremeCases PASSED

kafka.log.OffsetIndexTest  appendTooMany PASSED

kafka.log.OffsetIndexTest  appendOutOfOrder PASSED

kafka.log.OffsetIndexTest  testReopen PASSED

kafka.log.OffsetMapTest  testBasicValidation PASSED

kafka.log.OffsetMapTest  testClear PASSED

kafka.log.CleanerTest  testCleanSegments PASSED

kafka.log.CleanerTest  testCleaningWithDeletes PASSED

kafka.log.CleanerTest  testCleaningWithUnkeyedMessages PASSED

kafka.log.CleanerTest  testCleanSegmentsWithAbort PASSED

kafka.log.CleanerTest  testSegmentGrouping PASSED

kafka.log.CleanerTest  testSegmentGroupingWithSparseOffsets PASSED

kafka.log.CleanerTest  testBuildOffsetMap PASSED

kafka.log.FileMessageSetTest  testWrittenEqualsRead PASSED

kafka.log.FileMessageSetTest  testIteratorIsConsistent PASSED

kafka.log.FileMessageSetTest  testSizeInBytes PASSED

kafka.log.FileMessageSetTest  testWriteTo PASSED

kafka.log.FileMessageSetTest  testFileSize PASSED

kafka.log.FileMessageSetTest  testIterationOverPartialAndTruncation PASSED

kafka.log.FileMessageSetTest  testIterationDoesntChangePosition PASSED

kafka.log.FileMessageSetTest  testRead PASSED

kafka.log.FileMessageSetTest  testSearch PASSED

kafka.log.FileMessageSetTest  testIteratorWithLimits PASSED

kafka.log.FileMessageSetTest  testTruncate PASSED

kafka.log.LogTest  testTimeBasedLogRoll PASSED

kafka.log.LogTest  testTimeBasedLogRollJitter PASSED

kafka.log.LogTest  testSizeBasedLogRoll PASSED

kafka.log.LogTest  testLoadEmptyLog PASSED

kafka.log.LogTest  testAppendAndReadWithSequentialOffsets PASSED

kafka.log.LogTest  testAppendAndReadWithNonSequentialOffsets PASSED

kafka.log.LogTest  testReadAtLogGap PASSED

kafka.log.LogTest  testReadOutOfRange PASSED

kafka.log.LogTest  testLogRolls PASSED

kafka.log.LogTest  testCompressedMessages PASSED

kafka.log.LogTest  testThatGarbageCollectingSegmentsDoesntChangeOffset PASSED

kafka.log.LogTest  testMessageSetSizeCheck PASSED

kafka.log.LogTest  testCompactedTopicConstraints PASSED

kafka.log.LogTest  testMessageSizeCheck PASSED

kafka.log.LogTest  testLogRecoversToCorrectOffset PASSED

kafka.log.LogTest  testIndexRebuild PASSED

kafka.log.LogTest  testTruncateTo PASSED

kafka.log.LogTest  testIndexResizingAtTruncation PASSED

kafka.log.LogTest  testBogusIndexSegmentsAreRemoved PASSED

kafka.log.LogTest  testReopenThenTruncate PASSED

kafka.log.LogTest  testAsyncDelete PASSED

kafka.log.LogTest  testOpenDeletesObsoleteFiles PASSED

kafka.log.LogTest  testAppendMessageWithNullPayload PASSED

kafka.log.LogTest  testCorruptLog PASSED

kafka.log.LogTest  testCleanShutdownFile PASSED

kafka.log.LogTest  testParseTopicPartitionName PASSED

kafka.log.LogTest  testParseTopicPartitionNameForEmptyName PASSED

kafka.log.LogTest  testParseTopicPartitionNameForNull PASSED

kafka.log.LogTest  testParseTopicPartitionNameForMissingSeparator PASSED

kafka.log.LogTest  testParseTopicPartitionNameForMissingTopic PASSED

kafka.log.LogTest  testParseTopicPartitionNameForMissingPartition PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[0] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[1] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[2] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[3] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[4] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[5] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[6] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[7] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[8] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[9] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[10] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[11] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[12] PASSED

kafka.log.BrokerCompressionTest  testBrokerSideCompression[13] PASSED

kafka.log.BrokerCompressionTest  

[jira] [Comment Edited] (KAFKA-2082) Kafka Replication ends up in a bad state

2015-04-06 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482042#comment-14482042
 ] 

Sriharsha Chintalapani edited comment on KAFKA-2082 at 4/6/15 10:21 PM:


[~eapache] 
 I think the problem here is how the test is operating. In general, users 
configure each broker with a zookeeper connection string that specifies multiple 
hosts, like localhost:2181,localhost:2182,localhost:2183.
If there is a connection loss due to some issue in the broker (JVM garbage 
collection, for example), a new controller will be elected, and when the broker 
comes back up it goes through onControllerResignation. It gets an 
updateMetadataRequest to update its metadata cache so that requests get the 
latest metadata.
Another case would be one of the zookeeper nodes going down, which is fine as 
there are other zookeeper nodes to serve the kafka brokers.

But in this particular test each broker is connected to an individual zookeeper 
host, and those hosts are in a quorum. When you disable zk1, broker 1 
disconnects, which causes /brokers/ids/1 to get deleted as it is an ephemeral 
node.

A new controller will be elected and it goes through 
KafkaController.onControllerFailover:

sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, 
Set(topicAndPartition))

but liveOrShuttingDownBrokerIds won't have all the brokers in the list in this 
specific case: as the /brokers/ids entries are ephemeral nodes, they get deleted 
when the client disconnects. But the producer is still connecting to broker1, 
which never got an updateMetadataRequest, so it is still serving stale metadata.



was (Author: sriharsha):
[~eapache] 
 I think the problem here is how the test is operating. In general, users 
configure each broker with a zookeeper connection string that specifies multiple 
hosts, like localhost:2181,localhost:2182,localhost:2183.
If there is a connection loss due to some issue in the broker (JVM garbage 
collection, for example), a new controller will be elected, and when the broker 
comes back up it goes through onControllerResignation. It gets an 
updateMetadataRequest to update its metadata cache so that requests get the 
latest metadata.
Another case would be one of the zookeeper nodes going down, which is fine as 
there are other zookeeper nodes to serve the kafka brokers.

But in this particular test each broker is connected to an individual zookeeper 
host, and those hosts are in a quorum. When you disable zk1, broker 1 
disconnects, which causes /brokers/ids/1 to get deleted as it is an ephemeral 
node.

A new controller will be elected and it goes through 
KafkaController.onPartitionReassignment

//12. After electing leader, the replicas and isr information changes, 
so resend the update metadata request to every broker

sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, 
Set(topicAndPartition))

but liveOrShuttingDownBrokerIds won't have all the brokers in the list in this 
specific case: as the /brokers/ids entries are ephemeral nodes, they get deleted 
when the client disconnects. But the producer is still connecting to broker1, 
which never got an updateMetadataRequest, so it is still serving stale metadata.


 Kafka Replication ends up in a bad state
 

 Key: KAFKA-2082
 URL: https://issues.apache.org/jira/browse/KAFKA-2082
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 0.8.2.1
Reporter: Evan Huus
Assignee: Sriharsha Chintalapani
Priority: Critical
 Attachments: KAFKA-2082.patch


 While running integration tests for Sarama (the go client) we came across a 
 pattern of connection losses that reliably puts kafka into a bad state: 
 several of the brokers start spinning, chewing ~30% CPU and spamming the logs 
 with hundreds of thousands of lines like:
 {noformat}
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,1] failed due to Leader not local for partition 
 [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,6] failed due to Leader not local for partition 
 [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,21] failed due to Leader not local for partition 
 [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 

[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state

2015-04-06 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14481931#comment-14481931
 ] 

Sriharsha Chintalapani commented on KAFKA-2082:
---

[~junrao]

After step 2, I noticed the controller changed the leaders for 1 and 3, and now 
all the partitions have leader 2, 4, or 5. The new controller did detect the 
failure of brokers 1 and 3. But the client (producer) is still connecting to 
the older brokers, in this case 1 and 3, and getting a stale metadata response.

 Kafka Replication ends up in a bad state
 

 Key: KAFKA-2082
 URL: https://issues.apache.org/jira/browse/KAFKA-2082
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 0.8.2.1
Reporter: Evan Huus
Assignee: Sriharsha Chintalapani
Priority: Critical
 Attachments: KAFKA-2082.patch


 While running integration tests for Sarama (the go client) we came across a 
 pattern of connection losses that reliably puts kafka into a bad state: 
 several of the brokers start spinning, chewing ~30% CPU and spamming the logs 
 with hundreds of thousands of lines like:
 {noformat}
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,1] failed due to Leader not local for partition 
 [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,6] failed due to Leader not local for partition 
 [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,21] failed due to Leader not local for partition 
 [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,26] failed due to Leader not local for partition 
 [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,1] failed due to Leader not local for partition 
 [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,6] failed due to Leader not local for partition 
 [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,21] failed due to Leader not local for partition 
 [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,26] failed due to Leader not local for partition 
 [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
 {noformat}
 This can be easily and reliably reproduced using the {{toxiproxy-final}} 
 branch of https://github.com/Shopify/sarama which includes a vagrant script 
 for provisioning the appropriate cluster: 
 - {{git clone https://github.com/Shopify/sarama.git}}
 - {{git checkout test-jira-kafka-2082}}
 - {{vagrant up}}
 - {{TEST_SEED=1427917826425719059 DEBUG=true go test -v}}
 After the test finishes (it fails because the cluster ends up in a bad 
 state), you can log into the cluster machine with {{vagrant ssh}} and inspect 
 the bad nodes. The vagrant script provisions five zookeepers and five brokers 
 in {{/opt/kafka-9091/}} through {{/opt/kafka-9095/}}.
 Additional context: the test produces continually to the cluster while 
 randomly cutting and restoring zookeeper connections (all connections to 
 zookeeper are run through a simple proxy on the same vm to make this easy). 
 The majority of the time this works very well and does a good job exercising 
 our producer's retry and failover code. However, under certain patterns of 
 connection loss (the {{TEST_SEED}} in the instructions is important), kafka 
 gets confused. The test never cuts more than two connections at a time, so 
 zookeeper should always have quorum, and the topic (with three replicas) 
 should always be 

Re: Few questions regarding KAFKA-1928 (reusing common network code in core)

2015-04-06 Thread Jun Rao
1. The controller - broker communication doesn't have to be blocking. It
just needs to call a callback on completion of a request. We can leave this
out for now. It just means that we can't run the inter-cluster
communication through a non-plaintext port until we port this code to the
common network code. Similarly, the replica fetcher code
currently uses SimpleConsumer. We need to change that code too to support
a non-plaintext port across brokers. I haven't thought about this much, so I'm
not sure whether it's better to replace the code with the new consumer client
or with some special code over the common network client.

2. Yes, it does seem that Receive needs to expose some kind of API to get
the ByteBuffer, as we do in the Receiver in Scala. Jay can probably comment
on this better.

Thanks,

Jun

On Mon, Apr 6, 2015 at 2:55 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Hi,

 I started work on converting core to use common network code and I ran into
 two questions that hopefully someone can help me with:

 1. BlockingChannel - Looks like the inter-broker communication is mostly
 through blocking channel. There's no client equivalent (i.e client code is
 all non-blocking). I was planning to leave this as is, but let me know if
 the intention was to make all this communication non-blocking.

  2. Receive & ByteBufferReceive - It looks like ByteBufferReceive is not
  used anywhere. What was it planned for?
  NetworkReceive implements a payload() method that returns the backing buffer.
  It is used all over the place (to create requests, responses, etc.). Because
  ByteBufferReceive doesn't implement payload() (and it's not clear how it can
  implement it), payload() is not part of the Receive
  interface, although I'm pretty sure it should be there.
  Some explanation about the Receive interface and what ByteBufferReceive is
  for will help.

 Gwen



[jira] [Commented] (KAFKA-2059) ZookeeperConsumerConnectorTest.testBasic transient failure

2015-04-06 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482085#comment-14482085
 ] 

Guozhang Wang commented on KAFKA-2059:
--

I saw this error once out of 100 runs, and on my machine each run takes about 
12 - 15 minutes.

 ZookeeperConsumerConnectorTest.testBasic transient failure
 -

 Key: KAFKA-2059
 URL: https://issues.apache.org/jira/browse/KAFKA-2059
 Project: Kafka
  Issue Type: Sub-task
Reporter: Guozhang Wang
  Labels: newbie

 {code}
 kafka.javaapi.consumer.ZookeeperConsumerConnectorTest  testBasic FAILED
 kafka.common.InconsistentBrokerIdException: Configured brokerId 1 doesn't 
 match stored brokerId 0 in meta.properties
 at kafka.server.KafkaServer.getBrokerId(KafkaServer.scala:443)
 at kafka.server.KafkaServer.startup(KafkaServer.scala:116)
 at kafka.utils.TestUtils$.createServer(TestUtils.scala:126)
 at 
 kafka.integration.KafkaServerTestHarness$$anonfun$setUp$1.apply(KafkaServerTestHarness.scala:57)
 at 
 kafka.integration.KafkaServerTestHarness$$anonfun$setUp$1.apply(KafkaServerTestHarness.scala:57)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at 
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at 
 kafka.integration.KafkaServerTestHarness$class.setUp(KafkaServerTestHarness.scala:57)
 at 
 kafka.javaapi.consumer.ZookeeperConsumerConnectorTest.setUp(ZookeeperConsumerConnectorTest.scala:41)
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-2086) testRoundRobinPartitionAssignor transient failure

2015-04-06 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-2086.

Resolution: Duplicate

Yes. Closing this one.

 testRoundRobinPartitionAssignor transient failure
 -

 Key: KAFKA-2086
 URL: https://issues.apache.org/jira/browse/KAFKA-2086
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Affects Versions: 0.8.3
Reporter: Jun Rao

 Saw the following transient unit failure.
 unit.kafka.consumer.PartitionAssignorTest  testRoundRobinPartitionAssignor 
 FAILED
 java.lang.NullPointerException
 at 
 unit.kafka.consumer.PartitionAssignorTest$$anonfun$unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify$1.apply(PartitionAssignorTest.scala:173)
 at 
 unit.kafka.consumer.PartitionAssignorTest$$anonfun$unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify$1.apply(PartitionAssignorTest.scala:172)
 at scala.collection.immutable.List.foreach(List.scala:381)
 at 
 unit.kafka.consumer.PartitionAssignorTest$.unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify(PartitionAssignorTest.scala:172)
 at 
 unit.kafka.consumer.PartitionAssignorTest$$anonfun$testRoundRobinPartitionAssignor$1.apply$mcVI$sp(PartitionAssignorTest.scala:54)
 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:166)
 at 
 unit.kafka.consumer.PartitionAssignorTest.testRoundRobinPartitionAssignor(PartitionAssignorTest.scala:39)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (KAFKA-2086) testRoundRobinPartitionAssignor transient failure

2015-04-06 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao closed KAFKA-2086.
--

 testRoundRobinPartitionAssignor transient failure
 -

 Key: KAFKA-2086
 URL: https://issues.apache.org/jira/browse/KAFKA-2086
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Affects Versions: 0.8.3
Reporter: Jun Rao

 Saw the following transient unit failure.
 unit.kafka.consumer.PartitionAssignorTest  testRoundRobinPartitionAssignor 
 FAILED
 java.lang.NullPointerException
 at 
 unit.kafka.consumer.PartitionAssignorTest$$anonfun$unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify$1.apply(PartitionAssignorTest.scala:173)
 at 
 unit.kafka.consumer.PartitionAssignorTest$$anonfun$unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify$1.apply(PartitionAssignorTest.scala:172)
 at scala.collection.immutable.List.foreach(List.scala:381)
 at 
 unit.kafka.consumer.PartitionAssignorTest$.unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify(PartitionAssignorTest.scala:172)
 at 
 unit.kafka.consumer.PartitionAssignorTest$$anonfun$testRoundRobinPartitionAssignor$1.apply$mcVI$sp(PartitionAssignorTest.scala:54)
 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:166)
 at 
 unit.kafka.consumer.PartitionAssignorTest.testRoundRobinPartitionAssignor(PartitionAssignorTest.scala:39)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSSION] Keep docs updated per jira

2015-04-06 Thread Jun Rao
It seems that everyone is in favor of keeping the docs updated in each jira.

As for the next step, does anyone want to help figure out whether it's possible to
move the website to git?

Thanks,

Jun

On Tue, Mar 31, 2015 at 10:03 AM, Gwen Shapira gshap...@cloudera.com
wrote:

 +1

 On Tue, Mar 31, 2015 at 9:23 AM, Jay Kreps jay.kr...@gmail.com wrote:
  Yeah the protocol should probably move off the wiki and into the
  release-versioned docs.
 
  -Jay
 
  On Mon, Mar 30, 2015 at 5:17 PM, Joel Koshy jjkosh...@gmail.com wrote:
 
  Also for the wikis - those should probably correspond to the latest
  released version right? So for e.g., if we add or modify the protocol
  on trunk we can add it to the wiki but mark it with some highlight or
  similar just to make it clear that it is a change on trunk only.
 
  Thanks,
 
  Joel
 
  On Thu, Mar 26, 2015 at 06:27:25PM -0700, Jun Rao wrote:
   Hi, Everyone,
  
   Quite a few jiras these days require documentation changes (e.g., wire
   protocol, ZK layout, configs, jmx, etc). Historically, we have been
   updating the documentation just before we do a release. The issue is
 that
   some of the changes will be missed since they were done a while back.
   Another way to do that is to keep the docs updated as we complete each
   jira. Currently, our documentation is in the following places.
  
   wire protocol:
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
   ZK layout:
  
 
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper
   configs/jmx: https://svn.apache.org/repos/asf/kafka/site/083
  
   We probably don't need to update configs already ported to ConfigDef
  since
   they can be generated automatically. However, for the rest of the doc
   related changes, keeping them updated per jira seems a better
 approach.
   What do people think?
  
   Thanks,
  
   Jun
 
 



[GitHub] kafka pull request: KAFKA-2098: gradle files

2015-04-06 Thread rekhajoshm
GitHub user rekhajoshm opened a pull request:

https://github.com/apache/kafka/pull/54

KAFKA-2098: gradle files

Gradle files, tiny footprint. Let's have it in. Thanks.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rekhajoshm/kafka KAFKA-2098

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/54.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #54


commit be7dc1933d2c266d15c46a06d057686c8d48e520
Author: Joshi rekhajo...@gmail.com
Date:   2015-04-06T21:02:13Z

KAFKA-2098: gradle files




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-2098) Gradle Wrapper Jar gone missing in 0.8.2.1

2015-04-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14481936#comment-14481936
 ] 

ASF GitHub Bot commented on KAFKA-2098:
---

GitHub user rekhajoshm opened a pull request:

https://github.com/apache/kafka/pull/54

KAFKA-2098: gradle files

Gradle files, tiny footprint. Let's have it in. Thanks.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rekhajoshm/kafka KAFKA-2098

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/54.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #54


commit be7dc1933d2c266d15c46a06d057686c8d48e520
Author: Joshi rekhajo...@gmail.com
Date:   2015-04-06T21:02:13Z

KAFKA-2098: gradle files




 Gradle Wrapper Jar gone missing in 0.8.2.1
 --

 Key: KAFKA-2098
 URL: https://issues.apache.org/jira/browse/KAFKA-2098
 Project: Kafka
  Issue Type: Bug
  Components: build
Affects Versions: 0.8.2.1
Reporter: Rekha Joshi

 ./gradlew idea
 Error: Could not find or load main class org.gradle.wrapper.GradleWrapperMain
 This was working in 0.8.2. Attaching patch. Thanks.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state

2015-04-06 Thread Evan Huus (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482058#comment-14482058
 ] 

Evan Huus commented on KAFKA-2082:
--

What if (in a normally set up cluster) a broker becomes completely isolated 
from all zk nodes and all other brokers? If I understand correctly, effectively 
the same bug will occur, as the isolated broker will serve stale metadata.

So I think that regardless of how the test is set up, a broker which does not 
have a zookeeper connection should refuse to serve metadata requests.
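
A bare-bones sketch of the proposed guard, assuming the handler can see a boolean
reflecting the current ZooKeeper session state (all names below are illustrative
stand-ins, not actual Kafka classes or error codes):

{code}
// Hypothetical guard: if this broker has lost its ZK session, answer metadata
// requests with a retriable error instead of possibly stale cached metadata.
case class MetadataResponseSketch(errorCode: Short, metadata: Option[String])

class MetadataHandlerSketch(zkConnected: () => Boolean, cachedMetadata: () => String) {
  val StaleMetadataError: Short = 5   // stand-in for a retriable error code

  def handleMetadataRequest(): MetadataResponseSketch =
    if (!zkConnected()) MetadataResponseSketch(StaleMetadataError, None)   // refuse stale data
    else MetadataResponseSketch(0, Some(cachedMetadata()))                 // normal path
}
{code}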

 Kafka Replication ends up in a bad state
 

 Key: KAFKA-2082
 URL: https://issues.apache.org/jira/browse/KAFKA-2082
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 0.8.2.1
Reporter: Evan Huus
Assignee: Sriharsha Chintalapani
Priority: Critical
 Attachments: KAFKA-2082.patch


 While running integration tests for Sarama (the go client) we came across a 
 pattern of connection losses that reliably puts kafka into a bad state: 
 several of the brokers start spinning, chewing ~30% CPU and spamming the logs 
 with hundreds of thousands of lines like:
 {noformat}
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,1] failed due to Leader not local for partition 
 [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,6] failed due to Leader not local for partition 
 [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,21] failed due to Leader not local for partition 
 [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,26] failed due to Leader not local for partition 
 [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,1] failed due to Leader not local for partition 
 [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,6] failed due to Leader not local for partition 
 [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,21] failed due to Leader not local for partition 
 [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,26] failed due to Leader not local for partition 
 [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
 {noformat}
 This can be easily and reliably reproduced using the {{toxiproxy-final}} 
 branch of https://github.com/Shopify/sarama which includes a vagrant script 
 for provisioning the appropriate cluster: 
 - {{git clone https://github.com/Shopify/sarama.git}}
 - {{git checkout test-jira-kafka-2082}}
 - {{vagrant up}}
 - {{TEST_SEED=1427917826425719059 DEBUG=true go test -v}}
 After the test finishes (it fails because the cluster ends up in a bad 
 state), you can log into the cluster machine with {{vagrant ssh}} and inspect 
 the bad nodes. The vagrant script provisions five zookeepers and five brokers 
 in {{/opt/kafka-9091/}} through {{/opt/kafka-9095/}}.
 Additional context: the test produces continually to the cluster while 
 randomly cutting and restoring zookeeper connections (all connections to 
 zookeeper are run through a simple proxy on the same vm to make this easy). 
 The majority of the time this works very well and does a good job exercising 
 our producer's retry and failover code. However, under certain patterns of 
 connection loss (the {{TEST_SEED}} in the instructions is important), kafka 
 gets confused. The test never cuts more than two connections at a time, so 
 zookeeper should always have quorum, and the topic (with 

Re: Review Request 31568: Patch for KAFKA-1989

2015-04-06 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review78668
---


Thanks for the new patch. Now that I understand the logic better, I think this 
is a really smart implementation. A few more comments below.


core/src/main/scala/kafka/server/DelayedOperation.scala
https://reviews.apache.org/r/31568/#comment128118

We probably should call forceComplete() first and only if it returns true, 
run onExpiration().



core/src/main/scala/kafka/server/DelayedOperation.scala
https://reviews.apache.org/r/31568/#comment128145

The Java executor service by default eats all unhandled exceptions. For easier 
debugging, we will need to add an UncaughtExceptionHandler and log the exception 
as an error. See KafkaScheduler.startup() and Utils.newThread().
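
For reference, the usual pattern looks roughly like the sketch below (illustrative
only; the thread name and logging are placeholders, and note this covers tasks run
via execute() -- exceptions from submit() are captured in the returned Future instead):

    import java.util.concurrent.{Executors, ThreadFactory}

    // A thread factory that installs an UncaughtExceptionHandler so failures in
    // tasks run via execute() get logged instead of disappearing silently.
    val loggingThreadFactory: ThreadFactory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "expiration-reaper")   // name is illustrative
        t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
          override def uncaughtException(thread: Thread, e: Throwable): Unit =
            System.err.println(s"Uncaught exception in ${thread.getName}: $e")  // real code would log.error
        })
        t
      }
    }

    val executor = Executors.newSingleThreadExecutor(loggingThreadFactory)
    executor.execute(new Runnable { override def run(): Unit = throw new RuntimeException("boom") })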



core/src/main/scala/kafka/server/DelayedOperation.scala
https://reviews.apache.org/r/31568/#comment128129

Perhaps we should trigger a purge if the total number of purgeable 
operations (instead of the number of unique purgeable operations) is more than 
the purgeInterval. This can be estimated as 
watched / estimatedTotalOperations.get * (estimatedTotalOperations.get - delayed).



core/src/main/scala/kafka/utils/timer/Timer.scala
https://reviews.apache.org/r/31568/#comment128139

Is Timer too general a name? Should we rename it to something like 
DelayedOperationTimer?



core/src/main/scala/kafka/utils/timer/TimingWheel.scala
https://reviews.apache.org/r/31568/#comment128109

For the hierarchical part, let's say that u is 1 and n is 2. If the current time 
is c, then the buckets at different levels are:

level  buckets
1      [c,c]    [c+1,c+1]
2      [c,c+1]  [c+2,c+3]
3      [c,c+3]  [c+4,c+7]

So, at any given point of time, [c,c+1] at level 2 and [c,c+3] at level 3 
will never be used, since those buckets are already covered at the lower levels.

This seems a bit wasteful. To remove that waste, we could choose to start 
the 2nd level at c+2 and the 3rd level at c+6, etc. Do we choose to use the 
same current time as the start time at all levels for simplicity? If so, this 
is probably fine since the larger the n, the less the waste. However, it's 
probably worth documenting that the buckets at different levels can overlap?
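
To make the overlap concrete, a tiny throwaway calculation (the parameter names are
mine, not the patch's) of which bucket covers a given expiration time at each level
when every level starts at the same current time c:

    // tick size at level L (1-based) is u * n^(L-1); buckets are aligned to c.
    def bucketAt(level: Int, expiration: Long, c: Long, u: Long = 1, n: Int = 2): (Long, Long) = {
      val tick = u * math.pow(n, level - 1).toLong
      val start = c + ((expiration - c) / tick) * tick   // round down to the bucket start
      (start, start + tick - 1)
    }

    // With c = 0 and expiration = 1: level 1 gives [1,1], level 2 gives [0,1], and
    // level 3 gives [0,3] -- the higher-level buckets overlap what level 1 already
    // covers, which is the waste described above.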



core/src/main/scala/kafka/utils/timer/TimingWheel.scala
https://reviews.apache.org/r/31568/#comment127579

A overflow => An overflow

in a overflow => in an overflow



core/src/main/scala/kafka/utils/timer/TimingWheel.scala
https://reviews.apache.org/r/31568/#comment127580

moved the finer => moved to the finer



core/src/main/scala/kafka/utils/timer/TimingWheel.scala
https://reviews.apache.org/r/31568/#comment128112

It would be useful to add that the timing wheel implementation is used to 
optimize the common case when operations are completed before they time out.



core/src/main/scala/kafka/utils/timer/TimingWheel.scala
https://reviews.apache.org/r/31568/#comment128117

I was initially puzzled about how we synchronize between add() and advanceClock(). 
Then I realized that the synchronization is actually done in the caller, in 
Timer. I was thinking of how to make this part clearer. One way is to add the 
documentation here. Another way is to move the read/write lock from Timer to 
here, and pass in the needed data structures like delayQueue.
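
A stripped-down sketch of the locking pattern being described (the field and method
names are assumptions): many callers may add() concurrently under the read lock,
while advanceClock() takes the write lock so the wheel is not advanced mid-insert.

    import java.util.concurrent.locks.ReentrantReadWriteLock

    class TimerLockingSketch {
      private val lock = new ReentrantReadWriteLock()
      private val readLock = lock.readLock()
      private val writeLock = lock.writeLock()

      def add(task: Runnable): Unit = {
        readLock.lock()
        try {
          // insert the task into the timing wheel / delay queue here
        } finally readLock.unlock()
      }

      def advanceClock(timeoutMs: Long): Boolean = {
        writeLock.lock()
        try {
          // poll the delay queue and advance the wheel here
          false
        } finally writeLock.unlock()
      }
    }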



core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
https://reviews.apache.org/r/31568/#comment128146

should have 5 => should have 6



core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala
https://reviews.apache.org/r/31568/#comment128148

Actually, where is the logic to not increment the counter on reinserting 
existing tasks? TimerTaskList.add() seems to always increment the counter.


- Jun Rao


On April 1, 2015, 8:50 p.m., Yasuhiro Matsuda wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/31568/
 ---
 
 (Updated April 1, 2015, 8:50 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1989
 https://issues.apache.org/jira/browse/KAFKA-1989
 
 
 Repository: kafka
 
 
 Description
 ---
 
 new purgatory implementation
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/DelayedOperation.scala 
 e317676b4dd5bb5ad9770930e694cd7282d5b6d5 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 44f0026e6676d8d764dd59dbcc6bb7bb727a3ba6 
   core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION 
   core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION 
   core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION 
   core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION 
   

Re: Chicken and egg problem when initializing SocketServer

2015-04-06 Thread Ewen Cheslack-Postava
Gwen,

That sounds about right to me. I've found that when ports are bound
dynamically, RAII-style greatly simplifies things since if you've bound
them as early as possible, then you'll always be able to get at the port
even if nothing is accept()ing yet.
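
A minimal illustration of that: binding with port 0 reserves a real port
immediately, and the chosen port is readable before anything calls accept().

    import java.net.InetSocketAddress
    import java.nio.channels.ServerSocketChannel

    val serverChannel = ServerSocketChannel.open()
    serverChannel.socket().bind(new InetSocketAddress("localhost", 0))   // bind as early as possible
    val boundPort = serverChannel.socket().getLocalPort                  // known before any accept()
    println(s"bound to port $boundPort before accepting any connections")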

-Ewen

On Sat, Apr 4, 2015 at 8:27 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Thanks. I think I found a solution that doesn't take new locks or buffers:

 1. Change the portProtocol map to ConcurrentHashMap (which makes it both
 thread-safe and mutable). Initialize as empty map.
 2. Initialize both processors and acceptors with the hashmap, also tell
 each acceptor which protocol it is accepting
 3. After the acceptors bind() but before run(), register them with the
 hashmap.

 Does anyone see flaws here?
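
 A rough sketch of those three steps (class names are stand-ins, not the actual
 SocketServer code):

    import java.util.concurrent.ConcurrentHashMap

    // Step 1: one shared, initially empty, thread-safe map.
    val portToProtocol = new ConcurrentHashMap[Int, String]()

    // Step 2: each acceptor is handed the shared map and told which protocol it accepts.
    class AcceptorSketch(protocol: String, portToProtocol: ConcurrentHashMap[Int, String]) {
      def onBound(actualPort: Int): Unit =
        portToProtocol.put(actualPort, protocol)   // Step 3: register after bind(), before run()
      def run(): Unit = { /* accept loop starts here */ }
    }

    val acceptor = new AcceptorSketch("PLAINTEXT", portToProtocol)
    acceptor.onBound(51234)   // port read back from the bound socket (illustrative value)
    acceptor.run()            // processors consult portToProtocol, already populated by now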

 On Sat, Apr 4, 2015 at 7:21 PM, Ashish Singh asi...@cloudera.com wrote:

  Hi Gwen,
 
  Can we buffer the requests that arrive before the acceptors are up? Once we
  have the acceptors, the buffer can be cleared. However, this adds to the
  complexity.
 
  On Sunday, April 5, 2015, Gwen Shapira gshap...@cloudera.com wrote:
 
   Hi,
  
   I'm trying to rebase KAFKA-1809 after KAFKA-1501 was committed, and I'm
   running into a chicken and egg problem.
  
    In my design, SocketServer creates a map of port -> SecurityProtocol.
   This map is passed to Processor when it is initialized, and Processor
  uses
   it to route requests to the correct channel.
  
   However, after KAFKA-1501, we may not know the ports until after
  Acceptors
   are started, and we want to start Acceptors after Processors are
 already
   running...
  
   One solution is that if Processor finds that all keys in the map are
 0,
   it will route all requests to PLAINTEXT channel. However, this will
  prevent
   us from using the new port-binding test code when testing TLS and SASL,
   bringing the whole port randomization problem back.
  
   Another option is new flag that will allow us to bind Acceptors before
   starting Processors but not accept anything until this flag is set,
 after
   the Processors started? This seems a bit tricky though.
  
   Anyone has better ideas?
  
   Gwen
  
 
 
  --
 
  Regards,
  Ashish
 




-- 
Thanks,
Ewen


[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state

2015-04-06 Thread Evan Huus (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14481981#comment-14481981
 ] 

Evan Huus commented on KAFKA-2082:
--

So I guess the producer should not be connecting to a broker that isn't the 
controller? I don't know how a non-JVM client is supposed to detect that case, 
as it doesn't appear to be exposed in the protocol. Or should a broker which is 
completely disconnected from zookeeper refuse to serve metadata requests?

 Kafka Replication ends up in a bad state
 

 Key: KAFKA-2082
 URL: https://issues.apache.org/jira/browse/KAFKA-2082
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 0.8.2.1
Reporter: Evan Huus
Assignee: Sriharsha Chintalapani
Priority: Critical
 Attachments: KAFKA-2082.patch


 While running integration tests for Sarama (the go client) we came across a 
 pattern of connection losses that reliably puts kafka into a bad state: 
 several of the brokers start spinning, chewing ~30% CPU and spamming the logs 
 with hundreds of thousands of lines like:
 {noformat}
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,1] failed due to Leader not local for partition 
 [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,6] failed due to Leader not local for partition 
 [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,21] failed due to Leader not local for partition 
 [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,26] failed due to Leader not local for partition 
 [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,1] failed due to Leader not local for partition 
 [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,6] failed due to Leader not local for partition 
 [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,21] failed due to Leader not local for partition 
 [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,26] failed due to Leader not local for partition 
 [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
 {noformat}
 This can be easily and reliably reproduced using the {{toxiproxy-final}} 
 branch of https://github.com/Shopify/sarama which includes a vagrant script 
 for provisioning the appropriate cluster: 
 - {{git clone https://github.com/Shopify/sarama.git}}
 - {{git checkout test-jira-kafka-2082}}
 - {{vagrant up}}
 - {{TEST_SEED=1427917826425719059 DEBUG=true go test -v}}
 After the test finishes (it fails because the cluster ends up in a bad 
 state), you can log into the cluster machine with {{vagrant ssh}} and inspect 
 the bad nodes. The vagrant script provisions five zookeepers and five brokers 
 in {{/opt/kafka-9091/}} through {{/opt/kafka-9095/}}.
 Additional context: the test produces continually to the cluster while 
 randomly cutting and restoring zookeeper connections (all connections to 
 zookeeper are run through a simple proxy on the same vm to make this easy). 
 The majority of the time this works very well and does a good job exercising 
 our producer's retry and failover code. However, under certain patterns of 
 connection loss (the {{TEST_SEED}} in the instructions is important), kafka 
 gets confused. The test never cuts more than two connections at a time, so 
 zookeeper should always have quorum, and the topic (with three replicas) 
 should always be writable.
 Completely restarting the 

Few questions regarding KAFKA-1928 (reusing common network code in core)

2015-04-06 Thread Gwen Shapira
Hi,

I started work on converting core to use common network code and I ran into
two questions that hopefully someone can help me with:

1. BlockingChannel - Looks like the inter-broker communication is mostly
through blocking channel. There's no client equivalent (i.e client code is
all non-blocking). I was planning to leave this as is, but let me know if
the intention was to make all this communication non-blocking.

2. Receive & ByteBufferReceive - It looks like ByteBufferReceive is not
used anywhere. What was it planned for?
NetworkReceive implements a payload() method that returns the backing buffer.
It is used all over the place (to create requests, responses, etc.). Because
ByteBufferReceive doesn't implement payload() (and it's not clear how it can
implement it), payload() is not part of the Receive
interface, although I'm pretty sure it should be there.
Some explanation about the Receive interface and what ByteBufferReceive is
for will help.

Gwen


[jira] [Commented] (KAFKA-1910) Refactor KafkaConsumer

2015-04-06 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482084#comment-14482084
 ] 

Guozhang Wang commented on KAFKA-1910:
--

[~jjkoshy]:

1. No, the error code change was not necessary; I originally just added it for 
OffsetCommitTest, hence it only checks the NoError code for the case where no 
offsets are fetchable.
2. This change was reverted (by accident while doing a rebase) in 
KAFKA-1634; I can submit a follow-up patch to delete the exception class and 
the error code from the mapping.

 Refactor KafkaConsumer
 --

 Key: KAFKA-1910
 URL: https://issues.apache.org/jira/browse/KAFKA-1910
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Fix For: 0.8.3

 Attachments: KAFKA-1910.patch, KAFKA-1910.patch, 
 KAFKA-1910_2015-03-05_14:55:33.patch


 KafkaConsumer now contains all the logic on the consumer side, making it a 
 very huge class file, better re-factoring it to have multiple layers on top 
 of KafkaClient.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1809) Refactor brokers to allow listening on multiple ports and IPs

2015-04-06 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482207#comment-14482207
 ] 

Gwen Shapira commented on KAFKA-1809:
-

Sorry for missing that [~harsha_ch]. I agree that having file, object and 
method names match will make life easier :)
Opened a JIRA for that.

 Refactor brokers to allow listening on multiple ports and IPs 
 --

 Key: KAFKA-1809
 URL: https://issues.apache.org/jira/browse/KAFKA-1809
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Reporter: Gwen Shapira
Assignee: Gwen Shapira
 Attachments: KAFKA-1809-DOCs.V1.patch, KAFKA-1809.patch, 
 KAFKA-1809.v1.patch, KAFKA-1809.v2.patch, KAFKA-1809.v3.patch, 
 KAFKA-1809_2014-12-25_11:04:17.patch, KAFKA-1809_2014-12-27_12:03:35.patch, 
 KAFKA-1809_2014-12-27_12:22:56.patch, KAFKA-1809_2014-12-29_12:11:36.patch, 
 KAFKA-1809_2014-12-30_11:25:29.patch, KAFKA-1809_2014-12-30_18:48:46.patch, 
 KAFKA-1809_2015-01-05_14:23:57.patch, KAFKA-1809_2015-01-05_20:25:15.patch, 
 KAFKA-1809_2015-01-05_21:40:14.patch, KAFKA-1809_2015-01-06_11:46:22.patch, 
 KAFKA-1809_2015-01-13_18:16:21.patch, KAFKA-1809_2015-01-28_10:26:22.patch, 
 KAFKA-1809_2015-02-02_11:55:16.patch, KAFKA-1809_2015-02-03_10:45:31.patch, 
 KAFKA-1809_2015-02-03_10:46:42.patch, KAFKA-1809_2015-02-03_10:52:36.patch, 
 KAFKA-1809_2015-03-16_09:02:18.patch, KAFKA-1809_2015-03-16_09:40:49.patch, 
 KAFKA-1809_2015-03-27_15:04:10.patch, KAFKA-1809_2015-04-02_15:12:30.patch, 
 KAFKA-1809_2015-04-02_19:03:58.patch, KAFKA-1809_2015-04-04_22:00:13.patch


 The goal is to eventually support different security mechanisms on different 
 ports. 
 Currently brokers are defined as host+port pair, and this definition exists 
 throughout the code-base, therefore some refactoring is needed to support 
 multiple ports for a single broker.
 The detailed design is here: 
 https://cwiki.apache.org/confluence/display/KAFKA/Multiple+Listeners+for+Kafka+Brokers



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2099) BrokerEndPoint file, methods and object names should match

2015-04-06 Thread Gwen Shapira (JIRA)
Gwen Shapira created KAFKA-2099:
---

 Summary: BrokerEndPoint file, methods and object names should match
 Key: KAFKA-2099
 URL: https://issues.apache.org/jira/browse/KAFKA-2099
 Project: Kafka
  Issue Type: Bug
Reporter: Gwen Shapira
Assignee: Gwen Shapira


[~harsha_ch] commented on KAFKA-1809:
We've got BrokerEndPoint.scala, but internally the object is called BrokerEndpoint 
(with a lowercase 'p') and the method is createBrokerEndPoint. If possible, can we get 
these consistent in another JIRA?




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2086) testRoundRobinPartitionAssignor transient failure

2015-04-06 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14481962#comment-14481962
 ] 

Guozhang Wang commented on KAFKA-2086:
--

Is this a duplicate of KAFKA-2056?

 testRoundRobinPartitionAssignor transient failure
 -

 Key: KAFKA-2086
 URL: https://issues.apache.org/jira/browse/KAFKA-2086
 Project: Kafka
  Issue Type: Sub-task
  Components: core
Affects Versions: 0.8.3
Reporter: Jun Rao

 Saw the following transient unit failure.
 unit.kafka.consumer.PartitionAssignorTest  testRoundRobinPartitionAssignor 
 FAILED
 java.lang.NullPointerException
 at 
 unit.kafka.consumer.PartitionAssignorTest$$anonfun$unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify$1.apply(PartitionAssignorTest.scala:173)
 at 
 unit.kafka.consumer.PartitionAssignorTest$$anonfun$unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify$1.apply(PartitionAssignorTest.scala:172)
 at scala.collection.immutable.List.foreach(List.scala:381)
 at 
 unit.kafka.consumer.PartitionAssignorTest$.unit$kafka$consumer$PartitionAssignorTest$$assignAndVerify(PartitionAssignorTest.scala:172)
 at 
 unit.kafka.consumer.PartitionAssignorTest$$anonfun$testRoundRobinPartitionAssignor$1.apply$mcVI$sp(PartitionAssignorTest.scala:54)
 at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:166)
 at 
 unit.kafka.consumer.PartitionAssignorTest.testRoundRobinPartitionAssignor(PartitionAssignorTest.scala:39)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state

2015-04-06 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482042#comment-14482042
 ] 

Sriharsha Chintalapani commented on KAFKA-2082:
---

[~eapache] 
 I think the problem here is how the test is operating. In general users will 
configure each broker with a zookeeper connection string that specifies multiple hosts, 
like localhost:2181,localhost:2182,localhost:2183.
If there is a connection loss, it might be due to some issue in the broker (JVM 
garbage collection, for example); in this case a new controller will be elected, and 
when this broker comes back up it goes through onControllerResignation. It gets 
an UpdateMetadataRequest to update its metadata cache so that requests get the 
latest metadata.
Another case would be one of the zookeeper nodes going down, which is fine as 
there are other zookeeper nodes to serve the kafka brokers.

But in this particular test each broker is connected to an individual zookeeper host, 
and those hosts form a quorum. When you disable zk1, broker 1 disconnects, which causes 
/brokers/ids/1 to get deleted as it's an ephemeral node.

A new controller will be elected and it goes through 
KafkaController.onPartitionReassignment:

//12. After electing leader, the replicas and isr information changes, 
so resend the update metadata request to every broker

sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, 
Set(topicAndPartition))

but liveOrShuttingDownBrokerIds won't have all the brokers in the list in this 
specific case; as the /brokers/ids entries are ephemeral nodes, they get deleted when the 
client disconnects. But the producer is still connecting to broker1, which never 
got an UpdateMetadataRequest, so it's still serving stale data.


 Kafka Replication ends up in a bad state
 

 Key: KAFKA-2082
 URL: https://issues.apache.org/jira/browse/KAFKA-2082
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 0.8.2.1
Reporter: Evan Huus
Assignee: Sriharsha Chintalapani
Priority: Critical
 Attachments: KAFKA-2082.patch


 While running integration tests for Sarama (the go client) we came across a 
 pattern of connection losses that reliably puts kafka into a bad state: 
 several of the brokers start spinning, chewing ~30% CPU and spamming the logs 
 with hundreds of thousands of lines like:
 {noformat}
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,1] failed due to Leader not local for partition 
 [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,6] failed due to Leader not local for partition 
 [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,21] failed due to Leader not local for partition 
 [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,26] failed due to Leader not local for partition 
 [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,1] failed due to Leader not local for partition 
 [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,6] failed due to Leader not local for partition 
 [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,21] failed due to Leader not local for partition 
 [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,26] failed due to Leader not local for partition 
 [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
 {noformat}
 This can be easily and reliably reproduced using the 

Re: Review Request 31366: Patch for KAFKA-1461

2015-04-06 Thread Guozhang Wang


 On March 24, 2015, 10:46 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala, lines 81-86
  https://reviews.apache.org/r/31366/diff/2/?file=898415#file898415line81
 
  Jun has a comment about the case when all partitions get inactive, 
  which is common when the fetched broker has just gone through leader 
  migration.
  
  We can move the foreach statement before the if statement, and after the 
  foreach check whether any partitions got added; if not, just back off for 
  fetchBackoffMs.
 
 Sriharsha Chintalapani wrote:
 Thanks for the review. Are you looking at something like this? This 
 wouldn't handle the case where partitionMap is populated but all of its 
 entries are inactive.
 
    partitionMap.foreach {
      case ((topicAndPartition, partitionFetchState)) =>
        if (partitionFetchState.isActive)
          fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
            partitionFetchState.offset, fetchSize)
    }
    if (partitionMap.isEmpty)
      partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
 
 Or do we want to check whether all the current partitions are inactive and then 
 back off? That would be expensive, checking whether all the partitions are 
 active or not in doWork().

What I think is a bit different and maybe simpler:

For FetchRequestBuilder, in the build() call its requestMap will be cleared 
after the fetch request is created, so we can just add another function to 
FetchRequestBuilder returning a boolean that indicates whether its request map is 
empty. With this we can get rid of the allPartitionsInactive flag.
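
Something like the following, as a sketch of that suggestion (the method name
hasFetches is an assumption, not the existing API):

    // Stand-in for FetchRequestBuilder: report whether anything was added, and clear
    // the request map on build(), mirroring the behaviour described above.
    class FetchRequestBuilderSketch {
      private val requestMap = scala.collection.mutable.Map.empty[(String, Int), (Long, Int)]

      def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int): this.type = {
        requestMap.put((topic, partition), (offset, fetchSize))
        this
      }

      def hasFetches: Boolean = requestMap.nonEmpty

      def build(): Map[(String, Int), (Long, Int)] = {
        val request = requestMap.toMap
        requestMap.clear()
        request
      }
    }

doWork() could then check hasFetches right after populating the builder and simply
await fetchBackOffMs when it returns false, instead of tracking a separate
allPartitionsInactive flag.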


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31366/#review77674
---


On April 4, 2015, 3:48 a.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/31366/
 ---
 
 (Updated April 4, 2015, 3:48 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1461
 https://issues.apache.org/jira/browse/KAFKA-1461
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1461. Replica fetcher thread does not implement any back-off behavior.
 
 
 KAFKA-1461. Replica fetcher thread does not implement any back-off behavior.
 
 
 KAFKA-1461. Replica fetcher thread does not implement any back-off behavior.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
 5d5cf5897cc37b3595f14bfe9d7cde43456bcc4b 
   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
 96faa7b4ed7c9ba8a3f6f9f114bd94e19b3a7ac0 
 
 Diff: https://reviews.apache.org/r/31366/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Sriharsha Chintalapani
 




Re: Review Request 31568: Patch for KAFKA-1989

2015-04-06 Thread Jun Rao


 On April 1, 2015, 6:46 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/utils/timer/Timer.scala, lines 44-45
  https://reviews.apache.org/r/31568/diff/2/?file=901427#file901427line44
 
  Could we use inLock() where applicable?
 
 Yasuhiro Matsuda wrote:
 I don't like having closure overhead here. Scala's closures create a 
 new instance of the closure object every time.

How much is the closure overhead? We use inLock() in quite a few other places. 
It would be good to keep the usage consistent. If it's too much overhead, 
perhaps we can remove all inLock() usage in a separate jira?
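
For context, inLock() is essentially this shape (sketched from memory, not copied
from the source); the by-name body is what allocates a small closure object per call:

    import java.util.concurrent.locks.{Lock, ReentrantLock}

    def inLock[T](lock: Lock)(body: => T): T = {
      lock.lock()
      try body
      finally lock.unlock()
    }

    val lock = new ReentrantLock()
    val result = inLock(lock) { 1 + 1 }   // each call site builds a Function0 instance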


- Jun


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review78444
---


On April 1, 2015, 8:50 p.m., Yasuhiro Matsuda wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/31568/
 ---
 
 (Updated April 1, 2015, 8:50 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1989
 https://issues.apache.org/jira/browse/KAFKA-1989
 
 
 Repository: kafka
 
 
 Description
 ---
 
 new purgatory implementation
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/DelayedOperation.scala 
 e317676b4dd5bb5ad9770930e694cd7282d5b6d5 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 44f0026e6676d8d764dd59dbcc6bb7bb727a3ba6 
   core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION 
   core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION 
   core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION 
   core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION 
   core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 
 7a37617395b9e4226853913b8989f42e7301de7c 
   core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala 
 PRE-CREATION 
   core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/31568/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Yasuhiro Matsuda
 




[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state

2015-04-06 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482134#comment-14482134
 ] 

Guozhang Wang commented on KAFKA-2082:
--

[~eapache] the problem you described is valid: if a broker is isolated 
from ZK / the other brokers, its metadata will become stale. 

For the replica manager, since it currently does not back off while getting the 
not-leader exception, its logs will get swamped; KAFKA-1461 will fix this 
issue.

As for clients, they will periodically refresh their metadata from a random 
broker, and once that metadata is fetched from other brokers, the clients will 
produce / fetch to / from the new leader.

 Kafka Replication ends up in a bad state
 

 Key: KAFKA-2082
 URL: https://issues.apache.org/jira/browse/KAFKA-2082
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 0.8.2.1
Reporter: Evan Huus
Assignee: Sriharsha Chintalapani
Priority: Critical
 Attachments: KAFKA-2082.patch


 While running integration tests for Sarama (the go client) we came across a 
 pattern of connection losses that reliably puts kafka into a bad state: 
 several of the brokers start spinning, chewing ~30% CPU and spamming the logs 
 with hundreds of thousands of lines like:
 {noformat}
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,1] failed due to Leader not local for partition 
 [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,6] failed due to Leader not local for partition 
 [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,21] failed due to Leader not local for partition 
 [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,26] failed due to Leader not local for partition 
 [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,1] failed due to Leader not local for partition 
 [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,6] failed due to Leader not local for partition 
 [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,21] failed due to Leader not local for partition 
 [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,26] failed due to Leader not local for partition 
 [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
 {noformat}
 This can be easily and reliably reproduced using the {{toxiproxy-final}} 
 branch of https://github.com/Shopify/sarama which includes a vagrant script 
 for provisioning the appropriate cluster: 
 - {{git clone https://github.com/Shopify/sarama.git}}
 - {{git checkout test-jira-kafka-2082}}
 - {{vagrant up}}
 - {{TEST_SEED=1427917826425719059 DEBUG=true go test -v}}
 After the test finishes (it fails because the cluster ends up in a bad 
 state), you can log into the cluster machine with {{vagrant ssh}} and inspect 
 the bad nodes. The vagrant script provisions five zookeepers and five brokers 
 in {{/opt/kafka-9091/}} through {{/opt/kafka-9095/}}.
 Additional context: the test produces continually to the cluster while 
 randomly cutting and restoring zookeeper connections (all connections to 
 zookeeper are run through a simple proxy on the same vm to make this easy). 
 The majority of the time this works very well and does a good job exercising 
 our producer's retry and failover code. However, under certain patterns of 
 connection loss (the {{TEST_SEED}} in the instructions is important), kafka 
 gets confused. 

Re: [DISCUSS] New partitioning for better load balancing

2015-04-06 Thread Guozhang Wang
Gianmarco,

I browsed through your paper (congrats on the ICDE publication BTW!), and
here are some questions / comments on the algorithm:

1. One motivation for enabling key-based partitioning in Kafka is to achieve
per-key ordering, i.e. with all messages with the same key sent to the same
partition, their ordering is preserved. However, with key-splitting that
guarantee seems to break, and now messages with the same key may be
sent to 2 (or, generally speaking, many) partitions.

2. As for the local load estimation, there is a second mapping from
partitions (workers in your paper) to broker hosts besides the mapping from
keys to partitions, and not all broker hosts maintain each of the partitions.
partitions. For example, there are 4 brokers, and broker-1/2 each takes one
of the two partitions of topic A, while broker-3/4 each takes one of the
two partitions of topic B, etc.

I am wondering if those two issues can be resolved with the PKG framework?
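
For reference, the core of the partial key grouping idea as I read it -- two
candidate partitions per key, pick the locally less loaded one -- in a few lines of
illustrative Scala (not the paper's or Storm's code):

    import scala.util.hashing.MurmurHash3

    class PartialKeyGroupingSketch(numPartitions: Int) {
      private val sent = Array.fill(numPartitions)(0L)   // local per-partition load estimate

      private def candidate(key: String, seed: Int): Int = {
        val h = MurmurHash3.stringHash(key, seed) % numPartitions
        if (h < 0) h + numPartitions else h
      }

      def partition(key: String): Int = {
        val c1 = candidate(key, 17)
        val c2 = candidate(key, 31)
        val chosen = if (sent(c1) <= sent(c2)) c1 else c2   // one key may hit 2 partitions
        sent(chosen) += 1
        chosen
      }
    }

which is exactly why question 1 above matters: per-key ordering across the two
candidate partitions is not preserved.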

Guozhang

On Sun, Apr 5, 2015 at 12:19 AM, Gianmarco De Francisci Morales 
g...@apache.org wrote:

 Hi Jay,

 Thanks, that sounds like a necessary step. I guess I expected something like
 that to be already there, at least internally.
 I created KAFKA-2092 to track the PKG integration.

 Cheers,

 --
 Gianmarco

 On 4 April 2015 at 23:50, Jay Kreps jay.kr...@gmail.com wrote:

  Hey guys,
 
  I think the first step here would be to expose a partitioner interface
 for
  the new producer that would make it easy to plug in these different
  strategies. I filed a JIRA for this:
  https://issues.apache.org/jira/browse/KAFKA-2091
 
  -Jay
 
  On Fri, Apr 3, 2015 at 9:36 AM, Harsha ka...@harsha.io wrote:
 
  Gianmarco,
   I am coming from the Storm community. I think PKG is very
  interesting, and we can provide an implementation of Partitioner for PKG.
  Can you open a JIRA for this?
 
  --
  Harsha
  Sent with Airmail
 
  On April 3, 2015 at 4:49:15 AM, Gianmarco De Francisci Morales (
  g...@apache.org) wrote:
 
  Hi,
 
  We have recently studied the problem of load balancing in distributed
  stream processing systems such as Samza [1].
  In particular, we focused on what happens when the key distribution of
 the
  stream is skewed when using key grouping.
  We developed a new stream partitioning scheme (which we call Partial Key
  Grouping). It achieves better load balancing than hashing while being
 more
  scalable than round robin in terms of memory.
 
  In the paper we show a number of mining algorithms that are easy to
  implement with partial key grouping, and whose performance can benefit
  from
  it. We think that it might also be useful for a larger class of
  algorithms.
 
  PKG has already been integrated in Storm [2], and I would like to be
 able
  to use it in Samza as well. As far as I understand, Kafka producers are
  the
  ones that decide how to partition the stream (or Kafka topic). Even
 after
  doing a bit of reading, I am still not sure if I should be writing this
  email here or on the Samza dev list. Anyway, my first guess is Kafka.
 
  I do not have experience with Kafka, however partial key grouping is
 very
  easy to implement: it requires just a few lines of code in Java when
  implemented as a custom grouping in Storm [3].
  I believe it should be very easy to integrate.
 
  For all these reasons, I believe it will be a nice addition to
  Kafka/Samza.
  If the community thinks it's a good idea, I will be happy to offer
 support
  in the porting.
 
  References:
  [1]
 
 
 https://melmeric.files.wordpress.com/2014/11/the-power-of-both-choices-practical-load-balancing-for-distributed-stream-processing-engines.pdf
  [2] https://issues.apache.org/jira/browse/STORM-632
  [3] https://github.com/gdfm/partial-key-grouping
  --
  Gianmarco
 
 
 




-- 
-- Guozhang


[jira] [Resolved] (KAFKA-2098) Gradle Wrapper Jar gone missing in 0.8.2.1

2015-04-06 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira resolved KAFKA-2098.
-
Resolution: Won't Fix

 Gradle Wrapper Jar gone missing in 0.8.2.1
 --

 Key: KAFKA-2098
 URL: https://issues.apache.org/jira/browse/KAFKA-2098
 Project: Kafka
  Issue Type: Bug
  Components: build
Affects Versions: 0.8.2.1
Reporter: Rekha Joshi

 ./gradlew idea
 Error: Could not find or load main class org.gradle.wrapper.GradleWrapperMain
 This was working in 0.8.2. Attaching patch. Thanks.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2098) Gradle Wrapper Jar gone missing in 0.8.2.1

2015-04-06 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14481979#comment-14481979
 ] 

Gwen Shapira commented on KAFKA-2098:
-

I believe it was deliberately removed, since we can't have binaries in our 
source distribution.

The solution is to install and run Gradle first.

 Gradle Wrapper Jar gone missing in 0.8.2.1
 --

 Key: KAFKA-2098
 URL: https://issues.apache.org/jira/browse/KAFKA-2098
 Project: Kafka
  Issue Type: Bug
  Components: build
Affects Versions: 0.8.2.1
Reporter: Rekha Joshi

 ./gradlew idea
 Error: Could not find or load main class org.gradle.wrapper.GradleWrapperMain
 This was working in 0.8.2. Attaching patch. Thanks.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1809) Refactor brokers to allow listening on multiple ports and IPs

2015-04-06 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482198#comment-14482198
 ] 

Sriharsha Chintalapani commented on KAFKA-1809:
---

[~gwenshap] I apologize for not commenting on the RB earlier, but the following 
naming is causing confusion for me.
We've got BrokerEndPoint.scala, but internally the object is called BrokerEndpoint 
(with a lowercase 'p') and the method is createBrokerEndPoint. If possible, can we get 
these consistent in another JIRA?

 Refactor brokers to allow listening on multiple ports and IPs 
 --

 Key: KAFKA-1809
 URL: https://issues.apache.org/jira/browse/KAFKA-1809
 Project: Kafka
  Issue Type: Sub-task
  Components: security
Reporter: Gwen Shapira
Assignee: Gwen Shapira
 Attachments: KAFKA-1809-DOCs.V1.patch, KAFKA-1809.patch, 
 KAFKA-1809.v1.patch, KAFKA-1809.v2.patch, KAFKA-1809.v3.patch, 
 KAFKA-1809_2014-12-25_11:04:17.patch, KAFKA-1809_2014-12-27_12:03:35.patch, 
 KAFKA-1809_2014-12-27_12:22:56.patch, KAFKA-1809_2014-12-29_12:11:36.patch, 
 KAFKA-1809_2014-12-30_11:25:29.patch, KAFKA-1809_2014-12-30_18:48:46.patch, 
 KAFKA-1809_2015-01-05_14:23:57.patch, KAFKA-1809_2015-01-05_20:25:15.patch, 
 KAFKA-1809_2015-01-05_21:40:14.patch, KAFKA-1809_2015-01-06_11:46:22.patch, 
 KAFKA-1809_2015-01-13_18:16:21.patch, KAFKA-1809_2015-01-28_10:26:22.patch, 
 KAFKA-1809_2015-02-02_11:55:16.patch, KAFKA-1809_2015-02-03_10:45:31.patch, 
 KAFKA-1809_2015-02-03_10:46:42.patch, KAFKA-1809_2015-02-03_10:52:36.patch, 
 KAFKA-1809_2015-03-16_09:02:18.patch, KAFKA-1809_2015-03-16_09:40:49.patch, 
 KAFKA-1809_2015-03-27_15:04:10.patch, KAFKA-1809_2015-04-02_15:12:30.patch, 
 KAFKA-1809_2015-04-02_19:03:58.patch, KAFKA-1809_2015-04-04_22:00:13.patch


 The goal is to eventually support different security mechanisms on different 
 ports. 
 Currently brokers are defined as host+port pair, and this definition exists 
 throughout the code-base, therefore some refactoring is needed to support 
 multiple ports for a single broker.
 The detailed design is here: 
 https://cwiki.apache.org/confluence/display/KAFKA/Multiple+Listeners+for+Kafka+Brokers



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-04-06 Thread Guozhang Wang
As for 4, if we are going to reuse the purgatory class, are we going to just
use the produce / fetch purgatory objects, or are we going to create a new
throttle purgatory object? If we go with the first option, then I think
Jun's concern is valid: some produce / fetch requests will have
many keys, and hence calling watch() will end up adding the request to
each one of the watch lists, and we have seen some issues with this
scenario before.

Guozhang

On Mon, Apr 6, 2015 at 10:36 AM, Aditya Auradkar 
aaurad...@linkedin.com.invalid wrote:

 Hi Jay,

 2. At this time, the proposed response format changes are only for
 monitoring/informing clients. As Jun mentioned, we get instance level
 monitoring in this case since each instance that got throttled will have a
 metric confirming the same. Without client level monitoring for this, it's
 hard for application developers to find if they are being throttled since
 they will also have to be aware of all the brokers in the cluster. This is
 quite problematic for large clusters.

 It seems nice for app developers to not have to think about Kafka internal
 metrics and only focus on the metrics exposed on their instances. Analogous
 to having client-side request latency metrics. Basically, we want an easy
 way for clients to be aware if they are being throttled.

 4. For purgatory v delay queue, I think we are on the same page. I feel it
 is nicer to use the purgatory but I'm happy to use a DelayQueue if there
 are performance implications. I don't know enough about the current and
 Yasuhiro's new implementation to be sure one way or the other.

 Stepping back, I think these two things are the only remaining point of
 discussion within the current proposal. Any concerns if I started a voting
 thread on the proposal after the KIP discussion tomorrow? (assuming we
 reach consensus on these items)

 Thanks,
 Aditya
 
 From: Jay Kreps [jay.kr...@gmail.com]
 Sent: Saturday, April 04, 2015 1:36 PM
 To: dev@kafka.apache.org
 Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

 Hey Aditya,

 2. For the return flag I'm not terribly particular. If we want to add it
 let's fully think through how it will be used. The only concern I have is
 adding to the protocol without really thinking through the use cases. So
 let's work out the APIs we want to add to the Java consumer and producer
 and the use cases for how clients will make use of these. For my part I
 actually don't see much use other than monitoring since it isn't an error
 condition to be at your quota. And if it is just monitoring I don't see a
 big enough difference between having the monitoring on the server-side
 versus in the clients to justify putting it in the protocol. But I think
 you guys may have other use cases in mind of how a client would make some
 use of this? Let's work that out. I also don't feel strongly about it--it
 wouldn't be *bad* to have the monitoring available on the client, just
 doesn't seem that much better.

 4. For the purgatory vs delay queue, I think it is arguably nicer to reuse the
 purgatory; we just have to be ultra-conscious of efficiency. I think our
 goal is to turn quotas on across the board, so at LinkedIn that would mean
 potentially every request will need a small delay. I haven't worked out the
 efficiency implications of this choice, so as long as we do that I'm happy.

 -Jay

 On Fri, Apr 3, 2015 at 1:10 PM, Aditya Auradkar 
 aaurad...@linkedin.com.invalid wrote:

  Some responses to Jay's points.
 
  1. Using commas - Cool.
 
  2. Adding return flag - I'm inclined to agree with Joel that this is good
  to have in the initial implementation.
 
  3. Config - +1. I'll remove it from the KIP. We can discuss this in
  parallel.
 
  4. Purgatory vs Delay queue - I feel that it is simpler to reuse the
  existing purgatories for both delayed produce and fetch requests. IIUC,
 all
  we need for quotas is a minWait parameter for DelayedOperation (or
  something equivalent) since there is already a max wait. The completion
  criteria can check if minWait time has elapsed before declaring the
  operation complete. For this to impact performance, a significant number
 of
  clients may need to exceed their quota at the same time and even then I'm
  not very clear on the scope of the impact. Two layers of delays might add
  complexity to the implementation which I'm hoping to avoid.
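  
   To make the minWait idea in point 4 concrete, a rough self-contained Scala
   sketch (the class and method names are invented for illustration and are not
   the actual kafka.server purgatory API):
  
{code}
// Toy model: a throttled operation may only complete once its quota delay (minWait) has elapsed.
abstract class ToyDelayedOperation(val maxWaitMs: Long) {
  @volatile private var completed = false
  def tryComplete(): Boolean            // completion criteria, checked on watch-key events
  def onComplete(): Unit                // e.g. send the delayed produce/fetch response
  final def forceComplete(): Unit = if (!completed) { completed = true; onComplete() }
}

class ToyThrottledOperation(minWaitMs: Long, maxWaitMs: Long, respond: () => Unit)
    extends ToyDelayedOperation(maxWaitMs) {
  private val createdMs = System.currentTimeMillis()
  override def tryComplete(): Boolean = {
    val waitedMs = System.currentTimeMillis() - createdMs
    if (waitedMs >= minWaitMs) { forceComplete(); true } else false  // minWait acts as the quota delay
  }
  override def onComplete(): Unit = respond()
}

object MinWaitSketch extends App {
  val op = new ToyThrottledOperation(minWaitMs = 100, maxWaitMs = 30000, respond = () => println("respond"))
  println(op.tryComplete())  // false: the quota delay has not elapsed yet
  Thread.sleep(150)
  println(op.tryComplete())  // true: minWait has elapsed, so the operation completes
}
{code}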
 
  Aditya
 
  
  From: Joel Koshy [jjkosh...@gmail.com]
  Sent: Friday, April 03, 2015 12:48 PM
  To: dev@kafka.apache.org
  Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas
 
  Aditya, thanks for the updated KIP and Jay/Jun thanks for the
  comments. Couple of comments in-line:
 
   2. I would advocate for adding the return flag when we next bump the
   request format version just to avoid proliferation. I agree this is a
  good
   thing to know about, but at the moment I don't think we have a very
 well
   fleshed out idea of how the 

Re: Review Request 32866: Patch for KAFKA-1954

2015-04-06 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32866/#review79084
---


Thanks for the patch. A few comments below. Also, could you rebase?


core/src/test/scala/unit/kafka/admin/AdminTest.scala
https://reviews.apache.org/r/32866/#comment128180

Could we do foreach instead of map?
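
    For context, a minimal illustration of the distinction (not lines from the
    patch): when the result is discarded, foreach states the side-effecting
    intent directly and avoids building a throwaway collection.

{code}
val brokerIds = Seq(0, 1, 2)
val ignored = brokerIds.map(id => println(s"starting broker $id"))  // builds a Seq[Unit] that is thrown away
brokerIds.foreach(id => println(s"starting broker $id"))            // returns Unit; only the side effect matters
{code}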



core/src/test/scala/unit/kafka/admin/AdminTest.scala
https://reviews.apache.org/r/32866/#comment128179

Could we do foreach instead of map? There are a few other places like this.



core/src/test/scala/unit/kafka/producer/ProducerTest.scala
https://reviews.apache.org/r/32866/#comment128181

indentation



core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala
https://reviews.apache.org/r/32866/#comment128186

indentation



core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
https://reviews.apache.org/r/32866/#comment128189

Is there a particular reason to remove the test on 
verifyNonDaemonThreadsStatus?



core/src/test/scala/unit/kafka/utils/TestUtils.scala
https://reviews.apache.org/r/32866/#comment128191

Is there a reason to change the default? In the most common case when we 
are just starting a single broker in a cluster, we want to enable controlled 
shutdown so that we know that it won't block.


- Jun Rao


On April 5, 2015, 8:54 p.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/32866/
 ---
 
 (Updated April 5, 2015, 8:54 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1954
 https://issues.apache.org/jira/browse/KAFKA-1954
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1954. Speed up unit tests.
 
 
 Diffs
 -
 
   core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala 
 99ac9230f63105a2942bec8fe2febde8a7e48b2e 
   core/src/test/scala/unit/kafka/admin/AdminTest.scala 
 0305f704b66a257d1ae732550793dbb2536e7ac1 
   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 
 61cc6028dd7c9a2eec2d9cbe6947764655801eee 
   core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala 
 28e3122b71ca26c2fdf81649b0586ebc94e105fe 
   core/src/test/scala/unit/kafka/integration/RollingBounceTest.scala 
 130b20533d35499e0d4e0403d521b246610bd325 
   core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala 
 56b1b8c004b1719c45b1e7bc9580e3638e3438ac 
   core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala 
 7c87b81b12366eb04144ec4005a5fda0d0260eea 
   core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala 
 18361c13fd0b8cd60f45249507b76d53e6001c12 
   core/src/test/scala/unit/kafka/producer/ProducerTest.scala 
 a7ca14227edd8ca1a65122350fe4740d455f0c5c 
   core/src/test/scala/unit/kafka/server/AdvertiseBrokerTest.scala 
 b0112402e7813cf4f5247e90af87482c340bd000 
   core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala 
 3d4258f8b31c1f2c7e0351fb6df9a96234e11d1d 
   core/src/test/scala/unit/kafka/server/LogOffsetTest.scala 
 496bf0ddfc2e7872dc187ad85de5e845c6258604 
   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 
 92e49df6964230753762aa359191a6ede337d3ac 
   core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala 
 a67cc37d542de0df0df24224b9d8db5472ecaf27 
   core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala 
 2bfaeb307094b0fba67a1976b7f87591ca36a45c 
   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
 f4518255a2bcf3ae3ccb19b9e2ff0b70c613966a 
 
 Diff: https://reviews.apache.org/r/32866/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Sriharsha Chintalapani
 




[jira] [Updated] (KAFKA-2099) BrokerEndPoint file, methods and object names should match

2015-04-06 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-2099:
---
   Resolution: Fixed
Fix Version/s: 0.8.3
   Status: Resolved  (was: Patch Available)

Thanks for the patch. +1 and committed to trunk.

 BrokerEndPoint file, methods and object names should match
 --

 Key: KAFKA-2099
 URL: https://issues.apache.org/jira/browse/KAFKA-2099
 Project: Kafka
  Issue Type: Bug
Reporter: Gwen Shapira
Assignee: Gwen Shapira
 Fix For: 0.8.3

 Attachments: KAFKA-2099.patch


 [~harsha_ch] commented on KAFKA-1809:
 We have BrokerEndPoint.scala, but internally the object is called BrokerEndpoint 
 with a lowercase p, and the method is createBrokerEndPoint. If possible, can we 
 get these consistent in another JIRA?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-2094) Kafka does not create topic automatically after deleting the topic.

2015-04-06 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved KAFKA-2094.
-
   Resolution: Not A Problem
Fix Version/s: 0.8.2.0

This issue was caused by not setting the offset for the consumers. I apologise 
for my mistake.

 Kafka does not create topic automatically after deleting the topic.
 ---

 Key: KAFKA-2094
 URL: https://issues.apache.org/jira/browse/KAFKA-2094
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 0.8.2.0
 Environment: Ubuntu 14.04 LTS
Reporter: Hyukjin Kwon
Priority: Critical
 Fix For: 0.8.2.0


 After I create a topic and then remove it (and wait long enough for it to 
 eventually be deleted), it does not auto-create the topic and instead emits 
 errors, even though the auto-create topic option is true. It works okay when I 
 manually create a topic after deleting it.
 Here are the commands I run.
 ./bin/kafka-topics.sh --list  --zookeeper 192.168.0.190:2181
 test
 ./bin/kafka-topics.sh --delete --zookeeper 192.168.0.190:2181 --topic test
 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
 [2015-04-06 20:51:44,542] WARN Property topic is not valid 
 (kafka.utils.VerifiableProperties)
 test
 [2015-04-06 20:51:49,896] WARN Error while fetching metadata [{TopicMetadata 
 for topic test ->
 No partition metadata for topic test due to 
 kafka.common.LeaderNotAvailableException}] for topic [test]: class 
 kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
 [2015-04-06 20:51:49,904] WARN Error while fetching metadata [{TopicMetadata 
 for topic test ->
 No partition metadata for topic test due to 
 kafka.common.LeaderNotAvailableException}] for topic [test]: class 
 kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
 [2015-04-06 20:51:49,905] ERROR Failed to collate messages by topic, 
 partition due to: Failed to fetch topic metadata for topic: 
 test(kafka.producer.async.DefaultEventHandler)
 ^Csnowdrop@master:~/workspace/kafka/kafka_2.10-0.8.2.0$ 
 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
 [2015-04-06 20:52:08,733] WARN Property topic is not valid 
 (kafka.utils.VerifiableProperties)
 Server side, I got this error
 [2015-04-06 21:41:08,491] ERROR [Replica Manager on Broker 0]: Error when 
 processing fetch request for partition [test,0] offset 1687 from consumer 
 with correlation id 5. Possible cause: Request for offset 1687 but we only 
 have log segments in the range 0 to 107. (kafka.server.ReplicaManager)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-04-06 Thread Jay Kreps
Hey Aditya,

2. I kind of buy it, but I really like to understand the details of the use
case before we make protocol changes. What changes are you proposing in the
clients for monitoring and how would that be used?

-Jay

On Mon, Apr 6, 2015 at 10:36 AM, Aditya Auradkar 
aaurad...@linkedin.com.invalid wrote:

 Hi Jay,

 2. At this time, the proposed response format changes are only for
 monitoring/informing clients. As Jun mentioned, we get instance level
 monitoring in this case since each instance that got throttled will have a
 metric confirming the same. Without client level monitoring for this, it's
 hard for application developers to find if they are being throttled since
 they will also have to be aware of all the brokers in the cluster. This is
 quite problematic for large clusters.

 It seems nice for app developers to not have to think about Kafka internal
 metrics and only focus on the metrics exposed on their instances. Analogous
 to having client-side request latency metrics. Basically, we want an easy
 way for clients to be aware if they are being throttled.

 4. For purgatory v delay queue, I think we are on the same page. I feel it
 is nicer to use the purgatory but I'm happy to use a DelayQueue if there
 are performance implications. I don't know enough about the current and
 Yasuhiro's new implementation to be sure one way or the other.

 Stepping back, I think these two things are the only remaining point of
 discussion within the current proposal. Any concerns if I started a voting
 thread on the proposal after the KIP discussion tomorrow? (assuming we
 reach consensus on these items)

 Thanks,
 Aditya
 
 From: Jay Kreps [jay.kr...@gmail.com]
 Sent: Saturday, April 04, 2015 1:36 PM
 To: dev@kafka.apache.org
 Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas

 Hey Aditya,

 2. For the return flag I'm not terribly particular. If we want to add it
 let's fully think through how it will be used. The only concern I have is
 adding to the protocol without really thinking through the use cases. So
 let's work out the APIs we want to add to the Java consumer and producer
 and the use cases for how clients will make use of these. For my part I
 actually don't see much use other than monitoring since it isn't an error
 condition to be at your quota. And if it is just monitoring I don't see a
 big enough difference between having the monitoring on the server-side
 versus in the clients to justify putting it in the protocol. But I think
 you guys may have other use cases in mind of how a client would make some
 use of this? Let's work that out. I also don't feel strongly about it--it
 wouldn't be *bad* to have the monitoring available on the client, just
 doesn't seem that much better.

 4. For the purgatory vs delay queue, I think it is arguably nicer to reuse the
 purgatory; we just have to be ultra-conscious of efficiency. I think our
 goal is to turn quotas on across the board, so at LinkedIn that would mean
 potentially every request will need a small delay. I haven't worked out the
 efficiency implications of this choice, so as long as we do that I'm happy.

 -Jay

 On Fri, Apr 3, 2015 at 1:10 PM, Aditya Auradkar 
 aaurad...@linkedin.com.invalid wrote:

  Some responses to Jay's points.
 
  1. Using commas - Cool.
 
  2. Adding return flag - I'm inclined to agree with Joel that this is good
  to have in the initial implementation.
 
  3. Config - +1. I'll remove it from the KIP. We can discuss this in
  parallel.
 
  4. Purgatory vs Delay queue - I feel that it is simpler to reuse the
  existing purgatories for both delayed produce and fetch requests. IIUC,
 all
  we need for quotas is a minWait parameter for DelayedOperation (or
  something equivalent) since there is already a max wait. The completion
  criteria can check if minWait time has elapsed before declaring the
  operation complete. For this to impact performance, a significant number
 of
  clients may need to exceed their quota at the same time and even then I'm
  not very clear on the scope of the impact. Two layers of delays might add
  complexity to the implementation which I'm hoping to avoid.
 
  Aditya
 
  
  From: Joel Koshy [jjkosh...@gmail.com]
  Sent: Friday, April 03, 2015 12:48 PM
  To: dev@kafka.apache.org
  Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas
 
  Aditya, thanks for the updated KIP and Jay/Jun thanks for the
  comments. Couple of comments in-line:
 
   2. I would advocate for adding the return flag when we next bump the
   request format version just to avoid proliferation. I agree this is a
  good
   thing to know about, but at the moment I don't think we have a very
 well
   fleshed out idea of how the client would actually make use of this
 info.
  I
 
  I'm somewhat inclined to having something appropriate off the bat -
  mainly because (i) clients really should know that they have been
  throttled (ii) a smart producer/consumer 

Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-04-06 Thread Gwen Shapira
Here's a wild guess:

An app developer included a Kafka Producer in his app, and is not happy
with the throughput. He doesn't have visibility into the brokers since they
are owned by a different team. Obviously the first instinct of a developer
who knows that throttling exists is to blame throttling for any slowdown in
the app.
If he doesn't have a way to know from the responses whether or not his app
is throttled, he may end up calling Aditya at 4am asking: Hey, is my app
throttled?

I assume Aditya is trying to avoid this scenario.

On Mon, Apr 6, 2015 at 7:47 PM, Jay Kreps jay.kr...@gmail.com wrote:

 Hey Aditya,

 2. I kind of buy it, but I really like to understand the details of the use
 case before we make protocol changes. What changes are you proposing in the
 clients for monitoring and how would that be used?

 -Jay

 On Mon, Apr 6, 2015 at 10:36 AM, Aditya Auradkar 
 aaurad...@linkedin.com.invalid wrote:

  Hi Jay,
 
  2. At this time, the proposed response format changes are only for
  monitoring/informing clients. As Jun mentioned, we get instance level
  monitoring in this case since each instance that got throttled will have
 a
  metric confirming the same. Without client level monitoring for this,
 it's
  hard for application developers to find if they are being throttled since
  they will also have to be aware of all the brokers in the cluster. This
 is
  quite problematic for large clusters.
 
  It seems nice for app developers to not have to think about kafka
 internal
  metrics and only focus on the metrics exposed on their instances.
 Analogous
   to having client-side request latency metrics. Basically, we want an easy
  way for clients to be aware if they are being throttled.
 
  4. For purgatory v delay queue, I think we are on the same page. I feel
 it
  is nicer to use the purgatory but I'm happy to use a DelayQueue if there
  are performance implications. I don't know enough about the current and
  Yasuhiro's new implementation to be sure one way or the other.
 
  Stepping back, I think these two things are the only remaining point of
  discussion within the current proposal. Any concerns if I started a
 voting
  thread on the proposal after the KIP discussion tomorrow? (assuming we
  reach consensus on these items)
 
  Thanks,
  Aditya
  
  From: Jay Kreps [jay.kr...@gmail.com]
  Sent: Saturday, April 04, 2015 1:36 PM
  To: dev@kafka.apache.org
  Subject: Re: [KIP-DISCUSSION] KIP-13 Quotas
 
  Hey Aditya,
 
  2. For the return flag I'm not terribly particular. If we want to add it
  let's fully think through how it will be used. The only concern I have is
  adding to the protocol without really thinking through the use cases. So
  let's work out the APIs we want to add to the Java consumer and producer
  and the use cases for how clients will make use of these. For my part I
  actually don't see much use other than monitoring since it isn't an error
  condition to be at your quota. And if it is just monitoring I don't see a
  big enough difference between having the monitoring on the server-side
  versus in the clients to justify putting it in the protocol. But I think
  you guys may have other use cases in mind of how a client would make some
  use of this? Let's work that out. I also don't feel strongly about it--it
  wouldn't be *bad* to have the monitoring available on the client, just
  doesn't seem that much better.
 
  4. For the purgatory vs delay queue, I think it is arguably nicer to reuse
 the
  purgatory; we just have to be ultra-conscious of efficiency. I think our
  goal is to turn quotas on across the board, so at LinkedIn that would
 mean
  potentially every request will need a small delay. I haven't worked out
 the
  efficiency implications of this choice, so as long as we do that I'm
 happy.
 
  -Jay
 
  On Fri, Apr 3, 2015 at 1:10 PM, Aditya Auradkar 
  aaurad...@linkedin.com.invalid wrote:
 
   Some responses to Jay's points.
  
   1. Using commas - Cool.
  
   2. Adding return flag - I'm inclined to agree with Joel that this is
 good
   to have in the initial implementation.
  
   3. Config - +1. I'll remove it from the KIP. We can discuss this in
   parallel.
  
   4. Purgatory vs Delay queue - I feel that it is simpler to reuse the
   existing purgatories for both delayed produce and fetch requests. IIUC,
  all
   we need for quotas is a minWait parameter for DelayedOperation (or
   something equivalent) since there is already a max wait. The completion
   criteria can check if minWait time has elapsed before declaring the
   operation complete. For this to impact performance, a significant
 number
  of
   clients may need to exceed their quota at the same time and even then
 I'm
   not very clear on the scope of the impact. Two layers of delays might
 add
   complexity to the implementation which I'm hoping to avoid.
  
   Aditya
  
   
   From: Joel Koshy [jjkosh...@gmail.com]
   

[jira] [Commented] (KAFKA-2059) ZookeeperConsumerConnectorTest.testBasic transient failure

2015-04-06 Thread Fangmin Lv (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482537#comment-14482537
 ] 

Fangmin Lv commented on KAFKA-2059:
---

Every run takes 30 seconds on my machine. I've run the test for a whole day, 
and haven't seen any failure. Could you paste the failed log info so that we 
can check what happened? You can use this command to run the single testBasic 
test case until it fails:

$ R=0; while [[ $R -eq 0 ]]; do ./gradlew core:test --tests 
kafka.javaapi.consumer.ZookeeperConsumerConnectorTest.testBasic --rerun-tasks 
-i; R=$?; done | tee ./testBasic.log

 ZookeeperConsumerConnectorTest.testBasic transient failure
 -

 Key: KAFKA-2059
 URL: https://issues.apache.org/jira/browse/KAFKA-2059
 Project: Kafka
  Issue Type: Sub-task
Reporter: Guozhang Wang
  Labels: newbie

 {code}
 kafka.javaapi.consumer.ZookeeperConsumerConnectorTest > testBasic FAILED
 kafka.common.InconsistentBrokerIdException: Configured brokerId 1 doesn't 
 match stored brokerId 0 in meta.properties
 at kafka.server.KafkaServer.getBrokerId(KafkaServer.scala:443)
 at kafka.server.KafkaServer.startup(KafkaServer.scala:116)
 at kafka.utils.TestUtils$.createServer(TestUtils.scala:126)
 at 
 kafka.integration.KafkaServerTestHarness$$anonfun$setUp$1.apply(KafkaServerTestHarness.scala:57)
 at 
 kafka.integration.KafkaServerTestHarness$$anonfun$setUp$1.apply(KafkaServerTestHarness.scala:57)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at 
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.Iterator$class.foreach(Iterator.scala:727)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at 
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at 
 kafka.integration.KafkaServerTestHarness$class.setUp(KafkaServerTestHarness.scala:57)
 at 
 kafka.javaapi.consumer.ZookeeperConsumerConnectorTest.setUp(ZookeeperConsumerConnectorTest.scala:41)
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2099) BrokerEndPoint file, methods and object names should match

2015-04-06 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-2099:

Status: Patch Available  (was: Open)

 BrokerEndPoint file, methods and object names should match
 --

 Key: KAFKA-2099
 URL: https://issues.apache.org/jira/browse/KAFKA-2099
 Project: Kafka
  Issue Type: Bug
Reporter: Gwen Shapira
Assignee: Gwen Shapira
 Attachments: KAFKA-2099.patch


 [~harsha_ch] commented on KAFKA-1809:
 We have BrokerEndPoint.scala, but internally the object is called BrokerEndpoint 
 with a lowercase p, and the method is createBrokerEndPoint. If possible, can we 
 get these consistent in another JIRA?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2099) BrokerEndPoint file, methods and object names should match

2015-04-06 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-2099:

Attachment: KAFKA-2099.patch

 BrokerEndPoint file, methods and object names should match
 --

 Key: KAFKA-2099
 URL: https://issues.apache.org/jira/browse/KAFKA-2099
 Project: Kafka
  Issue Type: Bug
Reporter: Gwen Shapira
Assignee: Gwen Shapira
 Attachments: KAFKA-2099.patch


 [~harsha_ch] commented on KAFKA-1809:
 We have BrokerEndPoint.scala, but internally the object is called BrokerEndpoint 
 with a lowercase p, and the method is createBrokerEndPoint. If possible, can we 
 get these consistent in another JIRA?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 32913: Patch for KAFKA-2099

2015-04-06 Thread Gwen Shapira

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32913/
---

Review request for kafka.


Bugs: KAFKA-2099
https://issues.apache.org/jira/browse/KAFKA-2099


Repository: kafka


Description
---

Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
KAFKA-2099


Diffs
-

  core/src/main/scala/kafka/admin/AdminUtils.scala 
0d3332e0e0bbb396128a69e179ee34087c221958 
  core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala 
d2a3d43c0ee7b0c502b652571f8d2d7181012f42 
  core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 
bf93632edb02d8b4ef089b12d62e584fd83425ae 
  core/src/main/scala/kafka/api/TopicMetadata.scala 
6de447d0d4d6e77c283a3504c171b6a97eacecf2 
  core/src/main/scala/kafka/api/TopicMetadataResponse.scala 
776b604c63e64d75a854481e923f91be06ce1d31 
  core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 
a91e3a67587e42554a0b79f255225895b07470a7 
  core/src/main/scala/kafka/client/ClientUtils.scala 
f08aaf274aef0b96598080a32e3465c5215f747f 
  core/src/main/scala/kafka/cluster/Broker.scala 
8e603b604fb13184331d30a319421c0d393d1728 
  core/src/main/scala/kafka/cluster/BrokerEndPoint.scala 
22dba18a89cebb5501345bc3a84160e054afbdc6 
  core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala 
6bb0d569db8b16eddd299bbb8a903d77cb6bdd10 
  core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala 
cde4481c40a74cae1a2103d734df6df659ec7271 
  core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala 
9c14428045740530cbf2d1457f0276a51fa402bd 
  core/src/main/scala/kafka/javaapi/TopicMetadata.scala 
ebbd589f9a842f6ab77963ac014905f77059a656 
  core/src/main/scala/kafka/producer/ProducerPool.scala 
07feb05460e8f0acc2eda722d840dc566a4c2ffd 
  core/src/main/scala/kafka/server/AbstractFetcherManager.scala 
f8f93317531b08d5b13019820db0c34834a3d3c8 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
1e26de25797f91fe94ef391e74ed2c04b4c10112 
  core/src/main/scala/kafka/server/MetadataCache.scala 
4460b42eb7beb857d4d9dd40b68d940335200cdc 
  core/src/main/scala/kafka/server/ReplicaFetcherManager.scala 
f0a2a5bb18a0dcd0496b54f07f6db04b05972b06 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
b2196c81bcc2c4090c8b4069456fc4d1a8a4634d 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
144a15e3a53d47fa972bef277c335da484842cf9 
  core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala 
d1050b475161e64160c47f84d903da2075c2d401 
  core/src/main/scala/kafka/tools/SimpleConsumerShell.scala 
9a6804c69c0d6591cba7e2fd75a98d7fa313cca2 
  core/src/main/scala/kafka/utils/ZkUtils.scala 
82b0e3355eec21d9fb962c6a0b73acd3f1d125b4 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
83910f3870985deaf165de52aee16695bd3a52a5 
  core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala 
ad58eedfa94caeda4cadff396ab83286c865764c 
  core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala 
603cf76afd0f9cf7448d467ba948839652c873e4 
  core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala 
2169a5ccd9ebf5da9400cde97c352e8dfb5a9b21 

Diff: https://reviews.apache.org/r/32913/diff/


Testing
---


Thanks,

Gwen Shapira



[jira] [Commented] (KAFKA-2099) BrokerEndPoint file, methods and object names should match

2015-04-06 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482296#comment-14482296
 ] 

Gwen Shapira commented on KAFKA-2099:
-

Created reviewboard https://reviews.apache.org/r/32913/diff/
 against branch trunk

 BrokerEndPoint file, methods and object names should match
 --

 Key: KAFKA-2099
 URL: https://issues.apache.org/jira/browse/KAFKA-2099
 Project: Kafka
  Issue Type: Bug
Reporter: Gwen Shapira
Assignee: Gwen Shapira
 Attachments: KAFKA-2099.patch


 [~harsha_ch] commented on KAFKA-1809:
 We have BrokerEndPoint.scala, but internally the object is called BrokerEndpoint 
 with a lowercase p, and the method is createBrokerEndPoint. If possible, can we 
 get these consistent in another JIRA?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Kafka confusion

2015-04-06 Thread Guozhang Wang
This is a great survey for understanding which parts still need improvement,
thanks Gwen!

On Mon, Mar 30, 2015 at 12:27 PM, Gwen Shapira gshap...@cloudera.com
wrote:

 I was planning on doing a re-poll about a month after every release :)

 Maybe it can be part of the release activity.

 Gwen

 On Mon, Mar 30, 2015 at 10:26 AM, Jay Kreps j...@confluent.io wrote:
  Gwen had a really good blog post on confusing things about Kafka:
  http://ingest.tips/2015/03/26/what-is-confusing-about-kafka/
 
  A lot of these are in-flight now (e.g. consumer) or finished (e.g. delete
  topic, kind of, and non-sticky producer partitioning). Do we have bugs
 for
  the rest?
 
  It would be great to make this an ongoing thing we do to assess confusing
  configs, etc. Not sure of the best way to do this, but maybe just
 maintaining
  a special JIRA tag and periodically polling people again?
 
  -Jay




-- 
-- Guozhang


[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state

2015-04-06 Thread Evan Huus (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482301#comment-14482301
 ] 

Evan Huus commented on KAFKA-2082:
--

OK, that all makes sense. Some follow-up questions:

1. When the cluster heals (at the end of the test) the replication continues to 
fail. Based on your explanation, I would have thought that the cluster healing 
would have triggered onControllerResignation, which would update the metadata 
on that node and fix the replication by pointing it to the new leader, but that 
does not appear to be the case?

2. The go client currently tries brokers in the order specified by the user on 
startup, not randomly. I can see the argument for a random choice (it's on our 
todo list) but even that only works around the problem - an unlucky producer 
could choose the bad broker enough times in a row to run out of retries and 
start dropping messages. I really think that an isolated broker should refuse 
to serve metadata requests - it knows that its information is likely stale, and 
forcing clients to try another broker is the only way for them to reliably get 
fresh metadata. Just like documentation: the only metadata worse than no 
metadata is incorrect metadata.

 Kafka Replication ends up in a bad state
 

 Key: KAFKA-2082
 URL: https://issues.apache.org/jira/browse/KAFKA-2082
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 0.8.2.1
Reporter: Evan Huus
Assignee: Sriharsha Chintalapani
Priority: Critical
 Attachments: KAFKA-2082.patch


 While running integration tests for Sarama (the go client) we came across a 
 pattern of connection losses that reliably puts kafka into a bad state: 
 several of the brokers start spinning, chewing ~30% CPU and spamming the logs 
 with hundreds of thousands of lines like:
 {noformat}
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,1] failed due to Leader not local for partition 
 [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,6] failed due to Leader not local for partition 
 [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,21] failed due to Leader not local for partition 
 [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,26] failed due to Leader not local for partition 
 [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,1] failed due to Leader not local for partition 
 [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,6] failed due to Leader not local for partition 
 [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,21] failed due to Leader not local for partition 
 [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,26] failed due to Leader not local for partition 
 [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
 {noformat}
 This can be easily and reliably reproduced using the {{toxiproxy-final}} 
 branch of https://github.com/Shopify/sarama which includes a vagrant script 
 for provisioning the appropriate cluster: 
 - {{git clone https://github.com/Shopify/sarama.git}}
 - {{git checkout test-jira-kafka-2082}}
 - {{vagrant up}}
 - {{TEST_SEED=1427917826425719059 DEBUG=true go test -v}}
 After the test finishes (it fails because the cluster ends up in a bad 
 state), you can log into the cluster machine with {{vagrant ssh}} and inspect 
 the bad nodes. The vagrant script provisions 

[jira] [Commented] (KAFKA-2099) BrokerEndPoint file, methods and object names should match

2015-04-06 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482363#comment-14482363
 ] 

Gwen Shapira commented on KAFKA-2099:
-

Thanks for quick review [~junrao] and [~harsha_ch]

 BrokerEndPoint file, methods and object names should match
 --

 Key: KAFKA-2099
 URL: https://issues.apache.org/jira/browse/KAFKA-2099
 Project: Kafka
  Issue Type: Bug
Reporter: Gwen Shapira
Assignee: Gwen Shapira
 Fix For: 0.8.3

 Attachments: KAFKA-2099.patch


 [~harsha_ch] commented on KAFKA-1809:
 We have BrokerEndPoint.scala, but internally the object is called BrokerEndpoint 
 with a lowercase p, and the method is createBrokerEndPoint. If possible, can we 
 get these consistent in another JIRA?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state

2015-04-06 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482361#comment-14482361
 ] 

Sriharsha Chintalapani commented on KAFKA-2082:
---

[~guozhang]
1. This case is working fine. The test disables zk1 and zk3 and later re-enables 
zk3. In this case Broker3 has the latest metadata.
2. In this case, should we stop the socket server, or is there another way to 
reject requests?

 Kafka Replication ends up in a bad state
 

 Key: KAFKA-2082
 URL: https://issues.apache.org/jira/browse/KAFKA-2082
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 0.8.2.1
Reporter: Evan Huus
Assignee: Sriharsha Chintalapani
Priority: Critical
 Attachments: KAFKA-2082.patch


 While running integration tests for Sarama (the go client) we came across a 
 pattern of connection losses that reliably puts kafka into a bad state: 
 several of the brokers start spinning, chewing ~30% CPU and spamming the logs 
 with hundreds of thousands of lines like:
 {noformat}
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,1] failed due to Leader not local for partition 
 [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,6] failed due to Leader not local for partition 
 [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,21] failed due to Leader not local for partition 
 [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,26] failed due to Leader not local for partition 
 [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,1] failed due to Leader not local for partition 
 [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,6] failed due to Leader not local for partition 
 [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,21] failed due to Leader not local for partition 
 [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,26] failed due to Leader not local for partition 
 [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
 {noformat}
 This can be easily and reliably reproduced using the {{toxiproxy-final}} 
 branch of https://github.com/Shopify/sarama which includes a vagrant script 
 for provisioning the appropriate cluster: 
 - {{git clone https://github.com/Shopify/sarama.git}}
 - {{git checkout test-jira-kafka-2082}}
 - {{vagrant up}}
 - {{TEST_SEED=1427917826425719059 DEBUG=true go test -v}}
 After the test finishes (it fails because the cluster ends up in a bad 
 state), you can log into the cluster machine with {{vagrant ssh}} and inspect 
 the bad nodes. The vagrant script provisions five zookeepers and five brokers 
 in {{/opt/kafka-9091/}} through {{/opt/kafka-9095/}}.
 Additional context: the test produces continually to the cluster while 
 randomly cutting and restoring zookeeper connections (all connections to 
 zookeeper are run through a simple proxy on the same vm to make this easy). 
 The majority of the time this works very well and does a good job exercising 
 our producer's retry and failover code. However, under certain patterns of 
 connection loss (the {{TEST_SEED}} in the instructions is important), kafka 
 gets confused. The test never cuts more than two connections at a time, so 
 zookeeper should always have quorum, and the topic (with three replicas) 
 should always be writable.
 Completely restarting the cluster via {{vagrant reload}} seems to put it back 
 

[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state

2015-04-06 Thread Evan Huus (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482421#comment-14482421
 ] 

Evan Huus commented on KAFKA-2082:
--

1. Then why does broker3 continue to spam logs? (I understand the *spam* is due 
to the lack of backoff, but if it gets the latest metadata at that point why 
does it continue to fail at all?)

 Kafka Replication ends up in a bad state
 

 Key: KAFKA-2082
 URL: https://issues.apache.org/jira/browse/KAFKA-2082
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 0.8.2.1
Reporter: Evan Huus
Assignee: Sriharsha Chintalapani
Priority: Critical
 Attachments: KAFKA-2082.patch


 While running integration tests for Sarama (the go client) we came across a 
 pattern of connection losses that reliably puts kafka into a bad state: 
 several of the brokers start spinning, chewing ~30% CPU and spamming the logs 
 with hundreds of thousands of lines like:
 {noformat}
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,1] failed due to Leader not local for partition 
 [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,6] failed due to Leader not local for partition 
 [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,21] failed due to Leader not local for partition 
 [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,26] failed due to Leader not local for partition 
 [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,1] failed due to Leader not local for partition 
 [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,6] failed due to Leader not local for partition 
 [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,21] failed due to Leader not local for partition 
 [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,26] failed due to Leader not local for partition 
 [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
 {noformat}
 This can be easily and reliably reproduced using the {{toxiproxy-final}} 
 branch of https://github.com/Shopify/sarama which includes a vagrant script 
 for provisioning the appropriate cluster: 
 - {{git clone https://github.com/Shopify/sarama.git}}
 - {{git checkout test-jira-kafka-2082}}
 - {{vagrant up}}
 - {{TEST_SEED=1427917826425719059 DEBUG=true go test -v}}
 After the test finishes (it fails because the cluster ends up in a bad 
 state), you can log into the cluster machine with {{vagrant ssh}} and inspect 
 the bad nodes. The vagrant script provisions five zookeepers and five brokers 
 in {{/opt/kafka-9091/}} through {{/opt/kafka-9095/}}.
 Additional context: the test produces continually to the cluster while 
 randomly cutting and restoring zookeeper connections (all connections to 
 zookeeper are run through a simple proxy on the same vm to make this easy). 
 The majority of the time this works very well and does a good job exercising 
 our producer's retry and failover code. However, under certain patterns of 
 connection loss (the {{TEST_SEED}} in the instructions is important), kafka 
 gets confused. The test never cuts more than two connections at a time, so 
 zookeeper should always have quorum, and the topic (with three replicas) 
 should always be writable.
 Completely restarting the cluster via {{vagrant reload}} seems to put it back 
 into a sane state.



--
This message was sent by Atlassian JIRA

[jira] [Commented] (KAFKA-2099) BrokerEndPoint file, methods and object names should match

2015-04-06 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482316#comment-14482316
 ] 

Sriharsha Chintalapani commented on KAFKA-2099:
---

Thanks [~gwenshap] 

 BrokerEndPoint file, methods and object names should match
 --

 Key: KAFKA-2099
 URL: https://issues.apache.org/jira/browse/KAFKA-2099
 Project: Kafka
  Issue Type: Bug
Reporter: Gwen Shapira
Assignee: Gwen Shapira
 Attachments: KAFKA-2099.patch


 [~harsha_ch] commented on KAFKA-1809:
 We have BrokerEndPoint.scala, but internally the object is called BrokerEndpoint 
 with a lowercase p, and the method is createBrokerEndPoint. If possible, can we 
 get these consistent in another JIRA?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31850: Patch for KAFKA-1660

2015-04-06 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review79063
---


Thanks for the patch.


clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
https://reviews.apache.org/r/31850/#comment128121

[NOTE] is not a standard javadoc highlight, is it? (I don't know.) If not, 
can you just use the standard strong tag for emphasis?



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
https://reviews.apache.org/r/31850/#comment128123

"We are doing this because" -> "We do this because" or "This is done because"



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
https://reviews.apache.org/r/31850/#comment128125

"previous" -> "outstanding"



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
https://reviews.apache.org/r/31850/#comment128126

"was not able" -> "is unable"
"before timeout" -> "before the specified timeout"



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
https://reviews.apache.org/r/31850/#comment128134

If timeout > 0, this method blocks as it tries to join the sender thread 
within the specified timeout.



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
https://reviews.apache.org/r/31850/#comment128135

==

Also, is this completely true? It seems we may join (albeit without trying 
to send anything further) if called from a non-sender thread.



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
https://reviews.apache.org/r/31850/#comment128138

We do this because the sender thread would otherwise try to join itself and 
block forever.



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
https://reviews.apache.org/r/31850/#comment128142

"When" -> "If an"

That said, this doc is a bit weird - if that is what the user is supposed 
to do then why can't this method take care of it (i.e., let the interrupted 
exception go)? It seems the right thing to do would be to just propagate and 
let the caller decide what to do.



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
https://reviews.apache.org/r/31850/#comment128144

with timeout = {} ms



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
https://reviews.apache.org/r/31850/#comment128141

"elegantly" -> "gracefully"



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
https://reviews.apache.org/r/31850/#comment128147

```log.warn("Overriding close timeout {} ms to 0 ms in order to prevent 
deadlock due to self-join. This means you have incorrectly invoked close with a 
non-zero timeout from the producer call-back.", timeout)```



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
https://reviews.apache.org/r/31850/#comment128154

May want to log that we are proceeding to a force close



clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
https://reviews.apache.org/r/31850/#comment128155

"until timeout is expired" -> "within the specified timeout. If the close does 
not complete within the timeout, discard any pending messages and force close 
the producer."



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
https://reviews.apache.org/r/31850/#comment128166

As you explained offline, the sender does not have access to record batches 
while requests are in flight, but it would be super if we can figure out a way 
to avoid leaking details of batch completion (which is currently exclusively in 
sender) into the RecordAccumulator.



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
https://reviews.apache.org/r/31850/#comment128200

I don't think we should overload InterruptException for this. 
InterruptException is a wrapper around InterruptedException. i.e., after an 
InterruptException the thread should in fact have been interrupted - i.e., the 
interrupt status of the thread should be true (which is not the case here).
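
To spell out the convention being referenced (a sketch under assumptions, not the 
patch): InterruptException should only wrap a genuine InterruptedException, with 
the thread's interrupt status restored, while a deliberate abort should surface 
as a different exception.

```
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.InterruptException;

// Sketch of the distinction discussed above. The class and method names are assumptions;
// only the exception-handling pattern is the point.
public class AwaitSketch {
    private final Object lock = new Object();
    private boolean done = false;
    private boolean aborted = false;

    public void await() {
        synchronized (lock) {
            while (!done && !aborted) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the interrupt status
                    throw new InterruptException(e);    // wrap the *real* interruption
                }
            }
            if (aborted)
                // A forced abort is not an interruption, so do not reuse InterruptException.
                throw new KafkaException("batch aborted because the producer was force-closed");
        }
    }

    public void complete() {
        synchronized (lock) { done = true; lock.notifyAll(); }
    }

    public void abort() {
        synchronized (lock) { aborted = true; lock.notifyAll(); }
    }
}
```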



clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
https://reviews.apache.org/r/31850/#comment128167

See comment above.



clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
https://reviews.apache.org/r/31850/#comment128169

Can you revert this? i.e., I think the previous version with locally 
declared accums is cleaner.



core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
https://reviews.apache.org/r/31850/#comment128171

Can you also add a test for calling close with a non-zero timeout in the 
callback?
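
A rough shape of the suggested scenario, as a hedged Java sketch (the real test 
would live in the Scala ProducerSendTest with its cluster harness; the broker 
address and topic here are placeholders):

```
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Sketch: call close() with a non-zero timeout from inside the send callback (i.e. from
// the sender thread) and verify the producer still shuts down instead of deadlocking.
public class CloseFromCallbackSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        final KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        producer.send(new ProducerRecord<>("test", "key", "value"), new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                // Deliberately "incorrect" usage: a non-zero timeout from the callback
                // should be overridden to 0 by the producer rather than self-joining.
                producer.close(5, TimeUnit.SECONDS);
            }
        });
        producer.close(); // the outer close should complete normally
    }
}
```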


- Joel Koshy


On March 27, 2015, 11:35 p.m., Jiangjie Qin wrote:
 
 ---
 This is 

Re: Review Request 31850: Patch for KAFKA-1660

2015-04-06 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review79107
---



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
https://reviews.apache.org/r/31850/#comment128201

"send" -> "previously sent"


- Guozhang Wang


On March 27, 2015, 11:35 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/31850/
 ---
 
 (Updated March 27, 2015, 11:35 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1660
 https://issues.apache.org/jira/browse/KAFKA-1660
 
 
 Repository: kafka
 
 
 Description
 ---
 
 A minor fix.
 
 
 Incorporated Guozhang's comments.
 
 
 Modify according to the latest conclusion.
 
 
 Patch for the finally passed KIP-15
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 ab263423ff1d33170effb71acdef3fc501fa072a 
   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
 6913090af03a455452b0b5c3df78f266126b3854 
   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 
 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
  88b4e4fbf3bf6fb6d2f90551a792b95d4cd51c40 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
   
 clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java 
 fee322fa0dd9704374db4a6964246a7d2918d3e4 
   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java 
 c2fdc23239bd2196cd912c3d121b591f21393eab 
   
 clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
  e379ac89c9a2fbfe750d6b0dec693b7eabb76204 
   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
 3df450784592b894008e7507b2737f9bb07f7bd2 
 
 Diff: https://reviews.apache.org/r/31850/diff/
 
 
 Testing
 ---
 
 Unit tests passed.
 
 
 Thanks,
 
 Jiangjie Qin
 




Re: Review Request 32913: Patch for KAFKA-2099

2015-04-06 Thread Sriharsha Chintalapani

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32913/#review79099
---

Ship it!


Ship It!

- Sriharsha Chintalapani


On April 7, 2015, 12:35 a.m., Gwen Shapira wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/32913/
 ---
 
 (Updated April 7, 2015, 12:35 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2099
 https://issues.apache.org/jira/browse/KAFKA-2099
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into 
 KAFKA-2099
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/admin/AdminUtils.scala 
 0d3332e0e0bbb396128a69e179ee34087c221958 
   core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala 
 d2a3d43c0ee7b0c502b652571f8d2d7181012f42 
   core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 
 bf93632edb02d8b4ef089b12d62e584fd83425ae 
   core/src/main/scala/kafka/api/TopicMetadata.scala 
 6de447d0d4d6e77c283a3504c171b6a97eacecf2 
   core/src/main/scala/kafka/api/TopicMetadataResponse.scala 
 776b604c63e64d75a854481e923f91be06ce1d31 
   core/src/main/scala/kafka/api/UpdateMetadataRequest.scala 
 a91e3a67587e42554a0b79f255225895b07470a7 
   core/src/main/scala/kafka/client/ClientUtils.scala 
 f08aaf274aef0b96598080a32e3465c5215f747f 
   core/src/main/scala/kafka/cluster/Broker.scala 
 8e603b604fb13184331d30a319421c0d393d1728 
   core/src/main/scala/kafka/cluster/BrokerEndPoint.scala 
 22dba18a89cebb5501345bc3a84160e054afbdc6 
   core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala 
 6bb0d569db8b16eddd299bbb8a903d77cb6bdd10 
   core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala 
 cde4481c40a74cae1a2103d734df6df659ec7271 
   core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala 
 9c14428045740530cbf2d1457f0276a51fa402bd 
   core/src/main/scala/kafka/javaapi/TopicMetadata.scala 
 ebbd589f9a842f6ab77963ac014905f77059a656 
   core/src/main/scala/kafka/producer/ProducerPool.scala 
 07feb05460e8f0acc2eda722d840dc566a4c2ffd 
   core/src/main/scala/kafka/server/AbstractFetcherManager.scala 
 f8f93317531b08d5b13019820db0c34834a3d3c8 
   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
 1e26de25797f91fe94ef391e74ed2c04b4c10112 
   core/src/main/scala/kafka/server/MetadataCache.scala 
 4460b42eb7beb857d4d9dd40b68d940335200cdc 
   core/src/main/scala/kafka/server/ReplicaFetcherManager.scala 
 f0a2a5bb18a0dcd0496b54f07f6db04b05972b06 
   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
 b2196c81bcc2c4090c8b4069456fc4d1a8a4634d 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 144a15e3a53d47fa972bef277c335da484842cf9 
   core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala 
 d1050b475161e64160c47f84d903da2075c2d401 
   core/src/main/scala/kafka/tools/SimpleConsumerShell.scala 
 9a6804c69c0d6591cba7e2fd75a98d7fa313cca2 
   core/src/main/scala/kafka/utils/ZkUtils.scala 
 82b0e3355eec21d9fb962c6a0b73acd3f1d125b4 
   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
 83910f3870985deaf165de52aee16695bd3a52a5 
   core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala 
 ad58eedfa94caeda4cadff396ab83286c865764c 
   core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala 
 603cf76afd0f9cf7448d467ba948839652c873e4 
   core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala 
 2169a5ccd9ebf5da9400cde97c352e8dfb5a9b21 
 
 Diff: https://reviews.apache.org/r/32913/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Gwen Shapira
 




[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state

2015-04-06 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482343#comment-14482343
 ] 

Guozhang Wang commented on KAFKA-2082:
--

1. When the broker-1 registration re-appears in ZK, the new controller should be 
notified about this new broker and send the latest metadata to it. Could you 
check whether that has ever happened?

2. I think I agree with you that when a broker loses its ZK session it should 
stop handling client requests, so that clients refresh their metadata sooner. 
We can probably do that in KafkaHealthcheck's session expiration listener 
(handleStateChanged, which is currently a no-op).
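
For illustration, a rough sketch of that idea in Java (the real KafkaHealthcheck 
is Scala and registers its listener through ZkClient; the two-method 
IZkStateListener shape below assumes the zkclient version in use at the time, and 
the acceptRequests flag is just a stand-in for whatever mechanism the broker 
would actually use):

```
import org.I0Itec.zkclient.IZkStateListener;
import org.apache.zookeeper.Watcher.Event.KeeperState;

// Sketch only: stop answering client requests (which would return stale metadata) while
// the ZK session is lost, and resume once the broker has re-registered itself.
public class SessionAwareHealthcheckSketch implements IZkStateListener {
    private volatile boolean acceptRequests = true;

    @Override
    public void handleStateChanged(KeeperState state) {
        if (state == KeeperState.Disconnected || state == KeeperState.Expired) {
            acceptRequests = false; // clients get errors and go refresh metadata elsewhere
        } else if (state == KeeperState.SyncConnected) {
            acceptRequests = true;
        }
    }

    @Override
    public void handleNewSession() {
        // Re-register the broker under /brokers/ids, then resume serving.
        acceptRequests = true;
    }

    public boolean acceptingRequests() {
        return acceptRequests;
    }
}
```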

 Kafka Replication ends up in a bad state
 

 Key: KAFKA-2082
 URL: https://issues.apache.org/jira/browse/KAFKA-2082
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 0.8.2.1
Reporter: Evan Huus
Assignee: Sriharsha Chintalapani
Priority: Critical
 Attachments: KAFKA-2082.patch


 While running integration tests for Sarama (the go client) we came across a 
 pattern of connection losses that reliably puts kafka into a bad state: 
 several of the brokers start spinning, chewing ~30% CPU and spamming the logs 
 with hundreds of thousands of lines like:
 {noformat}
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,1] failed due to Leader not local for partition 
 [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,6] failed due to Leader not local for partition 
 [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,21] failed due to Leader not local for partition 
 [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,26] failed due to Leader not local for partition 
 [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,1] failed due to Leader not local for partition 
 [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,6] failed due to Leader not local for partition 
 [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,21] failed due to Leader not local for partition 
 [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,26] failed due to Leader not local for partition 
 [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
 {noformat}
 This can be easily and reliably reproduced using the {{toxiproxy-final}} 
 branch of https://github.com/Shopify/sarama which includes a vagrant script 
 for provisioning the appropriate cluster: 
 - {{git clone https://github.com/Shopify/sarama.git}}
 - {{git checkout test-jira-kafka-2082}}
 - {{vagrant up}}
 - {{TEST_SEED=1427917826425719059 DEBUG=true go test -v}}
 After the test finishes (it fails because the cluster ends up in a bad 
 state), you can log into the cluster machine with {{vagrant ssh}} and inspect 
 the bad nodes. The vagrant script provisions five zookeepers and five brokers 
 in {{/opt/kafka-9091/}} through {{/opt/kafka-9095/}}.
 Additional context: the test produces continually to the cluster while 
 randomly cutting and restoring zookeeper connections (all connections to 
 zookeeper are run through a simple proxy on the same vm to make this easy). 
 The majority of the time this works very well and does a good job exercising 
 our producer's retry and failover code. However, under certain patterns of 
 connection loss (the {{TEST_SEED}} in the instructions is important), kafka 
 gets confused. The test never 

Jenkins build is back to normal : KafkaPreCommit #60

2015-04-06 Thread Apache Jenkins Server
See https://builds.apache.org/job/KafkaPreCommit/60/changes



Is there a complete Kafka 0.8.* replication design document

2015-04-06 Thread Jason Guo (jguo2)
Hi,

 These days I have been focused on the Kafka 0.8 replication design and found 
three replication design proposals on the wiki (according to the document, the 
V3 version is the one used in the Kafka 0.8 release).
 But the V3 proposal is not complete and is inconsistent with the 
release.
Is there a complete Kafka 0.8 replication design document?

Here are some of my questions about the Kafka 0.8 replication design.
#1   According to V3, /brokers/topics/[topic]/[partition_id]/leaderAndISR 
stores the leader and ISR of a partition. However, in the 0.8.2 release there is 
no such znode; instead, it uses 
/brokers/topics/[topic]/partitions/[partition_id]/state to store the leader and 
ISR of a partition.
#2   In /brokers/topics/[topic] we can already get the ISR for all partitions of 
a certain topic, so why do we need 
/brokers/topics/[topic]/partitions/[partition_id]/state ?
#3   I didn't find /admin/partitions_add/[topic]/[partition_id] and 
/admin/partitions_remove/[topic]/[partition_id] while adding and removing 
partitions with bin/kafka-topics.sh. Is this deprecated in the 0.8 release?
 #4   I found that these two znodes under /admin are automatically removed 
only after the action completes: /admin/reassign_partitions/ and 
/admin/preferred_replica_election/. Why is the /admin/delete_topic/ znode not 
removed automatically?
 #5   What is the LeaderAndISRCommand in Scenario A of V3? Is it the same 
as LeaderAndISRRequest?
 #6   For Scenario D, when a certain broker becomes Controller, it will 
send a LeaderAndISRRequest to the brokers with a special INIT flag. For Scenario 
C, when a broker receives a LeaderAndISRRequest with the INIT flag, it deletes 
all local partitions not in set_p. Why do we need to delete all local partitions 
on a Controller change?
 #7   For Scenario E (broker startup), the first step is to read the 
replica assignment. Doesn't the broker need to add its id to /brokers/ids first?
 #8   Scenario H (add/remove partitions to an existing topic): in my 
test I didn't find any such znodes for the PartitionRemove Path/PartitionAdd 
Path in ZooKeeper. Is this approach to partition adding/deleting deprecated? In 
fact, I didn't observe any znode change while adding/deleting partitions. So 
what is the process for Kafka partition adding/deleting?
 #9   Scenario G seems inconsistent with the released implementation.



Regards,
Jason







[jira] [Commented] (KAFKA-2082) Kafka Replication ends up in a bad state

2015-04-06 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14482494#comment-14482494
 ] 

Sriharsha Chintalapani commented on KAFKA-2082:
---

[~eapache] I was testing mostly with trunk. I just re-tried with Kafka 0.8.2.1 
and, as you said, broker 3 never recovers. 
But with the latest trunk I can see this in the broker 3 logs:
[2015-04-07 02:56:13,485] INFO done re-registering broker 
(kafka.server.KafkaHealthcheck)
[2015-04-07 02:56:13,486] INFO Subscribing to /brokers/topics path to watch for 
new topics (kafka.server.KafkaHealthcheck)
[2015-04-07 02:56:13,500] INFO New leader is 9095 
(kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2015-04-07 02:56:13,561] INFO [KafkaApi-9093] current updateMetadataRequest 
Name:UpdateMetadataRequest;Version:1;Controller:909
5;ControllerEpoch:2;CorrelationId:7;ClientId:id_9095-host_localhost-port_29095;

Can you also give it a try with the latest trunk, and make sure you add 
reserved.broker.max.id=3 to server.properties when running trunk?

 Kafka Replication ends up in a bad state
 

 Key: KAFKA-2082
 URL: https://issues.apache.org/jira/browse/KAFKA-2082
 Project: Kafka
  Issue Type: Bug
  Components: replication
Affects Versions: 0.8.2.1
Reporter: Evan Huus
Assignee: Sriharsha Chintalapani
Priority: Critical
 Attachments: KAFKA-2082.patch


 While running integration tests for Sarama (the go client) we came across a 
 pattern of connection losses that reliably puts kafka into a bad state: 
 several of the brokers start spinning, chewing ~30% CPU and spamming the logs 
 with hundreds of thousands of lines like:
 {noformat}
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,1] failed due to Leader not local for partition 
 [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111094 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,6] failed due to Leader not local for partition 
 [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,070] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,21] failed due to Leader not local for partition 
 [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,26] failed due to Leader not local for partition 
 [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,1] failed due to Leader not local for partition 
 [many_partition,1] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,071] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111095 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,6] failed due to Leader not local for partition 
 [many_partition,6] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,21] failed due to Leader not local for partition 
 [many_partition,21] on broker 9093 (kafka.server.ReplicaManager)
 [2015-04-01 13:08:40,072] WARN [Replica Manager on Broker 9093]: Fetch 
 request with correlation id 111096 from client ReplicaFetcherThread-0-9093 on 
 partition [many_partition,26] failed due to Leader not local for partition 
 [many_partition,26] on broker 9093 (kafka.server.ReplicaManager)
 {noformat}
 This can be easily and reliably reproduced using the {{toxiproxy-final}} 
 branch of https://github.com/Shopify/sarama which includes a vagrant script 
 for provisioning the appropriate cluster: 
 - {{git clone https://github.com/Shopify/sarama.git}}
 - {{git checkout test-jira-kafka-2082}}
 - {{vagrant up}}
 - {{TEST_SEED=1427917826425719059 DEBUG=true go test -v}}
 After the test finishes (it fails because the cluster ends up in a bad 
 state), you can log into the cluster machine with {{vagrant ssh}} and inspect 
 the bad nodes. The vagrant script provisions five zookeepers and five brokers 
 in {{/opt/kafka-9091/}} through {{/opt/kafka-9095/}}.
 Additional context: the test produces continually to the cluster while 
 randomly cutting and 

Re: Review Request 31850: Patch for KAFKA-1660

2015-04-06 Thread Guozhang Wang


 On April 7, 2015, 1:28 a.m., Joel Koshy wrote:
  clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java,
   line 362
  https://reviews.apache.org/r/31850/diff/5/?file=908540#file908540line362
 
  As you explained offline, the sender does not have access to record 
  batches while requests are in flight, but it would be super if we can 
  figure out a way to avoid leaking details of batch completion (which is 
  currently exclusively in sender) into the RecordAccumulator.

Actually, since the incomplete-batches list was introduced when we added the 
flush() call, we were already sort of leaking it into the accumulator before 
this patch. And I feel it is not that bad to keep this list in the accumulator.
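
For reference, the incomplete-batches list being discussed is essentially a 
small thread-safe set owned by the accumulator; a minimal sketch (names and 
shape are illustrative, not the actual class):

```
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch: batches are added when created, removed when the sender completes or aborts
// them, and can be snapshotted so flush()/close() can wait on (or fail) everything
// still in flight without the accumulator knowing how completion happens.
public class IncompleteBatchesSketch<T> {
    private final Set<T> incomplete = new HashSet<>();

    public synchronized void add(T batch) {
        incomplete.add(batch);
    }

    public synchronized void remove(T batch) {
        if (!incomplete.remove(batch))
            throw new IllegalStateException("Remove from the incomplete set failed: batch was not in the set");
    }

    public synchronized List<T> copyAll() {
        return new ArrayList<>(incomplete); // snapshot so callers can iterate without the lock
    }
}
```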


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review79063
---


On March 27, 2015, 11:35 p.m., Jiangjie Qin wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/31850/
 ---
 
 (Updated March 27, 2015, 11:35 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1660
 https://issues.apache.org/jira/browse/KAFKA-1660
 
 
 Repository: kafka
 
 
 Description
 ---
 
 A minor fix.
 
 
 Incorporated Guozhang's comments.
 
 
 Modify according to the latest conclusion.
 
 
 Patch for the finally passed KIP-15
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
 ab263423ff1d33170effb71acdef3fc501fa072a 
   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
 6913090af03a455452b0b5c3df78f266126b3854 
   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 
 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
  88b4e4fbf3bf6fb6d2f90551a792b95d4cd51c40 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
   
 clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java 
 fee322fa0dd9704374db4a6964246a7d2918d3e4 
   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java 
 c2fdc23239bd2196cd912c3d121b591f21393eab 
   
 clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
  e379ac89c9a2fbfe750d6b0dec693b7eabb76204 
   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
 3df450784592b894008e7507b2737f9bb07f7bd2 
 
 Diff: https://reviews.apache.org/r/31850/diff/
 
 
 Testing
 ---
 
 Unit tests passed.
 
 
 Thanks,
 
 Jiangjie Qin
 




[jira] [Updated] (KAFKA-2094) Kafka does not create topic automatically after deleting the topic.

2015-04-06 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated KAFKA-2094:

Description: 
After I create a topic and then remove it (and wait for enough time to 
eventually delete it), it does not create a topic emitting errors even though  
auto-create topic option is true. It works okay when I manually create a topic 
after deleting it.

Here is the command I run.


./bin/kafka-topics.sh --list  --zookeeper 192.168.0.190:2181
test



./bin/kafka-topics.sh --delete --zookeeper 192.168.0.190:2181 --topic test



bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[2015-04-06 20:51:44,542] WARN Property topic is not valid 
(kafka.utils.VerifiableProperties)
test
[2015-04-06 20:51:49,896] WARN Error while fetching metadata [{TopicMetadata 
for topic test -
No partition metadata for topic test due to 
kafka.common.LeaderNotAvailableException}] for topic [test]: class 
kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2015-04-06 20:51:49,904] WARN Error while fetching metadata [{TopicMetadata 
for topic test-
No partition metadata for topic testdue to 
kafka.common.LeaderNotAvailableException}] for topic [test]: class 
kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2015-04-06 20:51:49,905] ERROR Failed to collate messages by topic, partition 
due to: Failed to fetch topic metadata for topic: 
test(kafka.producer.async.DefaultEventHandler)
^Csnowdrop@master:~/workspace/kafka/kafka_2.10-0.8.2.0$ 
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[2015-04-06 20:52:08,733] WARN Property topic is not valid 
(kafka.utils.VerifiableProperties)





Server side, I got this error

[2015-04-06 21:41:08,491] ERROR [Replica Manager on Broker 0]: Error when 
processing fetch request for partition [lavender_0_1,0] offset 1687 from 
consumer with correlation id 5. Possible cause: Request for offset 1687 but we 
only have log segments in the range 0 to 107. (kafka.server.ReplicaManager)


  was:
After I create a topic and then remove it (and wait for enough time to 
eventually delete it), it does not create a topic emitting errors even though  
auto-create topic option is true. It works okay when I manually create a topic 
after deleting it.

Here is the command I run.


./bin/kafka-topics.sh --list  --zookeeper 192.168.0.190:2181
test



./bin/kafka-topics.sh --delete --zookeeper 192.168.0.190:2181 --topic test



bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[2015-04-06 20:51:44,542] WARN Property topic is not valid 
(kafka.utils.VerifiableProperties)
test
[2015-04-06 20:51:49,896] WARN Error while fetching metadata [{TopicMetadata 
for topic test -
No partition metadata for topic test due to 
kafka.common.LeaderNotAvailableException}] for topic [test]: class 
kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2015-04-06 20:51:49,904] WARN Error while fetching metadata [{TopicMetadata 
for topic test-
No partition metadata for topic testdue to 
kafka.common.LeaderNotAvailableException}] for topic [test]: class 
kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2015-04-06 20:51:49,905] ERROR Failed to collate messages by topic, partition 
due to: Failed to fetch topic metadata for topic: 
test(kafka.producer.async.DefaultEventHandler)
^Csnowdrop@master:~/workspace/kafka/kafka_2.10-0.8.2.0$ 
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[2015-04-06 20:52:08,733] WARN Property topic is not valid 
(kafka.utils.VerifiableProperties)



 Kafka does not create topic automatically after deleting the topic.
 ---

 Key: KAFKA-2094
 URL: https://issues.apache.org/jira/browse/KAFKA-2094
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 0.8.2.0
 Environment: Ubuntu 14.04 LTS
Reporter: Hyukjin Kwon
Priority: Critical

 After I create a topic and then remove it (and wait for enough time to 
 eventually delete it), it does not create a topic emitting errors even though 
  auto-create topic option is true. It works okay when I manually create a 
 topic after deleting it.
 Here is the command I run.
 ./bin/kafka-topics.sh --list  --zookeeper 192.168.0.190:2181
 test
 ./bin/kafka-topics.sh --delete --zookeeper 192.168.0.190:2181 --topic test
 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
 [2015-04-06 20:51:44,542] WARN Property topic is not valid 
 (kafka.utils.VerifiableProperties)
 test
 [2015-04-06 20:51:49,896] WARN Error while fetching metadata [{TopicMetadata 
 for topic test -
 No partition metadata for topic test due to 
 kafka.common.LeaderNotAvailableException}] for topic [test]: class 
 

[jira] [Created] (KAFKA-2094) Kafka does not create topic automatically after deleting the topic.

2015-04-06 Thread Hyukjin Kwon (JIRA)
Hyukjin Kwon created KAFKA-2094:
---

 Summary: Kafka does not create topic automatically after deleting 
the topic.
 Key: KAFKA-2094
 URL: https://issues.apache.org/jira/browse/KAFKA-2094
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 0.8.2.0
 Environment: Ubuntu 14.04 LTS
Reporter: Hyukjin Kwon
Priority: Critical


After I create a topic and then remove it (and wait for enough time to 
eventually delete it), it does not create a topic emitting errors even though  
auto-create topic option is true. It works okay when I manually create a topic 
after deleting it.

Here is the command I run.

```
./bin/kafka-topics.sh --list  --zookeeper 192.168.0.190:2181
test
```

```
./bin/kafka-topics.sh --delete --zookeeper 192.168.0.190:2181 --topic test
```

```
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[2015-04-06 20:51:44,542] WARN Property topic is not valid 
(kafka.utils.VerifiableProperties)
test
[2015-04-06 20:51:49,896] WARN Error while fetching metadata [{TopicMetadata 
for topic test -
No partition metadata for topic test due to 
kafka.common.LeaderNotAvailableException}] for topic [test]: class 
kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2015-04-06 20:51:49,904] WARN Error while fetching metadata [{TopicMetadata 
for topic test-
No partition metadata for topic testdue to 
kafka.common.LeaderNotAvailableException}] for topic [test]: class 
kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2015-04-06 20:51:49,905] ERROR Failed to collate messages by topic, partition 
due to: Failed to fetch topic metadata for topic: 
test(kafka.producer.async.DefaultEventHandler)
^Csnowdrop@master:~/workspace/kafka/kafka_2.10-0.8.2.0$ 
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[2015-04-06 20:52:08,733] WARN Property topic is not valid 
(kafka.utils.VerifiableProperties)
```



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2094) Kafka does not create topic automatically after deleting the topic.

2015-04-06 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated KAFKA-2094:

Description: 
After I create a topic and then remove it (and wait for enough time to 
eventually delete it), it does not create a topic emitting errors even though  
auto-create topic option is true. It works okay when I manually create a topic 
after deleting it.

Here is the command I run.


./bin/kafka-topics.sh --list  --zookeeper 192.168.0.190:2181
test



./bin/kafka-topics.sh --delete --zookeeper 192.168.0.190:2181 --topic test



bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[2015-04-06 20:51:44,542] WARN Property topic is not valid 
(kafka.utils.VerifiableProperties)
test
[2015-04-06 20:51:49,896] WARN Error while fetching metadata [{TopicMetadata 
for topic test -
No partition metadata for topic test due to 
kafka.common.LeaderNotAvailableException}] for topic [test]: class 
kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2015-04-06 20:51:49,904] WARN Error while fetching metadata [{TopicMetadata 
for topic test-
No partition metadata for topic testdue to 
kafka.common.LeaderNotAvailableException}] for topic [test]: class 
kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2015-04-06 20:51:49,905] ERROR Failed to collate messages by topic, partition 
due to: Failed to fetch topic metadata for topic: 
test(kafka.producer.async.DefaultEventHandler)
^Csnowdrop@master:~/workspace/kafka/kafka_2.10-0.8.2.0$ 
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[2015-04-06 20:52:08,733] WARN Property topic is not valid 
(kafka.utils.VerifiableProperties)


  was:
After I create a topic and then remove it (and wait for enough time to 
eventually delete it), it does not create a topic emitting errors even though  
auto-create topic option is true. It works okay when I manually create a topic 
after deleting it.

Here is the command I run.

```
./bin/kafka-topics.sh --list  --zookeeper 192.168.0.190:2181
test
```

```
./bin/kafka-topics.sh --delete --zookeeper 192.168.0.190:2181 --topic test
```

```
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[2015-04-06 20:51:44,542] WARN Property topic is not valid 
(kafka.utils.VerifiableProperties)
test
[2015-04-06 20:51:49,896] WARN Error while fetching metadata [{TopicMetadata 
for topic test -
No partition metadata for topic test due to 
kafka.common.LeaderNotAvailableException}] for topic [test]: class 
kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2015-04-06 20:51:49,904] WARN Error while fetching metadata [{TopicMetadata 
for topic test-
No partition metadata for topic testdue to 
kafka.common.LeaderNotAvailableException}] for topic [test]: class 
kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2015-04-06 20:51:49,905] ERROR Failed to collate messages by topic, partition 
due to: Failed to fetch topic metadata for topic: 
test(kafka.producer.async.DefaultEventHandler)
^Csnowdrop@master:~/workspace/kafka/kafka_2.10-0.8.2.0$ 
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[2015-04-06 20:52:08,733] WARN Property topic is not valid 
(kafka.utils.VerifiableProperties)
```


 Kafka does not create topic automatically after deleting the topic.
 ---

 Key: KAFKA-2094
 URL: https://issues.apache.org/jira/browse/KAFKA-2094
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 0.8.2.0
 Environment: Ubuntu 14.04 LTS
Reporter: Hyukjin Kwon
Priority: Critical

 After I create a topic and then remove it (and wait for enough time to 
 eventually delete it), it does not create a topic emitting errors even though 
  auto-create topic option is true. It works okay when I manually create a 
 topic after deleting it.
 Here is the command I run.
 ./bin/kafka-topics.sh --list  --zookeeper 192.168.0.190:2181
 test
 ./bin/kafka-topics.sh --delete --zookeeper 192.168.0.190:2181 --topic test
 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
 [2015-04-06 20:51:44,542] WARN Property topic is not valid 
 (kafka.utils.VerifiableProperties)
 test
 [2015-04-06 20:51:49,896] WARN Error while fetching metadata [{TopicMetadata 
 for topic test -
 No partition metadata for topic test due to 
 kafka.common.LeaderNotAvailableException}] for topic [test]: class 
 kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
 [2015-04-06 20:51:49,904] WARN Error while fetching metadata [{TopicMetadata 
 for topic test-
 No partition metadata for topic testdue to 
 kafka.common.LeaderNotAvailableException}] for topic [test]: class 
 kafka.common.LeaderNotAvailableException  

[jira] [Updated] (KAFKA-2094) Kafka does not create topic automatically after deleting the topic.

2015-04-06 Thread Hyukjin Kwon (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon updated KAFKA-2094:

Description: 
After I create a topic and then remove it (and wait for enough time to 
eventually delete it), it does not create a topic emitting errors even though  
auto-create topic option is true. It works okay when I manually create a topic 
after deleting it.

Here is the command I run.


./bin/kafka-topics.sh --list  --zookeeper 192.168.0.190:2181
test



./bin/kafka-topics.sh --delete --zookeeper 192.168.0.190:2181 --topic test



bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[2015-04-06 20:51:44,542] WARN Property topic is not valid 
(kafka.utils.VerifiableProperties)
test
[2015-04-06 20:51:49,896] WARN Error while fetching metadata [{TopicMetadata 
for topic test -
No partition metadata for topic test due to 
kafka.common.LeaderNotAvailableException}] for topic [test]: class 
kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2015-04-06 20:51:49,904] WARN Error while fetching metadata [{TopicMetadata 
for topic test-
No partition metadata for topic testdue to 
kafka.common.LeaderNotAvailableException}] for topic [test]: class 
kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2015-04-06 20:51:49,905] ERROR Failed to collate messages by topic, partition 
due to: Failed to fetch topic metadata for topic: 
test(kafka.producer.async.DefaultEventHandler)
^Csnowdrop@master:~/workspace/kafka/kafka_2.10-0.8.2.0$ 
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[2015-04-06 20:52:08,733] WARN Property topic is not valid 
(kafka.utils.VerifiableProperties)





Server side, I got this error

[2015-04-06 21:41:08,491] ERROR [Replica Manager on Broker 0]: Error when 
processing fetch request for partition [test,0] offset 1687 from consumer with 
correlation id 5. Possible cause: Request for offset 1687 but we only have log 
segments in the range 0 to 107. (kafka.server.ReplicaManager)


  was:
After I create a topic and then remove it (and wait for enough time to 
eventually delete it), it does not create a topic emitting errors even though  
auto-create topic option is true. It works okay when I manually create a topic 
after deleting it.

Here is the command I run.


./bin/kafka-topics.sh --list  --zookeeper 192.168.0.190:2181
test



./bin/kafka-topics.sh --delete --zookeeper 192.168.0.190:2181 --topic test



bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[2015-04-06 20:51:44,542] WARN Property topic is not valid 
(kafka.utils.VerifiableProperties)
test
[2015-04-06 20:51:49,896] WARN Error while fetching metadata [{TopicMetadata 
for topic test -
No partition metadata for topic test due to 
kafka.common.LeaderNotAvailableException}] for topic [test]: class 
kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2015-04-06 20:51:49,904] WARN Error while fetching metadata [{TopicMetadata 
for topic test-
No partition metadata for topic testdue to 
kafka.common.LeaderNotAvailableException}] for topic [test]: class 
kafka.common.LeaderNotAvailableException  (kafka.producer.BrokerPartitionInfo)
[2015-04-06 20:51:49,905] ERROR Failed to collate messages by topic, partition 
due to: Failed to fetch topic metadata for topic: 
test(kafka.producer.async.DefaultEventHandler)
^Csnowdrop@master:~/workspace/kafka/kafka_2.10-0.8.2.0$ 
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[2015-04-06 20:52:08,733] WARN Property topic is not valid 
(kafka.utils.VerifiableProperties)





Server side, I got this error

[2015-04-06 21:41:08,491] ERROR [Replica Manager on Broker 0]: Error when 
processing fetch request for partition [lavender_0_1,0] offset 1687 from 
consumer with correlation id 5. Possible cause: Request for offset 1687 but we 
only have log segments in the range 0 to 107. (kafka.server.ReplicaManager)



 Kafka does not create topic automatically after deleting the topic.
 ---

 Key: KAFKA-2094
 URL: https://issues.apache.org/jira/browse/KAFKA-2094
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 0.8.2.0
 Environment: Ubuntu 14.04 LTS
Reporter: Hyukjin Kwon
Priority: Critical

 After I create a topic and then remove it (and wait for enough time to 
 eventually delete it), it does not create a topic emitting errors even though 
  auto-create topic option is true. It works okay when I manually create a 
 topic after deleting it.
 Here is the command I run.
 ./bin/kafka-topics.sh --list  --zookeeper 192.168.0.190:2181
 test
 ./bin/kafka-topics.sh --delete --zookeeper 192.168.0.190:2181 --topic test
 bin/kafka-console-producer.sh --broker-list localhost:9092 

Re: [jira] [Updated] (KAFKA-1926) Replace kafka.utils.Utils with o.a.k.common.utils.Utils

2015-04-06 Thread Tong Li

Jun Rao,
Thanks for reviewing it again. What commands do you run to show unused
imports? When I run gradle --daemon test, it did not show these (there
were some warnings, but not unused imports). I thought that would have
shown these things.

Tong Li
OpenStack & Kafka Community Development
Building 501/B205
liton...@us.ibm.com



From:   Jun Rao (JIRA) j...@apache.org
To: dev@kafka.apache.org
Date:   04/06/2015 12:48 AM
Subject:[jira] [Updated] (KAFKA-1926) Replace kafka.utils.Utils with
o.a.k.common.utils.Utils




 [
https://issues.apache.org/jira/browse/KAFKA-1926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-1926:
---
   Resolution: Fixed
Fix Version/s: 0.8.3
   Status: Resolved  (was: Patch Available)

Thanks for rebasing the latest patch. +1. Committed to trunk after removing
unused imports in a few files.

 Replace kafka.utils.Utils with o.a.k.common.utils.Utils
 ---

 Key: KAFKA-1926
 URL: https://issues.apache.org/jira/browse/KAFKA-1926
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.2.0
Reporter: Jay Kreps
Assignee: Tong Li
  Labels: newbie, patch
 Fix For: 0.8.3

 Attachments: KAFKA-1926.patch, KAFKA-1926.patch,
KAFKA-1926.patch, KAFKA-1926.patch, KAFKA-1926_2015-04-01_22:16:46.patch,
KAFKA-1926_2015-04-05_23:45:13.patch


 There is currently a lot of duplication between the Utils class in common
and the one in core.
 Our plan has been to deprecate duplicate code in the server and replace
it with the new common code.
 As such we should evaluate each method in the scala Utils and do one of
the following:
 1. Migrate it to o.a.k.common.utils.Utils if it is a sensible general
purpose utility in active use that is not Kafka-specific. If we migrate it
we should really think about the API and make sure there is some test
coverage. A few things in there are kind of funky and we shouldn't just
blindly copy them over.
 2. Create a new class ServerUtils or ScalaUtils in kafka.utils that will
hold any utilities that really need to make use of Scala features to be
convenient.
 3. Delete it if it is not used, or has a bad api.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)




[jira] [Commented] (KAFKA-1926) Replace kafka.utils.Utils with o.a.k.common.utils.Utils

2015-04-06 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14481262#comment-14481262
 ] 

Jun Rao commented on KAFKA-1926:


Typically, an IDE (e.g., IntelliJ) shows unused imports.

 Replace kafka.utils.Utils with o.a.k.common.utils.Utils
 ---

 Key: KAFKA-1926
 URL: https://issues.apache.org/jira/browse/KAFKA-1926
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.2.0
Reporter: Jay Kreps
Assignee: Tong Li
  Labels: newbie, patch
 Fix For: 0.8.3

 Attachments: KAFKA-1926.patch, KAFKA-1926.patch, KAFKA-1926.patch, 
 KAFKA-1926.patch, KAFKA-1926_2015-04-01_22:16:46.patch, 
 KAFKA-1926_2015-04-05_23:45:13.patch


 There is currently a lot of duplication between the Utils class in common and 
 the one in core.
 Our plan has been to deprecate duplicate code in the server and replace it 
 with the new common code.
 As such we should evaluate each method in the scala Utils and do one of the 
 following:
 1. Migrate it to o.a.k.common.utils.Utils if it is a sensible general purpose 
 utility in active use that is not Kafka-specific. If we migrate it we should 
 really think about the API and make sure there is some test coverage. A few 
 things in there are kind of funky and we shouldn't just blindly copy them 
 over.
 2. Create a new class ServerUtils or ScalaUtils in kafka.utils that will hold 
 any utilities that really need to make use of Scala features to be convenient.
 3. Delete it if it is not used, or has a bad api.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Is there a complete Kafka 0.8.* replication design document

2015-04-06 Thread Jun Rao
Yes, the wiki is a bit old. You can find out more about replication in the
following links.
http://kafka.apache.org/documentation.html#replication
http://www.slideshare.net/junrao/kafka-replication-apachecon2013

#1, #2, #8. See the ZK layout in
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper

#3. Adding partitions is now done by updating /brokers/topics/[topic]
directly.

#4. For deleting a topic, the ZK path
/admin/delete_topics/[topic_to_be_deleted]
is created and removed after the deletion completes.

#5  LeaderAndISRCommand should be the same as LeaderAndISRRequest.

#6 This is to take care of partitions that have been deleted while the
broker is down. The implementation doesn't rely on the special INIT flag.
Instead, it expects the very first LeaderAndISRRequest to include all valid
partitions. Local partitions not in that list will be deleted.

#7 Only the controller needs to read the replica assignment. The controller
can be started before the broker registers itself. This will be handled
through ZK watchers.

#9 The high level algorithm described there is still valid. For the
implementation, you can take a look at ReplicaManager.

Thanks,

Jun


On Mon, Apr 6, 2015 at 6:12 AM, Jason Guo (jguo2) jg...@cisco.com wrote:

 Hi,

  These days I have been focus on Kafka 0.8 replication design and
 found three replication design proposals from the wiki (according to the
 document, the V3 version is used in Kafka 0.8 release).
  But the v3 proposal is not complete and is inconsistent with the
 release.
 Is there a complete Kafka 0.8 replication design document?

 Here are part of my questions about Kafka 0.8 replication design.
 #1   According to V3, /brokers/topics/[topic]/[partition_id]/leaderAndISR
 stores leader and ISR of a partition. However in 0.8.2 release there is not
 such a znode, instead, it use
 /brokers/topics/[topic]/partitions/[partition_id]/state to store the leader
 and ISR of a partition.
 #2  In /brokers/topics/[topic], we can get all the ISR for all partitions
 in a certain topic, why we need
 /brokers/topics/[topic]/partitions/[partition_id]/state ?
 #3   I didn't find /admin/partitions_add/[topic]/[partition_id] and
 /admin/partitions_remove/[topic]/[partition_id] during my adding and
 removing partitions with bin/kafka-topics.sh. Is this deprecated in the 0.8
 release?
  #4  I found these two znode under /admin only will be
 automaticall removed after the action complete.
 /admin/reassign_partitions/, /admin/preferred_replica_election/. But why
 this znode (/admin/delete_topic/) will not be removed automatically?
  #5   What's the LeaderAndISRCommand in Senario A in V3? Is that
 same with LeaderAndISRRequest?
  #6   For Senario D, when a certain broker becomes Controller, it
 will send a LeaderAndISRRequest to brokers with a special flag INIT. For
 Senario C, when the broker receive LeaderAndISRRequest with INIT flag, it
 will delete all local partitions not in set_p. Why we need to delete all
 local partitions for Controller changing?
  #7   For Senario E. Broker startup.  The first step is read the
 replica assignment. Doesn't it need to add its id to /brokers/ids first?
  #8   Senario H. Add/remove partitions to an existing topic.  In
 my test, I didn't found such znode for PartitionRemove Path/PartitionAdd
 Path in Zookeeper. Is this approach for partition adding/deleting
 deprecated? In fact, I didn't observe any znode change during my
 adding/deleting partitions. So what's the process of Kafka partition
 adding/deleting?
  #9   Senario G. seems not consistent with the release one



 Regards,
 Jason








[jira] [Created] (KAFKA-2095) ZKConfig uses hard coded value zookeeper.session.timeout.ms

2015-04-06 Thread Sriharsha Chintalapani (JIRA)
Sriharsha Chintalapani created KAFKA-2095:
-

 Summary: ZKConfig uses hard coded value 
zookeeper.session.timeout.ms
 Key: KAFKA-2095
 URL: https://issues.apache.org/jira/browse/KAFKA-2095
 Project: Kafka
  Issue Type: Bug
Reporter: Sriharsha Chintalapani
Assignee: Sriharsha Chintalapani






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-2095) ZKConfig uses hard coded value zookeeper.session.timeout.ms

2015-04-06 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2095?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani resolved KAFKA-2095.
---
Resolution: Invalid

 ZKConfig uses hard coded value zookeeper.session.timeout.ms
 ---

 Key: KAFKA-2095
 URL: https://issues.apache.org/jira/browse/KAFKA-2095
 Project: Kafka
  Issue Type: Bug
Reporter: Sriharsha Chintalapani
Assignee: Sriharsha Chintalapani





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2096) Enable keepalive socket option for broker

2015-04-06 Thread Allen Wang (JIRA)
Allen Wang created KAFKA-2096:
-

 Summary: Enable keepalive socket option for broker
 Key: KAFKA-2096
 URL: https://issues.apache.org/jira/browse/KAFKA-2096
 Project: Kafka
  Issue Type: Improvement
  Components: network
Affects Versions: 0.8.2.1
Reporter: Allen Wang
Assignee: Jun Rao
Priority: Critical


We run a Kafka 0.8.2.1 cluster in AWS with a large number of producers ( 1). 
Also, the number of producer instances scales up and down significantly on a 
daily basis.

The issue we found is that after 10 days, the open file descriptor count will 
approach the limit of 32K. An investigation of these open file descriptors 
shows that a significant portion of these are from client instances that are 
terminated during scaling down. Somehow they still show as ESTABLISHED in 
netstat. We suspect that the AWS firewall between the client and broker causes 
this issue.

We attempted to use keepalive socket option to reduce this socket leak on 
broker and it appears to be working. Specifically, we added this line to 
kafka.network.Acceptor.accept():

  socketChannel.socket().setKeepAlive(true)

During our experiment with this change we confirmed that netstat entries for 
terminated client instances were probed as configured in the operating system. 
After the configured number of probes, the OS determined that the peer was no 
longer alive and removed the entry, possibly after Kafka hit an error reading 
from the channel and closed it. Also, our experiment shows that after a few 
days the instance was able to keep a stable low point of open file descriptor 
count, compared with other instances where the low point keeps increasing day 
to day.
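
For context, a self-contained illustration of where the described change sits 
(the actual fix is the single setKeepAlive(true) line in 
kafka.network.Acceptor.accept(); the accept loop, port and other socket options 
below are only stand-ins):

```
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Sketch of an accept loop enabling SO_KEEPALIVE on every accepted connection, so the OS
// probes peers that disappeared behind a firewall and the broker can reap leaked sockets.
public class KeepAliveAcceptorSketch {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.socket().bind(new InetSocketAddress(9092));
        while (true) {
            SocketChannel socketChannel = serverChannel.accept();
            socketChannel.configureBlocking(false);
            socketChannel.socket().setTcpNoDelay(true);
            socketChannel.socket().setKeepAlive(true); // the change proposed in this ticket
            // ... hand the channel off to a processor thread ...
        }
    }
}
```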






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2096) Enable keepalive socket option for broker to prevent socket leak

2015-04-06 Thread Allen Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2096?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Allen Wang updated KAFKA-2096:
--
Summary: Enable keepalive socket option for broker to prevent socket leak  
(was: Enable keepalive socket option for broker)

 Enable keepalive socket option for broker to prevent socket leak
 

 Key: KAFKA-2096
 URL: https://issues.apache.org/jira/browse/KAFKA-2096
 Project: Kafka
  Issue Type: Improvement
  Components: network
Affects Versions: 0.8.2.1
Reporter: Allen Wang
Assignee: Jun Rao
Priority: Critical

 We run a Kafka 0.8.2.1 cluster in AWS with large number of producers ( 
 1). Also the number of producer instances scale up and down significantly 
 on a daily basis.
 The issue we found is that after 10 days, the open file descriptor count will 
 approach the limit of 32K. An investigation of these open file descriptors 
 shows that a significant portion of these are from client instances that are 
 terminated during scaling down. Somehow they still show as ESTABLISHED in 
 netstat. We suspect that the AWS firewall between the client and broker 
 causes this issue.
 We attempted to use keepalive socket option to reduce this socket leak on 
 broker and it appears to be working. Specifically, we added this line to 
 kafka.network.Acceptor.accept():
   socketChannel.socket().setKeepAlive(true)
 It is confirmed during our experiment of this change that entries in netstat 
 where the client instance is terminated were probed as configured in 
 operating system. After configured number of probes, the OS determined that 
 the peer is no longer alive and the entry is removed, possibly after an error 
 in Kafka to read from the channel and closing the channel. Also, our 
 experiment shows that after a few days, the instance was able to keep a 
 stable low point of open file descriptor count, compared with other instances 
 where the low point keeps increasing day to day.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)