[jira] [Comment Edited] (KAFKA-1555) provide strong consistency with reasonable availability

2014-07-24 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074122#comment-14074122
 ] 

Joe Stein edited comment on KAFKA-1555 at 7/25/14 6:11 AM:
---

The simplification and guarantee provided by ack=-1 work properly if the 
client implements them in a way that works best for its use case.

Kafka guarantees durability across R-1 failures (with replication factor R) 
without message loss if you use ack=-1 ... so that is the foundation we build 
upon.

When we (the client application, a.k.a. the producer) do not want "message 
loss", we set up at least 3 data centers (since a Zookeeper ensemble needs a 
majority of its data centers reachable, you need at least three). Then you 
write to two topics, each with ack=-1. The broker leaders for the partitions 
of each of these topics that you are writing to MUST be in different data 
centers.  After sync writing to them, "join" their results (I say "join" 
since your specific implementation may use another function name like 
subscribe or onComplete or something), and you get a durable write from the 
client perspective without losing data once it has had a successful 
"transaction".

If that doesn't work for you, then you must accept "data loss" upon the 
failure of at least one data center (which has nothing to do with Kafka).

This can (and should) be further extended to make sure that at least two racks 
in each data center get the data.

If you are not concerned with being able to sustain the loss of 1 data center 
plus the loss of 1 rack in another available data center (at the same time), 
then this is not a solution for you.

Now, all of what I just said is manual (making sure replicas and partitions 
are in different data centers and racks) and static, but it works with Kafka 
tools out of the box plus some (lots of) software engineering power (scripting 
effort with a couple of late nights burning the midnight oil).

As far as producing to multiple topics goes, there are lots of ways to do this 
on the client side in parallel without much (if any) latency cost (with a 
little more software engineering).  You can use Akka or anything else where 
you can get a future after sending off multiple events and then subscribe to 
them onComplete before deciding (returning to your caller or fulfilling the 
promise) that the "message" has been "written".
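
As a minimal sketch of that fan-out-and-join, assuming a blocking sendSync 
helper, topic names, and timeout that are illustrative stand-ins rather than 
any particular Kafka client API:

{code}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical blocking send: returns only once the leader for the
// given topic has acknowledged the write with ack=-1.
def sendSync(topic: String, message: Array[Byte]): Unit = ???

def durableWrite(message: Array[Byte]): Unit = {
  // Fan out to both topics in parallel; their partition leaders live
  // in different data centers per the layout described above.
  val dc1 = Future { sendSync("events-dc1", message) }
  val dc2 = Future { sendSync("events-dc2", message) }
  // "join": report success to the caller only once BOTH data centers
  // have acknowledged the write.
  Await.result(Future.sequence(Seq(dc1, dc2)), 30.seconds)
}
{code}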

Hopefully this makes sense. I appreciate that not all (or even most) use cases 
need this multi data center + multi rack type of survivability, but it works 
with Kafka if you go by what Kafka guarantees without trying to change it 
unnecessarily.

If there are defects we should fix them, but going up and down this thread I 
am getting a bit lost in what we should be doing (if anything) to the current 
code now.








> provide strong consistency with reasonable availability
> ---
>
> Key: KAFKA-1555
> URL: https://issues.apache.org/jira/browse/KAFKA-1555
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 0.8.1.1
>Reporter: Jiang Wu
>Assignee: Neha Narkhede
>
> In a mission critical application, we expect a kafka cluster with 3 brokers 
> can satisfy two requirements:
> 1. When 1 broker is down, no message loss or service blocking happens.
> 2. In worse cases such as two brokers are down, service can be blocked, but 
> no message loss happens.
> We found that the current kafka version (0.8.1.1) cannot achieve the 
> requirements due to three of its behaviors:
> 1. when choosing a new leader from 2 followers in ISR, the one with fewer 
> messages may be chosen as the leader.
> 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
> has fewer messages than the leader.
> 3. ISR can contain only 1 broker, therefore acknowledged messages may be 
> stored in only 1 broker.
> The following is an analytical proof. 
> We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
> that at the beginning, all 3 replicas, leader A, followers B and C, are in 
> sync, i.e., they have the same messages and are all in ISR.
> According to the value of request.required.acks (acks for short), there are 
> the following cases.
> 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
> 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
> time, although C hasn't received m, C is still in ISR. If A is killed, C can 
> be elected as the new leader, and consumers will miss m.
> 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
> message m to A, and receives an acknowledgement. Disk failure happens in A 
> before B and C replicate m. Message m is lost.
> In summary, any existing configuration cannot satisfy the requirements.
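
To make the acks settings in that proof concrete, here is a minimal 0.8-era 
producer sketch (the broker list, topic, and serializer are illustrative 
assumptions, not part of the report):

{code}
import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

val props = new Properties()
props.put("metadata.broker.list", "broker1:9092,broker2:9092,broker3:9092")
props.put("serializer.class", "kafka.serializer.StringEncoder")
// The setting under discussion:
//   0 or 1 -> at most the leader is known to have the message
//   2      -> two replicas acked, but a lagging third may stay in ISR
//   -1     -> all current ISR members acked; tolerates R-1 failures
//             only while the ISR still contains all R replicas
props.put("request.required.acks", "-1")

val producer = new Producer[String, String](new ProducerConfig(props))
producer.send(new KeyedMessage[String, String]("test-topic", "key", "value"))
producer.close()
{code}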




[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-24 Thread Dmitry Bugaychenko (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074119#comment-14074119
 ] 

Dmitry Bugaychenko commented on KAFKA-1414:
---

I'm not a scala expert, but the threading scheme (at least as I understood it) 
looks fine.

> Speedup broker startup after hard reset
> ---
>
> Key: KAFKA-1414
> URL: https://issues.apache.org/jira/browse/KAFKA-1414
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Affects Versions: 0.8.1.1, 0.8.2, 0.9.0
>Reporter: Dmitry Bugaychenko
>Assignee: Jay Kreps
> Fix For: 0.8.2
>
> Attachments: 
> 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, 
> KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, 
> KAFKA-1414-rev3-interleaving.patch, KAFKA-1414-rev3.patch, 
> KAFKA-1414-rev4.patch, freebie.patch, parallel-dir-loading-0.8.patch, 
> parallel-dir-loading-trunk-fixed-threadpool.patch, 
> parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch
>
>
> After a hard reset due to power failure the broker takes way too much time 
> recovering unflushed segments in a single thread. This could be easily 
> improved by launching multiple threads (one per data directory, assuming that 
> typically each data directory is on a dedicated drive). Locally we tried this 
> simple patch to LogManager.loadLogs and it seems to work; however, I'm too 
> new to scala, so do not take it literally:
> {code}
>   /**
>    * Recover and load all logs in the given data directories,
>    * using one recovery thread per data directory.
>    */
>   private def loadLogs(dirs: Seq[File]) {
>     val threads = for (dir <- dirs) yield new Thread(new Runnable {
>       def run() {
>         val recoveryPoints = recoveryPointCheckpoints(dir).read
>         /* load the logs */
>         val subDirs = dir.listFiles()
>         if (subDirs != null) {
>           val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
>           if (cleanShutDownFile.exists())
>             info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
>           for (subDir <- subDirs if subDir.isDirectory) {
>             info("Loading log '" + subDir.getName + "'")
>             val topicPartition = Log.parseTopicPartitionName(subDir.getName)
>             val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
>             val log = new Log(subDir, config, recoveryPoints.getOrElse(topicPartition, 0L), scheduler, time)
>             // put returns any previously registered log for this partition
>             val previous = addLogWithLock(topicPartition, log)
>             if (previous != null)
>               throw new IllegalArgumentException(
>                 "Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
>           }
>           cleanShutDownFile.delete()
>         }
>       }
>     })
>     // start one recovery thread per directory, then wait for all of them
>     threads.foreach(_.start())
>     threads.foreach(_.join())
>   }
>
>   def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
>     logCreationOrDeletionLock synchronized {
>       this.logs.put(topicPartition, log)
>     }
>   }
> {code}
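
For comparison, a standalone sketch of the bounded thread-pool variant hinted 
at by the *-threadpool.patch attachments (loadLogsIn is a hypothetical 
stand-in for the per-directory recovery body above):

{code}
import java.io.File
import java.util.concurrent.Executors

// Hypothetical variant: bound the parallelism with a fixed pool rather
// than spawning one unbounded thread per data directory.
def loadLogsParallel(dirs: Seq[File], numThreads: Int)(loadLogsIn: File => Unit) {
  val pool = Executors.newFixedThreadPool(numThreads)
  try {
    val jobs = dirs.map { dir =>
      pool.submit(new Runnable { def run() { loadLogsIn(dir) } })
    }
    jobs.foreach(_.get()) // wait for all; rethrows the first failure
  } finally {
    pool.shutdown()
  }
}
{code}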





Re: Gradle hanging on unit test run against ProducerFailureHangingTest->testBrokerFailure

2014-07-24 Thread David Corley
Hey Guozhang,
Yes, I spotted that commit and had updated my working copy to that, but the
test is still hanging. If it's any help, the test looks like it's doing
_something_  as the CPU usage ramps up significantly and stays there until
I kill the process.
/Dave


On Thu, Jul 24, 2014 at 4:10 PM, Guozhang Wang  wrote:

> Hi Dave,
>
> KAFKA-1533 has just been committed targeting at this issue. Did your update
> on trunk include this commit?
>
> commit ff05e9b3616a222e29a42f6e8fdf41945a417f41
> Author: Guozhang Wang 
> Date:   Tue Jul 22 14:14:19 2014 -0700
>
> kafka-1533; transient unit test failure in ProducerFailureHandlingTest;
> reviewed by Guozhang Wang; reviewed by Jun Rao
>
> Guozhang
>
>
>
>
> On Thu, Jul 24, 2014 at 5:52 AM, David Corley 
> wrote:
>
> > Hey all,
> > I'm trying my hand at writing some patches for open issues, but I'm
> running
> > into issues with running gradlew test.
> > It hangs every time when trying to run testBrokerFailure in the
> > ProducerFailureHangingTest suite.
> >
> > It was working for a time, but I updated to trunk HEAD and it's no longer
> > working.
> > I'm running on OSX with JDK 1.6.0_65.
> > I tried increasing the HeapSize for the test target, but hit the same
> > issue.
> > Running against 3f1a9c4cee778d089d3ec3167555c2b89cdc48bb
> >
> > Would appreciate any help.
> > Regards,
> > Dave
> >
>
>
>
> --
> -- Guozhang
>


[jira] [Commented] (KAFKA-1029) Zookeeper leader election stuck in ephemeral node retry loop

2014-07-24 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074081#comment-14074081
 ] 

Jun Rao commented on KAFKA-1029:


I think the proposed fix in KAFKA-1451 will fix this issue too.

> Zookeeper leader election stuck in ephemeral node retry loop
> 
>
> Key: KAFKA-1029
> URL: https://issues.apache.org/jira/browse/KAFKA-1029
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.0
>Reporter: Sam Meder
>Assignee: Sam Meder
>Priority: Blocker
> Fix For: 0.8.0
>
> Attachments: 
> 0002-KAFKA-1029-Use-brokerId-instead-of-leaderId-when-tri.patch
>
>
> We're seeing the following log statements (over and over):
> [2013-08-27 07:21:49,538] INFO conflict in /controller data: { "brokerid":3, 
> "timestamp":"1377587945206", "version":1 } stored data: { "brokerid":2, 
> "timestamp":"1377587460904", "version":1 } (kafka.utils.ZkUtils$)
> [2013-08-27 07:21:49,559] INFO I wrote this conflicted ephemeral node [{ 
> "brokerid":3, "timestamp":"1377587945206", "version":1 }] at /controller a 
> while back in a different session, hence I will backoff for this node to be 
> deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
> where the broker is essentially stuck in the loop that is trying to deal with 
> left-over ephemeral nodes. The code looks a bit racy to me. In particular:
> ZookeeperLeaderElector:
>   def elect: Boolean = {
>     controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
>     val timestamp = SystemTime.milliseconds.toString
>     val electString = ...
>     try {
>       createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient,
>         electionPath, electString, leaderId,
>         (controllerString: String, leaderId: Any) =>
>           KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
>         controllerContext.zkSessionTimeout)
> leaderChangeListener is registered before the create call (by the way, it 
> looks like a new registration will be added on every elect call - shouldn't 
> it be registered in startup()?), so it can update leaderId to the current 
> leader before the call to create. If that happens then we will continuously 
> get node-exists exceptions and the checker function will always return true, 
> i.e. we will never get out of the while(true) loop.
> I think the right fix here is to pass brokerId instead of leaderId when 
> calling create, i.e.
> createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient,
>   electionPath, electString, brokerId,
>   (controllerString: String, leaderId: Any) =>
>     KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
>   controllerContext.zkSessionTimeout)
> The loop dealing with the ephemeral node bug is now only triggered for the 
> broker that owned the node previously, although I am still not 100% sure if 
> that is sufficient.





[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-07-24 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074070#comment-14074070
 ] 

Jun Rao commented on KAFKA-1555:


Sudarshan,

Our replication guarantees are the following.

1. If you use ack=-1 with a replication factor R, you can tolerate R-1 failures 
without message loss.

2. If you use ack>=1 and





[jira] [Commented] (KAFKA-1211) Hold the produce request with ack > 1 in purgatory until replicas' HW has larger than the produce offset

2014-07-24 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074064#comment-14074064
 ] 

Jun Rao commented on KAFKA-1211:


Not quite. The case that I described above could happen with ack = -1 too.

> Hold the produce request with ack > 1 in purgatory until replicas' HW has 
> larger than the produce offset
> 
>
> Key: KAFKA-1211
> URL: https://issues.apache.org/jira/browse/KAFKA-1211
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Fix For: 0.9.0
>
>
> Today during leader failover we have a window of weakness when the 
> followers truncate their data before fetching from the new leader, i.e., the 
> number of in-sync replicas is just 1. If during this time the leader has 
> also failed, then produce requests with ack > 1 that have already been 
> responded to will still be lost. To avoid this scenario we would prefer to 
> hold the produce request in purgatory until the replicas' HW is larger than 
> the produce offset instead of just their end-of-log offsets.
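
As a conceptual sketch only (hypothetical names, not the actual purgatory 
code), the proposed completion condition changes roughly like this:

{code}
// Hypothetical illustration: a produce request for offset o should be
// completed once every replica's high watermark has passed o, rather
// than merely its log end offset.
case class ReplicaState(logEndOffset: Long, highWatermark: Long)

def isSatisfiedToday(replicas: Seq[ReplicaState], o: Long): Boolean =
  replicas.forall(_.logEndOffset >= o)    // current behavior

def isSatisfiedProposed(replicas: Seq[ReplicaState], o: Long): Boolean =
  replicas.forall(_.highWatermark >= o)   // proposed behavior
{code}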





Re: Review Request 23702: Patch for KAFKA-1070

2014-07-24 Thread Sriharsha Chintalapani


> On July 24, 2014, 1:41 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/KafkaServer.scala, lines 359-369
> > 
> >
> > Does this cover the case for metaBrokerIdSet.size == 1 and brokerId < 
> > 0? In this case, we should use the brokerId in the metadata file.

My bad, I missed it. Thanks.


- Sriharsha


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23702/#review48608
---





[jira] [Commented] (KAFKA-1070) Auto-assign node id

2014-07-24 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074036#comment-14074036
 ] 

Sriharsha Chintalapani commented on KAFKA-1070:
---

Updated reviewboard https://reviews.apache.org/r/23702/diff/
 against branch origin/trunk

> Auto-assign node id
> ---
>
> Key: KAFKA-1070
> URL: https://issues.apache.org/jira/browse/KAFKA-1070
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jay Kreps
>Assignee: Sriharsha Chintalapani
>  Labels: usability
> Attachments: KAFKA-1070.patch, KAFKA-1070_2014-07-19_16:06:13.patch, 
> KAFKA-1070_2014-07-22_11:34:18.patch, KAFKA-1070_2014-07-24_20:58:17.patch, 
> KAFKA-1070_2014-07-24_21:05:33.patch
>
>
> It would be nice to have Kafka brokers auto-assign node ids rather than 
> having that be a configuration. Having a configuration is irritating because 
> (1) you have to generate a custom config for each broker and (2) even though 
> it is in configuration, changing the node id can cause all kinds of bad 
> things to happen.





[jira] [Updated] (KAFKA-1070) Auto-assign node id

2014-07-24 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani updated KAFKA-1070:
--

Attachment: KAFKA-1070_2014-07-24_21:05:33.patch






Re: Review Request 23702: Patch for KAFKA-1070

2014-07-24 Thread Sriharsha Chintalapani


> On July 24, 2014, 1:21 a.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/utils/ZkUtils.scala, line 133
> > 
> >
> > It will be good to refer to 1000 as a static variable inside 
> > KafkaConfig eg. KafkaConfig.MinimumBrokerSequenceId

We'd need to change KafkaConfig to an object to add a static variable, instead 
of passing it as an argument to ZkUtils.getBrokerIdSequenceId.
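
A minimal sketch of that companion-object approach (the constant name follows 
the review comment; the value 1000 comes from this thread):

{code}
// Hypothetical sketch: a companion object gives the constant one home,
// so callers can reference KafkaConfig.MinimumBrokerSequenceId instead
// of threading the literal 1000 through to ZkUtils.
object KafkaConfig {
  val MinimumBrokerSequenceId = 1000
}

class KafkaConfig /* (props: VerifiableProperties, ...) */ {
  // existing instance-level settings stay here
}
{code}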


- Sriharsha


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23702/#review48598
---





Re: Review Request 23702: Patch for KAFKA-1070

2014-07-24 Thread Sriharsha Chintalapani

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23702/
---

(Updated July 25, 2014, 4:05 a.m.)


Review request for kafka.


Bugs: KAFKA-1070
https://issues.apache.org/jira/browse/KAFKA-1070


Repository: kafka


Description (updated)
---

KAFKA-1070. Auto assign node id.


KAFKA-1070. Auto-assign node id.


KAFKA-1070. Auto-assign node id.


KAFKA-1070. Auto-assign node id.


Diffs (updated)
-

  core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala 
PRE-CREATION 
  core/src/main/scala/kafka/server/KafkaConfig.scala 
50b09edb73af1b45f88f919ac8c46ae056878c8e 
  core/src/main/scala/kafka/server/KafkaServer.scala 
def1dc2a5818d45d9ee0881137ff989cec4eb9b1 
  core/src/main/scala/kafka/utils/ZkUtils.scala 
dcdc1ce2b02c996294e19cf480736106aaf29511 
  core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala 
PRE-CREATION 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 
4d01d25c73e1f986f58801296015efc491f9e45a 

Diff: https://reviews.apache.org/r/23702/diff/


Testing
---


Thanks,

Sriharsha Chintalapani



[jira] [Commented] (KAFKA-1070) Auto-assign node id

2014-07-24 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14074028#comment-14074028
 ] 

Sriharsha Chintalapani commented on KAFKA-1070:
---

Updated reviewboard https://reviews.apache.org/r/23702/diff/
 against branch origin/trunk






[jira] [Updated] (KAFKA-1070) Auto-assign node id

2014-07-24 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani updated KAFKA-1070:
--

Attachment: KAFKA-1070_2014-07-24_20:58:17.patch






Re: Review Request 23702: Patch for KAFKA-1070

2014-07-24 Thread Sriharsha Chintalapani

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23702/
---

(Updated July 25, 2014, 3:58 a.m.)


Review request for kafka.


Bugs: KAFKA-1070
https://issues.apache.org/jira/browse/KAFKA-1070


Repository: kafka


Description (updated)
---

KAFKA-1070. Auto assign node id.


KAFKA-1070. Auto-assign node id.


KAFKA-1070. Auto-assign node id.


Diffs (updated)
-

  core/src/main/scala/kafka/common/InconsistentBrokerIdException.scala 
PRE-CREATION 
  core/src/main/scala/kafka/server/KafkaConfig.scala 
50b09edb73af1b45f88f919ac8c46ae056878c8e 
  core/src/main/scala/kafka/server/KafkaServer.scala 
def1dc2a5818d45d9ee0881137ff989cec4eb9b1 
  core/src/main/scala/kafka/utils/ZkUtils.scala 
dcdc1ce2b02c996294e19cf480736106aaf29511 
  core/src/test/scala/unit/kafka/server/ServerGenerateBrokerIdTest.scala 
PRE-CREATION 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072 
  core/src/test/scala/unit/kafka/utils/TestUtils.scala 
4d01d25c73e1f986f58801296015efc491f9e45a 

Diff: https://reviews.apache.org/r/23702/diff/


Testing
---


Thanks,

Sriharsha Chintalapani



[jira] [Created] (KAFKA-1556) Sorry it's Just Test

2014-07-24 Thread sungjubong (JIRA)
sungjubong created KAFKA-1556:
-

 Summary: Sorry it's Just Test
 Key: KAFKA-1556
 URL: https://issues.apache.org/jira/browse/KAFKA-1556
 Project: Kafka
  Issue Type: Test
  Components: core
Reporter: sungjubong
Priority: Trivial


Sorry, this is a test for me.
I want to become a committer, so I'm testing now.

Sorry.





Build failed in Jenkins: Kafka-trunk #232

2014-07-24 Thread Apache Jenkins Server
See 

Changes:

[jay.kreps] KAFKA-1544 Log cleaner takes a long time to shut down. Patch from 
Manikumar Reddy.

[neha.narkhede] KAFKA-1483 Split Brain about Leader Partitions; reviewed by 
Guozhang, Jun and Neha

--
[...truncated 1031 lines...]
        at org.apache.zookeeper.server.NIOServerCnxn$Factory.<init>(NIOServerCnxn.java:125)
        at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:32)
        at kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
        at kafka.admin.AdminTest.setUp(AdminTest.scala:33)

kafka.admin.AdminTest > testReassigningNonExistingPartition FAILED
    java.net.BindException: Address already in use
        at sun.nio.ch.Net.bind(Native Method)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
        at org.apache.zookeeper.server.NIOServerCnxn$Factory.<init>(NIOServerCnxn.java:144)
        at org.apache.zookeeper.server.NIOServerCnxn$Factory.<init>(NIOServerCnxn.java:125)
        at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:32)
        at kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
        at kafka.admin.AdminTest.setUp(AdminTest.scala:33)

kafka.admin.AdminTest > testResumePartitionReassignmentThatWasCompleted FAILED
    java.net.BindException: Address already in use
        at sun.nio.ch.Net.bind(Native Method)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
        at org.apache.zookeeper.server.NIOServerCnxn$Factory.<init>(NIOServerCnxn.java:144)
        at org.apache.zookeeper.server.NIOServerCnxn$Factory.<init>(NIOServerCnxn.java:125)
        at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:32)
        at kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
        at kafka.admin.AdminTest.setUp(AdminTest.scala:33)

kafka.admin.AdminTest > testPreferredReplicaJsonData FAILED
    java.net.BindException: Address already in use
        at sun.nio.ch.Net.bind(Native Method)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
        at org.apache.zookeeper.server.NIOServerCnxn$Factory.<init>(NIOServerCnxn.java:144)
        at org.apache.zookeeper.server.NIOServerCnxn$Factory.<init>(NIOServerCnxn.java:125)
        at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:32)
        at kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
        at kafka.admin.AdminTest.setUp(AdminTest.scala:33)

kafka.admin.AdminTest > testBasicPreferredReplicaElection FAILED
    java.net.BindException: Address already in use
        at sun.nio.ch.Net.bind(Native Method)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
        at org.apache.zookeeper.server.NIOServerCnxn$Factory.<init>(NIOServerCnxn.java:144)
        at org.apache.zookeeper.server.NIOServerCnxn$Factory.<init>(NIOServerCnxn.java:125)
        at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:32)
        at kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
        at kafka.admin.AdminTest.setUp(AdminTest.scala:33)

kafka.admin.AdminTest > testShutdownBroker FAILED
    java.net.BindException: Address already in use
        at sun.nio.ch.Net.bind(Native Method)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
        at org.apache.zookeeper.server.NIOServerCnxn$Factory.<init>(NIOServerCnxn.java:144)
        at org.apache.zookeeper.server.NIOServerCnxn$Factory.<init>(NIOServerCnxn.java:125)
        at kafka.zk.EmbeddedZookeeper.<init>(EmbeddedZookeeper.scala:32)
        at kafka.zk.ZooKeeperTestHarness$class.setUp(ZooKeeperTestHarness.scala:33)
        at kafka.admin.AdminTest.setUp(AdminTest.scala:33)

kafka.admin.AdminTest > testTopicConfigChange FAILED
    java.net.BindException: Address already in use
        at sun.nio.ch.Net.bind(Native Method)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:124)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:59)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:52)
        at org.apache.zookeeper.server.NIOSe

[jira] [Resolved] (KAFKA-1483) Split Brain about Leader Partitions

2014-07-24 Thread Neha Narkhede (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1483?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede resolved KAFKA-1483.
--

Resolution: Fixed
  Assignee: Sriharsha Chintalapani  (was: Neha Narkhede)

Sorry about the delay. Pushed the latest reviewed patch to trunk.

> Split Brain about Leader Partitions
> ---
>
> Key: KAFKA-1483
> URL: https://issues.apache.org/jira/browse/KAFKA-1483
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Guozhang Wang
>Assignee: Sriharsha Chintalapani
>  Labels: newbie++
> Fix For: 0.9.0
>
> Attachments: KAFKA-1483.patch, KAFKA-1483_2014-07-16_11:07:44.patch
>
>
> Today in the server there are two places storing the leader partition info:
> 1) leaderPartitions list in the ReplicaManager.
> 2) leaderBrokerIdOpt in the Partition.
> 1) is used as the ground truth to decide if the server is the current leader 
> for serving requests; 2) is used as the ground truth for reporting leader 
> count metrics, etc., and for the background ISR-shrinking thread to decide 
> which partitions to check. There is a risk that these two ground-truth caches 
> become inconsistent, and we'd better make only one of them the ground truth.





[jira] [Resolved] (KAFKA-1544) LogCleaner may take a long time to shutdown

2014-07-24 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-1544.
--

   Resolution: Fixed
Fix Version/s: 0.8.2

Committed, thanks!

> LogCleaner may take a long time to shutdown
> ---
>
> Key: KAFKA-1544
> URL: https://issues.apache.org/jira/browse/KAFKA-1544
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2
>Reporter: Jun Rao
>Assignee: Manikumar Reddy
>  Labels: newbie
> Fix For: 0.8.2
>
> Attachments: KAFKA-1544.patch, KAFKA-1544_2014-07-24_20:38:52.patch
>
>
> We have the following code in LogCleaner. Since the cleaner thread is shut 
> down without an interrupt, it may take up to the backoff time for the 
> cleaner thread to detect the shutdown flag.
> private def cleanOrSleep() {
>   cleanerManager.grabFilthiestLog() match {
>     case None =>
>       // there are no cleanable logs, sleep a while
>       time.sleep(config.backOffMs)
>  
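
A standalone sketch of one possible remedy (hypothetical names; not 
necessarily what the committed patch does): wait on a latch instead of 
sleeping, so shutdown is observed immediately.

{code}
import java.util.concurrent.{CountDownLatch, TimeUnit}

// doWork returns false when there was nothing to clean; the worker then
// waits on a latch (instead of sleeping) so shutdown() wakes it at once.
class BackoffWorker(backOffMs: Long)(doWork: () => Boolean) extends Thread {
  private val shutdownLatch = new CountDownLatch(1)

  override def run() {
    while (shutdownLatch.getCount > 0) {
      if (!doWork())
        shutdownLatch.await(backOffMs, TimeUnit.MILLISECONDS)
    }
  }

  def shutdown() {
    shutdownLatch.countDown()  // wakes the await above immediately
    join()                     // wait for run() to observe it and exit
  }
}
{code}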





[jira] [Updated] (KAFKA-1527) SimpleConsumer should be transaction-aware

2014-07-24 Thread Raul Castro Fernandez (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Raul Castro Fernandez updated KAFKA-1527:
-

Attachment: KAFKA-1527.patch

> SimpleConsumer should be transaction-aware
> --
>
> Key: KAFKA-1527
> URL: https://issues.apache.org/jira/browse/KAFKA-1527
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Joel Koshy
>Assignee: Raul Castro Fernandez
>  Labels: transactions
> Attachments: KAFKA-1527.patch
>
>
> This will help in further integration testing of the transactional producer. 
> This could be implemented in the consumer-iterator level or at a higher level.





[jira] [Commented] (KAFKA-1527) SimpleConsumer should be transaction-aware

2014-07-24 Thread Raul Castro Fernandez (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073830#comment-14073830
 ] 

Raul Castro Fernandez commented on KAFKA-1527:
--

Created reviewboard https://reviews.apache.org/r/23906/diff/
 against branch origin/transactional_messaging






Review Request 23906: Patch for KAFKA-1527

2014-07-24 Thread Raul Castro Fernandez

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23906/
---

Review request for kafka.


Bugs: KAFKA-1527
https://issues.apache.org/jira/browse/KAFKA-1527


Repository: kafka


Description
---

KAFKA-1527; SimpleConsumer should be transaction aware


Diffs
-

  bin/kafka-simple-consumer-perf-test.sh PRE-CREATION 
  core/src/main/scala/kafka/consumer/ConsumerConfig.scala 
1cf2f62ba02e4aa66bfa7575865e5d57baf82212 
  core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
0e64632210385ef63c2ad3445b55ac4f37a63df2 
  core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala 
7602b8d705970a5dab49ed36d117346a960701ac 

Diff: https://reviews.apache.org/r/23906/diff/


Testing
---


Thanks,

Raul Castro Fernandez



Review Request 23905: Patch for KAFKA-1527

2014-07-24 Thread Raul Castro Fernandez

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23905/
---

Review request for kafka.


Bugs: KAFKA-1527
https://issues.apache.org/jira/browse/KAFKA-1527


Repository: kafka


Description
---

KAFKA-1527; SimpleConsumer should be transaction aware


Diffs
-

  bin/kafka-simple-consumer-perf-test.sh PRE-CREATION 
  core/src/main/scala/kafka/consumer/ConsumerConfig.scala 
1cf2f62ba02e4aa66bfa7575865e5d57baf82212 
  core/src/main/scala/kafka/consumer/SimpleConsumer.scala 
0e64632210385ef63c2ad3445b55ac4f37a63df2 
  core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala 
7602b8d705970a5dab49ed36d117346a960701ac 

Diff: https://reviews.apache.org/r/23905/diff/


Testing
---


Thanks,

Raul Castro Fernandez



[jira] [Comment Edited] (KAFKA-1419) cross build for scala 2.11

2014-07-24 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073666#comment-14073666
 ] 

Joe Stein edited comment on KAFKA-1419 at 7/24/14 9:15 PM:
---

Thanks [~joecrobak] for the feedback/review. Yes, we can drop support for Scala 
2.8 (https://issues.apache.org/jira/browse/KAFKA-1399); let's do that in this 
ticket.

[~edgefox], can you take care of that, plus the other feedback so far and 
whatever comes along, when you get back to this please.  Thanks!







> cross build for scala 2.11
> --
>
> Key: KAFKA-1419
> URL: https://issues.apache.org/jira/browse/KAFKA-1419
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.8.1
>Reporter: Scott Clasen
>Assignee: Ivan Lyutov
>Priority: Blocker
> Fix For: 0.8.2
>
> Attachments: KAFKA-1419.patch, KAFKA-1419.patch
>
>
> Please publish builds for scala 2.11, hopefully just needs a small tweak to 
> the gradle conf?









[jira] [Comment Edited] (KAFKA-1070) Auto-assign node id

2014-07-24 Thread Jon Bringhurst (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073665#comment-14073665
 ] 

Jon Bringhurst edited comment on KAFKA-1070 at 7/24/14 9:16 PM:


[~clarkhaskins], [~sriharsha], having ReservedBrokerIdMaxValue as a 
configurable option (perhaps with a default of 1000) would probably solve 
Clark's use case.

-Also, if I'm reading this right, it will ignore the broker ID if it's set in 
the config but out of the reserved range (set to -1). It will then create a 
generated broker ID to overwrite the one in the config. It might be nice to 
have the broker error out on startup if a broker.id is set in the config, but 
is out of range. A new broker ID should only be generated if an id isn't 
specified in the config AND the meta.properties doesn't exist (otherwise the 
config file wouldn't match the actual broker ID).- Edit: nevermind, my Scala 
reading skills aren't up to par. This isn't the case here.


was (Author: jonbringhurst):
[~clarkhaskins], [~sriharsha], having ReservedBrokerIdMaxValue as a 
configurable option (perhaps with a default of 1000) would probably solve 
clark's use case.

Also, if I'm reading this right, it will ignore the broker ID if it's set in 
the config but out of the reserved range (set to -1). It will then create a 
generated broker ID to overwrite the one in the config. It might be nice to 
have the broker error out on startup if a broker.id is set in the config, but 
is out of range. A new broker ID should only be generated if an id isn't 
specified in the config AND the meta.properties doesn't exist (otherwise the 
config file wouldn't match the actual broker ID).

{noformat}
 var brokerId: Int = if (props.containsKey("broker.id")) 
props.getIntInRange("broker.id", (0, ReservedBrokerIdMaxValue)) else -1

... later on ...

} else if(metaBrokerIdSet.size == 0) {
  if(brokerId < 0) {
brokerId = ZkUtils.getBrokerSequenceId(zkClient)
  } else {
{noformat}






[jira] [Commented] (KAFKA-1399) Drop Scala 2.8.x support

2014-07-24 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073661#comment-14073661
 ] 

Joe Stein commented on KAFKA-1399:
--

+1 to drop scala 2.8 support; let's do it in the change for Scala 2.11 
(https://issues.apache.org/jira/browse/KAFKA-1419) though.

> Drop Scala 2.8.x support
> 
>
> Key: KAFKA-1399
> URL: https://issues.apache.org/jira/browse/KAFKA-1399
> Project: Kafka
>  Issue Type: Task
>  Components: packaging
>Affects Versions: 0.8.1
>Reporter: Stevo Slavic
>  Labels: gradle, scala
>
> It's been almost 4 years since [Scala 2.8 has been 
> released|http://www.scala-lang.org/old/node/7009] and 3 years since [Scala 
> 2.9 has been released|http://www.scala-lang.org/old/node/9483], so there was 
> more than plenty of time to migrate.
> Continued support of old Scala 2.8 is causing issues like 
> [this|https://issues.apache.org/jira/browse/KAFKA-1362?focusedCommentId=13970390&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13970390].





[jira] [Commented] (KAFKA-1419) cross build for scala 2.11

2014-07-24 Thread Joe Crobak (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073651#comment-14073651
 ] 

Joe Crobak commented on KAFKA-1419:
---

I tried out this patch earlier today, and I have a few pieces of feedback:

* I'm not sure it's possible to support both scala 2.8 and 2.11 at the same 
time while using the zinc compiler. 2.8 seems to require gradle 1.6/zinc 0.2.5, 
and 2.11 requires gradle 2.0/zinc 0.3.0+. It's possible that there's a version 
in between that supports them both, but I didn't do the binary search. 
Switching to {{scalaCompileOptions.useAnt = true}} seems to compile all scala 
versions for me.
* I think the {{jarAll}}, {{srcJarAll}} and other {{*All}} tasks should be 
updated to include scala 2.11.
* scala 2.9 support seems to be broken by two things: 1) several {{else{}}} 
clauses that were formerly scala 2.9 specific were converted to be scala 2.11 
specific, and 2) the wrong zinc version being selected.
* Moving {{BeanProperty}} from {{scala.reflect}} to {{scala.beans}} is 
backwards incompatible with scala <2.10. It's likely better to remove the 
{{BeanProperty}} dep and implement {{def getBuffer = buffer}} in those two 
places (see the sketch after this list).
* scala.gradle needs an override for 2.11 like it has for 2.10.

I have a feeling that trying to support 4 versions of scala is going to be a 
pain in the long run. When can scala 2.8 (and also 2.9) be dropped?
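
A minimal sketch of that {{BeanProperty}} suggestion (class and field names 
are hypothetical, not the actual Kafka source):

{code}
import java.nio.ByteBuffer

// Before: the annotation lives in scala.reflect before 2.10 and in
// scala.beans afterwards, so either import breaks half the cross-build.
// class Message(@scala.beans.BeanProperty val buffer: ByteBuffer)

// After: a hand-written getter compiles identically on every version.
class Message(val buffer: ByteBuffer) {
  def getBuffer = buffer
}
{code}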






[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

2014-07-24 Thread Anton Karamanov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073649#comment-14073649
 ] 

Anton Karamanov commented on KAFKA-1414:


So what about the last version of a patch — does it look good, any feedback?

> Speedup broker startup after hard reset
> ---
>
> Key: KAFKA-1414
> URL: https://issues.apache.org/jira/browse/KAFKA-1414
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Affects Versions: 0.8.1.1, 0.8.2, 0.9.0
>Reporter: Dmitry Bugaychenko
>Assignee: Jay Kreps
> Fix For: 0.8.2
>
> Attachments: 
> 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, 
> KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, 
> KAFKA-1414-rev3-interleaving.patch, KAFKA-1414-rev3.patch, 
> KAFKA-1414-rev4.patch, freebie.patch, parallel-dir-loading-0.8.patch, 
> parallel-dir-loading-trunk-fixed-threadpool.patch, 
> parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch
>
>
> After hard reset due to power failure broker takes way too much time 
> recovering unflushed segments in a single thread. This could be easiliy 
> improved launching multiple threads (one per data dirrectory, assuming that 
> typically each data directory is on a dedicated drive). Localy we trie this 
> simple patch to LogManager.loadLogs and it seems to work, however I'm too new 
> to scala, so do not take it literally:
> {code}
>   /**
>    * Recover and load all logs in the given data directories
>    */
>   private def loadLogs(dirs: Seq[File]) {
>     val threads: Array[Thread] = new Array[Thread](dirs.size)
>     var i: Int = 0
>     val me = this
>     for (dir <- dirs) {
>       val thread = new Thread(new Runnable {
>         def run() {
>           val recoveryPoints = me.recoveryPointCheckpoints(dir).read
>           /* load the logs */
>           val subDirs = dir.listFiles()
>           if (subDirs != null) {
>             val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
>             if (cleanShutDownFile.exists())
>               info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
>             for (dir <- subDirs) {
>               if (dir.isDirectory) {
>                 info("Loading log '" + dir.getName + "'")
>                 val topicPartition = Log.parseTopicPartitionName(dir.getName)
>                 val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
>                 val log = new Log(dir,
>                                   config,
>                                   recoveryPoints.getOrElse(topicPartition, 0L),
>                                   scheduler,
>                                   time)
>                 val previous = addLogWithLock(topicPartition, log)
>                 if (previous != null)
>                   throw new IllegalArgumentException(
>                     "Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
>               }
>             }
>             cleanShutDownFile.delete()
>           }
>         }
>       })
>       thread.start()
>       threads(i) = thread
>       i = i + 1
>     }
>     for (thread <- threads) {
>       thread.join()
>     }
>   }
>
>   def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
>     logCreationOrDeletionLock synchronized {
>       this.logs.put(topicPartition, log)
>     }
>   }
> {code}
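
The attachment list above includes thread-pool variants of this idea. As a 
rough sketch only (not taken from those patches, with the per-directory 
recovery body factored into a caller-supplied function), a bounded-pool 
version could look like:

{code}
import java.io.File
import java.util.concurrent.{Executors, TimeUnit}

// Sketch: same one-task-per-data-directory parallelism, but on a fixed
// thread pool so concurrency is bounded and failures propagate.
def loadLogsInParallel(dirs: Seq[File])(recoverDir: File => Unit) {
  val pool = Executors.newFixedThreadPool(dirs.size max 1)
  try {
    val futures = dirs.map { dir =>
      pool.submit(new Runnable {
        def run() { recoverDir(dir) }  // recoverDir holds the body of run() above
      })
    }
    futures.foreach(_.get())  // rethrows the first recovery failure, if any
  } finally {
    pool.shutdown()
    pool.awaitTermination(Long.MaxValue, TimeUnit.MILLISECONDS)
  }
}
{code}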



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1070) Auto-assign node id

2014-07-24 Thread Clark Haskins (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073563#comment-14073563
 ] 

Clark Haskins commented on KAFKA-1070:
--

We currently have node IDs in the 0-1 range. I think some better logic for 
identifying used node IDs is necessary rather than just starting from 1000. 

> Auto-assign node id
> ---
>
> Key: KAFKA-1070
> URL: https://issues.apache.org/jira/browse/KAFKA-1070
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jay Kreps
>Assignee: Sriharsha Chintalapani
>  Labels: usability
> Attachments: KAFKA-1070.patch, KAFKA-1070_2014-07-19_16:06:13.patch, 
> KAFKA-1070_2014-07-22_11:34:18.patch
>
>
> It would be nice to have Kafka brokers auto-assign node ids rather than 
> having that be a configuration. Having a configuration is irritating because 
> (1) you have to generate a custom config for each broker and (2) even though 
> it is in configuration, changing the node id can cause all kinds of bad 
> things to happen.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] Subscription: outstanding kafka patches

2014-07-24 Thread jira
Issue Subscription
Filter: outstanding kafka patches (111 issues)
The list of outstanding kafka patches
Subscriber: kafka-mailing-list

Key Summary
KAFKA-1550  Patch review tool should use git format-patch to generate patch
https://issues.apache.org/jira/browse/KAFKA-1550
KAFKA-1549  dead brokers coming in the TopicMetadataResponse
https://issues.apache.org/jira/browse/KAFKA-1549
KAFKA-1544  LogCleaner may take a long time to shutdown
https://issues.apache.org/jira/browse/KAFKA-1544
KAFKA-1543  Changing replication factor
https://issues.apache.org/jira/browse/KAFKA-1543
KAFKA-1541   Add transactional request definitions to clients package
https://issues.apache.org/jira/browse/KAFKA-1541
KAFKA-1536  Change the status of the JIRA to "Patch Available" in the 
kafka-review-tool
https://issues.apache.org/jira/browse/KAFKA-1536
KAFKA-1528  Normalize all the line endings
https://issues.apache.org/jira/browse/KAFKA-1528
KAFKA-1526  Producer performance tool should have an option to enable 
transactions
https://issues.apache.org/jira/browse/KAFKA-1526
KAFKA-1525  DumpLogSegments should print transaction IDs
https://issues.apache.org/jira/browse/KAFKA-1525
KAFKA-1524  Implement transactional producer
https://issues.apache.org/jira/browse/KAFKA-1524
KAFKA-1523  Implement transaction manager module
https://issues.apache.org/jira/browse/KAFKA-1523
KAFKA-1522  Transactional messaging request/response definitions
https://issues.apache.org/jira/browse/KAFKA-1522
KAFKA-1517  Messages is a required argument to Producer Performance Test
https://issues.apache.org/jira/browse/KAFKA-1517
KAFKA-1509  Restart of destination broker after unreplicated partition move 
leaves partitions without leader
https://issues.apache.org/jira/browse/KAFKA-1509
KAFKA-1507  Using GetOffsetShell against non-existent topic creates the topic 
unintentionally
https://issues.apache.org/jira/browse/KAFKA-1507
KAFKA-1500  adding new consumer requests using the new protocol
https://issues.apache.org/jira/browse/KAFKA-1500
KAFKA-1498  new producer performance and bug improvements
https://issues.apache.org/jira/browse/KAFKA-1498
KAFKA-1496  Using batch message in sync producer only sends the first message 
if we use a Scala Stream as the argument 
https://issues.apache.org/jira/browse/KAFKA-1496
KAFKA-1483  Split Brain about Leader Partitions
https://issues.apache.org/jira/browse/KAFKA-1483
KAFKA-1481  Stop using dashes AND underscores as separators in MBean names
https://issues.apache.org/jira/browse/KAFKA-1481
KAFKA-1477  add authentication layer and initial JKS x509 implementation for 
brokers, producers and consumer for network communication
https://issues.apache.org/jira/browse/KAFKA-1477
KAFKA-1476  Get a list of consumer groups
https://issues.apache.org/jira/browse/KAFKA-1476
KAFKA-1475  Kafka consumer stops LeaderFinder/FetcherThreads, but application 
does not know
https://issues.apache.org/jira/browse/KAFKA-1475
KAFKA-1471  Add Producer Unit Tests for LZ4 and LZ4HC compression
https://issues.apache.org/jira/browse/KAFKA-1471
KAFKA-1468  Improve perf tests
https://issues.apache.org/jira/browse/KAFKA-1468
KAFKA-1460  NoReplicaOnlineException: No replica for partition
https://issues.apache.org/jira/browse/KAFKA-1460
KAFKA-1450  check invalid leader in a more robust way
https://issues.apache.org/jira/browse/KAFKA-1450
KAFKA-1430  Purgatory redesign
https://issues.apache.org/jira/browse/KAFKA-1430
KAFKA-1419  cross build for scala 2.11
https://issues.apache.org/jira/browse/KAFKA-1419
KAFKA-1414  Speedup broker startup after hard reset
https://issues.apache.org/jira/browse/KAFKA-1414
KAFKA-1394  Ensure last segment isn't deleted on expiration when there are 
unflushed messages
https://issues.apache.org/jira/browse/KAFKA-1394
KAFKA-1372  Upgrade to Gradle 1.10
https://issues.apache.org/jira/browse/KAFKA-1372
KAFKA-1367  Broker topic metadata not kept in sync with ZooKeeper
https://issues.apache.org/jira/browse/KAFKA-1367
KAFKA-1351  String.format is very expensive in Scala
https://issues.apache.org/jira/browse/KAFKA-1351
KAFKA-1343  Kafka consumer iterator thread stalls
https://issues.apache.org/jira/browse/KAFKA-1343
KAFKA-1330  Implement subscribe(TopicPartition...partitions) in the new consumer
https://issues.apache.org/jira/browse/KAFKA-1330
KAFKA-1329  Add metadata fetch and refresh functionality to the consumer
https://issues.apache.org/jira/browse/KAFKA-1329
KAFKA-1324  Debian packaging
https://issues.apache.org/jira/browse/KAFKA-1324
KAFKA-1303  metadata request in the new producer can be d

[jira] [Commented] (KAFKA-1211) Hold the produce request with ack > 1 in purgatory until replicas' HW has larger than the produce offset

2014-07-24 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073478#comment-14073478
 ] 

Guozhang Wang commented on KAFKA-1211:
--

What I meant is that the ack=-1 case should already be handled in KAFKA-1430, 
as we now wait for the leader HW. Right?

> Hold the produce request with ack > 1 in purgatory until replicas' HW has 
> larger than the produce offset
> 
>
> Key: KAFKA-1211
> URL: https://issues.apache.org/jira/browse/KAFKA-1211
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Fix For: 0.9.0
>
>
> Today during leader failover we will have a window of weakness when the 
> followers truncate their data before fetching from the new leader, i.e., the 
> number of in-sync replicas is just 1. If during this time the leader has also 
> failed, then produce requests with ack > 1 that have already been responded to 
> will still be lost. To avoid this scenario we would prefer to hold the produce 
> request in purgatory until the replicas' HW is larger than the offset, instead 
> of just their end-of-log offsets.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1211) Hold the produce request with ack > 1 in purgatory until replicas' HW has larger than the produce offset

2014-07-24 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073449#comment-14073449
 ] 

Jun Rao commented on KAFKA-1211:


The issue is that this problem not only affects ack>1, but also affects ack=-1. 
Suppose that you have 3 replicas A, B, and C, and A is the leader initially. If 
A fails and B takes over as the new leader, C will first truncate its log, 
which could include committed data. Now, if immediately after the truncation B 
fails, C has to be the new leader, and we may have lost previously committed 
messages, even though we had only 2 failures.

> Hold the produce request with ack > 1 in purgatory until replicas' HW has 
> larger than the produce offset
> 
>
> Key: KAFKA-1211
> URL: https://issues.apache.org/jira/browse/KAFKA-1211
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Fix For: 0.9.0
>
>
> Today during leader failover we will have a window of weakness when the 
> followers truncate their data before fetching from the new leader, i.e., the 
> number of in-sync replicas is just 1. If during this time the leader has also 
> failed, then produce requests with ack > 1 that have already been responded to 
> will still be lost. To avoid this scenario we would prefer to hold the produce 
> request in purgatory until the replicas' HW is larger than the offset, instead 
> of just their end-of-log offsets.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1211) Hold the produce request with ack > 1 in purgatory until replicas' HW has larger than the produce offset

2014-07-24 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073422#comment-14073422
 ] 

Guozhang Wang commented on KAFKA-1211:
--

Jun, I think in your review of KAFKA-1430's patch, you are already suggesting 
waiting for the leader HW to be larger than the produce offset, instead of just 
the log end offset, for ack=-1.

As for ack > 1 but not equal to num.replicas: since data loss may happen anyway 
because the leader election logic may choose a follower that does not have all 
the committed data, this issue would just potentially increase the data loss a 
bit under such scenarios. Given its complexity and limited benefit, maybe it is 
not an optimization worth doing?

> Hold the produce request with ack > 1 in purgatory until replicas' HW has 
> larger than the produce offset
> 
>
> Key: KAFKA-1211
> URL: https://issues.apache.org/jira/browse/KAFKA-1211
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Fix For: 0.9.0
>
>
> Today during leader failover we will have a window of weakness when the 
> followers truncate their data before fetching from the new leader, i.e., the 
> number of in-sync replicas is just 1. If during this time the leader has also 
> failed, then produce requests with ack > 1 that have already been responded to 
> will still be lost. To avoid this scenario we would prefer to hold the produce 
> request in purgatory until the replicas' HW is larger than the offset, instead 
> of just their end-of-log offsets.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1211) Hold the produce request with ack > 1 in purgatory until replicas' HW has larger than the produce offset

2014-07-24 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao updated KAFKA-1211:
---

Fix Version/s: 0.9.0

> Hold the produce request with ack > 1 in purgatory until replicas' HW has 
> larger than the produce offset
> 
>
> Key: KAFKA-1211
> URL: https://issues.apache.org/jira/browse/KAFKA-1211
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Fix For: 0.9.0
>
>
> Today during leader failover we will have a window of weakness when the 
> followers truncate their data before fetching from the new leader, i.e., the 
> number of in-sync replicas is just 1. If during this time the leader has also 
> failed, then produce requests with ack > 1 that have already been responded to 
> will still be lost. To avoid this scenario we would prefer to hold the produce 
> request in purgatory until the replicas' HW is larger than the offset, instead 
> of just their end-of-log offsets.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-328) Write unit test for kafka server startup and shutdown API

2014-07-24 Thread Balaji Seshadri (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073400#comment-14073400
 ] 

Balaji Seshadri commented on KAFKA-328:
---

[~nehanarkhede] I did not see, in the current codebase, any unit tests like the 
ones I mentioned in my comment.

> Write unit test for kafka server startup and shutdown API 
> --
>
> Key: KAFKA-328
> URL: https://issues.apache.org/jira/browse/KAFKA-328
> Project: Kafka
>  Issue Type: Bug
>Reporter: Neha Narkhede
>Assignee: BalajiSeshadri
>  Labels: newbie
>
> Background discussion in KAFKA-320
> People often try to embed KafkaServer in an application that ends up calling 
> startup() and shutdown() repeatedly, and sometimes in odd ways. To ensure this 
> works correctly we have to be very careful about cleaning up resources. This 
> is a good practice for making unit tests reliable anyway.
> A good first step would be to add some unit tests on startup and shutdown to 
> cover various cases:
> 1. A Kafka server can start up if it is not already starting up, is not 
> currently being shut down, and hasn't already been started.
> 2. A Kafka server can shut down if it is not already shutting down, is not 
> currently starting up, and hasn't already been shut down. 
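
As a rough illustration of case 2, a test along these lines could work (an 
editor's sketch; the class name is made up and the helpers are assumed to be 
the existing test utilities, not anything from a patch):

{code}
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.TestUtils
import org.scalatest.junit.JUnit3Suite

class ServerStartupShutdownTest extends JUnit3Suite {
  def testRepeatedShutdownIsSafe() {
    // Build a config for a single local test broker.
    val props = TestUtils.createBrokerConfig(0, TestUtils.choosePort())
    val server = new KafkaServer(new KafkaConfig(props))
    server.startup()
    server.shutdown()
    server.shutdown()  // a second shutdown must be a safe no-op, not an error
  }
}
{code}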



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-42) Support rebalancing the partitions with replication

2014-07-24 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-42?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073394#comment-14073394
 ] 

Guozhang Wang commented on KAFKA-42:


I think this is just due to the review tool, which uses the magic number "42" 
when no JIRA number is specified.

> Support rebalancing the partitions with replication
> ---
>
> Key: KAFKA-42
> URL: https://issues.apache.org/jira/browse/KAFKA-42
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Jun Rao
>Assignee: Neha Narkhede
>Priority: Blocker
>  Labels: features
> Fix For: 0.8.0
>
> Attachments: KAFKA-42.patch, KAFKA-42_2014-07-24_15:36:04.patch, 
> KAFKA-42_2014-07-24_15:37:26.patch, kafka-42-v1.patch, kafka-42-v2.patch, 
> kafka-42-v3.patch, kafka-42-v4.patch, kafka-42-v5.patch
>
>   Original Estimate: 240h
>  Remaining Estimate: 240h
>
> As new brokers are added, we need to support moving partition replicas from 
> one set of brokers to another, online.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-328) Write unit test for kafka server startup and shutdown API

2014-07-24 Thread Balaji Seshadri (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073391#comment-14073391
 ] 

Balaji Seshadri commented on KAFKA-328:
---

[~nehanarkhede] can you please advise on my comments on this unit test?

> Write unit test for kafka server startup and shutdown API 
> --
>
> Key: KAFKA-328
> URL: https://issues.apache.org/jira/browse/KAFKA-328
> Project: Kafka
>  Issue Type: Bug
>Reporter: Neha Narkhede
>Assignee: BalajiSeshadri
>  Labels: newbie
>
> Background discussion in KAFKA-320
> People often try to embed KafkaServer in an application that ends up calling 
> startup() and shutdown() repeatedly, and sometimes in odd ways. To ensure this 
> works correctly we have to be very careful about cleaning up resources. This 
> is a good practice for making unit tests reliable anyway.
> A good first step would be to add some unit tests on startup and shutdown to 
> cover various cases:
> 1. A Kafka server can start up if it is not already starting up, is not 
> currently being shut down, and hasn't already been started.
> 2. A Kafka server can shut down if it is not already shutting down, is not 
> currently starting up, and hasn't already been shut down. 



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-42) Support rebalancing the partitions with replication

2014-07-24 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-42?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073383#comment-14073383
 ] 

Jun Rao commented on KAFKA-42:
--

This JIRA is already closed. Is the patch intended for this JIRA?

> Support rebalancing the partitions with replication
> ---
>
> Key: KAFKA-42
> URL: https://issues.apache.org/jira/browse/KAFKA-42
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Jun Rao
>Assignee: Neha Narkhede
>Priority: Blocker
>  Labels: features
> Fix For: 0.8.0
>
> Attachments: KAFKA-42.patch, KAFKA-42_2014-07-24_15:36:04.patch, 
> KAFKA-42_2014-07-24_15:37:26.patch, kafka-42-v1.patch, kafka-42-v2.patch, 
> kafka-42-v3.patch, kafka-42-v4.patch, kafka-42-v5.patch
>
>   Original Estimate: 240h
>  Remaining Estimate: 240h
>
> As new brokers are added, we need to support moving partition replicas from 
> one set of brokers to another, online.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1476) Get a list of consumer groups

2014-07-24 Thread Balaji Seshadri (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073374#comment-14073374
 ] 

Balaji Seshadri commented on KAFKA-1476:


Hi [~neha.narkh...@gmail.com],

Looks like we are in design mode before going into implementation. Removing the 
offset checker and making ConsumerCommand handle all management related to 
consumers is definitely a good idea.

This would add the tasks below:

1) --list -> lists all consumer groups

2) --describeGroup <group> -> describes the group: who all the consumers of the 
group are and what the lag is

3) --resetOffset --group <groupName> -> this might reset the offset

This would be a good list, as usage would be easy. It would also be better to 
expose these as management APIs instead of just a main program, so anybody can 
build a UI on top of it.
I have already added the list option; for the remaining ones I need more input 
on where I can get the data.

Please advise.
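
To make the proposal concrete, a hypothetical invocation 
(kafka.admin.ConsumerCommand and all of these options are assumed names; none 
of them exist yet):

{noformat}
bin/kafka-run-class.sh kafka.admin.ConsumerCommand --list
bin/kafka-run-class.sh kafka.admin.ConsumerCommand --describeGroup <group>
bin/kafka-run-class.sh kafka.admin.ConsumerCommand --resetOffset --group <group>
{noformat}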





> Get a list of consumer groups
> -
>
> Key: KAFKA-1476
> URL: https://issues.apache.org/jira/browse/KAFKA-1476
> Project: Kafka
>  Issue Type: Wish
>  Components: tools
>Affects Versions: 0.8.1.1
>Reporter: Ryan Williams
>Assignee: BalajiSeshadri
>  Labels: newbie
> Fix For: 0.9.0
>
> Attachments: KAFKA-1476-RENAME.patch, KAFKA-1476.patch
>
>
> It would be useful to have a way to get a list of consumer groups currently 
> active via some tool/script that ships with kafka. This would be helpful so 
> that the system tools can be explored more easily.
> For example, when running the ConsumerOffsetChecker, it requires a group 
> option
> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --group 
> ?
> But, when just getting started with kafka, using the console producer and 
> consumer, it is not clear what value to use for the group option.  If a list 
> of consumer groups could be listed, then it would be clear what value to use.
> Background:
> http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3cCAOq_b1w=slze5jrnakxvak0gu9ctdkpazak1g4dygvqzbsg...@mail.gmail.com%3e



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Comment Edited] (KAFKA-1476) Get a list of consumer groups

2014-07-24 Thread Balaji Seshadri (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073374#comment-14073374
 ] 

Balaji Seshadri edited comment on KAFKA-1476 at 7/24/14 4:50 PM:
-

Hi [~nehanarkhede],

Looks like we are in design mode before going into implementation. Removing the 
offset checker and making ConsumerCommand handle all management related to 
consumers is definitely a good idea.

This would add the tasks below:

1) --list -> lists all consumer groups

2) --describeGroup <group> -> describes the group: who all the consumers of the 
group are and what the lag is

3) --resetOffset --group <groupName> -> this might reset the offset

This would be a good list, as usage would be easy. It would also be better to 
expose these as management APIs instead of just a main program, so anybody can 
build a UI on top of it.
I have already added the list option; for the remaining ones I need more input 
on where I can get the data.

Please advise.






was (Author: balaji.seshadri):
Hi [~neha.narkh...@gmail.com],

Looks like we are in design mode before going into implementation. Removing the 
offset checker and making ConsumerCommand handle all management related to 
consumers is definitely a good idea.

This would add the tasks below:

1) --list -> lists all consumer groups

2) --describeGroup <group> -> describes the group: who all the consumers of the 
group are and what the lag is

3) --resetOffset --group <groupName> -> this might reset the offset

This would be a good list, as usage would be easy. It would also be better to 
expose these as management APIs instead of just a main program, so anybody can 
build a UI on top of it.
I have already added the list option; for the remaining ones I need more input 
on where I can get the data.

Please advise.





> Get a list of consumer groups
> -
>
> Key: KAFKA-1476
> URL: https://issues.apache.org/jira/browse/KAFKA-1476
> Project: Kafka
>  Issue Type: Wish
>  Components: tools
>Affects Versions: 0.8.1.1
>Reporter: Ryan Williams
>Assignee: BalajiSeshadri
>  Labels: newbie
> Fix For: 0.9.0
>
> Attachments: KAFKA-1476-RENAME.patch, KAFKA-1476.patch
>
>
> It would be useful to have a way to get a list of consumer groups currently 
> active via some tool/script that ships with kafka. This would be helpful so 
> that the system tools can be explored more easily.
> For example, when running the ConsumerOffsetChecker, it requires a group 
> option
> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic test --group 
> ?
> But, when just getting started with kafka, using the console producer and 
> consumer, it is not clear what value to use for the group option.  If a list 
> of consumer groups could be listed, then it would be clear what value to use.
> Background:
> http://mail-archives.apache.org/mod_mbox/kafka-users/201405.mbox/%3cCAOq_b1w=slze5jrnakxvak0gu9ctdkpazak1g4dygvqzbsg...@mail.gmail.com%3e



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Comment Edited] (KAFKA-1211) Hold the produce request with ack > 1 in purgatory until replicas' HW has larger than the produce offset

2014-07-24 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073346#comment-14073346
 ] 

Jun Rao edited comment on KAFKA-1211 at 7/24/14 4:34 PM:
-

Yes, this is a potential problem. Waiting for the HW to be propagated to the 
followers will introduce another round of network delay on every message to be 
committed, though. The following is another potential solution that avoids this 
overhead.

Note that a follower in ISR always has all committed messages. On follower 
startup, if we can figure out accurately which messages are committed and which 
ones are not, we won't unnecessarily truncate committed messages. Note that when 
a follower takes over as the new leader, it always tries to commit all existing 
messages that were obtained from the previous generation of the leader. After 
that, it will start committing new messages received in its own generation. If 
we can track the leader generation of each message, we can do the truncation 
accurately. To do that, in each replica we maintain a leader generation vector 
that contains the leader generation id and its starting offset (the offset of 
the first message written by the leader in that generation), and we persist that 
vector in a LeaderGeneration file locally.

If a replica becomes a leader, before it accepts any new message it first 
appends the current leader generation id and its current log end offset to the 
LeaderGeneration file. If a replica becomes a follower, it first gets the 
leader generation vector from the leader and then determines the offset where 
its highest leader generation ends in the leader. It will then truncate its log 
up to that offset (if there are messages beyond that offset). After that, the 
follower will store the leader generation vector obtained from the leader in 
its local LeaderGeneration file and start fetching messages from the leader 
from its log end offset.

Let's consider a couple of examples. 

Example 1. Suppose that we have two replicas A and B, and A is the leader. At 
some point, we have the following messages in A and B.

{noformat}
offset    A     B
0         m1    m1
1         m2
{noformat}
 
Let's assume that message m1 is committed, but m2 is not. At this point, A dies 
and B takes over as the leader. Let's say B then commits 2 more messages, m3 and 
m4.

{noformat}
offset    A     B
0         m1    m1
1         m2    m3
2               m4
{noformat}

When replica A comes back, it's important for A to get rid of m2 at offset 1, 
since m2 was never committed. In this case, the leader generation vectors in A 
and B will look like the following.

{noformat}
          A                              B
leaderGenId   startOffset    leaderGenId   startOffset
1             0              1             0
                             2             1
{noformat}

By comparing A's leader generation vector with that from the current leader B, 
A knows that its latest messages were produced by the leader in generation 1, 
which ends at offset 0. So any messages in its local log after offset 0 are not 
committed and can be truncated. Any message at or before offset 0 is guaranteed 
to be committed. So, replica A will remove m2 at offset 1 and get m3 and m4 
from B afterward. At that point, A's log is consistent with that of B. All 
committed messages are preserved and all uncommitted messages are removed.

Example 2. Suppose that we have two replicas A and B, and A is the leader. At 
some point, we have the following messages in A and B.

{noformat}
offset    A     B
0         m1    m1
1         m2    m2
{noformat}
 
Let's assume that both messages m1 and m2 are committed. At this point, A dies 
and B takes over as the leader. Let's say B then commits 2 more messages, m3 and 
m4.

{noformat}
offset    A     B
0         m1    m1
1         m2    m2
2               m3
3               m4
{noformat}

In this case, the leader generation vectors in A and B will look like the 
following.

{noformat}
          A                              B
leaderGenId   startOffset    leaderGenId   startOffset
1             0              1             0
                             2             2
{noformat}

When A comes back, by comparing its leader generation vector with that from the 
current leader B, A knows that its latest messages were produced by the leader 
in generation 1, which ends at offset 1. So, it will keep m2 at offset 1 and 
get m3 and m4 from B. Again, this will make A's log consistent with B.

This approach doesn't pay the extra network roundtrip to commit a message. The 
becoming-follower process will be a bit slower since it now needs to issue a 
new request to get the leader vector before it can start fetching from the 
leader. However, since leader changes are rare, this probably provides a better 
tradeoff.
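
To make the comparison concrete, here is a minimal sketch of the truncation 
computation described above (the types and names are illustrative only, not 
from any Kafka patch):

{code}
// Sketch: one (genId, startOffset) entry per leader generation, sorted by genId.
case class GenEntry(genId: Int, startOffset: Long)

// The offset at which the follower's highest generation ends on the leader:
// one before the start of the leader's next generation, or the leader's last
// offset if the follower's generation is still the leader's latest. The
// follower keeps offsets <= this value and truncates everything after it.
def lastCommittedOffset(follower: Seq[GenEntry],
                        leader: Seq[GenEntry],
                        leaderLogEndOffset: Long): Long = {
  val highestGen = follower.last.genId
  leader.find(_.genId > highestGen) match {
    case Some(next) => next.startOffset - 1
    case None       => leaderLogEndOffset - 1
  }
}

// Example 1 above: A = [(1,0)], B = [(1,0), (2,1)] => keep offsets <= 0,
// so m2 at offset 1 is truncated.
// Example 2 above: A = [(1,0)], B = [(1,0), (2,2)] => keep offsets <= 1,
// so m2 at offset 1 is kept.
{code}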

[jira] [Comment Edited] (KAFKA-1211) Hold the produce request with ack > 1 in purgatory until replicas' HW has larger than the produce offset

2014-07-24 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073346#comment-14073346
 ] 

Jun Rao edited comment on KAFKA-1211 at 7/24/14 4:32 PM:
-

Yes, this is a potential problem. Waiting for the HW to be propagated to the 
followers will introduce another round of network delay on every message to be 
committed, though. The following is another potential solution that avoids this 
overhead.

Note that a follower in ISR always has all committed messages. On follower 
startup, if we can figure out accurately which messages are committed and which 
ones are not, we won't unnecessarily truncate committed messages. Note that when 
a follower takes over as the new leader, it always tries to commit all existing 
messages that were obtained from the previous generation of the leader. After 
that, it will start committing new messages received in its own generation. If 
we can track the leader generation of each message, we can do the truncation 
accurately. To do that, in each replica we maintain a leader generation vector 
that contains the leader generation id and its starting offset (the offset of 
the first message written by the leader in that generation), and we persist that 
vector in a LeaderGeneration file locally.

If a replica becomes a leader, before it accepts any new message it first 
appends the current leader generation id and its current log end offset to the 
LeaderGeneration file. If a replica becomes a follower, it first gets the 
leader generation vector from the leader and then determines the offset where 
its highest leader generation ends in the leader. It will then truncate its log 
up to that offset (if there are messages beyond that offset). After that, the 
follower will store the leader generation vector obtained from the leader in 
its local LeaderGeneration file and start fetching messages from the leader 
from its log end offset.

Let's consider a couple of examples. 

Example 1. Suppose that we have two replicas A and B, and A is the leader. At 
some point, we have the following messages in A and B.

{noformat}
offset    A     B
0         m1    m1
1         m2
{noformat}
 
Let's assume that message m1 is committed, but m2 is not. At this point, A dies 
and B takes over as the leader. Let's say B then commits 2 more messages, m3 and 
m4.

{noformat}
offset    A     B
0         m1    m1
1         m2    m3
2               m4
{noformat}

When replica A comes back, it's important for A to get rid of m2 at offset 1, 
since m2 was never committed. In this case, the leader generation vectors in A 
and B will look like the following.

{noformat}
          A                              B
leaderGenId   startOffset    leaderGenId   startOffset
1             0              1             0
                             2             1
{noformat}

By comparing A's leader generation vector with that from the current leader B, 
A knows that its latest messages were produced by the leader in generation 1, 
which ends at offset 0. So any messages in its local log after offset 0 are not 
committed and can be truncated. Any message at or before offset 0 is guaranteed 
to be committed. So, replica A will remove m2 at offset 1 and get m3 and m4 
from B afterward. At that point, A's log is consistent with that of B. All 
committed messages are preserved and all uncommitted messages are removed.

Example 2. Suppose that we have two replicas A and B, and A is the leader. At 
some point, we have the following messages in A and B.

{noformat}
offset    A     B
0         m1    m1
1         m2    m2
{noformat}
 
Let's assume that both messages m1 and m2 are committed. At this point, A dies 
and B takes over as the leader. Let's say B then commits 2 more messages, m3 and 
m4.

{noformat}
offset    A     B
0         m1    m1
1         m2    m2
2               m3
3               m4
{noformat}

In this case, the leader generation vectors in A and B will look like the 
following.

{noformat}
          A                              B
leaderGenId   startOffset    leaderGenId   startOffset
1             0              1             0
                             2             2
{noformat}

When A comes back, by comparing its leader generation vector with that from the 
current leader B, A knows that its latest messages were produced by the leader 
in generation 1, which ends at offset 1. So, it will keep m2 at offset 1 and 
get m3 and m4 from B. Again, this will make A's log consistent with B.

This approach doesn't pay the extra network roundtrip to commit a message. The 
becoming-follower process will be a bit slower since it now needs to issue a 
new request to get the leader vector before it can start fetching from the 
leader. However, since leader changes are rare, this probably provides a better 
tradeoff.

[jira] [Comment Edited] (KAFKA-1211) Hold the produce request with ack > 1 in purgatory until replicas' HW has larger than the produce offset

2014-07-24 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073346#comment-14073346
 ] 

Jun Rao edited comment on KAFKA-1211 at 7/24/14 4:30 PM:
-

Yes, this is a potential problem. Waiting for the HW to be propagated to the 
followers will introduce another round of network delay on every message to be 
committed, though. The following is another potential solution that avoids this 
overhead.

Note that a follower in ISR always has all committed messages. On follower 
startup, if we can figure out accurately which messages are committed and which 
ones are not, we won't unnecessarily truncate committed messages. Note that when 
a follower takes over as the new leader, it always tries to commit all existing 
messages that were obtained from the previous generation of the leader. After 
that, it will start committing new messages received in its own generation. If 
we can track the leader generation of each message, we can do the truncation 
accurately. To do that, in each replica we maintain a leader generation vector 
that contains the leader generation id and its starting offset (the offset of 
the first message written by the leader in that generation), and we persist that 
vector in a LeaderGeneration file locally.

If a replica becomes a leader, before it accepts any new message it first 
appends the current leader generation id and its current log end offset to the 
LeaderGeneration file. If a replica becomes a follower, it first gets the 
leader generation vector from the leader and then determines the offset where 
its highest leader generation ends in the leader. It will then truncate its log 
up to that offset (if there are messages beyond that offset). After that, the 
follower will store the leader generation vector obtained from the leader in 
its local LeaderGeneration file and start fetching messages from the leader 
from its log end offset.

Let's consider a couple of examples. 

Example 1. Suppose that we have two replicas A and B, and A is the leader. At 
some point, we have the following messages in A and B.

{noformat}
offset    A     B
0         m1    m1
1         m2
{noformat}
 
Let's assume that message m1 is committed, but m2 is not. At this point, A dies 
and B takes over as the leader. Let's say B then commits 2 more messages, m3 and 
m4.

{noformat}
offset    A     B
0         m1    m1
1         m2    m3
2               m4
{noformat}

When replica A comes back, it's important for A to get rid of m2 at offset 1, 
since m2 was never committed. In this case, the leader generation vectors in A 
and B will look like the following.

{noformat}
          A                              B
leaderGenId   startOffset    leaderGenId   startOffset
1             0              1             0
                             2             1
{noformat}

By comparing A's leader generation vector with that from the current leader B, 
A knows that its latest messages were produced by the leader in generation 1, 
which ends at offset 0. So any messages in its local log after offset 0 are not 
committed and can be truncated. Any message at or before offset 0 is guaranteed 
to be committed. So, replica A will remove m2 at offset 1 and get m3 and m4 
from B afterward. At that point, A's log is consistent with that of B. All 
committed messages are preserved and all uncommitted messages are removed.

Example 2. Suppose that we have two replicas A and B, and A is the leader. At 
some point, we have the following messages in A and B.

{noformat}
offset    A     B
0         m1    m1
1         m2    m2
{noformat}
 
Let's assume that both messages m1 and m2 are committed. At this point, A dies 
and B takes over as the leader. Let's say B then commits 2 more messages, m3 and 
m4.

{noformat}
offset    A     B
0         m1    m1
1         m2    m2
2               m3
3               m4
{noformat}

In this case, the leader generation vectors in A and B will look like the 
following.

{noformat}
          A                              B
leaderGenId   startOffset    leaderGenId   startOffset
1             0              1             0
                             2             2
{noformat}

When A comes back, by comparing its leader generation vector with that from the 
current leader B, A knows that its latest messages were produced by the leader 
in generation 1, which ends at offset 1. So, it will keep m2 at offset 1 and 
get m3 and m4 from B. Again, this will make A's log consistent with B.

This approach doesn't pay the extra network roundtrip to commit a message. The 
becoming-follower process will be a bit slower since it now needs to issue a 
new request to get the leader vector before it can start fetching from the 
leader. However, since leader changes are rare, this probably provides a better 
tradeoff.

[jira] [Resolved] (KAFKA-1150) Fetch on a replicated topic does not return as soon as possible

2014-07-24 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-1150.
--

Resolution: Duplicate

> Fetch on a replicated topic does not return as soon as possible
> ---
>
> Key: KAFKA-1150
> URL: https://issues.apache.org/jira/browse/KAFKA-1150
> Project: Kafka
>  Issue Type: Bug
>  Components: core, replication
>Affects Versions: 0.8.0
>Reporter: Andrey Balmin
>Assignee: Neha Narkhede
> Attachments: Test.java
>
>
> I see a huge performance difference between replicated and not replicated 
> topics. On my laptop, running two brokers, I see producer-2-consumer latency 
> of under 1ms for topics with one replica. 
> However, with two replicas the same latency equals the max fetch delay. 
> Here is a simple test I just did:
> one producer thread in a loop sending one message and sleeping for 2500ms, 
> and one consumer thread looping on the long poll with max fetch delay of 1000 
> ms.
> Here is what happens with no replication:
> Produced 1 key: key1 at time: 15:33:52.822
> Consumed up to 1 at time: 15:33:52.822
> Consumed up to 1 at time: 15:33:53.823
> Consumed up to 1 at time: 15:33:54.825
> Produced 2 key: key2 at time: 15:33:55.324
> Consumed up to 2 at time: 15:33:55.324
> Consumed up to 2 at time: 15:33:56.326
> Consumed up to 2 at time: 15:33:57.328
> Produced 3 key: key3 at time: 15:33:57.827
> Consumed up to 3 at time: 15:33:57.827
> There are no delays between the message being produced and consumed -- this 
> is the behavior I expected. 
> Here is the same test, but for a topic with two replicas:
> Consumed up to 0 at time: 15:50:29.575
> Produced 1 key: key1 at time: 15:50:29.575
> Consumed up to 1 at time: 15:50:30.577
> Consumed up to 1 at time: 15:50:31.579
> Consumed up to 1 at time: 15:50:32.078
> Produced 2 key: key2 at time: 15:50:32.078
> Consumed up to 2 at time: 15:50:33.081
> Consumed up to 2 at time: 15:50:34.081
> Consumed up to 2 at time: 15:50:34.581
> Produced 3 key: key3 at time: 15:50:34.581
> Consumed up to 3 at time: 15:50:35.584
> Notice how the fetch always returns as soon as the produce request is issued, 
> but without the new message, which consistently arrives ~1002 ms later.
> Below is the request log snippet for this part:
> Produced 2 key: key2 at time: 15:50:32.078
> Consumed up to 2 at time: 15:50:33.081
> You can see the first FetchRequest returns at the same time as the replica 
> FetchRequest, but this fetch response is *empty* -- the message is not 
> committed yet, so it cannot be returned. The message is committed at 
> 15:50:32,079. However, the next FetchRequest (that does return the message) 
> comes in at 15:50:32,078, but completes only at 15:50:33,081. Why is it 
> waiting for the full 1000 ms, instead of returning right away?
> [2013-11-25 15:50:32,077] TRACE Processor 1 received request : Name: 
> ProducerRequest; Version: 0; CorrelationId: 5; ClientId: pro; RequiredAcks: 
> 1; AckTimeoutMs: 20 ms; TopicAndPartition: [test_topic,0] -> 2078 
> (kafka.network.RequestChannel$)
> [2013-11-25 15:50:32,078] TRACE Completed request:Name: FetchRequest; 
> Version: 0; CorrelationId: 7; ClientId: con; ReplicaId: -1; MaxWait: 1000 ms; 
> MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> 
> PartitionFetchInfo(129,1024000) from client 
> /0:0:0:0:0:0:0:1%0:63264;totalTime:499,queueTime:0,localTime:0,remoteTime:499,sendTime:0
>  (kafka.request.logger)
> [2013-11-25 15:50:32,078] TRACE Completed request:Name: FetchRequest; 
> Version: 0; CorrelationId: 3463; ClientId: ReplicaFetcherThread-0-0; 
> ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] 
> -> PartitionFetchInfo(129,1048576) from client 
> /127.0.0.1:63056;totalTime:499,queueTime:1,localTime:0,remoteTime:498,sendTime:0
>  (kafka.request.logger)
> [2013-11-25 15:50:32,078] TRACE Processor 1 received request : Name: 
> FetchRequest; Version: 0; CorrelationId: 8; ClientId: con; ReplicaId: -1; 
> MaxWait: 1000 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> 
> PartitionFetchInfo(129,1024000) (kafka.network.RequestChannel$)
> [2013-11-25 15:50:32,078] TRACE Completed request:Name: ProducerRequest; 
> Version: 0; CorrelationId: 5; ClientId: pro; RequiredAcks: 1; AckTimeoutMs: 
> 20 ms; TopicAndPartition: [test_topic,0] -> 2078 from client 
> /0:0:0:0:0:0:0:1%0:63266;totalTime:1,queueTime:0,localTime:1,remoteTime:0,sendTime:0
>  (kafka.request.logger)
> [2013-11-25 15:50:32,079] TRACE Processor 0 received request : Name: 
> FetchRequest; Version: 0; CorrelationId: 3464; ClientId: 
> ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; 
> RequestInfo: [test_topic,0] -> PartitionFetchInfo(130,1048576) 
> (kafka.network.RequestChannel$)
> [2013-11-25 15:50:32,5

[jira] [Commented] (KAFKA-1211) Hold the produce request with ack > 1 in purgatory until replicas' HW has larger than the produce offset

2014-07-24 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073346#comment-14073346
 ] 

Jun Rao commented on KAFKA-1211:


Yes, this is a potential problem. Waiting for the HW to be propagated to the 
followers will introduce another round of network delay on every message to be 
committed, though. The following is another potential solution that avoids this 
overhead.

Note that a follower in ISR always has all committed messages. On follower 
startup, if we can figure out accurately which messages are committed and which 
ones are not, we won't unnecessarily truncate committed messages. Note that when 
a follower takes over as the new leader, it always tries to commit all existing 
messages that were obtained from the previous generation of the leader. After 
that, it will start committing new messages received in its own generation. If 
we can track the leader generation of each message, we can do the truncation 
accurately. To do that, in each replica we maintain a leader generation vector 
that contains the leader generation id and its starting offset (the offset of 
the first message written by the leader in that generation), and we persist that 
vector in a LeaderGeneration file locally.

If a replica becomes a leader, before it accepts any new message it first 
appends the current leader generation id and its current log end offset to the 
LeaderGeneration file. If a replica becomes a follower, it first gets the 
leader generation vector from the leader and then determines the offset where 
its highest leader generation ends in the leader. It will then truncate its log 
up to that offset (if there are messages beyond that offset). After that, the 
follower will store the leader generation vector obtained from the leader in 
its local LeaderGeneration file and start fetching messages from the leader 
from its log end offset.

Let's consider a couple of examples. 

Example 1. Suppose that we have two replicas A and B, and A is the leader. At 
some point, we have the following messages in A and B.

{noformat}
offset    A     B
0         m1    m1
1         m2
{noformat}
 
Let's assume that message m1 is committed, but m2 is not. At this point, A dies 
and B takes over as the leader. Let's say B then commits 2 more messages, m3 and 
m4.

{noformat}
offset    A     B
0         m1    m1
1         m2    m3
2               m4
{noformat}

When replica A comes back, it's important for A to get rid of m2 at offset 1, 
since m2 was never committed. In this case, the leader generation vectors in A 
and B will look like the following.

{noformat}
          A                              B
leaderGenId   startOffset    leaderGenId   startOffset
1             0              1             0
                             2             1
{noformat}

By comparing A's leader generation vector with that from the current leader B, 
A knows that its latest messages were produced by the leader in generation 1, 
which ends at offset 0. So any messages in its local log after offset 0 are not 
committed and can be truncated. Any message at or before offset 0 is guaranteed 
to be committed. So, replica A will remove m2 at offset 1 and get m3 and m4 
from B afterward. At that point, A's log is consistent with that of B. All 
committed messages are preserved and all uncommitted messages are removed.

Example 2. Suppose that we have two replicas A and B, and A is the leader. At 
some point, we have the following messages in A and B.

{noformat}
offset    A     B
0         m1    m1
1         m2    m2
{noformat}
 
Let's assume that both messages m1 and m2 are committed. At this point, A dies 
and B takes over as the leader. Let's say B then commits 2 more messages, m3 and 
m4.

{noformat}
offset    A     B
0         m1    m1
1         m2    m2
2               m3
3               m4
{noformat}

In this case, the leader generation vectors in A and B will look like the 
following.

{noformat}
          A                              B
leaderGenId   startOffset    leaderGenId   startOffset
1             0              1             0
                             2             2
{noformat}

When A comes back, by comparing its leader generation vector with that from the 
current leader B, A knows that its latest messages were produced by the leader 
in generation 1, which ends at offset 1. So, it will keep m2 at offset 1 and 
get m3 and m4 from B. Again, this will make A's log consistent with B.

This approach doesn't pay the extra network roundtrip to commit a message. The 
becoming-follower process will be a bit slower since it now needs to issue a 
new request to get the leader vector before it can start fetching from the 
leader. However, since leader changes are rare, this probably provides a better 
tradeoff.

[jira] [Updated] (KAFKA-1419) cross build for scala 2.11

2014-07-24 Thread Ivan Lyutov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Lyutov updated KAFKA-1419:
---

Attachment: KAFKA-1419.patch

> cross build for scala 2.11
> --
>
> Key: KAFKA-1419
> URL: https://issues.apache.org/jira/browse/KAFKA-1419
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.8.1
>Reporter: Scott Clasen
>Assignee: Ivan Lyutov
>Priority: Blocker
> Fix For: 0.8.2
>
> Attachments: KAFKA-1419.patch, KAFKA-1419.patch
>
>
> Please publish builds for scala 2.11, hopefully just needs a small tweak to 
> the gradle conf?



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1419) cross build for scala 2.11

2014-07-24 Thread Ivan Lyutov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073337#comment-14073337
 ] 

Ivan Lyutov commented on KAFKA-1419:


Created reviewboard https://reviews.apache.org/r/23895/diff/
 against branch apache/trunk

> cross build for scala 2.11
> --
>
> Key: KAFKA-1419
> URL: https://issues.apache.org/jira/browse/KAFKA-1419
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.8.1
>Reporter: Scott Clasen
>Assignee: Ivan Lyutov
>Priority: Blocker
> Fix For: 0.8.2
>
> Attachments: KAFKA-1419.patch, KAFKA-1419.patch
>
>
> Please publish builds for scala 2.11, hopefully just needs a small tweak to 
> the gradle conf?



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Review Request 23895: Patch for KAFKA-1419

2014-07-24 Thread Ivan Lyutov

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23895/
---

Review request for kafka.


Bugs: KAFKA-1419
https://issues.apache.org/jira/browse/KAFKA-1419


Repository: kafka


Description
---

KAFKA-1419 - cross build for scala 2.11


Diffs
-

  build.gradle a72905df824ba68bed5d5170d18873c23e1782c9 
  core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala 
fecee8d5f7b32f483bb1bfc6a5080d589906f9c4 
  core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 
73401c5ff34d08abce22267aa9c4d86632c6fb74 
  gradle.properties 4827769a3f8e34f0fe7e783eb58e44d4db04859b 
  gradle/buildscript.gradle 225e0a82708bc5f390e5e2c1d4d9a0d06f491b95 
  gradle/wrapper/gradle-wrapper.properties 
610282a699afc89a82203ef0e4e71ecc53761039 

Diff: https://reviews.apache.org/r/23895/diff/


Testing
---


Thanks,

Ivan Lyutov



[jira] [Commented] (KAFKA-1555) provide strong consistency with reasonable availability

2014-07-24 Thread Sudarshan Kadambi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073326#comment-14073326
 ] 

Sudarshan Kadambi commented on KAFKA-1555:
--

Neha and Jun:
I want to make sure we have agreement on the issue (both that there is an 
issue and what the issue is :) ). We plan to use a replication factor of 3 and 
acks=2. We'd like this to mean that we are tolerant of the loss of 1 machine, 
without loss of published messages or the producer being blocked. 

Let's focus on the following scenario: say L, F1, and F2 are the leader and 2 
followers in the ISR. With acks=2, say L and F1 have committed all published 
messages and F2 is up to replica.max.lag.messages behind. When L goes down, F2 
can be made the new leader instead of F1, even though F1 is up to date with the 
leader. We need to be able to take into account how caught up a given broker is 
in the ISR when electing a new leader. This is also unclean leader election, 
but of a different type than what we've been discussing. 

> provide strong consistency with reasonable availability
> ---
>
> Key: KAFKA-1555
> URL: https://issues.apache.org/jira/browse/KAFKA-1555
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 0.8.1.1
>Reporter: Jiang Wu
>Assignee: Neha Narkhede
>
> In a mission critical application, we expect a kafka cluster with 3 brokers 
> can satisfy two requirements:
> 1. When 1 broker is down, no message loss or service blocking happens.
> 2. In worse cases such as two brokers are down, service can be blocked, but 
> no message loss happens.
> We found that the current kafka version (0.8.1.1) cannot achieve the requirements 
> due to its three behaviors:
> 1. when choosing a new leader from 2 followers in ISR, the one with less 
> messages may be chosen as the leader.
> 2. even when replica.lag.max.messages=0, a follower can stay in ISR when it 
> has less messages than the leader.
> 3. ISR can contain only 1 broker, therefore acknowledged messages may be 
> stored in only 1 broker.
> The following is an analytical proof. 
> We consider a cluster with 3 brokers and a topic with 3 replicas, and assume 
> that at the beginning, all 3 replicas, leader A, followers B and C, are in 
> sync, i.e., they have the same messages and are all in ISR.
> According to the value of request.required.acks (acks for short), there are 
> the following cases.
> 1. acks=0, 1, 3. Obviously these settings do not satisfy the requirement.
> 2. acks=2. Producer sends a message m. It's acknowledged by A and B. At this 
> time, although C hasn't received m, C is still in ISR. If A is killed, C can 
> be elected as the new leader, and consumers will miss m.
> 3. acks=-1. B and C restart and are removed from ISR. Producer sends a 
> message m to A, and receives an acknowledgement. Disk failure happens in A 
> before B and C replicate m. Message m is lost.
> In summary, any existing configuration cannot satisfy the requirements.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-42) Support rebalancing the partitions with replication

2014-07-24 Thread Ivan Lyutov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-42?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073298#comment-14073298
 ] 

Ivan Lyutov commented on KAFKA-42:
--

Updated reviewboard  against branch apache/0.8.1

> Support rebalancing the partitions with replication
> ---
>
> Key: KAFKA-42
> URL: https://issues.apache.org/jira/browse/KAFKA-42
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Jun Rao
>Assignee: Neha Narkhede
>Priority: Blocker
>  Labels: features
> Fix For: 0.8.0
>
> Attachments: KAFKA-42.patch, KAFKA-42_2014-07-24_15:36:04.patch, 
> KAFKA-42_2014-07-24_15:37:26.patch, kafka-42-v1.patch, kafka-42-v2.patch, 
> kafka-42-v3.patch, kafka-42-v4.patch, kafka-42-v5.patch
>
>   Original Estimate: 240h
>  Remaining Estimate: 240h
>
> As new brokers are added, we need to support moving partition replicas from 
> one set of brokers to another, online.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1419) cross build for scala 2.11

2014-07-24 Thread Ivan Lyutov (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073300#comment-14073300
 ] 

Ivan Lyutov commented on KAFKA-1419:


Created reviewboard  against branch apache/0.8.1

> cross build for scala 2.11
> --
>
> Key: KAFKA-1419
> URL: https://issues.apache.org/jira/browse/KAFKA-1419
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.8.1
>Reporter: Scott Clasen
>Assignee: Ivan Lyutov
>Priority: Blocker
> Fix For: 0.8.2
>
> Attachments: KAFKA-1419.patch
>
>
> Please publish builds for scala 2.11, hopefully just needs a small tweak to 
> the gradle conf?



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1419) cross build for scala 2.11

2014-07-24 Thread Ivan Lyutov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Lyutov updated KAFKA-1419:
---

Attachment: KAFKA-1419.patch

> cross build for scala 2.11
> --
>
> Key: KAFKA-1419
> URL: https://issues.apache.org/jira/browse/KAFKA-1419
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.8.1
>Reporter: Scott Clasen
>Assignee: Ivan Lyutov
>Priority: Blocker
> Fix For: 0.8.2
>
> Attachments: KAFKA-1419.patch
>
>
> Please publish builds for scala 2.11, hopefully just needs a small tweak to 
> the gradle conf?



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-42) Support rebalancing the partitions with replication

2014-07-24 Thread Ivan Lyutov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-42?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Lyutov updated KAFKA-42:
-

Attachment: KAFKA-42_2014-07-24_15:37:26.patch

> Support rebalancing the partitions with replication
> ---
>
> Key: KAFKA-42
> URL: https://issues.apache.org/jira/browse/KAFKA-42
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Jun Rao
>Assignee: Neha Narkhede
>Priority: Blocker
>  Labels: features
> Fix For: 0.8.0
>
> Attachments: KAFKA-42.patch, KAFKA-42_2014-07-24_15:36:04.patch, 
> KAFKA-42_2014-07-24_15:37:26.patch, kafka-42-v1.patch, kafka-42-v2.patch, 
> kafka-42-v3.patch, kafka-42-v4.patch, kafka-42-v5.patch
>
>   Original Estimate: 240h
>  Remaining Estimate: 240h
>
> As new brokers are added, we need to support moving partition replicas from 
> one set of brokers to another, online.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-42) Support rebalancing the partitions with replication

2014-07-24 Thread Ivan Lyutov (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-42?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Lyutov updated KAFKA-42:
-

Attachment: KAFKA-42_2014-07-24_15:36:04.patch

> Support rebalancing the partitions with replication
> ---
>
> Key: KAFKA-42
> URL: https://issues.apache.org/jira/browse/KAFKA-42
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Jun Rao
>Assignee: Neha Narkhede
>Priority: Blocker
>  Labels: features
> Fix For: 0.8.0
>
> Attachments: KAFKA-42.patch, KAFKA-42_2014-07-24_15:36:04.patch, 
> kafka-42-v1.patch, kafka-42-v2.patch, kafka-42-v3.patch, kafka-42-v4.patch, 
> kafka-42-v5.patch
>
>   Original Estimate: 240h
>  Remaining Estimate: 240h
>
> As new brokers are added, we need to support moving partition replicas from 
> one set of brokers to another, online.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: Review Request 23858: Patch for KAFKA-1544

2014-07-24 Thread Manikumar Reddy O


> On July 23, 2014, 4:56 p.m., Jay Kreps wrote:
> > Is this better than using a condition? I was imagining replacing sleep with 
> > shutdownCondition.await(backoffMs, TimeUnit.MILLISECONDS) and then in 
> > shutdown() we call shutdownCondition.signal().
> 
> Jay Kreps wrote:
> Actually a condition isn't quite right as there is a subtle race between 
> signal and await. Better would be the same approach but using 
> CountDownLatch.await(backoff, TimeUnit.MS)

Ok, I got your point. I uploaded a new patch that uses CountDownLatch. For 
this I needed to override the shutdown method in CleanerThread.
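
Roughly, the change amounts to the following (a minimal sketch, not the exact 
patch; the class shape and names here are illustrative):

    import java.util.concurrent.{CountDownLatch, TimeUnit}

    // Illustrative cleaner-style thread whose backoff sleep can be cut short
    // by shutdown(); not the actual LogCleaner code.
    class CleanerThread(backOffMs: Long) extends Thread {
      private val shutdownLatch = new CountDownLatch(1)

      override def run() {
        while (shutdownLatch.getCount > 0) {
          // ... do one unit of cleaning work here ...
          // Unlike Thread.sleep, this returns immediately once
          // shutdownLatch.countDown() is called from shutdown().
          shutdownLatch.await(backOffMs, TimeUnit.MILLISECONDS)
        }
      }

      def shutdown() {
        shutdownLatch.countDown() // wake the thread out of its backoff wait
        join()                    // wait for run() to finish
      }
    }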


- Manikumar Reddy


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23858/#review48532
---


On July 24, 2014, 3:11 p.m., Manikumar Reddy O wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/23858/
> ---
> 
> (Updated July 24, 2014, 3:11 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1544
> https://issues.apache.org/jira/browse/KAFKA-1544
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Used a CountDownLatch in CleanerThread to reduce LogCleaner shutdown time.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/log/LogCleaner.scala 
> 2faa196a4dc612bc634d5ff5f5f275d09073f13b 
> 
> Diff: https://reviews.apache.org/r/23858/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Manikumar Reddy O
> 
>



[jira] [Commented] (KAFKA-1544) LogCleaner may take a long time to shutdown

2014-07-24 Thread Manikumar Reddy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073282#comment-14073282
 ] 

Manikumar Reddy commented on KAFKA-1544:


Updated reviewboard https://reviews.apache.org/r/23858/diff/
 against branch origin/trunk

> LogCleaner may take a long time to shutdown
> ---
>
> Key: KAFKA-1544
> URL: https://issues.apache.org/jira/browse/KAFKA-1544
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2
>Reporter: Jun Rao
>Assignee: Manikumar Reddy
>  Labels: newbie
> Attachments: KAFKA-1544.patch, KAFKA-1544_2014-07-24_20:38:52.patch
>
>
> We have the following code in LogCleaner. Since the cleaner thread is 
> shut down without an interrupt, it may take up to the backoff time for the 
> cleaner thread to detect the shutdown flag.
> private def cleanOrSleep() {
>   cleanerManager.grabFilthiestLog() match {
> case None =>
>   // there are no cleanable logs, sleep a while
>   time.sleep(config.backOffMs)
>  



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Updated] (KAFKA-1544) LogCleaner may take a long time to shutdown

2014-07-24 Thread Manikumar Reddy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1544?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar Reddy updated KAFKA-1544:
---

Attachment: KAFKA-1544_2014-07-24_20:38:52.patch

> LogCleaner may take a long time to shutdown
> ---
>
> Key: KAFKA-1544
> URL: https://issues.apache.org/jira/browse/KAFKA-1544
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.8.2
>Reporter: Jun Rao
>Assignee: Manikumar Reddy
>  Labels: newbie
> Attachments: KAFKA-1544.patch, KAFKA-1544_2014-07-24_20:38:52.patch
>
>
> We have the following code in LogCleaner. Since the cleaner thread is 
> shut down without an interrupt, it may take up to the backoff time for the 
> cleaner thread to detect the shutdown flag.
> private def cleanOrSleep() {
>   cleanerManager.grabFilthiestLog() match {
> case None =>
>   // there are no cleanable logs, sleep a while
>   time.sleep(config.backOffMs)
>  



--
This message was sent by Atlassian JIRA
(v6.2#6252)


Re: Review Request 23858: Patch for KAFKA-1544

2014-07-24 Thread Manikumar Reddy O

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23858/
---

(Updated July 24, 2014, 3:11 p.m.)


Review request for kafka.


Bugs: KAFKA-1544
https://issues.apache.org/jira/browse/KAFKA-1544


Repository: kafka


Description (updated)
---

Used a CountDownLatch in CleanerThread to reduce LogCleaner shutdown time.


Diffs (updated)
-

  core/src/main/scala/kafka/log/LogCleaner.scala 
2faa196a4dc612bc634d5ff5f5f275d09073f13b 

Diff: https://reviews.apache.org/r/23858/diff/


Testing
---


Thanks,

Manikumar Reddy O



Re: Gradle hanging on unit test run against ProducerFailureHandlingTest->testBrokerFailure

2014-07-24 Thread Guozhang Wang
Hi Dave,

KAFKA-1533, which targets this issue, has just been committed. Did your update
on trunk include this commit?

commit ff05e9b3616a222e29a42f6e8fdf41945a417f41
Author: Guozhang Wang 
Date:   Tue Jul 22 14:14:19 2014 -0700

    kafka-1533; transient unit test failure in ProducerFailureHandlingTest;
    reviewed by Guozhang Wang; reviewed by Jun Rao

Guozhang




On Thu, Jul 24, 2014 at 5:52 AM, David Corley  wrote:

> Hey all,
> I'm trying my hand at writing some patches for open issues, but I'm running
> into problems running gradlew test.
> It hangs every time when trying to run testBrokerFailure in the
> ProducerFailureHandlingTest suite.
>
> It was working for a time, but I updated to trunk HEAD and it's no longer
> working.
> I'm running on OSX with JDK 1.6.0_65.
> I tried increasing the HeapSize for the test target, but hit the same
> issue.
> Running against 3f1a9c4cee778d089d3ec3167555c2b89cdc48bb
>
> Would appreciate any help.
> Regards,
> Dave
>



-- 
-- Guozhang


[jira] [Commented] (KAFKA-1150) Fetch on a replicated topic does not return as soon as possible

2014-07-24 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073276#comment-14073276
 ] 

Guozhang Wang commented on KAFKA-1150:
--

Awesome!

> Fetch on a replicated topic does not return as soon as possible
> ---
>
> Key: KAFKA-1150
> URL: https://issues.apache.org/jira/browse/KAFKA-1150
> Project: Kafka
>  Issue Type: Bug
>  Components: core, replication
>Affects Versions: 0.8.0
>Reporter: Andrey Balmin
>Assignee: Neha Narkhede
> Attachments: Test.java
>
>
> I see a huge performance difference between replicated and non-replicated 
> topics. On my laptop, running two brokers, I see producer-to-consumer latency 
> of under 1 ms for topics with one replica. 
> However, with two replicas the same latency equals the max fetch delay. 
> Here is a simple test I just did:
> one producer thread in a loop sending one message and sleeping for 2500ms, 
> and one consumer thread looping on the long poll with max fetch delay of 1000 
> ms.
> Here is what happens with no replication:
> Produced 1 key: key1 at time: 15:33:52.822
> Consumed up to 1 at time: 15:33:52.822
> Consumed up to 1 at time: 15:33:53.823
> Consumed up to 1 at time: 15:33:54.825
> Produced 2 key: key2 at time: 15:33:55.324
> Consumed up to 2 at time: 15:33:55.324
> Consumed up to 2 at time: 15:33:56.326
> Consumed up to 2 at time: 15:33:57.328
> Produced 3 key: key3 at time: 15:33:57.827
> Consumed up to 3 at time: 15:33:57.827
> There are no delays between the message being produced and consumed -- this is 
> the behavior I expected. 
> Here is the same test, but for a topic with two replicas:
> Consumed up to 0 at time: 15:50:29.575
> Produced 1 key: key1 at time: 15:50:29.575
> Consumed up to 1 at time: 15:50:30.577
> Consumed up to 1 at time: 15:50:31.579
> Consumed up to 1 at time: 15:50:32.078
> Produced 2 key: key2 at time: 15:50:32.078
> Consumed up to 2 at time: 15:50:33.081
> Consumed up to 2 at time: 15:50:34.081
> Consumed up to 2 at time: 15:50:34.581
> Produced 3 key: key3 at time: 15:50:34.581
> Consumed up to 3 at time: 15:50:35.584
> Notice how the fetch always returns as soon as the produce request is issued, 
> but without the new message, which consistently arrives ~1002 ms later.
> Below is the request log snippet for this part:
> Produced 2 key: key2 at time: 15:50:32.078
> Consumed up to 2 at time: 15:50:33.081
> You can see the first FetchRequest returns at the same time as the replica 
> FetchRequest, but this fetch response is *empty* -- the message is not 
> committed yet, so it cannot be returned. The message is committed at 
> 15:50:32,079. However, the next FetchRequest (that does return the message) 
> comes in at 15:50:32,078, but completes only at 15:50:33,081. Why is it 
> waiting for the full 1000 ms, instead of returning right away?
> [2013-11-25 15:50:32,077] TRACE Processor 1 received request : Name: 
> ProducerRequest; Version: 0; CorrelationId: 5; ClientId: pro; RequiredAcks: 
> 1; AckTimeoutMs: 20 ms; TopicAndPartition: [test_topic,0] -> 2078 
> (kafka.network.RequestChannel$)
> [2013-11-25 15:50:32,078] TRACE Completed request:Name: FetchRequest; 
> Version: 0; CorrelationId: 7; ClientId: con; ReplicaId: -1; MaxWait: 1000 ms; 
> MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> 
> PartitionFetchInfo(129,1024000) from client 
> /0:0:0:0:0:0:0:1%0:63264;totalTime:499,queueTime:0,localTime:0,remoteTime:499,sendTime:0
>  (kafka.request.logger)
> [2013-11-25 15:50:32,078] TRACE Completed request:Name: FetchRequest; 
> Version: 0; CorrelationId: 3463; ClientId: ReplicaFetcherThread-0-0; 
> ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] 
> -> PartitionFetchInfo(129,1048576) from client 
> /127.0.0.1:63056;totalTime:499,queueTime:1,localTime:0,remoteTime:498,sendTime:0
>  (kafka.request.logger)
> [2013-11-25 15:50:32,078] TRACE Processor 1 received request : Name: 
> FetchRequest; Version: 0; CorrelationId: 8; ClientId: con; ReplicaId: -1; 
> MaxWait: 1000 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> 
> PartitionFetchInfo(129,1024000) (kafka.network.RequestChannel$)
> [2013-11-25 15:50:32,078] TRACE Completed request:Name: ProducerRequest; 
> Version: 0; CorrelationId: 5; ClientId: pro; RequiredAcks: 1; AckTimeoutMs: 
> 20 ms; TopicAndPartition: [test_topic,0] -> 2078 from client 
> /0:0:0:0:0:0:0:1%0:63266;totalTime:1,queueTime:0,localTime:1,remoteTime:0,sendTime:0
>  (kafka.request.logger)
> [2013-11-25 15:50:32,079] TRACE Processor 0 received request : Name: 
> FetchRequest; Version: 0; CorrelationId: 3464; ClientId: 
> ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; 
> RequestInfo: [test_topic,0] -> PartitionFetchInfo(130,1048576) 
> (kafka.net

Gradle hanging on unit test run against ProducerFailureHandlingTest->testBrokerFailure

2014-07-24 Thread David Corley
Hey all,
I'm trying my hand at writing some patches for open issues, but I'm running
into problems running gradlew test.
It hangs every time when trying to run testBrokerFailure in the
ProducerFailureHandlingTest suite.

It was working for a time, but I updated to trunk HEAD and it's no longer
working.
I'm running on OSX with JDK 1.6.0_65.
I tried increasing the HeapSize for the test target, but hit the same issue.
Running against 3f1a9c4cee778d089d3ec3167555c2b89cdc48bb

Would appreciate any help.
Regards,
Dave


[jira] [Commented] (KAFKA-1150) Fetch on a replicated topic does not return as soon as possible

2014-07-24 Thread Simon Cooper (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14073025#comment-14073025
 ] 

Simon Cooper commented on KAFKA-1150:
-

Yes, I built a replicated system with the patched kafka, and ran the test for 
about 10 minutes, with no delayed polls.
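
For anyone reproducing this: the test described in the report below boils 
down to a consumer long-polling with maxWait=1000 and minBytes=1. With the 
0.8 SimpleConsumer API that loop looks roughly like this (a sketch with 
placeholder host/port/topic values, not the attached Test.java):

    import kafka.api.FetchRequestBuilder
    import kafka.consumer.SimpleConsumer

    // Long-poll consumer loop; all values here are placeholders.
    object LongPollSketch {
      def main(args: Array[String]) {
        val consumer = new SimpleConsumer("localhost", 9092, 100000, 64 * 1024, "con")
        var offset = 0L
        while (true) {
          val req = new FetchRequestBuilder()
            .clientId("con")
            .maxWait(1000) // long poll: block up to 1000 ms for new data...
            .minBytes(1)   // ...or return as soon as one byte is available
            .addFetch("test_topic", 0, offset, 1024000)
            .build()
          val resp = consumer.fetch(req)
          for (msg <- resp.messageSet("test_topic", 0)) {
            offset = msg.nextOffset
            println("Consumed up to " + offset + " at " + System.currentTimeMillis)
          }
        }
      }
    }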

> Fetch on a replicated topic does not return as soon as possible
> ---
>
> Key: KAFKA-1150
> URL: https://issues.apache.org/jira/browse/KAFKA-1150
> Project: Kafka
>  Issue Type: Bug
>  Components: core, replication
>Affects Versions: 0.8.0
>Reporter: Andrey Balmin
>Assignee: Neha Narkhede
> Attachments: Test.java
>
>
> I see a huge performance difference between replicated and non-replicated 
> topics. On my laptop, running two brokers, I see producer-to-consumer latency 
> of under 1 ms for topics with one replica. 
> However, with two replicas the same latency equals the max fetch delay. 
> Here is a simple test I just did:
> one producer thread in a loop sending one message and sleeping for 2500ms, 
> and one consumer thread looping on the long poll with max fetch delay of 1000 
> ms.
> Here is what happens with no replication:
> Produced 1 key: key1 at time: 15:33:52.822
> Consumed up to 1 at time: 15:33:52.822
> Consumed up to 1 at time: 15:33:53.823
> Consumed up to 1 at time: 15:33:54.825
> Produced 2 key: key2 at time: 15:33:55.324
> Consumed up to 2 at time: 15:33:55.324
> Consumed up to 2 at time: 15:33:56.326
> Consumed up to 2 at time: 15:33:57.328
> Produced 3 key: key3 at time: 15:33:57.827
> Consumed up to 3 at time: 15:33:57.827
> There are no delays between the message being produced and consumed -- this is 
> the behavior I expected. 
> Here is the same test, but for a topic with two replicas:
> Consumed up to 0 at time: 15:50:29.575
> Produced 1 key: key1 at time: 15:50:29.575
> Consumed up to 1 at time: 15:50:30.577
> Consumed up to 1 at time: 15:50:31.579
> Consumed up to 1 at time: 15:50:32.078
> Produced 2 key: key2 at time: 15:50:32.078
> Consumed up to 2 at time: 15:50:33.081
> Consumed up to 2 at time: 15:50:34.081
> Consumed up to 2 at time: 15:50:34.581
> Produced 3 key: key3 at time: 15:50:34.581
> Consumed up to 3 at time: 15:50:35.584
> Notice how the fetch always returns as soon as the produce request is issued, 
> but without the new message, which consistently arrives ~1002 ms later.
> Below is the request log snippet for this part:
> Produced 2 key: key2 at time: 15:50:32.078
> Consumed up to 2 at time: 15:50:33.081
> You can see the first FetchRequest returns at the same time as the replica 
> FetchRequest, but this fetch response is *empty* -- the message is not 
> committed yet, so it cannot be returned. The message is committed at 
> 15:50:32,079. However, the next FetchRequest (that does return the message) 
> comes in at 15:50:32,078, but completes only at 15:50:33,081. Why is it 
> waiting for the full 1000 ms, instead of returning right away?
> [2013-11-25 15:50:32,077] TRACE Processor 1 received request : Name: 
> ProducerRequest; Version: 0; CorrelationId: 5; ClientId: pro; RequiredAcks: 
> 1; AckTimeoutMs: 20 ms; TopicAndPartition: [test_topic,0] -> 2078 
> (kafka.network.RequestChannel$)
> [2013-11-25 15:50:32,078] TRACE Completed request:Name: FetchRequest; 
> Version: 0; CorrelationId: 7; ClientId: con; ReplicaId: -1; MaxWait: 1000 ms; 
> MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> 
> PartitionFetchInfo(129,1024000) from client 
> /0:0:0:0:0:0:0:1%0:63264;totalTime:499,queueTime:0,localTime:0,remoteTime:499,sendTime:0
>  (kafka.request.logger)
> [2013-11-25 15:50:32,078] TRACE Completed request:Name: FetchRequest; 
> Version: 0; CorrelationId: 3463; ClientId: ReplicaFetcherThread-0-0; 
> ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] 
> -> PartitionFetchInfo(129,1048576) from client 
> /127.0.0.1:63056;totalTime:499,queueTime:1,localTime:0,remoteTime:498,sendTime:0
>  (kafka.request.logger)
> [2013-11-25 15:50:32,078] TRACE Processor 1 received request : Name: 
> FetchRequest; Version: 0; CorrelationId: 8; ClientId: con; ReplicaId: -1; 
> MaxWait: 1000 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> 
> PartitionFetchInfo(129,1024000) (kafka.network.RequestChannel$)
> [2013-11-25 15:50:32,078] TRACE Completed request:Name: ProducerRequest; 
> Version: 0; CorrelationId: 5; ClientId: pro; RequiredAcks: 1; AckTimeoutMs: 
> 20 ms; TopicAndPartition: [test_topic,0] -> 2078 from client 
> /0:0:0:0:0:0:0:1%0:63266;totalTime:1,queueTime:0,localTime:1,remoteTime:0,sendTime:0
>  (kafka.request.logger)
> [2013-11-25 15:50:32,079] TRACE Processor 0 received request : Name: 
> FetchRequest; Version: 0; CorrelationId: 3464; ClientId: 
> ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWai

[jira] [Commented] (KAFKA-1510) Force offset commits when migrating consumer offsets from zookeeper to kafka

2014-07-24 Thread Nicolae Marasoiu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14072913#comment-14072913
 ] 

Nicolae Marasoiu commented on KAFKA-1510:
-

[~jjkoshy] Yes, I am going to tackle this in the coming days and should have a 
first patch proposal by the weekend or sooner.

> Force offset commits when migrating consumer offsets from zookeeper to kafka
> 
>
> Key: KAFKA-1510
> URL: https://issues.apache.org/jira/browse/KAFKA-1510
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2
>Reporter: Joel Koshy
>Assignee: nicu marasoiu
>  Labels: newbie
> Fix For: 0.8.2
>
>
> When migrating consumer offsets from ZooKeeper to kafka, we have to turn on 
> dual-commit (i.e., the consumers will commit offsets to both zookeeper and 
> kafka) in addition to setting offsets.storage to kafka. However, we only 
> commit offsets if they have changed since the last commit. For low-volume 
> topics or for topics that receive data in bursts, offsets may not move for a 
> long period of time. Therefore we may want to force the commit (even if 
> offsets have not changed) when migrating (i.e., when dual-commit is enabled) 
> - we can add a minimum interval threshold (say, force a commit after every 
> 10 auto-commits) as well as force commits on rebalance and shutdown.
> Also, I think it is safe to switch the default for offsets.storage from 
> zookeeper to kafka and set the default to dual-commit (for people who have 
> not migrated yet). We have deployed this to the largest consumers at linkedin 
> and have not seen any issues so far (except for the migration caveat that 
> this jira will resolve).
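
The force-commit idea above can be sketched as follows (illustrative Scala 
only; maybeAutoCommit and commitOffsetsToBothStores are hypothetical names, 
not the actual consumer internals):

    // Force a commit every N auto-commit ticks, even if offsets are unchanged.
    object ForceCommitSketch {
      val forceCommitEvery = 10   // hypothetical threshold from the proposal
      var ticksSinceLastCommit = 0

      def commitOffsetsToBothStores() { /* dual-commit: zookeeper + kafka */ }

      def maybeAutoCommit(offsetsChanged: Boolean) {
        ticksSinceLastCommit += 1
        if (offsetsChanged || ticksSinceLastCommit >= forceCommitEvery) {
          commitOffsetsToBothStores()
          ticksSinceLastCommit = 0
        }
      }
    }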



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (KAFKA-1029) Zookeeper leader election stuck in ephemeral node retry loop

2014-07-24 Thread vincent ye (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14072905#comment-14072905
 ] 

vincent ye commented on KAFKA-1029:
---

I think the problem is that registerSessionExpirationListener() is called 
before controllerElector.startup in kafkaController.startup().
If the session expires before the election in controllerElector.startup 
happens, SessionExpirationListener triggers an election that creates an 
ephemeral node. The election triggered by controllerElector.startup then runs 
into the infinite loop dealing with the ephemeral-node bug.

> Zookeeper leader election stuck in ephemeral node retry loop
> 
>
> Key: KAFKA-1029
> URL: https://issues.apache.org/jira/browse/KAFKA-1029
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.0
>Reporter: Sam Meder
>Assignee: Sam Meder
>Priority: Blocker
> Fix For: 0.8.0
>
> Attachments: 
> 0002-KAFKA-1029-Use-brokerId-instead-of-leaderId-when-tri.patch
>
>
> We're seeing the following log statements (over and over):
> [2013-08-27 07:21:49,538] INFO conflict in /controller data: { "brokerid":3, 
> "timestamp":"1377587945206", "version":1 } stored data: { "brokerid":2, 
> "timestamp":"1377587460904", "version":1 } (kafka.utils.ZkUtils$)
> [2013-08-27 07:21:49,559] INFO I wrote this conflicted ephemeral node [{ 
> "brokerid":3, "timestamp":"1377587945206", "version":1 }] at /controller a 
> while back in a different session, hence I will backoff for this node to be 
> deleted by Zookeeper and retry (kafka.utils.ZkUtils$)
> where the broker is essentially stuck in the loop that is trying to deal with 
> left-over ephemeral nodes. The code looks a bit racy to me. In particular:
> ZookeeperLeaderElector:
>   def elect: Boolean = {
> controllerContext.zkClient.subscribeDataChanges(electionPath, 
> leaderChangeListener)
> val timestamp = SystemTime.milliseconds.toString
> val electString = ...
> try {
>   
> createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, 
> electionPath, electString, leaderId,
> (controllerString : String, leaderId : Any) => 
> KafkaController.parseControllerId(controllerString) == 
> leaderId.asInstanceOf[Int],
> controllerContext.zkSessionTimeout)
> leaderChangeListener is registered before the create call (by the way, it 
> looks like a new registration will be added every elect call - shouldn't it 
> register in startup()?) so it can update leaderId to the current leader before 
> the call to create. If that happens then we will continuously get node exists 
> exceptions and the checker function will always return true, i.e. we will 
> never get out of the while(true) loop.
> I think the right fix here is to pass brokerId instead of leaderId when 
> calling create, i.e.
> createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, 
> electionPath, electString, brokerId,
> (controllerString : String, leaderId : Any) => 
> KafkaController.parseControllerId(controllerString) == 
> leaderId.asInstanceOf[Int],
> controllerContext.zkSessionTimeout)
> The loop dealing with the ephemeral node bug is now only triggered for the 
> broker that owned the node previously, although I am still not 100% sure if 
> that is sufficient.
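
To make the proposed fix concrete, here is a minimal reduction of the checker 
race (illustrative only, not the actual ZkUtils/ZookeeperLeaderElector code):

    // Why comparing against leaderId loops forever, while brokerId does not.
    object CheckerRaceSketch {
      @volatile var leaderId = -1 // mutated by leaderChangeListener on any leader change
      val brokerId = 3            // id of this broker

      // Buggy: the listener may already have set leaderId to the *current*
      // leader, so the conflict handler concludes "we wrote this node" for
      // any leader and never leaves the retry loop.
      def checkerUsingLeaderId(storedControllerId: Int): Boolean =
        storedControllerId == leaderId

      // Fixed: compare against our own brokerId, so the retry loop is only
      // entered by the broker that actually owned the stale ephemeral node.
      def checkerUsingBrokerId(storedControllerId: Int): Boolean =
        storedControllerId == brokerId
    }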



--
This message was sent by Atlassian JIRA
(v6.2#6252)