Re: [DISCUSS] KIP-74: Add Fetch Response Size Limit in Bytes

2016-08-16 Thread Andrey L. Neporada
Hi!

> On 16 Aug 2016, at 20:28, Jun Rao  wrote:
> 
> Hi, Andrey,
> 
> I was thinking of just doing 2 for the new fetch request for backward
> compatibility.
> 
> It seems there are no more comments on this thread. So, we can probably
> start the voting thread once you update the wiki.

OK, makes sense to me.
Will update wiki today.

> 
> Also, it seems that KIP-73 depends on this KIP. Do you think you will be
> actively working on the implementation of this KIP in the next couple of
> weeks so that KIP-73 can proceed?
> 

We really need this change as soon as possible, so I do plan to work actively 
on KIP-74. 

> Thanks,
> 
> Jun

Andrey.



Re: [DISCUSS] KIP-74: Add Fetch Response Size Limit in Bytes

2016-08-16 Thread Jun Rao
Hi, Andrey,

I was thinking of just doing 2 for the new fetch request for backward
compatibility.

It seems there are no more comments on this thread. So, we can probably
start the voting thread once you update the wiki.

Also, it seems that KIP-73 depends on this KIP. Do you think you will be
actively working on the implementation of this KIP in the next couple of
weeks so that KIP-73 can proceed?

Thanks,

Jun

On Tue, Aug 16, 2016 at 9:28 AM, Andrey L. Neporada <
anepor...@yandex-team.ru> wrote:

> Hi, Jun!
>
> > On 16 Aug 2016, at 18:52, Jun Rao  wrote:
> >
> > Hi, Andrey,
> >
> > For 2, we actually can know the next message size. In LogSegment.read(), we
> > first use the offset index to find the file position close to the requested
> > offset and then scan the log forward to find the message whose offset is at
> > or larger than the requested offset. By the time we find such a message, we
> > know exactly the size of the first message that we need to return in the
> > fetch response. No additional read is needed. We can probably just
> > propagate this information back to the caller. I added a comment related to
> > this in https://issues.apache.org/jira/browse/KAFKA-3810.
>
> Good point!
>
> I think it makes sense to apply the strategy you propose in KAFKA-3810 not
> only for the new fetch request but also for the old (“unlimited”) one.
>
> So we return the full message even if the first unread message exceeds the
> partition limit. And we continue doing so until the response limit is reached.
> What do you think?
>
>
> >
> > Thanks,
> >
> > Jun
> >
>
> Andrey.
>
>


Re: [DISCUSS] KIP-74: Add Fetch Response Size Limit in Bytes

2016-08-16 Thread Andrey L. Neporada
Hi, Jun!

> On 16 Aug 2016, at 18:52, Jun Rao  wrote:
> 
> Hi, Andrey,
> 
> For 2, we actually can know the next message size. In LogSegment.read(), we
> first use the offset index to find the file position close to the requested
> offset and then scan the log forward to find the message whose offset is at
> or larger than the requested offset. By the time we find such a message, we
> know exactly the size of the first message that we need to return in the
> fetch response. No additional read is needed. We can probably just
> propagate this information back to the caller. I added a comment related to
> this in https://issues.apache.org/jira/browse/KAFKA-3810.

Good point!

I think it makes sense to apply the strategy you propose in KAFKA-3810 not only
for the new fetch request but also for the old (“unlimited”) one.

So we return the full message even if the first unread message exceeds the
partition limit. And we continue doing so until the response limit is reached.
What do you think?
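The per-partition rule proposed here (always return the first unread message whole, even when it exceeds the partition limit) can be sketched roughly as below. read_partition is a hypothetical helper that works on a list of message sizes rather than on real log data.

```python
from typing import List


def read_partition(message_sizes: List[int], partition_max_bytes: int) -> List[int]:
    """Choose which whole messages (given by size, in offset order) to return for one partition.

    Messages are taken in order while they fit in partition_max_bytes. If the very
    first message alone is larger than the limit, it is still returned in full,
    so the fetcher can always make progress past it.
    """
    picked: List[int] = []
    used = 0
    for size in message_sizes:
        if used + size > partition_max_bytes:
            if not picked:
                picked.append(size)  # oversized first message: return it whole anyway
            break
        picked.append(size)
        used += size
    return picked
```

A response-level loop would call this once per partition and stop once the response limit is reached.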


> 
> Thanks,
> 
> Jun
> 

Andrey.



Re: [DISCUSS] KIP-74: Add Fetch Response Size Limit in Bytes

2016-08-16 Thread Jun Rao
Hi, Andrey,

For 2, we actually can know the next message size. In LogSegment.read(), we
first use the offset index to find the file position close to the requested
offset and then scan the log forward to find the message whose offset is at
or larger than the requested offset. By the time we find such a message, we
know exactly the size of the first message that we need to return in the
fetch response. No additional read is needed. We can probably just
propagate this information back to the caller. I added a comment related to
this in https://issues.apache.org/jira/browse/KAFKA-3810.
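A toy model of the lookup described above may help: a sparse offset index narrows the search to a starting position, a forward scan finds the first message at or past the requested offset, and that message's size is known at that point without any extra read. ToySegment, Message and find_first are hypothetical names for illustration, not Kafka's LogSegment API, and the "file" is just an in-memory list.

```python
import bisect
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class Message:
    offset: int
    size: int  # bytes occupied in the (pretend) segment file


class ToySegment:
    """Hypothetical in-memory stand-in for a log segment and its sparse offset index."""

    def __init__(self, messages: List[Message], index_interval: int = 2):
        self.messages = messages
        # File position of each message = total size of the messages before it.
        self.positions: List[int] = []
        pos = 0
        for m in messages:
            self.positions.append(pos)
            pos += m.size
        # Sparse index: one (offset, slot) entry every index_interval messages.
        self.index = [(messages[i].offset, i) for i in range(0, len(messages), index_interval)]

    def find_first(self, target_offset: int) -> Optional[Tuple[int, int]]:
        """Return (file_position, size) of the first message with offset >= target_offset."""
        # Step 1: the largest indexed offset <= target gives a starting point for the scan.
        keys = [offset for offset, _ in self.index]
        i = bisect.bisect_right(keys, target_offset) - 1
        start = self.index[i][1] if i >= 0 else 0
        # Step 2: scan forward; the matching message's size is known as soon as we reach it.
        for slot in range(start, len(self.messages)):
            if self.messages[slot].offset >= target_offset:
                return self.positions[slot], self.messages[slot].size
        return None
```

In the real broker the scan reads message headers from the segment file; the point here is only that the size of the first message falls out of the scan for free.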

Thanks,

Jun

On Tue, Aug 16, 2016 at 4:58 AM, Andrey L. Neporada <
anepor...@yandex-team.ru> wrote:

> Hi Jun!
> Thanks for feedback.
>
> > On 15 Aug 2016, at 20:04, Jun Rao  wrote:
> >
> > Hi, Andrey,
> >
> > Thanks for the update to the wiki. Just a few more minor comments.
> >
> > 1. "If *response_max_bytes* parameter is zero ("no limit"), the request is
> > processed *exactly* as before." Instead of using 0, it seems it's more
> > natural to use Int.MAX_INT to preserve the old behaviour.
> >
> OK, done.
>
> > 2. "For the first partition, server always fetches at least
> > *message.max.bytes*." To be more precise, the server only fetches more
> > bytes than *response_max_bytes* (and up to *message.max.bytes*) if there
> > is a message whose size is larger than *response_max_bytes*.
> >
>
> Unfortunately, there is no easy way to obtain the size of the next message
> in ReplicaManager:fetchMessages() - you would need to issue an extra small
> read from the log to find it.
> So, unless I am missing something important, I would like to keep the
> proposal (and algorithm) as is.
>
> > 3. "The solution is to continue fetching from first empty partition in
> > round-robin fashion or to perform random shuffle of partitions." We
> > probably want to make it clearer by saying that this is for ordering the
> > partitions in the fetch request.
> >
> > 4. Just to make it clear. Could you include the new fetch request protocol
> > in the wiki (e.g.
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-MetadataRequest(version1))
> > and mark the new field?
> >
>
> OK, done.
>
> BTW, what is the target version of this KIP? Currently the inter-broker
> protocol version in KIP is set to 0.11.0-IV0. Do we want to target for 0.11
> or maybe somewhat earlier?
>
>
> > Jun
> >
>
> Thanks,
> Andrey.
>
>


Re: [DISCUSS] KIP-74: Add Fetch Response Size Limit in Bytes

2016-08-16 Thread Ismael Juma
Hi Andrey,

On Tue, Aug 16, 2016 at 12:58 PM, Andrey L. Neporada <
anepor...@yandex-team.ru> wrote:
>
> BTW, what is the target version of this KIP? Currently the inter-broker
> protocol version in KIP is set to 0.11.0-IV0. Do we want to target for 0.11
> or maybe somewhat earlier?
>

I suggest 0.10.1-IV0 for now. If we decide to name the next version
0.11.0.0, we can change it then.

Ismael


Re: [DISCUSS] KIP-74: Add Fetch Response Size Limit in Bytes

2016-08-16 Thread Andrey L. Neporada
Hi Jun!
Thanks for feedback.

> On 15 Aug 2016, at 20:04, Jun Rao  wrote:
> 
> Hi, Andrey,
> 
> Thanks for the update to the wiki. Just a few more minor comments.
> 
> 1. "If *response_max_bytes* parameter is zero ("no limit"), the request is
> processed *exactly* as before." Instead of using 0, it seems it's more
> natural to use Int.MAX_INT to preserve the old behaviour.
> 
OK, done.

> 2. "For the first partition, server always fetches at least
> *message.max.bytes*." To be more precise, the server only fetches more
> bytes than *response_max_bytes* (and up to *message.max.bytes*) if there
> is a message whose size is larger than *response_max_bytes*.
> 

Unfortunately, there is no easy way to obtain the size of the next message in
ReplicaManager:fetchMessages() - you would need to issue an extra small read
from the log to find it.
So, unless I am missing something important, I would like to keep the proposal
(and algorithm) as is.

> 3. "The solution is to continue fetching from first empty partition in
> round-robin fashion or to perform random shuffle of partitions." We
> probably want to make it clearer by saying that this is for ordering the
> partitions in the fetch request.
> 
> 4. Just to make it clear. Could you include the new fetch request protocol
> in the wiki (e.g.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-MetadataRequest(version1))
> and mark the new field?
> 

OK, done.

BTW, what is the target version of this KIP? Currently the inter-broker 
protocol version in KIP is set to 0.11.0-IV0. Do we want to target for 0.11 or 
maybe somewhat earlier?


> Jun
> 

Thanks, 
Andrey.



Re: [DISCUSS] KIP-74: Add Fetch Response Size Limit in Bytes

2016-08-15 Thread Jun Rao
Hi, Andrey,

Thanks for the update to the wiki. Just a few more minor comments.

1. "If *response_max_bytes* parameter is zero ("no limit"), the request is
processed *exactly* as before." Instead of using 0, it seems it's more
natural to use Int.MAX_INT to preserve the old behavior.

2. "For the first partition, server always fetches at least
*message.max.bytes*." To be more precise, the server only fetches more bytes
than *response_max_bytes* (and up to *message.max.bytes*) if there is a
message whose size is larger than *response_max_bytes*.

3. "The solution is to continue fetching from first empty partition in
round-robin fashion or to perform random shuffle of partitions." We
probably want to make it clearer by saying that this is for ordering the
partitions in the fetch request.

4. Just to make it clear. Could you include the new fetch request protocol
in the wiki (e.g.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-4+-+Command+line+and+centralized+administrative+operations#KIP-4-Commandlineandcentralizedadministrativeoperations-MetadataRequest(version1))
and mark the new field?
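For points 1 and 4, a rough sketch of how a fetch request body could carry the new response-level limit next to the existing per-partition limit, with the signed 32-bit maximum (2147483647) as the default that means "no limit". The field order (new limit right after min_bytes), the names and the helper functions are assumptions for illustration; the request header (api key, version, correlation id, client id) is left out.

```python
import struct
from typing import Dict, List, Tuple

INT32_MAX = 2**31 - 1  # largest value a signed 32-bit protocol field can hold


def encode_string(s: str) -> bytes:
    """Protocol-style string: int16 length followed by UTF-8 bytes."""
    data = s.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_fetch_request(replica_id: int, max_wait_ms: int, min_bytes: int,
                         topics: Dict[str, List[Tuple[int, int, int]]],
                         response_max_bytes: int = INT32_MAX) -> bytes:
    """Encode a fetch request body with a response-level byte limit (hypothetical layout).

    topics maps topic -> [(partition, fetch_offset, partition_max_bytes), ...].
    Leaving response_max_bytes at INT32_MAX keeps the old, unlimited behaviour.
    """
    # Top-level fields; the new limit is assumed to follow min_bytes.
    out = struct.pack(">iiii", replica_id, max_wait_ms, min_bytes, response_max_bytes)
    out += struct.pack(">i", len(topics))  # array length
    for topic, partitions in topics.items():
        out += encode_string(topic)
        out += struct.pack(">i", len(partitions))
        for partition, fetch_offset, partition_max_bytes in partitions:
            # int32 partition, int64 offset, int32 per-partition limit (kept as is).
            out += struct.pack(">iqi", partition, fetch_offset, partition_max_bytes)
    return out
```

Because the field is a signed 32-bit integer, Int32 max is the largest limit it can express, which is why it works naturally as the "unlimited" value.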

Jun

On Mon, Aug 15, 2016 at 3:12 AM, Andrey L. Neporada <
anepor...@yandex-team.ru> wrote:

> Hi all!
>
> KIP-74 is updated to sync up with mail list discussion.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes
>
> Your feedback is highly appreciated.
>
> Thanks,
> Andrey.
>

Re: [DISCUSS] KIP-74: Add Fetch Response Size Limit in Bytes

2016-08-15 Thread Andrey L. Neporada
Hi all!

KIP-74 is updated to sync up with mail list discussion.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes

Your feedback is highly appreciated.

Thanks,
Andrey.


> On 12 Aug 2016, at 20:49, Andrey L. Neporada  wrote:
> 
> Hi!
>> On 12 Aug 2016, at 20:22, Jun Rao  wrote:
>> 
>> Hi, Andrey,
>> 
>> Yes, I agree that it's more work for the client to do the round-robin logic
>> since it has to be stateful. However, that applies to both the consumer
>> client and the replica fetch thread. I just feel that it's weird to use one
>> strategy in the replica fetch thread and another in the consumer client. An
>> alternative is to just let the leader broker shuffle the partitions before
>> filling up the bytes. This alleviates the need for the clients to maintain
>> additional states. It's not as deterministic as doing round-robin in the
>> client, but is probably good enough in practice.
>> 
> 
> Oh, I see your point. You are talking about the high-level consumer client API
> (i.e. Consumer.java).
> Yes, it makes sense to use the same strategy there as in the ReplicaFetcher
> thread.
> 
> 
>> For 2), I am not sure if we want to set the default limit of fetch response
>> in the consumer to be 0. Today, sending a response of more than 2GB will
>> cause buffer overflow. How about defaulting it to 50MB? This should be
>> enough for the common case. For the default fetch response limit in the
>> replica fetcher, I'd recommend 10MB since there could be multiple replica
>> fetcher threads to different brokers.
>> 
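Some quick arithmetic behind these numbers, assuming the 2GB problem comes from sizes being carried in signed 32-bit integers (Kafka frames each request and response with a 4-byte size): anything above 2147483647 bytes cannot be framed at all, while 50MB and 10MB leave plenty of headroom. frame_size_prefix and fetch_buffer_bound are hypothetical helpers.

```python
import struct

MB = 1024 * 1024
INT32_MAX = 2**31 - 1


def frame_size_prefix(payload_size: int) -> bytes:
    """4-byte big-endian signed size prefix; raises struct.error above INT32_MAX."""
    return struct.pack(">i", payload_size)


def fetch_buffer_bound(response_max_bytes: int, num_fetcher_threads: int) -> int:
    """Bytes in flight if every fetcher thread receives a full-size response at once.

    Ignores the single oversized-message exception discussed in this thread.
    """
    return response_max_bytes * num_fetcher_threads
```

With, say, 4 replica fetcher threads (an arbitrary example), a 10MB limit bounds buffered fetch data at about 40MB, whereas 50MB per thread would be about 200MB.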
> 
> OK, I’ll put these numbers in the PR - we can discuss them later.
> 
>> Thanks,
>> 
>> Jun
>> 
> 
> Andrey.
> 

Re: [DISCUSS] KIP-74: Add Fetch Response Size Limit in Bytes

2016-08-12 Thread Andrey L. Neporada
Hi!

> On 12 Aug 2016, at 18:32, Jun Rao  wrote:
> 
> Hi, Andrey,
> 
> Why shouldn't the client library do reordering? It seems that if
> ReplicaFetcher thread does round-robin, the consumer client should do that
> too?
> 

IMHO the client library is not a good place to implement such logic.
For example, if we put round-robin logic in the client library, it will become
stateful (since it has to remember the first empty partition received from the
server). It would also need to remember some kind of context to distinguish one
end user (or partition set) from another, etc.

Personally, I'd expect the client library to submit requests as is.
Anyway, I think it is difficult to come up with reordering logic that will be
good for all clients.

So, I propose:

1) preserve the per-partition limit in the fetch request (as discussed)
2) make the default fetch response limit 0 (which means no limit at all), so
that by default the new fetch request behaves exactly like the old one
3) implement round-robin logic in ReplicaFetcherThread and use a non-zero
default fetch response limit there
4) document that end users who want to use FetchRequest with an actual
response limit should implement some kind of round-robin to ensure fairness (if
they care about it)
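Item 3 could look roughly like this: the fetcher remembers a partition ordering and, after each response, rotates it so that the first partition that came back with no data leads the next request. RoundRobinOrder is a hypothetical helper, and reading "first empty partition" as "the first partition, in the order just sent, that received zero bytes" is an assumption. It also shows the extra state a client would have to carry between requests.

```python
from typing import Dict, List


class RoundRobinOrder:
    """Hypothetical per-fetcher state for ordering partitions across requests."""

    def __init__(self, partitions: List[str]):
        self.order = list(partitions)

    def next_request_order(self) -> List[str]:
        """Partition order to use for the next fetch request."""
        return list(self.order)

    def on_response(self, bytes_returned: Dict[str, int]) -> None:
        """Rotate so the first partition that got no data leads the next request."""
        for i, partition in enumerate(self.order):
            if bytes_returned.get(partition, 0) == 0:
                self.order = self.order[i:] + self.order[:i]
                return
        # Every partition received data: keep the current order.
```

Usage: after a limited response that only filled "a" and "b", the next request starts from "c".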
 

> Thanks,
> 
> Jun
> 

Thanks,
Andrey.


> On Fri, Aug 12, 2016 at 3:56 AM, Andrey L. Neporada <
> anepor...@yandex-team.ru> wrote:
> 
>> Hi!
>> 
>>> On 12 Aug 2016, at 04:29, Jun Rao  wrote:
>>> 
>>> Hi, Andrey,
>>> 
>>> One potential benefit of keeping the per partition limit is for Kafka
>>> stream. When reading messages from different partitions, KStream prefers to
>>> read from partitions with smaller timestamps first and only advances the
>>> KStream timestamp when it sees at least one message from every partition.
>>> Being able to fill up multiple partitions in a single fetch response can
>>> help KStream advance the timestamp quicker when there is backlog from the
>>> input. So, it's probably better if we just add a new response limit while
>>> keeping the per partition limit.
>>> 
>> 
>> Yes, this makes sense to me.
>> 
>>> Also, for fairness across partitions, you mentioned "The solution is to
>>> start fetching from first empty partition in round-robin fashion or to
>>> perform random shuffle of partitions.".  It seems the former is more
>>> deterministic. Did you use that in your implementation and should we
>>> recommend that for non-java clients as well?
>>> 
>> 
>> In my initial implementation I just did random shuffling on server side.
>> Now I plan to use deterministic approach - do round-robin in
>> ReplicaFetcher thread - I believe the client library itself shouldn’t do
>> any form of reordering.
>> 
>> But we should definitely recommend some form of shuffling if client wants
>> to limit response size.
>> Not sure which shuffling method is better.
>> 
>> 
>>> Thanks,
>>> 
>>> Jun
>>> 
>> 
>> Thanks,
>> Andrey.
>> 
>> 



Re: [DISCUSS] KIP-74: Add Fetch Response Size Limit in Bytes

2016-08-12 Thread Jun Rao
Hi, Andrey,

Why shouldn't the client library do reordering? It seems that if
ReplicaFetcher thread does round-robin, the consumer client should do that
too?

Thanks,

Jun

On Fri, Aug 12, 2016 at 3:56 AM, Andrey L. Neporada <
anepor...@yandex-team.ru> wrote:

> Hi!
>
> > On 12 Aug 2016, at 04:29, Jun Rao  wrote:
> >
> > Hi, Andrey,
> >
> > One potential benefit of keeping the per partition limit is for Kafka
> > stream. When reading messages from different partitions, KStream prefers to
> > read from partitions with smaller timestamps first and only advances the
> > KStream timestamp when it sees at least one message from every partition.
> > Being able to fill up multiple partitions in a single fetch response can
> > help KStream advance the timestamp quicker when there is backlog from the
> > input. So, it's probably better if we just add a new response limit while
> > keeping the per partition limit.
> >
>
> Yes, this makes sense to me.
>
> > Also, for fairness across partitions, you mentioned "The solution is to
> > start fetching from first empty partition in round-robin fashion or to
> > perform random shuffle of partitions.".  It seems the former is more
> > deterministic. Did you use that in your implementation and should we
> > recommend that for non-java clients as well?
> >
>
> In my initial implementation I just did random shuffling on server side.
> Now I plan to use deterministic approach - do round-robin in
> ReplicaFetcher thread - I believe the client library itself shouldn’t do
> any form of reordering.
>
> But we should definitely recommend some form of shuffling if client wants
> to limit response size.
> Not sure which shuffling method is better.
>
>
> > Thanks,
> >
> > Jun
> >
>
> Thanks,
> Andrey.
>
>


Re: [DISCUSS] KIP-74: Add Fetch Response Size Limit in Bytes

2016-08-12 Thread Ismael Juma
That's a good point, Jun, and I agree that it makes sense to keep both limits
in that case.

Ismael

On Fri, Aug 12, 2016 at 2:29 AM, Jun Rao  wrote:

> Hi, Andrey,
>
> One potential benefit of keeping the per partition limit is for Kafka
> stream. When reading messages from different partitions, KStream prefers to
> read from partitions with smaller timestamps first and only advances the
> KStream timestamp when it sees at least one message from every partition.
> Being able to fill up multiple partitions in a single fetch response can
> help KStream advance the timestamp quicker when there is backlog from the
> input. So, it's probably better if we just add a new response limit while
> keeping the per partition limit.
>
> Also, for fairness across partitions, you mentioned "The solution is to
> start fetching from first empty partition in round-robin fashion or to
> perform random shuffle of partitions.".  It seems the former is more
> deterministic. Did you use that in your implementation and should we
> recommend that for non-java clients as well?
>
> Thanks,
>
> Jun
>
> On Wed, Aug 10, 2016 at 10:55 AM, Andrey L. Neporada <
> anepor...@yandex-team.ru> wrote:
>
> > Hi, Jun!
> >
> > Thanks for feedback!
> >
> > > On 10 Aug 2016, at 17:42, Jun Rao  wrote:
> > >
> > > Hi, Andrey,
> > >
> > > Thanks for the reply. A couple of more comments inline below.
> > >
> > > On Wed, Aug 10, 2016 at 3:56 AM, Andrey L. Neporada <
> > > anepor...@yandex-team.ru > wrote:
> > >
> > >>
> > >> Yes, such cooperative configuration for fetch request may look a bit weird.
> > >> But I don’t see other options if we want to remove partition limits from
> > >> fetch request.
> > >> In this case we need some server-side configuration for partition limits.
> > >>
> > >>
> > > What if we keep the current partition level limit in the fetch request and
> > > just add an additional response level limit? The default partition limit
> > > can be much smaller than the max message size and will only be used for
> > > fairness across partitions.
> > >
> >
> > Yes, we can just add global response limit and leave partition limits as is.
> > In fact, my initial implementation (https://github.com/apache/kafka/pull/1683)
> > of this KIP preserves per-partition limits.
> > However, as it seems from KAFKA-2063 discussion, some people prefer to
> > deprecate partition level limit.
> > I have no real opinion on this topic - hope we can choose best option here.
> >
> > ...
> > >>
> > >> No, I mean that actual response size can be bigger than limit_bytes, but
> > >> less than limit_bytes + message.max.bytes.
> > >> This behaviour is a result of algorithm proposed in KIP (and in PR).
> > >>
> > >>
> > > Got it. An alternative is to only add a partition's data to the response up
> > > to the remaining response limit. The only exception is when this is the
> > > first partition and the first message in that partition is larger than the
> > > response limit. Then the bound will be max(limit_bytes, message.max.bytes),
> > > which is tighter.
> > >
> >
> > Yes, this one looks better.
> >
> >
> > Thanks,
> > Andrey.
>


Re: [DISCUSS] KIP-74: Add Fetch Response Size Limit in Bytes

2016-08-12 Thread Andrey L. Neporada
Thanks!
Will do.

> On 12 Aug 2016, at 08:29, Ben Stopford  wrote:
> 
> Andrey 
> 
> To make progress, I suggest you keep the partition-level limit in, at least 
> for now, and keep it on the FetchRequest too. 
> 
> B
> 



Re: [DISCUSS] KIP-74: Add Fetch Response Size Limit in Bytes

2016-08-12 Thread Andrey L. Neporada
Hi!

> On 12 Aug 2016, at 04:29, Jun Rao  wrote:
> 
> Hi, Andrey,
> 
> One potential benefit of keeping the per partition limit is for Kafka
> stream. When reading messages from different partitions, KStream prefers to
> read from partitions with smaller timestamps first and only advances the
> KStream timestamp when it sees at least one message from every partition.
> Being able to fill up multiple partitions in a single fetch response can
> help KStream advance the timestamp quicker when there is backlog from the
> input. So, it's probably better if we just add a new response limit while
> keeping the per partition limit.
> 

Yes, this makes sense to me.

> Also, for fairness across partitions, you mentioned "The solution is to
> start fetching from first empty partition in round-robin fashion or to
> perform random shuffle of partitions.".  It seems the former is more
> deterministic. Did you use that in your implementation and should we
> recommend that for non-java clients as well?
> 

In my initial implementation I just did random shuffling on the server side.
Now I plan to use a deterministic approach - do round-robin in the ReplicaFetcher
thread - I believe the client library itself shouldn’t do any form of
reordering.

But we should definitely recommend some form of shuffling if a client wants to
limit response size.
Not sure which shuffling method is better.
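The server-side random shuffle mentioned here (the same idea Jun suggests elsewhere in this thread as a stateless alternative to client round-robin) could be sketched as follows. shuffled_fill is hypothetical and ignores message boundaries, treating each partition's available data as a plain byte count.

```python
import random
from typing import Dict, Optional


def shuffled_fill(available: Dict[str, int], partition_max_bytes: int,
                  response_max_bytes: int,
                  rng: Optional[random.Random] = None) -> Dict[str, int]:
    """Shuffle the requested partitions, then hand out bytes in that order.

    available maps partition -> bytes ready to read. Each partition gets at most
    partition_max_bytes, and the total stays within response_max_bytes.
    """
    if rng is None:
        rng = random.Random()
    order = list(available)
    rng.shuffle(order)  # no per-client state: fairness comes from randomness
    remaining = response_max_bytes
    result: Dict[str, int] = {}
    for partition in order:
        if remaining <= 0:
            break
        take = min(available[partition], partition_max_bytes, remaining)
        if take > 0:
            result[partition] = take
            remaining -= take
    return result
```

Compared with round-robin, this needs no memory between requests, but which partition is served first varies from request to request rather than rotating predictably.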

 
> Thanks,
> 
> Jun
> 

Thanks,
Andrey.



Re: [DISCUSS] KIP-74: Add Fetch Response Size Limit in Bytes

2016-08-12 Thread Jun Rao
Hi, Andrey,

One potential benefit of keeping the per partition limit is for Kafka
stream. When reading messages from different partitions, KStream prefers to
read from partitions with smaller timestamps first and only advances the
KStream timestamp when it sees at least one message from every partition.
Being able to fill up multiple partitions in a single fetch response can
help KStream advance the timestamp quicker when there is backlog from the
input. So, it's probably better if we just add a new response limit while
keeping the per partition limit.
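A very simplified model of the KStream behaviour described above: processing picks the buffered record with the smallest timestamp, but only once every partition has at least one buffered record. next_partition is a hypothetical helper, not Kafka Streams code; it only shows why a fetch response that carries data for many partitions lets processing move on sooner.

```python
from typing import Dict, List, Optional


def next_partition(buffers: Dict[str, List[int]]) -> Optional[str]:
    """Return the partition whose head record has the smallest timestamp.

    buffers maps partition -> buffered record timestamps, oldest first.
    Returns None while any partition is still empty: the model waits until
    it has seen at least one record from every partition.
    """
    if not buffers or any(not timestamps for timestamps in buffers.values()):
        return None
    return min(buffers, key=lambda p: buffers[p][0])
```

If a response fills only "p0" (say, because a single response-wide limit was used up there), the model stalls until another fetch brings data for "p1"; with a per-partition limit, one response can cover both.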

Also, for fairness across partitions, you mentioned "The solution is to
start fetching from first empty partition in round-robin fashion or to
perform random shuffle of partitions.".  It seems the former is more
deterministic. Did you use that in your implementation and should we
recommend that for non-java clients as well?

Thanks,

Jun

On Wed, Aug 10, 2016 at 10:55 AM, Andrey L. Neporada <
anepor...@yandex-team.ru> wrote:

> Hi, Jun!
>
> Thanks for feedback!
>
> > On 10 Aug 2016, at 17:42, Jun Rao  wrote:
> >
> > Hi, Andrey,
> >
> > Thanks for the reply. A couple of more comments inline below.
> >
> > On Wed, Aug 10, 2016 at 3:56 AM, Andrey L. Neporada <
> > anepor...@yandex-team.ru > wrote:
> >
> >>
> >> Yes, such cooperative configuration for fetch request may look a bit weird.
> >> But I don’t see other options if we want to remove partition limits from
> >> fetch request.
> >> In this case we need some server-side configuration for partition limits.
> >>
> >>
> > What if we keep the current partition level limit in the fetch request and
> > just add an additional response level limit? The default partition limit
> > can be much smaller than the max message size and will only be used for
> > fairness across partitions.
> >
>
> Yes, we can just add global response limit and leave partition limits as is.
> In fact, my initial implementation (https://github.com/apache/kafka/pull/1683)
> of this KIP preserves per-partition limits.
> However, as it seems from KAFKA-2063 discussion, some people prefer to
> deprecate partition level limit.
> I have no real opinion on this topic - hope we can choose best option here.
>
> ...
> >>
> >> No, I mean that actual response size can be bigger than limit_bytes, but
> >> less than limit_bytes + message.max.bytes.
> >> This behaviour is a result of algorithm proposed in KIP (and in PR).
> >>
> >>
> > Got it. An alternative is to only add a partition's data to the response up
> > to the remaining response limit. The only exception is when this is the
> > first partition and the first message in that partition is larger than the
> > response limit. Then the bound will be max(limit_bytes, message.max.bytes),
> > which is tighter.
> >
>
> Yes, this one looks better.
>
>
> Thanks,
> Andrey.


Re: [DISCUSS] KIP-74: Add Fetch Response Size Limit in Bytes

2016-08-12 Thread Ben Stopford
Andrey 

To make progress, I suggest you keep the partition-level limit in, at least for 
now, and keep it on the FetchRequest too. 

B


> On 10 Aug 2016, at 18:55, Andrey L. Neporada  wrote:
> ...



Re: [DISCUSS] KIP-74: Add Fetch Response Size Limit in Bytes

2016-08-10 Thread Andrey L. Neporada
Hi, Jun!

Thanks for feedback!

> On 10 Aug 2016, at 17:42, Jun Rao  wrote:
> 
> Hi, Andrey,
> 
> Thanks for the reply. A couple more comments inline below.
> 
> On Wed, Aug 10, 2016 at 3:56 AM, Andrey L. Neporada <
> anepor...@yandex-team.ru > wrote:
> 
>> 
>> Yes, such cooperative configuration for fetch request may look a bit weird.
>> But I don’t see other options if we want to remove partition limits from
>> the fetch request.
>> In this case we need some server-side configuration for partition limits.
>> 
>> 
> What if we keep the current partition level limit in the fetch request and
> just add an additional response level limit? The default partition limit
> can be much smaller than the max message size and will only be used for
> fairness across partitions.
> 

Yes, we can just add a global response limit and leave partition limits as is.
In fact, my initial implementation (https://github.com/apache/kafka/pull/1683) 
of this KIP preserves per-partition limits.
However, as it seems from the KAFKA-2063 discussion, some people prefer to 
deprecate the partition-level limit.
I have no real opinion on this topic; I hope we can choose the best option here.

...
>> 
>> No, I mean that the actual response size can be bigger than limit_bytes, but
>> less than limit_bytes + message.max.bytes.
>> This behaviour is a result of the algorithm proposed in the KIP (and in the PR).
>> 
>> 
> Got it. An alternative is to only add a partition's data to the response up
> to the remaining response limit. The only exception is that this is the
> first partition and the first message in that partition is larger than the
> response limit. Then the bound will be max(limit_bytes, message.max.bytes),
> which is tighter.
> 

Yes, this one looks better.


Thanks,
Andrey.

Re: [DISCUSS] KIP-74: Add Fetch Response Size Limit in Bytes

2016-08-10 Thread Jun Rao
Hi, Andrey,

Thanks for the reply. A couple more comments inline below.

On Wed, Aug 10, 2016 at 3:56 AM, Andrey L. Neporada <
anepor...@yandex-team.ru> wrote:

> Hi!
>
> > On 09 Aug 2016, at 20:46, Jun Rao  wrote:
> >
> > Hi, Andrey,
> >
> > Thanks for the proposal. It looks good overall. Some minor comments.
> >
> > 1. It seems that it's a bit weird that fetch.partition.max.bytes is a broker
> > level configuration while fetch.limit.bytes is a client side configuration.
> > Intuitively, it seems both should be set by the client? If we do that, one
> > benefit is that we can validate that fetch.limit.bytes >=
> > fetch.partition.max.bytes on the client side.
> >
>
> Yes, such cooperative configuration for fetch request may look a bit weird.
> But I don’t see other options if we want to remove partition limits from
> the fetch request.
> In this case we need some server-side configuration for partition limits.
>
>
What if we keep the current partition level limit in the fetch request and
just add an additional response level limit? The default partition limit
can be much smaller than the max message size and will only be used for
fairness across partitions.
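To make the shape of this suggestion concrete, here is a minimal Java sketch of a fetch request that keeps the per-partition limit and adds one response-level limit. The record and field names (PartitionFetch, FetchRequestSketch, responseMaxBytes, partitionMaxBytes) are hypothetical illustrations, not Kafka's actual request classes or wire schema. The sketch also computes the sum of the per-partition limits, which is what bounds the response when only partition limits exist and which grows with the number of partitions.

```java
import java.util.List;

// Hypothetical request shapes for illustration only; not Kafka's actual classes or wire format.
record PartitionFetch(String topic, int partition, long fetchOffset, int partitionMaxBytes) {
    PartitionFetch {
        if (partitionMaxBytes <= 0) {
            throw new IllegalArgumentException("partitionMaxBytes must be positive");
        }
    }
}

record FetchRequestSketch(int maxWaitMs, int minBytes, int responseMaxBytes,
                          List<PartitionFetch> partitions) {
    FetchRequestSketch {
        if (responseMaxBytes <= 0) {
            throw new IllegalArgumentException("responseMaxBytes must be positive");
        }
        partitions = List.copyOf(partitions); // immutable defensive copy
    }

    /** Bound implied by the per-partition limits alone; it grows with the partition count. */
    long sumOfPartitionLimits() {
        long total = 0;
        for (PartitionFetch p : partitions) {
            total += p.partitionMaxBytes();
        }
        return total;
    }
}
```

With three partitions at 1 MiB each and a 2 MiB response limit, sumOfPartitionLimits() is 3 MiB, while the new response-level field would nominally keep the response near 2 MiB no matter how many partitions are added.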


> > 2. Naming-wise, fetch.response.max.bytes and replica.fetch.response.max.bytes
> > seem to be more consistent with our current convention than
> > fetch.limit.bytes and replica.fetch.limit.bytes.
>
> Agree, will rename.
>
> >
> > 3. When you say "This way we can ensure that response size is less than (
> > *limit_bytes* + *message.max.bytes*).", it should be "less than
> > max(limit_bytes, message.max.bytes)", right?
> >
>
> No, I mean that the actual response size can be bigger than limit_bytes, but
> less than limit_bytes + message.max.bytes.
> This behaviour is a result of the algorithm proposed in the KIP (and in the PR).
>
>
Got it. An alternative is to only add a partition's data to the response up
to the remaining response limit. The only exception is that this is the
first partition and the first message in that partition is larger than the
response limit. Then the bound will be max(limit_bytes, message.max.bytes),
which is tighter.
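A minimal Java sketch of this alternative, assuming each partition is modeled as a list of message sizes in bytes (BoundedFetch and its methods are hypothetical names). A partition contributes only the leading messages that fit in the remaining budget. The one exception is an oversized message at the very start of the response, which is included on its own. Treating "the first partition" as "while the response is still empty" is an assumption of this sketch. In the normal case the total stays within limit_bytes, and in the exception case it equals one message of at most message.max.bytes, so the response stays within max(limit_bytes, message.max.bytes).

```java
import java.util.List;

// Hypothetical sketch of the tighter bound; each partition is a list of message sizes in bytes.
final class BoundedFetch {
    private BoundedFetch() {}

    /**
     * Returns, for each partition, how many leading messages go into the response.
     * A partition contributes only messages that fit in the remaining response budget.
     * Exception: if the response is still empty and the next message alone exceeds
     * limitBytes, that one message is included so the fetcher can make progress.
     */
    static int[] select(List<int[]> partitions, int limitBytes) {
        int[] taken = new int[partitions.size()];
        long total = 0;
        for (int p = 0; p < partitions.size(); p++) {
            for (int size : partitions.get(p)) {
                boolean fits = total + size <= limitBytes;
                boolean oversizedFirst = total == 0 && size > limitBytes;
                if (!fits && !oversizedFirst) {
                    break; // stop this partition at the first message that does not fit
                }
                total += size;
                taken[p]++;
                if (oversizedFirst) {
                    return taken; // already over the limit, so nothing else can fit
                }
            }
        }
        return taken;
    }

    /** Total bytes of the messages chosen by select(). */
    static long responseBytes(List<int[]> partitions, int[] taken) {
        long total = 0;
        for (int p = 0; p < taken.length; p++) {
            for (int i = 0; i < taken[p]; i++) {
                total += partitions.get(p)[i];
            }
        }
        return total;
    }
}
```

With a 1000-byte limit, partitions [400, 400, 400] and [100, 300] yield [2, 1] (900 bytes), while a leading 1500-byte message is returned alone.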


> > Finally, KIP-73 (replication quota) is proposing a similar change to the fetch
> > request protocol. We can probably just combine the two changes into one,
> > instead of bumping the fetch request version twice.
>
> Fine with that.
>
> >
> > Thanks,
> >
> > Jun
> >
>
> Thanks,
> Andrey.
>
>


Re: [DISCUSS] KIP-74: Add Fetch Response Size Limit in Bytes

2016-08-10 Thread Ben Stopford
Regarding fetch.partition.max.bytes, there was some discussion on the Jira 
around removing this setting completely. It’s probably not the easiest thing 
for users to set, so there is certainly an argument for removing it. This 
would have the side effect that a catching-up broker would fill responses from 
a single partition at a time, but as we’re ensuring fairness across requests, 
I’m still left wondering whether we need a partition-level limit at all.

One middle ground would keep it configurable but with a default of Int.MaxValue.
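The message does not spell out how fairness across requests is achieved. One possible mechanism, assumed here purely for illustration, is to rotate which partition is served first on each request, so that even with the partition limit at Int.MaxValue every partition periodically gets the first share of the response. RotatingPartitionOrder is a hypothetical name, and this is a sketch rather than how any Kafka component actually orders partitions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of fairness across requests by rotating the partition order.
final class RotatingPartitionOrder<T> {
    private final List<T> partitions;
    private int start = 0; // index of the partition that goes first on the next request

    RotatingPartitionOrder(List<T> partitions) {
        if (partitions.isEmpty()) {
            throw new IllegalArgumentException("need at least one partition");
        }
        this.partitions = List.copyOf(partitions);
    }

    /** Returns the order for the next request and advances the starting partition by one. */
    List<T> nextOrder() {
        int n = partitions.size();
        List<T> order = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            order.add(partitions.get((start + i) % n));
        }
        start = (start + 1) % n;
        return order;
    }
}
```

Over any n consecutive requests, each of the n partitions is first exactly once, so a single large partition cannot monopolize every response.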

B 

> On 10 Aug 2016, at 11:56, Andrey L. Neporada  wrote:
> ...



Re: [DISCUSS] KIP-74: Add Fetch Response Size Limit in Bytes

2016-08-10 Thread Andrey L. Neporada
Hi!

> On 09 Aug 2016, at 20:46, Jun Rao  wrote:
> 
> Hi, Andrey,
> 
> Thanks for the proposal. It looks good overall. Some minor comments.
> 
> 1. It seems that it's a bit weird that fetch.partition.max.bytes is a broker
> level configuration while fetch.limit.bytes is a client side configuration.
> Intuitively, it seems both should be set by the client? If we do that, one
> benefit is that we can validate that fetch.limit.bytes >=
> fetch.partition.max.bytes on the client side.
> 

Yes, such cooperative configuration for fetch request may look a bit weird.
But I don’t see other options if we want to remove partition limits from the 
fetch request.
In this case we need some server-side configuration for partition limits.


> 2. Naming-wise, fetch.response.max.bytes and replica.fetch.response.max.bytes
> seem to be more consistent with our current convention than
> fetch.limit.bytes and replica.fetch.limit.bytes.

Agree, will rename.

> 
> 3. When you say "This way we can ensure that response size is less than (
> *limit_bytes* + *message.max.bytes*).", it should be "less than
> max(limit_bytes, message.max.bytes)", right?
> 

No, I mean that the actual response size can be bigger than limit_bytes, but less 
than limit_bytes + message.max.bytes.
This behaviour is a result of the algorithm proposed in the KIP (and in the PR).
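One reading consistent with the bound stated above, assumed for illustration and not necessarily the exact algorithm in the KIP or the PR: whole messages are appended while the running total is still below limit_bytes, and the check happens before each append. The total is below the limit just before the last append, and that message is at most message.max.bytes, so the response stays below limit_bytes + message.max.bytes. LooseFetchBound is a hypothetical name.

```java
// Hypothetical sketch: keep appending whole messages while the response is still under the limit.
final class LooseFetchBound {
    private LooseFetchBound() {}

    /**
     * Appends whole messages (sizes in bytes, in fetch order) while the running total
     * is below limitBytes. Because the check happens before each append, the last
     * message may push the total past the limit by at most one message size.
     */
    static long fill(int[] messageSizes, int limitBytes) {
        long total = 0;
        for (int size : messageSizes) {
            if (total >= limitBytes) {
                break; // limit reached or exceeded; stop adding messages
            }
            total += size;
        }
        return total;
    }
}
```

For example, with a 1000-byte limit and three 600-byte messages, fill returns 1200: above the limit, but below 1000 + 600.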


> Finally, KIP-73 (replication quota) is proposing a similar change to the fetch
> request protocol. We can probably just combine the two changes into one,
> instead of bumping the fetch request version twice.

Fine with that.

> 
> Thanks,
> 
> Jun
> 

Thanks,
Andrey.



Re: [DISCUSS] KIP-74: Add Fetch Response Size Limit in Bytes

2016-08-09 Thread Jun Rao
Hi, Andrey,

Thanks for the proposal. It looks good overall. Some minor comments.

1. It seems that it's a bit weird that fetch.partition.max.bytes is a broker
level configuration while fetch.limit.bytes is a client side configuration.
Intuitively, it seems both should be set by the client? If we do that, one
benefit is that we can validate that fetch.limit.bytes >=
fetch.partition.max.bytes on the client side.

2. Naming-wise, fetch.response.max.bytes and replica.fetch.response.max.bytes
seem to be more consistent with our current convention than
fetch.limit.bytes and replica.fetch.limit.bytes.

3. When you say "This way we can ensure that response size is less than (
*limit_bytes* + *message.max.bytes*).", it should be "less than
max(limit_bytes, message.max.bytes)", right?
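If both limits were client settings, the check from point 1 could be a simple validation when the client is configured. A minimal Java sketch, using the property names proposed in this thread (fetch.response.max.bytes and fetch.partition.max.bytes). The class name and the choice to reject missing values rather than apply defaults are assumptions, not existing client behaviour.

```java
import java.util.Properties;

// Hypothetical client-side validation using the property names proposed in this thread.
final class FetchLimitConfig {
    static final String RESPONSE_MAX_BYTES = "fetch.response.max.bytes";
    static final String PARTITION_MAX_BYTES = "fetch.partition.max.bytes";

    final int responseMaxBytes;
    final int partitionMaxBytes;

    FetchLimitConfig(Properties props) {
        this.responseMaxBytes = requirePositiveInt(props, RESPONSE_MAX_BYTES);
        this.partitionMaxBytes = requirePositiveInt(props, PARTITION_MAX_BYTES);
        // Point 1: a single partition's limit must not exceed the whole-response limit.
        if (responseMaxBytes < partitionMaxBytes) {
            throw new IllegalArgumentException(RESPONSE_MAX_BYTES + " (" + responseMaxBytes
                    + ") must be >= " + PARTITION_MAX_BYTES + " (" + partitionMaxBytes + ")");
        }
    }

    private static int requirePositiveInt(Properties props, String key) {
        String raw = props.getProperty(key);
        if (raw == null) {
            throw new IllegalArgumentException("missing required property " + key);
        }
        // NumberFormatException is a subclass of IllegalArgumentException.
        int value = Integer.parseInt(raw.trim());
        if (value <= 0) {
            throw new IllegalArgumentException(key + " must be positive, got " + value);
        }
        return value;
    }
}
```

A configuration with a 1 KiB response limit and a 2 KiB partition limit is rejected up front, instead of surfacing later as unexpected fetch behaviour.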

Finally, KIP-73 (replication quota) is proposing a similar change to the fetch
request protocol. We can probably just combine the two changes into one,
instead of bumping the fetch request version twice.

Thanks,

Jun


On Mon, Aug 8, 2016 at 10:11 AM, Andrey L. Neporada <
anepor...@yandex-team.ru> wrote:

> ...


[DISCUSS] KIP-74: Add Fetch Response Size Limit in Bytes

2016-08-08 Thread Andrey L. Neporada
Hi all!

I’ve just created KIP-74: Add Fetch Response Size Limit in Bytes.

The idea is to limit client memory consumption when fetching many partitions 
(especially useful for replication).
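A back-of-the-envelope Java sketch of why a response-level limit bounds memory: with only a per-partition limit, the worst-case response grows linearly with the number of partitions fetched. The partition count and limits in the example are made-up values, and the class and method names are hypothetical. The response-limited figure is nominal and ignores any allowance for a single oversized message.

```java
// Back-of-the-envelope sketch; names are hypothetical and inputs are example values.
final class FetchMemoryEstimate {
    private FetchMemoryEstimate() {}

    /** Worst case with only a per-partition limit: every partition returns a full chunk. */
    static long perPartitionOnly(int partitions, int partitionMaxBytes) {
        return (long) partitions * partitionMaxBytes; // widen to long to avoid int overflow
    }

    /** Nominal worst case once a response-level limit is added. */
    static long withResponseLimit(int partitions, int partitionMaxBytes, int responseMaxBytes) {
        return Math.min(perPartitionOnly(partitions, partitionMaxBytes), responseMaxBytes);
    }
}
```

For example, 1000 partitions at 1 MiB each give a worst case of 1,048,576,000 bytes (about 1 GiB) per response, whereas a 50 MiB response limit caps it at 52,428,800 bytes.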

Full details are here:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes

Thanks
Andrey.