[jira] [Commented] (KAFKA-1539) Due to OS caching Kafka might lose offset files which causes full reset of data

2014-07-15 Thread Dmitry Bugaychenko (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14061724#comment-14061724
 ] 

Dmitry Bugaychenko commented on KAFKA-1539:
---

It looks like even after a flush, data is not necessarily written to the HDD. In 
XFS it can by default be cached for up to 30 seconds, and it can also be cached 
by a disk controller, etc. Writing to a temp file is a good idea, but it is better 
to keep the previous file untouched (do not replace it with the temp one).

On a 20-HDD server with XFS it is pretty easy to reproduce: after a power 
failure we got corrupted offset files on 4-5 disks.
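
A minimal sketch of what a hardened write path could look like (an illustration with an assumed writeCheckpoint helper, not the actual patch): fsync the temp file before atomically renaming it into place, so a crash leaves either the old complete file or the new complete one, never a torn write.

{code}
import java.io.{File, FileOutputStream, OutputStreamWriter}
import java.nio.file.{Files, StandardCopyOption}

// Hypothetical sketch: force the bytes to the device before the rename,
// and replace the old checkpoint atomically instead of truncating it.
def writeCheckpoint(dir: File, name: String, contents: String) {
  val temp = new File(dir, name + ".tmp")
  val out = new FileOutputStream(temp)
  try {
    val writer = new OutputStreamWriter(out)
    writer.write(contents)
    writer.flush()
    out.getFD.sync()  // fsync: flush the OS page cache for this file
  } finally {
    out.close()
  }
  // Atomic rename: readers see the old file or the new one, never a partial file.
  Files.move(temp.toPath, new File(dir, name).toPath, StandardCopyOption.ATOMIC_MOVE)
}
{code}

Note that fsync covers the OS page cache but not necessarily a battery-less disk controller cache, which is the residual risk raised above.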

 Due to OS caching Kafka might lose offset files which causes full reset of 
 data
 

 Key: KAFKA-1539
 URL: https://issues.apache.org/jira/browse/KAFKA-1539
 Project: Kafka
  Issue Type: Bug
  Components: log
Affects Versions: 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps

 Seen this while testing power failures and disk failures. Due to caching at the 
 OS level (e.g. XFS can cache data for 30 seconds), after a failure we got offset 
 files of zero length. This dramatically slows down broker startup (it has to 
 re-check all segments), and if the high watermark offsets are lost it simply 
 erases all data and starts recovering from other brokers (looks funny - first 
 spending 2-3 hours re-checking logs and then deleting them all due to the 
 missing high watermark).
 Proposal: introduce offset file rotation. Keep two versions of the offset file, 
 write to the oldest, read from the newest valid one. In this case we would be 
 able to configure the offset checkpoint time in a way that at least one file is 
 always flushed and valid.
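
A sketch of the read side of the proposed rotation, assuming two alternating slot files and a stand-in isValid integrity check (a real format would need something like a trailing CRC or entry count to validate against):

{code}
import java.io.File
import scala.io.Source

// Sketch: two checkpoint slots written alternately; on startup, read from
// the newest slot that passes validation. isValid is a stand-in for a real
// integrity check such as a trailing CRC.
def isValid(f: File): Boolean = f.length > 0

def loadNewestValidCheckpoint(dir: File): Option[List[String]] = {
  val slots = List(new File(dir, "offsets.0"), new File(dir, "offsets.1"))
  val candidates = slots.filter(f => f.exists && isValid(f))
                        .sortBy(f => -f.lastModified)  // newest first
  candidates.headOption.map(f => Source.fromFile(f).getLines().toList)
}
{code}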



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1535) return all live brokers in TopicMetadataResponse

2014-07-15 Thread Nicolae Marasoiu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14061766#comment-14061766
 ] 

Nicolae Marasoiu commented on KAFKA-1535:
-

Which branch should this be done on? trunk?

 return all live brokers in TopicMetadataResponse
 

 Key: KAFKA-1535
 URL: https://issues.apache.org/jira/browse/KAFKA-1535
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
  Labels: newbie

 Currently, we only return the brokers that have assigned replicas for a topic 
 in TopicMetadataResponse. The new producer will use those brokers for 
 refreshing metadata. Now suppose that we stop all those brokers, copy all 
 local data to some new hosts and then restart those hosts (with the original 
 broker id). There is no way for the new producer to automatically get the 
 information about the new brokers since all old brokers are gone.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Comment Edited] (KAFKA-1535) return all live brokers in TopicMetadataResponse

2014-07-15 Thread Nicolae Marasoiu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14061766#comment-14061766
 ] 

Nicolae Marasoiu edited comment on KAFKA-1535 at 7/15/14 6:46 AM:
--

Which branch should this be done on?
Seems trunk == 0.9.0

Should this be done in the default branch 0.8.1 ?


was (Author: nmarasoiu):
Which branch should this be done on? trunk?

 return all live brokers in TopicMetadataResponse
 

 Key: KAFKA-1535
 URL: https://issues.apache.org/jira/browse/KAFKA-1535
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
  Labels: newbie




--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Comment Edited] (KAFKA-1535) return all live brokers in TopicMetadataResponse

2014-07-15 Thread Nicolae Marasoiu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14061766#comment-14061766
 ] 

Nicolae Marasoiu edited comment on KAFKA-1535 at 7/15/14 6:48 AM:
--

Should this be done in the default branch 0.8.1 ?


was (Author: nmarasoiu):
Which branch should this be done on?
Seems trunk == 0.9.0

Should this be done in the default branch 0.8.1 ?

 return all live brokers in TopicMetadataResponse
 

 Key: KAFKA-1535
 URL: https://issues.apache.org/jira/browse/KAFKA-1535
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
  Labels: newbie




--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-15 Thread Alexey Ozeritskiy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14061957#comment-14061957
 ] 

Alexey Ozeritskiy commented on KAFKA-1414:
--

Anton, you forgot to use the new config option (log.recovery.threads).

 Speedup broker startup after hard reset
 ---

 Key: KAFKA-1414
 URL: https://issues.apache.org/jira/browse/KAFKA-1414
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.2, 0.9.0, 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps
 Attachments: parallel-dir-loading-0.8.patch, 
 parallel-dir-loading-trunk-fixed-threadpool.patch, 
 parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch


 After a hard reset due to power failure the broker takes way too much time 
 recovering unflushed segments in a single thread. This could be easily 
 improved by launching multiple threads (one per data directory, assuming that 
 typically each data directory is on a dedicated drive). Locally we tried this 
 simple patch to LogManager.loadLogs and it seems to work; however, I'm too new 
 to Scala, so do not take it literally:
 {code}
   /**
    * Recover and load all logs in the given data directories
    */
   private def loadLogs(dirs: Seq[File]) {
     // one recovery thread per data directory, joined at the end
     val threads: Array[Thread] = new Array[Thread](dirs.size)
     var i: Int = 0
     val me = this
     for (dir <- dirs) {
       val thread = new Thread(new Runnable {
         def run() {
           val recoveryPoints = me.recoveryPointCheckpoints(dir).read
           /* load the logs */
           val subDirs = dir.listFiles()
           if (subDirs != null) {
             val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
             if (cleanShutDownFile.exists())
               info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
             for (dir <- subDirs) {
               if (dir.isDirectory) {
                 info("Loading log '" + dir.getName + "'")
                 val topicPartition = Log.parseTopicPartitionName(dir.getName)
                 val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
                 val log = new Log(dir,
                                   config,
                                   recoveryPoints.getOrElse(topicPartition, 0L),
                                   scheduler,
                                   time)
                 val previous = addLogWithLock(topicPartition, log)
                 if (previous != null)
                   throw new IllegalArgumentException(
                     "Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
               }
             }
             cleanShutDownFile.delete()
           }
         }
       })
       thread.start()
       threads(i) = thread
       i = i + 1
     }
     // wait for all directories to finish loading
     for (thread <- threads) {
       thread.join()
     }
   }

   def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
     logCreationOrDeletionLock synchronized {
       this.logs.put(topicPartition, log)
     }
   }
 {code}
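
Following up on the log.recovery.threads comment above, a minimal sketch (an illustration under assumed names, not the committed patch) of bounding the parallelism with a fixed pool instead of one unbounded thread per directory:

{code}
import java.io.File
import java.util.concurrent.{Executors, TimeUnit}

// Sketch only: one recovery task per data directory, run on a pool whose
// size comes from a log.recovery.threads-style setting.
def recoverInParallel(dirs: Seq[File], recoveryThreads: Int)(recover: File => Unit) {
  val pool = Executors.newFixedThreadPool(recoveryThreads)
  try {
    val futures = dirs.map(dir => pool.submit(new Runnable {
      def run() { recover(dir) }
    }))
    futures.foreach(_.get())  // surface the first failure, wait for the rest
  } finally {
    pool.shutdown()
    pool.awaitTermination(Long.MaxValue, TimeUnit.MILLISECONDS)
  }
}
{code}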



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1325) Fix inconsistent per topic log configs

2014-07-15 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1325:
-

Labels: newbie usability  (was: usability)

 Fix inconsistent per topic log configs
 --

 Key: KAFKA-1325
 URL: https://issues.apache.org/jira/browse/KAFKA-1325
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.1
Reporter: Neha Narkhede
Assignee: Manikumar Reddy
  Labels: newbie, usability
 Fix For: 0.8.2

 Attachments: KAFKA-1325.patch, KAFKA-1325.patch


 Related thread from the user mailing list - 
 http://mail-archives.apache.org/mod_mbox/kafka-users/201403.mbox/%3Cd8f6f3857b82c4ccd8725aba6fd68cb8%40cweb01.nmdf.nhnsystem.com%3E
 Our documentation is a little confusing on the log configs. 
 The log property for retention.ms is in millis but the server default it maps 
 to is in minutes.
 Same is true for segment.ms as well. We could either improve the docs or
 change the per-topic configs to be consistent with the server defaults.
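 For example (values illustrative, both equal to seven days), the per-topic 
 override is interpreted in milliseconds while the broker-level default it 
 shadows is in minutes:
 {noformat}
 # broker default in server.properties -- minutes
 log.retention.minutes=10080

 # per-topic override -- milliseconds
 bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic \
   --config retention.ms=604800000
 {noformat}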



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1535) return all live brokers in TopicMetadataResponse

2014-07-15 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14062214#comment-14062214
 ] 

Jay Kreps commented on KAFKA-1535:
--

Yeah we do all development on trunk and just cut branches as a stable point for 
critical point fixes needed after the release.

 return all live brokers in TopicMetadataResponse
 

 Key: KAFKA-1535
 URL: https://issues.apache.org/jira/browse/KAFKA-1535
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
  Labels: newbie




--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1519) Console consumer: expose configuration option to enable/disable writing the line separator

2014-07-15 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1519:
-

Labels: newbie  (was: )

 Console consumer: expose configuration option to enable/disable writing the 
 line separator
 --

 Key: KAFKA-1519
 URL: https://issues.apache.org/jira/browse/KAFKA-1519
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Affects Versions: 0.8.1.1
Reporter: Michael Noll
Assignee: Gwen Shapira
Priority: Minor
  Labels: newbie
 Attachments: KAFKA-1519.patch


 The current console consumer includes a {{DefaultMessageFormatter}}, which 
 exposes a few user-configurable options that can be set on the command line 
 via --property, e.g. --property line.separator="XYZ".
 Unfortunately, the current implementation does not allow the user to 
 completely disable writing any such line separator.  However, this 
 functionality would be helpful to enable users to capture data as-is from a 
 Kafka topic to a snapshot file.  Capturing data as-is -- without an 
 artificial line separator -- is particularly nice for data in a binary format 
 (including Avro).
 *No workaround*
 A potential workaround would be to pass an empty string as the property value 
 of line.separator, but this doesn't work in the current implementation.
 The following variants throw an "Invalid parser arguments" exception:
 {code}
 --property line.separator=     # nothing
 --property line.separator=""   # double quotes
 --property line.separator=''   # single quotes
 {code}
 Escape tricks via a backslash don't work either.
 If there actually is a workaround please let me know.
 *How to fix*
 We can introduce a print.line option to enable/disable writing 
 line.separator similar to how the code already uses print.key to 
 enable/disable writing key.separator.
 This change is trivial.  To preserve backwards compatibility, the 
 print.line option would be set to true by default (unlike the print.key 
 option, which defaults to false).
 *Alternatives*
 Apart from modifying the built-in {{DefaultMessageFormatter}}, users could of 
 course implement their own custom {{MessageFormatter}}.  But given that it's 
 a) a trivial change to the {{DefaultMessageFormatter}} and b) a nice user 
 feature I'd say changing the built-in {{DefaultMessageFormatter}} would be 
 the better approach.  This way, Kafka would support writing data as-is to a 
 file out of the box.
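
A minimal sketch of the proposed change (print.line is the proposed property name from the description, not shipped behavior), modeled on the existing print.key handling:

{code}
import java.io.PrintStream
import java.util.Properties

// Sketch of a formatter honoring a proposed print.line switch, analogous
// to the existing print.key/key.separator handling.
class LineOptionalFormatter {
  var printLine = true                       // default preserves old behavior
  var lineSeparator = "\n".getBytes

  def init(props: Properties) {
    if (props.containsKey("print.line"))
      printLine = props.getProperty("print.line").trim.equalsIgnoreCase("true")
    if (props.containsKey("line.separator"))
      lineSeparator = props.getProperty("line.separator").getBytes
  }

  def writeTo(message: Array[Byte], output: PrintStream) {
    output.write(message)
    if (printLine)                           // skipping this writes data as-is
      output.write(lineSeparator)
  }
}
{code}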



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Resolved] (KAFKA-1521) Producer Graceful Shutdown issue in Container (Kafka version 0.8.x.x)

2014-07-15 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1521?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede resolved KAFKA-1521.
--

Resolution: Won't Fix

Closing as per [~junrao]'s comment. [~Bmis13] Please reopen if you have 
concerns.

 Producer Graceful Shutdown issue in Container (Kafka version 0.8.x.x)
 -

 Key: KAFKA-1521
 URL: https://issues.apache.org/jira/browse/KAFKA-1521
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.0, 0.8.1.1
 Environment: Tomcat Container or Any other J2EE container
Reporter: Bravesh Mistry
Assignee: Jun Rao
Priority: Minor

 Hi Kafka Team,
 We are running multiple webapps in a Tomcat container, and we have producers 
 which are managed by a ServletContextListener (lifecycle).  Upon 
 contextInitialized we create the producer, and on contextDestroyed we call 
 producer.close(), but the underlying metrics library does not shut down, so we 
 have a thread leak due to this issue.  I had to call 
 Metrics.defaultRegistry().shutdown() to resolve it.  Is this a known 
 issue?  I know the metrics library has a JVM shutdown hook, but it will not be 
 invoked, since the container thread is un-deploying the web app, the class 
 loader goes away, and the leaking thread cannot find the underlying Kafka 
 class.  Because of this, Tomcat does not shut down gracefully.
 Are you planning to un-register metrics when the producer is closed, or to 
 shut down the metrics pool per client.id?
 Here are the logs:
 SEVERE: The web application [  ] appears to have started a thread named 
 [metrics-meter-tick-thread-1] but has failed to stop it. This is very likely 
 to create a memory leak.
 SEVERE: The web application [] appears to have started a thread named 
 [metrics-meter-tick-thread-2] but has failed to stop it. This is very likely 
 to create a memory leak.
 Thanks,
 Bhavesh
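
A sketch of the lifecycle described above, assuming the 0.8 producer API and the Yammer Metrics 2.x registry it bundles; the explicit registry shutdown is the reporter's workaround, not an official API:

{code}
import java.util.Properties
import javax.servlet.{ServletContextEvent, ServletContextListener}
import kafka.javaapi.producer.Producer
import kafka.producer.ProducerConfig
import com.yammer.metrics.Metrics

class ProducerLifecycle extends ServletContextListener {
  private var producer: Producer[String, String] = _

  override def contextInitialized(e: ServletContextEvent) {
    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092")  // illustrative
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    producer = new Producer[String, String](new ProducerConfig(props))
  }

  override def contextDestroyed(e: ServletContextEvent) {
    producer.close()
    // Workaround from the report: stop the metrics-meter-tick threads
    // explicitly, since the JVM shutdown hook never runs on undeploy.
    Metrics.defaultRegistry().shutdown()
  }
}
{code}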



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1517) Messages is a required argument to Producer Performance Test

2014-07-15 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1517?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1517:
-

Labels: newbie  (was: )

 Messages is a required argument to Producer Performance Test
 

 Key: KAFKA-1517
 URL: https://issues.apache.org/jira/browse/KAFKA-1517
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
Reporter: Daniel Compton
Priority: Trivial
  Labels: newbie

 When running the producer performance test without providing a messages 
 argument, you get an error:
 {noformat}
 $ bin/kafka-producer-perf-test.sh --topics mirrormirror --broker-list kafka-dc21:9092
 Missing required argument "[messages]"
 Option                      Description
 ------                      -----------
 ...
 --messages <Long: count>    The number of messages to send or
                               consume (default: 9223372036854775807)
 {noformat}
 However the [shell command 
 documentation|https://github.com/apache/kafka/blob/c66e408b244de52f1c5c5bbd7627aa1f028f9a87/perf/src/main/scala/kafka/perf/PerfConfig.scala#L25]
  doesn't say that this is required, and implies that 
 [2^63-1|http://en.wikipedia.org/wiki/9223372036854775807] (Long.MaxValue) 
 messages will be sent. It should probably look like the 
 [ConsoleProducer|https://github.com/apache/kafka/blob/c66e408b244de52f1c5c5bbd7627aa1f028f9a87/core/src/main/scala/kafka/producer/ConsoleProducer.scala#L32]
  and prefix the documentation with "REQUIRED:". Or should we make this a 
 non-required argument and set the default to something sane like 
 100,000 messages?
 Which option is preferable for this?
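
Either choice is a small change to the jopt-simple option definition; a sketch of both alternatives (help text illustrative):

{code}
import joptsimple.OptionParser

// Alternative 1: keep --messages required, but say so in the help text,
// the way ConsoleProducer does:
val parser1 = new OptionParser
val requiredMessagesOpt = parser1.accepts("messages",
    "REQUIRED: The number of messages to send or consume.")
  .withRequiredArg
  .ofType(classOf[java.lang.Long])

// Alternative 2: make it optional with a sane default:
val parser2 = new OptionParser
val defaultedMessagesOpt = parser2.accepts("messages",
    "The number of messages to send or consume.")
  .withRequiredArg
  .ofType(classOf[java.lang.Long])
  .defaultsTo(100000L)
{code}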



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1019) kafka-preferred-replica-election.sh will fail without clear error message if /brokers/topics/[topic]/partitions does not exist

2014-07-15 Thread Neha Narkhede (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14062323#comment-14062323
 ] 

Neha Narkhede commented on KAFKA-1019:
--

[~draiwn] were you able to confirm the issue with zookeeper 3.4.6? In any case, 
we can at least fix the error message when the path doesn't exist.

 kafka-preferred-replica-election.sh will fail without clear error message if 
 /brokers/topics/[topic]/partitions does not exist
 --

 Key: KAFKA-1019
 URL: https://issues.apache.org/jira/browse/KAFKA-1019
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1
Reporter: Guozhang Wang
  Labels: newbie
 Fix For: 0.8.2


 From Libo Yu:
 I tried to run kafka-preferred-replica-election.sh on our kafka cluster.
 But I got this exception:
 Failed to start preferred replica election
 org.I0Itec.zkclient.exception.ZkNoNodeException: 
 org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = 
 NoNode for /brokers/topics/uattoqaaa.default/partitions
 I checked zookeeper and there is no 
 /brokers/topics/uattoqaaa.default/partitions. All I found is
 /brokers/topics/uattoqaaa.default.
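
A sketch of the kind of up-front check that would give a clear error, using 0.8-era ZkUtils helpers (the message text is illustrative):

{code}
import kafka.utils.ZkUtils
import kafka.admin.AdminOperationException
import org.I0Itec.zkclient.ZkClient

// Sketch: check the partitions path up front and fail with a clear message
// instead of letting ZkNoNodeException bubble up from deep inside.
def partitionsForTopic(zkClient: ZkClient, topic: String): Seq[String] = {
  val path = ZkUtils.getTopicPartitionsPath(topic)
  if (!ZkUtils.pathExists(zkClient, path))
    throw new AdminOperationException(
      "Topic %s has no partitions registered at %s; was it fully created?".format(topic, path))
  ZkUtils.getChildren(zkClient, path)
}
{code}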



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1534) transient unit test failure in testBasicPreferredReplicaElection

2014-07-15 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede updated KAFKA-1534:
-

Labels: newbie  (was: )

 transient unit test failure in testBasicPreferredReplicaElection
 

 Key: KAFKA-1534
 URL: https://issues.apache.org/jira/browse/KAFKA-1534
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
  Labels: newbie

 Saw the following transient failure. 
 kafka.admin.AdminTest > testBasicPreferredReplicaElection FAILED
 junit.framework.AssertionFailedError: Timing out after 5000 ms since 
 leader is not elected or changed for partition [test,1]
 at junit.framework.Assert.fail(Assert.java:47)
 at 
 kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:542)
 at 
 kafka.admin.AdminTest.testBasicPreferredReplicaElection(AdminTest.scala:310)



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: Review Request 23474: Patch for KAFKA-1483

2014-07-15 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23474/#review47785
---



core/src/main/scala/kafka/server/ReplicaManager.scala
https://reviews.apache.org/r/23474/#comment83980

I am wondering if this function will introduce more lock contention on 
leaderIsrUpdateLock? Every leaderReplicaIfLocal call will need to hold its 
read lock while other requests may at the same time be trying to modify the 
leader option?
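
For context, a minimal sketch of the read/write-lock pattern under discussion (names echo the Kafka code, but this is an illustration, not the actual Partition class):

{code}
import java.util.concurrent.locks.ReentrantReadWriteLock

// Sketch of the contention concern: every leaderReplicaIfLocal-style read
// takes the read lock, so frequent reads can delay writers and vice versa.
class PartitionSketch(localBrokerId: Int) {
  private val leaderIsrUpdateLock = new ReentrantReadWriteLock
  private var leaderReplicaIdOpt: Option[Int] = None

  def leaderReplicaIfLocal: Option[Int] = {
    leaderIsrUpdateLock.readLock().lock()
    try leaderReplicaIdOpt.filter(_ == localBrokerId)
    finally leaderIsrUpdateLock.readLock().unlock()
  }

  def makeLeader(newLeader: Int) {
    leaderIsrUpdateLock.writeLock().lock()   // blocks all readers
    try leaderReplicaIdOpt = Some(newLeader)
    finally leaderIsrUpdateLock.writeLock().unlock()
  }
}
{code}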


- Guozhang Wang


On July 15, 2014, 1:18 a.m., Sriharsha Chintalapani wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/23474/
 ---
 
 (Updated July 15, 2014, 1:18 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1483
 https://issues.apache.org/jira/browse/KAFKA-1483
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1483. Split Brain about Leader Partitions.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 6a56a772c134dbf1e70c1bfe067223009bfdbac8 
 
 Diff: https://reviews.apache.org/r/23474/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Sriharsha Chintalapani
 




Review Request 23516: Patch for KAFKA-1462

2014-07-15 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23516/
---

Review request for kafka.


Bugs: KAFKA-1462
https://issues.apache.org/jira/browse/KAFKA-1462


Repository: kafka


Description
---

remove partition from all PartitionData since it's redundant


minor fixes


initial patch


Diffs
-

  clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
a016269512b6d6d6e0fd3fab997e9c8265024eb4 
  clients/src/main/java/org/apache/kafka/common/Cluster.java 
c62707ab3aba26771fc4b993df28bf8c44f32309 
  clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java 
6fe7573973832615976defa37fe0dfbb8f911939 
  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
044b03061802ee5e8ea4f1995fb0988e1a70e9a7 
  clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 
8cecba50bf067713184208552af36469962cd628 
  
clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/requests/GenericStruct.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java 
PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java 
f35bd87cf0c52a30ed779b25d60bbe64a60b9502 
  clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java 
2652c32f123b3bc4b0456d4bc9fbba52c051724c 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 
PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java 
PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java 
PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
6036f6af1c55c1b0a15471e79b229b17f50ce31c 
  clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 
6cf4fb714916f1a318d788cde8fc0aad9dfb83ca 
  clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java 
66cc2fea6443968e525419a203dbc4227e0b1cdf 
  clients/src/main/java/org/apache/kafka/common/requests/ResponseHeader.java 
257b8287757e40349ea041ed7a651993007a55a8 
  clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
2f98192b064d1ce7c0779e901293edb8c3801915 
  
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 
PRE-CREATION 
  core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala 
dfad6e6534dd9b00099d110804899080e8d832ab 
  core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala 
c72ca14708a3625cb89d5fb92630138d2afa2bf0 
  core/src/main/scala/kafka/api/ControlledShutdownRequest.scala 
7dacb2023788064b736df8b775aaf12281d545b5 
  core/src/main/scala/kafka/api/ControlledShutdownResponse.scala 
46ec3db28f88bbf9e0b0de2133807dc552bcae13 
  core/src/main/scala/kafka/api/FetchRequest.scala 
a8b73acd1a813284744359e8434cb52d22063c99 
  core/src/main/scala/kafka/api/GenericRequestOrResponseAndHeader.scala 
PRE-CREATION 
  core/src/main/scala/kafka/api/HeartbeatRequestAndHeader.scala PRE-CREATION 
  core/src/main/scala/kafka/api/HeartbeatResponseAndHeader.scala PRE-CREATION 
  core/src/main/scala/kafka/api/JoinGroupRequestAndHeader.scala PRE-CREATION 
  core/src/main/scala/kafka/api/JoinGroupResponseAndHeader.scala PRE-CREATION 
  core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala 
3e408174dcc7e8dd9097bae41277ee4f7160afb3 
  core/src/main/scala/kafka/api/LeaderAndIsrResponse.scala 
f6368bb5a1d560f79427284ccbac9d46b789 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
630768ab57afb579049bcbc5d44ee6823b0e7cc2 
  core/src/main/scala/kafka/api/OffsetCommitResponse.scala 
4946e9729ecbf3da35bdab5c832d26977c107e9e 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
a32f8588ff02f5fb3c99fb8e5508f462923e8edc 
  core/src/main/scala/kafka/api/OffsetFetchResponse.scala 
c1222f422ddb6413bbb2e5da2980903ee70b9156 
  core/src/main/scala/kafka/api/OffsetRequest.scala 

Re: Review Request 23516: Patch for KAFKA-1462

2014-07-15 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23516/
---

(Updated July 15, 2014, 6:36 p.m.)


Review request for kafka.


Bugs: KAFKA-1462
https://issues.apache.org/jira/browse/KAFKA-1462


Repository: kafka


Description (updated)
---

1. I kept the request objects in the server separated from those in the client. 
This is because (1) some of the existing request objects are part of the old 
client api (FetchRequest, OffsetCommitRequest, etc.) and we can't remove them 
until the old clients are removed, and (2) changing existing request objects on 
the server side requires significant refactoring.

2. On the client side, I refactored existing request objects a bit. Now, every 
request/response object extends from a GenericStruct. GenericStruct provides a 
standard way for doing serialization and toString so that we don't have to do 
that on every request. Each request/response can be constructed in two ways: 
(1) by providing request specific fields; (2) by providing a struct. The latter 
is used for getting a request/response from its serialized format.

3. On the server side, I kept the existing requests more or less 
untouched. For new types of requests, I created a thin wrapper on the server 
side so that it can leverage the request objects created on the client side. 
This way the server side object will share the serialization and the toString 
logic with the client side object. In order to do this, I removed correlationId 
from the RequestOrResponse object. There is only one place where correlationId 
is directly referenced, and it is not necessary.

4. Multi-version support. We now need to support two versions of 
OffsetCommitRequest since for the new consumer work, we added two extra fields 
in the request. For simplicity, both versions are converted to the same request 
object. Since the old version doesn't have the new fields, defaults will be 
used.

5. The new requests/responses are based on the format described in 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design#Kafka0.9ConsumerRewriteDesign-Requestformats.
 I made some minor changes to the wiki so that the new requests follow the 
current standard.

6. Added some missing util functions and added unit test for testing the 
serialization/deserialization logic.
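
A sketch of the two construction paths described in point 2, with a simplified stand-in for the Struct type (the real one is org.apache.kafka.common.protocol.types.Struct):

{code}
// Sketch of the pattern in point 2: each request can be built either from
// request-specific fields or from a decoded Struct. "Struct" here is a
// simplified stand-in, not the real protocol class.
class Struct(val fields: Map[String, Any]) {
  def get(name: String): Any = fields(name)
}

abstract class GenericStruct {
  def struct: Struct
  // shared serialization/toString logic would live here
  override def toString = struct.fields.toString
}

class HeartbeatRequestSketch private (val groupId: String, val struct: Struct)
    extends GenericStruct {
  // path 1: construct from request-specific fields
  def this(groupId: String) =
    this(groupId, new Struct(Map("group_id" -> groupId)))
  // path 2: reconstruct from a decoded struct (i.e. from the wire format)
  def this(struct: Struct) =
    this(struct.get("group_id").asInstanceOf[String], struct)
}
{code}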


  

[jira] [Updated] (KAFKA-1462) Add new request and response formats for the new consumer and coordinator communication

2014-07-15 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-1462:
---

Attachment: KAFKA-1462.patch

 Add new request and response formats for the new consumer and coordinator 
 communication
 ---

 Key: KAFKA-1462
 URL: https://issues.apache.org/jira/browse/KAFKA-1462
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: Guozhang Wang
Assignee: Jun Rao
 Fix For: 0.9.0

 Attachments: KAFKA-1462.patch


 We need to add the request / response formats according to the new format 
 protocol once their design is final:
 https://cwiki.apache.org/confluence/display/KAFKA
 /Kafka+0.9+Consumer+Rewrite+Design



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Review Request 23521: Fix KAFKA-1533

2014-07-15 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23521/
---

Review request for kafka.


Bugs: KAFKA-1533
https://issues.apache.org/jira/browse/KAFKA-1533


Repository: kafka


Description
---

1. Add the metadataRefreshAttemptMS in NetworkClient for backing off.
2. Refactor the producer API tests using KafkaTestHarness.
3. Change the default backoff time to 100ms for the test utils.
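
A sketch of the backoff in point 1; the field and method names here are assumptions modeled on the description, not the committed code:

{code}
// Sketch: avoid hammering the brokers with metadata requests when the
// previous attempt happened very recently.
class MetadataBackoffSketch(refreshBackoffMs: Long) {
  private var lastRefreshAttemptMs: Long = 0L

  def shouldRefresh(nowMs: Long): Boolean =
    nowMs - lastRefreshAttemptMs >= refreshBackoffMs

  def recordAttempt(nowMs: Long) {
    lastRefreshAttemptMs = nowMs
  }
}
{code}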


Diffs
-

  clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
d8f9ce663ee24d2b0852c974136741280c39f8f8 
  
clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java 
4aa5b01d611631db72df47d50bbe30edb8c478db 
  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
15fd5bcbaf175a0f7d7cf0b142e63f705ca9b6ae 
  core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 
34a7db4b4ea2b720476c2b1f22a623a997faffbc 
  core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala 
194dd70919a5f301d3131c56594e40a0ebb27311 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 
3faa884f8eb83c7c00baab416d0acfb488dc39c1 

Diff: https://reviews.apache.org/r/23521/diff/


Testing
---


Thanks,

Guozhang Wang



[jira] [Commented] (KAFKA-1533) transient unit test failure in ProducerFailureHandlingTest

2014-07-15 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14062614#comment-14062614
 ] 

Guozhang Wang commented on KAFKA-1533:
--

Created reviewboard https://reviews.apache.org/r/23521/
 against branch origin/trunk

 transient unit test failure in ProducerFailureHandlingTest
 --

 Key: KAFKA-1533
 URL: https://issues.apache.org/jira/browse/KAFKA-1533
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
 Attachments: KAFKA-1533.patch


 Occasionally, saw the test hang on tear down. The following is the stack 
 trace.
 "Test worker" prio=5 tid=7f9246956000 nid=0x10e078000 in Object.wait() 
 [10e075000]
java.lang.Thread.State: WAITING (on object monitor)
 at java.lang.Object.wait(Native Method)
 - waiting on <7f4e69578> (a org.apache.zookeeper.ClientCnxn$Packet)
 at java.lang.Object.wait(Object.java:485)
 at org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1344)
 - locked <7f4e69578> (a org.apache.zookeeper.ClientCnxn$Packet)
 at org.apache.zookeeper.ZooKeeper.delete(ZooKeeper.java:732)
 at org.I0Itec.zkclient.ZkConnection.delete(ZkConnection.java:91)
 at org.I0Itec.zkclient.ZkClient$8.call(ZkClient.java:720)
 at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
 at org.I0Itec.zkclient.ZkClient.delete(ZkClient.java:716)
 at kafka.utils.ZkUtils$.deletePath(ZkUtils.scala:416)
 at kafka.utils.ZkUtils$.deregisterBrokerInZk(ZkUtils.scala:184)
 at kafka.server.KafkaHealthcheck.shutdown(KafkaHealthcheck.scala:50)
 at 
 kafka.server.KafkaServer$$anonfun$shutdown$2.apply$mcV$sp(KafkaServer.scala:243)
 at kafka.utils.Utils$.swallow(Utils.scala:172)
 at kafka.utils.Logging$class.swallowWarn(Logging.scala:92)
 at kafka.utils.Utils$.swallowWarn(Utils.scala:45)
 at kafka.utils.Logging$class.swallow(Logging.scala:94)
 at kafka.utils.Utils$.swallow(Utils.scala:45)
 at kafka.server.KafkaServer.shutdown(KafkaServer.scala:243)
 at 
 kafka.api.ProducerFailureHandlingTest.tearDown(ProducerFailureHandlingTest.scala:90)



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1533) transient unit test failure in ProducerFailureHandlingTest

2014-07-15 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-1533:
-

Attachment: KAFKA-1533.patch

 transient unit test failure in ProducerFailureHandlingTest
 --

 Key: KAFKA-1533
 URL: https://issues.apache.org/jira/browse/KAFKA-1533
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
 Attachments: KAFKA-1533.patch





--
This message was sent by Atlassian JIRA
(v6.2#6252)


[GitHub] kafka pull request: KAFKA-1414: Speedup broker startup after hard ...

2014-07-15 Thread ataraxer
GitHub user ataraxer opened a pull request:

https://github.com/apache/kafka/pull/26

KAFKA-1414: Speedup broker startup after hard reset and shutdown

This patch increases the speed of both hard-reset recovery and shutdown by 
introducing `log.recovery.threads` and `log.shutdown.threads` properties, which 
allow the required work to be performed in parallel, partitioned by log directory.

Best performance can be achieved by setting the thread count to the number of 
log directories, provided that they are located on dedicated drives. Even so, 
the option should be used with caution due to the possibility of a native JVM 
out-of-memory error.
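
For example (illustrative values), a broker with four log directories on dedicated drives might set:

    log.dirs=/data1/kafka,/data2/kafka,/data3/kafka,/data4/kafka
    log.recovery.threads=4
    log.shutdown.threads=4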

The patch is compiled from changes proposed by Jay Kreps, Alexey Ozeritskiy, and 
Dmitry Bugaychenko, by Anton Karamanov.

All tests are passing.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ataraxer/kafka kafka-1414

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/26.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #26


commit e4a86709d07030c44f077ab20d4329ddb84c4aec
Author: Anton Karamanov atara...@gmail.com
Date:   2014-07-15T17:42:15Z

KAFKA-1414 Speedup broker startup after hard reset and shutdown; patched by 
Jay Kreps,  Alexey Ozeritskiy, Dmitry Bugaychenko and Anton Karamanov




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14062680#comment-14062680
 ] 

ASF GitHub Bot commented on KAFKA-1414:
---

GitHub user ataraxer opened a pull request:

https://github.com/apache/kafka/pull/26

KAFKA-1414: Speedup broker startup after hard reset and shutdown





 Speedup broker startup after hard reset
 ---

 Key: KAFKA-1414
 URL: https://issues.apache.org/jira/browse/KAFKA-1414
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.2, 0.9.0, 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps
 Attachments: parallel-dir-loading-0.8.patch, 
 parallel-dir-loading-trunk-fixed-threadpool.patch, 
 parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch





--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-15 Thread Anton Karamanov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14062686#comment-14062686
 ] 

Anton Karamanov commented on KAFKA-1414:


Indeed, the patch was malformed.

I've fixed it, added shutdown parallelization, cleaned it up a bit, and opened a 
[pull request|https://github.com/apache/kafka/pull/26] on the Kafka GitHub 
mirror for review of the changes.

 Speedup broker startup after hard reset
 ---

 Key: KAFKA-1414
 URL: https://issues.apache.org/jira/browse/KAFKA-1414
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.2, 0.9.0, 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps
 Attachments: parallel-dir-loading-0.8.patch, 
 parallel-dir-loading-trunk-fixed-threadpool.patch, 
 parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch





--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-15 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14062695#comment-14062695
 ] 

Jun Rao commented on KAFKA-1414:


Anton,

Thanks for the patch. Could you add the patch as an attachment to this jira? 
This will take care of the Apache copyright stuff.


 Speedup broker startup after hard reset
 ---

 Key: KAFKA-1414
 URL: https://issues.apache.org/jira/browse/KAFKA-1414
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.2, 0.9.0, 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps
 Attachments: parallel-dir-loading-0.8.patch, 
 parallel-dir-loading-trunk-fixed-threadpool.patch, 
 parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch





--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-15 Thread Anton Karamanov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Karamanov updated KAFKA-1414:
---

Attachment: 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch

Sure.
[^0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch]

 Speedup broker startup after hard reset
 ---

 Key: KAFKA-1414
 URL: https://issues.apache.org/jira/browse/KAFKA-1414
 Project: Kafka
  Issue Type: Improvement
  Components: log
Affects Versions: 0.8.2, 0.9.0, 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps
 Attachments: 
 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, 
 parallel-dir-loading-0.8.patch, 
 parallel-dir-loading-trunk-fixed-threadpool.patch, 
 parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch





--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Resolved] (KAFKA-1529) transient unit test failure in testAutoCreateAfterDeleteTopic

2014-07-15 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-1529.


   Resolution: Fixed
Fix Version/s: 0.8.2

Thanks for the reviews. Committed to trunk.

 transient unit test failure in testAutoCreateAfterDeleteTopic
 -

 Key: KAFKA-1529
 URL: https://issues.apache.org/jira/browse/KAFKA-1529
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.8.2
Reporter: Jun Rao
Assignee: Jun Rao
 Fix For: 0.8.2

 Attachments: KAFKA-1529.patch


 Saw the following transient failure.
 kafka.admin.DeleteTopicTest > testAutoCreateAfterDeleteTopic FAILED
 org.scalatest.junit.JUnitTestFailedError: Topic should have been auto 
 created
 at 
 org.scalatest.junit.AssertionsForJUnit$class.newAssertionFailedException(AssertionsForJUnit.scala:101)
 at 
 org.scalatest.junit.JUnit3Suite.newAssertionFailedException(JUnit3Suite.scala:149)
 at org.scalatest.Assertions$class.fail(Assertions.scala:711)
 at org.scalatest.junit.JUnit3Suite.fail(JUnit3Suite.scala:149)
 at 
 kafka.admin.DeleteTopicTest.testAutoCreateAfterDeleteTopic(DeleteTopicTest.scala:222)



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: Review Request 23208: Patch for KAFKA-1512

2014-07-15 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23208/#review47841
---

Ship it!


Ship It!

- Jun Rao


On July 14, 2014, 8:28 p.m., Jay Kreps wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/23208/
 ---
 
 (Updated July 14, 2014, 8:28 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1512
 https://issues.apache.org/jira/browse/KAFKA-1512
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-1512 Add per-ip connection limits.
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/network/SocketServer.scala 
 4976d9c3a66bc965f5870a0736e21c7b32650bab 
   core/src/main/scala/kafka/server/KafkaConfig.scala 
 bb2e654b9bd63daa88a4de14131859b75c00e607 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 5a56f57e36c4eab849a0b0e66f20f94690283af2 
   core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
 1c492de8fde6582ca2342842a551739575d1f46c 
 
 Diff: https://reviews.apache.org/r/23208/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jay Kreps
 




[jira] [Commented] (KAFKA-1539) Due to OS caching Kafka might lose offset files which causes full reset of data

2014-07-15 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14062973#comment-14062973
 ] 

Jun Rao commented on KAFKA-1539:


If flush is not guaranteed, will keeping two versions of the file help? At some 
point, we will have flushed both versions and neither one is guaranteed to 
persist.

 Due to OS caching Kafka might lose offset files which causes full reset of 
 data
 

 Key: KAFKA-1539
 URL: https://issues.apache.org/jira/browse/KAFKA-1539
 Project: Kafka
  Issue Type: Bug
  Components: log
Affects Versions: 0.8.1.1
Reporter: Dmitry Bugaychenko
Assignee: Jay Kreps




--
This message was sent by Atlassian JIRA
(v6.2#6252)