[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-23 Thread Anton Karamanov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Karamanov updated KAFKA-1414:
---

Attachment: KAFKA-1414-rev4.patch

I've updated the patch.
* replaced {{log.io.parallelism}} with {{log.threads.per.data.dir}};
* added separate thread pools for each directory to simplify thread control;
* rebased.

[Patch v4|^KAFKA-1414-rev4.patch]
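
For reference, a minimal sketch of the per-directory pool idea, sized by {{log.threads.per.data.dir}} (illustrative only, not the patch itself; the {{loadDir}} callback is a hypothetical stand-in for the per-log recovery work):

{code}
import java.io.File
import java.util.concurrent.{Executors, TimeUnit}

object ParallelLogLoading {
  // One fixed-size pool per data directory (log.threads.per.data.dir threads
  // each), so a slow disk cannot starve the others. loadDir is a hypothetical
  // callback standing in for the per-log loading work.
  def loadLogsInParallel(dataDirs: Seq[File], threadsPerDataDir: Int)(loadDir: File => Unit) {
    val pools = dataDirs.map { dataDir =>
      val pool = Executors.newFixedThreadPool(threadsPerDataDir)
      Option(dataDir.listFiles()).getOrElse(Array.empty[File])
        .filter(_.isDirectory)
        .foreach { logDir => pool.submit(new Runnable { def run() { loadDir(logDir) } }) }
      pool
    }
    // wait for every directory to finish loading
    pools.foreach { p => p.shutdown(); p.awaitTermination(Long.MaxValue, TimeUnit.MILLISECONDS) }
  }
}
{code}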

 Speedup broker startup after hard reset
 ---

 Key: KAFKA-1414
 URL: https://issues.apache.org/jira/browse/KAFKA-1414
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.1.1, 0.8.2, 0.9.0
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps
 Fix For: 0.8.2

 Attachments: 
 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, 
 KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, 
 KAFKA-1414-rev3-interleaving.patch, KAFKA-1414-rev3.patch, 
 KAFKA-1414-rev4.patch, freebie.patch, parallel-dir-loading-0.8.patch, 
 parallel-dir-loading-trunk-fixed-threadpool.patch, 
 parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch


 After a hard reset due to power failure the broker takes far too long 
 recovering unflushed segments in a single thread. This could be easily 
 improved by launching multiple threads (one per data directory, assuming that 
 typically each data directory is on a dedicated drive). Locally we tried this 
 simple patch to LogManager.loadLogs and it seems to work; however, I'm too new 
 to Scala, so do not take it literally:
 {code}
   /**
    * Recover and load all logs in the given data directories
    */
   private def loadLogs(dirs: Seq[File]) {
     val threads: Array[Thread] = new Array[Thread](dirs.size)
     var i: Int = 0
     val me = this
     for (dir <- dirs) {
       val thread = new Thread(new Runnable {
         def run() {
           val recoveryPoints = me.recoveryPointCheckpoints(dir).read
           /* load the logs */
           val subDirs = dir.listFiles()
           if (subDirs != null) {
             val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
             if (cleanShutDownFile.exists())
               info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
             for (dir <- subDirs) {
               if (dir.isDirectory) {
                 info("Loading log '" + dir.getName + "'")
                 val topicPartition = Log.parseTopicPartitionName(dir.getName)
                 val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
                 val log = new Log(dir,
                                   config,
                                   recoveryPoints.getOrElse(topicPartition, 0L),
                                   scheduler,
                                   time)
                 val previous = addLogWithLock(topicPartition, log)
                 if (previous != null)
                   throw new IllegalArgumentException(
                     "Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
               }
             }
             cleanShutDownFile.delete()
           }
         }
       })
       thread.start()
       threads(i) = thread
       i = i + 1
     }
     for (thread <- threads) {
       thread.join()
     }
   }

   def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
     logCreationOrDeletionLock synchronized {
       this.logs.put(topicPartition, log)
     }
   }
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-23 Thread Anton Karamanov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14071573#comment-14071573
 ] 

Anton Karamanov commented on KAFKA-1414:


[~junrao], yes, caches were dropped before each run. No compression is used.

 Speedup broker startup after hard reset
 ---

 Key: KAFKA-1414
 URL: https://issues.apache.org/jira/browse/KAFKA-1414
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.1.1, 0.8.2, 0.9.0
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps
 Fix For: 0.8.2

 Attachments: 
 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, 
 KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, 
 KAFKA-1414-rev3-interleaving.patch, KAFKA-1414-rev3.patch, 
 KAFKA-1414-rev4.patch, freebie.patch, parallel-dir-loading-0.8.patch, 
 parallel-dir-loading-trunk-fixed-threadpool.patch, 
 parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch


 After a hard reset due to power failure the broker takes far too long 
 recovering unflushed segments in a single thread. This could be easily 
 improved by launching multiple threads (one per data directory, assuming that 
 typically each data directory is on a dedicated drive). Locally we tried this 
 simple patch to LogManager.loadLogs and it seems to work; however, I'm too new 
 to Scala, so do not take it literally:
 {code}
   /**
    * Recover and load all logs in the given data directories
    */
   private def loadLogs(dirs: Seq[File]) {
     val threads: Array[Thread] = new Array[Thread](dirs.size)
     var i: Int = 0
     val me = this
     for (dir <- dirs) {
       val thread = new Thread(new Runnable {
         def run() {
           val recoveryPoints = me.recoveryPointCheckpoints(dir).read
           /* load the logs */
           val subDirs = dir.listFiles()
           if (subDirs != null) {
             val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
             if (cleanShutDownFile.exists())
               info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
             for (dir <- subDirs) {
               if (dir.isDirectory) {
                 info("Loading log '" + dir.getName + "'")
                 val topicPartition = Log.parseTopicPartitionName(dir.getName)
                 val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
                 val log = new Log(dir,
                                   config,
                                   recoveryPoints.getOrElse(topicPartition, 0L),
                                   scheduler,
                                   time)
                 val previous = addLogWithLock(topicPartition, log)
                 if (previous != null)
                   throw new IllegalArgumentException(
                     "Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
               }
             }
             cleanShutDownFile.delete()
           }
         }
       })
       thread.start()
       threads(i) = thread
       i = i + 1
     }
     for (thread <- threads) {
       thread.join()
     }
   }

   def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
     logCreationOrDeletionLock synchronized {
       this.logs.put(topicPartition, log)
     }
   }
 {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1555) provide strong consistency with reasonable availability

2014-07-23 Thread Jiang Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiang Wu updated KAFKA-1555:


Description: 
In a mission critical application, we expect a kafka cluster with 3 brokers can 
satisfy two requirements:
1. When 1 broker is down, no message loss or service blocking happens.
2. In worse cases such as two brokers are down, service can be blocked, but no 
message loss happens.

We found that the current kafka version (0.8.1.1) cannot achieve the requirements 
due to three of its behaviors:
1. when choosing a new leader from 2 followers in ISR, the one with fewer 
messages may be chosen as the leader.
2. even when replica.lag.max.messages=0, a follower can stay in ISR when it has 
fewer messages than the leader.
3. ISR can contain only 1 broker, therefore acknowledged messages may be 
stored in only 1 broker.

The following is an analytical proof. 
We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
that at the beginning, all 3 replicas, leader A, followers B and C, are in 
sync, i.e., they have the same messages and are all in ISR.
According to the value of request.required.acks (acks for short), there are the 
following cases.
1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
time, although C hasn't received m, C is still in ISR. If A is killed, C can be 
elected as the new leader, and consumers will miss m.
3. acks=-1. B and C restart and are removed from ISR. Producer sends a message 
m to A, and receives an acknowledgement. Disk failure happens in A before B and 
C replicate m. Message m is lost.

In summary, any existing configuration cannot satisfy the requirements.

  was:
In a mission critical application, we expect a kafka cluster with 3 brokers can 
satisfy two requirements:
1. When 1 broker is down, no message loss or service blocking happens.
2. In worse cases such as two brokers are down, service can be blocked, but no 
message loss happens.

We found that the current kafka version (0.8.1.1) cannot achieve the requirements 
due to two of its behaviors:
1. when choosing a new leader from 2 followers in ISR, the one with fewer 
messages may be chosen as the leader.
2. even when replica.lag.max.messages=0, a follower can stay in ISR when it has 
fewer messages than the leader.

The following is an analytical proof. 
We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
that at the beginning, all 3 replicas, leader A, followers B and C, are in 
sync, i.e., they have the same messages and are all in ISR.
According to the value of request.required.acks (acks for short), there are the 
following cases.
1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
time, although C hasn't received m, C is still in ISR. If A is killed, C can be 
elected as the new leader, and consumers will miss m.
3. acks=-1. Suppose replica.lag.max.messages=M. There are two sub-cases:
3.1 M>0. Suppose C is killed. C will be out of ISR after 
replica.lag.time.max.ms. Then the producer publishes M messages to A and B. C 
restarts. C will rejoin ISR since it is M messages behind A and B. Before C 
replicates all messages, A is killed, and C becomes leader; then message loss 
happens.
3.2 M=0. In this case, when the producer publishes at a high speed, B and C 
will fall out of ISR; only A keeps receiving messages. Then A is killed. Either 
message loss or service blocking will happen, depending on whether unclean 
leader election is disabled (a new feature coming in 0.8.2; see jira 
KAFKA-1028).

In summary, any existing configuration cannot satisfy the requirements.


 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Neha Narkhede

 In a mission critical application, we expect a kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases such as two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current kafka version (0.8.1.1) cannot achieve the requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The 

[jira] [Commented] (KAFKA-1282) Disconnect idle socket connection in Selector

2014-07-23 Thread Nicolae Marasoiu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14071718#comment-14071718
 ] 

Nicolae Marasoiu commented on KAFKA-1282:
-

[~junrao] You agree with the approach, right?

 Disconnect idle socket connection in Selector
 -

 Key: KAFKA-1282
 URL: https://issues.apache.org/jira/browse/KAFKA-1282
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2
Reporter: Jun Rao
Assignee: nicu marasoiu
  Labels: newbie++
 Fix For: 0.9.0


 To reduce # socket connections, it would be useful for the new producer to 
 close socket connections that are idle. We can introduce a new producer 
 config for the idle time.
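 A minimal sketch of one way to do this (illustrative only; names such as 
 IdleConnectionReaper and a config like connections.max.idle.ms are 
 assumptions, not the actual Selector API):
 {code}
 import java.nio.channels.SocketChannel
 import scala.collection.mutable

 class IdleConnectionReaper(maxIdleMs: Long) {
   private val lastUsed = mutable.Map[SocketChannel, Long]()

   // record activity on a connection
   def touch(ch: SocketChannel) { lastUsed(ch) = System.currentTimeMillis }

   // close and forget any connection idle longer than maxIdleMs
   def closeIdle() {
     val now = System.currentTimeMillis
     val expired = lastUsed.collect { case (ch, t) if now - t > maxIdleMs => ch }
     expired.foreach { ch => ch.close(); lastUsed -= ch }
   }
 }
 {code}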



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-07-23 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14071792#comment-14071792
 ] 

Jun Rao commented on KAFKA-1555:


In case 3.1, when C restarts, the protocol is that C can only join ISR if it 
has received all messages up to the current high watermark.

For example, let's assume that M is 10. Let's say A, B, C all have messages at 
offset 100 and all those messages are committed (therefore high watermark is at 
100). Then C dies. After that, we commit 5 more messages with both A and B 
(high watermark is at 105). Now, C is restarted. C is actually not allowed to 
rejoin ISR until its log end offset has passed 105. This means that C must 
first fetch the 5 newly committed messages before being added to ISR.
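
To make the rule concrete, a toy check (hypothetical names; the real logic lives in the replica code):
{code}
// A restarted follower may rejoin ISR only once its log end offset
// has caught up to the leader's high watermark.
def canRejoinIsr(followerLogEndOffset: Long, leaderHighWatermark: Long): Boolean =
  followerLogEndOffset >= leaderHighWatermark

// With the numbers above: HW = 105 after the 5 extra commits.
assert(!canRejoinIsr(100L, 105L)) // C just restarted, still behind
assert(canRejoinIsr(105L, 105L))  // C fetched the 5 messages, may rejoin
{code}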

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Neha Narkhede

 In a mission critical application, we expect a kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases such as two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current kafka version (0.8.1.1) cannot achieve the requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1282) Disconnect idle socket connection in Selector

2014-07-23 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14071803#comment-14071803
 ] 

Jun Rao commented on KAFKA-1282:


Yes. Thanks for picking it up.

 Disconnect idle socket connection in Selector
 -

 Key: KAFKA-1282
 URL: https://issues.apache.org/jira/browse/KAFKA-1282
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2
Reporter: Jun Rao
Assignee: nicu marasoiu
  Labels: newbie++
 Fix For: 0.9.0


 To reduce # socket connections, it would be useful for the new producer to 
 close socket connections that are idle. We can introduce a new producer 
 config for the idle time.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: Review Request 23705: Addressing Jun's comments

2014-07-23 Thread Manikumar Reddy O

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23705/
---

(Updated July 23, 2014, 2:55 p.m.)


Review request for kafka.


Bugs: KAFKA-1192
https://issues.apache.org/jira/browse/KAFKA-1192


Repository: kafka


Description
---

Added support for custom deserialization of messages and keys


Diffs (updated)
-

  core/src/main/scala/kafka/tools/DumpLogSegments.scala 
6daf87b25a48a51aafb7dbe8d0c0371e0ea7501f 

Diff: https://reviews.apache.org/r/23705/diff/


Testing
---


Thanks,

Manikumar Reddy O



[jira] [Commented] (KAFKA-1192) Enable DumpLogSegments tool to deserialize messages

2014-07-23 Thread Manikumar Reddy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14071812#comment-14071812
 ] 

Manikumar Reddy commented on KAFKA-1192:


Updated reviewboard https://reviews.apache.org/r/23705/diff/
 against branch origin/trunk

 Enable DumpLogSegments tool to deserialize messages
 ---

 Key: KAFKA-1192
 URL: https://issues.apache.org/jira/browse/KAFKA-1192
 Project: Kafka
  Issue Type: Bug
  Components: tools
Reporter: Guozhang Wang
Assignee: Guozhang Wang
  Labels: newbie
 Attachments: KAFKA-1192_2014-07-21_20:44:08.patch, 
 KAFKA-1192_2014-07-23_20:23:08.patch


 Currently the DumpLogSegments tool reads the message payloads as strings by 
 default, which will not display the messages correctly if the messages were 
 serialized with another class. By enabling deserialization with a 
 customized class we can use this tool to debug more issues where we need to 
 read the message content.
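 As an illustration, a custom value decoder might look like this (a sketch 
 assuming the kafka.serializer.Decoder trait; the LongDecoder name is 
 hypothetical):
 {code}
 import kafka.serializer.Decoder
 import kafka.utils.VerifiableProperties

 // Decodes an 8-byte big-endian payload as a Long instead of a String.
 class LongDecoder(props: VerifiableProperties = null) extends Decoder[Long] {
   def fromBytes(bytes: Array[Byte]): Long =
     java.nio.ByteBuffer.wrap(bytes).getLong
 }
 {code}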



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1192) Enable DumpLogSegments tool to deserialize messages

2014-07-23 Thread Manikumar Reddy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar Reddy updated KAFKA-1192:
---

Attachment: KAFKA-1192_2014-07-23_20:23:08.patch

 Enable DumpLogSegments tool to deserialize messages
 ---

 Key: KAFKA-1192
 URL: https://issues.apache.org/jira/browse/KAFKA-1192
 Project: Kafka
  Issue Type: Bug
  Components: tools
Reporter: Guozhang Wang
Assignee: Guozhang Wang
  Labels: newbie
 Attachments: KAFKA-1192_2014-07-21_20:44:08.patch, 
 KAFKA-1192_2014-07-23_20:23:08.patch


 Currently the DumpLogSegments tool reads the message payloads as strings by 
 default, which will not display the messages correctly if the messages were 
 serialized with another class. By enabling deserialization with a 
 customized class we can use this tool to debug more issues where we need to 
 read the message content.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: Review Request 23705: Addressing Jun's comments

2014-07-23 Thread Manikumar Reddy O


 On July 21, 2014, 6:16 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/tools/DumpLogSegments.scala, line 165
  https://reviews.apache.org/r/23705/diff/3/?file=636778#file636778line165
 
  Would key also be null possibly?
 
 Manikumar Reddy O wrote:
 This line is surrounded by an if(msg.hasKey) check. 
 msg.hasKey returns true for non-zero-length keys, so we don't need any 
 additional null check.

 
 Guozhang Wang wrote:
 I see. Instead of not printing anything if the key is null, could you still 
 print "key: null" to be aligned with other keyed messages? This will also 
 make text processing for analysis with grep/awk easier.

We need to use if(msg.hasKey) to differentiate keyed messages from non-keyed 
messages.

The key will be null/zero length for all non-keyed messages, so it is not 
appropriate to print "key: null" for all non-keyed messages.

If msg.hasKey is true, it is a keyed message, so its key will not be null. 

Are you saying that there can be a message with a non-zero key length but with 
a null reference? 


- Manikumar Reddy


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23705/#review48265
---


On July 23, 2014, 2:55 p.m., Manikumar Reddy O wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/23705/
 ---
 
 (Updated July 23, 2014, 2:55 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1192
 https://issues.apache.org/jira/browse/KAFKA-1192
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Added support for custom deserialization of messages and keys
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/tools/DumpLogSegments.scala 
 6daf87b25a48a51aafb7dbe8d0c0371e0ea7501f 
 
 Diff: https://reviews.apache.org/r/23705/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Manikumar Reddy O
 




[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-07-23 Thread Robert Withers (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14071886#comment-14071886
 ] 

Robert Withers commented on KAFKA-1555:
---

In military satellite communications, when the statically assigned bandwidth is 
exceeded, a dynamic block is available from which to grab some leased 
bandwidth.  If we apply this idea to data production into Kafka, though 
different due to data replication, could we have replicas of the replicas: 
replica shadows?

Say we have 10 brokers with replication 2.  So partition 1 has a leader on 
broker 1 and a follower in ISR on both brokers 2 and 3.  If we have replica 
shadows on brokers 4, 5 and 6, not in ISR but receiving msg production 
opportunistically, then we could have the option to dynamically assign a new 
follower into ISR if an ISR follower fails.

- Rob



 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Neha Narkhede

 In a mission critical application, we expect a kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases such as two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current kafka version (0.8.1.1) cannot achieve the requirements 
 due to three of its behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. ISR can contain only 1 broker, therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: Review Request 23705: Addressing Jun's comments

2014-07-23 Thread Guozhang Wang


 On July 21, 2014, 6:16 p.m., Guozhang Wang wrote:
  core/src/main/scala/kafka/tools/DumpLogSegments.scala, line 165
  https://reviews.apache.org/r/23705/diff/3/?file=636778#file636778line165
 
  Would key also be null possibly?
 
 Manikumar Reddy O wrote:
 This line is surrounded by an if(msg.hasKey) check. 
 msg.hasKey returns true for non-zero-length keys, so we don't need any 
 additional null check.

 
 Guozhang Wang wrote:
 I see. Instead of not printing anything if the key is null, could you still 
 print "key: null" to be aligned with other keyed messages? This will also 
 make text processing for analysis with grep/awk easier.
 
 Manikumar Reddy O wrote:
 We need to use if(msg.hasKey) to differentiate keyed messages from 
 non-keyed messages.
 
 The key will be null/zero length for all non-keyed messages, so it is not 
 appropriate to print "key: null" for all non-keyed messages.
 
 If msg.hasKey is true, it is a keyed message, so its key will not be 
 null. 
 
 Are you saying that there can be a message with a non-zero key length but 
 with a null reference? 
 
 
 
 
 


Any keyed message's key should be non-null; what I was originally suggesting is 
that we can still print "key: null" for non-keyed messages.
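
A sketch of the suggested output formatting (a hypothetical helper, not the actual DumpLogSegments code):
{code}
import java.nio.ByteBuffer

// Always emit a key field, printing "null" for non-keyed messages,
// so grep/awk see a uniform record layout.
def formatKey(key: ByteBuffer): String =
  if (key == null || !key.hasRemaining) "null"
  else {
    val bytes = new Array[Byte](key.remaining)
    key.duplicate().get(bytes)
    new String(bytes, "UTF-8")
  }
{code}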


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23705/#review48265
---


On July 23, 2014, 2:55 p.m., Manikumar Reddy O wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/23705/
 ---
 
 (Updated July 23, 2014, 2:55 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1192
 https://issues.apache.org/jira/browse/KAFKA-1192
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Support given for custom deserialization of messages and keys
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/tools/DumpLogSegments.scala 
 6daf87b25a48a51aafb7dbe8d0c0371e0ea7501f 
 
 Diff: https://reviews.apache.org/r/23705/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Manikumar Reddy O
 




[jira] [Commented] (KAFKA-1150) Fetch on a replicated topic does not return as soon as possible

2014-07-23 Thread Simon Cooper (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14071919#comment-14071919
 ] 

Simon Cooper commented on KAFKA-1150:
-

Yes, looks like that patch fixes the issue.

 Fetch on a replicated topic does not return as soon as possible
 ---

 Key: KAFKA-1150
 URL: https://issues.apache.org/jira/browse/KAFKA-1150
 Project: Kafka
  Issue Type: Bug
  Components: core, replication
Affects Versions: 0.8.0
Reporter: Andrey Balmin
Assignee: Neha Narkhede
 Attachments: Test.java


 I see a huge performance difference between replicated and non-replicated 
 topics. On my laptop, running two brokers, I see producer-to-consumer latency 
 of under 1ms for topics with one replica. 
 However, with two replicas the same latency equals the max fetch delay. 
 Here is a simple test I just did:
 one producer thread in a loop sending one message and sleeping for 2500ms, 
 and one consumer thread looping on the long poll with a max fetch delay of 
 1000 ms.
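 A minimal sketch of that loop (with hypothetical send/longPoll stubs 
 standing in for the real wiring in the attached Test.java):
 {code}
 object LatencyTest extends App {
   // Hypothetical stubs standing in for the real producer/consumer in Test.java.
   def send(topic: String, key: String, msg: String) {}
   def longPoll(topic: String, maxWaitMs: Int): Long = 0L

   val producer = new Thread(new Runnable {
     def run() {
       var i = 0
       while (true) {
         i += 1
         send("test_topic", "key" + i, "msg" + i)
         println("Produced " + i + " key: key" + i + " at time: " + System.currentTimeMillis)
         Thread.sleep(2500)  // produce every 2.5s
       }
     }
   })
   val consumer = new Thread(new Runnable {
     def run() {
       while (true) {
         val upTo = longPoll("test_topic", 1000)  // long poll, 1000 ms max wait
         println("Consumed up to " + upTo + " at time: " + System.currentTimeMillis)
       }
     }
   })
   producer.start(); consumer.start()
 }
 {code}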
 Here is what happens with no replication:
 Produced 1 key: key1 at time: 15:33:52.822
 Consumed up to 1 at time: 15:33:52.822
 Consumed up to 1 at time: 15:33:53.823
 Consumed up to 1 at time: 15:33:54.825
 Produced 2 key: key2 at time: 15:33:55.324
 Consumed up to 2 at time: 15:33:55.324
 Consumed up to 2 at time: 15:33:56.326
 Consumed up to 2 at time: 15:33:57.328
 Produced 3 key: key3 at time: 15:33:57.827
 Consumed up to 3 at time: 15:33:57.827
 There are no delays between the message being produced and consumed -- this is 
 the behavior I expected. 
 Here is the same test, but for a topic with two replicas:
 Consumed up to 0 at time: 15:50:29.575
 Produced 1 key: key1 at time: 15:50:29.575
 Consumed up to 1 at time: 15:50:30.577
 Consumed up to 1 at time: 15:50:31.579
 Consumed up to 1 at time: 15:50:32.078
 Produced 2 key: key2 at time: 15:50:32.078
 Consumed up to 2 at time: 15:50:33.081
 Consumed up to 2 at time: 15:50:34.081
 Consumed up to 2 at time: 15:50:34.581
 Produced 3 key: key3 at time: 15:50:34.581
 Consumed up to 3 at time: 15:50:35.584
 Notice how the fetch always returns as soon as the produce request is issued, 
 but without the new message, which consistently arrives ~1002 ms later.
 Below is the request log snippet for this part:
 Produced 2 key: key2 at time: 15:50:32.078
 Consumed up to 2 at time: 15:50:33.081
 You can see the first FetchRequest returns at the same time as the replica 
 FetchRequest, but this fetch response is *empty* -- the message is not 
 committed yet, so it cannot be returned. The message is committed at 
 15:50:32,079. However, the next FetchRequest (that does return the message) 
 comes in at 15:50:32,078, but completes only at 15:50:33,081. Why is it 
 waiting for the full 1000 ms, instead of returning right away?
 [2013-11-25 15:50:32,077] TRACE Processor 1 received request : Name: 
 ProducerRequest; Version: 0; CorrelationId: 5; ClientId: pro; RequiredAcks: 
 1; AckTimeoutMs: 20 ms; TopicAndPartition: [test_topic,0] -> 2078 
 (kafka.network.RequestChannel$)
 [2013-11-25 15:50:32,078] TRACE Completed request:Name: FetchRequest; 
 Version: 0; CorrelationId: 7; ClientId: con; ReplicaId: -1; MaxWait: 1000 ms; 
 MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> 
 PartitionFetchInfo(129,1024000) from client 
 /0:0:0:0:0:0:0:1%0:63264;totalTime:499,queueTime:0,localTime:0,remoteTime:499,sendTime:0
  (kafka.request.logger)
 [2013-11-25 15:50:32,078] TRACE Completed request:Name: FetchRequest; 
 Version: 0; CorrelationId: 3463; ClientId: ReplicaFetcherThread-0-0; 
 ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] 
 -> PartitionFetchInfo(129,1048576) from client 
 /127.0.0.1:63056;totalTime:499,queueTime:1,localTime:0,remoteTime:498,sendTime:0
  (kafka.request.logger)
 [2013-11-25 15:50:32,078] TRACE Processor 1 received request : Name: 
 FetchRequest; Version: 0; CorrelationId: 8; ClientId: con; ReplicaId: -1; 
 MaxWait: 1000 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> 
 PartitionFetchInfo(129,1024000) (kafka.network.RequestChannel$)
 [2013-11-25 15:50:32,078] TRACE Completed request:Name: ProducerRequest; 
 Version: 0; CorrelationId: 5; ClientId: pro; RequiredAcks: 1; AckTimeoutMs: 
 20 ms; TopicAndPartition: [test_topic,0] -> 2078 from client 
 /0:0:0:0:0:0:0:1%0:63266;totalTime:1,queueTime:0,localTime:1,remoteTime:0,sendTime:0
  (kafka.request.logger)
 [2013-11-25 15:50:32,079] TRACE Processor 0 received request : Name: 
 FetchRequest; Version: 0; CorrelationId: 3464; ClientId: 
 ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; 
 RequestInfo: [test_topic,0] -> PartitionFetchInfo(130,1048576) 
 (kafka.network.RequestChannel$)
 [2013-11-25 15:50:32,581] TRACE 

Review Request 23858: Patch for KAFKA-1544

2014-07-23 Thread Manikumar Reddy O

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23858/
---

Review request for kafka.


Bugs: KAFKA-1544
https://issues.apache.org/jira/browse/KAFKA-1544


Repository: kafka


Description
---

Implemented Jun's solution to decrease LogCleaner shutdown time during shutdown


Diffs
-

  core/src/main/scala/kafka/log/LogCleaner.scala 
2faa196a4dc612bc634d5ff5f5f275d09073f13b 

Diff: https://reviews.apache.org/r/23858/diff/


Testing
---


Thanks,

Manikumar Reddy O



[jira] [Commented] (KAFKA-1544) LogCleaner may take a long time to shutdown

2014-07-23 Thread Manikumar Reddy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14071937#comment-14071937
 ] 

Manikumar Reddy commented on KAFKA-1544:


Created reviewboard https://reviews.apache.org/r/23858/diff/
 against branch origin/trunk

 LogCleaner may take a long time to shutdown
 ---

 Key: KAFKA-1544
 URL: https://issues.apache.org/jira/browse/KAFKA-1544
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
  Labels: newbie
 Attachments: KAFKA-1544.patch


 We have the following code in LogCleaner. Since the cleaner thread is 
 shut down w/o interrupt, it may take up to the backoff time for the cleaner 
 thread to detect the shutdown flag.
 private def cleanOrSleep() {
   cleanerManager.grabFilthiestLog() match {
     case None =>
       // there are no cleanable logs, sleep a while
       time.sleep(config.backOffMs)
  



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1544) LogCleaner may take a long time to shutdown

2014-07-23 Thread Manikumar Reddy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar Reddy updated KAFKA-1544:
---

Attachment: KAFKA-1544.patch

 LogCleaner may take a long time to shutdown
 ---

 Key: KAFKA-1544
 URL: https://issues.apache.org/jira/browse/KAFKA-1544
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
  Labels: newbie
 Attachments: KAFKA-1544.patch


 We have the following code in LogCleaner. Since the cleaner thread is 
 shut down w/o interrupt, it may take up to the backoff time for the cleaner 
 thread to detect the shutdown flag.
 private def cleanOrSleep() {
   cleanerManager.grabFilthiestLog() match {
     case None =>
       // there are no cleanable logs, sleep a while
       time.sleep(config.backOffMs)
  



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1150) Fetch on a replicated topic does not return as soon as possible

2014-07-23 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14071949#comment-14071949
 ] 

Guozhang Wang commented on KAFKA-1150:
--

Hi Simon, just wondering: did you apply the patch and re-do the test?

 Fetch on a replicated topic does not return as soon as possible
 ---

 Key: KAFKA-1150
 URL: https://issues.apache.org/jira/browse/KAFKA-1150
 Project: Kafka
  Issue Type: Bug
  Components: core, replication
Affects Versions: 0.8.0
Reporter: Andrey Balmin
Assignee: Neha Narkhede
 Attachments: Test.java


 I see a huge performance difference between replicated and non-replicated 
 topics. On my laptop, running two brokers, I see producer-to-consumer latency 
 of under 1ms for topics with one replica. 
 However, with two replicas the same latency equals the max fetch delay. 
 Here is a simple test I just did:
 one producer thread in a loop sending one message and sleeping for 2500ms, 
 and one consumer thread looping on the long poll with a max fetch delay of 
 1000 ms.
 Here is what happens with no replication:
 Produced 1 key: key1 at time: 15:33:52.822
 Consumed up to 1 at time: 15:33:52.822
 Consumed up to 1 at time: 15:33:53.823
 Consumed up to 1 at time: 15:33:54.825
 Produced 2 key: key2 at time: 15:33:55.324
 Consumed up to 2 at time: 15:33:55.324
 Consumed up to 2 at time: 15:33:56.326
 Consumed up to 2 at time: 15:33:57.328
 Produced 3 key: key3 at time: 15:33:57.827
 Consumed up to 3 at time: 15:33:57.827
 There are no delays between the message being produced and consumed -- this is 
 the behavior I expected. 
 Here is the same test, but for a topic with two replicas:
 Consumed up to 0 at time: 15:50:29.575
 Produced 1 key: key1 at time: 15:50:29.575
 Consumed up to 1 at time: 15:50:30.577
 Consumed up to 1 at time: 15:50:31.579
 Consumed up to 1 at time: 15:50:32.078
 Produced 2 key: key2 at time: 15:50:32.078
 Consumed up to 2 at time: 15:50:33.081
 Consumed up to 2 at time: 15:50:34.081
 Consumed up to 2 at time: 15:50:34.581
 Produced 3 key: key3 at time: 15:50:34.581
 Consumed up to 3 at time: 15:50:35.584
 Notice how the fetch always returns as soon as the produce request is issued, 
 but without the new message, which consistently arrives ~1002 ms later.
 Below is the request log snippet for this part:
 Produced 2 key: key2 at time: 15:50:32.078
 Consumed up to 2 at time: 15:50:33.081
 You can see the first FetchRequest returns at the same time as the replica 
 FetchRequest, but this fetch response is *empty* -- the message is not 
 committed yet, so it cannot be returned. The message is committed at 
 15:50:32,079. However, the next FetchRequest (that does return the message) 
 comes in at 15:50:32,078, but completes only at 15:50:33,081. Why is it 
 waiting for the full 1000 ms, instead of returning right away?
 [2013-11-25 15:50:32,077] TRACE Processor 1 received request : Name: 
 ProducerRequest; Version: 0; CorrelationId: 5; ClientId: pro; RequiredAcks: 
 1; AckTimeoutMs: 20 ms; TopicAndPartition: [test_topic,0] -> 2078 
 (kafka.network.RequestChannel$)
 [2013-11-25 15:50:32,078] TRACE Completed request:Name: FetchRequest; 
 Version: 0; CorrelationId: 7; ClientId: con; ReplicaId: -1; MaxWait: 1000 ms; 
 MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> 
 PartitionFetchInfo(129,1024000) from client 
 /0:0:0:0:0:0:0:1%0:63264;totalTime:499,queueTime:0,localTime:0,remoteTime:499,sendTime:0
  (kafka.request.logger)
 [2013-11-25 15:50:32,078] TRACE Completed request:Name: FetchRequest; 
 Version: 0; CorrelationId: 3463; ClientId: ReplicaFetcherThread-0-0; 
 ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] 
 -> PartitionFetchInfo(129,1048576) from client 
 /127.0.0.1:63056;totalTime:499,queueTime:1,localTime:0,remoteTime:498,sendTime:0
  (kafka.request.logger)
 [2013-11-25 15:50:32,078] TRACE Processor 1 received request : Name: 
 FetchRequest; Version: 0; CorrelationId: 8; ClientId: con; ReplicaId: -1; 
 MaxWait: 1000 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> 
 PartitionFetchInfo(129,1024000) (kafka.network.RequestChannel$)
 [2013-11-25 15:50:32,078] TRACE Completed request:Name: ProducerRequest; 
 Version: 0; CorrelationId: 5; ClientId: pro; RequiredAcks: 1; AckTimeoutMs: 
 20 ms; TopicAndPartition: [test_topic,0] -> 2078 from client 
 /0:0:0:0:0:0:0:1%0:63266;totalTime:1,queueTime:0,localTime:1,remoteTime:0,sendTime:0
  (kafka.request.logger)
 [2013-11-25 15:50:32,079] TRACE Processor 0 received request : Name: 
 FetchRequest; Version: 0; CorrelationId: 3464; ClientId: 
 ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; 
 RequestInfo: [test_topic,0] -> PartitionFetchInfo(130,1048576) 
 (kafka.network.RequestChannel$)
 

Re: Review Request 23858: Patch for KAFKA-1544

2014-07-23 Thread Jay Kreps

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23858/#review48532
---


Is this better than using a condition? I was imagining replacing sleep with 
shutdownCondition.await(backoffMs, TimeUnit.MILLISECONDS) and then in 
shutdown() we call shutdownCondition.signal().

- Jay Kreps


On July 23, 2014, 4:44 p.m., Manikumar Reddy O wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/23858/
 ---
 
 (Updated July 23, 2014, 4:44 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1544
 https://issues.apache.org/jira/browse/KAFKA-1544
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Implemented Jun's solution to decrease LogCleaner shutdown time during 
 shutdown
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/log/LogCleaner.scala 
 2faa196a4dc612bc634d5ff5f5f275d09073f13b 
 
 Diff: https://reviews.apache.org/r/23858/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Manikumar Reddy O
 




[jira] [Assigned] (KAFKA-1544) LogCleaner may take a long time to shutdown

2014-07-23 Thread Manikumar Reddy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar Reddy reassigned KAFKA-1544:
--

Assignee: Manikumar Reddy

 LogCleaner may take a long time to shutdown
 ---

 Key: KAFKA-1544
 URL: https://issues.apache.org/jira/browse/KAFKA-1544
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
Assignee: Manikumar Reddy
  Labels: newbie
 Attachments: KAFKA-1544.patch


 We have the following code in LogCleaner. Since the cleaner thread is 
 shut down w/o interrupt, it may take up to the backoff time for the cleaner 
 thread to detect the shutdown flag.
 private def cleanOrSleep() {
   cleanerManager.grabFilthiestLog() match {
     case None =>
       // there are no cleanable logs, sleep a while
       time.sleep(config.backOffMs)
  



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: Review Request 23858: Patch for KAFKA-1544

2014-07-23 Thread Jay Kreps


 On July 23, 2014, 4:56 p.m., Jay Kreps wrote:
  Is this better than using a condition? I was imagining replacing sleep with 
  shutdownCondition.await(backoffMs, TimeUnit.MILLISECONDS) and then in 
  shutdown() we call shutdownCondition.signal().

Actually a condition isn't quite right as there is a subtle race between signal 
and await. Better would be the same approach but using 
CountDownLatch.await(backoff, TimeUnit.MS)
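
A minimal sketch of the latch-based variant (hypothetical names, just to show the shape):
{code}
import java.util.concurrent.{CountDownLatch, TimeUnit}

class ShutdownableBackoff {
  private val shutdownLatch = new CountDownLatch(1)

  // drop-in for time.sleep(backOffMs): returns early once shutdown() fires,
  // and there is no signal/await race since a counted-down latch stays open
  def backOff(ms: Long) { shutdownLatch.await(ms, TimeUnit.MILLISECONDS) }

  def shutdown() { shutdownLatch.countDown() }  // wakes the cleaner immediately
}
{code}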


- Jay


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23858/#review48532
---


On July 23, 2014, 4:44 p.m., Manikumar Reddy O wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/23858/
 ---
 
 (Updated July 23, 2014, 4:44 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1544
 https://issues.apache.org/jira/browse/KAFKA-1544
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Implemented Jun's solution to decrease LogCleaner shutdown time during 
 shutdown
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/log/LogCleaner.scala 
 2faa196a4dc612bc634d5ff5f5f275d09073f13b 
 
 Diff: https://reviews.apache.org/r/23858/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Manikumar Reddy O
 




question about message sets and produce requests/responses

2014-07-23 Thread Dave Peterson
Hello,

I have a question about produce requests and responses.
Suppose I create a produce request consisting of a single topic T
which contains two message sets S1 and S2, with S1 preceding
S2.  In other words, the request looks like this:

( request header ( T ( S1, S2 ) ) )

Is it possible that I may get a response in which the order of
the ACKs for the individual message sets is reversed?  In other
words, a response that looks like this:

( response header ( T ( ack for S2, ack for S1 ) ) )

I know that if a request contains multiple topics, then the topics
may arrive out of order in the response.  So I was wondering if
ACKs for individual message sets within a topic can likewise
arrive out of order in the response.

Thanks,
Dave


[jira] [Updated] (KAFKA-1510) Force offset commits when migrating consumer offsets from zookeeper to kafka

2014-07-23 Thread Joel Koshy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joel Koshy updated KAFKA-1510:
--

Summary: Force offset commits when migrating consumer offsets from 
zookeeper to kafka  (was: Force offset commits at a minimum interval when 
migrating consumer offsets from zookeeper to kafka)

 Force offset commits when migrating consumer offsets from zookeeper to kafka
 

 Key: KAFKA-1510
 URL: https://issues.apache.org/jira/browse/KAFKA-1510
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2
Reporter: Joel Koshy
Assignee: nicu marasoiu
  Labels: newbie
 Fix For: 0.8.2


 When migrating consumer offsets from ZooKeeper to kafka, we have to turn on 
 dual-commit (i.e., the consumers will commit offsets to both zookeeper and 
 kafka) in addition to setting offsets.storage to kafka. However, we only 
 commit offsets if they have changed since the last commit. For low-volume 
 topics or for topics that receive data in bursts, 
 offsets may not move for a long period of time. Therefore we may want to 
 force the commit (even if offsets have not changed) when migrating (i.e., 
 when dual-commit is enabled) - we can add a minimum interval threshold (say 
 force commit after every 10 auto-commits) as well as on rebalance and 
 shutdown.
 Also, I think it is safe to switch the default for offsets.storage from 
 zookeeper to kafka and set the default to dual-commit (for people who have 
 not migrated yet). We have deployed this to the largest consumers at linkedin 
 and have not seen any issues so far (except for the migration caveat that 
 this jira will resolve).
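 A sketch of the threshold idea (hypothetical names; not the migration code 
 itself):
 {code}
 // Force a commit every Nth auto-commit while dual-commit is enabled,
 // even if no offsets have moved since the last commit.
 class CommitPolicy(forceEveryN: Int) {
   private var autoCommits = 0
   def shouldCommit(offsetsChanged: Boolean, dualCommitEnabled: Boolean): Boolean = {
     autoCommits += 1
     offsetsChanged || (dualCommitEnabled && autoCommits % forceEveryN == 0)
   }
 }
 {code}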



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1510) Force offset commits when migrating consumer offsets from zookeeper to kafka

2014-07-23 Thread Joel Koshy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14072437#comment-14072437
 ] 

Joel Koshy commented on KAFKA-1510:
---

[~nmarasoiu] do you think you will be able to take this on in the next couple 
days?

 Force offset commits when migrating consumer offsets from zookeeper to kafka
 

 Key: KAFKA-1510
 URL: https://issues.apache.org/jira/browse/KAFKA-1510
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2
Reporter: Joel Koshy
Assignee: nicu marasoiu
  Labels: newbie
 Fix For: 0.8.2


 When migrating consumer offsets from ZooKeeper to kafka, we have to turn on 
 dual-commit (i.e., the consumers will commit offsets to both zookeeper and 
 kafka) in addition to setting offsets.storage to kafka. However, we only 
 commit offsets if they have changed since the last commit. For low-volume 
 topics or for topics that receive data in bursts, 
 offsets may not move for a long period of time. Therefore we may want to 
 force the commit (even if offsets have not changed) when migrating (i.e., 
 when dual-commit is enabled) - we can add a minimum interval threshold (say 
 force commit after every 10 auto-commits) as well as on rebalance and 
 shutdown.
 Also, I think it is safe to switch the default for offsets.storage from 
 zookeeper to kafka and set the default to dual-commit (for people who have 
 not migrated yet). We have deployed this to the largest consumers at linkedin 
 and have not seen any issues so far (except for the migration caveat that 
 this jira will resolve).



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: Review Request 23655: Patch for KAFKA-687

2014-07-23 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23655/#review48454
---



core/src/main/scala/kafka/consumer/ConsumerConfig.scala
https://reviews.apache.org/r/23655/#comment85261

Will do.



core/src/main/scala/kafka/consumer/PartitionAllocator.scala
https://reviews.apache.org/r/23655/#comment85262

Will fix.



core/src/main/scala/kafka/consumer/PartitionAllocator.scala
https://reviews.apache.org/r/23655/#comment85265

After thinking about it and discussing off-thread, we are thinking of:

def allocate(partitions, consumerIdsPerTopic)

At the same time, this code is not particularly complex, so reusability is a 
nice-to-have - worst case we can just adapt it for use in the new consumer.
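
A sketch of a roundrobin allocator with that signature (hypothetical types; TopicAndPartition stands in for whatever partition key the real code uses):
{code}
case class TopicAndPartition(topic: String, partition: Int)

// Sort partitions for a stable layout, then deal them out to consumers
// in round-robin order.
def allocate(partitions: Seq[TopicAndPartition],
             consumerIds: Seq[String]): Map[String, Seq[TopicAndPartition]] = {
  val ordered = partitions.sortBy(tp => (tp.topic, tp.partition))
  ordered.zipWithIndex
    .groupBy { case (_, i) => consumerIds(i % consumerIds.size) }
    .map { case (cid, owned) => cid -> owned.map(_._1) }
}
{code}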




core/src/main/scala/kafka/consumer/PartitionAllocator.scala
https://reviews.apache.org/r/23655/#comment85268

Thanks for catching that - will fix.



core/src/main/scala/kafka/consumer/PartitionAllocator.scala
https://reviews.apache.org/r/23655/#comment85269

After thinking about it I think we can actually merge the two and get rid 
of symmetric. So we will just have range and roundrobin (or maybe name it 
uniform). Will see how that turns out after I update the patch.



core/src/main/scala/kafka/consumer/PartitionAllocator.scala
https://reviews.apache.org/r/23655/#comment85074

This uses the < operator defined in StringOps - so it probably translates 
to compareTo under the hood. That said, I can change it to compareTo just 
to be sure.



core/src/main/scala/kafka/consumer/PartitionAllocator.scala
https://reviews.apache.org/r/23655/#comment85075

Yeah I didn't bother with describing the old allocator as it was 
copied/moved from the consumer connector - I will add brief comments.



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/23655/#comment85271

Will do.



core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
https://reviews.apache.org/r/23655/#comment85077

I think it is very useful. While the offset checker is useful, it can take 
a while to run if you want to (say) get a count of all partitions owned by the 
consumer. An mbean, on the other hand, helps continuously monitor how evenly 
your partitions are distributed across all your consumers.



core/src/test/scala/unit/kafka/consumer/PartitionAllocatorTest.scala
https://reviews.apache.org/r/23655/#comment85272

Will add it.


- Joel Koshy


On July 18, 2014, 10:57 p.m., Joel Koshy wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/23655/
 ---
 
 (Updated July 18, 2014, 10:57 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-687
 https://issues.apache.org/jira/browse/KAFKA-687
 
 
 Repository: kafka
 
 
 Description
 ---
 
 The updated diff contains the mbeans for ownership counts.
 The comments in the code and the summary are pretty self-explanatory.
 
 Things to think about:
 * Naming - do symmetric/range/roundrobin make sense?
 * The comments briefly summarize why we needed a separate symmetric mode but 
 let me know if that is unclear.
 * Rebalance time will be slightly higher - I have not measured (will do that)
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/consumer/ConsumerConfig.scala 
 1cf2f62ba02e4aa66bfa7575865e5d57baf82212 
   core/src/main/scala/kafka/consumer/PartitionAllocator.scala PRE-CREATION 
   core/src/main/scala/kafka/consumer/TopicCount.scala 
 c79311097c5bd6718cb6a7fc403f804a1a939353 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 65f518d47c7555c42c4bff39c211814831f4b8b6 
   core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala 
 a20ab90165cc7ebb1cf44078efe23a53938c8df6 
   core/src/main/scala/kafka/utils/ZkUtils.scala 
 dcdc1ce2b02c996294e19cf480736106aaf29511 
   core/src/test/scala/unit/kafka/consumer/PartitionAllocatorTest.scala 
 PRE-CREATION 
 
 Diff: https://reviews.apache.org/r/23655/diff/
 
 
 Testing
 ---
 
 * I did the unit tests (including the new one) as well as mirror maker system 
 test suite with roundrobin. While this is being reviewed I will run the 
 system tests with symmetric
 
 
 Thanks,
 
 Joel Koshy
 




Re: question about message sets and produce requests/responses

2014-07-23 Thread Joel Koshy
Not sure if I followed the question correctly, but S1 and S2 would be
message-sets for different partitions of T (the producer request
object contains a map of topic-partition -> messageset). So each
partition would get its own ack. The response object is just a map of
partition -> response status, so the order of acks in the response is
the response status map's order.
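
To make the shapes concrete (simplified, hypothetical types - not the actual request classes):
{code}
object ProduceExchange {
  case class TopicAndPartition(topic: String, partition: Int)
  case class PartitionStatus(error: Short, offset: Long)

  // request: one message set per partition of the topic
  type Request = Map[TopicAndPartition, Array[Byte]]
  // response: one status ("ack") per partition; its ordering follows the
  // response map, not the order of message sets in the request
  type Response = Map[TopicAndPartition, PartitionStatus]
}
{code}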

Joel

On Wed, Jul 23, 2014 at 02:09:40PM -0700, Dave Peterson wrote:
 Hello,
 
 I have a question about produce requests and responses.
 Suppose I create a produce request consisting of a single topic T
 which contains two message sets S1 and S2, with S1 preceding
 S2.  In other words, the request looks like this:
 
 ( request header ( T ( S1, S2 ) ) )
 
 Is it possible that I may get a response in which the order of
 the ACKs for the individual message sets is reversed?  In other
 words, a response that looks like this:
 
 ( response header ( T ( ack for S2, ack for S1 ) ) )
 
 I know that if a request contains multiple topics, then the topics
 may arrive out of order in the response.  So I was wondering if
 ACKs for individual message sets within a topic can likewise
 arrive out of order in the response.
 
 Thanks,
 Dave



Re: question about message sets and produce requests/responses

2014-07-23 Thread Dave Peterson
Ok, that's all I need to know.  Thanks!


On Wed, Jul 23, 2014 at 4:33 PM, Joel Koshy jjkosh...@gmail.com wrote:

 Not sure if I followed the question correctly, but S1 and S2 would be
 message-sets for different partitions of T (the producer request
 object contains a map of topic-partition -> messageset). So each
 partition would get its own ack. The response object is just a map of
 partition -> response status, so the order of acks in the response is
 the response status map's order.

 Joel

 On Wed, Jul 23, 2014 at 02:09:40PM -0700, Dave Peterson wrote:
  Hello,
 
  I have a question about produce requests and responses.
  Suppose I create a produce request consisting of a single topic T
  which contains two message sets S1 and S2, with S1 preceding
  S2.  In other words, the request looks like this:
 
  ( request header ( T ( S1, S2 ) ) )
 
  Is it possible that I may get a response in which the order of
  the ACKs for the individual message sets is reversed?  In other
  words, a response that looks like this:
 
  ( response header ( T ( ack for S2, ack for S1 ) ) )
 
  I know that if a request contains multiple topics, then the topics
  may arrive out of order in the response.  So I was wondering if
  ACKs for individual message sets within a topic can likewise
  arrive out of order in the response.
 
  Thanks,
  Dave




Re: question about message sets and produce requests/responses

2014-07-23 Thread Joel Koshy
Also, to be clear, by ack I mean an error code as described here:
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceResponse

On Wed, Jul 23, 2014 at 04:33:29PM -0700, Joel Koshy wrote:
 Not sure if I followed the question correctly, but S1 and S2 would be
 message-sets for different partitions of T (the producer request
 object contains a map of topic-partition -> messageset). So each
 partition would get its own ack. The response object is just a map of
 partition -> response status, so the order of acks in the response is
 the response status map's order.
 
 Joel
 
 On Wed, Jul 23, 2014 at 02:09:40PM -0700, Dave Peterson wrote:
  Hello,
  
  I have a question about produce requests and responses.
  Suppose I create a produce request consisting of a single topic T
  which contains two message sets S1 and S2, with S1 preceding
  S2.  In other words, the request looks like this:
  
  ( request header ( T ( S1, S2 ) ) )
  
  Is it possible that I may get a response in which the order of
  the ACKs for the individual message sets is reversed?  In other
  words, a response that looks like this:
  
  ( response header ( T ( ack for S2, ack for S1 ) ) )
  
  I know that if a request contains multiple topics, then the topics
  may arrive out of order in the response.  So I was wondering if
  ACKs for individual message sets within a topic can likewise
  arrive out of order in the response.
  
  Thanks,
  Dave
 



Re: Review Request 23339: Patch for KAFKA-1507

2014-07-23 Thread Sriharsha Chintalapani

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23339/
---

(Updated July 24, 2014, 12:07 a.m.)


Review request for kafka.


Bugs: KAFKA-1507
https://issues.apache.org/jira/browse/KAFKA-1507


Repository: kafka


Description (updated)
---

KAFKA-1507. Using GetOffsetShell against non-existent topic creates the topic 
unintentionally. Changes as per Jun's suggestions.


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
d8f9ce663ee24d2b0852c974136741280c39f8f8 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java 
4aa5b01d611631db72df47d50bbe30edb8c478db 
  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
7517b879866fc5dad5f8d8ad30636da8bbe7784a 
  clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 
444e69e7c95d5ffad19896fff0ab15cb4f5c9b4e 
  clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java 
b22ca1dce65f665d84c2a980fd82f816e93d9960 
  
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 
df37fc6d8f0db0b8192a948426af603be3444da4 
  core/src/main/scala/kafka/api/TopicMetadataRequest.scala 
7dca09ce637a40e125de05703dc42e8b611971ac 
  core/src/main/scala/kafka/client/ClientUtils.scala 
ce7ede3f6d60e756e252257bd8c6fedc21f21e1c 
  core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala 
b9e2bea7b442a19bcebd1b350d39541a8c9dd068 
  core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala 
b0b7be14d494ae8c87f4443b52db69d273c20316 
  core/src/main/scala/kafka/server/KafkaApis.scala 
fd5f12ee31e78cdcda8e24a0ab3e1740b3928884 
  core/src/main/scala/kafka/tools/GetOffsetShell.scala 
9c6064e201eebbcd5b276a0dedd02937439edc94 
  core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala 
af4783646803e58714770c21f8c3352370f26854 
  core/src/main/scala/kafka/tools/SimpleConsumerShell.scala 
36314f412a8281aece2789fd2b74a106b82c57d2 
  core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala 
1bf2667f47853585bc33ffb3e28256ec5f24ae84 
  core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala 
35dc071b1056e775326981573c9618d8046e601d 

Diff: https://reviews.apache.org/r/23339/diff/


Testing
---


Thanks,

Sriharsha Chintalapani



[jira] [Updated] (KAFKA-1507) Using GetOffsetShell against non-existent topic creates the topic unintentionally

2014-07-23 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani updated KAFKA-1507:
--

Attachment: KAFKA-1507_2014-07-23_17:07:20.patch

 Using GetOffsetShell against non-existent topic creates the topic 
 unintentionally
 -

 Key: KAFKA-1507
 URL: https://issues.apache.org/jira/browse/KAFKA-1507
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
 Environment: centos
Reporter: Luke Forehand
Assignee: Sriharsha Chintalapani
Priority: Minor
  Labels: newbie
 Attachments: KAFKA-1507.patch, KAFKA-1507_2014-07-22_10:27:45.patch, 
 KAFKA-1507_2014-07-23_17:07:20.patch


 A typo in using the GetOffsetShell command can cause a
 topic to be created which cannot be deleted (because deletion is still in
 progress)
 ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list
 kafka10:9092,kafka11:9092,kafka12:9092,kafka13:9092 --topic typo --time 1
 ./kafka-topics.sh --zookeeper stormqa1/kafka-prod --describe --topic typo
 Topic:typo  PartitionCount:8ReplicationFactor:1 Configs:
  Topic: typo Partition: 0Leader: 10  Replicas: 10
   Isr: 10
 ...



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1507) Using GetOffsetShell against non-existent topic creates the topic unintentionally

2014-07-23 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14072603#comment-14072603
 ] 

Sriharsha Chintalapani commented on KAFKA-1507:
---

Updated reviewboard https://reviews.apache.org/r/23339/diff/
 against branch origin/trunk

 Using GetOffsetShell against non-existent topic creates the topic 
 unintentionally
 -

 Key: KAFKA-1507
 URL: https://issues.apache.org/jira/browse/KAFKA-1507
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
 Environment: centos
Reporter: Luke Forehand
Assignee: Sriharsha Chintalapani
Priority: Minor
  Labels: newbie
 Attachments: KAFKA-1507.patch, KAFKA-1507_2014-07-22_10:27:45.patch, 
 KAFKA-1507_2014-07-23_17:07:20.patch


 A typo in using the GetOffsetShell command can cause a
 topic to be created which cannot be deleted (because deletion is still in
 progress)
 ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list
 kafka10:9092,kafka11:9092,kafka12:9092,kafka13:9092 --topic typo --time 1
 ./kafka-topics.sh --zookeeper stormqa1/kafka-prod --describe --topic typo
 Topic:typo  PartitionCount:8ReplicationFactor:1 Configs:
  Topic: typo Partition: 0Leader: 10  Replicas: 10
   Isr: 10
 ...



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Resolved] (KAFKA-1192) Enable DumpLogSegments tool to deserialize messages

2014-07-23 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-1192.


   Resolution: Fixed
Fix Version/s: 0.8.2
 Assignee: Manikumar Reddy  (was: Guozhang Wang)

Thanks for the patch. +1. Committed to trunk with the following minor change.

* change the property name decoder-class to value-decoder-class

 Enable DumpLogSegments tool to deserialize messages
 ---

 Key: KAFKA-1192
 URL: https://issues.apache.org/jira/browse/KAFKA-1192
 Project: Kafka
  Issue Type: Bug
  Components: tools
Reporter: Guozhang Wang
Assignee: Manikumar Reddy
  Labels: newbie
 Fix For: 0.8.2

 Attachments: KAFKA-1192_2014-07-21_20:44:08.patch, 
 KAFKA-1192_2014-07-23_20:23:08.patch


 Currently the DumpLogSegments tool reads the message payloads as strings by 
 default, which will not display the messages correctly if the messages were 
 serialized with another class. By enabling deserialization with a 
 customized class we can use this tool to debug more issues where we need to 
 read the message content.
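As a concrete illustration, a hedged sketch of the kind of custom decoder one could plug into the tool (the class name and UTF-8 handling are illustrative assumptions; kafka.serializer.Decoder is the 0.8.x trait, and decoders conventionally take a VerifiableProperties constructor argument):

{code}
import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties

// Hypothetical decoder for illustration; a class like this would be passed to
// the tool via the value-decoder-class property mentioned above.
class Utf8ValueDecoder(props: VerifiableProperties = null) extends Decoder[String] {
  override def fromBytes(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
}
{code}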



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: Review Request 23339: Patch for KAFKA-1507

2014-07-23 Thread Sriharsha Chintalapani


 On July 22, 2014, 6:52 p.m., Jun Rao wrote:
  core/src/main/scala/kafka/client/ClientUtils.scala, lines 86-87
  https://reviews.apache.org/r/23339/diff/2/?file=639404#file639404line86
 
  Perhaps this method can be named better since it may not just be for 
  the consumer.

Changed it to fetchTopicMetadataForNonProducer; please let me know if you have a 
better suggestion.


- Sriharsha


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23339/#review48397
---


On July 24, 2014, 12:07 a.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/23339/
 ---
 
 (Updated July 24, 2014, 12:07 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1507
 https://issues.apache.org/jira/browse/KAFKA-1507
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1507. Using GetOffsetShell against non-existent topic creates the topic 
 unintentionally. Changes as per Jun's suggestions.
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 d8f9ce663ee24d2b0852c974136741280c39f8f8 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
  4aa5b01d611631db72df47d50bbe30edb8c478db 
   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
   clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 
 444e69e7c95d5ffad19896fff0ab15cb4f5c9b4e 
   clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java 
 b22ca1dce65f665d84c2a980fd82f816e93d9960 
   
 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  df37fc6d8f0db0b8192a948426af603be3444da4 
   core/src/main/scala/kafka/api/TopicMetadataRequest.scala 
 7dca09ce637a40e125de05703dc42e8b611971ac 
   core/src/main/scala/kafka/client/ClientUtils.scala 
 ce7ede3f6d60e756e252257bd8c6fedc21f21e1c 
   core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala 
 b9e2bea7b442a19bcebd1b350d39541a8c9dd068 
   core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala 
 b0b7be14d494ae8c87f4443b52db69d273c20316 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 fd5f12ee31e78cdcda8e24a0ab3e1740b3928884 
   core/src/main/scala/kafka/tools/GetOffsetShell.scala 
 9c6064e201eebbcd5b276a0dedd02937439edc94 
   core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala 
 af4783646803e58714770c21f8c3352370f26854 
   core/src/main/scala/kafka/tools/SimpleConsumerShell.scala 
 36314f412a8281aece2789fd2b74a106b82c57d2 
   core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala 
 1bf2667f47853585bc33ffb3e28256ec5f24ae84 
   core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala 
 35dc071b1056e775326981573c9618d8046e601d 
 
 Diff: https://reviews.apache.org/r/23339/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Sriharsha Chintalapani
 




Re: Review Request 23339: Patch for KAFKA-1507

2014-07-23 Thread Sriharsha Chintalapani


 On July 22, 2014, 6:52 p.m., Jun Rao wrote:
  Thanks for the patch and doing the rebasing. Some comments below.

Thanks for the review. I've noticed NetworkClient.java always uses the 
latest version of an api. There doesn't seem to be a way to send a specific 
version of a request, since nextRequestHeader doesn't accept a version. Even 
if I create a version 0 request, the request header will go with the latest 
version of that particular request. Is there any reason to keep the version as 
part of the request header rather than inside the request structure, with 
NetworkClient expected to use the latest version?
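For context, the protocol guide linked earlier in this digest defines the request header, which is where the version in question lives; a minimal sketch of that layout (field names per the wire-protocol doc, Scala types used only for illustration):

{code}
// Request header layout per the Kafka protocol guide:
//   ApiKey: int16, ApiVersion: int16, CorrelationId: int32, ClientId: string
// The api version travels in this header, not in the request body, which is
// exactly what the question above is about.
case class RequestHeader(apiKey: Short, apiVersion: Short,
                         correlationId: Int, clientId: String)
{code}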


- Sriharsha


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23339/#review48397
---


On July 24, 2014, 12:07 a.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/23339/
 ---
 
 (Updated July 24, 2014, 12:07 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1507
 https://issues.apache.org/jira/browse/KAFKA-1507
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1507. Using GetOffsetShell against non-existent topic creates the topic 
 unintentionally. Changes as per Jun's suggestions.
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
 d8f9ce663ee24d2b0852c974136741280c39f8f8 
   
 clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
  4aa5b01d611631db72df47d50bbe30edb8c478db 
   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
   clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 
 444e69e7c95d5ffad19896fff0ab15cb4f5c9b4e 
   clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java 
 b22ca1dce65f665d84c2a980fd82f816e93d9960 
   
 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  df37fc6d8f0db0b8192a948426af603be3444da4 
   core/src/main/scala/kafka/api/TopicMetadataRequest.scala 
 7dca09ce637a40e125de05703dc42e8b611971ac 
   core/src/main/scala/kafka/client/ClientUtils.scala 
 ce7ede3f6d60e756e252257bd8c6fedc21f21e1c 
   core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala 
 b9e2bea7b442a19bcebd1b350d39541a8c9dd068 
   core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala 
 b0b7be14d494ae8c87f4443b52db69d273c20316 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 fd5f12ee31e78cdcda8e24a0ab3e1740b3928884 
   core/src/main/scala/kafka/tools/GetOffsetShell.scala 
 9c6064e201eebbcd5b276a0dedd02937439edc94 
   core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala 
 af4783646803e58714770c21f8c3352370f26854 
   core/src/main/scala/kafka/tools/SimpleConsumerShell.scala 
 36314f412a8281aece2789fd2b74a106b82c57d2 
   core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala 
 1bf2667f47853585bc33ffb3e28256ec5f24ae84 
   core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala 
 35dc071b1056e775326981573c9618d8046e601d 
 
 Diff: https://reviews.apache.org/r/23339/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Sriharsha Chintalapani
 




[jira] [Updated] (KAFKA-1549) dead brokers coming in the TopicMetadataResponse

2014-07-23 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1549:
-

Assignee: nicu marasoiu  (was: Jun Rao)

 dead brokers coming in the TopicMetadataResponse
 

 Key: KAFKA-1549
 URL: https://issues.apache.org/jira/browse/KAFKA-1549
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2
 Environment: trunk
Reporter: nicu marasoiu
Assignee: nicu marasoiu
 Attachments: 
 kafka-1549__only_last_seen_alive_brokers_to_be_responded_part_of_the_topic_metadata_refres.patch,
  
 kafka-1549__without_implicit__only_last_seen_alive_brokers_to_be_responded_part_of_the_top.patch


 Jun Rao confirming my observation that brokers are only added to the 
 metadataCache, never removed: "The way that we update liveBrokers in 
 MetadataCache.updateCache() doesn't seem right. We only add newly received 
 live brokers to the list. However, there could be existing brokers in that 
 list that are now dead. Those dead brokers shouldn't be returned to the 
 clients. We should probably just take the new live broker list and cache it."
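A minimal sketch of the suggested fix, replacing the cached list wholesale rather than merging (simplified types, not the actual MetadataCache code):

{code}
object MetadataCacheSketch {
  // broker id -> host:port, a simplified stand-in for the real broker metadata
  @volatile private var aliveBrokers: Map[Int, String] = Map.empty

  def updateCache(newAliveBrokers: Map[Int, String]): Unit = {
    // Buggy behavior: aliveBrokers ++= newAliveBrokers (dead brokers linger).
    // Suggested behavior: take the new live broker list and cache it as-is.
    aliveBrokers = newAliveBrokers
  }
}
{code}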



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Build failed in Jenkins: Kafka-trunk #231

2014-07-23 Thread Apache Jenkins Server
See https://builds.apache.org/job/Kafka-trunk/231/changes

Changes:

[junrao] kafka-1192; Enable DumpLogSegments tool to deserialize messages; 
patched by Manikumar Reddy; reviewed by Guozhang Wang and Jun Rao

--
[...truncated 534 lines...]
org.apache.kafka.common.record.RecordTest > testChecksum[65] PASSED

org.apache.kafka.common.record.RecordTest > testEquality[65] PASSED

org.apache.kafka.common.record.RecordTest > testFields[66] PASSED

org.apache.kafka.common.record.RecordTest > testChecksum[66] PASSED

org.apache.kafka.common.record.RecordTest > testEquality[66] PASSED

org.apache.kafka.common.record.RecordTest > testFields[67] PASSED

org.apache.kafka.common.record.RecordTest > testChecksum[67] PASSED

org.apache.kafka.common.record.RecordTest > testEquality[67] PASSED

org.apache.kafka.common.record.RecordTest > testFields[68] PASSED

org.apache.kafka.common.record.RecordTest > testChecksum[68] PASSED

org.apache.kafka.common.record.RecordTest > testEquality[68] PASSED

org.apache.kafka.common.record.RecordTest > testFields[69] PASSED

org.apache.kafka.common.record.RecordTest > testChecksum[69] PASSED

org.apache.kafka.common.record.RecordTest > testEquality[69] PASSED

org.apache.kafka.common.record.RecordTest > testFields[70] PASSED

org.apache.kafka.common.record.RecordTest > testChecksum[70] PASSED

org.apache.kafka.common.record.RecordTest > testEquality[70] PASSED

org.apache.kafka.common.record.RecordTest > testFields[71] PASSED

org.apache.kafka.common.record.RecordTest > testChecksum[71] PASSED

org.apache.kafka.common.record.RecordTest > testEquality[71] PASSED

org.apache.kafka.common.record.RecordTest > testFields[72] PASSED

org.apache.kafka.common.record.RecordTest > testChecksum[72] PASSED

org.apache.kafka.common.record.RecordTest > testEquality[72] PASSED

org.apache.kafka.common.record.RecordTest > testFields[73] PASSED

org.apache.kafka.common.record.RecordTest > testChecksum[73] PASSED

org.apache.kafka.common.record.RecordTest > testEquality[73] PASSED

org.apache.kafka.common.record.RecordTest > testFields[74] PASSED

org.apache.kafka.common.record.RecordTest > testChecksum[74] PASSED

org.apache.kafka.common.record.RecordTest > testEquality[74] PASSED

org.apache.kafka.common.record.RecordTest > testFields[75] PASSED

org.apache.kafka.common.record.RecordTest > testChecksum[75] PASSED

org.apache.kafka.common.record.RecordTest > testEquality[75] PASSED

org.apache.kafka.common.record.RecordTest > testFields[76] PASSED

org.apache.kafka.common.record.RecordTest > testChecksum[76] PASSED

org.apache.kafka.common.record.RecordTest > testEquality[76] PASSED

org.apache.kafka.common.record.RecordTest > testFields[77] PASSED

org.apache.kafka.common.record.RecordTest > testChecksum[77] PASSED

org.apache.kafka.common.record.RecordTest > testEquality[77] PASSED

org.apache.kafka.common.record.RecordTest > testFields[78] PASSED

org.apache.kafka.common.record.RecordTest > testChecksum[78] PASSED

org.apache.kafka.common.record.RecordTest > testEquality[78] PASSED

org.apache.kafka.common.record.RecordTest > testFields[79] PASSED

org.apache.kafka.common.record.RecordTest > testChecksum[79] PASSED

org.apache.kafka.common.record.RecordTest > testEquality[79] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[0] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[1] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[2] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[3] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[4] PASSED

org.apache.kafka.common.config.ConfigDefTest > testBasicTypes PASSED

org.apache.kafka.common.config.ConfigDefTest > testInvalidDefault PASSED

org.apache.kafka.common.config.ConfigDefTest > testNullDefault PASSED

org.apache.kafka.common.config.ConfigDefTest > testMissingRequired PASSED

org.apache.kafka.common.config.ConfigDefTest > testDefinedTwice PASSED

org.apache.kafka.common.config.ConfigDefTest > testBadInputs PASSED
:contrib:compileJava UP-TO-DATE
:contrib:processResources UP-TO-DATE
:contrib:classes UP-TO-DATE
:contrib:compileTestJava UP-TO-DATE
:contrib:processTestResources UP-TO-DATE
:contrib:testClasses UP-TO-DATE
:contrib:test UP-TO-DATE
:clients:jar
:core:compileJava UP-TO-DATE
:core:compileScala
https://builds.apache.org/job/Kafka-trunk/ws/core/src/main/scala/kafka/admin/AdminUtils.scala:253:
 non-variable type argument String in type pattern 
scala.collection.Map[String,_] is unchecked since it is eliminated by erasure
case Some(map: Map[String, _]) =>
   ^
https://builds.apache.org/job/Kafka-trunk/ws/core/src/main/scala/kafka/admin/AdminUtils.scala:256:
 non-variable type argument String in type pattern 
scala.collection.Map[String,String] is unchecked since it is eliminated by 
erasure
case Some(config: Map[String, String]) =>
  ^

Re: Review Request 23702: Patch for KAFKA-1070

2014-07-23 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23702/#review48598
---



core/src/main/scala/kafka/server/KafkaServer.scala
https://reviews.apache.org/r/23702/#comment85309

any reason you want to move this here from the beginning of the class?



core/src/main/scala/kafka/server/KafkaServer.scala
https://reviews.apache.org/r/23702/#comment85311

uses it *as* the brokerId



core/src/main/scala/kafka/server/KafkaServer.scala
https://reviews.apache.org/r/23702/#comment85319

If there is another exception (other than FileNotFound, which we should 
ignore) while reading the file, it would be good to log a meaningful error 
before rethrowing it.



core/src/main/scala/kafka/server/KafkaServer.scala
https://reviews.apache.org/r/23702/#comment85314

we should also fsync the file to disk and close out the file output stream
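A sketch of what that could look like (illustrative only; the file contents and helper name are assumptions, not the patch itself):

{code}
import java.io.{File, FileOutputStream}
import java.util.Properties

// Write the metadata properties, fsync to disk, and close the stream.
def writeBrokerMetadata(file: File, props: Properties): Unit = {
  val out = new FileOutputStream(file)
  try {
    props.store(out, "broker metadata")
    out.getChannel.force(true)  // fsync so the id survives a power failure
  } finally {
    out.close()                 // close out the file output stream
  }
}
{code}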



core/src/main/scala/kafka/utils/ZkUtils.scala
https://reviews.apache.org/r/23702/#comment85320

It would be good to refer to 1000 as a static variable inside KafkaConfig, 
e.g. KafkaConfig.MinimumBrokerSequenceId



core/src/main/scala/kafka/utils/ZkUtils.scala
https://reviews.apache.org/r/23702/#comment85322

this logic should ideally be encapsulated under 
updatePersistentSequentialPath inside ZkUtils. 



core/src/main/scala/kafka/utils/ZkUtils.scala
https://reviews.apache.org/r/23702/#comment85323

Shouldn't we be using createPersistentSequential?



core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala
https://reviews.apache.org/r/23702/#comment85325

can we add more test cases that cover all possible combinations of using 
the config based broker id and the zk based one? It will also help to 
explicitly separate some of these test cases. 


- Neha Narkhede


On July 22, 2014, 6:34 p.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/23702/
 ---
 
 (Updated July 22, 2014, 6:34 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1070
 https://issues.apache.org/jira/browse/KAFKA-1070
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1070. Auto assign node id.
 
 
 KAFKA-1070. Auto-assign node id.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala 
 PRE-CREATION 
   core/src/main/scala/kafka/server/KafkaConfig.scala 
 50b09edb73af1b45f88f919ac8c46ae056878c8e 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 def1dc2a5818d45d9ee0881137ff989cec4eb9b1 
   core/src/main/scala/kafka/utils/ZkUtils.scala 
 dcdc1ce2b02c996294e19cf480736106aaf29511 
   core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala 
 PRE-CREATION 
   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
 3faa884f8eb83c7c00baab416d0acfb488dc39c1 
 
 Diff: https://reviews.apache.org/r/23702/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Sriharsha Chintalapani
 




[jira] [Commented] (KAFKA-1421) Error: Could not find or load main class kafka.perf.SimpleConsumerPerformance

2014-07-23 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14072677#comment-14072677
 ] 

Jun Rao commented on KAFKA-1421:


You will need to include the kafka-perf jar too.

 Error: Could not find or load main class kafka.perf.SimpleConsumerPerformance
 -

 Key: KAFKA-1421
 URL: https://issues.apache.org/jira/browse/KAFKA-1421
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.0
Reporter: Daryl Erwin
Assignee: Jun Rao
Priority: Minor
  Labels: performance
   Original Estimate: 24h
  Remaining Estimate: 24h

 Did a base install with:
 sbt update
 sbt package
 I am able to successfully run the console producer and consumer. I am trying to 
 run the perf scripts (./kafka-simple-consumer-perf-test.sh) but it appears 
 the jar file is not generated. 
 What are the steps that I need to run to create this jar file?
 .. same as:
 Error: Could not find or load main class kafka.perf.ProducerPerformance



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: Review Request 23702: Patch for KAFKA-1070

2014-07-23 Thread Sriharsha Chintalapani


 On July 24, 2014, 1:21 a.m., Neha Narkhede wrote:
  core/src/main/scala/kafka/utils/ZkUtils.scala, line 718
  https://reviews.apache.org/r/23702/diff/3/?file=639483#file639483line718
 
  Shouldn't we be using createPersistentSequential?

If I used createPersistentSequential it would keep creating a new node in zk with a 
sequential number, leaving too many nodes in zk depending on the number of brokers 
the user runs. Instead I used a single path, updating it each time with expected 
version -1 and using the returned stat.getVersion as the sequential number.
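A minimal sketch of that scheme, assuming the I0Itec ZkClient that Kafka used at the time (the znode path here is illustrative, not necessarily the one in the patch):

{code}
import org.I0Itec.zkclient.ZkClient

def nextSequenceId(zk: ZkClient, path: String = "/brokers/seqid"): Int = {
  if (!zk.exists(path))
    zk.createPersistent(path, true)  // a single counter node, created once
  // Unconditional write (expected version -1): every write bumps the znode's
  // version, and that returned version serves as the next sequence number.
  val stat = zk.writeDataReturnStat(path, "", -1)
  stat.getVersion
}
{code}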


- Sriharsha


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23702/#review48598
---


On July 22, 2014, 6:34 p.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/23702/
 ---
 
 (Updated July 22, 2014, 6:34 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1070
 https://issues.apache.org/jira/browse/KAFKA-1070
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1070. Auto assign node id.
 
 
 KAFKA-1070. Auto-assign node id.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala 
 PRE-CREATION 
   core/src/main/scala/kafka/server/KafkaConfig.scala 
 50b09edb73af1b45f88f919ac8c46ae056878c8e 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 def1dc2a5818d45d9ee0881137ff989cec4eb9b1 
   core/src/main/scala/kafka/utils/ZkUtils.scala 
 dcdc1ce2b02c996294e19cf480736106aaf29511 
   core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala 
 PRE-CREATION 
   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
 3faa884f8eb83c7c00baab416d0acfb488dc39c1 
 
 Diff: https://reviews.apache.org/r/23702/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Sriharsha Chintalapani
 




Re: Review Request 23702: Patch for KAFKA-1070

2014-07-23 Thread Sriharsha Chintalapani


 On July 24, 2014, 1:21 a.m., Neha Narkhede wrote:
  core/src/main/scala/kafka/server/KafkaServer.scala, line 96
  https://reviews.apache.org/r/23702/diff/3/?file=639482#file639482line96
 
  any reason you want to move this here from the beginning of the class?

If the user didn't provide a brokerId in server.properties I set 
config.brokerId = -1 in KafkaConfig, and in KafkaServer I check for that, generate 
a new sequence id from zookeeper, and assign it to config.brokerId.
If this line were at the start of the class it wouldn't pick up the newly generated 
sequenceId, e.g.:
 [Kafka Server -1], started (kafka.server.KafkaServer)
 New leader is 1001 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
 [Kafka Server -1], shut down completed (kafka.server.KafkaServer)
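In other words, something like the following hedged sketch of the flow described above (the 1000 offset follows the MinimumBrokerSequenceId suggestion from the review, and generateSequenceId is a stand-in for the zookeeper call, not the final code):

{code}
// config.brokerId == -1 means the user did not set broker.id in server.properties.
def resolveBrokerId(configBrokerId: Int, generateSequenceId: () => Int): Int =
  if (configBrokerId >= 0) configBrokerId  // user-provided id wins
  else 1000 + generateSequenceId()         // auto-generated from the zk sequence
{code}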


- Sriharsha


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23702/#review48598
---


On July 22, 2014, 6:34 p.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/23702/
 ---
 
 (Updated July 22, 2014, 6:34 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1070
 https://issues.apache.org/jira/browse/KAFKA-1070
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1070. Auto assign node id.
 
 
 KAFKA-1070. Auto-assign node id.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala 
 PRE-CREATION 
   core/src/main/scala/kafka/server/KafkaConfig.scala 
 50b09edb73af1b45f88f919ac8c46ae056878c8e 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 def1dc2a5818d45d9ee0881137ff989cec4eb9b1 
   core/src/main/scala/kafka/utils/ZkUtils.scala 
 dcdc1ce2b02c996294e19cf480736106aaf29511 
   core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala 
 PRE-CREATION 
   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
 3faa884f8eb83c7c00baab416d0acfb488dc39c1 
 
 Diff: https://reviews.apache.org/r/23702/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Sriharsha Chintalapani
 




Re: Review Request 23702: Patch for KAFKA-1070

2014-07-23 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23702/#review48608
---



core/src/main/scala/kafka/server/KafkaServer.scala
https://reviews.apache.org/r/23702/#comment85328

Does this cover the case for metaBrokerIdSet.size == 1 and brokerId < 0? In 
this case, we should use the brokerId in the metadata file.
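A hedged sketch of the check being discussed (the names are assumptions based on the review, not the final patch; the patch's diffs add an InconsistentBrokerIdException, for which IllegalStateException is a stand-in here):

{code}
// brokerIdsFromMeta: broker ids found in meta.properties across the log dirs.
// configBrokerId < 0 means broker.id was not set in server.properties.
def pickBrokerId(brokerIdsFromMeta: Set[Int], configBrokerId: Int): Int =
  if (brokerIdsFromMeta.size > 1)
    throw new IllegalStateException("Inconsistent broker ids found: " + brokerIdsFromMeta)
  else if (brokerIdsFromMeta.size == 1 && configBrokerId < 0)
    brokerIdsFromMeta.head  // use the id from the metadata file
  else if (brokerIdsFromMeta.size == 1 && brokerIdsFromMeta.head != configBrokerId)
    throw new IllegalStateException("broker.id conflicts with meta.properties")
  else
    configBrokerId          // may still be -1, triggering auto-generation
{code}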



core/src/main/scala/kafka/server/KafkaServer.scala
https://reviews.apache.org/r/23702/#comment85327

This probably only needs to be done after line 365?


- Jun Rao


On July 22, 2014, 6:34 p.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/23702/
 ---
 
 (Updated July 22, 2014, 6:34 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1070
 https://issues.apache.org/jira/browse/KAFKA-1070
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1070. Auto assign node id.
 
 
 KAFKA-1070. Auto-assign node id.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala 
 PRE-CREATION 
   core/src/main/scala/kafka/server/KafkaConfig.scala 
 50b09edb73af1b45f88f919ac8c46ae056878c8e 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 def1dc2a5818d45d9ee0881137ff989cec4eb9b1 
   core/src/main/scala/kafka/utils/ZkUtils.scala 
 dcdc1ce2b02c996294e19cf480736106aaf29511 
   core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala 
 PRE-CREATION 
   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
 3faa884f8eb83c7c00baab416d0acfb488dc39c1 
 
 Diff: https://reviews.apache.org/r/23702/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Sriharsha Chintalapani
 




[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-07-23 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14072690#comment-14072690
 ] 

Jun Rao commented on KAFKA-1555:


Rob,

Is that any different from just running with a higher replication factor?

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Neha Narkhede

 In a mission critical application, we expect that a kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current kafka version (0.8.1.1) cannot achieve these requirements 
 due to three behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. ISR can contain only 1 broker; therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.
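As a purely illustrative toy model of case 2 above (acks=2), showing how an in-ISR but lagging replica can be elected and lose m:

{code}
// Leader A and follower B have acked m; follower C lags but is still in ISR.
val replicas = Map("A" -> Seq("m"), "B" -> Seq("m"), "C" -> Seq.empty[String])
val isr      = Set("A", "B", "C")
val acked    = replicas.count(_._2.contains("m")) >= 2              // acks=2 met
// If A dies, any remaining ISR member is eligible to lead; electing C loses m.
val mayLoseM = (isr - "A").exists(r => !replicas(r).contains("m"))  // true
{code}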



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: Review Request 23702: Patch for KAFKA-1070

2014-07-23 Thread Sriharsha Chintalapani


 On July 24, 2014, 1:41 a.m., Jun Rao wrote:
  core/src/main/scala/kafka/server/KafkaServer.scala, lines 370-371
  https://reviews.apache.org/r/23702/diff/3/?file=639482#file639482line370
 
  This probably only needs to be done after line 365?

I was thinking of a case where a user has one logDir initially and we store 
the brokerId in that dir, and later the user adds a few dirs to the config; to 
make sure we copy meta.properties into all of those new dirs I added it at the 
bottom. I am not sure if that case exists, please let me know.


- Sriharsha


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23702/#review48608
---


On July 22, 2014, 6:34 p.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/23702/
 ---
 
 (Updated July 22, 2014, 6:34 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1070
 https://issues.apache.org/jira/browse/KAFKA-1070
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1070. Auto assign node id.
 
 
 KAFKA-1070. Auto-assign node id.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala 
 PRE-CREATION 
   core/src/main/scala/kafka/server/KafkaConfig.scala 
 50b09edb73af1b45f88f919ac8c46ae056878c8e 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 def1dc2a5818d45d9ee0881137ff989cec4eb9b1 
   core/src/main/scala/kafka/utils/ZkUtils.scala 
 dcdc1ce2b02c996294e19cf480736106aaf29511 
   core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala 
 PRE-CREATION 
   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
 3faa884f8eb83c7c00baab416d0acfb488dc39c1 
 
 Diff: https://reviews.apache.org/r/23702/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Sriharsha Chintalapani
 




Re: [jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-07-23 Thread Robert Withers
Hi Jun,

Yes, that's what I am thinking. It allows maintaining a pool of offline, but 
current and consistent, replica shadows, ready to be flipped into ISR. Their 
being out of ISR prevents them from being counted in the quorum, yet they are 
ready to go, so there is no impact to the producers.

Looking at it through algebra sunglasses, we would establish a secondary 
replication space with a different dimensional projection into the parent meta 
space (the current ISR replication space), which is itself projected into the 
consumers' meta space as the leader partition. I am thinking it adds another 
layer of depth, to shore up the defenses.

- Rob

 On Jul 23, 2014, at 7:46 PM, Jun Rao (JIRA) <j...@apache.org> wrote:
 
 
[ 
 https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14072690#comment-14072690
  ] 
 
 Jun Rao commented on KAFKA-1555:
 
 
 Rob,
 
 Is that any different from just running with a higher replication factor?
 
 provide strong consistency with reasonable availability
 ---
 
Key: KAFKA-1555
URL: https://issues.apache.org/jira/browse/KAFKA-1555
Project: Kafka
 Issue Type: Improvement
 Components: controller
   Affects Versions: 0.8.1.1
   Reporter: Jiang Wu
   Assignee: Neha Narkhede
 
 In a mission critical application, we expect that a kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current kafka version (0.8.1.1) cannot achieve these 
 requirements due to three behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. ISR can contain only 1 broker; therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.
 
 
 
 --
 This message was sent by Atlassian JIRA
 (v6.2#6252)


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-07-23 Thread Robert Withers (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14072700#comment-14072700
 ] 

Robert Withers commented on KAFKA-1555:
---

Hi Jun,

Yes, that's what I am thinking. It allows maintaining a pool of offline, but 
current and consistent, replica shadows, ready to be flipped into ISR. Their 
being out of ISR prevents them from being counted in the quorum, yet they are 
ready to go, so there is no impact to the producers.

Looking at it through algebra sunglasses, we would establish a secondary 
replication space with a different dimensional projection into the parent meta 
space (the current ISR replication space), which is itself projected into the 
consumers' meta space as the leader partition. I am thinking it adds another 
layer of depth, to shore up the defenses.

- Rob

 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Neha Narkhede

 In a mission critical application, we expect that a kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current kafka version (0.8.1.1) cannot achieve these requirements 
 due to three behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. ISR can contain only 1 broker; therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-07-23 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14072756#comment-14072756
 ] 

Jun Rao commented on KAFKA-1555:


Rob,

In order for those shadow replicas to be ready to be flipped into ISR, they 
have to be very in-sync with the leader. If that's the case, even if you count 
them in acking the producer, it won't delay the produce request much. So, I 
still don't see a clear benefit of this approach vs just having a larger 
replication factor. I am also not sure how easy the implementation will be.


 provide strong consistency with reasonable availability
 ---

 Key: KAFKA-1555
 URL: https://issues.apache.org/jira/browse/KAFKA-1555
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 0.8.1.1
Reporter: Jiang Wu
Assignee: Neha Narkhede

 In a mission critical application, we expect that a kafka cluster with 3 brokers 
 can satisfy two requirements:
 1. When 1 broker is down, no message loss or service blocking happens.
 2. In worse cases, such as when two brokers are down, service can be blocked, but 
 no message loss happens.
 We found that the current kafka version (0.8.1.1) cannot achieve these requirements 
 due to three behaviors:
 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
 messages may be chosen as the leader.
 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
 has fewer messages than the leader.
 3. ISR can contain only 1 broker; therefore acknowledged messages may be 
 stored in only 1 broker.
 The following is an analytical proof. 
 We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
 that at the beginning, all 3 replicas, leader A, followers B and C, are in 
 sync, i.e., they have the same messages and are all in ISR.
 According to the value of request.required.acks (acks for short), there are 
 the following cases.
 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
 time, although C hasn't received m, C is still in ISR. If A is killed, C can 
 be elected as the new leader, and consumers will miss m.
 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
 message m to A, and receives an acknowledgement. Disk failure happens in A 
 before B and C replicate m. Message m is lost.
 In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: Review Request 23474: Patch for KAFKA-1483

2014-07-23 Thread Neha Narkhede

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23474/#review48624
---

Ship it!


Ship It!

- Neha Narkhede


On July 16, 2014, 6:07 p.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/23474/
 ---
 
 (Updated July 16, 2014, 6:07 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1483
 https://issues.apache.org/jira/browse/KAFKA-1483
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1483. Split Brain about Leader Partitions.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 6a56a772c134dbf1e70c1bfe067223009bfdbac8 
 
 Diff: https://reviews.apache.org/r/23474/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Sriharsha Chintalapani