Re: Review Request 31568: Patch for KAFKA-1989

2015-03-17 Thread Guozhang Wang


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, line 116
> > 
> >
> > We need to make tickMs and wheelSize configurable.
> 
> Yasuhiro Matsuda wrote:
> What is the motivation? I don't think it is a good idea to allow users to 
> configure them.

I am not concerned about user-configurability. The purgatory is used by 
multiple request types: produce, fetch, and in the future rebalance, heartbeat 
and join-group; different request types may need to set tickMs and wheelSize 
differently.
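
For illustration only, a rough sketch of what per-request-type construction could 
look like (class and parameter names here are assumptions, not the patch's actual API):

class Timer(tickMs: Long = 1L, wheelSize: Int = 20) {
  // the timing wheel would be built from tickMs and wheelSize
}

// e.g. a fine-grained wheel for produce/fetch, a coarser one for heartbeats
val produceTimer   = new Timer(tickMs = 1L,  wheelSize = 20)
val heartbeatTimer = new Timer(tickMs = 10L, wheelSize = 64)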


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, lines 187-192
> > 
> >
> > TBD

To replace the TBD here: we can let Timer.addTimerTaskEntry and Timer.add return 
Boolean instead of Unit, indicating whether the task had not yet expired and was 
successfully added to the timer. Then we can change the above to

if (!operation.isComplete()) {
  if (!timeoutTimer.add(operation)) {
    operation.cancel()
  }
}
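
As a self-contained sketch of the proposed return-value change (the types below are 
simplified stand-ins, not the patch's classes):

trait TimerTask { def expirationMs: Long }

class TimerSketch(var currentTimeMs: Long) {
  // Returns true if the task was enqueued before expiring; false means the
  // caller should complete or cancel it itself, as in the snippet above.
  def add(task: TimerTask): Boolean =
    if (task.expirationMs <= currentTimeMs) false
    else { /* insert into the timing wheel */ true }
}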


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, line 288
> > 
> >
> > It may be useful to return #.purged items?
> 
> Yasuhiro Matsuda wrote:
> What is the use?

At lines 316/317 we could then log at trace level whether the clock advance expired 
any tasks, along with the number of purged items.
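
For example, something along these lines inside the expiration reaper (advanceClock 
and purgeCompleted returning counts are assumptions for illustration, not the patch's 
actual signatures):

val expired = timeoutTimer.advanceClock(200L)   // assumed to return the number of expired operations
val purged  = purgeCompleted()                  // assumed to return the number of purged entries
trace("Advanced clock by 200ms: expired %d operations, purged %d watcher entries".format(expired, purged))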


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/utils/timer/Timer.scala, line 68
> > 
> >
> > I think bucket.flush(reinsert) will always fail on all the items since 
> > their expiration time will always be < bucket expiration + tickMs, i.e. the 
> > returned bucket from the delayed queue has already expired all its items. 
> > In this case, could we just call foreach(submit) on all of them instead of 
> > trying to reinsert them?
> 
> Yasuhiro Matsuda wrote:
> It is true only for the lowest wheel. Reinsert is necessary to make 
> timing wheels work. A bucket from a higher wheel may contain tasks not 
> expired (a tick time is longer in a higher wheel).

OK, I may be missing something here, but this is my reasoning:

The bucket is only returned from the delayed queue in line 62 if its expiration 
time has passed currentTime. After that, at least the lowest wheel will have advanced 
to the bucket's expiration time, and hence the add call within the reinsert is doomed to 
fail since task.expirationTime < the wheel's time + tickMs.
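
To make that concrete, the check involved is roughly the following (a simplified 
standalone sketch, not the patch's exact code):

object ExpirationCheckSketch {
  // A task can only be inserted into the lowest wheel if it expires at least one
  // tick in the future; a task coming back from an already-expired bucket never does.
  def canInsert(taskExpirationMs: Long, wheelTimeMs: Long, tickMs: Long): Boolean =
    taskExpirationMs >= wheelTimeMs + tickMs
}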


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/utils/timer/TimerTaskList.scala, line 72
> > 
> >
> > It seems the task entry of the task will only be set once throughout 
> > its lifetime; even when the task entry gets reinserted its correspondence 
> > to the task will not change, right?
> > 
> > If that is true we can just set the entry for the task in the 
> > constructor of the task entry.
> 
> Yasuhiro Matsuda wrote:
> This sets the TimerTaskEntry on the TimerTask. A TimerTask is created independently 
> of a Timer, then enqueued to a Timer.

Yes, but can we move this line to line 119 of TimerTaskList.scala? Then in line 
46 of Timer when we create the TimerTaskEntry with the passed in TimerTask its 
entry field will be set automatically.
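
A minimal sketch of that suggestion (field and constructor shapes are assumptions; 
the real classes would also have to handle cancellation and re-linking):

class TimerTask { @volatile var entry: TimerTaskEntry = null }

class TimerTaskEntry(val task: TimerTask) {
  task.entry = this   // the back-reference is established when the entry is created
}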


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review76459
---


On Feb. 28, 2015, 12:14 a.m., Yasuhiro Matsuda wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31568/
> ---
> 
> (Updated Feb. 28, 2015, 12:14 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1989
> https://issues.apache.org/jira/browse/KAFKA-1989
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> new purgatory implementation
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/server/DelayedOperation.scala 
> e317676b4dd5bb5ad9770930e694cd7282d5b6d5 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> 586cf4caa95f58d5b2e6c7429732d25d2b3635c8 
>   core/src/main/scala/kafka/utils/SinglyLinkedWeakList.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION 
>   core/src/test/scala/unit/

[jira] [Commented] (KAFKA-1997) Refactor Mirror Maker

2015-03-17 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14366588#comment-14366588
 ] 

Jiangjie Qin commented on KAFKA-1997:
-

Created reviewboard https://reviews.apache.org/r/32193/diff/
 against branch origin/trunk

> Refactor Mirror Maker
> -
>
> Key: KAFKA-1997
> URL: https://issues.apache.org/jira/browse/KAFKA-1997
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-1997.patch, KAFKA-1997.patch, 
> KAFKA-1997_2015-03-03_16:28:46.patch, KAFKA-1997_2015-03-04_15:07:46.patch, 
> KAFKA-1997_2015-03-04_15:42:45.patch, KAFKA-1997_2015-03-05_20:14:58.patch, 
> KAFKA-1997_2015-03-09_18:55:54.patch, KAFKA-1997_2015-03-10_18:31:34.patch, 
> KAFKA-1997_2015-03-11_15:20:18.patch, KAFKA-1997_2015-03-11_19:10:53.patch, 
> KAFKA-1997_2015-03-13_14:43:34.patch, KAFKA-1997_2015-03-17_13:47:01.patch
>
>
> Refactor mirror maker based on KIP-3



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1997) Refactor Mirror Maker

2015-03-17 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-1997:

Attachment: KAFKA-1997.patch

> Refactor Mirror Maker
> -
>
> Key: KAFKA-1997
> URL: https://issues.apache.org/jira/browse/KAFKA-1997
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-1997.patch, KAFKA-1997.patch, 
> KAFKA-1997_2015-03-03_16:28:46.patch, KAFKA-1997_2015-03-04_15:07:46.patch, 
> KAFKA-1997_2015-03-04_15:42:45.patch, KAFKA-1997_2015-03-05_20:14:58.patch, 
> KAFKA-1997_2015-03-09_18:55:54.patch, KAFKA-1997_2015-03-10_18:31:34.patch, 
> KAFKA-1997_2015-03-11_15:20:18.patch, KAFKA-1997_2015-03-11_19:10:53.patch, 
> KAFKA-1997_2015-03-13_14:43:34.patch, KAFKA-1997_2015-03-17_13:47:01.patch
>
>
> Refactor mirror maker based on KIP-3



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Review Request 32193: Patch for KAFKA-1997

2015-03-17 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32193/
---

Review request for kafka.


Bugs: KAFKA-1997
https://issues.apache.org/jira/browse/KAFKA-1997


Repository: kafka


Description
---

Follow-up patch for KAFKA-1997, fix a few bugs.


Diffs
-

  core/src/main/scala/kafka/tools/MirrorMaker.scala 
11acc3103e4e4a30d7380e26355ccba09b3304bb 

Diff: https://reviews.apache.org/r/32193/diff/


Testing
---


Thanks,

Jiangjie Qin



Re: [Discussion] Using Client Requests and Responses in Server

2015-03-17 Thread Neha Narkhede
>
> How about keeping those server-side for now, since there's no duplication?
> We can move them over in a follow-up jira.


+1. The work involved in this refactoring is pretty sizable. I would be ok
with splitting it in 2 JIRAs - one that converts the duplicated ones over
first and another that converts the rest.

On Tue, Mar 17, 2015 at 6:39 PM, Gwen Shapira  wrote:

> Never mind, it is implemented. Its just called LIST_OFFSETS.
>
>
>
> On Tue, Mar 17, 2015 at 6:27 PM, Gwen Shapira 
> wrote:
> > Thanks Jiangjie!
> >
> > Another, more questionable missing-in-action:
> >
> > Client API had offset fetch request and offset commit requests. I
> > understand those :)
> > The Server API also has OffsetRequest, which returns offset
> > corresponding to a specific time stamp. Its used by consumers.
> >
> > This sounds like something we actually need on the client side... is
> > this dropped with the new consumer?
> >
> >
> >
> >
> > On Tue, Mar 17, 2015 at 6:13 PM, Jiangjie Qin 
> wrote:
> >> I think those two requests are only used by controller to broker
> >> communication. Not sure if client side will need them in KIP-4,
> unlikely I
> >> guess.
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On 3/17/15, 6:08 PM, "Gwen Shapira"  wrote:
> >>
> >>>Hi,
> >>>
> >>>I'm starting this thread for the various questions I run into while
> >>>refactoring the server to use client requests and responses.
> >>>
> >>>Help is appreciated :)
> >>>
> >>>First question: LEADER_AND_ISR request and STOP_REPLICA request are
> >>>unimplemented in the client.
> >>>
> >>>Do we want to implement them as part of this refactoring?
> >>>Or should we continue using the scala implementation for those?
> >>>
> >>>Gwen
> >>
>



-- 
Thanks,
Neha


[jira] [Commented] (KAFKA-1997) Refactor Mirror Maker

2015-03-17 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14366549#comment-14366549
 ] 

Jiangjie Qin commented on KAFKA-1997:
-

[~xiaotao183] Yes, that's not right. I'll submit a patch. Thanks for finding 
this.

> Refactor Mirror Maker
> -
>
> Key: KAFKA-1997
> URL: https://issues.apache.org/jira/browse/KAFKA-1997
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-1997.patch, KAFKA-1997_2015-03-03_16:28:46.patch, 
> KAFKA-1997_2015-03-04_15:07:46.patch, KAFKA-1997_2015-03-04_15:42:45.patch, 
> KAFKA-1997_2015-03-05_20:14:58.patch, KAFKA-1997_2015-03-09_18:55:54.patch, 
> KAFKA-1997_2015-03-10_18:31:34.patch, KAFKA-1997_2015-03-11_15:20:18.patch, 
> KAFKA-1997_2015-03-11_19:10:53.patch, KAFKA-1997_2015-03-13_14:43:34.patch, 
> KAFKA-1997_2015-03-17_13:47:01.patch
>
>
> Refactor mirror maker based on KIP-3



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1997) Refactor Mirror Maker

2015-03-17 Thread TAO XIAO (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14366543#comment-14366543
 ] 

TAO XIAO edited comment on KAFKA-1997 at 3/18/15 2:52 AM:
--

[~becket_qin] I think I found a bug in MirrorMaker.scala. As shown in the code 
block below, rebalanceListenerArgs gets passed into the MirrorMakerMessageHandler 
constructor instead of messageHandlerArgs. The code below is quoted from line 256 
of MirrorMaker.scala on trunk.

{code:title=MirrorMaker.scala|borderStyle=solid}
messageHandler = {
  if (customMessageHandlerClass != null) {
    if (messageHandlerArgs != null)
      Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, rebalanceListenerArgs)
    else
      Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass)
  } else {
    defaultMirrorMakerMessageHandler
  }
}
{code}


was (Author: xiaotao183):
[~becket_qin] I think I found a bug in MirrorMaker.scala. As show in below code 
block rebalanceListenerArgs gets passed into MirrorMakerMessageHandler 
constructor instead of messageHandlerArgs

{code:title=MirrorMaker.scala|borderStyle=solid}
messageHandler = {
  if (customMessageHandlerClass != null) {
if (messageHandlerArgs != null)
  
Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, 
rebalanceListenerArgs)
else
  
Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass)
  } else {
defaultMirrorMakerMessageHandler
  }
}
{code}

> Refactor Mirror Maker
> -
>
> Key: KAFKA-1997
> URL: https://issues.apache.org/jira/browse/KAFKA-1997
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-1997.patch, KAFKA-1997_2015-03-03_16:28:46.patch, 
> KAFKA-1997_2015-03-04_15:07:46.patch, KAFKA-1997_2015-03-04_15:42:45.patch, 
> KAFKA-1997_2015-03-05_20:14:58.patch, KAFKA-1997_2015-03-09_18:55:54.patch, 
> KAFKA-1997_2015-03-10_18:31:34.patch, KAFKA-1997_2015-03-11_15:20:18.patch, 
> KAFKA-1997_2015-03-11_19:10:53.patch, KAFKA-1997_2015-03-13_14:43:34.patch, 
> KAFKA-1997_2015-03-17_13:47:01.patch
>
>
> Refactor mirror maker based on KIP-3



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1997) Refactor Mirror Maker

2015-03-17 Thread TAO XIAO (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14366543#comment-14366543
 ] 

TAO XIAO commented on KAFKA-1997:
-

[~becket_qin] I think I found a bug in MirrorMaker.scala. As shown in the code 
block below, rebalanceListenerArgs gets passed into the MirrorMakerMessageHandler 
constructor instead of messageHandlerArgs.

{code:title=MirrorMaker.scala|borderStyle=solid}
messageHandler = {
  if (customMessageHandlerClass != null) {
    if (messageHandlerArgs != null)
      Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, rebalanceListenerArgs)
    else
      Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass)
  } else {
    defaultMirrorMakerMessageHandler
  }
}
{code}
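
For clarity, a sketch of the presumably intended call, simply swapping in 
messageHandlerArgs (this is not the committed fix):

{code}
messageHandler = {
  if (customMessageHandlerClass != null) {
    if (messageHandlerArgs != null)
      Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, messageHandlerArgs)
    else
      Utils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass)
  } else {
    defaultMirrorMakerMessageHandler
  }
}
{code}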

> Refactor Mirror Maker
> -
>
> Key: KAFKA-1997
> URL: https://issues.apache.org/jira/browse/KAFKA-1997
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-1997.patch, KAFKA-1997_2015-03-03_16:28:46.patch, 
> KAFKA-1997_2015-03-04_15:07:46.patch, KAFKA-1997_2015-03-04_15:42:45.patch, 
> KAFKA-1997_2015-03-05_20:14:58.patch, KAFKA-1997_2015-03-09_18:55:54.patch, 
> KAFKA-1997_2015-03-10_18:31:34.patch, KAFKA-1997_2015-03-11_15:20:18.patch, 
> KAFKA-1997_2015-03-11_19:10:53.patch, KAFKA-1997_2015-03-13_14:43:34.patch, 
> KAFKA-1997_2015-03-17_13:47:01.patch
>
>
> Refactor mirror maker based on KIP-3



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1688) Add authorization interface and naive implementation

2015-03-17 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14366490#comment-14366490
 ] 

Gwen Shapira commented on KAFKA-1688:
-

Got it. That does make sense.

> Add authorization interface and naive implementation
> 
>
> Key: KAFKA-1688
> URL: https://issues.apache.org/jira/browse/KAFKA-1688
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Jay Kreps
>Assignee: Parth Brahmbhatt
> Fix For: 0.8.3
>
>
> Add a PermissionManager interface as described here:
> https://cwiki.apache.org/confluence/display/KAFKA/Security
> (possibly there is a better name?)
> Implement calls to the PermissionsManager in KafkaApis for the main requests 
> (FetchRequest, ProduceRequest, etc). We will need to add a new error code and 
> exception to the protocol to indicate "permission denied".
> Add a server configuration to give the class you want to instantiate that 
> implements that interface. That class can define its own configuration 
> properties from the main config file.
> Provide a simple implementation of this interface which just takes a user and 
> ip whitelist and permits those in either of the whitelists to do anything, 
> and denies all others.
> Rather than writing an integration test for this class we can probably just 
> use this class for the TLS and SASL authentication testing.
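
As a hedged illustration of what such an interface might look like (all names and 
signatures below are assumptions, not the wiki page's actual proposal):

{code}
trait PermissionManager {
  def configure(props: java.util.Map[String, _]): Unit
  def isPermitted(user: String, clientIp: String, operation: String, resource: String): Boolean
}

// Naive implementation: anyone on either whitelist may do anything; all others are denied.
class WhitelistPermissionManager(userWhitelist: Set[String], ipWhitelist: Set[String])
    extends PermissionManager {
  override def configure(props: java.util.Map[String, _]): Unit = ()
  override def isPermitted(user: String, clientIp: String, operation: String, resource: String): Boolean =
    userWhitelist.contains(user) || ipWhitelist.contains(clientIp)
}
{code}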



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [Discussion] Using Client Requests and Responses in Server

2015-03-17 Thread Gwen Shapira
Never mind, it is implemented. It's just called LIST_OFFSETS.



On Tue, Mar 17, 2015 at 6:27 PM, Gwen Shapira  wrote:
> Thanks Jiangjie!
>
> Another, more questionable missing-in-action:
>
> Client API had offset fetch request and offset commit requests. I
> understand those :)
> The Server API also has OffsetRequest, which returns offset
> corresponding to a specific time stamp. Its used by consumers.
>
> This sounds like something we actually need on the client side... is
> this dropped with the new consumer?
>
>
>
>
> On Tue, Mar 17, 2015 at 6:13 PM, Jiangjie Qin  
> wrote:
>> I think those two requests are only used by controller to broker
>> communication. Not sure if client side will need them in KIP-4, unlikely I
>> guess.
>>
>> Jiangjie (Becket) Qin
>>
>> On 3/17/15, 6:08 PM, "Gwen Shapira"  wrote:
>>
>>>Hi,
>>>
>>>I'm starting this thread for the various questions I run into while
>>>refactoring the server to use client requests and responses.
>>>
>>>Help is appreciated :)
>>>
>>>First question: LEADER_AND_ISR request and STOP_REPLICA request are
>>>unimplemented in the client.
>>>
>>>Do we want to implement them as part of this refactoring?
>>>Or should we continue using the scala implementation for those?
>>>
>>>Gwen
>>


Re: [Discussion] Using Client Requests and Responses in Server

2015-03-17 Thread Gwen Shapira
Thanks Jiangjie!

Another, more questionable missing-in-action:

The client API has offset fetch and offset commit requests. I
understand those :)
The server API also has OffsetRequest, which returns the offset
corresponding to a specific timestamp. It's used by consumers.

This sounds like something we actually need on the client side... is
this dropped with the new consumer?




On Tue, Mar 17, 2015 at 6:13 PM, Jiangjie Qin  wrote:
> I think those two requests are only used by controller to broker
> communication. Not sure if client side will need them in KIP-4, unlikely I
> guess.
>
> Jiangjie (Becket) Qin
>
> On 3/17/15, 6:08 PM, "Gwen Shapira"  wrote:
>
>>Hi,
>>
>>I'm starting this thread for the various questions I run into while
>>refactoring the server to use client requests and responses.
>>
>>Help is appreciated :)
>>
>>First question: LEADER_AND_ISR request and STOP_REPLICA request are
>>unimplemented in the client.
>>
>>Do we want to implement them as part of this refactoring?
>>Or should we continue using the scala implementation for those?
>>
>>Gwen
>


Re: [Discussion] Using Client Requests and Responses in Server

2015-03-17 Thread Jiangjie Qin
I think those two requests are only used for controller-to-broker
communication. Not sure if the client side will need them in KIP-4; unlikely, I
guess.

Jiangjie (Becket) Qin

On 3/17/15, 6:08 PM, "Gwen Shapira"  wrote:

>Hi,
>
>I'm starting this thread for the various questions I run into while
>refactoring the server to use client requests and responses.
>
>Help is appreciated :)
>
>First question: LEADER_AND_ISR request and STOP_REPLICA request are
>unimplemented in the client.
>
>Do we want to implement them as part of this refactoring?
>Or should we continue using the scala implementation for those?
>
>Gwen



Re: [Discussion] Using Client Requests and Responses in Server

2015-03-17 Thread Gwen Shapira
Same for UPDATE_METADATA. It is also missing on the client side, for good reasons :)

How about keeping those server-side for now, since there's no duplication?
We can move them over in a follow-up jira.

Gwen

On Tue, Mar 17, 2015 at 6:08 PM, Gwen Shapira  wrote:
> Hi,
>
> I'm starting this thread for the various questions I run into while
> refactoring the server to use client requests and responses.
>
> Help is appreciated :)
>
> First question: LEADER_AND_ISR request and STOP_REPLICA request are
> unimplemented in the client.
>
> Do we want to implement them as part of this refactoring?
> Or should we continue using the scala implementation for those?
>
> Gwen


[Discussion] Using Client Requests and Responses in Server

2015-03-17 Thread Gwen Shapira
Hi,

I'm starting this thread for the various questions I run into while
refactoring the server to use client requests and responses.

Help is appreciated :)

First question: LEADER_AND_ISR request and STOP_REPLICA request are
unimplemented in the client.

Do we want to implement them as part of this refactoring?
Or should we continue using the scala implementation for those?

Gwen


Re: Review Request 31568: Patch for KAFKA-1989

2015-03-17 Thread Yasuhiro Matsuda


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/utils/timer/Timer.scala, line 68
> > 
> >
> > I think bucket.flush(reinsert) will always fail on all the items since 
> > their expiration time will always be < bucket expiration + tickMs, i.e. the 
> > returned bucket from the delayed queue has already expired all its items. 
> > In this case, could we just call foreach(submit) on all of them instead of 
> > trying to reinsert them?

It is true only for the lowest wheel. Reinsert is necessary to make timing 
wheels work. A bucket from a higher wheel may contain tasks not expired (a tick 
time is longer in a higher wheel).
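
A simplified, self-contained sketch of that structure (class and field names are 
illustrative, not the patch's code):

// Each wheel covers tickMs * wheelSize of time; tasks further in the future go to
// a coarser parent wheel whose tick is the child's full span. A bucket drained from
// a parent wheel can therefore hold tasks that are not yet due at the child's finer
// resolution, which is why they are reinserted rather than run.
class WheelSketch(tickMs: Long, wheelSize: Int) {
  private val interval: Long = tickMs * wheelSize
  private var overflow: WheelSketch = null

  def add(expirationMs: Long, currentTimeMs: Long): Boolean =
    if (expirationMs < currentTimeMs + tickMs) false         // already due at this level
    else if (expirationMs < currentTimeMs + interval) true   // fits in one of this wheel's buckets
    else {
      if (overflow == null) overflow = new WheelSketch(interval, wheelSize)
      overflow.add(expirationMs, currentTimeMs)               // hand off to the coarser wheel
    }
}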


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, line 116
> > 
> >
> > We need to make tickMs and wheelSize configurable.

What is the motivation? I don't think it is a good idea to allow users to 
configure them.


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, line 253
> > 
> >
> > Does it require to sync on refQueue as well?

I don't think so.


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, line 288
> > 
> >
> > It may be useful to return #.purged items?

What is the use?


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala, line 67
> > 
> >
> > latch.await(0, TimeUnit.SECONDS)?

This is to avoid a race condition. In Timer, tasks are run by a thread pool. 
"0" makes the test more vulnerable to that race; "3" makes it pretty safe.


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/DelayedOperation.scala, line 245
> > 
> >
> > It seems we do not need to keep this as a class member variable, but 
> > just compute the value in purge() on-the-fly every time.

This is a shared counter updated at multiple places. We need to have this to 
avoid unnecessary purge calls.
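
Roughly, the idea is something like this (a standalone sketch with assumed names, not 
the patch's code):

import java.util.concurrent.atomic.AtomicInteger

// A counter updated wherever operations complete while still being watched, so
// purge() only scans the watcher lists once enough garbage has accumulated.
class PurgeCounterSketch(purgeInterval: Int) {
  private val estimatedPurgeCount = new AtomicInteger(0)
  def onCompletedWhileWatched(): Unit = estimatedPurgeCount.incrementAndGet()
  def maybePurge(purge: () => Int): Unit =
    if (estimatedPurgeCount.get >= purgeInterval)
      estimatedPurgeCount.addAndGet(-purge())   // purge() returns the number of entries removed
}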


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/utils/timer/TimerTaskList.scala, line 72
> > 
> >
> > It seems the task entry of the task will only be set once throughout 
> > its lifetime; even when the task entry gets reinserted its correspondence 
> > to the task will not change, right?
> > 
> > If that is true we can just set the entry for the task in the 
> > constructor of the task entry.

This sets the TimerTaskEntry on the TimerTask. A TimerTask is created independently from a 
Timer, then enqueued to a Timer.


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala, lines 
> > 29-33
> > 
> >
> > Could we just add an atomic integer recording the list size and size() 
> > function to TimerTaskList?

We size the list only in this test. Adding a counter to an individual list is 
unnecessary overhead.


- Yasuhiro


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review76459
---


On Feb. 28, 2015, 12:14 a.m., Yasuhiro Matsuda wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31568/
> ---
> 
> (Updated Feb. 28, 2015, 12:14 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1989
> https://issues.apache.org/jira/browse/KAFKA-1989
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> new purgatory implementation
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/server/DelayedOperation.scala 
> e317676b4dd5bb5ad9770930e694cd7282d5b6d5 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> 586cf4caa95f58d5b2e6c7429732d25d2b3635c8 
>   core/src/main/scala/kafka/utils/SinglyLinkedWeakList.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/server/DelayedOper

[jira] [Commented] (KAFKA-1566) Kafka environment configuration (kafka-env.sh)

2015-03-17 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14366377#comment-14366377
 ] 

Sriharsha Chintalapani commented on KAFKA-1566:
---

[~nehanarkhede] The patch now applies cleanly against trunk. Can you please 
take a look?

> Kafka environment configuration (kafka-env.sh)
> --
>
> Key: KAFKA-1566
> URL: https://issues.apache.org/jira/browse/KAFKA-1566
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Cosmin Lehene
>Assignee: Sriharsha Chintalapani
>  Labels: newbie
> Fix For: 0.8.3
>
> Attachments: KAFKA-1566.patch, KAFKA-1566_2015-02-21_21:57:02.patch, 
> KAFKA-1566_2015-03-17_17:01:38.patch, KAFKA-1566_2015-03-17_17:19:23.patch
>
>
> It would be useful (especially for automated deployments) to have an 
> environment configuration file that could be sourced from the launcher files 
> (e.g. kafka-run-server.sh). 
> This is how this could look like kafka-env.sh 
> {code}
> export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseCompressedOops 
> -XX:+DisableExplicitGC -Djava.awt.headless=true \ -XX:+UseG1GC 
> -XX:PermSize=48m -XX:MaxPermSize=48m -XX:MaxGCPauseMillis=20 
> -XX:InitiatingHeapOccupancyPercent=35' %>" 
> export KAFKA_HEAP_OPTS="'-Xmx1G -Xms1G' %>" 
> export KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=/var/log/kafka" 
> {code} 
> kafka-server-start.sh 
> {code} 
> ... 
> source $base_dir/config/kafka-env.sh 
> ... 
> {code} 
> This approach is consistent with Hadoop and HBase. However the idea here is 
> to be able to set these values in a single place without having to edit 
> startup scripts.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1566) Kafka environment configuration (kafka-env.sh)

2015-03-17 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani updated KAFKA-1566:
--
Attachment: KAFKA-1566_2015-03-17_17:19:23.patch

> Kafka environment configuration (kafka-env.sh)
> --
>
> Key: KAFKA-1566
> URL: https://issues.apache.org/jira/browse/KAFKA-1566
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Cosmin Lehene
>Assignee: Sriharsha Chintalapani
>  Labels: newbie
> Fix For: 0.8.3
>
> Attachments: KAFKA-1566.patch, KAFKA-1566_2015-02-21_21:57:02.patch, 
> KAFKA-1566_2015-03-17_17:01:38.patch, KAFKA-1566_2015-03-17_17:19:23.patch
>
>
> It would be useful (especially for automated deployments) to have an 
> environment configuration file that could be sourced from the launcher files 
> (e.g. kafka-run-server.sh). 
> This is how this could look like kafka-env.sh 
> {code}
> export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseCompressedOops 
> -XX:+DisableExplicitGC -Djava.awt.headless=true \ -XX:+UseG1GC 
> -XX:PermSize=48m -XX:MaxPermSize=48m -XX:MaxGCPauseMillis=20 
> -XX:InitiatingHeapOccupancyPercent=35' %>" 
> export KAFKA_HEAP_OPTS="'-Xmx1G -Xms1G' %>" 
> export KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=/var/log/kafka" 
> {code} 
> kafka-server-start.sh 
> {code} 
> ... 
> source $base_dir/config/kafka-env.sh 
> ... 
> {code} 
> This approach is consistent with Hadoop and HBase. However the idea here is 
> to be able to set these values in a single place without having to edit 
> startup scripts.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1566) Kafka environment configuration (kafka-env.sh)

2015-03-17 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14366373#comment-14366373
 ] 

Sriharsha Chintalapani commented on KAFKA-1566:
---

Updated reviewboard https://reviews.apache.org/r/29724/diff/
 against branch origin/trunk

> Kafka environment configuration (kafka-env.sh)
> --
>
> Key: KAFKA-1566
> URL: https://issues.apache.org/jira/browse/KAFKA-1566
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Cosmin Lehene
>Assignee: Sriharsha Chintalapani
>  Labels: newbie
> Fix For: 0.8.3
>
> Attachments: KAFKA-1566.patch, KAFKA-1566_2015-02-21_21:57:02.patch, 
> KAFKA-1566_2015-03-17_17:01:38.patch, KAFKA-1566_2015-03-17_17:19:23.patch
>
>
> It would be useful (especially for automated deployments) to have an 
> environment configuration file that could be sourced from the launcher files 
> (e.g. kafka-run-server.sh). 
> This is how this could look like kafka-env.sh 
> {code}
> export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseCompressedOops 
> -XX:+DisableExplicitGC -Djava.awt.headless=true \ -XX:+UseG1GC 
> -XX:PermSize=48m -XX:MaxPermSize=48m -XX:MaxGCPauseMillis=20 
> -XX:InitiatingHeapOccupancyPercent=35' %>" 
> export KAFKA_HEAP_OPTS="'-Xmx1G -Xms1G' %>" 
> export KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=/var/log/kafka" 
> {code} 
> kafka-server-start.sh 
> {code} 
> ... 
> source $base_dir/config/kafka-env.sh 
> ... 
> {code} 
> This approach is consistent with Hadoop and HBase. However the idea here is 
> to be able to set these values in a single place without having to edit 
> startup scripts.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 29724: Patch for KAFKA-1566

2015-03-17 Thread Sriharsha Chintalapani

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29724/
---

(Updated March 18, 2015, 12:19 a.m.)


Review request for kafka.


Bugs: KAFKA-1566
https://issues.apache.org/jira/browse/KAFKA-1566


Repository: kafka


Description
---

KAFKA-1566. Kafka environment configuration (kafka-env.sh).


Diffs (updated)
-

  bin/kafka-run-class.sh 881f578a8f5c796fe23415b978c1ad35869af76e 
  bin/windows/kafka-run-class.bat 9df3d2b45236b4f06d55a89c84afcf0ab9f5d0f2 
  config/kafka-env.cmd PRE-CREATION 
  config/kafka-env.sh PRE-CREATION 

Diff: https://reviews.apache.org/r/29724/diff/


Testing
---


Thanks,

Sriharsha Chintalapani



[jira] [Commented] (KAFKA-2028) Unable to start the ZK instance after myid file was missing and had to recreate it.

2015-03-17 Thread InduR (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14366362#comment-14366362
 ] 

InduR commented on KAFKA-2028:
--

The data and log files are also mapped to /tmp:

# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

The server had not been rebooted in the last two months since Kafka was first 
installed and started running.
[root@lablx0025 config]# uptime
 19:07:23 up 63 days,  3:19,  1 user,  load average: 0.00, 0.00, 0.00

So to resolve this issue, should I repoint server.properties and 
zookeeper.properties to a new file system and start the instances?

Thanks!

> Unable to start the ZK instance after myid file was missing and had to 
> recreate it.
> ---
>
> Key: KAFKA-2028
> URL: https://issues.apache.org/jira/browse/KAFKA-2028
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.8.1.1
> Environment: Non Prod
>Reporter: InduR
>
> Created a Dev 3 node cluster environment in Jan and the environment has been 
> up and running without any issues until few days.
>  Kafka server stopped running but ZK listener was up .Noticed that the Myid 
> file was missing in all 3 servers.
> Recreated the file when ZK was still running did not help.
> Stopped all of the ZK /kafka server instances and see the following error 
> when starting ZK.
> kafka_2.10-0.8.1.1
> OS : RHEL
> [root@lablx0025 bin]# ./zookeeper-server-start.sh 
> ../config/zookeeper.properties &
> [1] 31053
> [* bin]# [2015-03-17 15:04:33,876] INFO Reading configuration from: 
> ../config/zookeeper.properties (org.apache.zookeeper. 
>   
> server.quorum.QuorumPeerConfig)
> [2015-03-17 15:04:33,885] INFO Defaulting to majority quorums 
> (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
> [2015-03-17 15:04:33,911] DEBUG preRegister called. 
> Server=com.sun.jmx.mbeanserver.JmxMBeanServer@4891d863, 
> name=log4j:logger=kafka (k
>afka)
> [2015-03-17 15:04:33,915] INFO Starting quorum peer 
> (org.apache.zookeeper.server.quorum.QuorumPeerMain)
> [2015-03-17 15:04:33,940] INFO binding to port 0.0.0.0/0.0.0.0:2181 
> (org.apache.zookeeper.server.NIOServerCnxn)
> [2015-03-17 15:04:33,966] INFO tickTime set to 3000 
> (org.apache.zookeeper.server.quorum.QuorumPeer)
> [2015-03-17 15:04:33,966] INFO minSessionTimeout set to -1 
> (org.apache.zookeeper.server.quorum.QuorumPeer)
> [2015-03-17 15:04:33,966] INFO maxSessionTimeout set to -1 
> (org.apache.zookeeper.server.quorum.QuorumPeer)
> [2015-03-17 15:04:33,966] INFO initLimit set to 5 
> (org.apache.zookeeper.server.quorum.QuorumPeer)
> [2015-03-17 15:04:34,023] ERROR Failed to increment parent cversion for: 
> /consumers/console-consumer-6249/offsets/test (org.apache.zoo 
>   
> keeper.server.persistence.FileTxnSnapLog)
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = 
> NoNode for /consumers/console-consumer-6249/offsets/test
> at 
> org.apache.zookeeper.server.DataTree.incrementCversion(DataTree.java:1218)
> at 
> org.apache.zookeeper.server.persistence.FileTxnSnapLog.processTransaction(FileTxnSnapLog.java:222)
> at 
> org.apache.zookeeper.server.persistence.FileTxnSnapLog.restore(FileTxnSnapLog.java:150)
> at 
> org.apache.zookeeper.server.ZKDatabase.loadDataBase(ZKDatabase.java:222)
> at 
> org.apache.zookeeper.server.quorum.QuorumPeer.start(QuorumPeer.java:398)
> at 
> org.apache.zookeeper.server.quorum.QuorumPeerMain.runFromConfig(QuorumPeerMain.java:143)
> at 
> org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:103)
> at 
> org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:76)
> [2015-03-17 15:04:34,027] FATAL Unable to load database on disk 
> (org.apache.zookeeper.server.quorum.QuorumPeer)
> java.io.IOException: Failed to process transaction type: 2 error: 
> KeeperErrorCode = NoNode for /consumers/console-consumer-6249/offset  
>   
>s/test
> at 
> org.apache.zookeeper.server.persistence.FileTxnSnapLog.restore(FileTxnSnapLog.java:152)
> at 
> org.apache.zookeeper.server.ZKDatabase.loadDataBase(ZKDatabase.java:222)
> at 
> org.apache.zookeeper.server.quorum.QuorumPeer.start(QuorumPeer.java:398)
> at 
> org.apache.zookeeper.server.quorum.QuorumPeerMain.runFromConfi

[jira] [Commented] (KAFKA-2028) Unable to start the ZK instance after myid file was missing and had to recreate it.

2015-03-17 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14366349#comment-14366349
 ] 

Joe Stein commented on KAFKA-2028:
--

The issue is likely related to using the /tmp directory:

dataDir=/tmp/zookeeper

You should not be using the /tmp directory for ZooKeeper data nor for Kafka data (check 
log.dirs in your server.properties).

What could have happened is a reboot, in which case the OS deletes everything in 
/tmp.
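
For example, pointing both at persistent storage (the paths below are just placeholders):

# config/zookeeper.properties
dataDir=/var/lib/zookeeper

# config/server.properties
log.dirs=/var/lib/kafka-logs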

> Unable to start the ZK instance after myid file was missing and had to 
> recreate it.
> ---
>
> Key: KAFKA-2028
> URL: https://issues.apache.org/jira/browse/KAFKA-2028
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.8.1.1
> Environment: Non Prod
>Reporter: InduR
>
> Created a Dev 3 node cluster environment in Jan and the environment has been 
> up and running without any issues until few days.
>  Kafka server stopped running but ZK listener was up .Noticed that the Myid 
> file was missing in all 3 servers.
> Recreated the file when ZK was still running did not help.
> Stopped all of the ZK /kafka server instances and see the following error 
> when starting ZK.
> kafka_2.10-0.8.1.1
> OS : RHEL
> [root@lablx0025 bin]# ./zookeeper-server-start.sh 
> ../config/zookeeper.properties &
> [1] 31053
> [* bin]# [2015-03-17 15:04:33,876] INFO Reading configuration from: 
> ../config/zookeeper.properties (org.apache.zookeeper. 
>   
> server.quorum.QuorumPeerConfig)
> [2015-03-17 15:04:33,885] INFO Defaulting to majority quorums 
> (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
> [2015-03-17 15:04:33,911] DEBUG preRegister called. 
> Server=com.sun.jmx.mbeanserver.JmxMBeanServer@4891d863, 
> name=log4j:logger=kafka (k
>afka)
> [2015-03-17 15:04:33,915] INFO Starting quorum peer 
> (org.apache.zookeeper.server.quorum.QuorumPeerMain)
> [2015-03-17 15:04:33,940] INFO binding to port 0.0.0.0/0.0.0.0:2181 
> (org.apache.zookeeper.server.NIOServerCnxn)
> [2015-03-17 15:04:33,966] INFO tickTime set to 3000 
> (org.apache.zookeeper.server.quorum.QuorumPeer)
> [2015-03-17 15:04:33,966] INFO minSessionTimeout set to -1 
> (org.apache.zookeeper.server.quorum.QuorumPeer)
> [2015-03-17 15:04:33,966] INFO maxSessionTimeout set to -1 
> (org.apache.zookeeper.server.quorum.QuorumPeer)
> [2015-03-17 15:04:33,966] INFO initLimit set to 5 
> (org.apache.zookeeper.server.quorum.QuorumPeer)
> [2015-03-17 15:04:34,023] ERROR Failed to increment parent cversion for: 
> /consumers/console-consumer-6249/offsets/test (org.apache.zoo 
>   
> keeper.server.persistence.FileTxnSnapLog)
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = 
> NoNode for /consumers/console-consumer-6249/offsets/test
> at 
> org.apache.zookeeper.server.DataTree.incrementCversion(DataTree.java:1218)
> at 
> org.apache.zookeeper.server.persistence.FileTxnSnapLog.processTransaction(FileTxnSnapLog.java:222)
> at 
> org.apache.zookeeper.server.persistence.FileTxnSnapLog.restore(FileTxnSnapLog.java:150)
> at 
> org.apache.zookeeper.server.ZKDatabase.loadDataBase(ZKDatabase.java:222)
> at 
> org.apache.zookeeper.server.quorum.QuorumPeer.start(QuorumPeer.java:398)
> at 
> org.apache.zookeeper.server.quorum.QuorumPeerMain.runFromConfig(QuorumPeerMain.java:143)
> at 
> org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:103)
> at 
> org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:76)
> [2015-03-17 15:04:34,027] FATAL Unable to load database on disk 
> (org.apache.zookeeper.server.quorum.QuorumPeer)
> java.io.IOException: Failed to process transaction type: 2 error: 
> KeeperErrorCode = NoNode for /consumers/console-consumer-6249/offset  
>   
>s/test
> at 
> org.apache.zookeeper.server.persistence.FileTxnSnapLog.restore(FileTxnSnapLog.java:152)
> at 
> org.apache.zookeeper.server.ZKDatabase.loadDataBase(ZKDatabase.java:222)
> at 
> org.apache.zookeeper.server.quorum.QuorumPeer.start(QuorumPeer.java:398)
> at 
> org.apache.zookeeper.server.quorum.QuorumPeerMain.runFromConfig(QuorumPeerMain.java:143)
> at 
> org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:103)
> at 
> org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPee

[jira] [Commented] (KAFKA-1566) Kafka environment configuration (kafka-env.sh)

2015-03-17 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14366347#comment-14366347
 ] 

Sriharsha Chintalapani commented on KAFKA-1566:
---

Updated reviewboard https://reviews.apache.org/r/29724/diff/
 against branch origin/trunk

> Kafka environment configuration (kafka-env.sh)
> --
>
> Key: KAFKA-1566
> URL: https://issues.apache.org/jira/browse/KAFKA-1566
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Cosmin Lehene
>Assignee: Sriharsha Chintalapani
>  Labels: newbie
> Fix For: 0.8.3
>
> Attachments: KAFKA-1566.patch, KAFKA-1566_2015-02-21_21:57:02.patch, 
> KAFKA-1566_2015-03-17_17:01:38.patch
>
>
> It would be useful (especially for automated deployments) to have an 
> environment configuration file that could be sourced from the launcher files 
> (e.g. kafka-run-server.sh). 
> This is how this could look like kafka-env.sh 
> {code}
> export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseCompressedOops 
> -XX:+DisableExplicitGC -Djava.awt.headless=true \ -XX:+UseG1GC 
> -XX:PermSize=48m -XX:MaxPermSize=48m -XX:MaxGCPauseMillis=20 
> -XX:InitiatingHeapOccupancyPercent=35' %>" 
> export KAFKA_HEAP_OPTS="'-Xmx1G -Xms1G' %>" 
> export KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=/var/log/kafka" 
> {code} 
> kafka-server-start.sh 
> {code} 
> ... 
> source $base_dir/config/kafka-env.sh 
> ... 
> {code} 
> This approach is consistent with Hadoop and HBase. However the idea here is 
> to be able to set these values in a single place without having to edit 
> startup scripts.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1566) Kafka environment configuration (kafka-env.sh)

2015-03-17 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani updated KAFKA-1566:
--
Attachment: KAFKA-1566_2015-03-17_17:01:38.patch

> Kafka environment configuration (kafka-env.sh)
> --
>
> Key: KAFKA-1566
> URL: https://issues.apache.org/jira/browse/KAFKA-1566
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Cosmin Lehene
>Assignee: Sriharsha Chintalapani
>  Labels: newbie
> Fix For: 0.8.3
>
> Attachments: KAFKA-1566.patch, KAFKA-1566_2015-02-21_21:57:02.patch, 
> KAFKA-1566_2015-03-17_17:01:38.patch
>
>
> It would be useful (especially for automated deployments) to have an 
> environment configuration file that could be sourced from the launcher files 
> (e.g. kafka-run-server.sh). 
> This is how this could look like kafka-env.sh 
> {code}
> export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseCompressedOops 
> -XX:+DisableExplicitGC -Djava.awt.headless=true \ -XX:+UseG1GC 
> -XX:PermSize=48m -XX:MaxPermSize=48m -XX:MaxGCPauseMillis=20 
> -XX:InitiatingHeapOccupancyPercent=35' %>" 
> export KAFKA_HEAP_OPTS="'-Xmx1G -Xms1G' %>" 
> export KAFKA_LOG4J_OPTS="-Dkafka.logs.dir=/var/log/kafka" 
> {code} 
> kafka-server-start.sh 
> {code} 
> ... 
> source $base_dir/config/kafka-env.sh 
> ... 
> {code} 
> This approach is consistent with Hadoop and HBase. However the idea here is 
> to be able to set these values in a single place without having to edit 
> startup scripts.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 29724: Patch for KAFKA-1566

2015-03-17 Thread Sriharsha Chintalapani

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29724/
---

(Updated March 18, 2015, 12:01 a.m.)


Review request for kafka.


Bugs: KAFKA-1566
https://issues.apache.org/jira/browse/KAFKA-1566


Repository: kafka


Description
---

KAFKA-1566. Kafka environment configuration (kafka-env.sh).


Diffs (updated)
-

  bin/kafka-run-class.sh 881f578a8f5c796fe23415b978c1ad35869af76e 
  bin/windows/kafka-run-class.bat 9df3d2b45236b4f06d55a89c84afcf0ab9f5d0f2 
  config/kafka-env.cmd PRE-CREATION 
  config/kafka-env.sh PRE-CREATION 

Diff: https://reviews.apache.org/r/29724/diff/


Testing
---


Thanks,

Sriharsha Chintalapani



Re: Broker is not aware of new partitions assigned

2015-03-17 Thread Allen Wang
I am not sure the Kafka version difference is the issue, as adding
partitions for other topics later works. Are there any protocol-level
compatibility issues between 0.8.2.1 and 0.8.1.1?

Also restarting the controller seems to fix the problem.

On Tue, Mar 17, 2015 at 4:08 PM, Mayuresh Gharat  wrote:

> Probably you can try restarting the controller and have same version for
> the controller and the brokers.
> BTW, was there any specific reason you are running 2 different versions for
> the controller and other brokers?
>
> Thanks,
>
> Mayuresh
>
> On Tue, Mar 17, 2015 at 4:02 PM, Allen Wang 
> wrote:
>
> > Yes, the watcher is still alive. The log in the controller indicates that
> > it observed the changes.
> >
> >
> > On Tue, Mar 17, 2015 at 2:05 PM, Mayuresh Gharat <
> > gharatmayures...@gmail.com
> > > wrote:
> >
> > > I think the way reassignment works is asynchronous. Changes are made to
> > > zookeeper but those changes get reflected only when controller watcher
> > > fires for the respective zookeeper path. Is your watcher still alive?
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Tue, Mar 17, 2015 at 1:29 PM, Allen Wang  >
> > > wrote:
> > >
> > > > Looking a bit more into controller log, it seems that when the
> > partition
> > > > assignment is changed in ZooKeeper, the controller has quite a lot
> > > > exceptions communicating with new brokers where the partitions are
> > > > assigned. One thing to note is that the new brokers have Kafka
> version
> > > > 0.8.2.1 and the controller has Kafka version 0.8.1.1.
> > > >
> > > > 2015-03-16 22:36:58,178 WARN  kafka.utils.Logging$class:89
> > > > [Controller-2-to-broker-48-send-thread] [warn]
> > > > [Controller-2-to-broker-48-send-thread], Controller 2 fails to send a
> > > > request to broker id:48,host:xyz:7101
> > > > java.io.EOFException: Received -1 when reading from channel, socket
> has
> > > > likely been closed.
> > > > at kafka.utils.Utils$.read(Utils.scala:376)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > > > at
> > > > kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > > > at
> > > kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> > > > at
> > > >
> > > >
> > >
> >
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
> > > > at
> > > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > > >
> > > > Does it explain why the brokers are not aware of the new assignments?
> > Is
> > > > there anyway to recover from this communication problem, like
> > restarting
> > > > the controller?
> > > >
> > > > Thanks,
> > > > Allen
> > > >
> > > >
> > > > On Tue, Mar 17, 2015 at 10:34 AM, Allen Wang 
> > wrote:
> > > >
> > > > > Hello,
> > > > >
> > > > > I developed a tool to add partitions and assign new partitions to a
> > set
> > > > of
> > > > > brokers in one operation by utilizing the API
> > > > > AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK().
> > > > >
> > > > > It worked well in most cases. However, in one case, I found that
> the
> > > > > brokers are not aware of new partitions assigned to them, even
> though
> > > the
> > > > > zookeeper data clearly shows the assignment.
> > > > >
> > > > > Here is the zookeeper data for the partition:
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> {"controller_epoch":6,"leader":62,"version":1,"leader_epoch":0,"isr":[62,74]}
> > > > >
> > > > > On broker 62, the error message is:
> > > > >
> > > > > 2015-03-17 17:11:57,157 WARN  kafka.utils.Logging$class:83
> > > > > [kafka-request-handler-7] [warn] [KafkaApi-62] Produce request with
> > > > > correlation id 2048464 from client x on partition [m,71] failed due
> > to
> > > > > Partition [m,71] doesn't exist on 62
> > > > >
> > > > > Here is the core function of the tool:
> > > > >
> > > > >   def addPartitionsToTopic(zkClient: ZkClient, topic: String,
> > > > > brokersToAssignPartitions: Seq[Int], brokers: Seq[Int], execute:
> > > > Boolean):
> > > > > Unit = {
> > > > > val existingPartitionsReplicaList =
> > > > > ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
> > > > > val config = AdminUtils.fetchTopicConfig(zkClient, topic)
> > > > > printf("Topic config: %s\n\n", config)
> > > > > if (existingPartitionsReplicaList.size == 0)
> > > > >   throw new AdminOperationException("The topic %s does not
> > > > > exist".format(topic))
> > > > > val currentPartitions = existingPartitionsReplicaList.size
> > > > > val replicationFactor = existingPartitionsReplicaList.map(e =>
> > > > > e._2.size).max
> > > > > val brokersWithPartitions =
> > existingPartitionsReplicaList.flatMap(e
> > > > =>
> > > > > e._2).toSet.toSeq
> > > > > if
> > > (brokersToAssign

[jira] [Updated] (KAFKA-2028) Unable to start the ZK instance after myid file was missing and had to recreate it.

2015-03-17 Thread InduR (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

InduR updated KAFKA-2028:
-
Description: 
Created a Dev 3-node cluster environment in Jan and the environment has been up 
and running without any issues until a few days ago.
Kafka server stopped running but the ZK listener was up. Noticed that the myid 
file was missing on all 3 servers.
Recreating the file while ZK was still running did not help.
Stopped all of the ZK/Kafka server instances and see the following error when 
starting ZK.

kafka_2.10-0.8.1.1
OS : RHEL
[root@lablx0025 bin]# ./zookeeper-server-start.sh 
../config/zookeeper.properties &
[1] 31053
[* bin]# [2015-03-17 15:04:33,876] INFO Reading configuration from: 
../config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2015-03-17 15:04:33,885] INFO Defaulting to majority quorums 
(org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2015-03-17 15:04:33,911] DEBUG preRegister called. 
Server=com.sun.jmx.mbeanserver.JmxMBeanServer@4891d863, name=log4j:logger=kafka (kafka)
[2015-03-17 15:04:33,915] INFO Starting quorum peer 
(org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2015-03-17 15:04:33,940] INFO binding to port 0.0.0.0/0.0.0.0:2181 
(org.apache.zookeeper.server.NIOServerCnxn)
[2015-03-17 15:04:33,966] INFO tickTime set to 3000 
(org.apache.zookeeper.server.quorum.QuorumPeer)
[2015-03-17 15:04:33,966] INFO minSessionTimeout set to -1 
(org.apache.zookeeper.server.quorum.QuorumPeer)
[2015-03-17 15:04:33,966] INFO maxSessionTimeout set to -1 
(org.apache.zookeeper.server.quorum.QuorumPeer)
[2015-03-17 15:04:33,966] INFO initLimit set to 5 
(org.apache.zookeeper.server.quorum.QuorumPeer)
[2015-03-17 15:04:34,023] ERROR Failed to increment parent cversion for: 
/consumers/console-consumer-6249/offsets/test (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode 
for /consumers/console-consumer-6249/offsets/test
at 
org.apache.zookeeper.server.DataTree.incrementCversion(DataTree.java:1218)
at 
org.apache.zookeeper.server.persistence.FileTxnSnapLog.processTransaction(FileTxnSnapLog.java:222)
at 
org.apache.zookeeper.server.persistence.FileTxnSnapLog.restore(FileTxnSnapLog.java:150)
at 
org.apache.zookeeper.server.ZKDatabase.loadDataBase(ZKDatabase.java:222)
at 
org.apache.zookeeper.server.quorum.QuorumPeer.start(QuorumPeer.java:398)
at 
org.apache.zookeeper.server.quorum.QuorumPeerMain.runFromConfig(QuorumPeerMain.java:143)
at 
org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:103)
at 
org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:76)
[2015-03-17 15:04:34,027] FATAL Unable to load database on disk 
(org.apache.zookeeper.server.quorum.QuorumPeer)
java.io.IOException: Failed to process transaction type: 2 error: 
KeeperErrorCode = NoNode for /consumers/console-consumer-6249/offsets/test
at 
org.apache.zookeeper.server.persistence.FileTxnSnapLog.restore(FileTxnSnapLog.java:152)
at 
org.apache.zookeeper.server.ZKDatabase.loadDataBase(ZKDatabase.java:222)
at 
org.apache.zookeeper.server.quorum.QuorumPeer.start(QuorumPeer.java:398)
at 
org.apache.zookeeper.server.quorum.QuorumPeerMain.runFromConfig(QuorumPeerMain.java:143)
at 
org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:103)
at 
org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:76)
[2015-03-17 15:04:34,027] FATAL Unexpected exception, exiting abnormally 
(org.apache.zookeeper.server.quorum.QuorumPeerMain)
java.lang.RuntimeException: Unable to run quorum server
at 
org.apache.zookeeper.server.quorum.QuorumPeer.start(QuorumPeer.java:401)
at 
org.apache.zookeeper.server.quorum.QuorumPeerMain.runFromConfig(QuorumPeerMain.java:143)
at 
org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:103)
at 
org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:76)
Caused by: java.io.IOException: Failed to process transaction type: 2 error: 
KeeperErrorCode = NoNode for /consumers/console-consumer-6249/offsets/test
at 
org.apache.zookeeper.server.persistence.FileTxnSnapLog.restore(FileTxnSnapLog.java:152)
  

[jira] [Commented] (KAFKA-1461) Replica fetcher thread does not implement any back-off behavior

2015-03-17 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14366286#comment-14366286
 ] 

Sriharsha Chintalapani commented on KAFKA-1461:
---

[~guozhang] updated the PR as per your review suggestions. Please take a look 
when you get a chance.


> Replica fetcher thread does not implement any back-off behavior
> ---
>
> Key: KAFKA-1461
> URL: https://issues.apache.org/jira/browse/KAFKA-1461
> Project: Kafka
>  Issue Type: Improvement
>  Components: replication
>Affects Versions: 0.8.1.1
>Reporter: Sam Meder
>Assignee: Sriharsha Chintalapani
>  Labels: newbie++
> Fix For: 0.8.3
>
> Attachments: KAFKA-1461.patch, KAFKA-1461.patch, 
> KAFKA-1461_2015-03-11_10:41:26.patch, KAFKA-1461_2015-03-11_18:17:51.patch, 
> KAFKA-1461_2015-03-12_13:54:51.patch, KAFKA-1461_2015-03-17_16:03:33.patch
>
>
> The current replica fetcher thread will retry in a tight loop if any error 
> occurs during the fetch call. For example, we've seen cases where the fetch 
> continuously throws a connection refused exception leading to several replica 
> fetcher threads that spin in a pretty tight loop.
> To a much lesser degree this is also an issue in the consumer fetcher thread, 
> although the fact that erroring partitions are removed so a leader can be 
> re-discovered helps some.
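
A generic sketch of the kind of back-off being asked for (purely illustrative; not 
the attached patches):

object BackoffSketch {
  // Instead of retrying immediately on every fetch error, pause before the next attempt.
  def fetchLoop(isRunning: () => Boolean, fetchOnce: () => Boolean, backoffMs: Long): Unit =
    while (isRunning()) {
      val ok = try fetchOnce() catch { case _: Exception => false }
      if (!ok) Thread.sleep(backoffMs)   // back off rather than spinning in a tight loop
    }
}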



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Broker is not aware of new partitions assigned

2015-03-17 Thread Mayuresh Gharat
Probably you can try restarting the controller and have same version for
the controller and the brokers.
BTW, was there any specific reason you are running 2 different versions for
the controller and other brokers?

Thanks,

Mayuresh

On Tue, Mar 17, 2015 at 4:02 PM, Allen Wang 
wrote:

> Yes, the watcher is still alive. The log in the controller indicates that
> it observed the changes.
>
>
> On Tue, Mar 17, 2015 at 2:05 PM, Mayuresh Gharat <
> gharatmayures...@gmail.com
> > wrote:
>
> > I think the way reassignment works is asynchronous. Changes are made to
> > zookeeper but those changes get reflected only when controller watcher
> > fires for the respective zookeeper path. Is your watcher still alive?
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Tue, Mar 17, 2015 at 1:29 PM, Allen Wang 
> > wrote:
> >
> > > Looking a bit more into controller log, it seems that when the
> partition
> > > assignment is changed in ZooKeeper, the controller has quite a lot
> > > exceptions communicating with new brokers where the partitions are
> > > assigned. One thing to note is that the new brokers have Kafka version
> > > 0.8.2.1 and the controller has Kafka version 0.8.1.1.
> > >
> > > 2015-03-16 22:36:58,178 WARN  kafka.utils.Logging$class:89
> > > [Controller-2-to-broker-48-send-thread] [warn]
> > > [Controller-2-to-broker-48-send-thread], Controller 2 fails to send a
> > > request to broker id:48,host:xyz:7101
> > > java.io.EOFException: Received -1 when reading from channel, socket has
> > > likely been closed.
> > > at kafka.utils.Utils$.read(Utils.scala:376)
> > > at
> > >
> > >
> >
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > > at
> > > kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > > at
> > >
> > >
> >
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > > at
> > kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> > > at
> > >
> > >
> >
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
> > > at
> > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> > >
> > > Does it explain why the brokers are not aware of the new assignments?
> Is
> > > there anyway to recover from this communication problem, like
> restarting
> > > the controller?
> > >
> > > Thanks,
> > > Allen
> > >
> > >
> > > On Tue, Mar 17, 2015 at 10:34 AM, Allen Wang 
> wrote:
> > >
> > > > Hello,
> > > >
> > > > I developed a tool to add partitions and assign new partitions to a
> set
> > > of
> > > > brokers in one operation by utilizing the API
> > > > AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK().
> > > >
> > > > It worked well in most cases. However, in one case, I found that the
> > > > brokers are not aware of new partitions assigned to them, even though
> > the
> > > > zookeeper data clearly shows the assignment.
> > > >
> > > > Here is the zookeeper data for the partition:
> > > >
> > > >
> > > >
> > >
> >
> {"controller_epoch":6,"leader":62,"version":1,"leader_epoch":0,"isr":[62,74]}
> > > >
> > > > On broker 62, the error message is:
> > > >
> > > > 2015-03-17 17:11:57,157 WARN  kafka.utils.Logging$class:83
> > > > [kafka-request-handler-7] [warn] [KafkaApi-62] Produce request with
> > > > correlation id 2048464 from client x on partition [m,71] failed due
> to
> > > > Partition [m,71] doesn't exist on 62
> > > >
> > > > Here is the core function of the tool:
> > > >
> > > >   def addPartitionsToTopic(zkClient: ZkClient, topic: String,
> > > > brokersToAssignPartitions: Seq[Int], brokers: Seq[Int], execute:
> > > Boolean):
> > > > Unit = {
> > > > val existingPartitionsReplicaList =
> > > > ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
> > > > val config = AdminUtils.fetchTopicConfig(zkClient, topic)
> > > > printf("Topic config: %s\n\n", config)
> > > > if (existingPartitionsReplicaList.size == 0)
> > > >   throw new AdminOperationException("The topic %s does not
> > > > exist".format(topic))
> > > > val currentPartitions = existingPartitionsReplicaList.size
> > > > val replicationFactor = existingPartitionsReplicaList.map(e =>
> > > > e._2.size).max
> > > > val brokersWithPartitions =
> existingPartitionsReplicaList.flatMap(e
> > > =>
> > > > e._2).toSet.toSeq
> > > > if
> > (brokersToAssignPartitions.intersect(brokersWithPartitions).size >
> > > > 0) {
> > > >   printf("Topic %s already has partitions on brokers %s.
> > > Skipping.\n",
> > > > topic, brokersToAssignPartitions)
> > > >   return
> > > > }
> > > > val totalBrokers = brokers.size
> > > > val oldBrokers = totalBrokers - brokersToAssignPartitions.size
> > > > if (oldBrokers == 0) {
> > > >   throw new IllegalArgumentException("Cannot add partitions to
> new
> > > > brokers without existing partitions")
> > > > }
> > > > val expectedPartitions = currentPartit

[jira] [Commented] (KAFKA-1461) Replica fetcher thread does not implement any back-off behavior

2015-03-17 Thread Sriharsha Chintalapani (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14366262#comment-14366262
 ] 

Sriharsha Chintalapani commented on KAFKA-1461:
---

Updated reviewboard https://reviews.apache.org/r/31366/diff/
 against branch origin/trunk

> Replica fetcher thread does not implement any back-off behavior
> ---
>
> Key: KAFKA-1461
> URL: https://issues.apache.org/jira/browse/KAFKA-1461
> Project: Kafka
>  Issue Type: Improvement
>  Components: replication
>Affects Versions: 0.8.1.1
>Reporter: Sam Meder
>Assignee: Sriharsha Chintalapani
>  Labels: newbie++
> Fix For: 0.8.3
>
> Attachments: KAFKA-1461.patch, KAFKA-1461.patch, 
> KAFKA-1461_2015-03-11_10:41:26.patch, KAFKA-1461_2015-03-11_18:17:51.patch, 
> KAFKA-1461_2015-03-12_13:54:51.patch, KAFKA-1461_2015-03-17_16:03:33.patch
>
>
> The current replica fetcher thread will retry in a tight loop if any error 
> occurs during the fetch call. For example, we've seen cases where the fetch 
> continuously throws a connection refused exception leading to several replica 
> fetcher threads that spin in a pretty tight loop.
> To a much lesser degree this is also an issue in the consumer fetcher thread, 
> although the fact that erroring partitions are removed so a leader can be 
> re-discovered helps some.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1461) Replica fetcher thread does not implement any back-off behavior

2015-03-17 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani updated KAFKA-1461:
--
Attachment: KAFKA-1461_2015-03-17_16:03:33.patch

> Replica fetcher thread does not implement any back-off behavior
> ---
>
> Key: KAFKA-1461
> URL: https://issues.apache.org/jira/browse/KAFKA-1461
> Project: Kafka
>  Issue Type: Improvement
>  Components: replication
>Affects Versions: 0.8.1.1
>Reporter: Sam Meder
>Assignee: Sriharsha Chintalapani
>  Labels: newbie++
> Fix For: 0.8.3
>
> Attachments: KAFKA-1461.patch, KAFKA-1461.patch, 
> KAFKA-1461_2015-03-11_10:41:26.patch, KAFKA-1461_2015-03-11_18:17:51.patch, 
> KAFKA-1461_2015-03-12_13:54:51.patch, KAFKA-1461_2015-03-17_16:03:33.patch
>
>
> The current replica fetcher thread will retry in a tight loop if any error 
> occurs during the fetch call. For example, we've seen cases where the fetch 
> continuously throws a connection refused exception leading to several replica 
> fetcher threads that spin in a pretty tight loop.
> To a much lesser degree this is also an issue in the consumer fetcher thread, 
> although the fact that erroring partitions are removed so a leader can be 
> re-discovered helps some.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Broker is not aware of new partitions assigned

2015-03-17 Thread Allen Wang
Yes, the watcher is still alive. The log in the controller indicates that
it observed the changes.


On Tue, Mar 17, 2015 at 2:05 PM, Mayuresh Gharat  wrote:

> I think the way reassignment works is asynchronous. Changes are made to
> zookeeper but those changes get reflected only when controller watcher
> fires for the respective zookeeper path. Is your watcher still alive?
>
> Thanks,
>
> Mayuresh
>
> On Tue, Mar 17, 2015 at 1:29 PM, Allen Wang 
> wrote:
>
> > Looking a bit more into controller log, it seems that when the partition
> > assignment is changed in ZooKeeper, the controller has quite a lot
> > exceptions communicating with new brokers where the partitions are
> > assigned. One thing to note is that the new brokers have Kafka version
> > 0.8.2.1 and the controller has Kafka version 0.8.1.1.
> >
> > 2015-03-16 22:36:58,178 WARN  kafka.utils.Logging$class:89
> > [Controller-2-to-broker-48-send-thread] [warn]
> > [Controller-2-to-broker-48-send-thread], Controller 2 fails to send a
> > request to broker id:48,host:xyz:7101
> > java.io.EOFException: Received -1 when reading from channel, socket has
> > likely been closed.
> > at kafka.utils.Utils$.read(Utils.scala:376)
> > at
> >
> >
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > at
> > kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> > at
> >
> >
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> > at
> kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> > at
> >
> >
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
> > at
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
> >
> > Does it explain why the brokers are not aware of the new assignments? Is
> > there anyway to recover from this communication problem, like restarting
> > the controller?
> >
> > Thanks,
> > Allen
> >
> >
> > On Tue, Mar 17, 2015 at 10:34 AM, Allen Wang  wrote:
> >
> > > Hello,
> > >
> > > I developed a tool to add partitions and assign new partitions to a set
> > of
> > > brokers in one operation by utilizing the API
> > > AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK().
> > >
> > > It worked well in most cases. However, in one case, I found that the
> > > brokers are not aware of new partitions assigned to them, even though
> the
> > > zookeeper data clearly shows the assignment.
> > >
> > > Here is the zookeeper data for the partition:
> > >
> > >
> > >
> >
> {"controller_epoch":6,"leader":62,"version":1,"leader_epoch":0,"isr":[62,74]}
> > >
> > > On broker 62, the error message is:
> > >
> > > 2015-03-17 17:11:57,157 WARN  kafka.utils.Logging$class:83
> > > [kafka-request-handler-7] [warn] [KafkaApi-62] Produce request with
> > > correlation id 2048464 from client x on partition [m,71] failed due to
> > > Partition [m,71] doesn't exist on 62
> > >
> > > Here is the core function of the tool:
> > >
> > >   def addPartitionsToTopic(zkClient: ZkClient, topic: String,
> > > brokersToAssignPartitions: Seq[Int], brokers: Seq[Int], execute:
> > Boolean):
> > > Unit = {
> > > val existingPartitionsReplicaList =
> > > ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
> > > val config = AdminUtils.fetchTopicConfig(zkClient, topic)
> > > printf("Topic config: %s\n\n", config)
> > > if (existingPartitionsReplicaList.size == 0)
> > >   throw new AdminOperationException("The topic %s does not
> > > exist".format(topic))
> > > val currentPartitions = existingPartitionsReplicaList.size
> > > val replicationFactor = existingPartitionsReplicaList.map(e =>
> > > e._2.size).max
> > > val brokersWithPartitions = existingPartitionsReplicaList.flatMap(e
> > =>
> > > e._2).toSet.toSeq
> > > if
> (brokersToAssignPartitions.intersect(brokersWithPartitions).size >
> > > 0) {
> > >   printf("Topic %s already has partitions on brokers %s.
> > Skipping.\n",
> > > topic, brokersToAssignPartitions)
> > >   return
> > > }
> > > val totalBrokers = brokers.size
> > > val oldBrokers = totalBrokers - brokersToAssignPartitions.size
> > > if (oldBrokers == 0) {
> > >   throw new IllegalArgumentException("Cannot add partitions to new
> > > brokers without existing partitions")
> > > }
> > > val expectedPartitions = currentPartitions * totalBrokers /
> > oldBrokers
> > > val newPartitions = expectedPartitions - currentPartitions
> > > if (newPartitions <= 0) {
> > >   throw new IllegalArgumentException("Invalid number of new
> > partitions
> > > %d".format(newPartitions))
> > > }
> > > val newPartitionReplicaList =
> > > AdminUtils.assignReplicasToBrokers(brokersToAssignPartitions,
> > > newPartitions, replicationFactor, startPartitionId = currentPartitions)
> > > val partitionReplicaList = existingPartitionsReplicaList.map(p =>
> > > p._1.partition -> p._2)
> > > // add 

Re: Review Request 31366: Patch for KAFKA-1461

2015-03-17 Thread Sriharsha Chintalapani

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31366/
---

(Updated March 17, 2015, 11:03 p.m.)


Review request for kafka.


Bugs: KAFKA-1461
https://issues.apache.org/jira/browse/KAFKA-1461


Repository: kafka


Description
---

KAFKA-1461. Replica fetcher thread does not implement any back-off behavior.


Diffs (updated)
-

  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
e731df4b2a3e44aa3d761713a09b1070aff81430 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 
96faa7b4ed7c9ba8a3f6f9f114bd94e19b3a7ac0 

Diff: https://reviews.apache.org/r/31366/diff/


Testing
---


Thanks,

Sriharsha Chintalapani



[jira] [Commented] (KAFKA-2028) Unable to start the ZK instance after myid file was missing and had to recreate it.

2015-03-17 Thread Joe Stein (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14366250#comment-14366250
 ] 

Joe Stein commented on KAFKA-2028:
--

can you post your zookeeper.properties

> Unable to start the ZK instance after myid file was missing and had to 
> recreate it.
> ---
>
> Key: KAFKA-2028
> URL: https://issues.apache.org/jira/browse/KAFKA-2028
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.8.1.1
> Environment: Non Prod
>Reporter: InduR
>
> Created a Dev 3-node cluster environment in Jan and the environment had been 
> up and running without any issues until a few days ago.
> The Kafka server stopped running but the ZK listener was up. Noticed that the 
> myid file was missing on all 3 servers.
> Recreating the file while ZK was still running did not help.
> Stopped all of the ZK / Kafka server instances and see the following error 
> when starting ZK.
> kafka_2.10-0.8.1.1
> OS : RHEL
> [root@lablx0025 bin]# ./zookeeper-server-start.sh 
> ../config/zookeeper.properties &
> [1] 31053
> [* bin]# [2015-03-17 15:04:33,876] INFO Reading configuration from: 
> ../config/zookeeper.properties 
> (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
> [2015-03-17 15:04:33,885] INFO Defaulting to majority quorums 
> (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
> [2015-03-17 15:04:33,911] DEBUG preRegister called. 
> Server=com.sun.jmx.mbeanserver.JmxMBeanServer@4891d863, 
> name=log4j:logger=kafka (kafka)
> [2015-03-17 15:04:33,915] INFO Starting quorum peer 
> (org.apache.zookeeper.server.quorum.QuorumPeerMain)
> [2015-03-17 15:04:33,940] INFO binding to port 0.0.0.0/0.0.0.0:2181 
> (org.apache.zookeeper.server.NIOServerCnxn)
> [2015-03-17 15:04:33,966] INFO tickTime set to 3000 
> (org.apache.zookeeper.server.quorum.QuorumPeer)
> [2015-03-17 15:04:33,966] INFO minSessionTimeout set to -1 
> (org.apache.zookeeper.server.quorum.QuorumPeer)
> [2015-03-17 15:04:33,966] INFO maxSessionTimeout set to -1 
> (org.apache.zookeeper.server.quorum.QuorumPeer)
> [2015-03-17 15:04:33,966] INFO initLimit set to 5 
> (org.apache.zookeeper.server.quorum.QuorumPeer)
> [2015-03-17 15:04:34,023] ERROR Failed to increment parent cversion for: 
> /consumers/console-consumer-6249/offsets/test 
> (org.apache.zookeeper.server.persistence.FileTxnSnapLog)
> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = 
> NoNode for /consumers/console-consumer-6249/offsets/test
> at 
> org.apache.zookeeper.server.DataTree.incrementCversion(DataTree.java:1218)
> at 
> org.apache.zookeeper.server.persistence.FileTxnSnapLog.processTransaction(FileTxnSnapLog.java:222)
> at 
> org.apache.zookeeper.server.persistence.FileTxnSnapLog.restore(FileTxnSnapLog.java:150)
> at 
> org.apache.zookeeper.server.ZKDatabase.loadDataBase(ZKDatabase.java:222)
> at 
> org.apache.zookeeper.server.quorum.QuorumPeer.start(QuorumPeer.java:398)
> at 
> org.apache.zookeeper.server.quorum.QuorumPeerMain.runFromConfig(QuorumPeerMain.java:143)
> at 
> org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:103)
> at 
> org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:76)
> [2015-03-17 15:04:34,027] FATAL Unable to load database on disk 
> (org.apache.zookeeper.server.quorum.QuorumPeer)
> java.io.IOException: Failed to process transaction type: 2 error: 
> KeeperErrorCode = NoNode for /consumers/console-consumer-6249/offsets/test
> at 
> org.apache.zookeeper.server.persistence.FileTxnSnapLog.restore(FileTxnSnapLog.java:152)
> at 
> org.apache.zookeeper.server.ZKDatabase.loadDataBase(ZKDatabase.java:222)
> at 
> org.apache.zookeeper.server.quorum.QuorumPeer.start(QuorumPeer.java:398)
> at 
> org.apache.zookeeper.server.quorum.QuorumPeerMain.runFromConfig(QuorumPeerMain.java:143)
> at 
> org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:103)
> at 
> org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:76)
> [2015-03-17 15:04:34,027] FATAL Unexpected exception, exiting abnormally 
> (org.apache.zookeeper.server.quorum.QuorumPeerMain)
> java.lang.RuntimeException: Unable to run quorum server
> at 
> org.apache.zookeeper.s

Re: Review Request 30809: Patch for KAFKA-1888

2015-03-17 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30809/#review76835
---


Also, I think we can move ValidatingProducer/Consumer and BootstrapConsumer to 
kafka.perf if we could do the refactoring based on Producer/ConsumerPerformance.

- Guozhang Wang


On March 9, 2015, 11:55 p.m., Abhishek Nigam wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30809/
> ---
> 
> (Updated March 9, 2015, 11:55 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1888
> https://issues.apache.org/jira/browse/KAFKA-1888
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> Fixing the tests based on Mayuresh comments, code cleanup after proper IDE 
> setup
> 
> 
> Diffs
> -
> 
>   build.gradle 0f0fe60a74542efa91a0e727146e896edcaa38af 
>   core/src/main/scala/kafka/tools/ContinuousValidationTest.java PRE-CREATION 
>   system_test/broker_upgrade/bin/kafka-run-class.sh PRE-CREATION 
>   system_test/broker_upgrade/bin/test.sh PRE-CREATION 
>   system_test/broker_upgrade/configs/server1.properties PRE-CREATION 
>   system_test/broker_upgrade/configs/server2.properties PRE-CREATION 
>   system_test/broker_upgrade/configs/zookeeper_source.properties PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/30809/diff/
> 
> 
> Testing
> ---
> 
> Scripted it to run 20 times without any failures.
> Command-line: broker-upgrade/bin/test.sh  
> 
> 
> Thanks,
> 
> Abhishek Nigam
> 
>



Re: Review Request 30809: Patch for KAFKA-1888

2015-03-17 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30809/#review76103
---


There are some coding convention suggestions here that may not be comprehensive. You can 
take a look at these wiki pages:

http://kafka.apache.org/coding-guide.html

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Error+Handling+and+Logging

Also I think the ValidatingProducer / ValidatingConsumer can be implemented via 
extending ShutdownableThread and Producer/ConsumerPerformance, which will save 
lots of code here.


build.gradle


I think the general idea is not to introduce external dependencies into core 
/ clients, but only into test dependencies (i.e. testCompile) like 
yammer / jopt.



core/src/main/scala/kafka/tools/ContinuousValidationTest.java


Could you group the imports with

org.apache.kafka..

kafka..

java..

scala..

other external imports..



core/src/main/scala/kafka/tools/ContinuousValidationTest.java


The comments here are a bit too high-level; it would be better to add some 
details about:

1) the consumer scenarios: real-time consumer and bootstrap consumer.
2)



core/src/main/scala/kafka/tools/ContinuousValidationTest.java


Explicitly specify these variables as public or private. For example, the 
logger should be:

private static final Logger = ..



core/src/main/scala/kafka/tools/ContinuousValidationTest.java


Rename to

"ContinuousValidationTestRealTimeConsumer" and

"ContinuousValidationTestBootstrapConsumer"?

Also there are some inconsistencies in the comments / code below between 
"consumer" and "real-time consumer", but they are actually referring to the same 
thing. Could you make sure the terms are consistent, e.g. "real-time consumer".



core/src/main/scala/kafka/tools/ContinuousValidationTest.java


Move the static variables to the top of the class.



core/src/main/scala/kafka/tools/ContinuousValidationTest.java


Add a space after "//", ditto below.



core/src/main/scala/kafka/tools/ContinuousValidationTest.java


No need to add the "// need flip" comment, as generally when the buffer 
switches from write mode to read mode one needs to flip.



core/src/main/scala/kafka/tools/ContinuousValidationTest.java


Could we move all the inner classes to the bottom of the 
ContinuousValidationTest class?



core/src/main/scala/kafka/tools/ContinuousValidationTest.java


Could we use

/**
 *
 */

for class-level comments?



core/src/main/scala/kafka/tools/ContinuousValidationTest.java


Can we just extend ProducerPerformance to extend this class?



core/src/main/scala/kafka/tools/ContinuousValidationTest.java


Unclear comments, and also below.



core/src/main/scala/kafka/tools/ContinuousValidationTest.java


Likewise, can we extend ConsumerPerformance to implement this class?



core/src/main/scala/kafka/tools/ContinuousValidationTest.java


1. Move these declarations to the top of the class. 
2. Rename consumer to realtimeConsumer.
3. I think we do not need to maintain a separate thread for each one of 
producer / realtimeConsumer / bootstrapConsumer. You could take a look at 
kafka.utils.ShutdownableThread, and implement, for example, ValidatingConsumer 
as

class ValidatingConsumer extends ConsumerPerformance, ShutdownableThread..
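
To illustrate the suggestion in 3), a minimal hypothetical sketch (the 
ShutdownableThread constructor arguments and the consume/validate helpers are 
assumptions, not the actual patch):

// Sketch only -- everything except ShutdownableThread/doWork is an assumption.
class ValidatingConsumer(name: String) extends kafka.utils.ShutdownableThread(name) {
  // ShutdownableThread keeps calling doWork() in a loop until shutdown() is
  // invoked, so no hand-rolled thread/loop management is needed here.
  override def doWork(): Unit = {
    // val messages = consumeNextBatch()   // assumed helper
    // validate(messages)                  // assumed helper
  }
}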



system_test/broker_upgrade/bin/kafka-run-class.sh


Can we just use the kafka-run-class.sh in bin? For example, in 
system_test/producer_perf/bin/run-compression-test.sh, you can see

"$base_dir/../../bin/kafka-run-class.sh"


- Guozhang Wang


On March 9, 2015, 11:55 p.m., Abhishek Nigam wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30809/
> ---
> 
> (Updated March 9, 2015, 11:55 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1888
> https://issues.apache.org/jira/brows

[jira] [Updated] (KAFKA-2016) RollingBounceTest takes long

2015-03-17 Thread Ted Malaska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Malaska updated KAFKA-2016:
---
 Reviewer: Jun Rao
   Labels: newbie  (was: )
Affects Version/s: 0.8.2.1
   Status: Patch Available  (was: Open)

> RollingBounceTest takes long
> 
>
> Key: KAFKA-2016
> URL: https://issues.apache.org/jira/browse/KAFKA-2016
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.2.1
>Reporter: Jun Rao
>  Labels: newbie
> Fix For: 0.8.3
>
> Attachments: KAFKA-2016-1.patch
>
>
> RollingBounceTest.testRollingBounce() currently takes about 48 secs. This is 
> a bit too long.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2016) RollingBounceTest takes long

2015-03-17 Thread Ted Malaska (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Malaska updated KAFKA-2016:
---
Attachment: KAFKA-2016-1.patch

Modified the unit test to wait 100 milliseconds instead of 5000 between retries.

> RollingBounceTest takes long
> 
>
> Key: KAFKA-2016
> URL: https://issues.apache.org/jira/browse/KAFKA-2016
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
> Fix For: 0.8.3
>
> Attachments: KAFKA-2016-1.patch
>
>
> RollingBounceTest.testRollingBounce() currently takes about 48 secs. This is 
> a bit too long.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2016) RollingBounceTest takes long

2015-03-17 Thread Ted Malaska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14366174#comment-14366174
 ] 

Ted Malaska commented on KAFKA-2016:


Nope my bad that was my mistake.  They all get set to 100.  Hmm.  I will keep 
looking.

> RollingBounceTest takes long
> 
>
> Key: KAFKA-2016
> URL: https://issues.apache.org/jira/browse/KAFKA-2016
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
> Fix For: 0.8.3
>
>
> RollingBounceTest.testRollingBounce() currently takes about 48 secs. This is 
> a bit too long.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: KafkaPreCommit #39

2015-03-17 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] KAFKA-1997; Follow-up to add the shutdown hook before starting the 
consumers; reviewed by Guozhang Wang

--
[...truncated 2041 lines...]
kafka.utils.SchedulerTest > testPeriodicTask PASSED

kafka.utils.SchedulerTest > testRestart PASSED

kafka.utils.JsonTest > testJsonEncoding PASSED

kafka.utils.ReplicationUtilsTest > testUpdateLeaderAndIsr PASSED

kafka.utils.ReplicationUtilsTest > testGetLeaderIsrAndEpochForPartition PASSED

kafka.metrics.KafkaTimerTest > testKafkaTimer PASSED

kafka.log.FileMessageSetTest > testWrittenEqualsRead PASSED

kafka.log.FileMessageSetTest > testIteratorIsConsistent PASSED

kafka.log.FileMessageSetTest > testSizeInBytes PASSED

kafka.log.FileMessageSetTest > testWriteTo PASSED

kafka.log.FileMessageSetTest > testFileSize PASSED

kafka.log.FileMessageSetTest > testIterationOverPartialAndTruncation PASSED

kafka.log.FileMessageSetTest > testIterationDoesntChangePosition PASSED

kafka.log.FileMessageSetTest > testRead PASSED

kafka.log.FileMessageSetTest > testSearch PASSED

kafka.log.FileMessageSetTest > testIteratorWithLimits PASSED

kafka.log.FileMessageSetTest > testTruncate PASSED

kafka.log.LogConfigTest > testFromPropsDefaults PASSED

kafka.log.LogConfigTest > testFromPropsEmpty PASSED

kafka.log.LogConfigTest > testFromPropsToProps PASSED

kafka.log.LogConfigTest > testFromPropsInvalid PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[0] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[1] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[2] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[3] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[4] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[5] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[6] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[7] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[8] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[9] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[10] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[11] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[12] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[13] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[14] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[15] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[16] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[17] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[18] PASSED

kafka.log.BrokerCompressionTest > testBrokerSideCompression[19] PASSED

kafka.log.LogCleanerIntegrationTest > cleanerTest PASSED

kafka.log.LogManagerTest > testCreateLog PASSED

kafka.log.LogManagerTest > testGetNonExistentLog PASSED

kafka.log.LogManagerTest > testCleanupExpiredSegments PASSED

kafka.log.LogManagerTest > testCleanupSegmentsToMaintainSize PASSED

kafka.log.LogManagerTest > testTimeBasedFlush PASSED

kafka.log.LogManagerTest > testLeastLoadedAssignment PASSED

kafka.log.LogManagerTest > testTwoLogManagersUsingSameDirFails PASSED

kafka.log.LogManagerTest > testCheckpointRecoveryPoints PASSED

kafka.log.LogManagerTest > testRecoveryDirectoryMappingWithTrailingSlash PASSED

kafka.log.LogManagerTest > testRecoveryDirectoryMappingWithRelativeDirectory 
PASSED

kafka.log.LogTest > testTimeBasedLogRoll PASSED

kafka.log.LogTest > testTimeBasedLogRollJitter PASSED

kafka.log.LogTest > testSizeBasedLogRoll PASSED

kafka.log.LogTest > testLoadEmptyLog PASSED

kafka.log.LogTest > testAppendAndReadWithSequentialOffsets PASSED

kafka.log.LogTest > testAppendAndReadWithNonSequentialOffsets PASSED

kafka.log.LogTest > testReadAtLogGap PASSED

kafka.log.LogTest > testReadOutOfRange PASSED

kafka.log.LogTest > testLogRolls PASSED

kafka.log.LogTest > testCompressedMessages PASSED

kafka.log.LogTest > testThatGarbageCollectingSegmentsDoesntChangeOffset PASSED

kafka.log.LogTest > testMessageSetSizeCheck PASSED

kafka.log.LogTest > testCompactedTopicConstraints PASSED

kafka.log.LogTest > testMessageSizeCheck PASSED

kafka.log.LogTest > testLogRecoversToCorrectOffset PASSED

kafka.log.LogTest > testIndexRebuild PASSED

kafka.log.LogTest > testTruncateTo PASSED

kafka.log.LogTest > testIndexResizingAtTruncation PASSED

kafka.log.LogTest > testBogusIndexSegmentsAreRemoved PASSED

kafka.log.LogTest > testReopenThenTruncate PASSED

kafka.log.LogTest > testAsyncDelete PASSED

kafka.log.LogTest > testOpenDeletesObsoleteFiles PASSED

kafka.log.LogTest > testAppendMessageWithNullPayload PASSED

kafka.log.LogTest > testCorruptLog PASSED

kafka.log.LogTest > testCleanShutdownFile PASSED

kafka.log.LogTest > testParseTopicPartitionName PASS

Re: Review Request 31967: Patch for KAFKA-1546

2015-03-17 Thread Aditya Auradkar

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/
---

(Updated March 17, 2015, 9:46 p.m.)


Review request for kafka.


Bugs: KAFKA-1546
https://issues.apache.org/jira/browse/KAFKA-1546


Repository: kafka


Description (updated)
---

PATCH for KAFKA-1546


PATCH for KAFKA-1546

Brief summary of changes:
- Added a lagBegin metric inside Replica to track the lag in terms of time 
since the replica last read up to the LEO
- Using the lag begin value in the check for ISR expand and shrink (see the 
sketch after this list)
- Removed the max lag messages config since it is no longer necessary
- Returning the initialLogEndOffset in LogReadResult corresponding to the 
LEO before actually reading from the log.
- Unit test cases to test ISR shrinkage and expansion
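
A rough, purely illustrative sketch of the time-based check described above 
(the field name lagBeginTimeMs and config name replica.lag.time.max.ms are 
assumptions for the example, not necessarily what the patch uses):

// Sketch only -- not the actual KAFKA-1546 change.
// A follower is considered out of sync if it has not read up to the leader's
// LEO for longer than the configured maximum lag time.
def isOutOfSync(lagBeginTimeMs: Long, nowMs: Long, maxLagTimeMs: Long): Boolean =
  lagBeginTimeMs > 0 && (nowMs - lagBeginTimeMs) > maxLagTimeMs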

Updated KAFKA-1546 patch to reflect Neha and Jun's comments


Addressing Joel's comments


Diffs (updated)
-

  core/src/main/scala/kafka/cluster/Partition.scala 
c4bf48a801007ebe7497077d2018d6dffe1677d4 
  core/src/main/scala/kafka/cluster/Replica.scala 
bd13c20338ce3d73113224440e858a12814e5adb 
  core/src/main/scala/kafka/log/Log.scala 
06b8ecc5d11a1acfbaf3c693c42bf3ce5b2cd86d 
  core/src/main/scala/kafka/server/FetchDataInfo.scala 
26f278f9b75b1c9c83a720ca9ebd8ab812d19d39 
  core/src/main/scala/kafka/server/KafkaConfig.scala 
46d21c73f1feb3410751899380b35da0c37c975c 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
92152358c95fa9178d71bd1c079af0a0bd8f1da8 
  core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
191251d1340b5e5b2d649b37af3c6c1896d07e6e 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 
92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 

Diff: https://reviews.apache.org/r/31967/diff/


Testing
---


Thanks,

Aditya Auradkar



[jira] [Commented] (KAFKA-1546) Automate replica lag tuning

2015-03-17 Thread Aditya A Auradkar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14366152#comment-14366152
 ] 

Aditya A Auradkar commented on KAFKA-1546:
--

Updated reviewboard https://reviews.apache.org/r/31967/diff/
 against branch origin/trunk

> Automate replica lag tuning
> ---
>
> Key: KAFKA-1546
> URL: https://issues.apache.org/jira/browse/KAFKA-1546
> Project: Kafka
>  Issue Type: Improvement
>  Components: replication
>Affects Versions: 0.8.0, 0.8.1, 0.8.1.1
>Reporter: Neha Narkhede
>Assignee: Aditya Auradkar
>  Labels: newbie++
> Fix For: 0.8.3
>
> Attachments: KAFKA-1546.patch, KAFKA-1546_2015-03-11_18:48:09.patch, 
> KAFKA-1546_2015-03-12_13:42:01.patch, KAFKA-1546_2015-03-16_11:31:39.patch, 
> KAFKA-1546_2015-03-17_14:46:10.patch
>
>
> Currently, there is no good way to tune the replica lag configs to 
> automatically account for high and low volume topics on the same cluster. 
> For the low-volume topic it will take a very long time to detect a lagging
> replica, and for the high-volume topic it will have false-positives.
> One approach to making this easier would be to have the configuration
> be something like replica.lag.max.ms and translate this into a number
> of messages dynamically based on the throughput of the partition.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1546) Automate replica lag tuning

2015-03-17 Thread Aditya A Auradkar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aditya A Auradkar updated KAFKA-1546:
-
Attachment: KAFKA-1546_2015-03-17_14:46:10.patch

> Automate replica lag tuning
> ---
>
> Key: KAFKA-1546
> URL: https://issues.apache.org/jira/browse/KAFKA-1546
> Project: Kafka
>  Issue Type: Improvement
>  Components: replication
>Affects Versions: 0.8.0, 0.8.1, 0.8.1.1
>Reporter: Neha Narkhede
>Assignee: Aditya Auradkar
>  Labels: newbie++
> Fix For: 0.8.3
>
> Attachments: KAFKA-1546.patch, KAFKA-1546_2015-03-11_18:48:09.patch, 
> KAFKA-1546_2015-03-12_13:42:01.patch, KAFKA-1546_2015-03-16_11:31:39.patch, 
> KAFKA-1546_2015-03-17_14:46:10.patch
>
>
> Currently, there is no good way to tune the replica lag configs to 
> automatically account for high and low volume topics on the same cluster. 
> For the low-volume topic it will take a very long time to detect a lagging
> replica, and for the high-volume topic it will have false-positives.
> One approach to making this easier would be to have the configuration
> be something like replica.lag.max.ms and translate this into a number
> of messages dynamically based on the throughput of the partition.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1997) Refactor Mirror Maker

2015-03-17 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14366141#comment-14366141
 ] 

Guozhang Wang commented on KAFKA-1997:
--

Committed the follow-up patch to trunk.

> Refactor Mirror Maker
> -
>
> Key: KAFKA-1997
> URL: https://issues.apache.org/jira/browse/KAFKA-1997
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-1997.patch, KAFKA-1997_2015-03-03_16:28:46.patch, 
> KAFKA-1997_2015-03-04_15:07:46.patch, KAFKA-1997_2015-03-04_15:42:45.patch, 
> KAFKA-1997_2015-03-05_20:14:58.patch, KAFKA-1997_2015-03-09_18:55:54.patch, 
> KAFKA-1997_2015-03-10_18:31:34.patch, KAFKA-1997_2015-03-11_15:20:18.patch, 
> KAFKA-1997_2015-03-11_19:10:53.patch, KAFKA-1997_2015-03-13_14:43:34.patch, 
> KAFKA-1997_2015-03-17_13:47:01.patch
>
>
> Refactor mirror maker based on KIP-3



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2016) RollingBounceTest takes long

2015-03-17 Thread Ted Malaska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14366134#comment-14366134
 ] 

Ted Malaska commented on KAFKA-2016:


OK I got it.

So controlled.shutdown.retry.backoff.ms is only set for the fourth server, and 
it's set to 100 milliseconds.

The other 3 servers are not set and are forgotten.  These sad servers are left 
with a controlled.shutdown.retry.backoff.ms of 5000 milliseconds.

The backoff is requested and hence the code waits.  The solution is to tell all 
the servers to use a controlled.shutdown.retry.backoff.ms of 100.

I will put the patch together tomorrow on the train.
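
For reference, a minimal sketch of the kind of change described above, assuming 
the test builds its broker Properties in a collection named brokerConfigs (the 
actual patch may differ):

// Sketch only: apply the short retry backoff to every broker, not just the fourth.
brokerConfigs.foreach { props =>
  props.setProperty("controlled.shutdown.retry.backoff.ms", "100")
}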

> RollingBounceTest takes long
> 
>
> Key: KAFKA-2016
> URL: https://issues.apache.org/jira/browse/KAFKA-2016
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
> Fix For: 0.8.3
>
>
> RollingBounceTest.testRollingBounce() currently takes about 48 secs. This is 
> a bit too long.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31706: Patch for KAFKA-1997

2015-03-17 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31706/#review76815
---

Ship it!


Ship It!

- Guozhang Wang


On March 17, 2015, 8:47 p.m., Jiangjie Qin wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31706/
> ---
> 
> (Updated March 17, 2015, 8:47 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1997
> https://issues.apache.org/jira/browse/KAFKA-1997
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> move shutdown hook registration before creating producer and consumer 
> streams, so mirror maker can be shut down correctly if an error occurs during 
> consumer stream creation.
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala 
> 87b925c7332470de59a5b098d51c4bf0dfab8b8f 
> 
> Diff: https://reviews.apache.org/r/31706/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>



[jira] [Issue Comment Deleted] (KAFKA-1860) File system errors are not detected unless Kafka tries to write

2015-03-17 Thread Mayuresh Gharat (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat updated KAFKA-1860:
---
Comment: was deleted

(was: We need to reproduce and test the fix on our kafka server mp)

> File system errors are not detected unless Kafka tries to write
> ---
>
> Key: KAFKA-1860
> URL: https://issues.apache.org/jira/browse/KAFKA-1860
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Mayuresh Gharat
> Fix For: 0.9.0
>
> Attachments: KAFKA-1860.patch
>
>
> When the disk (raid with caches dir) dies on a Kafka broker, typically the 
> filesystem gets mounted into read-only mode, and hence when Kafka tries to 
> read the disk, they'll get a FileNotFoundException with the read-only errno 
> set (EROFS).
> However, as long as there is no produce request received, hence no writes 
> attempted on the disks, Kafka will not exit on such FATAL error (when the 
> disk starts working again, Kafka might think some files are gone while they 
> will reappear later as raid comes back online). Instead it keeps spilling 
> exceptions like:
> {code}
> 2015/01/07 09:47:41.543 ERROR [KafkaScheduler] [kafka-scheduler-1] 
> [kafka-server] [] Uncaught exception in scheduled task 
> 'kafka-recovery-point-checkpoint'
> java.io.FileNotFoundException: 
> /export/content/kafka/i001_caches/recovery-point-offset-checkpoint.tmp 
> (Read-only file system)
>   at java.io.FileOutputStream.open(Native Method)
>   at java.io.FileOutputStream.(FileOutputStream.java:206)
>   at java.io.FileOutputStream.(FileOutputStream.java:156)
>   at kafka.server.OffsetCheckpoint.write(OffsetCheckpoint.scala:37)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Broker is not aware of new partitions assigned

2015-03-17 Thread Mayuresh Gharat
I think the way reassignment works is asynchronous. Changes are made to
zookeeper but those changes get reflected only when controller watcher
fires for the respective zookeeper path. Is your watcher still alive?

Thanks,

Mayuresh

On Tue, Mar 17, 2015 at 1:29 PM, Allen Wang 
wrote:

> Looking a bit more into controller log, it seems that when the partition
> assignment is changed in ZooKeeper, the controller has quite a lot
> exceptions communicating with new brokers where the partitions are
> assigned. One thing to note is that the new brokers have Kafka version
> 0.8.2.1 and the controller has Kafka version 0.8.1.1.
>
> 2015-03-16 22:36:58,178 WARN  kafka.utils.Logging$class:89
> [Controller-2-to-broker-48-send-thread] [warn]
> [Controller-2-to-broker-48-send-thread], Controller 2 fails to send a
> request to broker id:48,host:xyz:7101
> java.io.EOFException: Received -1 when reading from channel, socket has
> likely been closed.
> at kafka.utils.Utils$.read(Utils.scala:376)
> at
>
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at
> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at
>
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> at
>
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
>
> Does it explain why the brokers are not aware of the new assignments? Is
> there anyway to recover from this communication problem, like restarting
> the controller?
>
> Thanks,
> Allen
>
>
> On Tue, Mar 17, 2015 at 10:34 AM, Allen Wang  wrote:
>
> > Hello,
> >
> > I developed a tool to add partitions and assign new partitions to a set
> of
> > brokers in one operation by utilizing the API
> > AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK().
> >
> > It worked well in most cases. However, in one case, I found that the
> > brokers are not aware of new partitions assigned to them, even though the
> > zookeeper data clearly shows the assignment.
> >
> > Here is the zookeeper data for the partition:
> >
> >
> >
> {"controller_epoch":6,"leader":62,"version":1,"leader_epoch":0,"isr":[62,74]}
> >
> > On broker 62, the error message is:
> >
> > 2015-03-17 17:11:57,157 WARN  kafka.utils.Logging$class:83
> > [kafka-request-handler-7] [warn] [KafkaApi-62] Produce request with
> > correlation id 2048464 from client x on partition [m,71] failed due to
> > Partition [m,71] doesn't exist on 62
> >
> > Here is the core function of the tool:
> >
> >   def addPartitionsToTopic(zkClient: ZkClient, topic: String,
> > brokersToAssignPartitions: Seq[Int], brokers: Seq[Int], execute:
> Boolean):
> > Unit = {
> > val existingPartitionsReplicaList =
> > ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
> > val config = AdminUtils.fetchTopicConfig(zkClient, topic)
> > printf("Topic config: %s\n\n", config)
> > if (existingPartitionsReplicaList.size == 0)
> >   throw new AdminOperationException("The topic %s does not
> > exist".format(topic))
> > val currentPartitions = existingPartitionsReplicaList.size
> > val replicationFactor = existingPartitionsReplicaList.map(e =>
> > e._2.size).max
> > val brokersWithPartitions = existingPartitionsReplicaList.flatMap(e
> =>
> > e._2).toSet.toSeq
> > if (brokersToAssignPartitions.intersect(brokersWithPartitions).size >
> > 0) {
> >   printf("Topic %s already has partitions on brokers %s.
> Skipping.\n",
> > topic, brokersToAssignPartitions)
> >   return
> > }
> > val totalBrokers = brokers.size
> > val oldBrokers = totalBrokers - brokersToAssignPartitions.size
> > if (oldBrokers == 0) {
> >   throw new IllegalArgumentException("Cannot add partitions to new
> > brokers without existing partitions")
> > }
> > val expectedPartitions = currentPartitions * totalBrokers /
> oldBrokers
> > val newPartitions = expectedPartitions - currentPartitions
> > if (newPartitions <= 0) {
> >   throw new IllegalArgumentException("Invalid number of new
> partitions
> > %d".format(newPartitions))
> > }
> > val newPartitionReplicaList =
> > AdminUtils.assignReplicasToBrokers(brokersToAssignPartitions,
> > newPartitions, replicationFactor, startPartitionId = currentPartitions)
> > val partitionReplicaList = existingPartitionsReplicaList.map(p =>
> > p._1.partition -> p._2)
> > // add the new list
> > partitionReplicaList ++= newPartitionReplicaList
> > printf("Changing number of partitions from %d to %d to topic %s\n\n",
> > currentPartitions, expectedPartitions, topic)
> > printf("Replica reassignment for new partitions:\n\n%s\n\n",
> > getAssignmentJson(topic, newPartitionReplicaList))
> > printf("Complete replica assignment:\n\n%s\n\n",
> > getAssignmentJson(topic,

[jira] [Commented] (KAFKA-1860) File system errors are not detected unless Kafka tries to write

2015-03-17 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14366088#comment-14366088
 ] 

Mayuresh Gharat commented on KAFKA-1860:


We need to reproduce and test the fix on our kafka server mp

> File system errors are not detected unless Kafka tries to write
> ---
>
> Key: KAFKA-1860
> URL: https://issues.apache.org/jira/browse/KAFKA-1860
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Mayuresh Gharat
> Fix For: 0.9.0
>
> Attachments: KAFKA-1860.patch
>
>
> When the disk (raid with caches dir) dies on a Kafka broker, typically the 
> filesystem gets mounted into read-only mode, and hence when Kafka tries to 
> read the disk, they'll get a FileNotFoundException with the read-only errno 
> set (EROFS).
> However, as long as there is no produce request received, hence no writes 
> attempted on the disks, Kafka will not exit on such FATAL error (when the 
> disk starts working again, Kafka might think some files are gone while they 
> will reappear later as raid comes back online). Instead it keeps spilling 
> exceptions like:
> {code}
> 2015/01/07 09:47:41.543 ERROR [KafkaScheduler] [kafka-scheduler-1] 
> [kafka-server] [] Uncaught exception in scheduled task 
> 'kafka-recovery-point-checkpoint'
> java.io.FileNotFoundException: 
> /export/content/kafka/i001_caches/recovery-point-offset-checkpoint.tmp 
> (Read-only file system)
>   at java.io.FileOutputStream.open(Native Method)
>   at java.io.FileOutputStream.(FileOutputStream.java:206)
>   at java.io.FileOutputStream.(FileOutputStream.java:156)
>   at kafka.server.OffsetCheckpoint.write(OffsetCheckpoint.scala:37)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (KAFKA-2023) git clone kafka repository requires https

2015-03-17 Thread Anatoli Fomenko (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anatoli Fomenko closed KAFKA-2023.
--

Verified in the doc repo and on the site.

Thank you.

> git clone kafka repository requires https
> -
>
> Key: KAFKA-2023
> URL: https://issues.apache.org/jira/browse/KAFKA-2023
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Reporter: Anatoli Fomenko
>Assignee: Anatoly Fayngelerin
>Priority: Minor
> Attachments: KAFKA-2023.patch
>
>
> From http://kafka.apache.org/code.html: 
> Our code is kept in git. You can check it out like this:
>   git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
> On CentOS 6.5:
> {code}
> $ git clone http://git-wip-us.apache.org/repos/asf/kafka.git kafka
> Initialized empty Git repository in /home/anatoli/git/kafka/.git/
> error: RPC failed; result=22, HTTP code = 405
> {code}
> while:
> {code}
> $ git clone https://git-wip-us.apache.org/repos/asf/kafka.git kafka
> Initialized empty Git repository in /home/anatoli/git/kafka/.git/
> remote: Counting objects: 24607, done.
> remote: Compressing objects: 100% (9212/9212), done.
> remote: Total 24607 (delta 14449), reused 19801 (delta 11465)
> Receiving objects: 100% (24607/24607), 15.61 MiB | 5.85 MiB/s, done.
> Resolving deltas: 100% (14449/14449), done.
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 32172: Patch for KAFKA-1860

2015-03-17 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32172/#review76809
---


LGTM, let's try to reproduce the issue and see if it works as expected.

- Guozhang Wang


On March 17, 2015, 8:56 p.m., Mayuresh Gharat wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/32172/
> ---
> 
> (Updated March 17, 2015, 8:56 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1860
> https://issues.apache.org/jira/browse/KAFKA-1860
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> The JVM should stop if the underlying file system goes into read-only mode
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/server/OffsetCheckpoint.scala 
> 8c5b0546908d3b3affb9f48e2ece9ed252518783 
> 
> Diff: https://reviews.apache.org/r/32172/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Mayuresh Gharat
> 
>



Re: Review Request 31568: Patch for KAFKA-1989

2015-03-17 Thread Sriharsha Chintalapani


> On March 17, 2015, 8:56 p.m., Guozhang Wang wrote:
> > We can probably remove DelayedItem if it is not referenced by anyone any 
> > more.

I am using DelayedItem for KAFKA-1461.


- Sriharsha


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review76459
---


On Feb. 28, 2015, 12:14 a.m., Yasuhiro Matsuda wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31568/
> ---
> 
> (Updated Feb. 28, 2015, 12:14 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1989
> https://issues.apache.org/jira/browse/KAFKA-1989
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> new purgatory implementation
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/server/DelayedOperation.scala 
> e317676b4dd5bb5ad9770930e694cd7282d5b6d5 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> 586cf4caa95f58d5b2e6c7429732d25d2b3635c8 
>   core/src/main/scala/kafka/utils/SinglyLinkedWeakList.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 
> 7a37617395b9e4226853913b8989f42e7301de7c 
>   core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala 
> PRE-CREATION 
>   core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/31568/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Yasuhiro Matsuda
> 
>



[jira] [Updated] (KAFKA-1860) File system errors are not detected unless Kafka tries to write

2015-03-17 Thread Mayuresh Gharat (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat updated KAFKA-1860:
---
Status: Patch Available  (was: Open)

> File system errors are not detected unless Kafka tries to write
> ---
>
> Key: KAFKA-1860
> URL: https://issues.apache.org/jira/browse/KAFKA-1860
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Mayuresh Gharat
> Fix For: 0.9.0
>
> Attachments: KAFKA-1860.patch
>
>
> When the disk (raid with caches dir) dies on a Kafka broker, typically the 
> filesystem gets mounted into read-only mode, and hence when Kafka tries to 
> read the disk, they'll get a FileNotFoundException with the read-only errno 
> set (EROFS).
> However, as long as there is no produce request received, hence no writes 
> attempted on the disks, Kafka will not exit on such FATAL error (when the 
> disk starts working again, Kafka might think some files are gone while they 
> will reappear later as raid comes back online). Instead it keeps spilling 
> exceptions like:
> {code}
> 2015/01/07 09:47:41.543 ERROR [KafkaScheduler] [kafka-scheduler-1] 
> [kafka-server] [] Uncaught exception in scheduled task 
> 'kafka-recovery-point-checkpoint'
> java.io.FileNotFoundException: 
> /export/content/kafka/i001_caches/recovery-point-offset-checkpoint.tmp 
> (Read-only file system)
>   at java.io.FileOutputStream.open(Native Method)
>   at java.io.FileOutputStream.(FileOutputStream.java:206)
>   at java.io.FileOutputStream.(FileOutputStream.java:156)
>   at kafka.server.OffsetCheckpoint.write(OffsetCheckpoint.scala:37)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1860) File system errors are not detected unless Kafka tries to write

2015-03-17 Thread Mayuresh Gharat (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14366069#comment-14366069
 ] 

Mayuresh Gharat commented on KAFKA-1860:


Created reviewboard https://reviews.apache.org/r/32172/diff/
 against branch origin/trunk

> File system errors are not detected unless Kafka tries to write
> ---
>
> Key: KAFKA-1860
> URL: https://issues.apache.org/jira/browse/KAFKA-1860
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Mayuresh Gharat
> Fix For: 0.9.0
>
> Attachments: KAFKA-1860.patch
>
>
> When the disk (raid with caches dir) dies on a Kafka broker, typically the 
> filesystem gets mounted into read-only mode, and hence when Kafka tries to 
> read the disk, they'll get a FileNotFoundException with the read-only errno 
> set (EROFS).
> However, as long as there is no produce request received, hence no writes 
> attempted on the disks, Kafka will not exit on such FATAL error (when the 
> disk starts working again, Kafka might think some files are gone while they 
> will reappear later as raid comes back online). Instead it keeps spilling 
> exceptions like:
> {code}
> 2015/01/07 09:47:41.543 ERROR [KafkaScheduler] [kafka-scheduler-1] 
> [kafka-server] [] Uncaught exception in scheduled task 
> 'kafka-recovery-point-checkpoint'
> java.io.FileNotFoundException: 
> /export/content/kafka/i001_caches/recovery-point-offset-checkpoint.tmp 
> (Read-only file system)
>   at java.io.FileOutputStream.open(Native Method)
>   at java.io.FileOutputStream.(FileOutputStream.java:206)
>   at java.io.FileOutputStream.(FileOutputStream.java:156)
>   at kafka.server.OffsetCheckpoint.write(OffsetCheckpoint.scala:37)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31568: Patch for KAFKA-1989

2015-03-17 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31568/#review76459
---


We can probably remove DelayedItem if it is not referenced by anyone any more.


core/src/main/scala/kafka/server/DelayedOperation.scala


Add "override" keyword to indicate it is extended from the Runnable. Also, 
the comments on top seems be referring to a variable, not a function, which is 
a bit misleading.



core/src/main/scala/kafka/server/DelayedOperation.scala


We need to make tickMs and wheelSize configurable.



core/src/main/scala/kafka/server/DelayedOperation.scala


TBD



core/src/main/scala/kafka/server/DelayedOperation.scala


It seems we do not need to keep this as a class member variable, but just 
compute the value in purge() on-the-fly every time.



core/src/main/scala/kafka/server/DelayedOperation.scala


Does it require to sync on refQueue as well?



core/src/main/scala/kafka/server/DelayedOperation.scala


It may be useful to return #.purged items?



core/src/main/scala/kafka/server/DelayedOperation.scala


The threshold should be configurable.



core/src/main/scala/kafka/utils/timer/Timer.scala


I think bucket.flush(reinsert) will always fail on all the items since 
their expiration time will always be < bucket expiration + tickMs, i.e. the 
bucket returned from the delayed queue has already expired all its items. In 
this case, could we just call foreach(submit) on all of them instead of trying 
to reinsert them?



core/src/main/scala/kafka/utils/timer/TimerTaskList.scala


It seems the task entry of the task will only be set once throughout its 
lifetime; even when the task entry gets reinserted, its correspondence to the 
task will not change, right?

If that is true we can just set the entry for the task in the constructor 
of the task entry.



core/src/main/scala/kafka/utils/timer/TimingWheel.scala


How about changing this comment to:

We only need to enqueue the bucket when its expiration time has changed, 
i.e. the wheel has advanced one cycle and the previous bucket gets reused; 
further calls to set the expiration within the same wheel cycle will pass in 
the same value and hence return false, thus the bucket with the same expiration 
will not be enqueued multiple times.
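
A small sketch of the behaviour that comment describes (field and method names 
are assumptions for illustration, not necessarily the actual TimerTaskList code):

// Sketch only -- illustrates why setting the same expiration twice returns false.
private[this] val expiration = new java.util.concurrent.atomic.AtomicLong(-1L)

// Returns true only when the expiration actually changes, so the caller
// enqueues the bucket into the delay queue at most once per wheel cycle.
def setExpiration(expirationMs: Long): Boolean =
  expiration.getAndSet(expirationMs) != expirationMs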



core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala


Could we just add an atomic integer recording the list size and a size() 
function to TimerTaskList?
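
For example, something along these lines (a sketch only; where the counter is 
incremented and decremented is up to the patch):

// Sketch only: a size counter maintained by TimerTaskList itself.
private[this] val counter = new java.util.concurrent.atomic.AtomicInteger(0)

def size(): Int = counter.get
// counter.incrementAndGet() in add(...), counter.decrementAndGet() in remove(...)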



core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala


latch.await(0, TimeUnit.SECONDS)?


Since this is a rather complicated patch (even after reading the wiki page I took 
quite some time to get through the code), I would suggest adding more comments 
on the functions / member variables of each class.

- Guozhang Wang


On Feb. 28, 2015, 12:14 a.m., Yasuhiro Matsuda wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31568/
> ---
> 
> (Updated Feb. 28, 2015, 12:14 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1989
> https://issues.apache.org/jira/browse/KAFKA-1989
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> new purgatory implementation
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/server/DelayedOperation.scala 
> e317676b4dd5bb5ad9770930e694cd7282d5b6d5 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> 586cf4caa95f58d5b2e6c7429732d25d2b3635c8 
>   core/src/main/scala/kafka/utils/SinglyLinkedWeakList.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/Timer.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimerTask.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimerTaskList.scala PRE-CREATION 
>   core/src/main/scala/kafka/utils/timer/TimingWheel.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala 
> 7a37617395b9e4226853913b8989f42e7301de7c 
>   core/src/test/scala/unit/kafka/utils/timer/TimerTaskListTest.scala 
> PRE-CREATION 
>   core/src/test/scala/unit/kafka/utils/timer/TimerTest.scala PRE-CREATION 
> 
> Diff: https://reviews.apache.

Review Request 32172: Patch for KAFKA-1860

2015-03-17 Thread Mayuresh Gharat

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/32172/
---

Review request for kafka.


Bugs: KAFKA-1860
https://issues.apache.org/jira/browse/KAFKA-1860


Repository: kafka


Description
---

The JVM should stop if the underlying file system goes into read-only mode.
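
For context, a rough sketch of the intended behavior (illustrative only, not the 
actual patch; assumes the Logging trait's fatal(msg, e)):

    // treat an IOException from the checkpoint write as fatal and halt,
    // since a read-only file system cannot be recovered from in place
    try {
      // ... write the new checkpoint file and swap it into place ...
    } catch {
      case e: java.io.IOException =>
        fatal("Halting broker: failed to write offset checkpoint; " +
          "the file system may be in read-only mode", e)
        Runtime.getRuntime.halt(1)
    }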


Diffs
-

  core/src/main/scala/kafka/server/OffsetCheckpoint.scala 
8c5b0546908d3b3affb9f48e2ece9ed252518783 

Diff: https://reviews.apache.org/r/32172/diff/


Testing
---


Thanks,

Mayuresh Gharat



[jira] [Updated] (KAFKA-1860) File system errors are not detected unless Kafka tries to write

2015-03-17 Thread Mayuresh Gharat (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat updated KAFKA-1860:
---
Attachment: KAFKA-1860.patch

> File system errors are not detected unless Kafka tries to write
> ---
>
> Key: KAFKA-1860
> URL: https://issues.apache.org/jira/browse/KAFKA-1860
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Mayuresh Gharat
> Fix For: 0.9.0
>
> Attachments: KAFKA-1860.patch
>
>
> When the disk (raid with caches dir) dies on a Kafka broker, typically the 
> filesystem gets mounted into read-only mode, and hence when Kafka tries to 
> read the disk, it will get a FileNotFoundException with the read-only errno 
> set (EROFS).
> However, as long as no produce request is received, and hence no writes are 
> attempted on the disks, Kafka will not exit on such a FATAL error (when the 
> disk starts working again, Kafka might think some files are gone while they 
> will reappear later as the raid comes back online). Instead it keeps spilling 
> exceptions like:
> {code}
> 2015/01/07 09:47:41.543 ERROR [KafkaScheduler] [kafka-scheduler-1] 
> [kafka-server] [] Uncaught exception in scheduled task 
> 'kafka-recovery-point-checkpoint'
> java.io.FileNotFoundException: 
> /export/content/kafka/i001_caches/recovery-point-offset-checkpoint.tmp 
> (Read-only file system)
>   at java.io.FileOutputStream.open(Native Method)
>   at java.io.FileOutputStream.<init>(FileOutputStream.java:206)
>   at java.io.FileOutputStream.<init>(FileOutputStream.java:156)
>   at kafka.server.OffsetCheckpoint.write(OffsetCheckpoint.scala:37)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31706: Patch for KAFKA-1997

2015-03-17 Thread Jiangjie Qin

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31706/
---

(Updated March 17, 2015, 8:47 p.m.)


Review request for kafka.


Bugs: KAFKA-1997
https://issues.apache.org/jira/browse/KAFKA-1997


Repository: kafka


Description (updated)
---

Move shutdown hook registration before creating producer and consumer streams, 
so mirror maker can be shut down correctly if an error occurs during consumer 
stream creation.
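
As a minimal sketch of the ordering this enforces (illustrative names, not the 
actual diff):

    object ShutdownOrderingSketch {
      private def cleanShutdown(): Unit = {
        // close the consumer connectors and the producer here
      }

      def main(args: Array[String]): Unit = {
        // Register the hook before anything that can fail, so an exception thrown
        // while creating the consumer streams still results in a clean shutdown.
        Runtime.getRuntime.addShutdownHook(new Thread("MirrorMakerShutdownHook") {
          override def run(): Unit = cleanShutdown()
        })

        // ... only after this point create the producer and the consumer streams ...
      }
    }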


Diffs (updated)
-

  core/src/main/scala/kafka/tools/MirrorMaker.scala 
87b925c7332470de59a5b098d51c4bf0dfab8b8f 

Diff: https://reviews.apache.org/r/31706/diff/


Testing
---


Thanks,

Jiangjie Qin



[jira] [Updated] (KAFKA-1997) Refactor Mirror Maker

2015-03-17 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1997?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-1997:

Attachment: KAFKA-1997_2015-03-17_13:47:01.patch

> Refactor Mirror Maker
> -
>
> Key: KAFKA-1997
> URL: https://issues.apache.org/jira/browse/KAFKA-1997
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-1997.patch, KAFKA-1997_2015-03-03_16:28:46.patch, 
> KAFKA-1997_2015-03-04_15:07:46.patch, KAFKA-1997_2015-03-04_15:42:45.patch, 
> KAFKA-1997_2015-03-05_20:14:58.patch, KAFKA-1997_2015-03-09_18:55:54.patch, 
> KAFKA-1997_2015-03-10_18:31:34.patch, KAFKA-1997_2015-03-11_15:20:18.patch, 
> KAFKA-1997_2015-03-11_19:10:53.patch, KAFKA-1997_2015-03-13_14:43:34.patch, 
> KAFKA-1997_2015-03-17_13:47:01.patch
>
>
> Refactor mirror maker based on KIP-3



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1997) Refactor Mirror Maker

2015-03-17 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14366052#comment-14366052
 ] 

Jiangjie Qin commented on KAFKA-1997:
-

Updated reviewboard https://reviews.apache.org/r/31706/diff/
 against branch origin/trunk

> Refactor Mirror Maker
> -
>
> Key: KAFKA-1997
> URL: https://issues.apache.org/jira/browse/KAFKA-1997
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Attachments: KAFKA-1997.patch, KAFKA-1997_2015-03-03_16:28:46.patch, 
> KAFKA-1997_2015-03-04_15:07:46.patch, KAFKA-1997_2015-03-04_15:42:45.patch, 
> KAFKA-1997_2015-03-05_20:14:58.patch, KAFKA-1997_2015-03-09_18:55:54.patch, 
> KAFKA-1997_2015-03-10_18:31:34.patch, KAFKA-1997_2015-03-11_15:20:18.patch, 
> KAFKA-1997_2015-03-11_19:10:53.patch, KAFKA-1997_2015-03-13_14:43:34.patch, 
> KAFKA-1997_2015-03-17_13:47:01.patch
>
>
> Refactor mirror maker based on KIP-3



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2028) Unable to start the ZK instance after myid file was missing and had to recreate it.

2015-03-17 Thread InduR (JIRA)
InduR created KAFKA-2028:


 Summary: Unable to start the ZK instance after myid file was 
missing and had to recreate it.
 Key: KAFKA-2028
 URL: https://issues.apache.org/jira/browse/KAFKA-2028
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 0.8.1.1
 Environment: Non Prod
Reporter: InduR


Created a Dev 3-node cluster environment in Jan and the environment has been up 
and running without any issues until a few days ago.
The Kafka server stopped running but the ZK listener was up. Noticed that the myid 
file was missing on all 3 servers.
Recreating the file while ZK was still running did not help.
Stopped all of the ZK / Kafka server instances and see the following error when 
starting ZK.

kafka_2.10-0.8.1.1
OS : RHEL
[root@lablx0025 bin]# ./zookeeper-server-start.sh 
../config/zookeeper.properties &
[1] 31053
[* bin]# [2015-03-17 15:04:33,876] INFO Reading configuration from: 
../config/zookeeper.properties 
(org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2015-03-17 15:04:33,885] INFO Defaulting to majority quorums 
(org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2015-03-17 15:04:33,911] DEBUG preRegister called. 
Server=com.sun.jmx.mbeanserver.JmxMBeanServer@4891d863, name=log4j:logger=kafka 
(kafka)
[2015-03-17 15:04:33,915] INFO Starting quorum peer 
(org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2015-03-17 15:04:33,940] INFO binding to port 0.0.0.0/0.0.0.0:2181 
(org.apache.zookeeper.server.NIOServerCnxn)
[2015-03-17 15:04:33,966] INFO tickTime set to 3000 
(org.apache.zookeeper.server.quorum.QuorumPeer)
[2015-03-17 15:04:33,966] INFO minSessionTimeout set to -1 
(org.apache.zookeeper.server.quorum.QuorumPeer)
[2015-03-17 15:04:33,966] INFO maxSessionTimeout set to -1 
(org.apache.zookeeper.server.quorum.QuorumPeer)
[2015-03-17 15:04:33,966] INFO initLimit set to 5 
(org.apache.zookeeper.server.quorum.QuorumPeer)
[2015-03-17 15:04:34,023] ERROR Failed to increment parent cversion for: 
/consumers/console-consumer-6249/offsets/test 
(org.apache.zookeeper.server.persistence.FileTxnSnapLog)
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode 
for /consumers/console-consumer-6249/offsets/test
at 
org.apache.zookeeper.server.DataTree.incrementCversion(DataTree.java:1218)
at 
org.apache.zookeeper.server.persistence.FileTxnSnapLog.processTransaction(FileTxnSnapLog.java:222)
at 
org.apache.zookeeper.server.persistence.FileTxnSnapLog.restore(FileTxnSnapLog.java:150)
at 
org.apache.zookeeper.server.ZKDatabase.loadDataBase(ZKDatabase.java:222)
at 
org.apache.zookeeper.server.quorum.QuorumPeer.start(QuorumPeer.java:398)
at 
org.apache.zookeeper.server.quorum.QuorumPeerMain.runFromConfig(QuorumPeerMain.java:143)
at 
org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:103)
at 
org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:76)
[2015-03-17 15:04:34,027] FATAL Unable to load database on disk 
(org.apache.zookeeper.server.quorum.QuorumPeer)
java.io.IOException: Failed to process transaction type: 2 error: 
KeeperErrorCode = NoNode for /consumers/console-consumer-6249/offsets/test
at 
org.apache.zookeeper.server.persistence.FileTxnSnapLog.restore(FileTxnSnapLog.java:152)
at 
org.apache.zookeeper.server.ZKDatabase.loadDataBase(ZKDatabase.java:222)
at 
org.apache.zookeeper.server.quorum.QuorumPeer.start(QuorumPeer.java:398)
at 
org.apache.zookeeper.server.quorum.QuorumPeerMain.runFromConfig(QuorumPeerMain.java:143)
at 
org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:103)
at 
org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:76)
[2015-03-17 15:04:34,027] FATAL Unexpected exception, exiting abnormally 
(org.apache.zookeeper.server.quorum.QuorumPeerMain)
java.lang.RuntimeException: Unable to run quorum server
at 
org.apache.zookeeper.server.quorum.QuorumPeer.start(QuorumPeer.java:401)
at 
org.apache.zookeeper.server.quorum.QuorumPeerMain.runFromConfig(QuorumPeerMain.java:143)
at 
org.apache.zookeeper.server.quorum.QuorumPeerMain.initializeAndRun(QuorumPeerMain.java:103)
at 
org.apache.zookeeper.server.quorum.QuorumPeerMain.main(QuorumPeerMain.java:76)
Caused by: java.io.IOException: Failed to process transaction type: 2 error: 
KeeperErrorCode = NoNode for /consumers/

Re: Broker is not aware of new partitions assigned

2015-03-17 Thread Allen Wang
Looking a bit more into the controller log, it seems that when the partition
assignment is changed in ZooKeeper, the controller hits quite a lot of
exceptions communicating with the new brokers where the partitions are
assigned. One thing to note is that the new brokers have Kafka version
0.8.2.1 and the controller has Kafka version 0.8.1.1.

2015-03-16 22:36:58,178 WARN  kafka.utils.Logging$class:89
[Controller-2-to-broker-48-send-thread] [warn]
[Controller-2-to-broker-48-send-thread], Controller 2 fails to send a
request to broker id:48,host:xyz:7101
java.io.EOFException: Received -1 when reading from channel, socket has
likely been closed.
at kafka.utils.Utils$.read(Utils.scala:376)
at
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at
kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
at
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

Does it explain why the brokers are not aware of the new assignments? Is
there any way to recover from this communication problem, like restarting
the controller?

Thanks,
Allen


On Tue, Mar 17, 2015 at 10:34 AM, Allen Wang  wrote:

> Hello,
>
> I developed a tool to add partitions and assign new partitions to a set of
> brokers in one operation by utilizing the API
> AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK().
>
> It worked well in most cases. However, in one case, I found that the
> brokers are not aware of new partitions assigned to them, even though the
> zookeeper data clearly shows the assignment.
>
> Here is the zookeeper data for the partition:
>
>
> {"controller_epoch":6,"leader":62,"version":1,"leader_epoch":0,"isr":[62,74]}
>
> On broker 62, the error message is:
>
> 2015-03-17 17:11:57,157 WARN  kafka.utils.Logging$class:83
> [kafka-request-handler-7] [warn] [KafkaApi-62] Produce request with
> correlation id 2048464 from client x on partition [m,71] failed due to
> Partition [m,71] doesn't exist on 62
>
> Here is the core function of the tool:
>
>   def addPartitionsToTopic(zkClient: ZkClient, topic: String,
> brokersToAssignPartitions: Seq[Int], brokers: Seq[Int], execute: Boolean):
> Unit = {
> val existingPartitionsReplicaList =
> ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
> val config = AdminUtils.fetchTopicConfig(zkClient, topic)
> printf("Topic config: %s\n\n", config)
> if (existingPartitionsReplicaList.size == 0)
>   throw new AdminOperationException("The topic %s does not
> exist".format(topic))
> val currentPartitions = existingPartitionsReplicaList.size
> val replicationFactor = existingPartitionsReplicaList.map(e =>
> e._2.size).max
> val brokersWithPartitions = existingPartitionsReplicaList.flatMap(e =>
> e._2).toSet.toSeq
> if (brokersToAssignPartitions.intersect(brokersWithPartitions).size >
> 0) {
>   printf("Topic %s already has partitions on brokers %s. Skipping.\n",
> topic, brokersToAssignPartitions)
>   return
> }
> val totalBrokers = brokers.size
> val oldBrokers = totalBrokers - brokersToAssignPartitions.size
> if (oldBrokers == 0) {
>   throw new IllegalArgumentException("Cannot add partitions to new
> brokers without existing partitions")
> }
> val expectedPartitions = currentPartitions * totalBrokers / oldBrokers
> val newPartitions = expectedPartitions - currentPartitions
> if (newPartitions <= 0) {
>   throw new IllegalArgumentException("Invalid number of new partitions
> %d".format(newPartitions))
> }
> val newPartitionReplicaList =
> AdminUtils.assignReplicasToBrokers(brokersToAssignPartitions,
> newPartitions, replicationFactor, startPartitionId = currentPartitions)
> val partitionReplicaList = existingPartitionsReplicaList.map(p =>
> p._1.partition -> p._2)
> // add the new list
> partitionReplicaList ++= newPartitionReplicaList
> printf("Changing number of partitions from %d to %d to topic %s\n\n",
> currentPartitions, expectedPartitions, topic)
> printf("Replica reassignment for new partitions:\n\n%s\n\n",
> getAssignmentJson(topic, newPartitionReplicaList))
> printf("Complete replica assignment:\n\n%s\n\n",
> getAssignmentJson(topic, partitionReplicaList))
> if (execute) {
>   AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient,
> topic, partitionReplicaList, config, update = true)
>   println("New partitions are added")
> } else {
>   println("No update is executed in dry run mode")
> }
>   }
>
> It seems to me that the new assignment in ZooKeeper data does not
> propagate to some of the new brokers. However, looking at TopicCommand, it
> uses the same AdminUtils function to add new partition

[jira] [Commented] (KAFKA-1688) Add authorization interface and naive implementation

2015-03-17 Thread Parth Brahmbhatt (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14366010#comment-14366010
 ] 

Parth Brahmbhatt commented on KAFKA-1688:
-

I am looking for a common abstraction for all requests that are bound to have a 
Topic (or a list of topics) param, and I don't think AbstractRequestResponse is 
that structure. I think it makes sense to have a TopicRequestResponse that extends 
AbstractRequestResponse, holds a list of topics, and is extended by all 
topic-related actions.
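
For illustration, the shape I have in mind (hypothetical names; nothing like this 
exists in the code base yet):

{code}
// a common parent for topic-scoped requests so cross-cutting concerns like
// authorization can work against a single abstraction
trait TopicRequest {
  def topics: Seq[String]   // every topic-scoped request exposes the topics it touches
}

case class DescribeTopicsExample(topics: Seq[String]) extends TopicRequest
{code}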

> Add authorization interface and naive implementation
> 
>
> Key: KAFKA-1688
> URL: https://issues.apache.org/jira/browse/KAFKA-1688
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Jay Kreps
>Assignee: Parth Brahmbhatt
> Fix For: 0.8.3
>
>
> Add a PermissionManager interface as described here:
> https://cwiki.apache.org/confluence/display/KAFKA/Security
> (possibly there is a better name?)
> Implement calls to the PermissionsManager in KafkaApis for the main requests 
> (FetchRequest, ProduceRequest, etc). We will need to add a new error code and 
> exception to the protocol to indicate "permission denied".
> Add a server configuration to give the class you want to instantiate that 
> implements that interface. That class can define its own configuration 
> properties from the main config file.
> Provide a simple implementation of this interface which just takes a user and 
> ip whitelist and permits those in either of the whitelists to do anything, 
> and denies all others.
> Rather than writing an integration test for this class we can probably just 
> use this class for the TLS and SASL authentication testing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-1912) Create a simple request re-routing facility

2015-03-17 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani reassigned KAFKA-1912:
-

Assignee: Sriharsha Chintalapani

> Create a simple request re-routing facility
> ---
>
> Key: KAFKA-1912
> URL: https://issues.apache.org/jira/browse/KAFKA-1912
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
>Assignee: Sriharsha Chintalapani
> Fix For: 0.8.3
>
>
> We are accumulating a lot of requests that have to be directed to the correct 
> server. This makes sense for high volume produce or fetch requests. But it is 
> silly to put the extra burden on the client for the many miscellaneous 
> requests such as fetching or committing offsets and so on.
> This adds a ton of practical complexity to the clients with little or no 
> payoff in performance.
> We should add a generic request-type agnostic re-routing facility on the 
> server. This would allow any server to accept a request and forward it to the 
> correct destination, proxying the response back to the user. Naturally it 
> needs to do this without blocking the thread.
> The result is that a client implementation can choose to be optimally 
> efficient and manage a local cache of cluster state and attempt to always 
> direct its requests to the proper server OR it can choose simplicity and just 
> send things all to a single host and let that host figure out where to 
> forward it.
> I actually think we should implement this more or less across the board, but 
> some requests such as produce and fetch require more logic to proxy since 
> they have to be scattered out to multiple servers and gathered back to create 
> the response. So these could be done in a second phase.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-2007) update offsetrequest for more useful information we have on broker about partition

2015-03-17 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani reassigned KAFKA-2007:
-

Assignee: Sriharsha Chintalapani

> update offsetrequest for more useful information we have on broker about 
> partition
> --
>
> Key: KAFKA-2007
> URL: https://issues.apache.org/jira/browse/KAFKA-2007
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joe Stein
>Assignee: Sriharsha Chintalapani
> Fix For: 0.8.3
>
>
> this will need a KIP
> via [~jkreps] in KIP-6 discussion about KAFKA-1694
> The other information that would be really useful to get would be
> information about partitions--how much data is in the partition, what are
> the segment offsets, what is the log-end offset (i.e. last offset), what is
> the compaction point, etc. I think that done right this would be the
> successor to the very awkward OffsetRequest we have today.
> This is not really blocking that ticket and could happen before/after and has 
> a lot of other useful purposes and is important to get done so tracking it 
> here in this JIRA.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1912) Create a simple request re-routing facility

2015-03-17 Thread Sriharsha Chintalapani (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani updated KAFKA-1912:
--
Assignee: (was: Sriharsha Chintalapani)

> Create a simple request re-routing facility
> ---
>
> Key: KAFKA-1912
> URL: https://issues.apache.org/jira/browse/KAFKA-1912
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
> Fix For: 0.8.3
>
>
> We are accumulating a lot of requests that have to be directed to the correct 
> server. This makes sense for high volume produce or fetch requests. But it is 
> silly to put the extra burden on the client for the many miscellaneous 
> requests such as fetching or committing offsets and so on.
> This adds a ton of practical complexity to the clients with little or no 
> payoff in performance.
> We should add a generic request-type agnostic re-routing facility on the 
> server. This would allow any server to accept a request and forward it to the 
> correct destination, proxying the response back to the user. Naturally it 
> needs to do this without blocking the thread.
> The result is that a client implementation can choose to be optimally 
> efficient and manage a local cache of cluster state and attempt to always 
> direct its requests to the proper server OR it can choose simplicity and just 
> send things all to a single host and let that host figure out where to 
> forward it.
> I actually think we should implement this more or less across the board, but 
> some requests such as produce and fetch require more logic to proxy since 
> they have to be scattered out to multiple servers and gathered back to create 
> the response. So these could be done in a second phase.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations

2015-03-17 Thread Andrii Biletskyi
Guys,

Thanks for a great discussion!
Here are the actions points:

1. Q: Get rid of all scala requests objects, use java protocol definitions.
A: Gwen kindly took that (KAFKA-1927). It's important to speed up
the review procedure there since this ticket blocks other important changes.

2. Q: Generic re-routing facility vs client maintaining cluster state.
A: Jay has added pseudo code to KAFKA-1912 - need to consider whether
this will be
easy to implement as a server-side feature (comments are welcomed!).

3. Q: Controller field in wire protocol.
A: This might be useful for clients, add this to TopicMetadataResponse
(already in KIP).

4. Q: Decoupling topic creation from TMR.
A: I will add the solution proposed by Jun (using clientId for that) to the
KIP.

5. Q: Bumping new versions of TMR vs grabbing all protocol changes in one
version.
A: It was decided to try to gather all changes to protocol (before
release).
In the case of TMR it is worth checking KAFKA-2020 and KIP-13 (quotas)

6. Q: JSON lib is needed to deserialize user's input in CLI tool.
A: Use jackson for that, /tools project is a separate jar so shouldn't
be a big deal.

7.  Q: VerifyReassignPartitions vs generic status check command.
 A: For long-running requests like reassign partitions a *progress* check
request is useful,
 so it makes sense to introduce it.

 Please add, correct me if I missed something.

Thanks,
Andrii Biletskyi

On Tue, Mar 17, 2015 at 6:20 PM, Andrii Biletskyi <
andrii.bilets...@stealth.ly> wrote:

> Joel,
>
> You are right, I removed ClusterMetadata because we have partially
> what we need in TopicMetadata. Also, as Jay pointed out earlier, we
> would like to have "orthogonal" API, but at the same time we need
> to be backward compatible.
>
> But I like your idea and even have some other arguments for this option:
> There is also DescribeTopicRequest which was proposed in this KIP,
> it returns topic configs, partitions, replication factor plus partition
> ISR, ASR,
> leader replica. The latter part is really already there in
> TopicMetadataRequest.
> So again we'll have to add stuff to TMR, not to duplicate some info in
> newly added requests. However, this way we'll end up with "monster"
> request which returns cluster metadata, topic replication and config info
> plus partition replication data. Seems logical to split TMR to
> - ClusterMetadata (brokers + controller, maybe smth else)
> - TopicMetadata (topic info + partition details)
> But since current TMR is involved in lots of places (including network
> client,
> as I understand) this might be very serious change and it probably makes
> sense to stick with current approach.
>
> Thanks,
> Andrii Biletskyi
>
>
> On Tue, Mar 17, 2015 at 5:29 PM, Joel Koshy  wrote:
>
>> I may be missing some context but hopefully this will also be covered
>> today: I thought the earlier proposal where there was an explicit
>> ClusterMetadata request was clearer and explicit. During the course of
>> this thread I think the conclusion was that the main need was for
>> controller information and that can be rolled into the topic metadata
>> response but that seems a bit irrelevant to topic metadata. FWIW I
>> think the full broker-list is also irrelevant to topic metadata, but
>> it is already there and in use. I think there is still room for an
>> explicit ClusterMetadata request since there may be other
>> cluster-level information that we may want to add over time (and that
>> have nothing to do with topic metadata).
>>
>> On Tue, Mar 17, 2015 at 02:45:30PM +0200, Andrii Biletskyi wrote:
>> > Jun,
>> >
>> > 101. Okay, if you say that such use case is important. I also think
>> > using clientId for these purposes is fine - if we already have this
>> field
>> > as part of all Wire protocol messages, why not use that.
>> > I will update KIP-4 page if nobody has other ideas (which may come up
>> > during the call today).
>> >
>> > 102.1 Agree, I'll update the KIP accordingly. I think we can add new,
>> > fine-grained error codes if some error code received in specific case
>> > won't give enough context to return a descriptive error message for
>> user.
>> >
>> > Look forward to discussing all outstanding issues in detail today during
>> > the call.
>> >
>> > Thanks,
>> > Andrii Biletskyi
>> >
>> >
>> >
>> > On Mon, Mar 16, 2015 at 10:59 PM, Jun Rao  wrote:
>> >
>> > > 101. There may be a use case where you only want the topics to be
>> created
>> > > manually by admins. Currently, you can do that by disabling auto topic
>> > > creation and issue topic creation from the TopicCommand. If we
>> disable auto
>> > > topic creation completely on the broker and don't have a way to
>> distinguish
>> > > between topic creation requests from the regular clients and the
>> admin, we
>> > > can't support manual topic creation any more. I was thinking that
>> another
>> > > way of distinguishing the clients making the topic creation requests
>> is
>> > > usi

[jira] [Commented] (KAFKA-1688) Add authorization interface and naive implementation

2015-03-17 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14365929#comment-14365929
 ] 

Gwen Shapira commented on KAFKA-1688:
-

We are moving all Requests and Responses to Java where currently they all 
extend AbstractRequestResponse.

Can you use this for the authorization code, or do you think another level of 
abstraction specifically for topic requests and responses is needed?

> Add authorization interface and naive implementation
> 
>
> Key: KAFKA-1688
> URL: https://issues.apache.org/jira/browse/KAFKA-1688
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Jay Kreps
>Assignee: Parth Brahmbhatt
> Fix For: 0.8.3
>
>
> Add a PermissionManager interface as described here:
> https://cwiki.apache.org/confluence/display/KAFKA/Security
> (possibly there is a better name?)
> Implement calls to the PermissionsManager in KafkaApis for the main requests 
> (FetchRequest, ProduceRequest, etc). We will need to add a new error code and 
> exception to the protocol to indicate "permission denied".
> Add a server configuration to give the class you want to instantiate that 
> implements that interface. That class can define its own configuration 
> properties from the main config file.
> Provide a simple implementation of this interface which just takes a user and 
> ip whitelist and permits those in either of the whitelists to do anything, 
> and denies all others.
> Rather than writing an integration test for this class we can probably just 
> use this class for the TLS and SASL authentication testing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2016) RollingBounceTest takes long

2015-03-17 Thread Ted Malaska (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2016?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14365893#comment-14365893
 ] 

Ted Malaska commented on KAFKA-2016:


I would love to help out on this issue if that is OK.

Jun, is it possible to get listed as a contributor so I can assign this to 
myself?

Thanks

> RollingBounceTest takes long
> 
>
> Key: KAFKA-2016
> URL: https://issues.apache.org/jira/browse/KAFKA-2016
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
> Fix For: 0.8.3
>
>
> RollingBounceTest.testRollingBounce() currently takes about 48 secs. This is 
> a bit too long.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Review Request 31967: Patch for KAFKA-1546

2015-03-17 Thread Aditya Auradkar

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/#review76781
---



core/src/main/scala/kafka/server/ReplicaManager.scala


Renamed.

We can use the raw offset directly, but in general the code is passing the 
LogOffsetMetadata objects and it made sense to remain consistent.


- Aditya Auradkar


On March 16, 2015, 6:32 p.m., Aditya Auradkar wrote:
> 
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31967/
> ---
> 
> (Updated March 16, 2015, 6:32 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1546
> https://issues.apache.org/jira/browse/KAFKA-1546
> 
> 
> Repository: kafka
> 
> 
> Description
> ---
> 
> PATCH for KAFKA-1546
> 
> Brief summary of changes:
> - Added a lagBegin metric inside Replica to track the lag in terms of time 
> since the replica did not read from the LEO
> - Using lag begin value in the check for ISR expand and shrink
> - Removed the max lag messages config since it is no longer necessary
> - Returning the initialLogEndOffset in LogReadResult corresponding to the 
> LEO before actually reading from the log.
> - Unit test cases to test ISR shrinkage and expansion
> 
> Updated KAFKA-1546 patch to reflect Neha and Jun's comments
> 
> 
> Diffs
> -
> 
>   core/src/main/scala/kafka/cluster/Partition.scala 
> c4bf48a801007ebe7497077d2018d6dffe1677d4 
>   core/src/main/scala/kafka/cluster/Replica.scala 
> bd13c20338ce3d73113224440e858a12814e5adb 
>   core/src/main/scala/kafka/log/Log.scala 
> 06b8ecc5d11a1acfbaf3c693c42bf3ce5b2cd86d 
>   core/src/main/scala/kafka/server/FetchDataInfo.scala 
> 26f278f9b75b1c9c83a720ca9ebd8ab812d19d39 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 
> 48e33626695ad8a28b0018362ac225f11df94973 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 
> 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
>   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
> c124c8df5b5079e5ffbd0c4ea359562a66aaf317 
>   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 
> 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 
> efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 
> 
> Diff: https://reviews.apache.org/r/31967/diff/
> 
> 
> Testing
> ---
> 
> 
> Thanks,
> 
> Aditya Auradkar
> 
>



[jira] [Updated] (KAFKA-1926) Replace kafka.utils.Utils with o.a.k.common.utils.Utils

2015-03-17 Thread Gwen Shapira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1926?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira updated KAFKA-1926:

Assignee: Tong Li

> Replace kafka.utils.Utils with o.a.k.common.utils.Utils
> ---
>
> Key: KAFKA-1926
> URL: https://issues.apache.org/jira/browse/KAFKA-1926
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.2.0
>Reporter: Jay Kreps
>Assignee: Tong Li
>  Labels: newbie, patch
> Attachments: KAFKA-1926.patch, KAFKA-1926.patch, KAFKA-1926.patch
>
>
> There is currently a lot of duplication between the Utils class in common and 
> the one in core.
> Our plan has been to deprecate duplicate code in the server and replace it 
> with the new common code.
> As such we should evaluate each method in the scala Utils and do one of the 
> following:
> 1. Migrate it to o.a.k.common.utils.Utils if it is a sensible general purpose 
> utility in active use that is not Kafka-specific. If we migrate it we should 
> really think about the API and make sure there is some test coverage. A few 
> things in there are kind of funky and we shouldn't just blindly copy them 
> over.
> 2. Create a new class ServerUtils or ScalaUtils in kafka.utils that will hold 
> any utilities that really need to make use of Scala features to be convenient.
> 3. Delete it if it is not used, or has a bad api.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1982) change kafka.examples.Producer to use the new java producer

2015-03-17 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14365881#comment-14365881
 ] 

Gwen Shapira commented on KAFKA-1982:
-

Pinging [~junrao] 
I think it's ready for commit?

> change kafka.examples.Producer to use the new java producer
> ---
>
> Key: KAFKA-1982
> URL: https://issues.apache.org/jira/browse/KAFKA-1982
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.3
>Reporter: Jun Rao
>Assignee: Ashish K Singh
>  Labels: newbie
> Attachments: KAFKA-1982.patch, KAFKA-1982_2015-02-24_10:34:51.patch, 
> KAFKA-1982_2015-02-24_20:45:48.patch, KAFKA-1982_2015-02-24_20:48:12.patch, 
> KAFKA-1982_2015-02-27_11:08:34.patch, KAFKA-1982_2015-03-03_17:50:57.patch
>
>
> We need to change the example to use the new java producer.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1961) Looks like its possible to delete _consumer_offsets topic

2015-03-17 Thread Gwen Shapira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14365875#comment-14365875
 ] 

Gwen Shapira commented on KAFKA-1961:
-

ping [~nehanarkhede] :)

I think this is ready for commit?

> Looks like its possible to delete _consumer_offsets topic
> -
>
> Key: KAFKA-1961
> URL: https://issues.apache.org/jira/browse/KAFKA-1961
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.0
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
>  Labels: newbie
> Attachments: KAFKA-1961.3.patch, KAFKA-1961.4.patch
>
>
> Noticed that kafka-topics.sh --delete can successfully delete internal topics 
> (__consumer_offsets).
> I'm pretty sure we want to prevent that, to avoid users shooting themselves 
> in the foot.
> Topic admin command should check for internal topics, just like 
> ReplicaManager does and not let users delete them.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1688) Add authorization interface and naive implementation

2015-03-17 Thread Parth Brahmbhatt (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14365860#comment-14365860
 ] 

Parth Brahmbhatt commented on KAFKA-1688:
-

Another thing I wanted to bring up is having top-level TopicRequest and 
TopicResponse classes/traits that extend RequestOrResponse. This will save a 
lot of code duplication and will allow use cases like authorization to 
inject themselves at the top level.

> Add authorization interface and naive implementation
> 
>
> Key: KAFKA-1688
> URL: https://issues.apache.org/jira/browse/KAFKA-1688
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Jay Kreps
>Assignee: Parth Brahmbhatt
> Fix For: 0.8.3
>
>
> Add a PermissionManager interface as described here:
> https://cwiki.apache.org/confluence/display/KAFKA/Security
> (possibly there is a better name?)
> Implement calls to the PermissionsManager in KafkaApis for the main requests 
> (FetchRequest, ProduceRequest, etc). We will need to add a new error code and 
> exception to the protocol to indicate "permission denied".
> Add a server configuration to give the class you want to instantiate that 
> implements that interface. That class can define its own configuration 
> properties from the main config file.
> Provide a simple implementation of this interface which just takes a user and 
> ip whitelist and permits those in either of the whitelists to do anything, 
> and denies all others.
> Rather than writing an integration test for this class we can probably just 
> use this class for the TLS and SASL authentication testing.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2027) kafka never notifies the zookeeper client when a partition moved with due to an auto-rebalance (when auto.leader.rebalance.enable=true)

2015-03-17 Thread Sampath Reddy Lambu (JIRA)
Sampath Reddy Lambu created KAFKA-2027:
--

 Summary: kafka never notifies the zookeeper client when a 
partition moved with due to an auto-rebalance (when 
auto.leader.rebalance.enable=true)
 Key: KAFKA-2027
 URL: https://issues.apache.org/jira/browse/KAFKA-2027
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 0.8.1.1
 Environment: Kafka 0.8.1.1, Node.js & Mac OS
Reporter: Sampath Reddy Lambu
Assignee: Neha Narkhede
Priority: Blocker


I would like to report an issue when auto.leader.rebalance.enable=true. Kafka 
never sends an event/notification to its zookeeper client after a preferred 
leader election completes. This works fine with a manual rebalance from the CLI 
(kafka-preferred-replica-election.sh).

Initially I thought this issue was with Kafka-Node, but it's not. 
An event should be emitted from zookeeper if any partition leadership moves 
during a preferred election.

I'm working with kafka_2.9.2-0.8.1.1 (brokers) & Kafka-Node (Node.JS).




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2026) Logging of unused options always shows null for the value and is misleading if the option is used by serializers

2015-03-17 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2026?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-2026:
-
Description: 
This is a really simple issue. When AbstractConfig logs unused messages, it 
gets the value from the parsed configs. Since those are generated from the 
ConfigDef, the value will not have been parsed or copied over from the 
original map. This is especially confusing if you've explicitly set an option 
to pass through to the serializers since you're always going to see these 
warnings in your log.

The simplest patch would grab the original value from this.originals. But now 
I'm not sure logging this makes sense at all anymore since configuring any 
serializer that has options that aren't in ProducerConfig will create a 
misleading warning message. Further, using AbstractConfig for your serializer 
implementation would cause all the producer's config settings to be logged as 
unused. Since a single set of properties is being used to configure multiple 
components, trying to log unused keys may not make sense anymore.

Example of confusion caused by this: 
http://mail-archives.apache.org/mod_mbox/kafka-users/201503.mbox/%3CCAPAVcJ8nwSVjia3%2BH893V%2B87StST6r0xN4O2ac8Es2bEXjv1OA%40mail.gmail.com%3E

  was:
This is a really simple issue. When AbstractConfig logs unused messages, it 
gets the value from the parsed configs. Since those are generated from the 
ConfigDef, the value will not have been parsed or copied over from the 
original map. This is especially confusing if you've explicitly set an option 
to pass through to the serializers since you're always going to see these 
warnings in your log.

The simplest patch would grab the original value from this.originals. But now 
I'm not sure logging this makes sense at all anymore since configuring any 
serializer that has options that aren't in ProducerConfig will create a 
misleading warning message. Further, using AbstractConfig for your serializer 
implementation would cause all the producer's config settings to be logged as 
unused. Since a single set of properties is being used to configure multiple 
components, trying to log unused keys may not make sense anymore.


> Logging of unused options always shows null for the value and is misleading 
> if the option is used by serializers
> 
>
> Key: KAFKA-2026
> URL: https://issues.apache.org/jira/browse/KAFKA-2026
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.8.2.1
>Reporter: Ewen Cheslack-Postava
>Assignee: Ewen Cheslack-Postava
>Priority: Trivial
> Fix For: 0.8.3
>
>
> This is a really simple issue. When AbstractConfig logs unused messages, it 
> gets the value from the parsed configs. Since those are generated from the 
> ConfigDef, the value will not have been parsed or copied over from the 
> original map. This is especially confusing if you've explicitly set an option 
> to pass through to the serializers since you're always going to see these 
> warnings in your log.
> The simplest patch would grab the original value from this.originals. But now 
> I'm not sure logging this makes sense at all anymore since configuring any 
> serializer that has options that aren't in ProducerConfig will create a 
> misleading warning message. Further, using AbstractConfig for your serializer 
> implementation would cause all the producer's config settings to be logged as 
> unused. Since a single set of properties is being used to configure multiple 
> components, trying to log unused keys may not make sense anymore.
> Example of confusion caused by this: 
> http://mail-archives.apache.org/mod_mbox/kafka-users/201503.mbox/%3CCAPAVcJ8nwSVjia3%2BH893V%2B87StST6r0xN4O2ac8Es2bEXjv1OA%40mail.gmail.com%3E



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2026) Logging of unused options always shows null for the value and is misleading if the option is used by serializers

2015-03-17 Thread Ewen Cheslack-Postava (JIRA)
Ewen Cheslack-Postava created KAFKA-2026:


 Summary: Logging of unused options always shows null for the value 
and is misleading if the option is used by serializers
 Key: KAFKA-2026
 URL: https://issues.apache.org/jira/browse/KAFKA-2026
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.8.2.1
Reporter: Ewen Cheslack-Postava
Assignee: Ewen Cheslack-Postava
Priority: Trivial
 Fix For: 0.8.3


This is a really simple issue. When AbstractConfig logs unused messages, it 
gets the value from the parsed configs. Since those are generated from the 
ConfigDef, the value will not have been parsed or copied over from the 
original map. This is especially confusing if you've explicitly set an option 
to pass through to the serializers since you're always going to see these 
warnings in your log.

The simplest patch would grab the original value from this.originals. But now 
I'm not sure logging this makes sense at all anymore since configuring any 
serializer that has options that aren't in ProducerConfig will create a 
misleading warning message. Further, using AbstractConfig for your serializer 
implementation would cause all the producer's config settings to be logged as 
unused. Since a single set of properties is being used to configure multiple 
components, trying to log unused keys may not make sense anymore.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1912) Create a simple request re-routing facility

2015-03-17 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14365766#comment-14365766
 ] 

Jay Kreps commented on KAFKA-1912:
--

I think this is easier than I thought.

We have an async api now in NetworkClient:
  send(ClientRequest request)

So re-routing is fairly straight-forward:
  reroutedRequest = new RequestSend(newDestinationNode, originalRequest.header, 
originalRequest.body);
  networkClient.send(new ClientRequest(time, true, reroutedRequest, 
new RequestCompletionHandler() {
  public void onComplete(ClientResponse response) {
responseQueue.add(response);
  }
}
  ));

The only issue with this is that the network client currently requires a ready 
connection to do a send. I think we can fix this by making a small 
RequestRerouter shim that has a queue of unsent messages and a thread that does 
the ready check and send. Hopefully that makes sense, if not I can flesh it out 
a bit more.
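
To sketch the shim idea with made-up types (this is not the real NetworkClient 
API, just the general structure):

  import java.util.concurrent.LinkedBlockingQueue

  // Made-up stand-in for the real request/handler objects.
  case class PendingReroute(destinationNodeId: Int /* plus header, body, completion handler */)

  class RequestRerouter(isReady: Int => Boolean,           // is the connection to this node ready?
                        doSend: PendingReroute => Unit) {  // hand the request to the network client
    private val unsent = new LinkedBlockingQueue[PendingReroute]()

    def reroute(request: PendingReroute): Unit = unsent.put(request)

    private val sender = new Thread("request-rerouter") {
      override def run(): Unit = {
        while (!Thread.currentThread().isInterrupted) {
          val request = unsent.take()
          while (!isReady(request.destinationNodeId))  // wait until the connection is ready
            Thread.sleep(10)
          doSend(request)
        }
      }
    }
    sender.setDaemon(true)
    sender.start()
  }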

> Create a simple request re-routing facility
> ---
>
> Key: KAFKA-1912
> URL: https://issues.apache.org/jira/browse/KAFKA-1912
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
> Fix For: 0.8.3
>
>
> We are accumulating a lot of requests that have to be directed to the correct 
> server. This makes sense for high volume produce or fetch requests. But it is 
> silly to put the extra burden on the client for the many miscellaneous 
> requests such as fetching or committing offsets and so on.
> This adds a ton of practical complexity to the clients with little or no 
> payoff in performance.
> We should add a generic request-type agnostic re-routing facility on the 
> server. This would allow any server to accept a request and forward it to the 
> correct destination, proxying the response back to the user. Naturally it 
> needs to do this without blocking the thread.
> The result is that a client implementation can choose to be optimally 
> efficient and manage a local cache of cluster state and attempt to always 
> direct its requests to the proper server OR it can choose simplicity and just 
> send things all to a single host and let that host figure out where to 
> forward it.
> I actually think we should implement this more or less across the board, but 
> some requests such as produce and fetch require more logic to proxy since 
> they have to be scattered out to multiple servers and gathered back to create 
> the response. So these could be done in a second phase.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1912) Create a simple request re-routing facility

2015-03-17 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-1912:
-
Fix Version/s: 0.8.3

> Create a simple request re-routing facility
> ---
>
> Key: KAFKA-1912
> URL: https://issues.apache.org/jira/browse/KAFKA-1912
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
> Fix For: 0.8.3
>
>
> We are accumulating a lot of requests that have to be directed to the correct 
> server. This makes sense for high volume produce or fetch requests. But it is 
> silly to put the extra burden on the client for the many miscellaneous 
> requests such as fetching or committing offsets and so on.
> This adds a ton of practical complexity to the clients with little or no 
> payoff in performance.
> We should add a generic request-type agnostic re-routing facility on the 
> server. This would allow any server to accept a request and forward it to the 
> correct destination, proxying the response back to the user. Naturally it 
> needs to do this without blocking the thread.
> The result is that a client implementation can choose to be optimally 
> efficient and manage a local cache of cluster state and attempt to always 
> direct its requests to the proper server OR it can choose simplicity and just 
> send things all to a single host and let that host figure out where to 
> forward it.
> I actually think we should implement this more or less across the board, but 
> some requests such as produce and fetch require more logic to proxy since 
> they have to be scattered out to multiple servers and gathered back to create 
> the response. So these could be done in a second phase.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-1927) Replace requests in kafka.api with requests in org.apache.kafka.common.requests

2015-03-17 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein updated KAFKA-1927:
-
Fix Version/s: 0.8.3

> Replace requests in kafka.api with requests in 
> org.apache.kafka.common.requests
> ---
>
> Key: KAFKA-1927
> URL: https://issues.apache.org/jira/browse/KAFKA-1927
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
>Assignee: Gwen Shapira
> Fix For: 0.8.3
>
>
> The common package introduced a better way of defining requests using a new 
> protocol definition DSL and also includes wrapper objects for these.
> We should switch KafkaApis over to use these request definitions and consider 
> the scala classes deprecated (we probably need to retain some of them for a 
> while for the scala clients).
> This will be a big improvement because
> 1. We will have each request now defined in only one place (Protocol.java)
> 2. We will have built-in support for multi-version requests
> 3. We will have much better error messages (no more cryptic underflow errors)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-1927) Replace requests in kafka.api with requests in org.apache.kafka.common.requests

2015-03-17 Thread Joe Stein (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1927?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joe Stein reassigned KAFKA-1927:


Assignee: Gwen Shapira

> Replace requests in kafka.api with requests in 
> org.apache.kafka.common.requests
> ---
>
> Key: KAFKA-1927
> URL: https://issues.apache.org/jira/browse/KAFKA-1927
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jay Kreps
>Assignee: Gwen Shapira
> Fix For: 0.8.3
>
>
> The common package introduced a better way of defining requests using a new 
> protocol definition DSL and also includes wrapper objects for these.
> We should switch KafkaApis over to use these request definitions and consider 
> the scala classes deprecated (we probably need to retain some of them for a 
> while for the scala clients).
> This will be a big improvement because
> 1. We will have each request now defined in only one place (Protocol.java)
> 2. We will have built-in support for multi-version requests
> 3. We will have much better error messages (no more cryptic underflow errors)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Broker is not aware of new partitions assigned

2015-03-17 Thread Allen Wang
Hello,

I developed a tool to add partitions and assign new partitions to a set of
brokers in one operation by utilizing the API
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK().

It worked well in most cases. However, in one case, I found that the
brokers are not aware of new partitions assigned to them, even though the
zookeeper data clearly shows the assignment.

Here is the zookeeper data for the partition:

{"controller_epoch":6,"leader":62,"version":1,"leader_epoch":0,"isr":[62,74]}

On broker 62, the error message is:

2015-03-17 17:11:57,157 WARN  kafka.utils.Logging$class:83
[kafka-request-handler-7] [warn] [KafkaApi-62] Produce request with
correlation id 2048464 from client x on partition [m,71] failed due to
Partition [m,71] doesn't exist on 62

Here is the core function of the tool:

  def addPartitionsToTopic(zkClient: ZkClient, topic: String,
                           brokersToAssignPartitions: Seq[Int], brokers: Seq[Int],
                           execute: Boolean): Unit = {
    val existingPartitionsReplicaList =
      ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
    val config = AdminUtils.fetchTopicConfig(zkClient, topic)
    printf("Topic config: %s\n\n", config)
    if (existingPartitionsReplicaList.size == 0)
      throw new AdminOperationException("The topic %s does not exist".format(topic))
    val currentPartitions = existingPartitionsReplicaList.size
    val replicationFactor = existingPartitionsReplicaList.map(e => e._2.size).max
    val brokersWithPartitions = existingPartitionsReplicaList.flatMap(e => e._2).toSet.toSeq
    if (brokersToAssignPartitions.intersect(brokersWithPartitions).size > 0) {
      printf("Topic %s already has partitions on brokers %s. Skipping.\n",
             topic, brokersToAssignPartitions)
      return
    }
    val totalBrokers = brokers.size
    val oldBrokers = totalBrokers - brokersToAssignPartitions.size
    if (oldBrokers == 0) {
      throw new IllegalArgumentException(
        "Cannot add partitions to new brokers without existing partitions")
    }
    val expectedPartitions = currentPartitions * totalBrokers / oldBrokers
    val newPartitions = expectedPartitions - currentPartitions
    if (newPartitions <= 0) {
      throw new IllegalArgumentException(
        "Invalid number of new partitions %d".format(newPartitions))
    }
    val newPartitionReplicaList =
      AdminUtils.assignReplicasToBrokers(brokersToAssignPartitions,
        newPartitions, replicationFactor, startPartitionId = currentPartitions)
    val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
    // add the new list
    partitionReplicaList ++= newPartitionReplicaList
    printf("Changing number of partitions from %d to %d to topic %s\n\n",
           currentPartitions, expectedPartitions, topic)
    printf("Replica reassignment for new partitions:\n\n%s\n\n",
           getAssignmentJson(topic, newPartitionReplicaList))
    printf("Complete replica assignment:\n\n%s\n\n",
           getAssignmentJson(topic, partitionReplicaList))
    if (execute) {
      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient,
        topic, partitionReplicaList, config, update = true)
      println("New partitions are added")
    } else {
      println("No update is executed in dry run mode")
    }
  }

It seems to me that the new assignment in ZooKeeper data does not propagate
to some of the new brokers. However, looking at TopicCommand, it uses the
same AdminUtils function to add new partitions.

Am I missing anything, or is this a bug in the broker?

Thanks,
Allen


Re: [DISCUSSION] KIP-15 close(timeout) for producer

2015-03-17 Thread Jiangjie Qin
Hi Jun,

Yes, as Guozhang said, the main reason we set a flag is because close(0)
is expected to be called by the sender thread itself.
If we want to maintain the semantic meaning of close(), one alternative is
to have an abort() method that does the same thing as close(0) except cleanup.
And in close(timeout), after the timeout we call abort() and join the sender
thread. This was one of the previous proposals. We merged abort into close(0)
because they are almost doing the same thing. But from what you mentioned,
it might make sense to have two separate methods.
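
To make the flag approach concrete, a minimal sketch (illustrative names only,
not the actual producer code):

    object ForceCloseSketch {
      @volatile private var running = true
      @volatile private var forceClose = false

      // What close(0)/abort() would do: just flip flags, never join the sender
      // thread, so it is safe to call from a callback running on that thread.
      def initiateForceClose(): Unit = {
        forceClose = true
        running = false
      }

      private def failIncompleteBatches(): Unit = {
        // fail every batch still sitting in the accumulator
      }

      // Simplified sender loop: finish the current batch of responses, notice
      // the flag at the loop barrier, fail what is left, and exit.
      def senderLoop(): Unit = {
        while (running) {
          // ... poll the network client, fire callbacks for completed responses ...
          if (forceClose) failIncompleteBatches()
        }
      }
    }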

Thanks.

Jiangjie (Becket) Qin

On 3/16/15, 10:31 PM, "Guozhang Wang"  wrote:

>Yeah in this sense the sender thread will not exit immediately in the
>close(0) call, but will only terminate after the current response batch
>has been processed, as will the producer instance itself.
>
>There is a reason for this though: for a clean shutdown the caller thread
>has to wait for the sender thread to join before closing the producer
>instance, but this cannot be achieve if close(0) is called by the sender
>thread itself (for example in KAFKA-1659, there is a proposal from Andrew
>Stein on using thread.interrupt and thread.stop, but if it is called by
>the
>ioThread itself the stop call will fail). Hence we came up with the flag
>approach to let the sender thread close as soon as it is at the barrier
>of the run loop.
>
>Guozhang
>
>On Mon, Mar 16, 2015 at 9:41 PM, Jun Rao  wrote:
>
>> Hmm, does that mean that after close(0), the sender thread is not
>> necessarily gone? Normally, after closing an entity, we expect all internal
>> threads associated with the entity to be shut down completely.
>>
>> Thanks,
>>
>> Jun
>>
>> On Mon, Mar 16, 2015 at 3:18 PM, Jiangjie Qin
>>
>> wrote:
>>
>> > Hi Jun,
>> >
>> > Close(0) will set two flags in sender. Running=false and a newly added
>> > forceClose=true. It will also set accumulator.closed=true so no
>>further
>> > producer.send() will succeed.
>> > The sender thread will finish executing all the callbacks in current
>> batch
>> > of responses, then it will see the forceClose flag. It will just fail
>>all
>> > the incomplete batches in the producer and exit.
>> > So close(0) is a non-blocking call and sender thread will not try to
>>join
>> > itself in close(0).
>> >
>> > Thanks.
>> >
>> > Jiangjie (Becket) Qin
>> >
>> > On 3/16/15, 2:50 PM, "Jun Rao"  wrote:
>> >
>> > >How does close(0) work if it's called from the sender thread? If
>> close(0)
>> > >needs to wait for the sender thread to join, wouldn't this cause a
>> > >deadlock?
>> > >
>> > >Thanks,
>> > >
>> > >Jun
>> > >
>> > >On Mon, Mar 16, 2015 at 2:26 PM, Jiangjie Qin
>>> >
>> > >wrote:
>> > >
>> > >> Thanks Guozhang. It wouldn’t be as thoroughly considered without
>> > >> discussing with you :)
>> > >>
>> > >> Jiangjie (Becket) Qin
>> > >>
>> > >> On 3/16/15, 1:07 PM, "Guozhang Wang"  wrote:
>> > >>
>> > >> >Thanks Jiangjie,
>> > >> >
>> > >> >After talking to you offline on this, I have been convinced and
>> > >>changed my
>> > >> >preference to blocking. The immediate shutdown approach does have
>> some
>> > >> >unsafeness in some cases.
>> > >> >
>> > >> >Guozhang
>> > >> >
>> > >> >On Mon, Mar 16, 2015 at 11:50 AM, Jiangjie Qin
>> > >>> > >> >
>> > >> >wrote:
>> > >> >
>> > >> >> It looks that the problem we want to solve and the purpose we
>>want
>> to
>> > >> >> achieve is:
>> > >> >> If user uses close() in callback, we want to let user be aware
>>that
>> > >>they
>> > >> >> should use close(0) instead of close() in the callback.
>> > >> >>
>> > >> >> We have agreed that we will have an error log to inform user
>>about
>> > >>this
>> > >> >> mis-usage. The options differ in the way how we can force user
>>to
>> > >>take a
>> > >> >> look at that error log.
>> > >> >> There are two scenarios:
>> > >> >> 1. User does not expect the program to exit.
>> > >> >> 2. User expect the program to exit.
>> > >> >>
>> > >> >> For scenario 1), blocking will probably delay the discovery of
>>the
>> > >> >> problem. Calling close(0) exposes the problem quicker. In this
>> > >>scenario
>> > >> >> producer just encounter a send failure when running normally.
>> > >> >> For scenario 2), blocking will expose the problem quick. Calling
>> > >> >>close(-1)
>> > >> >> might hide the problem. This scenario might include: a) Unit
>>test
>> > >>for a
>> > >> >> send failure. b) Message sending during a close() call from a
>>user
>> > >> >>thread.
>> > >> >>
>> > >> >> So as a summary table:
>> > >> >>
>> > >> >>               Scenario 1)                   Scenario 2)
>> > >> >>
>> > >> >> Blocking      Delay problem discovery       Guaranteed problem discovery
>> > >> >>
>> > >> >> Close(-1)     Immediate problem discovery   Problem might be hidden
>> > >> >>
>> > >> >>
>> > >> >> Personally I prefer blocking because it seems providing more
>> > >>guarantees
>> > >> >> and safer.
>> > >> >>
>> > >> >> Thanks.
>> > >> >>
>> > >> >> Jiangjie (Becket) Qin

Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-17 Thread Steven Wu
Jay,

let's say an app produces to 10 different topics, and one of the topics is
produced from a library. due to some condition or bug, this library starts
sending messages over the quota. if we go with the delayed-response approach,
the whole shared RecordAccumulator buffer will fill up, and that will
penalize the other 9 topics that are within the quota. that is the
unfairness point that Ewen and I were trying to make.

if the broker just drops the message and returns an error/status code that
indicates the drop and why, the producer can move on and accept the drop. the
shared buffer won't be saturated and the other 9 topics won't be penalized.
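
A toy sketch of the starvation I mean (not Kafka code; the Semaphore simply
stands in for the bounded RecordAccumulator memory):

    import java.util.concurrent.Semaphore

    object SharedBufferSketch {
      // The whole producer shares roughly 10 "batches" worth of memory.
      val pool = new Semaphore(10)

      def send(topic: String): Unit = {
        // Once the pool is exhausted this blocks (or the send has to be dropped).
        pool.acquire()
        println(s"buffered a batch for $topic, free = ${pool.availablePermits()}")
      }

      // Called when the broker responds; a held/throttled response delays this.
      def onResponse(): Unit = pool.release()

      def main(args: Array[String]): Unit = {
        // Responses for this one topic are being held, so its memory is never freed.
        (1 to 10).foreach(_ => send("throttled-topic"))
        println(s"permits left for the other 9 topics: ${pool.availablePermits()}") // 0
      }
    }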

Thanks,
Steven



On Tue, Mar 17, 2015 at 9:44 AM, Jay Kreps  wrote:

> Hey Steven,
>
> It is true that hitting the quota will cause back-pressure on the producer.
> But the solution is simple, a producer that wants to avoid this should stay
> under its quota. In other words this is a contract between the cluster and
> the client, with each side having something to uphold. Quite possibly the
> same thing will happen in the absence of a quota, a client that produces an
> unexpected amount of load will hit the limits of the server and experience
> backpressure. Quotas just allow you to set that same limit at something
> lower than 100% of all resources on the server, which is useful for a
> shared cluster.
>
> -Jay
>
> On Mon, Mar 16, 2015 at 11:34 PM, Steven Wu  wrote:
>
> > wait. we create one kafka producer for each cluster. each cluster can
> have
> > many topics. if producer buffer got filled up due to delayed response for
> > one throttled topic, won't that penalize other topics unfairly? it seems
> to
> > me that broker should just return error without delay.
> >
> > sorry that I am chatting to myself :)
> >
> > On Mon, Mar 16, 2015 at 11:29 PM, Steven Wu 
> wrote:
> >
> > > I think I can answer my own question. delayed response will cause the
> > > producer buffer to be full, which then result in either thread blocking
> > or
> > > message drop.
> > >
> > > On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu 
> > wrote:
> > >
> > >> please correct me if I am missing sth here. I am not understanding how
> > >> would throttle work without cooperation/back-off from producer. new
> Java
> > >> producer supports non-blocking API. why would delayed response be able
> > to
> > >> slow down producer? producer will continue to fire async sends.
> > >>
> > >> On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang 
> > >> wrote:
> > >>
> > >>> I think we are really discussing two separate issues here:
> > >>>
> > >>> 1. Whether we should a) append-then-block-then-returnOKButThrottled
> or
> > b)
> > >>> block-then-returnFailDuetoThrottled for quota actions on produce
> > >>> requests.
> > >>>
> > >>> Both these approaches assume some kind of well-behaveness of the
> > clients:
> > >>> option a) assumes the client sets an proper timeout value while can
> > just
> > >>> ignore "OKButThrottled" response, while option b) assumes the client
> > >>> handles the "FailDuetoThrottled" appropriately. For any malicious
> > clients
> > >>> that, for example, just keep retrying either intentionally or not,
> > >>> neither
> > >>> of these approaches are actually effective.
> > >>>
> > >>> 2. For "OKButThrottled" and "FailDuetoThrottled" responses, shall we
> > >>> encode
> > >>> them as error codes or augment the protocol to use a separate field
> > >>> indicating "status codes".
> > >>>
> > >>> Today we have already incorporated some status code as error codes in
> > the
> > >>> responses, e.g. ReplicaNotAvailable in MetadataResponse, the pros of
> > this
> > >>> is of course using a single field for response status like the HTTP
> > >>> status
> > >>> codes, while the cons is that it requires clients to handle the error
> > >>> codes
> > >>> carefully.
> > >>>
> > >>> I think maybe we can actually extend the single-code approach to
> > overcome
> > >>> its drawbacks, that is, wrap the error codes semantics to the users
> so
> > >>> that
> > >>> users do not need to handle the codes one-by-one. More concretely,
> > >>> following Jay's example the client could write sth. like this:
> > >>>
> > >>>
> > >>> -
> > >>>
> > >>>   if(error.isOK())
> > >>>  // status code is good or the code can be simply ignored for
> this
> > >>> request type, process the request
> > >>>   else if(error.needsRetry())
> > >>>  // throttled, transient error, etc: retry
> > >>>   else if(error.isFatal())
> > >>>  // non-retriable errors, etc: notify / terminate / other
> handling
> > >>>
> > >>> -
> > >>>
> > >>> Only when the clients really want to handle, for example
> > >>> FailDuetoThrottled
> > >>> status code specifically, it needs to:
> > >>>
> > >>>   if(error.isOK())
> > >>>  // status code is good or the code can be simply ignored for
> this
> > >>> request type, process the request
> > >>>   else if(error == FailDuetoThrottled )
> > >>>  // throttled: log it
> > >>>   else if(er

Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-17 Thread Jay Kreps
Hey Guozhang,

Cool, I think we are mostly on the same page. I agree with what you are
saying: the error code thing is basically a matter of taste. It would
be possible to put all kinds of things as error codes and it would be
possible to devise a scheme for clients to deal with it--for example Gwen's
reserved error ranges. As a matter of taste I really do think a protocol
with a clear definition of error as "could not do what you asked, here is
the reason" is preferable to this. Basically what I am advocating is (1)
reversing the existing case where we used an error code to encode side
information, and (2) not doing that any more and instead using dedicated
fields in the response, and (3) adding a clear definition of error to the
protocol definition that formalizes this for client developers.

I think I am particularly sensitive on this point because I spent a lot of
time on clients (as you did) and error handling was really like 50% of the
effort. I also just think that having a clear, well-designed, tasteful
protocol is an asset in its own right.

-Jay

On Tue, Mar 17, 2015 at 8:25 AM, Guozhang Wang  wrote:

> Ewen,
>
> 1. I think we are on the same page as per "malicious clients", that it
> should not be the target of either approach. I was just trying to separate
> the discussion from "what if user just keep retrying" and maybe I was not
> clear.
>
> 2. I was not advocating option A on the wiki, in my previous email I
> actually assume that option is already dropped and we are only considering
> option B (which is my option b) in the email) and C (option a) in my
> email), and I think with some proper wrapping of "status codes" (today we
> still call them error codes) option B in the wiki may not necessarily
> require people who implement clients to handle each status code one-by-one.
>
> Guozhang
>
> On Tue, Mar 17, 2015 at 12:22 AM, Ewen Cheslack-Postava  >
> wrote:
>
> > Steven - that's a reasonable concern. I think I've mentioned the same
> sort
> > of issue in the issues about the new producer's RecordAccumulator not
> > timing out sends, e.g. in
> https://issues.apache.org/jira/browse/KAFKA-1788
> > .
> > The shared buffer causes problems if one broker isn't available for
> awhile
> > since messages to that broker end up consuming the entire buffer. You can
> > end up with a similar problem here due to the effective rate limiting
> > caused by delaying responses.
> >
> > Guozhang - I think only option A from the KIP is actually an error. If we
> > want to look to HTTP for examples, there's an RFC that defines the Too
> Many
> > Requests response to handle rate limiting:
> > http://tools.ietf.org/html/rfc6585#page-3 In this case, it actually is
> an
> > error, specifically a client error since its in the 400 range.The
> > implication from the status code (429), name of the response, and the
> > example given is that is is an error and no real data is returned, which
> > would correspond to option A from the KIP. Note that the protocol
> provides
> > a mechanism for giving extra (optional) information about when you should
> > retry (via headers). I'd guess that even despite that, most systems that
> > encounter a 429 use some ad hoc backoff mechanism because they only try
> to
> > detect anything in the 400 range...
> >
> > One additional point -- I think "malicious clients" shouldn't be our
> target
> > here, they can do a lot worse than what's been addressed in this thread.
> > But I do agree that any proposal should have a clear explanation of how
> > existing clients that are ignorant of quotas would behave (which is why
> > options b and c make a lot of sense -- they rate limit without requiring
> an
> > update to normally-behaving clients).
> >
> >
> > On Mon, Mar 16, 2015 at 11:34 PM, Steven Wu 
> wrote:
> >
> > > wait. we create one kafka producer for each cluster. each cluster can
> > have
> > > many topics. if producer buffer got filled up due to delayed response
> for
> > > one throttled topic, won't that penalize other topics unfairly? it
> seems
> > to
> > > me that broker should just return error without delay.
> > >
> > > sorry that I am chatting to myself :)
> > >
> > > On Mon, Mar 16, 2015 at 11:29 PM, Steven Wu 
> > wrote:
> > >
> > > > I think I can answer my own question. delayed response will cause the
> > > > producer buffer to be full, which then result in either thread
> blocking
> > > or
> > > > message drop.
> > > >
> > > > On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu 
> > > wrote:
> > > >
> > > >> please correct me if I am missing sth here. I am not understanding
> how
> > > >> would throttle work without cooperation/back-off from producer. new
> > Java
> > > >> producer supports non-blocking API. why would delayed response be
> able
> > > to
> > > >> slow down producer? producer will continue to fire async sends.
> > > >>
> > > >> On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang  >
> > > >> wrote:
> > > >>
> > > >>> I think we are really discussing two separate i

Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-17 Thread Jay Kreps
Hey Steven,

It is true that hitting the quota will cause back-pressure on the producer.
But the solution is simple, a producer that wants to avoid this should stay
under its quota. In other words this is a contract between the cluster and
the client, with each side having something to uphold. Quite possibly the
same thing will happen in the absence of a quota, a client that produces an
unexpected amount of load will hit the limits of the server and experience
backpressure. Quotas just allow you to set that same limit at something
lower than 100% of all resources on the server, which is useful for a
shared cluster.

-Jay

On Mon, Mar 16, 2015 at 11:34 PM, Steven Wu  wrote:

> wait. we create one kafka producer for each cluster. each cluster can have
> many topics. if producer buffer got filled up due to delayed response for
> one throttled topic, won't that penalize other topics unfairly? it seems to
> me that broker should just return error without delay.
>
> sorry that I am chatting to myself :)
>
> On Mon, Mar 16, 2015 at 11:29 PM, Steven Wu  wrote:
>
> > I think I can answer my own question. delayed response will cause the
> > producer buffer to be full, which then result in either thread blocking
> or
> > message drop.
> >
> > On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu 
> wrote:
> >
> >> please correct me if I am missing sth here. I am not understanding how
> >> would throttle work without cooperation/back-off from producer. new Java
> >> producer supports non-blocking API. why would delayed response be able
> to
> >> slow down producer? producer will continue to fire async sends.
> >>
> >> On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang 
> >> wrote:
> >>
> >>> I think we are really discussing two separate issues here:
> >>>
> >>> 1. Whether we should a) append-then-block-then-returnOKButThrottled or
> b)
> >>> block-then-returnFailDuetoThrottled for quota actions on produce
> >>> requests.
> >>>
> >>> Both these approaches assume some kind of well-behaveness of the
> clients:
> >>> option a) assumes the client sets an proper timeout value while can
> just
> >>> ignore "OKButThrottled" response, while option b) assumes the client
> >>> handles the "FailDuetoThrottled" appropriately. For any malicious
> clients
> >>> that, for example, just keep retrying either intentionally or not,
> >>> neither
> >>> of these approaches are actually effective.
> >>>
> >>> 2. For "OKButThrottled" and "FailDuetoThrottled" responses, shall we
> >>> encode
> >>> them as error codes or augment the protocol to use a separate field
> >>> indicating "status codes".
> >>>
> >>> Today we have already incorporated some status code as error codes in
> the
> >>> responses, e.g. ReplicaNotAvailable in MetadataResponse, the pros of
> this
> >>> is of course using a single field for response status like the HTTP
> >>> status
> >>> codes, while the cons is that it requires clients to handle the error
> >>> codes
> >>> carefully.
> >>>
> >>> I think maybe we can actually extend the single-code approach to
> overcome
> >>> its drawbacks, that is, wrap the error codes semantics to the users so
> >>> that
> >>> users do not need to handle the codes one-by-one. More concretely,
> >>> following Jay's example the client could write sth. like this:
> >>>
> >>>
> >>> -
> >>>
> >>>   if(error.isOK())
> >>>  // status code is good or the code can be simply ignored for this
> >>> request type, process the request
> >>>   else if(error.needsRetry())
> >>>  // throttled, transient error, etc: retry
> >>>   else if(error.isFatal())
> >>>  // non-retriable errors, etc: notify / terminate / other handling
> >>>
> >>> -
> >>>
> >>> Only when the clients really want to handle, for example
> >>> FailDuetoThrottled
> >>> status code specifically, it needs to:
> >>>
> >>>   if(error.isOK())
> >>>  // status code is good or the code can be simply ignored for this
> >>> request type, process the request
> >>>   else if(error == FailDuetoThrottled )
> >>>  // throttled: log it
> >>>   else if(error.needsRetry())
> >>>  // transient error, etc: retry
> >>>   else if(error.isFatal())
> >>>  // non-retriable errors, etc: notify / terminate / other handling
> >>>
> >>> -
> >>>
> >>> And for implementation we can probably group the codes accordingly like
> >>> HTTP status code such that we can do:
> >>>
> >>> boolean Error.isOK() {
> >>>   return code < 300 && code >= 200;
> >>> }
> >>>
> >>> Guozhang
> >>>
> >>> On Mon, Mar 16, 2015 at 10:24 PM, Ewen Cheslack-Postava <
> >>> e...@confluent.io>
> >>> wrote:
> >>>
> >>> > Agreed that trying to shoehorn non-error codes into the error field
> is
> >>> a
> >>> > bad idea. It makes it *way* too easy to write code that looks (and
> >>> should
> >>> > be) correct but is actually incorrect. If necessary, I think it's
> much
> >>> > better to to spend a couple of extra bytes to encode that information
> >>> > separately (a "status" or "warning" s

Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-17 Thread Joel Koshy
Yes I think there are two policy issues but I don't think they are
separate/mutually exclusive for the purposes of this discussion - the
first being what should the broker do if quota is violated and second,
what should it return to the client (error code or status code). (The
separate discussion unrelated to quotas is in general do we want to
include status code with errors - e.g., ReplicaNotAvailable in topic
metadata response - and I think everyone agrees that we should not.)

When we first started this thread I was leaning more toward error code
and immediate response (so the client can react accordingly), but
after this discussion I'm no longer convinced about that. (The error
code is appropriate I think in this case since the request is actually
dropped due to a quota violation.) An "issue" with this is that the
broker cannot effectively protect itself against "simple" clients that
don't back off properly. I actually think this may not be a huge issue
though because regardless of quotas there needs to be lower-level
protection against DoS - i.e., this applies even for the hold and
respond approach. Something other than a Kafka client can flood a
network. If the policy is to reject the request and respond
immediately (or wait up to current request timeout) on quota violation
then an error code is appropriate (since the append was rejected).

With the second approach, (for producer request do an append, hold
request and then respond), an error code does not really make sense.
The main concern here is request timeout. I agree with Jay that if we
improve the semantics of timeout (possibly adding a separate request
timeout) then this approach would be less controversial. i.e., for
producer requests there should be two timeouts - replication timeout
and request timeout, the latter being very large. One nuance to this
is I think it should be a broker-side setting (not client-side) that
needs to be communicated to the client somehow since the client needs
to know in advance a ceiling on how long it can expect to wait for a
response. So if the request succeeds immediately or fails due to a
usual error (e.g., slow replica and therefore replication timeout) the
client will get a response within the replication timeout. Otherwise,
it may block until the full request timeout if quota is violated.

Both approaches ideally need some negotiation - in the first approach,
the client should ideally be told its current quota, from which it can
estimate how long it should back off. In the second approach,
the client needs to know how long a request may be held and the broker
enforce backoff up to this limit on quota violations. The latter seems
simpler for client implementation.
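
A minimal sketch of the client-side back-off estimate in the first approach
(purely illustrative; how the quota would be advertised to the client is an
open question in this thread, so the parameters here are assumptions):

    object QuotaBackoff {
      // Assumes the broker has told the client its quota in bytes/sec and the
      // client tracks how many bytes it sent in the last window.
      def backoffMs(bytesSentInWindow: Long, windowMs: Long, quotaBytesPerSec: Long): Long = {
        val allowedInWindow = quotaBytesPerSec * windowMs / 1000
        val excess = bytesSentInWindow - allowedInWindow
        if (excess <= 0) 0L
        else excess * 1000 / quotaBytesPerSec // time needed to fall back under the quota
      }
    }

For example, backoffMs(15000, 1000, 10000) == 500, i.e. wait half a second
before retrying.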

Thanks,

Joel

On Mon, Mar 16, 2015 at 10:58:02PM -0700, Guozhang Wang wrote:
> I think we are really discussing two separate issues here:
> 
> 1. Whether we should a) append-then-block-then-returnOKButThrottled or b)
> block-then-returnFailDuetoThrottled for quota actions on produce requests.
> 
> Both these approaches assume some kind of well-behaveness of the clients:
> option a) assumes the client sets an proper timeout value while can just
> ignore "OKButThrottled" response, while option b) assumes the client
> handles the "FailDuetoThrottled" appropriately. For any malicious clients
> that, for example, just keep retrying either intentionally or not, neither
> of these approaches are actually effective.
> 
> 2. For "OKButThrottled" and "FailDuetoThrottled" responses, shall we encode
> them as error codes or augment the protocol to use a separate field
> indicating "status codes".
> 
> Today we have already incorporated some status code as error codes in the
> responses, e.g. ReplicaNotAvailable in MetadataResponse, the pros of this
> is of course using a single field for response status like the HTTP status
> codes, while the cons is that it requires clients to handle the error codes
> carefully.
> 
> I think maybe we can actually extend the single-code approach to overcome
> its drawbacks, that is, wrap the error codes semantics to the users so that
> users do not need to handle the codes one-by-one. More concretely,
> following Jay's example the client could write sth. like this:
> 
> 
> -
> 
>   if(error.isOK())
>  // status code is good or the code can be simply ignored for this
> request type, process the request
>   else if(error.needsRetry())
>  // throttled, transient error, etc: retry
>   else if(error.isFatal())
>  // non-retriable errors, etc: notify / terminate / other handling
> 
> -
> 
> Only when the clients really want to handle, for example FailDuetoThrottled
> status code specifically, it needs to:
> 
>   if(error.isOK())
>  // status code is good or the code can be simply ignored for this
> request type, process the request
>   else if(error == FailDuetoThrottled )
>  // throttled: log it
>   else if(error.needsRetry())
>  // transient error, etc: retry
>   e

Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations

2015-03-17 Thread Andrii Biletskyi
Joel,

You are right, I removed ClusterMetadata because we already have part of
what we need in TopicMetadata. Also, as Jay pointed out earlier, we would
like to have an "orthogonal" API, but at the same time we need to stay
backward compatible.

But I like your idea, and I even have some other arguments for this option:
there is also the DescribeTopicRequest proposed in this KIP, which returns
topic configs, partitions and replication factor plus partition ISR, ASR and
leader replica. The latter part is already there in TopicMetadataRequest.
So again we would have to add stuff to TMR so as not to duplicate that
information in the newly added requests. However, this way we will end up
with a "monster" request which returns cluster metadata, topic replication
and config info plus partition replication data. It seems logical to split
TMR into:
- ClusterMetadata (brokers + controller, maybe something else)
- TopicMetadata (topic info + partition details)
But since the current TMR is involved in lots of places (including the
network client, as I understand) this might be a very serious change, and it
probably makes sense to stick with the current approach.
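
Just to make the split above concrete, a purely illustrative sketch (not a
proposed wire format; field names and grouping are assumptions for
discussion only):

    case class Broker(id: Int, host: String, port: Int)
    case class ClusterMetadata(brokers: Seq[Broker], controllerId: Int)

    case class PartitionMetadata(partition: Int, leader: Int, replicas: Seq[Int], isr: Seq[Int])
    case class TopicMetadata(topic: String, config: Map[String, String],
                             partitions: Seq[PartitionMetadata])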

Thanks,
Andrii Biletskyi


On Tue, Mar 17, 2015 at 5:29 PM, Joel Koshy  wrote:

> I may be missing some context but hopefully this will also be covered
> today: I thought the earlier proposal where there was an explicit
> ClusterMetadata request was clearer and explicit. During the course of
> this thread I think the conclusion was that the main need was for
> controller information and that can be rolled into the topic metadata
> response but that seems a bit irrelevant to topic metadata. FWIW I
> think the full broker-list is also irrelevant to topic metadata, but
> it is already there and in use. I think there is still room for an
> explicit ClusterMetadata request since there may be other
> cluster-level information that we may want to add over time (and that
> have nothing to do with topic metadata).
>
> On Tue, Mar 17, 2015 at 02:45:30PM +0200, Andrii Biletskyi wrote:
> > Jun,
> >
> > 101. Okay, if you say that such use case is important. I also think
> > using clientId for these purposes is fine - if we already have this field
> > as part of all Wire protocol messages, why not use that.
> > I will update KIP-4 page if nobody has other ideas (which may come up
> > during the call today).
> >
> > 102.1 Agree, I'll update the KIP accordingly. I think we can add new,
> > fine-grained error codes if some error code received in specific case
> > won't give enough context to return a descriptive error message for user.
> >
> > Look forward to discussing all outstanding issues in detail today during
> > the call.
> >
> > Thanks,
> > Andrii Biletskyi
> >
> >
> >
> > On Mon, Mar 16, 2015 at 10:59 PM, Jun Rao  wrote:
> >
> > > 101. There may be a use case where you only want the topics to be
> created
> > > manually by admins. Currently, you can do that by disabling auto topic
> > > creation and issue topic creation from the TopicCommand. If we disable
> auto
> > > topic creation completely on the broker and don't have a way to
> distinguish
> > > between topic creation requests from the regular clients and the
> admin, we
> > > can't support manual topic creation any more. I was thinking that
> another
> > > way of distinguishing the clients making the topic creation requests is
> > > using clientId. For example, the admin tool can set it to something
> like
> > > admin and the broker can treat that clientId specially.
> > >
> > > Also, there is a related discussion in KAFKA-2020. Currently, we do the
> > > following in TopicMetadataResponse:
> > >
> > > 1. If leader is not available, we set the partition level error code to
> > > LeaderNotAvailable.
> > > 2. If a non-leader replica is not available, we take that replica out
> of
> > > the assigned replica list and isr in the response. As an indication for
> > > doing that, we set the partition level error code to
> ReplicaNotAvailable.
> > >
> > > This has a few problems. First, ReplicaNotAvailable probably shouldn't
> be
> > > an error, at least for the normal producer/consumer clients that just
> want
> > > to find out the leader. Second, it can happen that both the leader and
> > > another replica are not available at the same time. There is no error
> code
> > > to indicate both. Third, even if a replica is not available, it's still
> > > useful to return its replica id since some clients (e.g. admin tool)
> may
> > > still make use of it.
> > >
> > > One way to address this issue is to always return the replica id for
> > > leader, assigned replicas, and isr regardless of whether the
> corresponding
> > > broker is live or not. Since we also return the list of live brokers,
> the
> > > client can figure out whether a leader or a replica is live or not and
> act
> > > accordingly. This way, we don't need to set the partition level error
> code
> > > when the leader or a replica is not available. This doesn't change the
> wire
> > > protocol, but does change the se

Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-17 Thread Steven Wu
Ewen, I see your point regarding the shared buffer. Yes, a bad/slow broker
could potentially consume all of the buffer. On the other hand, I do like the
batching behavior of the shared RecordAccumulator buffer.

On Tue, Mar 17, 2015 at 8:25 AM, Guozhang Wang  wrote:

> Ewen,
>
> 1. I think we are on the same page as per "malicious clients", that it
> should not be the target of either approach. I was just trying to separate
> the discussion from "what if user just keep retrying" and maybe I was not
> clear.
>
> 2. I was not advocating option A on the wiki, in my previous email I
> actually assume that option is already dropped and we are only considering
> option B (which is my option b) in the email) and C (option a) in my
> email), and I think with some proper wrapping of "status codes" (today we
> still call them error codes) option B in the wiki may not necessarily
> require people who implement clients to handle each status code one-by-one.
>
> Guozhang
>
> On Tue, Mar 17, 2015 at 12:22 AM, Ewen Cheslack-Postava  >
> wrote:
>
> > Steven - that's a reasonable concern. I think I've mentioned the same
> sort
> > of issue in the issues about the new producer's RecordAccumulator not
> > timing out sends, e.g. in
> https://issues.apache.org/jira/browse/KAFKA-1788
> > .
> > The shared buffer causes problems if one broker isn't available for
> awhile
> > since messages to that broker end up consuming the entire buffer. You can
> > end up with a similar problem here due to the effective rate limiting
> > caused by delaying responses.
> >
> > Guozhang - I think only option A from the KIP is actually an error. If we
> > want to look to HTTP for examples, there's an RFC that defines the Too
> Many
> > Requests response to handle rate limiting:
> > http://tools.ietf.org/html/rfc6585#page-3 In this case, it actually is
> an
> > error, specifically a client error since its in the 400 range.The
> > implication from the status code (429), name of the response, and the
> > example given is that is is an error and no real data is returned, which
> > would correspond to option A from the KIP. Note that the protocol
> provides
> > a mechanism for giving extra (optional) information about when you should
> > retry (via headers). I'd guess that even despite that, most systems that
> > encounter a 429 use some ad hoc backoff mechanism because they only try
> to
> > detect anything in the 400 range...
> >
> > One additional point -- I think "malicious clients" shouldn't be our
> target
> > here, they can do a lot worse than what's been addressed in this thread.
> > But I do agree that any proposal should have a clear explanation of how
> > existing clients that are ignorant of quotas would behave (which is why
> > options b and c make a lot of sense -- they rate limit without requiring
> an
> > update to normally-behaving clients).
> >
> >
> > On Mon, Mar 16, 2015 at 11:34 PM, Steven Wu 
> wrote:
> >
> > > wait. we create one kafka producer for each cluster. each cluster can
> > have
> > > many topics. if producer buffer got filled up due to delayed response
> for
> > > one throttled topic, won't that penalize other topics unfairly? it
> seems
> > to
> > > me that broker should just return error without delay.
> > >
> > > sorry that I am chatting to myself :)
> > >
> > > On Mon, Mar 16, 2015 at 11:29 PM, Steven Wu 
> > wrote:
> > >
> > > > I think I can answer my own question. delayed response will cause the
> > > > producer buffer to be full, which then result in either thread
> blocking
> > > or
> > > > message drop.
> > > >
> > > > On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu 
> > > wrote:
> > > >
> > > >> please correct me if I am missing sth here. I am not understanding
> how
> > > >> would throttle work without cooperation/back-off from producer. new
> > Java
> > > >> producer supports non-blocking API. why would delayed response be
> able
> > > to
> > > >> slow down producer? producer will continue to fire async sends.
> > > >>
> > > >> On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang  >
> > > >> wrote:
> > > >>
> > > >>> I think we are really discussing two separate issues here:
> > > >>>
> > > >>> 1. Whether we should a) append-then-block-then-returnOKButThrottled
> > or
> > > b)
> > > >>> block-then-returnFailDuetoThrottled for quota actions on produce
> > > >>> requests.
> > > >>>
> > > >>> Both these approaches assume some kind of well-behaveness of the
> > > clients:
> > > >>> option a) assumes the client sets an proper timeout value while can
> > > just
> > > >>> ignore "OKButThrottled" response, while option b) assumes the
> client
> > > >>> handles the "FailDuetoThrottled" appropriately. For any malicious
> > > clients
> > > >>> that, for example, just keep retrying either intentionally or not,
> > > >>> neither
> > > >>> of these approaches are actually effective.
> > > >>>
> > > >>> 2. For "OKButThrottled" and "FailDuetoThrottled" responses, shall
> we
> > > >>> encode
> > > >>> them as error codes or augment the prot

Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations

2015-03-17 Thread Joel Koshy
I may be missing some context but hopefully this will also be covered
today: I thought the earlier proposal where there was an explicit
ClusterMetadata request was clearer and explicit. During the course of
this thread I think the conclusion was that the main need was for
controller information and that can be rolled into the topic metadata
response but that seems a bit irrelevant to topic metadata. FWIW I
think the full broker-list is also irrelevant to topic metadata, but
it is already there and in use. I think there is still room for an
explicit ClusterMetadata request since there may be other
cluster-level information that we may want to add over time (and that
have nothing to do with topic metadata).

On Tue, Mar 17, 2015 at 02:45:30PM +0200, Andrii Biletskyi wrote:
> Jun,
> 
> 101. Okay, if you say that such use case is important. I also think
> using clientId for these purposes is fine - if we already have this field
> as part of all Wire protocol messages, why not use that.
> I will update KIP-4 page if nobody has other ideas (which may come up
> during the call today).
> 
> 102.1 Agree, I'll update the KIP accordingly. I think we can add new,
> fine-grained error codes if some error code received in specific case
> won't give enough context to return a descriptive error message for user.
> 
> Look forward to discussing all outstanding issues in detail today during
> the call.
> 
> Thanks,
> Andrii Biletskyi
> 
> 
> 
> On Mon, Mar 16, 2015 at 10:59 PM, Jun Rao  wrote:
> 
> > 101. There may be a use case where you only want the topics to be created
> > manually by admins. Currently, you can do that by disabling auto topic
> > creation and issue topic creation from the TopicCommand. If we disable auto
> > topic creation completely on the broker and don't have a way to distinguish
> > between topic creation requests from the regular clients and the admin, we
> > can't support manual topic creation any more. I was thinking that another
> > way of distinguishing the clients making the topic creation requests is
> > using clientId. For example, the admin tool can set it to something like
> > admin and the broker can treat that clientId specially.
> >
> > Also, there is a related discussion in KAFKA-2020. Currently, we do the
> > following in TopicMetadataResponse:
> >
> > 1. If leader is not available, we set the partition level error code to
> > LeaderNotAvailable.
> > 2. If a non-leader replica is not available, we take that replica out of
> > the assigned replica list and isr in the response. As an indication for
> > doing that, we set the partition level error code to ReplicaNotAvailable.
> >
> > This has a few problems. First, ReplicaNotAvailable probably shouldn't be
> > an error, at least for the normal producer/consumer clients that just want
> > to find out the leader. Second, it can happen that both the leader and
> > another replica are not available at the same time. There is no error code
> > to indicate both. Third, even if a replica is not available, it's still
> > useful to return its replica id since some clients (e.g. admin tool) may
> > still make use of it.
> >
> > One way to address this issue is to always return the replica id for
> > leader, assigned replicas, and isr regardless of whether the corresponding
> > broker is live or not. Since we also return the list of live brokers, the
> > client can figure out whether a leader or a replica is live or not and act
> > accordingly. This way, we don't need to set the partition level error code
> > when the leader or a replica is not available. This doesn't change the wire
> > protocol, but does change the semantics. Since we are evolving the protocol
> > of TopicMetadataRequest here, we can potentially piggyback the change.
> >
> > 102.1 For those types of errors due to invalid input, shouldn't we just
> > guard it at parameter validation time and throw InvalidArgumentException
> > without even sending the request to the broker?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Mon, Mar 16, 2015 at 10:37 AM, Andrii Biletskyi <
> > andrii.bilets...@stealth.ly> wrote:
> >
> > > Jun,
> > >
> > > Answering your questions:
> > >
> > > 101. If I understand you correctly, you are saying future producer
> > versions
> > > (which
> > > will be ported to TMR_V1) won't be able to automatically create topic (if
> > > we
> > > unconditionally remove topic creation from there). But we need to this
> > > preserve logic.
> > > Ok, about your proposal: I'm not a big fan too, when it comes to
> > > differentiating
> > > clients directly in protocol schema. And also I'm not sure I understand
> > at
> > > all why
> > > auto.create.topics.enable is a server side configuration. Can we
> > deprecate
> > > this setting
> > > in future versions, add this setting to producer and based on that upon
> > > receiving
> > > UnknownTopic create topic explicitly by a separate producer call via
> > > adminClient?
> > >
> > > 102.1. Hm, yes. It's because we want to support batc

Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-17 Thread Guozhang Wang
Ewen,

1. I think we are on the same page as per "malicious clients", that it
should not be the target of either approach. I was just trying to separate
the discussion from "what if user just keep retrying" and maybe I was not
clear.

2. I was not advocating option A on the wiki, in my previous email I
actually assume that option is already dropped and we are only considering
option B (which is my option b) in the email) and C (option a) in my
email), and I think with some proper wrapping of "status codes" (today we
still call them error codes) option B in the wiki may not necessarily
require people who implement clients to handle each status code one-by-one.
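
A minimal sketch of such wrapping (illustrative only; the range-based
grouping and names are assumptions, not an agreed protocol change):

    object StatusCodes {
      case class StatusCode(code: Int) {
        def isOK: Boolean       = code >= 200 && code < 300
        def needsRetry: Boolean = code >= 300 && code < 400 // e.g. throttled, transient
        def isFatal: Boolean    = code >= 400               // non-retriable
      }

      def handle(status: StatusCode): Unit =
        if (status.isOK) ()            // process the response
        else if (status.needsRetry) () // back off and retry
        else ()                        // notify / terminate / other handling
    }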

Guozhang

On Tue, Mar 17, 2015 at 12:22 AM, Ewen Cheslack-Postava 
wrote:

> Steven - that's a reasonable concern. I think I've mentioned the same sort
> of issue in the issues about the new producer's RecordAccumulator not
> timing out sends, e.g. in https://issues.apache.org/jira/browse/KAFKA-1788
> .
> The shared buffer causes problems if one broker isn't available for awhile
> since messages to that broker end up consuming the entire buffer. You can
> end up with a similar problem here due to the effective rate limiting
> caused by delaying responses.
>
> Guozhang - I think only option A from the KIP is actually an error. If we
> want to look to HTTP for examples, there's an RFC that defines the Too Many
> Requests response to handle rate limiting:
> http://tools.ietf.org/html/rfc6585#page-3 In this case, it actually is an
> error, specifically a client error since its in the 400 range.The
> implication from the status code (429), name of the response, and the
> example given is that is is an error and no real data is returned, which
> would correspond to option A from the KIP. Note that the protocol provides
> a mechanism for giving extra (optional) information about when you should
> retry (via headers). I'd guess that even despite that, most systems that
> encounter a 429 use some ad hoc backoff mechanism because they only try to
> detect anything in the 400 range...
>
> One additional point -- I think "malicious clients" shouldn't be our target
> here, they can do a lot worse than what's been addressed in this thread.
> But I do agree that any proposal should have a clear explanation of how
> existing clients that are ignorant of quotas would behave (which is why
> options b and c make a lot of sense -- they rate limit without requiring an
> update to normally-behaving clients).
>
>
> On Mon, Mar 16, 2015 at 11:34 PM, Steven Wu  wrote:
>
> > wait. we create one kafka producer for each cluster. each cluster can
> have
> > many topics. if producer buffer got filled up due to delayed response for
> > one throttled topic, won't that penalize other topics unfairly? it seems
> to
> > me that broker should just return error without delay.
> >
> > sorry that I am chatting to myself :)
> >
> > On Mon, Mar 16, 2015 at 11:29 PM, Steven Wu 
> wrote:
> >
> > > I think I can answer my own question. delayed response will cause the
> > > producer buffer to be full, which then result in either thread blocking
> > or
> > > message drop.
> > >
> > > On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu 
> > wrote:
> > >
> > >> please correct me if I am missing sth here. I am not understanding how
> > >> would throttle work without cooperation/back-off from producer. new
> Java
> > >> producer supports non-blocking API. why would delayed response be able
> > to
> > >> slow down producer? producer will continue to fire async sends.
> > >>
> > >> On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang 
> > >> wrote:
> > >>
> > >>> I think we are really discussing two separate issues here:
> > >>>
> > >>> 1. Whether we should a) append-then-block-then-returnOKButThrottled
> or
> > b)
> > >>> block-then-returnFailDuetoThrottled for quota actions on produce
> > >>> requests.
> > >>>
> > >>> Both these approaches assume some kind of well-behaveness of the
> > clients:
> > >>> option a) assumes the client sets an proper timeout value while can
> > just
> > >>> ignore "OKButThrottled" response, while option b) assumes the client
> > >>> handles the "FailDuetoThrottled" appropriately. For any malicious
> > clients
> > >>> that, for example, just keep retrying either intentionally or not,
> > >>> neither
> > >>> of these approaches are actually effective.
> > >>>
> > >>> 2. For "OKButThrottled" and "FailDuetoThrottled" responses, shall we
> > >>> encode
> > >>> them as error codes or augment the protocol to use a separate field
> > >>> indicating "status codes".
> > >>>
> > >>> Today we have already incorporated some status code as error codes in
> > the
> > >>> responses, e.g. ReplicaNotAvailable in MetadataResponse, the pros of
> > this
> > >>> is of course using a single field for response status like the HTTP
> > >>> status
> > >>> codes, while the cons is that it requires clients to handle the error
> > >>> codes
> > >>> carefully.
> > >>>
> > >>> I think maybe we can actually extend 

Re: [VOTE] KIP-7 Security - IP Filtering

2015-03-17 Thread Jeff Holoman
bump

On Tue, Mar 3, 2015 at 8:12 PM, Jeff Holoman  wrote:

> Guozhang,
>
> The way the patch is implemented, the check is done in the acceptor thread
> accept() method of the Socket Server, just before connectionQuotas.
>
> Thanks
>
> Jeff
>
> On Tue, Mar 3, 2015 at 7:59 PM, Guozhang Wang  wrote:
>
>> Jeff,
>>
>> I am wondering if the IP filtering rule can be enforced at the socket
>> server level instead of the Kafka API level?
>>
>> Guozhang
>>
>> On Tue, Mar 3, 2015 at 2:24 PM, Jiangjie Qin 
>> wrote:
>>
>> > +1 (non-binding)
>> >
>> > On 3/3/15, 1:17 PM, "Gwen Shapira"  wrote:
>> >
>> > >+1 (non-binding)
>> > >
>> > >On Tue, Mar 3, 2015 at 12:44 PM, Jeff Holoman 
>> > >wrote:
>> > >> Details in the wiki.
>> > >>
>> > >>
>> > >>
>> > >>
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-7+-+Security+-+IP+F
>> > >>iltering
>> > >>
>> > >>
>> > >>
>> > >> --
>> > >> Jeff Holoman
>> > >> Systems Engineer
>> >
>> >
>>
>>
>> --
>> -- Guozhang
>>
>
>
>
> --
> Jeff Holoman
> Systems Engineer
>
>
>
>


-- 
Jeff Holoman
Systems Engineer


Re: [DISCUSS] KIP-4 - Command line and centralized administrative operations

2015-03-17 Thread Andrii Biletskyi
Jun,

101. Okay, if you say that such a use case is important. I also think
using clientId for these purposes is fine - if we already have this field
as part of all wire protocol messages, why not use it.
I will update the KIP-4 page if nobody has other ideas (which may come up
during the call today).

102.1 Agree, I'll update the KIP accordingly. I think we can add new,
fine-grained error codes if some error code received in a specific case
doesn't give enough context to return a descriptive error message for the
user.

Look forward to discussing all outstanding issues in detail today during
the call.

Thanks,
Andrii Biletskyi



On Mon, Mar 16, 2015 at 10:59 PM, Jun Rao  wrote:

> 101. There may be a use case where you only want the topics to be created
> manually by admins. Currently, you can do that by disabling auto topic
> creation and issue topic creation from the TopicCommand. If we disable auto
> topic creation completely on the broker and don't have a way to distinguish
> between topic creation requests from the regular clients and the admin, we
> can't support manual topic creation any more. I was thinking that another
> way of distinguishing the clients making the topic creation requests is
> using clientId. For example, the admin tool can set it to something like
> admin and the broker can treat that clientId specially.
>
> Also, there is a related discussion in KAFKA-2020. Currently, we do the
> following in TopicMetadataResponse:
>
> 1. If leader is not available, we set the partition level error code to
> LeaderNotAvailable.
> 2. If a non-leader replica is not available, we take that replica out of
> the assigned replica list and isr in the response. As an indication for
> doing that, we set the partition level error code to ReplicaNotAvailable.
>
> This has a few problems. First, ReplicaNotAvailable probably shouldn't be
> an error, at least for the normal producer/consumer clients that just want
> to find out the leader. Second, it can happen that both the leader and
> another replica are not available at the same time. There is no error code
> to indicate both. Third, even if a replica is not available, it's still
> useful to return its replica id since some clients (e.g. admin tool) may
> still make use of it.
>
> One way to address this issue is to always return the replica id for
> leader, assigned replicas, and isr regardless of whether the corresponding
> broker is live or not. Since we also return the list of live brokers, the
> client can figure out whether a leader or a replica is live or not and act
> accordingly. This way, we don't need to set the partition level error code
> when the leader or a replica is not available. This doesn't change the wire
> protocol, but does change the semantics. Since we are evolving the protocol
> of TopicMetadataRequest here, we can potentially piggyback the change.
>
> 102.1 For those types of errors due to invalid input, shouldn't we just
> guard it at parameter validation time and throw InvalidArgumentException
> without even sending the request to the broker?
>
> Thanks,
>
> Jun
>
>
> On Mon, Mar 16, 2015 at 10:37 AM, Andrii Biletskyi <
> andrii.bilets...@stealth.ly> wrote:
>
> > Jun,
> >
> > Answering your questions:
> >
> > 101. If I understand you correctly, you are saying future producer
> versions
> > (which
> > will be ported to TMR_V1) won't be able to automatically create topic (if
> > we
> > unconditionally remove topic creation from there). But we need to this
> > preserve logic.
> > Ok, about your proposal: I'm not a big fan too, when it comes to
> > differentiating
> > clients directly in protocol schema. And also I'm not sure I understand
> at
> > all why
> > auto.create.topics.enable is a server side configuration. Can we
> deprecate
> > this setting
> > in future versions, add this setting to producer and based on that upon
> > receiving
> > UnknownTopic create topic explicitly by a separate producer call via
> > adminClient?
> >
> > 102.1. Hm, yes. It's because we want to support batching and at the same
> > time we
> > want to give descriptive error messages for clients. Since AdminClient
> > holds the context
> > to construct such messages (e.g. AdminClient layer can know that
> > InvalidArgumentsCode
> > means two cases: either invalid number - e.g. -1; or replication-factor
> was
> > provided while
> > partitions argument wasn't) - I wrapped responses in Exceptions. But I'm
> > open to any
> > other ideas, this was just initial version.
> > 102.2. Yes, I agree. I'll change that to probably some other dto.
> >
> > Thanks,
> > Andrii Biletskyi
> >
> > On Fri, Mar 13, 2015 at 7:16 PM, Jun Rao  wrote:
> >
> > > Andrii,
> > >
> > > 101. That's what I was thinking too, but it may not be that simple. In
> > > TopicMetadataRequest_V1,
> > > we can let it not trigger auto topic creation. Then, in the producer
> > side,
> > > if it gets an UnknownTopicException, it can explicitly issue a
> > > createTopicRequest for auto topic creation. On the

[jira] [Commented] (KAFKA-2025) In multi-consumer setup - explicit commit, commits on all partitions

2015-03-17 Thread Pradeep G (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14365011#comment-14365011
 ] 

Pradeep G commented on KAFKA-2025:
--

Workaround available at --> 
http://grokbase.com/t/kafka/users/138x3vbfcq/is-it-possible-to-commit-offsets-on-a-per-stream-basis

(changed priority from Blocker to Critical).

-Pradeep

> In multi-consumer setup - explicit commit, commits on all partitions
> 
>
> Key: KAFKA-2025
> URL: https://issues.apache.org/jira/browse/KAFKA-2025
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.2.0
> Environment: 1. Tested in Windows
> 2. Not tested on Linux
>Reporter: Pradeep G
>Assignee: Neha Narkhede
>Priority: Critical
>
> In a setup where there are two consumers C1 & C2 belonging to consumer group 
> CG, two partitions P1 & P2; with auto-commit disabled.
> An explicit commit on ConsumerConnect commits on all the consumers i.e. a 
> commit called by C1 commits all messages being processed by other consumers 
> too here C2. 
> Ideally C1 should be able to commit only those messages it has consumed and 
> not what is being processed by C2.  The effect of this behavior is that; 
> suppose C2 crashes while processing message M after C1 commits, is that 
> message M being processed by C2 is not available on recovery and is lost 
> forever; and in kafka M is marked as consumed.
> I read that this would be addressed in the rewrite - 
> https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite#ClientRewrite-ConsumerAPI
> Any thoughts on which release this would be addressed ?.  A quick response 
> would be greatly appreciated.
> Thanks,
> Pradeep



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2025) In multi-consumer setup - explicit commit, commits on all partitions

2015-03-17 Thread Pradeep G (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pradeep G updated KAFKA-2025:
-
Priority: Critical  (was: Blocker)

> In multi-consumer setup - explicit commit, commits on all partitions
> 
>
> Key: KAFKA-2025
> URL: https://issues.apache.org/jira/browse/KAFKA-2025
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.2.0
> Environment: 1. Tested in Windows
> 2. Not tested on Linux
>Reporter: Pradeep G
>Assignee: Neha Narkhede
>Priority: Critical
>
> In a setup where there are two consumers C1 & C2 belonging to consumer group 
> CG, two partitions P1 & P2; with auto-commit disabled.
> An explicit commit on ConsumerConnect commits on all the consumers i.e. a 
> commit called by C1 commits all messages being processed by other consumers 
> too here C2. 
> Ideally C1 should be able to commit only those messages it has consumed and 
> not what is being processed by C2.  The effect of this behavior is that; 
> suppose C2 crashes while processing message M after C1 commits, is that 
> message M being processed by C2 is not available on recovery and is lost 
> forever; and in kafka M is marked as consumed.
> I read that this would be addressed in the rewrite - 
> https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite#ClientRewrite-ConsumerAPI
> Any thoughts on which release this would be addressed ?.  A quick response 
> would be greatly appreciated.
> Thanks,
> Pradeep



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2025) In multi-consumer setup - explicit commit, commits on all partitions

2015-03-17 Thread Pradeep G (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pradeep G updated KAFKA-2025:
-
Description: 
In a setup where there are two consumers C1 & C2 belonging to consumer group 
CG, two partitions P1 & P2; with auto-commit disabled.

An explicit commit on ConsumerConnect commits on all the consumers i.e. a 
commit called by C1 commits all messages being processed by other consumers too 
here C2. 

Ideally C1 should be able to commit only those messages it has consumed and not 
what is being processed by C2.  The effect of this behavior is that; suppose C2 
crashes while processing message M after C1 commits, is that message M being 
processed by C2 is not available on recovery and is lost forever; and in kafka 
M is marked as consumed.

I read that this would be addressed in the rewrite - 
https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite#ClientRewrite-ConsumerAPI

Any thoughts on which release this would be addressed ?.  A quick response 
would be greatly appreciated.

Thanks,
Pradeep


  was:
In a setup where there are two consumers C1 & C2 belonging to consumer group 
CG, two partitions P1 & P2; with auto-commit disabled.

An explicit commit on ConsumerConnect commits on all the consumers i.e. a 
commit called by C1 commits all messages being processed by other consumers too 
here C2. 

Ideally C1 should be able to commit only those messages it has consumed and not 
what is being processed by C2.  The effect of this behavior is that; suppose C2 
crashes while processing message M after C1 commits, is that message M being 
processed by C2 is not available on recovery and is lost forever; and in kafka 
M is marked as consumed.

I read that this would be addressed in the rewrite - 
https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite#ClientRewrite-ConsumerAPI

Any thoughts on which release this would be addressed ?.  A quick response 
would be greatly appreciated.

Thanks,
Pradeep

This 


> In multi-consumer setup - explicit commit, commits on all partitions
> 
>
> Key: KAFKA-2025
> URL: https://issues.apache.org/jira/browse/KAFKA-2025
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.8.2.0
> Environment: 1. Tested in Windows
> 2. Not tested on Linux
>Reporter: Pradeep G
>Assignee: Neha Narkhede
>Priority: Blocker
>
> In a setup where there are two consumers C1 & C2 belonging to consumer group 
> CG, two partitions P1 & P2; with auto-commit disabled.
> An explicit commit on ConsumerConnect commits on all the consumers i.e. a 
> commit called by C1 commits all messages being processed by other consumers 
> too here C2. 
> Ideally C1 should be able to commit only those messages it has consumed, not 
> what is being processed by C2.  The effect of this behavior is that if C2 
> crashes while processing message M after C1 commits, message M is not 
> reprocessed on recovery and is lost forever, while in Kafka M is marked as 
> consumed.
> I read that this would be addressed in the rewrite - 
> https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite#ClientRewrite-ConsumerAPI
> Any thoughts on which release this would be addressed in?  A quick response 
> would be greatly appreciated.
> Thanks,
> Pradeep



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2025) In multi-consumer setup - explicit commit, commits on all partitions

2015-03-17 Thread Pradeep G (JIRA)
Pradeep G created KAFKA-2025:


 Summary: In multi-consumer setup - explicit commit, commits on all 
partitions
 Key: KAFKA-2025
 URL: https://issues.apache.org/jira/browse/KAFKA-2025
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 0.8.2.0
 Environment: 1. Tested in Windows
2. Not tested on Linux

Reporter: Pradeep G
Assignee: Neha Narkhede
Priority: Blocker


In a setup where there are two consumers C1 & C2 belonging to consumer group 
CG and two partitions P1 & P2, with auto-commit disabled.

An explicit commit on ConsumerConnector commits offsets for all the consumers, 
i.e. a commit called by C1 also commits the messages currently being processed 
by other consumers, here C2.

Ideally C1 should be able to commit only those messages it has consumed, not 
what is being processed by C2.  The effect of this behavior is that if C2 
crashes while processing message M after C1 commits, message M is not 
reprocessed on recovery and is lost forever, while in Kafka M is marked as 
consumed.

I read that this would be addressed in the rewrite - 
https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite#ClientRewrite-ConsumerAPI

Any thoughts on which release this would be addressed in?  A quick response 
would be greatly appreciated.

Thanks,
Pradeep

This 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [KIP-DISCUSSION] KIP-13 Quotas

2015-03-17 Thread Ewen Cheslack-Postava
Steven - that's a reasonable concern. I think I've mentioned the same sort
of issue in the issues about the new producer's RecordAccumulator not
timing out sends, e.g. in https://issues.apache.org/jira/browse/KAFKA-1788.
The shared buffer causes problems if one broker isn't available for a while
since messages to that broker end up consuming the entire buffer. You can
end up with a similar problem here due to the effective rate limiting
caused by delaying responses.
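
A minimal sketch of that concern (broker and topic names are made up; this 
assumes the 0.8.2 Java producer config names and is an illustration, not a 
recommendation):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object SharedBufferSketch extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "broker1:9092,broker2:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
  props.put("buffer.memory", "33554432")      // one 32 MB pool shared by all topics and brokers
  props.put("block.on.buffer.full", "true")   // when the pool is exhausted, send() blocks

  val producer = new KafkaProducer[Array[Byte], Array[Byte]](props)
  // If broker2 is unavailable (or its responses are delayed by throttling), its
  // undelivered batches accumulate in the shared pool; once the pool fills, sends
  // to topics served only by broker1 block as well.
  producer.send(new ProducerRecord[Array[Byte], Array[Byte]]("topic-on-broker1", "payload".getBytes))
  producer.close()
}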

Guozhang - I think only option A from the KIP is actually an error. If we
want to look to HTTP for examples, there's an RFC that defines the Too Many
Requests response to handle rate limiting:
http://tools.ietf.org/html/rfc6585#page-3 In this case, it actually is an
error, specifically a client error since it's in the 400 range. The
implication from the status code (429), the name of the response, and the
example given is that it is an error and no real data is returned, which
would correspond to option A from the KIP. Note that the protocol provides
a mechanism for giving extra (optional) information about when you should
retry (via headers). I'd guess that even despite that, most systems that
encounter a 429 use some ad hoc backoff mechanism because they only try to
detect anything in the 400 range...
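
As a rough sketch of that pattern (hypothetical names, not an actual Kafka
client API): a quota-aware client could treat a "too many requests"-style code
the way HTTP clients usually treat 429, backing off before retrying:

object ThrottleAwareSender {
  // Hypothetical error token standing in for whatever code/status the KIP settles on.
  val Throttled = "THROTTLED"

  def sendWithBackoff[T](maxRetries: Int)(attempt: () => Either[String, T]): Either[String, T] = {
    var backoffMs = 100L
    var retries = 0
    var result = attempt()
    while (result == Left(Throttled) && retries < maxRetries) {
      Thread.sleep(backoffMs)                      // back off, akin to honoring a Retry-After hint
      backoffMs = math.min(backoffMs * 2, 10000L)  // exponential backoff, capped at 10 s
      result = attempt()
      retries += 1
    }
    result
  }
}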

One additional point -- I think "malicious clients" shouldn't be our target
here; they can do a lot worse than what's been addressed in this thread.
But I do agree that any proposal should have a clear explanation of how
existing clients that are ignorant of quotas would behave (which is why
options b and c make a lot of sense -- they rate limit without requiring an
update to normally-behaving clients).
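
A back-of-the-envelope sketch (made-up names, not the KIP-13 implementation) of
how delaying a response turns an over-quota observation into an effective rate
limit:

object ThrottleDelay {
  // How long to hold the response so the client's observed rate falls back to the quota.
  def delayMs(bytesInWindow: Long, windowMs: Long, quotaBytesPerSec: Long): Long = {
    val allowedBytes = quotaBytesPerSec * windowMs / 1000
    if (bytesInWindow <= allowedBytes) 0L
    else (bytesInWindow - allowedBytes) * 1000 / quotaBytesPerSec
  }
}

// Example: 5 MB produced in a 1 s window against a 1 MB/s quota gives a ~4000 ms
// delay, so the effective rate over the stretched window is back to ~1 MB/s:
// ThrottleDelay.delayMs(5L * 1024 * 1024, 1000, 1024 * 1024)  // 4000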


On Mon, Mar 16, 2015 at 11:34 PM, Steven Wu  wrote:

> Wait. We create one Kafka producer for each cluster, and each cluster can have
> many topics. If the producer buffer gets filled up due to delayed responses for
> one throttled topic, won't that penalize other topics unfairly? It seems to
> me that the broker should just return an error without delay.
>
> sorry that I am chatting to myself :)
>
> On Mon, Mar 16, 2015 at 11:29 PM, Steven Wu  wrote:
>
> > I think I can answer my own question. A delayed response will cause the
> > producer buffer to fill up, which then results in either thread blocking
> > or message drops.
> >
> > On Mon, Mar 16, 2015 at 11:24 PM, Steven Wu 
> wrote:
> >
> >> Please correct me if I am missing something here. I am not understanding how
> >> throttling would work without cooperation/back-off from the producer. The new
> >> Java producer supports a non-blocking API. Why would a delayed response be able
> >> to slow down the producer? The producer will continue to fire async sends.
> >>
> >> On Mon, Mar 16, 2015 at 10:58 PM, Guozhang Wang 
> >> wrote:
> >>
> >>> I think we are really discussing two separate issues here:
> >>>
> >>> 1. Whether we should a) append-then-block-then-returnOKButThrottled or b)
> >>> block-then-returnFailDuetoThrottled for quota actions on produce requests.
> >>>
> >>> Both these approaches assume some kind of well-behavedness of the clients:
> >>> option a) assumes the client sets a proper timeout value while it can just
> >>> ignore the "OKButThrottled" response, while option b) assumes the client
> >>> handles "FailDuetoThrottled" appropriately. For any malicious clients
> >>> that, for example, just keep retrying either intentionally or not, neither
> >>> of these approaches is actually effective.
> >>>
> >>> 2. For "OKButThrottled" and "FailDuetoThrottled" responses, shall we encode
> >>> them as error codes or augment the protocol to use a separate field
> >>> indicating "status codes".
> >>>
> >>> Today we have already incorporated some status codes as error codes in the
> >>> responses, e.g. ReplicaNotAvailable in MetadataResponse. The pro of this
> >>> is of course using a single field for response status like the HTTP status
> >>> codes, while the con is that it requires clients to handle the error codes
> >>> carefully.
> >>>
> >>> I think maybe we can actually extend the single-code approach to overcome
> >>> its drawbacks, that is, wrap the error code semantics for the users so that
> >>> users do not need to handle the codes one-by-one. More concretely,
> >>> following Jay's example the client could write something like this:
> >>>
> >>>
> >>> -
> >>>
> >>>   if(error.isOK())
> >>>     // status code is good or can simply be ignored for this request type: process the request
> >>>   else if(error.needsRetry())
> >>>     // throttled, transient error, etc: retry
> >>>   else if(error.isFatal())
> >>>     // non-retriable errors, etc: notify / terminate / other handling
> >>>
> >>> -
> >>>
> >>> Only when a client really wants to handle, for example, the FailDuetoThrottled
> >>> status code specifically does it need to:
> >>>
> >>>   if(error.isOK())
> >>>  // status code is good or the code can be simply ignored for this
> >>>