[jira] [Updated] (KAFKA-693) Consumer rebalance fails if no leader available for a partition and stops all fetchers

2013-01-16 Thread Maxime Brugidou (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maxime Brugidou updated KAFKA-693:
--

Attachment: KAFKA-693-v2.patch

 Consumer rebalance fails if no leader available for a partition and stops all 
 fetchers
 --

 Key: KAFKA-693
 URL: https://issues.apache.org/jira/browse/KAFKA-693
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Maxime Brugidou
Assignee: Maxime Brugidou
 Attachments: KAFKA-693.patch, KAFKA-693-v2.patch, mirror_debug.log, 
 mirror.log


 I am currently experiencing this with the MirrorMaker but I assume it happens 
 for any rebalance. The symptoms are:
 I have a replication factor of 1.
 1. If I start the MirrorMaker (bin/kafka-run-class.sh kafka.tools.MirrorMaker 
 --consumer.config mirror-consumer.properties --producer.config 
 mirror-producer.properties --blacklist 'xdummyx' --num.streams=1 
 --num.producers=1) with a broker down
 1.1 I set refresh.leader.backoff.ms to 600000 (10 min) so that the 
 ConsumerFetcherManager doesn't retry too often to get the unavailable 
 partitions
 1.2 The rebalance starts at the init step and fails: Exception in thread 
 "main" kafka.common.ConsumerRebalanceFailedException: 
 KafkaMirror_mirror-01-1357893495345-fac86b15 can't rebalance after 4 retries
 1.3 After the exception, everything stops (fetchers and queues)
 1.4 I attached the full logs (info & debug) for this case
 2. If I start the MirrorMaker with all the brokers up and then kill a broker
 2.1 The first rebalance is successful
 2.2 The consumer correctly handles the broker being down and stops the 
 associated ConsumerFetcherThread
 2.3 The refresh.leader.backoff.ms of 600000 works correctly
 2.4 If something triggers a rebalance (new topic, partition reassignment...), 
 then we go back to 1., the rebalance fails and stops everything.
 I think the desired behavior is to consume whatever is available, and try 
 again later at some intervals. I would be glad to help on that issue although 
 the Consumer code seems a little tough to get into.
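
 For reference, a minimal Scala-script sketch of the backoff setting mentioned 
 in step 1.1, expressed as programmatic properties rather than the actual 
 mirror-consumer.properties file; the group.id and zookeeper.connect values 
 here are placeholders, not taken from the report:

    import java.util.Properties

    // Sketch only: equivalent of the relevant mirror-consumer.properties entries.
    val mirrorConsumerProps = new Properties()
    mirrorConsumerProps.put("group.id", "KafkaMirror")             // placeholder
    mirrorConsumerProps.put("zookeeper.connect", "localhost:2181") // placeholder
    // 600000 ms = 10 minutes: wait this long before re-fetching leader metadata
    // for partitions that currently have no leader.
    mirrorConsumerProps.put("refresh.leader.backoff.ms", "600000")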

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-693) Consumer rebalance fails if no leader available for a partition and stops all fetchers

2013-01-16 Thread Maxime Brugidou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13554852#comment-13554852
 ] 

Maxime Brugidou commented on KAFKA-693:
---

10. Created PartitionTopicInfo.InvalidOffset

11. In ConsumerFetcherManager.doWork(), I believe that addFetcher() is called 
before the partition is removed from noLeaderPartitionSet; if an exception is 
caught, the partition will still be in noLeaderPartitionSet, so I didn't 
change anything (see the sketch after this list)

12. done

13. done
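
A rough Scala sketch of what item 10 above might amount to, together with the 
isOffsetInvalid() helper suggested later in the review; the object name and 
the exact sentinel value are assumptions, not the actual patch:

    // Hypothetical sketch, not the real PartitionTopicInfo:
    object OffsetSentinel {
      // Sentinel meaning "no valid fetch offset known yet for this partition".
      val InvalidOffset: Long = -1L   // assumed value
      def isOffsetInvalid(offset: Long): Boolean = offset < 0
    }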

 Consumer rebalance fails if no leader available for a partition and stops all 
 fetchers
 --

 Key: KAFKA-693
 URL: https://issues.apache.org/jira/browse/KAFKA-693
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Maxime Brugidou
Assignee: Maxime Brugidou
 Attachments: KAFKA-693.patch, KAFKA-693-v2.patch, mirror_debug.log, 
 mirror.log


 I am currently experiencing this with the MirrorMaker but I assume it happens 
 for any rebalance. The symptoms are:
 I have a replication factor of 1.
 1. If I start the MirrorMaker (bin/kafka-run-class.sh kafka.tools.MirrorMaker 
 --consumer.config mirror-consumer.properties --producer.config 
 mirror-producer.properties --blacklist 'xdummyx' --num.streams=1 
 --num.producers=1) with a broker down
 1.1 I set refresh.leader.backoff.ms to 600000 (10 min) so that the 
 ConsumerFetcherManager doesn't retry too often to get the unavailable 
 partitions
 1.2 The rebalance starts at the init step and fails: Exception in thread 
 "main" kafka.common.ConsumerRebalanceFailedException: 
 KafkaMirror_mirror-01-1357893495345-fac86b15 can't rebalance after 4 retries
 1.3 After the exception, everything stops (fetchers and queues)
 1.4 I attached the full logs (info & debug) for this case
 2. If I start the MirrorMaker with all the brokers up and then kill a broker
 2.1 The first rebalance is successful
 2.2 The consumer correctly handles the broker being down and stops the 
 associated ConsumerFetcherThread
 2.3 The refresh.leader.backoff.ms of 600000 works correctly
 2.4 If something triggers a rebalance (new topic, partition reassignment...), 
 then we go back to 1., the rebalance fails and stops everything.
 I think the desired behavior is to consume whatever is available, and try 
 again later at some intervals. I would be glad to help on that issue although 
 the Consumer code seems a little tough to get into.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (KAFKA-706) broker appears to be encoding ProduceResponse, but never sending it

2013-01-16 Thread ben fleis (JIRA)
ben fleis created KAFKA-706:
---

 Summary: broker appears to be encoding ProduceResponse, but never 
sending it
 Key: KAFKA-706
 URL: https://issues.apache.org/jira/browse/KAFKA-706
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
 Environment: reproduced on both Mac OS and RH linux, via private 
node.js client
Reporter: ben fleis


By all appearances, I seem to be able to convince a broker to periodically 
encode, but never transmit, a ProduceResponse.  Unfortunately my client is 
proprietary, but I will share it with Neha via LI channels.  But I will 
describe what's going on in the hopes that there's another trivial way to 
reproduce it.  (I did search through JIRA, and haven't found anything that 
looks like this.)

I am running a single instance zookeeper and single broker.  I have a client 
that generates configurable amounts of data, tracking what is produced (both 
sent and ACK'd), and what is consumed.  I was noticing that when using high 
transfer rates via high frequency single messages, my unack'd queue appeared to 
be getting continuously larger.  So, I outfitted my client to log more 
information about correlation ids at various stages, and modified the kafka 
ProducerRequest/ProducerResponse to log (de)serialization of the same.  I then 
used tcpdump to intercept all communications between my client and the broker.  
Finally, I configured my client to generate 1 message per ~10ms, each payload 
being approximately 33 bytes; requestAckTimeout was set to 2000ms, and 
requestAcksRequired was set to 1.  I used 10ms as I found that 5ms or less 
caused my unacked queue to build up due to system speed -- it simply couldn't 
keep up. 10ms keeps the load high, but just manageable. YMMV with that param. 
All of this is done on a single host, over loopback. I ran it on both my 
MacBook Air and a well-provisioned RH Linux box, and found the same problem.

At startup, my system logged expired requests: meaning requests that were 
sent, but for which no ACK, positive or negative, was seen from the broker 
within 1.25x the requestAckTimeout (i.e., 2500ms). I would let it settle until 
the unacked queue was stable at or around 0.
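
A minimal sketch of the client-side bookkeeping described above, with 
hypothetical names: it records when each correlation id was sent and flags ids 
that have not been acked within 1.25x the requestAckTimeout (2000 ms, so 2500 ms):

    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.JavaConverters._

    class UnackedTracker(requestAckTimeoutMs: Long = 2000L) {
      private val sentAt = new ConcurrentHashMap[Int, Long]()

      def onSend(correlationId: Int): Unit = sentAt.put(correlationId, System.currentTimeMillis())
      def onAck(correlationId: Int): Unit  = sentAt.remove(correlationId)

      // Correlation ids with no ACK (positive or negative) within 1.25x the timeout.
      def expired(): Seq[Int] = {
        val cutoff = System.currentTimeMillis() - (requestAckTimeoutMs * 1.25).toLong
        sentAt.asScala.collect { case (id, sentTime) if sentTime < cutoff => id }.toSeq
      }
    }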

What I found is this: ACKs are normally generated within milliseconds.  This 
was demonstrated by my logging added to the scala ProducerRe* classes, and they 
are normally seen quickly by my client.  But when the actual error occurs, 
namely that a request is ignored, the ProducerResponse class *does* encode the 
correct correlationId; however, a response containing that ID is never sent 
over the network, as evidenced by my tcpdump traces. In my experience this 
would take anywhere from 3-15 seconds to occur after the system was warm, 
meaning that on average roughly 1 request out of several hundred shows the condition.

While I can't attach my client code, I could attach logs; but since my 
intention is to share the code with LI people, I will wait to see if that's 
useful here.


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-649) Cleanup log4j logging

2013-01-16 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13555150#comment-13555150
 ] 

Jun Rao commented on KAFKA-649:
---

I can take a look at this.

 Cleanup log4j logging
 -

 Key: KAFKA-649
 URL: https://issues.apache.org/jira/browse/KAFKA-649
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8
Reporter: Jay Kreps
Assignee: Jun Rao
Priority: Blocker

 Review the logs and do the following:
 1. Fix confusing or duplicative messages
 2. Assess that messages are at the right level (TRACE/DEBUG/INFO/WARN/ERROR)
 It would also be nice to add a log4j logger for the request logging (i.e. the 
 access log) and another for the controller state change log, since these 
 really have their own use cases.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Assigned] (KAFKA-649) Cleanup log4j logging

2013-01-16 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao reassigned KAFKA-649:
-

Assignee: Jun Rao

 Cleanup log4j logging
 -

 Key: KAFKA-649
 URL: https://issues.apache.org/jira/browse/KAFKA-649
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8
Reporter: Jay Kreps
Assignee: Jun Rao
Priority: Blocker

 Review the logs and do the following:
 1. Fix confusing or duplicative messages
 2. Assess that messages are at the right level (TRACE/DEBUG/INFO/WARN/ERROR)
 It would also be nice to add a log4j logger for the request logging (i.e. the 
 access log) and another for the controller state change log, since these 
 really have their own use cases.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-631) Implement log compaction

2013-01-16 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13555154#comment-13555154
 ] 

Jun Rao commented on KAFKA-631:
---

Thanks for the patch. Do you know the revision of trunk on which this patch 
will apply? I can take a look before you rebase.

 Implement log compaction
 

 Key: KAFKA-631
 URL: https://issues.apache.org/jira/browse/KAFKA-631
 Project: Kafka
  Issue Type: New Feature
  Components: core
Affects Versions: 0.8.1
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-631-v1.patch


 Currently Kafka has only one way to bound the space of the log, namely by 
 deleting old segments. The policy that controls which segments are deleted 
 can be configured based either on the number of bytes to retain or the age of 
 the messages. This makes sense for event or log data which has no notion of 
 primary key. However lots of data has a primary key and consists of updates 
 by primary key. For this data it would be nice to be able to ensure that the 
 log contained at least the last version of every key.
 As an example, say that the Kafka topic contains a sequence of User Account 
 messages, each capturing the current state of a given user account. Rather 
 than simply discarding old segments, since the set of user accounts is 
 finite, it might make more sense to delete individual records that have been 
 made obsolete by a more recent update for the same key. This would ensure 
 that the topic contained at least the current state of each record.
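
 A toy Scala illustration of the retention policy being described here, not 
 the actual compaction implementation: within a set of records, keep only the 
 most recent value for each key.

    case class Record(key: String, value: String, offset: Long)

    // Keep only the latest record per key, preserving offset order of survivors.
    def compact(records: Seq[Record]): Seq[Record] =
      records
        .groupBy(_.key)
        .map { case (_, versions) => versions.maxBy(_.offset) }
        .toSeq
        .sortBy(_.offset)

 A topic of user-account updates compacted this way would retain at least the 
 current state of every account, which is the guarantee described above.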

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-702) Deadlock between request handler/processor threads

2013-01-16 Thread Sriram Subramanian (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13555219#comment-13555219
 ] 

Sriram Subramanian commented on KAFKA-702:
--

I would like to add my thoughts to this. 

1. Load shedding arbitrary clients will bound the memory but would essentially 
cause the system to fail most of the requests and not recover until the load 
goes down. We have quite a few inter-dependencies between requests (producer 
requests depend on replica requests, replica requests depend on produce 
requests, and consumer requests depend on produce requests), and dropping 
requests would essentially cause the requests depending on them to stay longer 
in the purgatory and fail. 

2. Having client quotas may not work because we do not have one faulty client. 
Each client can have at most one outstanding request.

A few improvements might reduce the failure scenarios:

1. Currently replica requests wait on a hard limit (min bytes). Instead, they 
could be made to return earlier to free the purgatory and accept more requests 
during high-load scenarios (see the config sketch after this list).
2. Direct consumers to read from other replicas in the ISR that have less 
load. This is going to be harder.
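
As a rough illustration of point 1 above: the broker-side settings below (0.8 
config names as I understand them; the values are examples only, not a 
recommendation) control how long a replica fetch can sit in the purgatory 
waiting for min bytes before it returns, so lowering the wait lets the 
purgatory drain sooner under load.

    import java.util.Properties

    // Sketch only; example values.
    val brokerProps = new Properties()
    brokerProps.put("replica.fetch.min.bytes", "1")      // don't insist on a large batch
    brokerProps.put("replica.fetch.wait.max.ms", "500")  // return earlier instead of waiting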



 Deadlock between request handler/processor threads
 --

 Key: KAFKA-702
 URL: https://issues.apache.org/jira/browse/KAFKA-702
 Project: Kafka
  Issue Type: Bug
  Components: network
Affects Versions: 0.8
Reporter: Joel Koshy
Assignee: Jay Kreps
Priority: Blocker
  Labels: bugs
 Fix For: 0.8

 Attachments: KAFKA-702-v1.patch


 We have seen this a couple of times in the past few days in a test cluster. 
 The request handler and processor threads deadlock on the request/response 
 queues bringing the server to a halt
 kafka-processor-10251-7 prio=10 tid=0x7f4a0c3c9800 nid=0x4c39 waiting 
 on condition [0x7f46f698e000]
java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  0x7f48c9dd2698 (a 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
 at 
 java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:252)
 at kafka.network.RequestChannel.sendRequest(RequestChannel.scala:107)
 at kafka.network.Processor.read(SocketServer.scala:321)
 at kafka.network.Processor.run(SocketServer.scala:231)
 at java.lang.Thread.run(Thread.java:619)
 kafka-request-handler-7 daemon prio=10 tid=0x7f4a0c57f000 nid=0x4c47 
 waiting on condition [0x7f46f5b8]
java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  0x7f48c9dd6348 (a 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
 at 
 java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:252)
 at kafka.network.RequestChannel.sendResponse(RequestChannel.scala:112)
 at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:198)
 at kafka.server.KafkaApis.handle(KafkaApis.scala:58)
 at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
 at java.lang.Thread.run(Thread.java:619)
 This is because there is a cycle in the wait-for graph of processor threads 
 and request handler threads. If the request handling slows down on a busy 
 server, the request queue fills up. All processor threads quickly block on 
 adding incoming requests to the request queue. Due to this, those threads do 
 not process responses, filling up their response queues. At this moment, the 
 request handler threads start blocking on adding responses to the respective 
 response queues. This can lead to a deadlock where every thread is holding a 
 lock on one queue and asking for a lock on the other queue. This brings the 
 server to a halt where it accepts connections but every request gets timed 
 out.
 One way to resolve this is by breaking the cycle in the wait-for graph of the 
 request handler and processor threads. Instead of having the processor 
 threads dispatching the responses, we can have one or more dedicated response 
 handler threads that dequeue responses from the queue and write them to the 
 socket. One downside of this approach is that now access to the selector will 
 have to be synchronized.
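
 A toy Scala model of the wait-for cycle described above (not Kafka code): a 
 "processor" enqueues a batch of requests before it drains responses, while a 
 "handler" dequeues requests and enqueues responses. With the queue capacities 
 below, both threads end up parked in ArrayBlockingQueue.put, mirroring the 
 stack traces in this description, and the program hangs.

    import java.util.concurrent.ArrayBlockingQueue

    object RequestResponseDeadlock extends App {
      val requestQueue  = new ArrayBlockingQueue[Int](4)
      val responseQueue = new ArrayBlockingQueue[Int](2)

      val processor = new Thread(new Runnable {
        def run(): Unit = {
          var next = 0
          while (true) {
            // Read a "batch" from the socket; blocks mid-batch once requestQueue is full.
            for (_ <- 0 until 4) { requestQueue.put(next); next += 1 }
            // Responses are only drained between batches, so a blocked put above
            // means responses are never drained again.
            while (!responseQueue.isEmpty) responseQueue.take()
          }
        }
      })

      val handler = new Thread(new Runnable {
        def run(): Unit = {
          while (true) {
            val req = requestQueue.take()
            responseQueue.put(req)  // blocks once the response queue fills up
          }
        }
      })

      processor.start()
      handler.start()
    }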

--
This message is 

[jira] [Commented] (KAFKA-693) Consumer rebalance fails if no leader available for a partition and stops all fetchers

2013-01-16 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13555251#comment-13555251
 ] 

Jun Rao commented on KAFKA-693:
---

Thanks for patch v2. Looks good. Some minor comments:

11. I think we still need to change ConsumerFetcherManager.doWork(): currently, 
if we hit an exception when calling addFetcher(), we won't remove any partition 
from noLeaderPartitionSet, including those that have been processed 
successfully. We can change it so that we remove each partition from 
noLeaderPartitionSet after calling addFetcher() on it successfully.

20. AbstractFetcherThread: Instead of doing initialOffset < 0, could we define 
an isOffsetInvalid() method?
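
A simplified sketch of the change suggested in comment 11 (the names are 
stand-ins, not the real ConsumerFetcherManager code): each partition leaves 
noLeaderPartitionSet only once addFetcher() has succeeded for it, so an 
exception for one partition does not throw away the progress made on the others.

    import scala.collection.mutable

    // Stand-in for kafka.common.TopicAndPartition, to keep the sketch self-contained.
    case class TopicAndPartition(topic: String, partition: Int)

    def assignFetchers(noLeaderPartitionSet: mutable.Set[TopicAndPartition],
                       leaderForPartition: Map[TopicAndPartition, Int],
                       addFetcher: (TopicAndPartition, Int) => Unit): Unit =
      for ((tp, leaderBrokerId) <- leaderForPartition) {
        addFetcher(tp, leaderBrokerId)  // may throw; tp then stays in the set for a later pass
        noLeaderPartitionSet -= tp      // reached only after a successful addFetcher
      }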


 Consumer rebalance fails if no leader available for a partition and stops all 
 fetchers
 --

 Key: KAFKA-693
 URL: https://issues.apache.org/jira/browse/KAFKA-693
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Maxime Brugidou
Assignee: Maxime Brugidou
 Attachments: KAFKA-693.patch, KAFKA-693-v2.patch, mirror_debug.log, 
 mirror.log


 I am currently experiencing this with the MirrorMaker but I assume it happens 
 for any rebalance. The symptoms are:
 I have a replication factor of 1.
 1. If I start the MirrorMaker (bin/kafka-run-class.sh kafka.tools.MirrorMaker 
 --consumer.config mirror-consumer.properties --producer.config 
 mirror-producer.properties --blacklist 'xdummyx' --num.streams=1 
 --num.producers=1) with a broker down
 1.1 I set refresh.leader.backoff.ms to 600000 (10 min) so that the 
 ConsumerFetcherManager doesn't retry too often to get the unavailable 
 partitions
 1.2 The rebalance starts at the init step and fails: Exception in thread 
 "main" kafka.common.ConsumerRebalanceFailedException: 
 KafkaMirror_mirror-01-1357893495345-fac86b15 can't rebalance after 4 retries
 1.3 After the exception, everything stops (fetchers and queues)
 1.4 I attached the full logs (info & debug) for this case
 2. If I start the MirrorMaker with all the brokers up and then kill a broker
 2.1 The first rebalance is successful
 2.2 The consumer correctly handles the broker being down and stops the 
 associated ConsumerFetcherThread
 2.3 The refresh.leader.backoff.ms of 600000 works correctly
 2.4 If something triggers a rebalance (new topic, partition reassignment...), 
 then we go back to 1., the rebalance fails and stops everything.
 I think the desired behavior is to consume whatever is available, and try 
 again later at some intervals. I would be glad to help on that issue although 
 the Consumer code seems a little tough to get into.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Resolved] (KAFKA-702) Deadlock between request handler/processor threads

2013-01-16 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede resolved KAFKA-702.
-

Resolution: Fixed

Checked this in to proceed with deployment

 Deadlock between request handler/processor threads
 --

 Key: KAFKA-702
 URL: https://issues.apache.org/jira/browse/KAFKA-702
 Project: Kafka
  Issue Type: Bug
  Components: network
Affects Versions: 0.8
Reporter: Joel Koshy
Assignee: Jay Kreps
Priority: Blocker
  Labels: bugs
 Fix For: 0.8

 Attachments: KAFKA-702-v1.patch


 We have seen this a couple of times in the past few days in a test cluster. 
 The request handler and processor threads deadlock on the request/response 
 queues bringing the server to a halt
 kafka-processor-10251-7 prio=10 tid=0x7f4a0c3c9800 nid=0x4c39 waiting 
 on condition [0x7f46f698e000]
java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  0x7f48c9dd2698 (a 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
 at 
 java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:252)
 at kafka.network.RequestChannel.sendRequest(RequestChannel.scala:107)
 at kafka.network.Processor.read(SocketServer.scala:321)
 at kafka.network.Processor.run(SocketServer.scala:231)
 at java.lang.Thread.run(Thread.java:619)
 kafka-request-handler-7 daemon prio=10 tid=0x7f4a0c57f000 nid=0x4c47 
 waiting on condition [0x7f46f5b8]
java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  0x7f48c9dd6348 (a 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
 at 
 java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:252)
 at kafka.network.RequestChannel.sendResponse(RequestChannel.scala:112)
 at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:198)
 at kafka.server.KafkaApis.handle(KafkaApis.scala:58)
 at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
 at java.lang.Thread.run(Thread.java:619)
 This is because there is a cycle in the wait-for graph of processor threads 
 and request handler threads. If the request handling slows down on a busy 
 server, the request queue fills up. All processor threads quickly block on 
 adding incoming requests to the request queue. Due to this, those threads do 
 not process responses, filling up their response queues. At this moment, the 
 request handler threads start blocking on adding responses to the respective 
 response queues. This can lead to a deadlock where every thread is holding a 
 lock on one queue and asking for a lock on the other queue. This brings the 
 server to a halt where it accepts connections but every request gets timed 
 out.
 One way to resolve this is by breaking the cycle in the wait-for graph of the 
 request handler and processor threads. Instead of having the processor 
 threads dispatching the responses, we can have one or more dedicated response 
 handler threads that dequeue responses from the queue and write them to the 
 socket. One downside of this approach is that now access to the selector will 
 have to be synchronized.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-702) Deadlock between request handler/processor threads

2013-01-16 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13555350#comment-13555350
 ] 

Neha Narkhede commented on KAFKA-702:
-

 2. Having client quotas may not work because we do not have one faulty 
 client. Each client can at most have only one request.

We understand that. Client quotas are probably better done in terms of 
expirations per second. Basically, if you set up your partitions with a large 
replication factor (let's say 6), set num.acks in your producer to -1, and at 
the same time set your timeout too low, all requests will time out and expire. 
This will allow your client to send many requests that all time out.

Load shedding needs more thought. It is not as straightforward, and when we 
scope it out we will obviously need to keep the consequences of load shedding 
in mind.
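
Purely to illustrate the "expirations per second" idea above (this is not a 
design proposal), a counter like the following could track per-client 
expirations against a one-second budget:

    import java.util.concurrent.ConcurrentHashMap
    import java.util.concurrent.atomic.AtomicLong

    class ExpirationQuota(maxExpirationsPerSec: Long) {
      private val counts = new ConcurrentHashMap[String, AtomicLong]()
      @volatile private var windowStart = System.currentTimeMillis()

      // Returns false once a client exceeds its expiration budget for the
      // current one-second window; what to do with that signal is left open.
      def recordExpiration(clientId: String): Boolean = {
        val now = System.currentTimeMillis()
        if (now - windowStart >= 1000) { counts.clear(); windowStart = now }
        var counter = counts.get(clientId)
        if (counter == null) {
          counts.putIfAbsent(clientId, new AtomicLong(0))
          counter = counts.get(clientId)
        }
        counter.incrementAndGet() <= maxExpirationsPerSec
      }
    }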

 Deadlock between request handler/processor threads
 --

 Key: KAFKA-702
 URL: https://issues.apache.org/jira/browse/KAFKA-702
 Project: Kafka
  Issue Type: Bug
  Components: network
Affects Versions: 0.8
Reporter: Joel Koshy
Assignee: Jay Kreps
Priority: Blocker
  Labels: bugs
 Fix For: 0.8

 Attachments: KAFKA-702-v1.patch


 We have seen this a couple of times in the past few days in a test cluster. 
 The request handler and processor threads deadlock on the request/response 
 queues bringing the server to a halt
 kafka-processor-10251-7 prio=10 tid=0x7f4a0c3c9800 nid=0x4c39 waiting 
 on condition [0x7f46f698e000]
java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  0x7f48c9dd2698 (a 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
 at 
 java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:252)
 at kafka.network.RequestChannel.sendRequest(RequestChannel.scala:107)
 at kafka.network.Processor.read(SocketServer.scala:321)
 at kafka.network.Processor.run(SocketServer.scala:231)
 at java.lang.Thread.run(Thread.java:619)
 kafka-request-handler-7 daemon prio=10 tid=0x7f4a0c57f000 nid=0x4c47 
 waiting on condition [0x7f46f5b8]
java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  0x7f48c9dd6348 (a 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
 at 
 java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:252)
 at kafka.network.RequestChannel.sendResponse(RequestChannel.scala:112)
 at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:198)
 at kafka.server.KafkaApis.handle(KafkaApis.scala:58)
 at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
 at java.lang.Thread.run(Thread.java:619)
 This is because there is a cycle in the wait-for graph of processor threads 
 and request handler threads. If the request handling slows down on a busy 
 server, the request queue fills up. All processor threads quickly block on 
 adding incoming requests to the request queue. Due to this, those threads do 
 not process responses, filling up their response queues. At this moment, the 
 request handler threads start blocking on adding responses to the respective 
 response queues. This can lead to a deadlock where every thread is holding a 
 lock on one queue and asking for a lock on the other queue. This brings the 
 server to a halt where it accepts connections but every request gets timed 
 out.
 One way to resolve this is by breaking the cycle in the wait-for graph of the 
 request handler and processor threads. Instead of having the processor 
 threads dispatching the responses, we can have one or more dedicated response 
 handler threads that dequeue responses from the queue and write them to the 
 socket. One downside of this approach is that now access to the selector will 
 have to be synchronized.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-705) Controlled shutdown doesn't seem to work on more than one broker in a cluster

2013-01-16 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13555352#comment-13555352
 ] 

Joel Koshy commented on KAFKA-705:
--

I set up a local cluster of three brokers and created a bunch of topics with 
replication factor = 2. I was able to do multiple iterations of rolling bounces 
without issue. Since this was local, I did not use your py script, as it kills 
PIDs returned by ps.

Would you by any chance be able to provide a scenario to reproduce this 
locally? That said, I believe John Fung also tried to reproduce this in a
distributed environment but was unable to do so; so I'll probably need to take 
a look at logs in your environment.


 Controlled shutdown doesn't seem to work on more than one broker in a cluster
 -

 Key: KAFKA-705
 URL: https://issues.apache.org/jira/browse/KAFKA-705
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8
Reporter: Neha Narkhede
Assignee: Joel Koshy
Priority: Critical
  Labels: bugs
 Attachments: shutdown_brokers_eat.py, shutdown-command


 I wrote a script (attached here) to basically round-robin through the brokers 
 in a cluster doing the following 2 operations on each of them:
 1. Send the controlled shutdown admin command. If it succeeds,
 2. Restart the broker.
 What I've observed is that only one broker is able to finish the above 
 successfully the first time around. For the rest of the iterations, no broker 
 is able to shut down using the admin command, and every single time it fails 
 with an error message stating the same number of leaders on every broker. 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-631) Implement log compaction

2013-01-16 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13555470#comment-13555470
 ] 

Jay Kreps commented on KAFKA-631:
-

362eba981de40a69ae509a291649531ead6f6aee

 Implement log compaction
 

 Key: KAFKA-631
 URL: https://issues.apache.org/jira/browse/KAFKA-631
 Project: Kafka
  Issue Type: New Feature
  Components: core
Affects Versions: 0.8.1
Reporter: Jay Kreps
Assignee: Jay Kreps
 Attachments: KAFKA-631-v1.patch


 Currently Kafka has only one way to bound the space of the log, namely by 
 deleting old segments. The policy that controls which segments are deleted 
 can be configured based either on the number of bytes to retain or the age of 
 the messages. This makes sense for event or log data which has no notion of 
 primary key. However lots of data has a primary key and consists of updates 
 by primary key. For this data it would be nice to be able to ensure that the 
 log contained at least the last version of every key.
 As an example, say that the Kafka topic contains a sequence of User Account 
 messages, each capturing the current state of a given user account. Rather 
 than simply discarding old segments, since the set of user accounts is 
 finite, it might make more sense to delete individual records that have been 
 made obsolete by a more recent update for the same key. This would ensure 
 that the topic contained at least the current state of each record.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Closed] (KAFKA-702) Deadlock between request handler/processor threads

2013-01-16 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede closed KAFKA-702.
---


 Deadlock between request handler/processor threads
 --

 Key: KAFKA-702
 URL: https://issues.apache.org/jira/browse/KAFKA-702
 Project: Kafka
  Issue Type: Bug
  Components: network
Affects Versions: 0.8
Reporter: Joel Koshy
Assignee: Jay Kreps
Priority: Blocker
  Labels: bugs
 Fix For: 0.8

 Attachments: KAFKA-702-v1.patch


 We have seen this a couple of times in the past few days in a test cluster. 
 The request handler and processor threads deadlock on the request/response 
 queues bringing the server to a halt
 kafka-processor-10251-7 prio=10 tid=0x7f4a0c3c9800 nid=0x4c39 waiting 
 on condition [0x7f46f698e000]
java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  0x7f48c9dd2698 (a 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
 at 
 java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:252)
 at kafka.network.RequestChannel.sendRequest(RequestChannel.scala:107)
 at kafka.network.Processor.read(SocketServer.scala:321)
 at kafka.network.Processor.run(SocketServer.scala:231)
 at java.lang.Thread.run(Thread.java:619)
 kafka-request-handler-7 daemon prio=10 tid=0x7f4a0c57f000 nid=0x4c47 
 waiting on condition [0x7f46f5b8]
java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for  0x7f48c9dd6348 (a 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
 at 
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
 at 
 java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:252)
 at kafka.network.RequestChannel.sendResponse(RequestChannel.scala:112)
 at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:198)
 at kafka.server.KafkaApis.handle(KafkaApis.scala:58)
 at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
 at java.lang.Thread.run(Thread.java:619)
 This is because there is a cycle in the wait-for graph of processor threads 
 and request handler threads. If the request handling slows down on a busy 
 server, the request queue fills up. All processor threads quickly block on 
 adding incoming requests to the request queue. Due to this, those threads do 
 not process responses, filling up their response queues. At this moment, the 
 request handler threads start blocking on adding responses to the respective 
 response queues. This can lead to a deadlock where every thread is holding a 
 lock on one queue and asking for a lock on the other queue. This brings the 
 server to a halt where it accepts connections but every request gets timed 
 out.
 One way to resolve this is by breaking the cycle in the wait-for graph of the 
 request handler and processor threads. Instead of having the processor 
 threads dispatching the responses, we can have one or more dedicated response 
 handler threads that dequeue responses from the queue and write them to the 
 socket. One downside of this approach is that now access to the selector will 
 have to be synchronized.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-691) Fault tolerance broken with replication factor 1

2013-01-16 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-691:
--

Attachment: kafka-691_extra.patch

The last patch introduced a bug. DefaultEventHandler.getPartition() is expected 
to return an index into the partitionList, instead of the actual partition id. 
Attached a patch that fixes the issue.
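
A sketch of the contract described above, with hypothetical names rather than 
the actual DefaultEventHandler code: the partitioner returns an index into the 
list of partitions it was handed, and the caller maps that index back to a 
real partition id.

    import scala.util.Random

    def partitionIndex(key: Any, numPartitionsInList: Int): Int =
      if (key == null) Random.nextInt(numPartitionsInList)
      else (key.hashCode % numPartitionsInList + numPartitionsInList) % numPartitionsInList

    // Caller side (sketch):
    //   val idx = partitionIndex(key, partitionList.size)
    //   val partitionId = partitionList(idx).partitionId   // the actual id comes from the list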

 Fault tolerance broken with replication factor 1
 

 Key: KAFKA-691
 URL: https://issues.apache.org/jira/browse/KAFKA-691
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Jay Kreps
Assignee: Maxime Brugidou
 Fix For: 0.8

 Attachments: kafka-691_extra.patch, KAFKA-691-v1.patch, 
 KAFKA-691-v2.patch


 In 0.7 if a partition was down we would just send the message elsewhere. This 
 meant that the partitioning was really more of a stickiness than a hard 
 guarantee. This made it impossible to depend on it for partitioned, stateful 
 processing.
 In 0.8 when running with replication this should not be a problem generally, 
 as the partitions are now highly available and fail over to other replicas. 
 However, the case of replication factor = 1 no longer really works for most 
 cases, as a dead broker will now give errors for requests to that broker.
 I am not sure of the best fix. Intuitively I think this is something that 
 should be handled by the Partitioner interface. However currently the 
 partitioner has no knowledge of which nodes are available. So you could use a 
 random partitioner, but that would keep going back to the down node.
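
 One possible shape of the idea above, as a hedged sketch rather than the fix 
 that was actually committed: when a message has no key, choose randomly among 
 only those partitions that currently have a leader.

    import scala.util.Random

    case class PartitionMeta(partitionId: Int, leaderBrokerId: Option[Int])

    def chooseAvailablePartition(partitions: Seq[PartitionMeta]): Option[Int] = {
      val available = partitions.filter(_.leaderBrokerId.isDefined)
      if (available.isEmpty) None   // no leader anywhere: caller must fail or retry later
      else Some(available(Random.nextInt(available.size)).partitionId)
    }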

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-691) Fault tolerance broken with replication factor 1

2013-01-16 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-691?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-691:
--

Attachment: kafka-691_extra.patch

Attached the right patch (kafka-691_extra.patch).

 Fault tolerance broken with replication factor 1
 

 Key: KAFKA-691
 URL: https://issues.apache.org/jira/browse/KAFKA-691
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8
Reporter: Jay Kreps
Assignee: Maxime Brugidou
 Fix For: 0.8

 Attachments: kafka-691_extra.patch, KAFKA-691-v1.patch, 
 KAFKA-691-v2.patch


 In 0.7 if a partition was down we would just send the message elsewhere. This 
 meant that the partitioning was really more of a stickiness than a hard 
 guarantee. This made it impossible to depend on it for partitioned, stateful 
 processing.
 In 0.8 when running with replication this should not be a problem generally, 
 as the partitions are now highly available and fail over to other replicas. 
 However, the case of replication factor = 1 no longer really works for most 
 cases, as a dead broker will now give errors for requests to that broker.
 I am not sure of the best fix. Intuitively I think this is something that 
 should be handled by the Partitioner interface. However currently the 
 partitioner has no knowledge of which nodes are available. So you could use a 
 random partitioner, but that would keep going back to the down node.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (KAFKA-708) ISR becomes empty while marking a partition offline

2013-01-16 Thread Swapnil Ghike (JIRA)
Swapnil Ghike created KAFKA-708:
---

 Summary: ISR becomes empty while marking a partition offline
 Key: KAFKA-708
 URL: https://issues.apache.org/jira/browse/KAFKA-708
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8, 0.8.1
Reporter: Swapnil Ghike
Priority: Blocker
 Fix For: 0.8


Attached state change log shows that ISR becomes empty when a partition is 
being marked as offline.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Assigned] (KAFKA-708) ISR becomes empty while marking a partition offline

2013-01-16 Thread Swapnil Ghike (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-708?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Swapnil Ghike reassigned KAFKA-708:
---

Assignee: Neha Narkhede

 ISR becomes empty while marking a partition offline
 ---

 Key: KAFKA-708
 URL: https://issues.apache.org/jira/browse/KAFKA-708
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8, 0.8.1
Reporter: Swapnil Ghike
Assignee: Neha Narkhede
Priority: Blocker
  Labels: bugs
 Fix For: 0.8

 Attachments: kafka-request.log.2013-01-16-15


 Attached state change log shows that ISR becomes empty when a partition is 
 being marked as offline.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (KAFKA-709) Default queue.enqueue.timeout.ms to -1

2013-01-16 Thread Chris Riccomini (JIRA)
Chris Riccomini created KAFKA-709:
-

 Summary: Default queue.enqueue.timeout.ms to -1
 Key: KAFKA-709
 URL: https://issues.apache.org/jira/browse/KAFKA-709
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8
Reporter: Chris Riccomini
Assignee: Jun Rao


Hey Guys,

It seems that, by default, producers in 0.8 are async, and have a default 
queue.enqueue.timeout.ms of 0. This means that anyone who reads messages faster 
than they're producing them will likely end up eventually hitting this 
exception:

Exception in thread "Thread-3" kafka.common.QueueFullException: Event queue is 
full of unsent messages, could not send event: 
KeyedMessage(PageViewEventByGroupJson,Missing Page 
Group,java.nio.HeapByteBuffer[pos=0 lim=125 cap=125])
at 
kafka.producer.Producer$$anonfun$asyncSend$1.apply(Producer.scala:111)
at kafka.producer.Producer$$anonfun$asyncSend$1.apply(Producer.scala:89)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:32)
at kafka.producer.Producer.asyncSend(Producer.scala:89)
at kafka.producer.Producer.send(Producer.scala:77)

As it says in https://cwiki.apache.org/KAFKA/kafka-mirroring.html, this can 
result in losing messages, and nasty exceptions in the logs. I think the better 
default is setting queue.enqueue.timeout.ms to -1, which will just block until 
the queue frees up.
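
A minimal Scala-script sketch of the setting being proposed (the broker 
address is a placeholder); with queue.enqueue.timeout.ms set to -1, an async 
producer blocks when its queue is full instead of throwing QueueFullException:

    import java.util.Properties

    val producerProps = new Properties()
    producerProps.put("metadata.broker.list", "localhost:9092")  // placeholder
    producerProps.put("producer.type", "async")
    producerProps.put("queue.enqueue.timeout.ms", "-1")          // block rather than drop/throw

The same setting can equally be put in a producer .properties file.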

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-709) Default queue.enqueue.timeout.ms to -1

2013-01-16 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=13555782#comment-13555782
 ] 

Jay Kreps commented on KAFKA-709:
-

Yeah, I agree. Everyone who tries to use it hits this and then basically just 
thinks Kafka is broken because the error message isn't very clear.

 Default queue.enqueue.timeout.ms to -1
 --

 Key: KAFKA-709
 URL: https://issues.apache.org/jira/browse/KAFKA-709
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8
Reporter: Chris Riccomini
Assignee: Jun Rao

 Hey Guys,
 It seems that, by default, producers in 0.8 are async, and have a default 
 queue.enqueue.timeout.ms of 0. This means that anyone who reads messages 
 faster than they're producing them will likely end up eventually hitting this 
 exception:
 Exception in thread "Thread-3" kafka.common.QueueFullException: Event queue 
 is full of unsent messages, could not send event: 
 KeyedMessage(PageViewEventByGroupJson,Missing Page 
 Group,java.nio.HeapByteBuffer[pos=0 lim=125 cap=125])
 at 
 kafka.producer.Producer$$anonfun$asyncSend$1.apply(Producer.scala:111)
 at 
 kafka.producer.Producer$$anonfun$asyncSend$1.apply(Producer.scala:89)
 at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
 at 
 scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:32)
 at kafka.producer.Producer.asyncSend(Producer.scala:89)
 at kafka.producer.Producer.send(Producer.scala:77)
 As it says in https://cwiki.apache.org/KAFKA/kafka-mirroring.html, this can 
 result in losing messages, and nasty exceptions in the logs. I think the 
 better default is setting queue.enqueue.timeout.ms to -1, which will just 
 block until the queue frees up.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


dynamic topic config

2013-01-16 Thread Jay Kreps
I added a first pass at a proposal for moving topic configuration into
zookeeper so that you would specify the config when you created the topic
and config could be altered without bouncing brokers. I think other people
know more about the zookeeper interaction so I probably need help working
out some of the details for the best way to do that:

https://cwiki.apache.org/confluence/display/KAFKA/Dynamic+Topic+Config

-Jay


Abou Kafka 0.8 producer throughput test

2013-01-16 Thread Jun Guo -X (jungu - CIIC at Cisco)
Hi,
  I have run the producer (Kafka 0.8) throughput test many times, but the 
average value is 3 MB/s. Below is my test environment:
   CPU cores: 16
   Vendor_id: GenuineIntel
   CPU family: 6
   CPU MHz: 2899.999
   Cache size: 20480 KB
   CPU level: 13
   Memory: 16330832 KB = 15.57 GB
   Disk: RAID5

   I don't know detailed information about the disk, such as rotation speed, 
but I did some tests of the disk's I/O performance. The write rate is 
500-600 MB/s and the read rate is 180 MB/s (details were in an attached screenshot).

I adjusted the broker configuration file as the official documentation 
suggests, and set the JVM heap to 5120 MB.
I ran the producer performance test with the script kafka-producer-perf-test.sh; 
the test command is
bin/kafka-producer-perf-test.sh --broker-list 10.75.167.46:49092 --topics 
topic_perf_46_1,topic_perf_46_2,topic_perf_46_3,topic_perf_46_4, 
topic_perf_46_5,topic_perf_46_6, 
topic_perf_46_7,topic_perf_46_8,topic_perf_46_9,topic_perf_46_10 
--initial-message-id 0 --threads 200 --messages 100 --message-size 200 
--compression-codec 1

But the test result is also not as good as the official documentation says (50 MB/s, 
and the value in your paper is 100 MB/s). The test result is as below:
2013-01-17 04:15:24:768, 2013-01-17 04:25:01:637, 0, 200, 200, 1907.35, 3.3064, 
1000, 17334.9582

On the other hand, I ran the consumer throughput test, and the result is about 
60 MB/s while the value in the official documentation is 100 MB/s.
I really don't know why.
You know, high throughput is one of the most important features of Kafka, so I 
am really concerned about it.

Thanks and best regards!

From: Jay Kreps [mailto:jkr...@linkedin.com]
Sent: January 16, 2013 2:22
To: Jun Guo -X (jungu - CIIC at Cisco)
Subject: RE: About acknowledge from broker to producer in your paper.

Not sure which version you are using...

In 0.7 this would happen only if there was a socket level error (i.e. can't 
connect to the host). This covers a lot of cases since in the event of I/O 
errors (disk full, etc) we just have that node shut itself down to let others 
take over.

In 0.8 we send all errors back to the client.

So the difference is that, for example, in the event of a disk error, in 0.7 
the client would send a message, the broker would get an error and shoot itself 
in the head and disconnect its clients, and the client would get the error the 
next time it tried to send a message. So in 0.7 the error might not get passed 
back to the client until the second message send. In 0.8 this would happen with 
the first send, which is an improvement.

-Jay
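
A hedged sketch of the 0.8 behavior described above, using the 0.8 Scala 
producer API (the broker address and topic are placeholders): with 
request.required.acks set, the broker's ack or error comes back on the first 
send, and once the producer exhausts its retries it surfaces the 
FailedToSendMessageException mentioned further down in this thread.

    import java.util.Properties
    import kafka.common.FailedToSendMessageException
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

    object AckedSendSketch extends App {
      val props = new Properties()
      props.put("metadata.broker.list", "localhost:9092")             // placeholder
      props.put("serializer.class", "kafka.serializer.StringEncoder")
      props.put("request.required.acks", "1")  // 0 = no ack, 1 = leader ack, -1 = all ISR

      val producer = new Producer[String, String](new ProducerConfig(props))
      try {
        producer.send(new KeyedMessage[String, String]("test-topic", "hello"))
      } catch {
        case e: FailedToSendMessageException =>
          // Thrown after the retry budget (message.send.max.retries, default 3)
          // is exhausted, e.g. when all brokers are down.
          println("send failed: " + e.getMessage)
      } finally {
        producer.close()
      }
    }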

From: Jun Guo -X (jungu - CIIC at Cisco) [ju...@cisco.com]
Sent: Monday, January 14, 2013 9:45 PM
To: Jay Kreps
Subject: About acknowledge from broker to producer in your paper.
Hi,
   I have read your paper "Kafka: a Distributed Messaging System for Log 
Processing".
   In the experimental results part, there are some words as below:

   There are a few reasons why Kafka performed much better. First, the 
Kafka producer currently doesn’t wait for acknowledgements from the broker and 
sends messages as fast as the broker can handle. This significantly increased 
the throughput of the publisher. With a batch size of 50, a single Kafka 
producer almost saturated the 1Gb link between the producer and the broker. 
This is a valid optimization for the log aggregation case, as data must be sent 
asynchronously to avoid introducing any latency into the live serving of 
traffic. We note that without acknowledging the producer, there is no guarantee 
that every published message is actually received by the broker. For many types 
of log data, it is desirable to trade durability for throughput, as long as the 
number of dropped messages is relatively small. However, we do plan to
address the durability issue for more critical data in the future.

   But I have done a series of tests. I found that if I shut down all the 
brokers and then send a message from the producer to the broker, the producer 
reports kafka.common.FailedToSendMessageException. It says, "Failed to send 
messages after 3 tries."
   If there is no acknowledgement from the broker, how does the producer find 
out that the send failed? And how does it retry 3 times?

   Maybe the acknowledgement in your paper refers to something else; if so, 
please tell me what acknowledgement means here.

   Many thanks and best regards!

Guo Jun


[jira] [Assigned] (KAFKA-709) Default queue.enqueue.timeout.ms to -1

2013-01-16 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-709?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede reassigned KAFKA-709:
---

Assignee: Neha Narkhede  (was: Jun Rao)

 Default queue.enqueue.timeout.ms to -1
 --

 Key: KAFKA-709
 URL: https://issues.apache.org/jira/browse/KAFKA-709
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8
Reporter: Chris Riccomini
Assignee: Neha Narkhede
 Attachments: kafka-709.patch


 Hey Guys,
 It seems that, by default, producers in 0.8 are async, and have a default 
 queue.enqueue.timeout.ms of 0. This means that anyone who reads messages 
 faster than they're producing them will likely end up eventually hitting this 
 exception:
 Exception in thread "Thread-3" kafka.common.QueueFullException: Event queue 
 is full of unsent messages, could not send event: 
 KeyedMessage(PageViewEventByGroupJson,Missing Page 
 Group,java.nio.HeapByteBuffer[pos=0 lim=125 cap=125])
 at 
 kafka.producer.Producer$$anonfun$asyncSend$1.apply(Producer.scala:111)
 at 
 kafka.producer.Producer$$anonfun$asyncSend$1.apply(Producer.scala:89)
 at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
 at 
 scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:32)
 at kafka.producer.Producer.asyncSend(Producer.scala:89)
 at kafka.producer.Producer.send(Producer.scala:77)
 As it says in https://cwiki.apache.org/KAFKA/kafka-mirroring.html, this can 
 result in losing messages, and nasty exceptions in the logs. I think the 
 better default is setting queue.enqueue.timeout.ms to -1, which will just 
 block until the queue frees up.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-707) Improve error message in the producer when sending data to a partition without an active leader

2013-01-16 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-707?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-707:


Attachment: kafka-707.patch

Minor patch to fix the error message in the producer

 Improve error message in the producer when sending data to a partition 
 without an active leader
 ---

 Key: KAFKA-707
 URL: https://issues.apache.org/jira/browse/KAFKA-707
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Affects Versions: 0.8
Reporter: Neha Narkhede
Assignee: Neha Narkhede
 Attachments: kafka-707.patch

   Original Estimate: 24h
  Remaining Estimate: 24h

 We log a very cryptic message when the producer tries to send data to a 
 partition that doesn't have a leader at that moment -
 Failed to send to broker -1 with data Map([PageViewEventByGroupJson,8] 
 Let's improve this to log a better error message

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira