Re: Gradle hanging on unit test run against ProducerFailureHangingTest-testBrokerFailure

2014-07-25 Thread David Corley
Hey Guozhang,
Yes, I spotted that commit and had updated my working copy to that, but the
test is still hanging. If it's any help, the test looks like it's doing
_something_  as the CPU usage ramps up significantly and stays there until
I kill the process.
/Dave


On Thu, Jul 24, 2014 at 4:10 PM, Guozhang Wang wangg...@gmail.com wrote:

 Hi Dave,

 KAFKA-1533 has just been committed targeting this issue. Did your update
 on trunk include this commit?

 commit ff05e9b3616a222e29a42f6e8fdf41945a417f41
 Author: Guozhang Wang guw...@linkedin.com
 Date:   Tue Jul 22 14:14:19 2014 -0700

 Guozhang

 kafka-1533; transient unit test failure in ProducerFailureHandlingTest;
 reviewed by Guozhang Wang; reviewed by Jun Rao




 On Thu, Jul 24, 2014 at 5:52 AM, David Corley davidcor...@gmail.com
 wrote:

  Hey all,
  I'm trying my hand at writing some patches for open issues, but I'm
 running
  into issues with running gradlew test.
  It hangs every time when trying to run testBrokerFailure in the
  ProducerFailureHangingTest suite.
 
  It was working for a time, but I updated to trunk HEAD and it's no longer
  working.
  I'm running on OSX with JDK 1.6.0_65.
  I tried increasing the HeapSize for the test target, but hit the same
  issue.
  Running against 3f1a9c4cee778d089d3ec3167555c2b89cdc48bb
 
  Would appreciate any help.
  Regards,
  Dave
 



 --
 -- Guozhang



[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-25 Thread Dmitry Bugaychenko (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14074119#comment-14074119
 ] 

Dmitry Bugaychenko commented on KAFKA-1414:
---

I'm not a Scala expert, but the threading scheme (at least as I understood it) 
looks fine.

 Speedup broker startup after hard reset
 ---

 Key: KAFKA-1414
 URL: https://issues.apache.org/jira/browse/KAFKA-1414
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.1.1, 0.8.2, 0.9.0
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps
 Fix For: 0.8.2

 Attachments: 
 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, 
 KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, 
 KAFKA-1414-rev3-interleaving.patch, KAFKA-1414-rev3.patch, 
 KAFKA-1414-rev4.patch, freebie.patch, parallel-dir-loading-0.8.patch, 
 parallel-dir-loading-trunk-fixed-threadpool.patch, 
 parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch


 After a hard reset due to power failure, the broker takes way too much time 
 recovering unflushed segments in a single thread. This could be easily 
 improved by launching multiple threads (one per data directory, assuming that 
 typically each data directory is on a dedicated drive). Locally we tried this 
 simple patch to LogManager.loadLogs and it seems to work; however, I'm too new 
 to Scala, so do not take it literally:
 {code}
   /**
    * Recover and load all logs in the given data directories
    */
   private def loadLogs(dirs: Seq[File]) {
     val threads: Array[Thread] = new Array[Thread](dirs.size)
     var i: Int = 0
     val me = this
     for (dir <- dirs) {
       val thread = new Thread(new Runnable {
         def run() {
           val recoveryPoints = me.recoveryPointCheckpoints(dir).read
           /* load the logs */
           val subDirs = dir.listFiles()
           if (subDirs != null) {
             val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
             if (cleanShutDownFile.exists())
               info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
             for (dir <- subDirs) {
               if (dir.isDirectory) {
                 info("Loading log '" + dir.getName + "'")
                 val topicPartition = Log.parseTopicPartitionName(dir.getName)
                 val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
                 val log = new Log(dir,
                                   config,
                                   recoveryPoints.getOrElse(topicPartition, 0L),
                                   scheduler,
                                   time)
                 val previous = addLogWithLock(topicPartition, log)
                 if (previous != null)
                   throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
               }
             }
             cleanShutDownFile.delete()
           }
         }
       })
       thread.start()
       threads(i) = thread
       i = i + 1
     }
     for (thread <- threads) {
       thread.join()
     }
   }

   def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
     logCreationOrDeletionLock synchronized {
       this.logs.put(topicPartition, log)
     }
   }
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-07-25 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14074122#comment-14074122
 ] 

Joe Stein commented on KAFKA-1555:
--

The simplification and guarantee provided with ack=-1 works properly if the 
client implements it in a way that works best for them.

Kafka guarantees durability on R-1 failures (with replication factor R) without 
message loss if you use ack=-1 ... so...  that is the foundation we build upon.

When we (the client application, f.k.a. the producer) do not want message loss, 
then set up at least 3 data centers (since you need a majority of data centers 
for a Zookeeper ensemble to work, which requires at least three). Then you write 
to two topics, each with ack=-1. The broker leaders for the partitions of each 
of these topics that you are writing to MUST be in different data 
centers. After sync-writing to them, join their results (I say join 
since your specific implementation may use another function name like subscribe 
or onComplete or something) and then you get... a durable write from the client 
perspective without losing data after it has had a successful transaction.

If that doesn't work for you then you must accept data loss on a failure of 
at least one data center (which has nothing to do with Kafka).

This can be (should be) further extended to make sure that at least two racks 
in each data center get the data. 

If you are not concerned with being able to sustain a loss of 1 data center + a 
loss of 1 rack in another available data center (at the same time) then this is 
not a solution for you.

Now, all of what I just said is manual (to make sure replicas and partitions 
are in different data centers and racks) and static but it works with Kafka 
tools out of the box and some (lots) of software engineering power (scripting 
effort with a couple of late nights burning the midnight oil).

As far as producing this to multiple topics, there are lots of ways to do this 
on the client side running in parallel without much (if any) latency cost (with 
a little bit more software engineering). You can use Akka or anything else 
where you can get a future after sending off multiple events and then 
subscribing to them onComplete before deciding (returning to your caller or 
fulfilling the promise) that the message has been written.
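As a rough sketch of that join pattern (using plain scala.concurrent Futures; the topic names and the sendSync callback are made-up placeholders, not Kafka's producer API):
{code}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// sendSync stands in for whatever client call blocks until the broker acks a
// write with ack=-1 and throws on failure; it is a placeholder, not a real API.
def durableWrite(sendSync: (String, Array[Byte]) => Unit)(payload: Array[Byte]): Unit = {
  // Fire the two writes in parallel; the partition leaders of these two topics
  // are assumed to live in different data centers.
  val dc1 = Future(sendSync("events-dc1", payload))
  val dc2 = Future(sendSync("events-dc2", payload))
  // Join: only report success to the caller once BOTH writes have been acked.
  val both = for (_ <- dc1; _ <- dc2) yield ()
  Await.result(both, 30.seconds)
}
{code}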

Hopefully this makes sense, and I appreciate that not all (or even most) use cases 
need this multi data center + multi rack type of sustainability, but it works 
with Kafka if you go by what Kafka guarantees without trying to change it 
unnecessarily.

If there are defects we should fix them but going up and down this thread I am 
getting a bit lost in what we should be doing (if anything) to the current code 
now.



 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Neha Narkhede

 In a mission-critical application, we expect a Kafka cluster with 3 brokers 
 to satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current Kafka version (0.8.1.1) cannot achieve these requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Comment Edited] (KAFKA-1555) provide strong consistency with reasonable availability

2014-07-25 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14074122#comment-14074122
 ] 

Joe Stein edited comment on KAFKA-1555 at 7/25/14 6:11 AM:
---

The simplification and guarantee provided with ack=-1 works properly if the 
client implements it in a way that works best for them.

Kafka guarantees durability on R-1 failures (with replication factor R) without 
message loss if you use ack=-1 ... so...  that is the foundation we build upon.

When we (the client application, f.k.a. the producer) do not want message loss, 
then set up at least 3 data centers (since you need a majority of data centers 
for a Zookeeper ensemble to work, which requires at least three). Then you write 
to two topics, each with ack=-1. The broker leaders for the partitions of each 
of these topics that you are writing to MUST be in different data 
centers. After sync-writing to them, join their results (I say join 
since your specific implementation may use another function name like subscribe 
or onComplete or something) and then you get... a durable write from the client 
perspective without losing data after it has had a successful transaction.

If that doesn't work for you then you must accept data loss on a failure of 
at least one data center (which has nothing to do with Kafka).

This can be (should be) further extended to make sure that at least two racks 
in each data center get the data. 

If you are not concerned with being able to sustain a loss of 1 data center + a 
loss of 1 rack in another available data center (at the same time) then this is 
not a solution for you.

Now, all of what I just said is manual (to make sure replicas and partitions 
are in different data centers and racks) and static but it works with Kafka 
tools out of the box and some (lots) of software engineering power (scripting 
effort with a couple of late nights burning the midnight oil).

As far as producing this to multiple topics, there are lots of ways to do this 
on the client side running in parallel without much (if any) latency cost (with 
a little bit more software engineering). You can use Akka or anything else 
where you can get a future after sending off multiple events and then 
subscribing to them onComplete before deciding (returning to your caller or 
fulfilling the promise) that the message has been written.

Hopefully this makes sense, and I appreciate that not all (or even most) use cases 
need this multi data center + multi rack type of sustainability, but it works 
with Kafka if you go by what Kafka guarantees without trying to change it 
unnecessarily.

If there are defects we should fix them but going up and down this thread I am 
getting a bit lost in what we should be doing (if anything) to the current code 
now.




was (Author: joestein):
The simplification and guarantee provided with ack=-1 works properly if the 
client implements it in a way that works best for them.

Kafka guarantees durability on R-1 failures (with replication factor R) without 
message loss if you use ack=-1 ... so...  that is the foundation we build upon.

When we (the client application f.k.a. the producer) do not want message loss 
then setup at least 3 data centers (since you need a majority of data centers 
for a Zookeeper ensemble to work which requires at least three. Then you write 
to two topics each with ack=-1. The broker leaders for the partitions of each 
of these topics that you are writing to MUST be in different data 
centers.  After sync writing to them them join their result (I say join 
since your specific implementation may use another function name like subscribe 
or onComplete or something) then you get... a durable write from the client 
perspective without loosing data after it has had a successful transaction.

If that doesn't work for you then you must accept data loss as a failure of 
at least one data center (which has nothing todo with Kafka).  

This can be (should be) further extended to make sure that at least two racks 
in each data center get the data. 

If you are not concerned with being able to sustain a loss of 1 data center + a 
loss of 1 rack in another available data center (at the same time) then this is 
not a solution for you.

Now, all of what I just said is manual (to make sure replicas and partitions 
are in different data centers and racks) and static but it works with Kafka 
tools out of the box and some (lots) of software engineering power (scripting 
effort with a couple of late nights burning the midnight oil).

As far as producing this to multiple topics there are lots of ways to-do this 
on the client side running in parallel without much (if any) latency cost (with 
a little bit more software engineering).  You can use Akka or anything else 
where you can get a future after sending off multiple events and then 
subscribing to them onComplete before deciding 

[jira] [Updated] (KAFKA-1542) normal IOException in the new producer is logged as ERROR

2014-07-25 Thread David Corley (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Corley updated KAFKA-1542:


Attachment: KAFKA-1542.patch

Attaching patch

 normal IOException in the new producer is logged as ERROR
 -

 Key: KAFKA-1542
 URL: https://issues.apache.org/jira/browse/KAFKA-1542
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2
Reporter: Jun Rao
  Labels: newbie
 Attachments: KAFKA-1542.patch


 Saw the following error in the log. It seems this can happen if the broker is 
 down. So, this probably should be logged as WARN instead of ERROR.
 2014/07/16 00:12:51.799 [Selector] Error in I/O: 
 java.io.IOException: Connection timed out
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
 at sun.nio.ch.IOUtil.read(IOUtil.java:197)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
 at 
 org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:60)
 at org.apache.kafka.common.network.Selector.poll(Selector.java:241)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:171)
 at 
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:174)
 at 
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:114)
 at java.lang.Thread.run(Thread.java:744)



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1542) normal IOException in the new producer is logged as ERROR

2014-07-25 Thread David Corley (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Corley updated KAFKA-1542:


Status: Patch Available  (was: Open)

 normal IOException in the new producer is logged as ERROR
 -

 Key: KAFKA-1542
 URL: https://issues.apache.org/jira/browse/KAFKA-1542
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2
Reporter: Jun Rao
  Labels: newbie
 Attachments: KAFKA-1542.patch


 Saw the following error in the log. It seems this can happen if the broker is 
 down. So, this probably should be logged as WARN instead of ERROR.
 2014/07/16 00:12:51.799 [Selector] Error in I/O: 
 java.io.IOException: Connection timed out
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
 at sun.nio.ch.IOUtil.read(IOUtil.java:197)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
 at 
 org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:60)
 at org.apache.kafka.common.network.Selector.poll(Selector.java:241)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:171)
 at 
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:174)
 at 
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:114)
 at java.lang.Thread.run(Thread.java:744)



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1502) source jar is empty

2014-07-25 Thread David Corley (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14074158#comment-14074158
 ] 

David Corley commented on KAFKA-1502:
-

Can't reproduce this on trunk or on the 0.8.1 branch. Seems to be resolved.

 source jar is empty
 ---

 Key: KAFKA-1502
 URL: https://issues.apache.org/jira/browse/KAFKA-1502
 Project: Kafka
  Issue Type: Bug
  Components: build
Affects Versions: 0.8.2
Reporter: Jun Rao
Assignee: Joel Koshy
  Labels: newbie

 When doing a local publish, kafka_2.8.0-0.8.1.1-sources.jar only contains the 
 following files.
 META-INF/
 META-INF/MANIFEST.MF
 LICENSE
 NOTICE



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-960) Upgrade Metrics to 3.x

2014-07-25 Thread Daniel Compton (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14074275#comment-14074275
 ] 

Daniel Compton commented on KAFKA-960:
--

What are the plans for Metrics 3.0? I was thinking of taking a crack at this, 
but it looks like there might be a private fork where this has been done 
already? Also, because it will change the names of the MBeans to stop wrapping 
them in quotes, would you want this to be in 0.9, or is a breaking change like 
this OK in 0.8.2?

 Upgrade Metrics to 3.x
 --

 Key: KAFKA-960
 URL: https://issues.apache.org/jira/browse/KAFKA-960
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.1
Reporter: Cosmin Lehene

 Now that metrics 3.0 has been released 
 (http://metrics.codahale.com/about/release-notes/) we can upgrade back



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-07-25 Thread Jiang Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14074343#comment-14074343
 ] 

Jiang Wu commented on KAFKA-1555:
-

Jun,

We may need to tolerate more than R-1 failures that are spread over a long time 
period, for example, all brokers being restarted in turn within 20 minutes. 

Our concern with acks=-1 is that it can happen that only 1 broker is in ISR, 
so a published message may have only one copy. Losing that copy results in 
permanent message loss.

For acks=2, I'm considering two approaches to avoid a lagged replica being 
elected as the new leader:
1. Set replica.lag.max.messages=0. When a new leader election happens, first 
update the ISR list. A replica without the latest message will be removed 
from ISR and has no chance to become the new leader.
2. When a new leader election happens, choose the replica with the most messages 
as the new leader.

Could you comment on the approaches?

Regards,
Jiang

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Neha Narkhede

 In a mission-critical application, we expect a Kafka cluster with 3 brokers 
 to satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current Kafka version (0.8.1.1) cannot achieve these requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-07-25 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14074460#comment-14074460
 ] 

Jun Rao commented on KAFKA-1555:


Jiang,

If you use controlled shutdown, we will make sure that there is at least 
another replica in ISR before shutting down the broker.

Two comments on your proposal.
1. In some corner cases, the replica with the most messages shouldn't be 
the leader. Consider the following example. Suppose that we have 2 replicas A 
and B. At some point the messages in the logs look like the following.

offset   A    B
0        m1   m1
1        m2
2        m3

Suppose that m1 is committed and m2 and m3 are not. At this point, A dies. B 
takes over as the new leader. B then accepts and commits message m4. The logs 
will now look like the following.

offset   A    B
0        m1   m1
1        m2   m4
2        m3

At this point, let's say both A and B go down and come up again. Now, you can't 
pick replica A as the new leader even though it has more messages. Otherwise, 
you will lose the committed message m4. Note that it's OK to lose messages m2 
and m3 since they were never committed. The correct way to pick the longest 
log is a bit more involved and would require knowing the leader generation id 
of each message. For details, see KAFKA-1211.

2. A second thing that you need to worry about is coordination. Suppose that we 
have the same scenario as above when both A and B die. If B comes up first, we 
can actually elect B as the new leader without waiting for A. However, if you 
need to compare the lengths of the logs, you need to wait for A to come back, 
since A could have the longest log. Then the question is: if A never 
comes back, will you block writes forever, or do you risk losing committed 
messages?

ISR is sort of our way of remembering the new leader candidates. Therefore, 
without coordination among replicas, we can easily figure out who can be the 
new leader.




 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Neha Narkhede

 In a mission-critical application, we expect a Kafka cluster with 3 brokers 
 to satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current Kafka version (0.8.1.1) cannot achieve these requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: Gradle hanging on unit test run against ProducerFailureHangingTest-testBrokerFailure

2014-07-25 Thread Guozhang Wang
Could you get a thread dump when it hangs?


On Thu, Jul 24, 2014 at 11:01 PM, David Corley davidcor...@gmail.com
wrote:

 Hey Guozhang,
 Yes, I spotted that commit and had updated my working copy to that, but the
 test is still hanging. If it's any help, the test looks like it's doing
 _something_  as the CPU usage ramps up significantly and stays there until
 I kill the process.
 /Dave


 On Thu, Jul 24, 2014 at 4:10 PM, Guozhang Wang wangg...@gmail.com wrote:

  Hi Dave,
 
  KAFKA-1533 has just been committed targeting this issue. Did your
 update
  on trunk include this commit?
 
  commit ff05e9b3616a222e29a42f6e8fdf41945a417f41
  Author: Guozhang Wang guw...@linkedin.com
  Date:   Tue Jul 22 14:14:19 2014 -0700
 
  Guozhang
 
  kafka-1533; transient unit test failure in
 ProducerFailureHandlingTest;
  reviewed by Guozhang Wang; reviewed by Jun Rao
 
 
 
 
  On Thu, Jul 24, 2014 at 5:52 AM, David Corley davidcor...@gmail.com
  wrote:
 
   Hey all,
   I'm trying my hand at writing some patches for open issues, but I'm
  running
   into issues with running gradlew test.
   It hangs every time when trying to run testBrokerFailure in the
   ProducerFailureHangingTest suite.
  
   It was working for a time, but I updated to trunk HEAD and it's no
 longer
   working.
   I'm running on OSX with JDK 1.6.0_65.
   I tried increasing the HeapSize for the test target, but hit the same
   issue.
   Running against 3f1a9c4cee778d089d3ec3167555c2b89cdc48bb
  
   Would appreciate any help.
   Regards,
   Dave
  
 
 
 
  --
  -- Guozhang
 




-- 
-- Guozhang


[jira] [Commented] (KAFKA-960) Upgrade Metrics to 3.x

2014-07-25 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14074465#comment-14074465
 ] 

Jun Rao commented on KAFKA-960:
---

This is probably going to affect many existing clients. The new clients will no 
longer depend on metrics. Perhaps we can defer the upgrade until then.

What are the main benefits from upgrading to metrics 3.0?

 Upgrade Metrics to 3.x
 --

 Key: KAFKA-960
 URL: https://issues.apache.org/jira/browse/KAFKA-960
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.1
Reporter: Cosmin Lehene

 Now that metrics 3.0 has been released 
 (http://metrics.codahale.com/about/release-notes/) we can upgrade back



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1542) normal IOException in the new producer is logged as ERROR

2014-07-25 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14074474#comment-14074474
 ] 

Guozhang Wang commented on KAFKA-1542:
--

Thanks for the patch. LGTM.

 normal IOException in the new producer is logged as ERROR
 -

 Key: KAFKA-1542
 URL: https://issues.apache.org/jira/browse/KAFKA-1542
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2
Reporter: Jun Rao
  Labels: newbie
 Attachments: KAFKA-1542.patch


 Saw the following error in the log. It seems this can happen if the broker is 
 down. So, this probably should be logged as WARN instead of ERROR.
 2014/07/16 00:12:51.799 [Selector] Error in I/O: 
 java.io.IOException: Connection timed out
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
 at sun.nio.ch.IOUtil.read(IOUtil.java:197)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
 at 
 org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:60)
 at org.apache.kafka.common.network.Selector.poll(Selector.java:241)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:171)
 at 
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:174)
 at 
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:114)
 at java.lang.Thread.run(Thread.java:744)



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1477) add authentication layer and initial JKS x509 implementation for brokers, producers and consumer for network communication

2014-07-25 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14074513#comment-14074513
 ] 

Joe Stein commented on KAFKA-1477:
--

[~junrao] While I think it makes sense not to try to add too much to the 
existing producer and consumer, they are being used in the wild quite heavily 
right now. There is a real need for this feature (besides Salesforce, others are 
doing security with Kafka outside of the upstream project and starting to do 
things differently enough that the solution is becoming inconsistent), and a lot 
of this is really about the changes to the broker... Once the patch is 
committed, then any client (Go, Python, etc.) can implement SSL with the broker. 
The existing producer/consumer can also be used out of the box by folks who 
want to flip the feature on now.

Waiting for the new producer won't work IMHO from a product perspective, because 
then we would only deliver half the solution (the other half being consuming over 
SSL). We can get the feature in now for something that folks are using already 
(this is an upgrade of Raja's original work to the latest trunk). I understand 
this may extend the life of the existing producer/consumer a little bit until 
the new producer and consumer support the feature too. I think the reality, 
though, is that the existing producer/consumer will be around, as folks will be 
slow to upgrade, though I do expect them to eventually move to the shiny new one 
once it becomes less new or they have a real need for the change (some do not).

If there are no objections (-1s) to the patch Ivan posted, he should have this 
finalized within the next 2-3 weeks (minor things). Once done with review 
and testing, I expect to +1 and commit the patch. I also appreciate the extent 
of the change, and appreciate any effort that can be made to help vet this 
change to make sure Kafka keeps doing everything that everyone expects Kafka to 
do when the feature is not on.


 add authentication layer and initial JKS x509 implementation for brokers, 
 producers and consumer for network communication
 --

 Key: KAFKA-1477
 URL: https://issues.apache.org/jira/browse/KAFKA-1477
 Project: Kafka
  Issue Type: New Feature
Reporter: Joe Stein
Assignee: Ivan Lyutov
 Fix For: 0.8.2

 Attachments: KAFKA-1477-binary.patch, KAFKA-1477.patch, 
 KAFKA-1477_2014-06-02_16:59:40.patch, KAFKA-1477_2014-06-02_17:24:26.patch, 
 KAFKA-1477_2014-06-03_13:46:17.patch






--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-07-25 Thread Jiang Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14074522#comment-14074522
 ] 

Jiang Wu commented on KAFKA-1555:
-

Jun,

In your example, what are the replication and acks values? When you say m1 is 
committed, does that mean the producer's send(m1) returns successfully?

Could you also comment on approach 1:
1. Set replica.lag.max.messages=0. When a new leader election happens, first 
update the ISR list. A replica without the latest message will be removed 
from ISR and has no chance to become the new leader.

Thanks,
Jiang

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Neha Narkhede

 In a mission-critical application, we expect a Kafka cluster with 3 brokers 
 to satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current Kafka version (0.8.1.1) cannot achieve these requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-07-25 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14074532#comment-14074532
 ] 

Jun Rao commented on KAFKA-1555:


In our current design, a message is considered committed if it's written to all 
replicas in ISR. If ack=-1 is used, the producer will only be acked when the 
message is committed.

It seems that approach 1 also needs coordination among multiple replicas during 
leader election. So both of my previous comments also apply.

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Neha Narkhede

 In a mission-critical application, we expect a Kafka cluster with 3 brokers 
 to satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current Kafka version (0.8.1.1) cannot achieve these requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1123) Broker IPv6 addresses parsed incorrectly

2014-07-25 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-1123:
---

 Priority: Major  (was: Minor)
Affects Version/s: (was: 0.8.0)
   0.8.2
   Labels: newbie  (was: )

We probably should add IPv6 support ASAP. It seems that for a given string 
of host:port, we can just look for the last ':' to separate the host and 
port. This should cover both IPv4 and IPv6. We will need to do that for both 
the old and the new producer.
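As a rough sketch of that parsing rule (an assumption about the eventual fix, not committed code):
{code}
// Split a broker address on the LAST ':' so IPv6 literals keep their internal colons.
def parseHostPort(brokerAddress: String): (String, Int) = {
  val idx = brokerAddress.lastIndexOf(':')
  require(idx > 0, "expected host:port, got " + brokerAddress)
  (brokerAddress.substring(0, idx), brokerAddress.substring(idx + 1).toInt)
}

// parseHostPort("192.168.0.10:9092")             => ("192.168.0.10", 9092)
// parseHostPort("fe80::1ff:fe23:4567:890a:9092") => ("fe80::1ff:fe23:4567:890a", 9092)
{code}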

 Broker IPv6 addresses parsed incorrectly
 

 Key: KAFKA-1123
 URL: https://issues.apache.org/jira/browse/KAFKA-1123
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2
Reporter: Andrew Otto
Assignee: Jun Rao
  Labels: newbie

 It seems that broker addresses are parsed incorrectly when IPv6 addresses are 
 supplied.  IPv6 addresses have colons in them, and Kafka seems to be 
 interpreting the first : as the address:port separator.
 I have only tried this with the console-producer --broker-list option, so I 
 don't know if this affects anything deeper than the CLI.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: Review Request 23767: Fix KAFKA-1430

2014-07-25 Thread Guozhang Wang


 On July 22, 2014, 5:02 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/cluster/Partition.scala, lines 120-130
  https://reviews.apache.org/r/23767/diff/2/?file=637561#file637561line120
 
  We may not be able to remove the readlock here. The issue is that this 
  method accesses not only leaderReplicaIdOpt, but other internal data 
  structures like assignedReplicaMap. Without the lock, the read from the Map 
  could fail while it's being concurrently modified.
  
  In general, we can get away without the lock only if we want to read a 
  single internal value. Perhaps we can introduce another function 
  isLeaderLocal() that returns a boolean. This method will only need to 
  access leaderReplicaIdOpt. Then all callers will first call 
  leaderReplicaIfLocal and hold onto the leader replica. They can then use 
  isLeaderLocal to see if the leader has changed subsequently.

Would this be the same as what we do now? In getReplica(localBrokerId), if the 
replica map has changed and the id is no longer in the map, it will return 
None; if the replica map has changed and the id is no longer the leader, it is 
just the same as when we called leaderReplicaIfLocal() to get the leader, and 
then immediately the leader changed?


 On July 22, 2014, 5:02 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/cluster/Partition.scala, lines 268-269
  https://reviews.apache.org/r/23767/diff/2/?file=637561#file637561line268
 
  Instead of using negation, could we do 
  leaderHW.precedes(replica.logEndOffset)?
  
  Also, could we move  to the previous line?

We cannot simply use leaderHW.precedes(replica.logEndOffset) since we need to 
consider =; will use messagesDiff instead.


 On July 22, 2014, 5:02 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/cluster/Replica.scala, lines 33-36
  https://reviews.apache.org/r/23767/diff/2/?file=637562#file637562line33
 
  Should we rename highWatermarkValue and logEndOffsetValue to 
  highWatermarkMetadata and logEndOffsetMetadata?

Fixed.


 On July 22, 2014, 5:02 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/cluster/Partition.scala, lines 372-373
  https://reviews.apache.org/r/23767/diff/2/?file=637561#file637561line372
 
  Move  to previous line?

Fixed.


 On July 22, 2014, 5:02 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/cluster/Partition.scala, lines 324-325
  https://reviews.apache.org/r/23767/diff/2/?file=637561#file637561line324
 
  Perhaps we could create TopicPartionRequestKey just once?

Fixed.


 On July 22, 2014, 5:02 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/cluster/Partition.scala, lines 296-297
  https://reviews.apache.org/r/23767/diff/2/?file=637561#file637561line296
 
  This seems to be an existing problem. If ack=-1, a safer check is HW >= 
  requiredOffset. This way, we will be sure that if ISR expands, the acked 
  message is guaranteed to be in the replicas newly added to ISR.
  
  The following is an example that shows the issue with the existing 
  check. Suppose that all replicas in ISR are at offset 10, but HW is still 
  at 8 and we call checkEnoughReplicasReachOffset on offset 9.  The check 
  will be satisfied and the message is considered committed. We will be 
  updating HW to 10 pretty soon. However, before that happens, another 
  replica whose LEO is at 8 can be added to ISR. This replica won't have 
  message 9, which is acked as committed.

Fixed.


 On July 22, 2014, 5:02 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/cluster/Partition.scala, lines 194-195
  https://reviews.apache.org/r/23767/diff/2/?file=637561#file637561line194
 
  It's possible that we get an UnknownOffsetMetadata during the 
  conversion. In this case, we probably should set HW to logEndOffset.

Fixed.


 On July 22, 2014, 5:02 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/log/Log.scala, lines 173-174
  https://reviews.apache.org/r/23767/diff/2/?file=637565#file637565line173
 
  It would be a bit confusing to reason about the consistency between 
  nextOffset and nextOffsetMetadata since they are not updated atomically. 
  Could we just keep nextOffsetMetadata?

Good point. Fixed.


 On July 22, 2014, 5:02 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/log/Log.scala, lines 429-431
  https://reviews.apache.org/r/23767/diff/2/?file=637565#file637565line429
 
  Perhaps we can add a bit more detail to the comment. So, we are in the 
  situation that the startOffset is in range, but we can't find a single 
  message whose offset is >= startOffset. One possibility seems to be that 
  all messages after startOffset have been deleted due to compaction. Is that 
  the only case? Let's describe all situations when this can happen.

Great catch. I was originally thinking this can only happen when the regular 
consumer's fetch request is issued with a max offset set to the HW, and all 
messages beyond the start offset are also beyond the max offset. But later I 
realized in this case LogSegment.read() will not actually return 

Re: Review Request 23767: Fix KAFKA-1430

2014-07-25 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23767/
---

(Updated July 25, 2014, 4:52 p.m.)


Review request for kafka.


Bugs: KAFKA-1430
https://issues.apache.org/jira/browse/KAFKA-1430


Repository: kafka


Description (updated)
---

Address Jun's comments: 1. LogSegment.read() will also return fetch info, even 
if the corresponding message set is empty; 2. Purgatory checking satisfactory 
in checkAndMaybeWatch synchronously, and will only return false if this thread 
successfully set the satisfactory bit to true; 3. Remove the read lock on 
Partition's reading of the leaderOpt and epoch and making them volatile instead 
since these two functions are just single read; 4. Fix some minor issues in 
TestEndToEndLatency; 5. Other minor fixes


Diffs (updated)
-

  core/src/main/scala/kafka/api/FetchRequest.scala 
55a5982c5234f3ac10d4b4ea9fd7c4aa11d34154 
  core/src/main/scala/kafka/api/FetchResponse.scala 
d117f10f724b09d6deef0df3a138d28fc91aa13a 
  core/src/main/scala/kafka/cluster/Partition.scala 
f2ca8562f833f09d96ec4bd37efcacf69cd84b2e 
  core/src/main/scala/kafka/cluster/Replica.scala 
5e659b4a5c0256431aecc200a6b914472da9ecf3 
  core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala 
f8c1b4e674f7515c377c6c30d212130f1ff022dd 
  core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
0e64632210385ef63c2ad3445b55ac4f37a63df2 
  core/src/main/scala/kafka/log/Log.scala 
b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12 
  core/src/main/scala/kafka/log/LogCleaner.scala 
afbeffc72e7d7706b44961aecf8519c5c5a3b4b1 
  core/src/main/scala/kafka/log/LogSegment.scala 
0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
3b15254f32252cf824d7a292889ac7662d73ada1 
  core/src/main/scala/kafka/server/KafkaApis.scala 
fd5f12ee31e78cdcda8e24a0ab3e1740b3928884 
  core/src/main/scala/kafka/server/OffsetManager.scala 
0e22897cd1c7e45c58a61c3c468883611b19116d 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
75ae1e161769a020a102009df416009bd6710f4a 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
897783cb756de548a8b634876f729b63ffe9925e 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 
3d0ff1e2dbd6a5c3587cffa283db70415af83a7b 
  core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala 
af4783646803e58714770c21f8c3352370f26854 
  core/src/main/scala/kafka/tools/TestEndToEndLatency.scala 
5f8f6bc46ae6cbbe15cec596ac99d0b377d1d7ef 
  core/src/test/scala/other/kafka/StressTestLog.scala 
8fcd068b248688c40e73117dc119fa84cceb95b3 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
cd16ced5465d098be7a60498326b2a98c248f343 
  core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 
9f04bd38be639cde3e7f402845dbe6ae92e87dc2 
  core/src/test/scala/unit/kafka/log/LogManagerTest.scala 
d03d4c4ee5c28fb1fbab2af0b84003ec0fac36e3 
  core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 
6b7603728ae5217565d68b92dd5349e7c6508f31 
  core/src/test/scala/unit/kafka/log/LogTest.scala 
1da1393983d4b20330e7c7f374424edd1b26f2a3 
  core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala 
6db245c956d2172cde916defdb0749081bf891fd 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 
558a5d6765a2b2c2fd33dc75ed31873a133d12c9 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
2cd3a3faf7be2bbc22a892eec78c6e4c05545e18 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 
0ec120a4a953114e88c575dd6b583874371a09e3 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 
4f61f8469df99e02d6ce7aad897d10e158cca8fd 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b 

Diff: https://reviews.apache.org/r/23767/diff/


Testing
---


Thanks,

Guozhang Wang



[jira] [Updated] (KAFKA-1430) Purgatory redesign

2014-07-25 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-1430:
-

Attachment: KAFKA-1430_2014-07-25_09:52:43.patch

 Purgatory redesign
 --

 Key: KAFKA-1430
 URL: https://issues.apache.org/jira/browse/KAFKA-1430
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
Assignee: Guozhang Wang
 Attachments: KAFKA-1430.patch, KAFKA-1430.patch, KAFKA-1430.patch, 
 KAFKA-1430.patch, KAFKA-1430.patch, KAFKA-1430_2014-06-05_17:07:37.patch, 
 KAFKA-1430_2014-06-05_17:13:13.patch, KAFKA-1430_2014-06-05_17:40:53.patch, 
 KAFKA-1430_2014-06-10_11:22:06.patch, KAFKA-1430_2014-06-10_11:26:02.patch, 
 KAFKA-1430_2014-07-11_10:59:13.patch, KAFKA-1430_2014-07-21_12:53:39.patch, 
 KAFKA-1430_2014-07-25_09:52:43.patch


 We have seen 2 main issues with the Purgatory.
 1. There is no atomic checkAndWatch functionality. So, a client typically 
 first checks whether a request is satisfied or not and then registers the 
 watcher. However, by the time the watcher is registered, the registered item 
 could already be satisfied. This item won't be satisfied until the next 
 update happens or the delayed time expires, which means the watched item 
 could be delayed. 
 2. FetchRequestPurgatory doesn't quite work. This is because the current 
 design tries to incrementally maintain the accumulated bytes ready for fetch. 
 However, this is difficult since the right time to check whether a fetch (for 
 regular consumer) request is satisfied is when the high watermark moves. At 
 that point, it's hard to figure out how many bytes we should incrementally 
 add to each pending fetch request.
 The problem has been reported in KAFKA-1150 and KAFKA-703.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1430) Purgatory redesign

2014-07-25 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14074567#comment-14074567
 ] 

Guozhang Wang commented on KAFKA-1430:
--

Updated reviewboard https://reviews.apache.org/r/23767/
 against branch origin/trunk

 Purgatory redesign
 --

 Key: KAFKA-1430
 URL: https://issues.apache.org/jira/browse/KAFKA-1430
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
Assignee: Guozhang Wang
 Attachments: KAFKA-1430.patch, KAFKA-1430.patch, KAFKA-1430.patch, 
 KAFKA-1430.patch, KAFKA-1430.patch, KAFKA-1430_2014-06-05_17:07:37.patch, 
 KAFKA-1430_2014-06-05_17:13:13.patch, KAFKA-1430_2014-06-05_17:40:53.patch, 
 KAFKA-1430_2014-06-10_11:22:06.patch, KAFKA-1430_2014-06-10_11:26:02.patch, 
 KAFKA-1430_2014-07-11_10:59:13.patch, KAFKA-1430_2014-07-21_12:53:39.patch, 
 KAFKA-1430_2014-07-25_09:52:43.patch


 We have seen 2 main issues with the Purgatory.
 1. There is no atomic checkAndWatch functionality. So, a client typically 
 first checks whether a request is satisfied or not and then registers the 
 watcher. However, by the time the watcher is registered, the registered item 
 could already be satisfied. This item won't be satisfied until the next 
 update happens or the delayed time expires, which means the watched item 
 could be delayed. 
 2. FetchRequestPurgatory doesn't quite work. This is because the current 
 design tries to incrementally maintain the accumulated bytes ready for fetch. 
 However, this is difficult since the right time to check whether a fetch (for 
 regular consumer) request is satisfied is when the high watermark moves. At 
 that point, it's hard to figure out how many bytes we should incrementally 
 add to each pending fetch request.
 The problem has been reported in KAFKA-1150 and KAFKA-703.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1542) normal IOException in the new producer is logged as ERROR

2014-07-25 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14074579#comment-14074579
 ] 

Jun Rao commented on KAFKA-1542:


Thanks for the patch. Perhaps we can just log the InetAddress. Getting the 
hostname on the client may not always be possible.
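For illustration only, a sketch of the kind of change being discussed (assuming an slf4j-style logger; this is not the actual Selector code):
{code}
import java.io.IOException
import java.nio.ByteBuffer
import java.nio.channels.SocketChannel
import org.slf4j.LoggerFactory

object IoErrorLogging {
  private val log = LoggerFactory.getLogger(getClass)

  // Log transport failures at WARN (a broker being down is expected and recoverable)
  // and include the remote InetAddress rather than a resolved hostname.
  def readOrWarn(channel: SocketChannel, buffer: ByteBuffer): Int =
    try channel.read(buffer)
    catch {
      case e: IOException =>
        log.warn("Error in I/O with " + channel.socket().getInetAddress, e)
        throw e
    }
}
{code}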

 normal IOException in the new producer is logged as ERROR
 -

 Key: KAFKA-1542
 URL: https://issues.apache.org/jira/browse/KAFKA-1542
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2
Reporter: Jun Rao
  Labels: newbie
 Attachments: KAFKA-1542.patch


 Saw the following error in the log. It seems this can happen if the broker is 
 down. So, this probably should be logged as WARN instead of ERROR.
 2014/07/16 00:12:51.799 [Selector] Error in I/O: 
 java.io.IOException: Connection timed out
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
 at sun.nio.ch.IOUtil.read(IOUtil.java:197)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
 at 
 org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:60)
 at org.apache.kafka.common.network.Selector.poll(Selector.java:241)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:171)
 at 
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:174)
 at 
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:114)
 at java.lang.Thread.run(Thread.java:744)



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1542) normal IOException in the new producer is logged as ERROR

2014-07-25 Thread David Corley (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14074582#comment-14074582
 ] 

David Corley commented on KAFKA-1542:
-

Sure. Will revise the patch to get the address instead.

 normal IOException in the new producer is logged as ERROR
 -

 Key: KAFKA-1542
 URL: https://issues.apache.org/jira/browse/KAFKA-1542
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2
Reporter: Jun Rao
  Labels: newbie
 Attachments: KAFKA-1542.patch


 Saw the following error in the log. It seems this can happen if the broker is 
 down. So, this probably should be logged as WARN instead of ERROR.
 2014/07/16 00:12:51.799 [Selector] Error in I/O: 
 java.io.IOException: Connection timed out
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
 at sun.nio.ch.IOUtil.read(IOUtil.java:197)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
 at 
 org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:60)
 at org.apache.kafka.common.network.Selector.poll(Selector.java:241)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:171)
 at 
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:174)
 at 
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:114)
 at java.lang.Thread.run(Thread.java:744)



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-25 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14074625#comment-14074625
 ] 

Jun Rao commented on KAFKA-1414:


Thanks for patch v4. Looks good. Some minor comments below.

50. LogManager.checkpointLogsInDir(dir: File): I am not sure that we need a for 
loop there since we are just writing a single checkpoint file.

51. ReplicaManagerTest: unused imports

52. KafkaConfig: Would num.recovery.threads.per.data.dir be better than 
log.threads.per.data.dir? In the comment, we can explain that this is only used 
during startup and shutdown of the log.
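
For illustration, if the suggested name is adopted, the corresponding server.properties entry might look like the following (the value is purely an example):

{code}
# Threads per data directory used for log recovery at startup and for flushing at shutdown
num.recovery.threads.per.data.dir=4
{code}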

 Speedup broker startup after hard reset
 ---

 Key: KAFKA-1414
 URL: https://issues.apache.org/jira/browse/KAFKA-1414
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.1.1, 0.8.2, 0.9.0
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps
 Fix For: 0.8.2

 Attachments: 
 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, 
 KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, 
 KAFKA-1414-rev3-interleaving.patch, KAFKA-1414-rev3.patch, 
 KAFKA-1414-rev4.patch, freebie.patch, parallel-dir-loading-0.8.patch, 
 parallel-dir-loading-trunk-fixed-threadpool.patch, 
 parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch


 After a hard reset due to power failure, the broker takes way too much time 
 recovering unflushed segments in a single thread. This could be easily 
 improved by launching multiple threads (one per data directory, assuming that 
 typically each data directory is on a dedicated drive). Locally we tried this 
 simple patch to LogManager.loadLogs and it seems to work; however, I'm too new 
 to Scala, so do not take it literally:
 {code}
   /**
    * Recover and load all logs in the given data directories
    */
   private def loadLogs(dirs: Seq[File]) {
     val threads: Array[Thread] = new Array[Thread](dirs.size)
     var i: Int = 0
     val me = this
     for (dir <- dirs) {
       val thread = new Thread(new Runnable {
         def run() {
           val recoveryPoints = me.recoveryPointCheckpoints(dir).read
           /* load the logs */
           val subDirs = dir.listFiles()
           if (subDirs != null) {
             val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
             if (cleanShutDownFile.exists())
               info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
             for (dir <- subDirs) {
               if (dir.isDirectory) {
                 info("Loading log '" + dir.getName + "'")
                 val topicPartition = Log.parseTopicPartitionName(dir.getName)
                 val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
                 val log = new Log(dir,
                                   config,
                                   recoveryPoints.getOrElse(topicPartition, 0L),
                                   scheduler,
                                   time)
                 val previous = addLogWithLock(topicPartition, log)
                 if (previous != null)
                   throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
               }
             }
             cleanShutDownFile.delete()
           }
         }
       })
       thread.start()
       threads(i) = thread
       i = i + 1
     }
     for (thread <- threads) {
       thread.join()
     }
   }

   def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
     logCreationOrDeletionLock synchronized {
       this.logs.put(topicPartition, log)
     }
   }
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: Gradle hanging on unit test run against ProducerFailureHangingTest-testBrokerFailure

2014-07-25 Thread David Corley
Sure. Attaching here.



On Fri, Jul 25, 2014 at 4:16 PM, Guozhang Wang wangg...@gmail.com wrote:

 Could you get a thread dump when it hangs?


 On Thu, Jul 24, 2014 at 11:01 PM, David Corley davidcor...@gmail.com
 wrote:

  Hey Gouzhang,
  Yes, I spotted that commit and had updated my working copy to that, but
 the
  test is still hanging. If it's any help, the test looks like it's doing
  _something_  as the CPU usage ramps up significantly and stays there
 until
  I kill the process.
  /Dave
 
 
  On Thu, Jul 24, 2014 at 4:10 PM, Guozhang Wang wangg...@gmail.com
 wrote:
 
   Hi Dave,
  
   KAFKA-1533 has just been committed targeting at this issue. Did your
  update
   on trunk include this commit?
  
   commit ff05e9b3616a222e29a42f6e8fdf41945a417f41
   Author: Guozhang Wang guw...@linkedin.com
   Date:   Tue Jul 22 14:14:19 2014 -0700
  
   Guozhang
  
   kafka-1533; transient unit test failure in
  ProducerFailureHandlingTest;
   reviewed by Guozhang Wang; reviewed by Jun Rao
  
  
  
  
   On Thu, Jul 24, 2014 at 5:52 AM, David Corley davidcor...@gmail.com
   wrote:
  
Hey all,
I'm trying my hand at writing some patches for open issues, but I'm
   running
into issues with running gradlew test.
It hangs every time when trying to run testBrokerFailure in the
ProducerFailureHangingTest suite.
   
It was working for a time, but I updated to trunk HEAD and it's no
  longer
working.
I'm running on OSX with JDK 1.6.0_65.
I tried increasing the HeapSize for the test target, but hit the same
issue.
Running against 3f1a9c4cee778d089d3ec3167555c2b89cdc48bb
   
Would appreciate any help.
Regards,
Dave
   
  
  
  
   --
   -- Guozhang
  
 



 --
 -- Guozhang



Re: Gradle hanging on unit test run against ProducerFailureHangingTest-testBrokerFailure

2014-07-25 Thread Guozhang Wang
Did you miss the attachment?


On Fri, Jul 25, 2014 at 10:44 AM, David Corley davidcor...@gmail.com
wrote:

 Sure. Attaching here.



 On Fri, Jul 25, 2014 at 4:16 PM, Guozhang Wang wangg...@gmail.com wrote:

 Could you get a thread dump when it hangs?


 On Thu, Jul 24, 2014 at 11:01 PM, David Corley davidcor...@gmail.com
 wrote:

  Hey Gouzhang,
  Yes, I spotted that commit and had updated my working copy to that, but
 the
  test is still hanging. If it's any help, the test looks like it's doing
  _something_  as the CPU usage ramps up significantly and stays there
 until
  I kill the process.
  /Dave
 
 
  On Thu, Jul 24, 2014 at 4:10 PM, Guozhang Wang wangg...@gmail.com
 wrote:
 
   Hi Dave,
  
   KAFKA-1533 has just been committed targeting at this issue. Did your
  update
   on trunk include this commit?
  
   commit ff05e9b3616a222e29a42f6e8fdf41945a417f41
   Author: Guozhang Wang guw...@linkedin.com
   Date:   Tue Jul 22 14:14:19 2014 -0700
  
   Guozhang
  
   kafka-1533; transient unit test failure in
  ProducerFailureHandlingTest;
   reviewed by Guozhang Wang; reviewed by Jun Rao
  
  
  
  
   On Thu, Jul 24, 2014 at 5:52 AM, David Corley davidcor...@gmail.com
   wrote:
  
Hey all,
I'm trying my hand at writing some patches for open issues, but I'm
   running
into issues with running gradlew test.
It hangs every time when trying to run testBrokerFailure in the
ProducerFailureHangingTest suite.
   
It was working for a time, but I updated to trunk HEAD and it's no
  longer
working.
I'm running on OSX with JDK 1.6.0_65.
I tried increasing the HeapSize for the test target, but hit the
 same
issue.
Running against 3f1a9c4cee778d089d3ec3167555c2b89cdc48bb
   
Would appreciate any help.
Regards,
Dave
   
  
  
  
   --
   -- Guozhang
  
 



 --
 -- Guozhang





-- 
-- Guozhang


[jira] [Updated] (KAFKA-1557) ISR reported by TopicMetadataResponse most of the time doesn't match the Zookeeper information (and the truth)

2014-07-25 Thread Oleg Lvovitch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleg Lvovitch updated KAFKA-1557:
-

Attachment: server2.properties
server1.properties

 ISR reported by TopicMetadataResponse most of the time doesn't match the 
 Zookeeper information (and the truth)
 --

 Key: KAFKA-1557
 URL: https://issues.apache.org/jira/browse/KAFKA-1557
 Project: Kafka
  Issue Type: Bug
  Components: consumer, controller, core, replication
Affects Versions: 0.8.0, 0.8.1
 Environment: OSX 10.9.3, Linux Scientific 6.5
 It actually doesn't seem to matter and appears to be OS-agnostic
Reporter: Oleg Lvovitch
Assignee: Neha Narkhede
 Fix For: 0.8.1.1, 0.8.2

 Attachments: server1.properties, server2.properties


 TL;DR - after a topic is created, and at least one broker in the ISR is 
 restarted, the ISR reported by the TopicMetadataResponse is incorrect.
 Specific steps to repro:
 - Download 0.8.1 Kafka
 - Copy server.properties twice into server1.properties and server2.properties 
 (attached) - basically just ports and log paths changed to allow brokers to 
 co-exist
 - Start zookeeper using sh bin/zookeeper-server-start.sh 
 config/zookeeper.properties
 - Start broker1: 'sh bin/kafka-server-start.sh config/server1.properties
 - Start broker2: 'sh bin/kafka-server-start.sh config/server2.properties
 - Create a new topic: sh bin/kafka-topics.sh --zookeeper localhost:2181 
 --create --topic test --replication-factor 2 --partitions 3
 - Examine topic state: sh bin/kafka-topics.sh --zookeeper localhost:2181 
 --describe --topic test - note that all ISRs are of length 2
 - Run the attached Scala code that uses TopicMetadataRequest to examine topic 
 state. Observe that all ISRs are of length 2 and match the information 
 output by the script
 - Shut down broker2 (simply hit Ctrl-C in the terminal), wait 5-10 seconds
 - Restart broker 2 using the original command
 - Check the status of the topic again. Observe that the leader for all topics 
 is 0 (as expected), and all ISRs contain both brokers (as expected)
 - Run the attached Scala snippet again. 
 EXPECTED:
 - The reported ISRs are of length 2
 ACTUAL:
 - ALL ISRs contain just broker 0
 NOTE: depending on how long broker 2 was down, sometimes some ISRs will 
 contain the full list, but shutting it down for 15+ secs seems to always yield 
 a consistent repro



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Created] (KAFKA-1557) ISR reported by TopicMetadataResponse most of the time doesn't match the Zookeeper information (and the truth)

2014-07-25 Thread Oleg Lvovitch (JIRA)
Oleg Lvovitch created KAFKA-1557:


 Summary: ISR reported by TopicMetadataResponse most of the time 
doesn't match the Zookeeper information (and the truth)
 Key: KAFKA-1557
 URL: https://issues.apache.org/jira/browse/KAFKA-1557
 Project: Kafka
  Issue Type: Bug
  Components: consumer, controller, core, replication
Affects Versions: 0.8.1, 0.8.0
 Environment: OSX 10.9.3, Linux Scientific 6.5
It actually doesn't seem to matter and appears to be OS-agnostic
Reporter: Oleg Lvovitch
Assignee: Neha Narkhede
 Fix For: 0.8.2, 0.8.1.1
 Attachments: server1.properties, server2.properties

TL;DR - after a topic is created, and at least one broker in the ISR is 
restarted, the ISR reported by the TopicMetadataResponse is incorrect.

Specific steps to repro:
- Download 0.8.1 Kafka
- Copy server.properties twice into server1.properties and server2.properties 
(attached) - basically just ports and log paths changed to allow brokers to 
co-exist
- Start zookeeper using sh bin/zookeeper-server-start.sh 
config/zookeeper.properties
- Start broker1: 'sh bin/kafka-server-start.sh config/server1.properties
- Start broker2: 'sh bin/kafka-server-start.sh config/server2.properties
- Create a new topic: sh bin/kafka-topics.sh --zookeeper localhost:2181 
--create --topic test --replication-factor 2 --partitions 3
- Examine topic state: sh bin/kafka-topics.sh --zookeeper localhost:2181 
--describe --topic test - note that all ISRs are of length 2
- Run the attached Scala code that uses TopicMetadataRequest to examine topic 
state. Observe that all ISRs are of length 2 and match the information output 
by the script
- Shut down broker2 (simply hit Ctrl-C in the terminal), wait 5-10 seconds
- Restart broker 2 using the original command
- Check the status of the topic again. Observe that the leader for all topics 
is 0 (as expected), and all ISRs contain both brokers (as expected)
- Run the attached Scala snippet again. 

EXPECTED:
- The reported ISRs are of length 2

ACTUAL:
- ALL ISRs contain just broker 0

NOTE: depending on how long broker 2 was down, sometimes some ISRs will contain 
the full list, but shutting it down for 15+ secs seems to always yield 
a consistent repro





--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1557) ISR reported by TopicMetadataResponse most of the time doesn't match the Zookeeper information (and the truth)

2014-07-25 Thread Oleg Lvovitch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleg Lvovitch updated KAFKA-1557:
-

Attachment: BrokenKafkaLink.scala

 ISR reported by TopicMetadataResponse most of the time doesn't match the 
 Zookeeper information (and the truth)
 --

 Key: KAFKA-1557
 URL: https://issues.apache.org/jira/browse/KAFKA-1557
 Project: Kafka
  Issue Type: Bug
  Components: consumer, controller, core, replication
Affects Versions: 0.8.0, 0.8.1
 Environment: OSX 10.9.3, Linux Scientific 6.5
 It actually doesn't seem to matter and appears to be OS-agnostic
Reporter: Oleg Lvovitch
Assignee: Neha Narkhede
 Fix For: 0.8.1.1, 0.8.2

 Attachments: BrokenKafkaLink.scala, server1.properties, 
 server2.properties


 TL;DR - after a topic is created, and at least one broker in the ISR is 
 restarted, the ISR reported by the TopicMetadataResponse is incorrect.
 Specific steps to repro:
 - Download 0.8.1 Kafka
 - Copy server.properties twice into server1.properties and server2.properties 
 (attached) - basically just ports and log paths changed to allow brokers to 
 co-exist
 - Start zookeeper using sh bin/zookeeper-server-start.sh 
 config/zookeeper.properties
 - Start broker1: 'sh bin/kafka-server-start.sh config/server1.properties
 - Start broker2: 'sh bin/kafka-server-start.sh config/server2.properties
 - Create a new topic: sh bin/kafka-topics.sh --zookeeper localhost:2181 
 --create --topic test --replication-factor 2 --partitions 3
 - Examine topic state: sh bin/kafka-topics.sh --zookeeper localhost:2181 
 --describe --topic test - note that all ISRs are of length 2
 - Run the attached Scala code that uses TopicMetadataRequest to examine topic 
 state. Observe that all ISRs are of length 2 and match the information 
 output by the script
 - Shut down broker2 (simply hit Ctrl-C in the terminal), wait 5-10 seconds
 - Restart broker 2 using the original command
 - Check the status of the topic again. Observe that the leader for all topics 
 is 0 (as expected), and all ISRs contain both brokers (as expected)
 - Run the attached Scala snippet again. 
 EXPECTED:
 - The reported ISRs are of length 2
 ACTUAL:
 - ALL ISRs contain just broker 0
 NOTE: depending on how long broker 2 was down, sometimes some ISRs will 
 contain the full list, but shutting it down for 15+ secs seems to always yield 
 a consistent repro



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1557) ISR reported by TopicMetadataResponse most of the time doesn't match the Zookeeper information (and the truth)

2014-07-25 Thread Oleg Lvovitch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleg Lvovitch updated KAFKA-1557:
-

Description: 
TL;DR - after a topic is created, and at least one broker in the ISR is 
restarted, the ISR reported by the TopicMetadataResponse is incorrect.

Specific steps to repro:
- Download 0.8.1 Kafka
- Copy server.properties twice into server1.properties and server2.properties 
(attached) - basically just ports and log paths changed to allow brokers to 
co-exist
- Start zookeeper using sh bin/zookeeper-server-start.sh 
config/zookeeper.properties
- Start broker1: 'sh bin/kafka-server-start.sh config/server1.properties
- Start broker2: 'sh bin/kafka-server-start.sh config/server2.properties
- Create a new topic: sh bin/kafka-topics.sh --zookeeper localhost:2181 
--create --topic test --replication-factor 2 --partitions 3
- Examine topic state: sh bin/kafka-topics.sh --zookeeper localhost:2181 
--describe --topic test - note that all ISRs are of length 2
- Run the attached Scala code that uses TopicMetadataRequest to examine topic 
state. Observe that all ISRs are of length 2 and match the information output 
by the script
- Shut down broker2 (simply hit Ctrl-C in the terminal), wait 5-10 seconds
- Restart broker 2 using the original command
- Check the status of the topic again. Observe that the leader for all topics 
is 0 (as expected), and all ISRs contain both brokers (as expected)
- Run the attached Scala snippet again. 

EXPECTED:
- The reported ISRs are of length 2

ACTUAL:
- ALL ISRs contain just broker 0

NOTE: depending on how long broker 2 was down, sometimes some ISRs will contain 
the full list, but shutting it down for 15+ secs seems to always yield 
a consistent repro


Basically it appears that brokers have incorrect ISR information for the 
metadata cache.
Our production servers exhibit the same problem - after a topic gets created 
everything looks fine, but as brokers get restarted, ISR reported by the 
brokers is wrong, whereas the one in ZK appears to report the truth (it shrinks 
as brokers get shut down and grows back up after they get restarted)

I'm not sure if this has wider impact on the functioning of the cluster - bad 
metadata information is bad - but so far there has been no evidence of that


  was:
TL;DR - after a topic is created, and at least one broker in the ISR is 
restarted, the ISR reported by the TopicMetadataResponse is incorrect.

Specific steps to repro:
- Download 0.8.1 Kafka
- Copy server.properties twice into server1.properties and server2.properties 
(attached) - basically just ports and log paths changed to allow brokers to 
co-exist
- Start zookeper using sh bin/zookeeper-server-start.sh 
config/zookeper.properties
- Start broker1: 'sh bin/kafka-server-start.sh config/server1.properties
- Start broker2: 'sh bin/kafka-server-start.sh config/server2.properties
- Create a new topic: sh bin/kafka-topics.sh --zookeeper localhost:2181 
--create --topic test --replication-factor 2 --partitions 3
- Examine topic state: sh bin/kafka-topics.sh --zookeeper localhost:2181 
--describe --topic test - note that all ISRs are of length 2
- Run the attached Scala code that uses TopicMetadataRequest to exmaine topic 
state. Observer that all ISRs are of length 2 and match the information output 
by the script
- Shut down broker2 (simply hit Cntrl-C in the terminal), wait 5-10 seconds
- Restart broker 2 using the original command
- Check the status of the topic again. Observe that the leader for all topics 
is 0 (as expected), and all ISRs contain both brokers (as expected)
- Run the attached Scala snippet again. 

EXPECTED:
- The ISR information are of length 2

ACTUAL:
- ALL ISRs contain just broker 0

NOTE: depending on how long broker 2 was down, sometimes some ISRs will contain 
the full list, but shutting it down for 15+ secs seem to always yeild 
consistent repro




 ISR reported by TopicMetadataResponse most of the time doesn't match the 
 Zookeeper information (and the truth)
 --

 Key: KAFKA-1557
 URL: https://issues.apache.org/jira/browse/KAFKA-1557
 Project: Kafka
  Issue Type: Bug
  Components: consumer, controller, core, replication
Affects Versions: 0.8.0, 0.8.1
 Environment: OSX 10.9.3, Linux Scientific 6.5
 It actually doesn't seem to matter and appears to be OS-agnostic
Reporter: Oleg Lvovitch
Assignee: Neha Narkhede
 Fix For: 0.8.1.1, 0.8.2

 Attachments: BrokenKafkaLink.scala, server1.properties, 
 server2.properties


 TL;DR - after a topic is created, and at least one broker in the ISR is 
 restarted, the ISR reported by the TopicMetadataResponse is incorrect.
 Specific steps to repro:
 - Download 0.8.1 Kafka
 - Copy 

[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-25 Thread Anton Karamanov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14074699#comment-14074699
 ] 

Anton Karamanov commented on KAFKA-1414:


50. In this case `for` is used for safe and easy extraction of values from two 
{{Option}}al values. The semantics here are that the body will only be executed if 
both {{Option}}s are defined. It could be replaced with ifs or pattern 
matching, although the current version is more idiomatic by Scala standards.
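
A minimal, standalone sketch of the idiom being described (the file path and offset are made-up values for illustration):

{code}
import java.io.File

val cleanShutdownFile: Option[File] = Some(new File("/tmp/.kafka_cleanshutdown"))
val recoveryPoint: Option[Long]     = Some(0L)

// The body runs only when both Options are defined; if either is None,
// nothing happens and no explicit null/None checks are needed.
for {
  f      <- cleanShutdownFile
  offset <- recoveryPoint
} {
  println("would skip recovery of " + f.getPath + " from offset " + offset)
}
{code}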

 Speedup broker startup after hard reset
 ---

 Key: KAFKA-1414
 URL: https://issues.apache.org/jira/browse/KAFKA-1414
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.1.1, 0.8.2, 0.9.0
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps
 Fix For: 0.8.2

 Attachments: 
 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, 
 KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, 
 KAFKA-1414-rev3-interleaving.patch, KAFKA-1414-rev3.patch, 
 KAFKA-1414-rev4.patch, freebie.patch, parallel-dir-loading-0.8.patch, 
 parallel-dir-loading-trunk-fixed-threadpool.patch, 
 parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch


 After a hard reset due to power failure, the broker takes way too much time 
 recovering unflushed segments in a single thread. This could be easily 
 improved by launching multiple threads (one per data directory, assuming that 
 typically each data directory is on a dedicated drive). Locally we tried this 
 simple patch to LogManager.loadLogs and it seems to work; however, I'm too new 
 to Scala, so do not take it literally:
 {code}
   /**
    * Recover and load all logs in the given data directories
    */
   private def loadLogs(dirs: Seq[File]) {
     val threads: Array[Thread] = new Array[Thread](dirs.size)
     var i: Int = 0
     val me = this
     for (dir <- dirs) {
       val thread = new Thread(new Runnable {
         def run() {
           val recoveryPoints = me.recoveryPointCheckpoints(dir).read
           /* load the logs */
           val subDirs = dir.listFiles()
           if (subDirs != null) {
             val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
             if (cleanShutDownFile.exists())
               info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
             for (dir <- subDirs) {
               if (dir.isDirectory) {
                 info("Loading log '" + dir.getName + "'")
                 val topicPartition = Log.parseTopicPartitionName(dir.getName)
                 val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
                 val log = new Log(dir,
                                   config,
                                   recoveryPoints.getOrElse(topicPartition, 0L),
                                   scheduler,
                                   time)
                 val previous = addLogWithLock(topicPartition, log)
                 if (previous != null)
                   throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
               }
             }
             cleanShutDownFile.delete()
           }
         }
       })
       thread.start()
       threads(i) = thread
       i = i + 1
     }
     for (thread <- threads) {
       thread.join()
     }
   }

   def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
     logCreationOrDeletionLock synchronized {
       this.logs.put(topicPartition, log)
     }
   }
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Comment Edited] (KAFKA-1542) normal IOException in the new producer is logged as ERROR

2014-07-25 Thread David Corley (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1542?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14074582#comment-14074582
 ] 

David Corley edited comment on KAFKA-1542 at 7/25/14 6:07 PM:
--

Hey Jun, the current patch returns the IP address. getHostAddress() returns the 
address as a string, whereas getHostName() would be used if we wanted the 
hostname.
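
For reference, a quick standalone illustration of the distinction (the loopback address is used purely as an example):

{code}
import java.net.InetAddress

val addr = InetAddress.getByName("127.0.0.1")
// Textual IP, no DNS lookup involved:
println(addr.getHostAddress)   // 127.0.0.1
// May perform a reverse lookup to resolve a name:
println(addr.getHostName)      // e.g. localhost
{code}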


was (Author: heavydawson):
Sure. Will revise the patch to get the address instead.

 normal IOException in the new producer is logged as ERROR
 -

 Key: KAFKA-1542
 URL: https://issues.apache.org/jira/browse/KAFKA-1542
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2
Reporter: Jun Rao
  Labels: newbie
 Attachments: KAFKA-1542.patch


 Saw the following error in the log. It seems this can happen if the broker is 
 down. So, this probably should be logged as WARN instead of ERROR.
 2014/07/16 00:12:51.799 [Selector] Error in I/O: 
 java.io.IOException: Connection timed out
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
 at sun.nio.ch.IOUtil.read(IOUtil.java:197)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
 at 
 org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:60)
 at org.apache.kafka.common.network.Selector.poll(Selector.java:241)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:171)
 at 
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:174)
 at 
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:114)
 at java.lang.Thread.run(Thread.java:744)



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1557) ISR reported by TopicMetadataResponse most of the time doesn't match the Zookeeper information (and the truth)

2014-07-25 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-1557:
--

Component/s: (was: consumer)
 Labels: newbie++  (was: )

 ISR reported by TopicMetadataResponse most of the time doesn't match the 
 Zookeeper information (and the truth)
 --

 Key: KAFKA-1557
 URL: https://issues.apache.org/jira/browse/KAFKA-1557
 Project: Kafka
  Issue Type: Bug
  Components: controller, core, replication
Affects Versions: 0.8.0, 0.8.1
 Environment: OSX 10.9.3, Linux Scientific 6.5
 It actually doesn't seem to matter and appears to be OS-agnostic
Reporter: Oleg Lvovitch
Assignee: Neha Narkhede
  Labels: newbie++
 Fix For: 0.8.1.1, 0.8.2

 Attachments: BrokenKafkaLink.scala, server1.properties, 
 server2.properties


 TL;DR - after a topic is created, and at least one broker in the ISR is 
 restarted, the ISR reported by the TopicMetadataResponse is incorrect.
 Specific steps to repro:
 - Download 0.8.1 Kafka
 - Copy server.properties twice into server1.properties and server2.properties 
 (attached) - basically just ports and log paths changed to allow brokers to 
 co-exist
 - Start zookeeper using sh bin/zookeeper-server-start.sh 
 config/zookeeper.properties
 - Start broker1: 'sh bin/kafka-server-start.sh config/server1.properties
 - Start broker2: 'sh bin/kafka-server-start.sh config/server2.properties
 - Create a new topic: sh bin/kafka-topics.sh --zookeeper localhost:2181 
 --create --topic test --replication-factor 2 --partitions 3
 - Examine topic state: sh bin/kafka-topics.sh --zookeeper localhost:2181 
 --describe --topic test - note that all ISRs are of length 2
 - Run the attached Scala code that uses TopicMetadataRequest to examine topic 
 state. Observe that all ISRs are of length 2 and match the information 
 output by the script
 - Shut down broker2 (simply hit Ctrl-C in the terminal), wait 5-10 seconds
 - Restart broker 2 using the original command
 - Check the status of the topic again. Observe that the leader for all topics 
 is 0 (as expected), and all ISRs contain both brokers (as expected)
 - Run the attached Scala snippet again. 
 EXPECTED:
 - The reported ISRs are of length 2
 ACTUAL:
 - ALL ISRs contain just broker 0
 NOTE: depending on how long broker 2 was down, sometimes some ISRs will 
 contain the full list, but shutting it down for 15+ secs seems to always yield 
 a consistent repro
 Basically it appears that brokers have incorrect ISR information for the 
 metadata cache.
 Our production servers exhibit the same problem - after a topic gets created 
 everything looks fine, but as brokers get restarted, ISR reported by the 
 brokers is wrong, whereas the one in ZK appears to report the truth (it 
 shrinks as brokers get shut down and grows back up after they get restarted)
 I'm not sure if this has wider impact on the functioning of the cluster - bad 
 metadata information is bad - but so far there has been no evidence of that



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1367) Broker topic metadata not kept in sync with ZooKeeper

2014-07-25 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1367:
-

Labels: newbie++  (was: )

 Broker topic metadata not kept in sync with ZooKeeper
 -

 Key: KAFKA-1367
 URL: https://issues.apache.org/jira/browse/KAFKA-1367
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.0, 0.8.1
Reporter: Ryan Berdeen
  Labels: newbie++
 Attachments: KAFKA-1367.txt


 When a broker is restarted, the topic metadata responses from the brokers 
 will be incorrect (different from ZooKeeper) until a preferred replica leader 
 election.
 In the metadata, it looks like leaders are correctly removed from the ISR 
 when a broker disappears, but followers are not. Then, when a broker 
 reappears, the ISR is never updated.
 I used a variation of the Vagrant setup created by Joe Stein to reproduce 
 this with latest from the 0.8.1 branch: 
 https://github.com/also/kafka/commit/dba36a503a5e22ea039df0f9852560b4fb1e067c



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1557) ISR reported by TopicMetadataResponse most of the time doesn't match the Zookeeper information (and the truth)

2014-07-25 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1557?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14074766#comment-14074766
 ] 

Joel Koshy commented on KAFKA-1557:
---

For the first question: I don't think cluster health is at risk here since the 
ISR information is important for:
* leadership decisions
* ack'ing producers when required.acks = -1
Neither of these use cases relies directly on the metadata cache.

For the second question: given that this is not critical enough for a hot-fix 
and shouldn't be too difficult to fix, it would be a good opportunity for 
someone new to the codebase to take a stab at it - that's why we label jiras 
with newbie/newbie++ tags.



 ISR reported by TopicMetadataResponse most of the time doesn't match the 
 Zookeeper information (and the truth)
 --

 Key: KAFKA-1557
 URL: https://issues.apache.org/jira/browse/KAFKA-1557
 Project: Kafka
  Issue Type: Bug
  Components: controller, core, replication
Affects Versions: 0.8.0, 0.8.1
 Environment: OSX 10.9.3, Linux Scientific 6.5
 It actually doesn't seem to matter and appears to be OS-agnostic
Reporter: Oleg Lvovitch
Assignee: Neha Narkhede
  Labels: newbie++
 Fix For: 0.8.1.1, 0.8.2

 Attachments: BrokenKafkaLink.scala, server1.properties, 
 server2.properties


 TL;DR - after a topic is created, and at least one broker in the ISR is 
 restarted, the ISR reported by the TopicMetadataResponse is incorrect.
 Specific steps to repro:
 - Download 0.8.1 Kafka
 - Copy server.properties twice into server1.properties and server2.properties 
 (attached) - basically just ports and log paths changed to allow brokers to 
 co-exist
 - Start zookeeper using sh bin/zookeeper-server-start.sh 
 config/zookeeper.properties
 - Start broker1: 'sh bin/kafka-server-start.sh config/server1.properties
 - Start broker2: 'sh bin/kafka-server-start.sh config/server2.properties
 - Create a new topic: sh bin/kafka-topics.sh --zookeeper localhost:2181 
 --create --topic test --replication-factor 2 --partitions 3
 - Examine topic state: sh bin/kafka-topics.sh --zookeeper localhost:2181 
 --describe --topic test - note that all ISRs are of length 2
 - Run the attached Scala code that uses TopicMetadataRequest to examine topic 
 state. Observe that all ISRs are of length 2 and match the information 
 output by the script
 - Shut down broker2 (simply hit Ctrl-C in the terminal), wait 5-10 seconds
 - Restart broker 2 using the original command
 - Check the status of the topic again. Observe that the leader for all topics 
 is 0 (as expected), and all ISRs contain both brokers (as expected)
 - Run the attached Scala snippet again. 
 EXPECTED:
 - The reported ISRs are of length 2
 ACTUAL:
 - ALL ISRs contain just broker 0
 NOTE: depending on how long broker 2 was down, sometimes some ISRs will 
 contain the full list, but shutting it down for 15+ secs seems to always yield 
 a consistent repro
 Basically it appears that brokers have incorrect ISR information for the 
 metadata cache.
 Our production servers exhibit the same problem - after a topic gets created 
 everything looks fine, but as brokers get restarted, ISR reported by the 
 brokers is wrong, whereas the one in ZK appears to report the truth (it 
 shrinks as brokers get shut down and grows back up after they get restarted)
 I'm not sure if this has wider impact on the functioning of the cluster - bad 
 metadata information is bad - but so far there has been no evidence of that



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Resolved] (KAFKA-1557) ISR reported by TopicMetadataResponse most of the time doesn't match the Zookeeper information (and the truth)

2014-07-25 Thread Oleg Lvovitch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1557?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleg Lvovitch resolved KAFKA-1557.
--

Resolution: Duplicate

 ISR reported by TopicMetadataResponse most of the time doesn't match the 
 Zookeeper information (and the truth)
 --

 Key: KAFKA-1557
 URL: https://issues.apache.org/jira/browse/KAFKA-1557
 Project: Kafka
  Issue Type: Bug
  Components: controller, core, replication
Affects Versions: 0.8.0, 0.8.1
 Environment: OSX 10.9.3, Linux Scientific 6.5
 It actually doesn't seem to matter and appears to be OS-agnostic
Reporter: Oleg Lvovitch
Assignee: Neha Narkhede
  Labels: newbie++
 Fix For: 0.8.2, 0.8.1.1

 Attachments: BrokenKafkaLink.scala, server1.properties, 
 server2.properties


 TL;DR - after a topic is created, and at least one broker in the ISR is 
 restarted, the ISR reported by the TopicMetadataResponse is incorrect.
 Specific steps to repro:
 - Download 0.8.1 Kafka
 - Copy server.properties twice into server1.properties and server2.properties 
 (attached) - basically just ports and log paths changed to allow brokers to 
 co-exist
 - Start zookeeper using sh bin/zookeeper-server-start.sh 
 config/zookeeper.properties
 - Start broker1: 'sh bin/kafka-server-start.sh config/server1.properties
 - Start broker2: 'sh bin/kafka-server-start.sh config/server2.properties
 - Create a new topic: sh bin/kafka-topics.sh --zookeeper localhost:2181 
 --create --topic test --replication-factor 2 --partitions 3
 - Examine topic state: sh bin/kafka-topics.sh --zookeeper localhost:2181 
 --describe --topic test - note that all ISRs are of length 2
 - Run the attached Scala code that uses TopicMetadataRequest to examine topic 
 state. Observe that all ISRs are of length 2 and match the information 
 output by the script
 - Shut down broker2 (simply hit Ctrl-C in the terminal), wait 5-10 seconds
 - Restart broker 2 using the original command
 - Check the status of the topic again. Observe that the leader for all topics 
 is 0 (as expected), and all ISRs contain both brokers (as expected)
 - Run the attached Scala snippet again. 
 EXPECTED:
 - The reported ISRs are of length 2
 ACTUAL:
 - ALL ISRs contain just broker 0
 NOTE: depending on how long broker 2 was down, sometimes some ISRs will 
 contain the full list, but shutting it down for 15+ secs seems to always yield 
 a consistent repro
 Basically it appears that brokers have incorrect ISR information for the 
 metadata cache.
 Our production servers exhibit the same problem - after a topic gets created 
 everything looks fine, but as brokers get restarted, ISR reported by the 
 brokers is wrong, whereas the one in ZK appears to report the truth (it 
 shrinks as brokers get shut down and grows back up after they get restarted)
 I'm not sure if this has wider impact on the functioning of the cluster - bad 
 metadata information is bad - but so far there has been no evidence of that



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1420) Replace AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK with TestUtils.createTopic in all unit tests

2014-07-25 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-1420:
-

Description: 
This is a follow-up JIRA from KAFKA-1389.

There are a bunch of places in the unit tests where we mis-use 
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK

  was:
This is a follow-up JIRA from KAFKA-1389.

There are a bunch of places in the unit tests where we mis-use 


 Replace AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK with 
 TestUtils.createTopic in all unit tests
 --

 Key: KAFKA-1420
 URL: https://issues.apache.org/jira/browse/KAFKA-1420
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
  Labels: newbie
 Fix For: 0.8.2


 This is a follow-up JIRA from KAFKA-1389.
 There are a bunch of places in the unit tests where we mis-use 
 AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1420) Replace AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK with TestUtils.createTopic in all unit tests

2014-07-25 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-1420:
-

Description: 
This is a follow-up JIRA from KAFKA-1389.

There are a bunch of places in the unit tests where we mis-use 

  was:This is a follow-up JIRA from KAFKA-1389


 Replace AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK with 
 TestUtils.createTopic in all unit tests
 --

 Key: KAFKA-1420
 URL: https://issues.apache.org/jira/browse/KAFKA-1420
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
  Labels: newbie
 Fix For: 0.8.2


 This is a follow-up JIRA from KAFKA-1389.
 There are a bunch of places in the unit tests where we mis-use 



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1420) Replace AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK with TestUtils.createTopic in unit tests

2014-07-25 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-1420:
-

Summary: Replace AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK 
with TestUtils.createTopic in unit tests  (was: Replace 
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK with 
TestUtils.createTopic in all unit tests)

 Replace AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK with 
 TestUtils.createTopic in unit tests
 --

 Key: KAFKA-1420
 URL: https://issues.apache.org/jira/browse/KAFKA-1420
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
  Labels: newbie
 Fix For: 0.8.2


 This is a follow-up JIRA from KAFKA-1389.
 There are a bunch of places in the unit tests where we mis-use 
 AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1420) Replace AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK with TestUtils.createTopic in unit tests

2014-07-25 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-1420:
-

Description: 
This is a follow-up JIRA from KAFKA-1389.

There are a bunch of places in the unit tests where we misuse 
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK to create topics, 
where TestUtils.createTopic needs to be used instead.

  was:
This is a follow-up JIRA from KAFKA-1389.

There are a bunch of places in the unit tests where we mis-use 
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK


 Replace AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK with 
 TestUtils.createTopic in unit tests
 --

 Key: KAFKA-1420
 URL: https://issues.apache.org/jira/browse/KAFKA-1420
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
  Labels: newbie
 Fix For: 0.8.2


 This is a follow-up JIRA from KAFKA-1389.
 There are a bunch of places in the unit tests where we misuse 
 AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK to create topics, 
 where TestUtils.createTopic needs to be used instead.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-07-25 Thread saurabh agarwal (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14075038#comment-14075038
 ] 

saurabh agarwal commented on KAFKA-1555:


Jun, 

I agree that ack=-1 works fine for most of the use cases. Here is another 
suggestion that might address our use case. Can we add an additional producer 
property - min.isr.required (similar to dfs.replication.min) - for durability? 
ack=-1 ensures that every replica in ISR will receive the message before the 
producer gets the ack, and min.isr.required=2 ensures that there are at least 
two replicas in ISR before a message can be published. Otherwise it will throw 
an exception saying that the number of required replicas is not in ISR.

Here is an example where this would be very useful. Take a scenario where the 
producer is publishing at a very high rate. We bring down two follower replicas. 
When we bring those replicas back up, it takes a while for them to catch up, as 
messages are still being published at the higher rate, so there are no follower 
replicas in ISR for a while. During this time, if the disk at the leader replica 
fails, we will not have any replica that has those messages. It would be good to 
have more than one copy in ISR at all times. This would address our use case, 
where we need strong consistency and high durability with reasonable availability.
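
A rough sketch of the semantics being proposed; the property name min.isr.required and the exception type are illustrative only, not an existing Kafka config or API:

{code}
// Hypothetical broker-side check for acks=-1 appends: reject the write when the
// current ISR has fewer members than the proposed min.isr.required value.
case class NotEnoughReplicasInIsrException(msg: String) extends RuntimeException(msg)

def checkMinIsr(currentIsrSize: Int, minIsrRequired: Int): Unit = {
  if (currentIsrSize < minIsrRequired)
    throw NotEnoughReplicasInIsrException(
      "Number of required replicas is not in ISR: " + currentIsrSize + " < " + minIsrRequired)
}

// Example: with min.isr.required=2 and only the leader in ISR, the append is rejected.
try checkMinIsr(currentIsrSize = 1, minIsrRequired = 2)
catch { case e: NotEnoughReplicasInIsrException => println(e.getMessage) }
{code}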

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Neha Narkhede

 In a mission critical application, we expect that a kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current kafka version (0.8.1.1) cannot achieve the requirements 
 due to its three behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-960) Upgrade Metrics to 3.x

2014-07-25 Thread Daniel Compton (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14075215#comment-14075215
 ] 

Daniel Compton commented on KAFKA-960:
--

The main (only?) benefit for us is fixing the issue where MBean names are 
wrapped in quotes. It looks like there are some feature and performance 
benefits as well. What do you mean by 'client'? Is that a producer or consumer 
library?

 Upgrade Metrics to 3.x
 --

 Key: KAFKA-960
 URL: https://issues.apache.org/jira/browse/KAFKA-960
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.1
Reporter: Cosmin Lehene

 Now that metrics 3.0 has been released 
 (http://metrics.codahale.com/about/release-notes/) we can upgrade back



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-960) Upgrade Metrics to 3.x

2014-07-25 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14075244#comment-14075244
 ] 

Jun Rao commented on KAFKA-960:
---

Yes.

 Upgrade Metrics to 3.x
 --

 Key: KAFKA-960
 URL: https://issues.apache.org/jira/browse/KAFKA-960
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.1
Reporter: Cosmin Lehene

 Now that metrics 3.0 has been released 
 (http://metrics.codahale.com/about/release-notes/) we can upgrade back



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-07-25 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14075263#comment-14075263
 ] 

Jun Rao commented on KAFKA-1555:


So, you are suggesting that we reject a message if the number of replicas in ISR is less 
than 2? However, immediately after a message is successfully published, some 
replicas could go down, which could bring the ISR to below 2. So, I am not sure 
if this is any better than just running things with a larger replication 
factor, say 4.

My understanding of dfs.replication.min in HDFS is that as you are 
writing data to HDFS, you can actually write data to fewer replicas than that 
min value. For example, suppose that you are writing 100 bytes to 3 replicas in 
HDFS with dfs.replication.min=2. If, after the 100 bytes are written to the first 
replica, the other 2 replicas die, HDFS will complete the write with just 1 
replica. However, in the background, HDFS will try to create new replicas to 
make sure the total # of replicas reaches the min value. This is something that you 
can do with Kafka admin tools too. We can potentially automate this somehow.

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Neha Narkhede

 In a mission critical application, we expect that a kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current kafka version (0.8.1.1) cannot achieve the requirements 
 due to its three behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-960) Upgrade Metrics to 3.x

2014-07-25 Thread Daniel Compton (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14075266#comment-14075266
 ] 

Daniel Compton commented on KAFKA-960:
--

Probably worth parking for now then.

 Upgrade Metrics to 3.x
 --

 Key: KAFKA-960
 URL: https://issues.apache.org/jira/browse/KAFKA-960
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.8.1
Reporter: Cosmin Lehene

 Now that metrics 3.0 has been released 
 (http://metrics.codahale.com/about/release-notes/) we can upgrade back



--
This message was sent by Atlassian JIRA
(v6.2#6252)