Re: [VOTE] KIP-764 Configurable backlog size for creating Acceptor

2021-08-08 Thread Haruki Okada
Thanks for your comment LI-san.

Could anyone else review and vote for the KIP?

I think the situation described in the KIP's motivation can happen in any
large-scale Kafka deployment, so it may be helpful for many users, while the
proposed changes are small enough.
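
For readers unfamiliar with the setting: the backlog is the second argument to
`bind` on a Java server socket channel. A minimal sketch, assuming a
hypothetical helper `openAcceptor` and an arbitrary backlog value (this is not
the actual SocketServer code):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

final class AcceptorBacklogSketch {
    // Hypothetical helper: opens a non-blocking listening channel with an
    // explicit backlog instead of relying on the platform default.
    static ServerSocketChannel openAcceptor(int port, int backlog) throws IOException {
        ServerSocketChannel channel = ServerSocketChannel.open();
        channel.configureBlocking(false);
        // The backlog is only a hint to the OS; the kernel may cap it.
        channel.bind(new InetSocketAddress(port), backlog);
        return channel;
    }

    public static void main(String[] args) throws IOException {
        // Port 0 asks the OS for any free port; 200 is an arbitrary example value.
        try (ServerSocketChannel channel = openAcceptor(0, 200)) {
            System.out.println("listening on " + channel.getLocalAddress());
        }
    }
}
```

The effective queue length is still bounded by OS-level settings, so a larger
value here only helps if the kernel limit allows it.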


Thanks,

On Tue, Aug 3, 2021 at 15:49, Xiangyuan LI wrote:

> Hi Haruki Okada:
>   I read your comment, thanks for your detailed explanation!
>   Adding a backlog parameter is a useful suggestion; I hope it can be added to
> Kafka.
>
> On Mon, Aug 2, 2021 at 7:43 AM, Haruki Okada wrote:
>
> > Hi, Kafka.
> >
> > I would like to start a vote on the KIP that makes the SocketServer acceptor's
> > backlog size configurable.
> >
> > KIP:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-764%3A+Configurable+backlog+size+for+creating+Acceptor
> >
> > Discussion thread:
> >
> >
> https://lists.apache.org/thread.html/rd77469b7de0190d601dd37bd6894e1352a674d08038bcfe7ff68a1e0%40%3Cdev.kafka.apache.org%3E
> >
> > Thanks,
> >
> > --
> > 
> > Okada Haruki
> > ocadar...@gmail.com
> > 
> >
>


-- 

Okada Haruki
ocadar...@gmail.com



Re: [DISCUSS] KIP-747 Add support for basic aggregation APIs

2021-08-08 Thread Mohan Parthasarathy
On Sun, Aug 8, 2021 at 3:56 PM Matthias J. Sax  wrote:

> >>> Not sure. Also, do you have an opinion on Long vs Double ?
>
> Not sure what you mean by `Long vs Double` ? Can you elaborate?
>
We were discussing the return value for "sum". Alex suggested using
"Double" and you had "Long" in one of your responses, I think. Do you have a
preference or a reason one way or the other?
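
To make the trade-off concrete, here is a small sketch. The class and method
names (`SumSketch`, `sumAsLong`, `sumAsDouble`) are hypothetical and operate on
a plain list rather than a stream; only the shape of `func` follows the
signatures discussed in this thread.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class SumSketch {
    // Long result: exact for integral inputs, but fractions are truncated
    // per element by longValue().
    static <V> long sumAsLong(List<V> values, Function<? super V, ? extends Number> func) {
        long total = 0L;
        for (V v : values) {
            total += func.apply(v).longValue();
        }
        return total;
    }

    // Double result: keeps fractions, but integers above 2^53 are no longer
    // represented exactly.
    static <V> double sumAsDouble(List<V> values, Function<? super V, ? extends Number> func) {
        double total = 0.0;
        for (V v : values) {
            total += func.apply(v).doubleValue();
        }
        return total;
    }

    public static void main(String[] args) {
        List<Double> fractions = Arrays.asList(0.5, 0.5, 0.5);
        System.out.println(sumAsLong(fractions, x -> x));   // fractions are lost
        System.out.println(sumAsDouble(fractions, x -> x)); // fractions are kept

        List<Long> large = Arrays.asList(1L << 53, 1L);
        System.out.println(sumAsLong(large, x -> x));       // exact
        System.out.println(sumAsDouble(large, x -> x));     // the trailing +1 is lost
    }
}
```

Neither return type is strictly better: `Long` is exact for counts and other
integral values, while `Double` is needed as soon as `func` can return
fractional numbers.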

-mohan


> -Matthias
>
> On 8/8/21 7:41 AM, Mohan Parthasarathy wrote:
> > On Tue, Aug 3, 2021 at 6:56 PM Matthias J. Sax  wrote:
> >
> >> I was playing with the code a little bit, but it seems not to be easy to
> >> use generics to enforce that V is `Comparable`...
> >>
> >> We would need to introduce a new interface
> >>
> >>  interface ComparableStream>
> >> extends KStream
> >>  {
> >> KTable min();
> >>  }
> >>
> >> But it also requires a nasty cast to actually use it:
> >>
> >>   KStream stream =
> >> new StreamsBuilder().stream("");
> >>   KTable table =
> >> ((ComparableStream) stream).min();
> >>
> >> If the value-type does not implement `Comparable` the cast would not
> >> compile... Or would there be a simpler way to ensure that min() can only
> >> be called _if_ V is `Comparable`?
> >>
> >>
> >> So maybe passing in a `Comparator` might be the right way to go;
> >> might also be more flexible anyway. -- My original idea was just to
> >> maybe avoid the `Comparator` argument, as it would make the API nicer
> >> IMHO; fewer parameters is usually better...
> >>
> >> Yeah, I tried both and using Comparator seems more natural to me in this
> > case. I will update the document with the discussion here.
> >
> >
> >
> >>
> >> I am not sure why we would want to pass `Function>
> >> func` into `min()`?
> >>
> >> Not sure. Also, do you have an opinion on Long vs Double ?
> >
> > -thanks
> > Mohan
> >
> >
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 6/21/21 11:23 AM, Mohan Parthasarathy wrote:
> >>> Alex,
> >>>
> >>>
> >>> On Wed, Jun 16, 2021 at 8:07 AM Alexandre Brasil <
> >> alexandre.bra...@gmail.com>
> >>> wrote:
> >>>
>  Mohan / Mathias,
> 
> >> I think extending min/max to non-numeric types makes sense.
> Wondering
> >> why we should require a `Comparator` or if we should require that
> the
> >> types implement `Comparable` instead?
> >>
> > Good question. This is what it would look like:
> >
> > KTable min_comparable()
> > KTable min_comparator(Comparator comp)
> 
>  Not sure if I understood Mathias' proposal correctly, but I think that
>  instead of going with
>  your original proposal ( KTable
> >> min(Function  VR> func...)
>  or mine (KTable min(Comparator comparator...), we could
> >> simplify
>  it a
>  bit by using a function to extract a Comparable property from the
> >> original
>  value:
> 
>  KTable min(Function> func...)
> 
>  I will let Matthias clarify. I am not sure why it is simpler than the
> >>> comparator one. Comparable is implemented by the type and not sure
> >> exposing
> >>> it this way makes it any better.
> >>>
>  I also think, that min/max should not change the value type. Using
> > `Long` for sum() make sense though, and also to require a ` > Number>`.
> 
>  Are there any reasons to limit the sum() to integers? Why not use a
> >> Double
>  instead?
> 
>  Yeah, if the precision is important, then we should stick with Double.
> >>>
> >>> -mohan
> >>>
> >>> Best regards,
>  Alexandre
> 
>  On Wed, Jun 16, 2021 at 1:01 AM Mohan Parthasarathy <
> >> mposde...@gmail.com>
>  wrote:
> 
> > Matthias,
> >
> > On Mon, Jun 14, 2021 at 9:18 PM Matthias J. Sax
>   >>
> > wrote:
> >
> >> Hi,
> >>
> >> I think extending min/max to non-numeric types makes sense.
> Wondering
> >> why we should require a `Comparator` or if we should require that
> the
> >> types implement `Comparable` instead?
> >>
> >> Good question. This is what it would look like:
> >
> > KTable min_comparable()
> > KTable min_comparator(Comparator comp)
> >
> > For min_comparable to work, you still need the bounds "V extends
> > Comparable<
> > V>". AFAICT, to avoid the "type parameter V hiding the type V"
> warning,
>  it
> > has to be at the interface level like this:
> >
> >  KStream>
> >
> > which is a little messy unless there is a different way to do the
> same.
>  The
> > comparator gives a simple way to define an anonymous function.
> >
> > What do you think ?
> >
> >
> >> I also think, that min/max should not change the value type. Using
> >> `Long` for sum() make sense though, and also to require a ` extends
> >> Number>`.
> >>
> >> I guess these are the two possibilities:
> >
> >  Long sum(Function func)
> > Long sum(Function func)
> >
> > Both should work. "func" can return any subtypes of Number and I
> 

Re: [DISCUSS] KIP-655: Windowed "Distinct" Operation for KStream

2021-08-08 Thread Matthias J. Sax
Thanks for sharing your thoughts. I guess my first question about why
using the key boils down to the use case, and maybe you have something
else in mind than I do.

>> I see it this way: we define 'distinct' operation as returning a single
>> record per time window per selected key,

I believe this sentence explains your way of thinking about it. My way
of thinking about it is different though: KStream de-duplication means
to "filter/drop" duplicate records, and a record is by definition a
duplicate if key AND value are the same. --- Or are there use cases for
which there might be a "message ID", and even if the "message ID" is the
same, the content of the message might be different? If this holds, do
we really de-duplicate records (sounds more like picking a random one)?


Just re-read some of my older replies, and I guess back then I just
commented on your KeyExtractor idea, without considering the
end-to-end picture. Thus, my reply below goes in a different
direction now:

We only need to apply a window because we need to purge state
eventually. To this end, I actually believe that applying a "sliding
window" is the best way to go: for each message we encounter, we start
the de-duplication window when the message arrives, don't emit any
duplicates as long as the window is open, and purge the state afterwards.

Of course, internally, the implementation must be quite different
compared to a regular aggregation: we need to pull the value into the
key, to create a unique window for each message, but this seems to be an
implementation detail. Thus, I am wondering if we should not try to put
a square peg into a round hole: the current windowing/aggregation API
is not designed for a KStream de-duplication use-case, because a
de-duplication is no aggregation to begin with. Why not use a different API:

KStream KStream#deduplicate(final Duration windowSize);

Including some more overloads to allow configuring the internal state
store (this state store should not be queryable similar to
KStream-KStream state stores...).
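
As a rough illustration of the semantics described above (start a window when a
message first arrives, drop duplicates while it is open, purge the state
afterwards), here is a self-contained sketch. It is not the Kafka Streams API:
`DedupSketch`, its `offer` method, and the in-memory map are hypothetical
stand-ins for the proposed operator and its state store, and the sketch treats
key AND value together as the record identity.

```java
import java.time.Duration;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;

final class DedupSketch<K, V> {
    private final long windowMs;
    // Maps each distinct (key, value) pair to the timestamp that opened its window.
    private final Map<Map.Entry<K, V>, Long> windowStart = new HashMap<>();

    DedupSketch(Duration windowSize) {
        this.windowMs = windowSize.toMillis();
    }

    // Returns true if the record should be emitted, false if it is a duplicate
    // inside a still-open window. The record timestamp doubles as "stream time",
    // which is a simplification.
    boolean offer(K key, V value, long timestampMs) {
        // Purge windows that have closed; a full scan is fine for a sketch only.
        windowStart.values().removeIf(start -> timestampMs - start >= windowMs);
        Map.Entry<K, V> id = new AbstractMap.SimpleImmutableEntry<>(key, value);
        return windowStart.putIfAbsent(id, timestampMs) == null;
    }

    public static void main(String[] args) {
        DedupSketch<String, String> dedup = new DedupSketch<>(Duration.ofSeconds(10));
        System.out.println(dedup.offer("k", "v", 0L));      // first occurrence
        System.out.println(dedup.offer("k", "v", 5_000L));  // duplicate, window open
        System.out.println(dedup.offer("k", "v", 10_000L)); // window closed, emitted again
    }
}
```

Because each distinct record opens its own window, the result does not depend
on any alignment of window boundaries, which is the main difference from
running the operation on top of tumbling windows.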


To recap: Could it be that the idea to apply a DISTINCT-aggregation is
for a different use-case than to remove duplicate messages from a KStream?



-Matthias


On 8/6/21 4:12 AM, Ivan Ponomarev wrote:
>>   - Why restrict de-duplication for the key only? Should we not also
>> consider the value (or make it somehow flexible and let the user choose?)
> 
> Wasn't it the first idea that we abandoned (I mean, to provide
> 'KeyExtractor' and so on)?
> 
> In order to keep things simple we decided to make
> 
> .selectKey(...) //here we select anything we need
>   //add markAsPartitioned from KIP-759 to taste
>   .groupByKey()
>   .windowed(...)
>   .distinct()
> //the only new operation that we add to the API, reusing
> //all the windowed aggregations' infrastructure
> 
>>   - I am wondering if the return type should be `KStream` instead of a
>> `KTable`? If I deduplicate a stream, I might expect a stream back? I
>> don't really consider a stream de-duplication an aggregation with
>> "mutable state"...
> 
> First, because it's going to be an operation on a
> Time/SessionWindowedKStream, and these operations usually return
> KTable, ...>. Then, it might be useful to know to which
> time window a deduplicated record actually belongs. And it is a trivial
> task to turn this table back into a stream.
> 
>> IMHO, an unordered stream and its ordered "cousin" should
>> yield the same result? -- Given your example it seems you want to keep
>> the first record based on offset order. Wondering why?
> 
> I see it this way: we define 'distinct' operation as returning a single
> record per time window per selected key, no matter what record. So it's
> ok if it yields different results for different orderings if its main
> property holds!
> 
> And since we can select any key we like, we can get any degree of
> 'deduplication granularity' and 'determinism'.
> 
>> While I agree that deduplication for overlapping window is questionable,
>> I am still wondering if you plan to disallow it (by adding a runtime
>> check and throwing an exception), or not?
> 
> Thanks for this point! I think that 'fail-fast' approach is good. We
> might need to throw an exception, I will add this into the KIP:
> 
> - SessionWindows -- OK
> - SlidingWindows -- Exception
> - TimeWindows --
>  tumbling -- OK
>  hopping  -- Exception
> 
> 
> Regards,
> 
> Ivan
> 
> 
> On 04.08.2021 4:22, Matthias J. Sax wrote:
>> Couple of questions:
>>
>>   - Why restrict de-duplication for the key only? Should we not also
>> consider the value (or make it somehow flexible and let the user choose?)
>>
>>   - I am wondering if the return type should be `KStream` instead of a
>> `KTable`? If I deduplicate a stream, I might expect a stream back? I
>> don't really consider a stream de-duplication an aggregation with
>> "mutable state"...
>>
>> Also, why would the result key need to be windowed?
>>
>> Btw: How should out-of-order data be handled? 

Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2021-08-08 Thread Matthias J. Sax
Hi,

I originally had a similar thought about `markAsPartitioned()` vs
extending `selectKey()` et al. with a config. While I agree that it
might be conceptually cleaner to use a config object, I did not propose
it as the API impact (deprecating stuff and adding new stuff) is quite
big... If we think it's an acceptable price to pay, I am ok with it though.

I also do think, that `markAsPartitioned()` could actually be
categorized as an operator... We don't expose it in the API as
first-class citizen atm, but in fact we have two types of `KStream` -- a
"PartitionedKStream" and a "NonPartitionedKStream". Thus,
`markAsPartitioned()` can be seen as a "cast operator" that converts the
one into the other.

I also think that the raised concern about "forgetting to remove
`markAsPartitioned()`" might not be very strong though. If you have
different places in the code that link stuff together, a call to e.g.
`selectKey().markAsPartitioned()` must always go together. If you have
some other place in the code that gets a `KStream` passed as input, it
would be "invalid" to blindly call `markAsPartitioned()` as you don't
know anything about the upstream code. Of course, it requires some
"coding discipline" to follow this pattern... Also, users can shoot
themselves in the foot if they want with the config object pattern,
too: if you get a `KStream` passed in, you can skip repartitioning via
`selectKey((k,v) -> k, Config.markAsPartitioned())`. -- Thus, I still
slightly prefer to add `markAsPartitioned()` as an operator.

(Maybe we should have exposed a `PartitionedKStream` as a first-class
object to begin with... Hard to introduce now I guess...)


The concern about IQ is interesting -- I did not realize this impact.
Thanks for bringing it up.

> a repartition would be a no-op, ie that the stream (and its partitioning)
> would be the same
> whether or not a repartition is inserted. For this to be true, it then has
> to be the case that
> 
> Partitioner.partition(key) == Partitioner.partition(map(key))

@Sophie: I don't think this statement is correct. A `markAsPartitioned()`
only means that the existing partitioning ensures that all messages with
the same new key are still in the same partition. I.e., it cannot happen
that two records with the same new key are in different partitions.

However, if you would physically repartition on the new key using the
same hash function as for the old key, there is no guarantee that the
same partitions would be picked... And that is why IQ breaks downstream.
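
A tiny sketch may make this concrete. It is not Kafka code: it assumes a
simplified partitioner (`floorMod(hashCode, numPartitions)`, whereas Kafka's
default partitioner hashes the serialized key with murmur2) and a hypothetical
key mapping that upper-cases the key.

```java
import java.util.Locale;

final class RepartitionSketch {
    // Simplified stand-in for a partitioner; NOT Kafka's murmur2-based default.
    static int partition(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Hypothetical key-changing operation, as one might pass to selectKey(...).
    static String mapKey(String oldKey) {
        return oldKey.toUpperCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        String oldKey = "a";
        String newKey = mapKey(oldKey);
        // Where the record physically lives (partitioned by the old key).
        System.out.println("stored in partition " + partition(oldKey, numPartitions));
        // Where a lookup that hashes the new key would go.
        System.out.println("lookup by new key goes to partition " + partition(newKey, numPartitions));
    }
}
```

All records with new key "A" still sit together in one partition, so a
downstream aggregation is fine without a repartition, but a lookup that hashes
the new key lands on a different partition than the one holding the data.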

Btw: using `markAsPartitioned()` could also be an issue for joins for
the same reason... I want to call out, that the Jira tickets that did
raise the concern about unnecessary repartitioning are about downstream
aggregations though...

Last but not least: we actually have a similar situation for
windowed-aggregations: The result of a window aggregation is partitioned
by the "plain key": if we write the result into a topic using the same
partitioning function, we would write to different partitions... (I
guess it was never an issue so far, as we don't have KIP-300 in place
yet...)

It's also not an issue for IQ, because we know the plain key, and thus
can route to the right task.


About a solution: I think it might be ok to say we don't need to solve
this problem, but it's the user's responsibility to take IQ into account.
I.e., if they want to use IQ downstream, they need to repartition: for this
case, repartitioning is _NOT_ unnecessary... The same argument seems to
apply for the join case I mentioned above. -- Given that
`markAsPartitioned()` is an advanced feature, it seems ok to leave it to
the user to use correctly (we should of course call it out in the docs!).



-Matthias



On 8/7/21 7:45 PM, Sophie Blee-Goldman wrote:
> Before I dive in to the question of IQ and the approaches you proposed, can
> you just
> elaborate on the problem itself? By definition, the `markAsPartitioned`
> flag means that
> a repartition would be a no-op, ie that the stream (and its partitioning)
> would be the same
> whether or not a repartition is inserted. For this to be true, it then has
> to be the case that
> 
> Partitioner.partition(key) == Partitioner.partition(map(key))
> 
> The left-hand side of the above is precisely how we determine the partition
> number that
> a key belongs to when using IQ. It shouldn't matter whether the user is
> querying a key
> in a store upstream of the key-changing operation or a mapped key
> downstream of it
> -- either way we just apply the given Partitioner.
> 
> See StreamsMetadataState#getKeyQueryMetadataForKey
> 
> for where this happens
> 
> 
> If we're concerned that users might try to abuse the new
> `markAsPartitioned` feature,
> or accidentally misuse it, then we could add a runtime check that applies
> the Partitioner
> associated with 

Re: [DISCUSS] KIP-747 Add support for basic aggregation APIs

2021-08-08 Thread Matthias J. Sax
>>> Not sure. Also, do you have an opinion on Long vs Double ?

Not sure what you mean by `Long vs Double` ? Can you elaborate?


-Matthias

On 8/8/21 7:41 AM, Mohan Parthasarathy wrote:
> On Tue, Aug 3, 2021 at 6:56 PM Matthias J. Sax  wrote:
> 
>> I was playing with the code a little bit, but it seems not to be easy to
>> use generics to enforce that V is `Comparable`...
>>
>> We would need to introduce a new interface
>>
>>  interface ComparableStream>
>> extends KStream
>>  {
>> KTable min();
>>  }
>>
>> But it also requires a nasty cast to actually use it:
>>
>>   KStream stream =
>> new StreamsBuilder().stream("");
>>   KTable table =
>> ((ComparableStream) stream).min();
>>
>> If the value-type does not implement `Comparable` the cast would not
>> compile... Or would there be a simpler way to ensure that min() can only
>> be called _if_ V is `Comparable`?
>>
>>
>> So maybe passing in a `Comparator` might be the right way to go;
>> might also be more flexible anyway. -- My original idea was just to
>> maybe avoid the `Comparator` argument, as it would make the API nicer
>> IMHO; fewer parameters is usually better...
>>
>> Yeah, I tried both and using Comparator seems more natural to me in this
> case. I will update the document with the discussion here.
> 
> 
> 
>>
>> I am not sure why we would want to pass `Function>
>> func` into `min()`?
>>
>> Not sure. Also, do you have an opinion on Long vs Double ?
> 
> -thanks
> Mohan
> 
> 
>>
>>
>> -Matthias
>>
>>
>>
>> On 6/21/21 11:23 AM, Mohan Parthasarathy wrote:
>>> Alex,
>>>
>>>
>>> On Wed, Jun 16, 2021 at 8:07 AM Alexandre Brasil <
>> alexandre.bra...@gmail.com>
>>> wrote:
>>>
 Mohan / Mathias,

>> I think extending min/max to non-numeric types makes sense. Wondering
>> why we should require a `Comparator` or if we should require that the
>> types implement `Comparable` instead?
>>
> Good question. This is what it would look like:
>
> KTable min_comparable()
> KTable min_comparator(Comparator comp)

 Not sure if I understood Mathias' proposal correctly, but I think that
 instead of going with
 your original proposal ( KTable
>> min(Function>>> VR> func...)
 or mine (KTable min(Comparator comparator...), we could
>> simplify
 it a
 bit by using a function to extract a Comparable property from the
>> original
 value:

 KTable min(Function> func...)

 I will let Matthias clarify. I am not sure why it is simpler than the
>>> comparator one. Comparable is implemented by the type and not sure
>> exposing
>>> it this way makes it any better.
>>>
 I also think, that min/max should not change the value type. Using
> `Long` for sum() make sense though, and also to require a ` Number>`.

 Are there any reasons to limit the sum() to integers? Why not use a
>> Double
 instead?

 Yeah, if the precision is important, then we should stick with Double.
>>>
>>> -mohan
>>>
>>> Best regards,
 Alexandre

 On Wed, Jun 16, 2021 at 1:01 AM Mohan Parthasarathy <
>> mposde...@gmail.com>
 wrote:

> Matthias,
>
> On Mon, Jun 14, 2021 at 9:18 PM Matthias J. Sax
 >
> wrote:
>
>> Hi,
>>
>> I think extending min/max to non-numeric types makes sense. Wondering
>> why we should require a `Comparator` or if we should require that the
>> types implement `Comparable` instead?
>>
>> Good question. This is what it would look like:
>
> KTable min_comparable()
> KTable min_comparator(Comparator comp)
>
> For min_comparable to work, you still need the bounds "V extends
> Comparable<
> V>". AFAICT, to avoid the "type parameter V hiding the type V" warning,
 it
> has to be at the interface level like this:
>
>  KStream>
>
> which is a little messy unless there is a different way to do the same.
 The
> comparator gives a simple way to define an anonymous function.
>
> What do you think ?
>
>
>> I also think, that min/max should not change the value type. Using
>> `Long` for sum() make sense though, and also to require a `> Number>`.
>>
>> I guess these are the two possibilities:
>
>  Long sum(Function func)
> Long sum(Function func)
>
> Both should work. "func" can return any subtypes of Number and I don't
 see
> any advantages with the first version. Can you clarify ?
>
> Thanks
> Mohan
>
>
>>
>> -Matthias
>>
>> On 6/8/21 5:00 PM, Mohan Parthasarathy wrote:
>>> Hi Alex,
>>>
>>> On Tue, Jun 8, 2021 at 2:44 PM Alexandre Brasil <
>> alexandre.bra...@gmail.com>
>>> wrote:
>>>

 My point here is that, when we're only interested in a max/min
 numeric
 value, it doesn't
 matter when we have repeated values, since we'd be only forwarding
 the
 number downstream,

High kafka latency on simple benchmark

2021-08-08 Thread Виталий Ромашкин

Hi Devs,
 
Currently, I am bench-marking different transports.
The first one is Kafka.
I created a repo in my GitHub —  https://github.com/rvit34/transport-benchmark
The results for Kafka are not so good. At 25K RPS and higher, latency is about 
1 second or more.
Maybe I'm doing something completely wrong, but if I change the transport from Kafka 
to Aeron, my max latency is always under 100ms for any workload (100K RPS and 
higher).
So, could somebody check it out?
 
 
Best Regards, Vitaly.

Re: [DISCUSS] KIP-747 Add support for basic aggregation APIs

2021-08-08 Thread Mohan Parthasarathy
On Tue, Aug 3, 2021 at 6:56 PM Matthias J. Sax  wrote:

> I was playing with the code a little bit, but it seems not to be easy to
> use generics to enforce that V is `Comparable`...
>
> We would need to introduce a new interface
>
>  interface ComparableStream>
> extends KStream
>  {
> KTable min();
>  }
>
> But it also requires a nasty cast to actually use it:
>
>   KStream stream =
> new StreamsBuilder().stream("");
>   KTable table =
> ((ComparableStream) stream).min();
>
> If the value-type does not implement `Comparable` the cast would not
> compile... Or would there be a simpler way to ensure that min() can only
> be called _if_ V is `Comparable`?
>
>
> So maybe passing in a `Comparator` might be the right way to go;
> might also be more flexible anyway. -- My original idea was just to
> maybe avoid the `Comparator` argument, as it would make the API nicer
> IMHO; fewer parameters is usually better...
>
Yeah, I tried both and using Comparator seems more natural to me in this
case. I will update the document with the discussion here.
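
For reference, the two shapes being compared can be sketched on a plain list.
`MinSketch`, `minComparable`, and `minWithComparator` are hypothetical names,
not the proposed KStream API; the point is only where the type bound ends up.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class MinSketch {
    // Comparator variant: works for any V, the caller supplies the ordering.
    // Returns null for an empty list.
    static <V> V minWithComparator(List<V> values, Comparator<? super V> comparator) {
        V best = null;
        for (V v : values) {
            if (best == null || comparator.compare(v, best) < 0) {
                best = v;
            }
        }
        return best;
    }

    // Comparable variant: the bound on V has to be declared somewhere. On a
    // static helper it can sit on the method; on an interface method it would
    // have to sit on the interface's own type parameter instead.
    static <V extends Comparable<? super V>> V minComparable(List<V> values) {
        return minWithComparator(values, Comparator.<V>naturalOrder());
    }

    public static void main(String[] args) {
        System.out.println(minComparable(Arrays.asList(3, 1, 2)));
        System.out.println(minWithComparator(
                Arrays.asList("bb", "a", "ccc"),
                Comparator.comparingInt(String::length)));
    }
}
```

The Comparator variant also covers value types that have no natural ordering,
at the cost of one extra argument.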



>
> I am not sure why we would want to pass `Function>
> func` into `min()`?
>
> Not sure. Also, do you have an opinion on Long vs Double ?

-thanks
Mohan


>
>
> -Matthias
>
>
>
> On 6/21/21 11:23 AM, Mohan Parthasarathy wrote:
> > Alex,
> >
> >
> > On Wed, Jun 16, 2021 at 8:07 AM Alexandre Brasil <
> alexandre.bra...@gmail.com>
> > wrote:
> >
> >> Mohan / Mathias,
> >>
>  I think extending min/max to non-numeric types makes sense. Wondering
>  why we should require a `Comparator` or if we should require that the
>  types implement `Comparable` instead?
> 
> >>> Good question. This is what it would look like:
> >>>
> >>> KTable min_comparable()
> >>> KTable min_comparator(Comparator comp)
> >>
> >> Not sure if I understood Mathias' proposal correctly, but I think that
> >> instead of going with
> >> your original proposal ( KTable
> min(Function >> VR> func...)
> >> or mine (KTable min(Comparator comparator...), we could
> simplify
> >> it a
> >> bit by using a function to extract a Comparable property from the
> original
> >> value:
> >>
> >> KTable min(Function> func...)
> >>
> >> I will let Matthias clarify. I am not sure why it is simpler than the
> > comparator one. Comparable is implemented by the type and not sure
> exposing
> > it this way makes it any better.
> >
> >> I also think, that min/max should not change the value type. Using
> >>> `Long` for sum() make sense though, and also to require a ` >>> Number>`.
> >>
> >> Are there any reasons to limit the sum() to integers? Why not use a
> Double
> >> instead?
> >>
> >> Yeah, if the precision is important, then we should stick with Double.
> >
> > -mohan
> >
> > Best regards,
> >> Alexandre
> >>
> >> On Wed, Jun 16, 2021 at 1:01 AM Mohan Parthasarathy <
> mposde...@gmail.com>
> >> wrote:
> >>
> >>> Matthias,
> >>>
> >>> On Mon, Jun 14, 2021 at 9:18 PM Matthias J. Sax
> >>  
> >>> wrote:
> >>>
>  Hi,
> 
>  I think extending min/max to non-numeric types makes sense. Wondering
>  why we should require a `Comparator` or if we should require that the
>  types implement `Comparable` instead?
> 
>  Good question. This is what it would look like:
> >>>
> >>> KTable min_comparable()
> >>> KTable min_comparator(Comparator comp)
> >>>
> >>> For min_comparable to work, you still need the bounds "V extends
> >>> Comparable<
> >>> V>". AFAICT, to avoid the "type parameter V hiding the type V" warning,
> >> it
> >>> has to be at the interface level like this:
> >>>
> >>>  KStream>
> >>>
> >>> which is a little messy unless there is a different way to do the same.
> >> The
> >>> comparator gives a simple way to define an anonymous function.
> >>>
> >>> What do you think ?
> >>>
> >>>
>  I also think, that min/max should not change the value type. Using
>  `Long` for sum() make sense though, and also to require a `  Number>`.
> 
>  I guess these are the two possibilities:
> >>>
> >>>  Long sum(Function func)
> >>> Long sum(Function func)
> >>>
> >>> Both should work. "func" can return any subtypes of Number and I don't
> >> see
> >>> any advantages with the first version. Can you clarify ?
> >>>
> >>> Thanks
> >>> Mohan
> >>>
> >>>
> 
>  -Matthias
> 
>  On 6/8/21 5:00 PM, Mohan Parthasarathy wrote:
> > Hi Alex,
> >
> > On Tue, Jun 8, 2021 at 2:44 PM Alexandre Brasil <
>  alexandre.bra...@gmail.com>
> > wrote:
> >
> >>
> >> My point here is that, when we're only interested in a max/min
> >> numeric
> >> value, it doesn't
> >> matter when we have repeated values, since we'd be only forwarding
> >> the
> >> number downstream,
> >> so I could disregard when the Comparator returns a zero value
> >> (meaning
> >> equals) and min/max
> >> would still be semantically correct. But when we're forwarding the
>  original
> >> object downstream
> >> instead of its 

Re: [DISCUSS] KIP-766: fetch/findSessions queries with open endpoints for SessionStore/WindowStore

2021-08-08 Thread Luke Chen
Hi all,
Sorry, I found that the KIP link I provided in the previous email was wrong.
The updated link is below.
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186876596

Thank you.
Luke


On Thu, Aug 5, 2021 at 2:31 PM Luke Chen  wrote:

> Hi everyone,
>
> I'd like to start the discussion for *KIP-766: fetch/findSessions queries
> with open endpoints for WindowStore/SessionStore*.
>
> This is a follow-up KIP for KIP-763: Range queries with open endpoints.
> In KIP-763, we focused on *ReadOnlyKeyValueStore*; in this KIP, we'll
> focus on *ReadOnlySessionStore* and *ReadOnlyWindowStore*, to have open
> endpoint queries for SessionStore/WindowStore.
>
> The KIP can be found here:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186876596
> 
>
> Thank you.
> Luke
>
>
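
To illustrate what an open endpoint means for a range query, here is a small
sketch on a plain `TreeMap`. It assumes that an open endpoint is expressed by
passing `null` for the bound; `OpenRangeSketch` and `range` are hypothetical
names, not the store API.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

final class OpenRangeSketch {
    // Returns the entries with from <= key <= to, where a null bound means
    // "no limit on that side".
    static <K, V> NavigableMap<K, V> range(NavigableMap<K, V> store, K from, K to) {
        if (from == null && to == null) {
            return store;
        }
        if (from == null) {
            return store.headMap(to, true);
        }
        if (to == null) {
            return store.tailMap(from, true);
        }
        return store.subMap(from, true, to, true);
    }

    public static void main(String[] args) {
        NavigableMap<String, Integer> store = new TreeMap<>();
        store.put("a", 1);
        store.put("b", 2);
        store.put("c", 3);
        System.out.println(range(store, null, "b").keySet()); // open lower endpoint
        System.out.println(range(store, "b", null).keySet()); // open upper endpoint
    }
}
```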


[jira] [Created] (KAFKA-13178) frequent network_exception trace at kafka producer.

2021-08-08 Thread kaushik srinivas (Jira)
kaushik srinivas created KAFKA-13178:


 Summary: frequent network_exception trace at kafka producer.
 Key: KAFKA-13178
 URL: https://issues.apache.org/jira/browse/KAFKA-13178
 Project: Kafka
  Issue Type: Bug
Reporter: kaushik srinivas


Running 3 node kafka cluster (4 cores and 4 cpu in k8s).

topics: 15, partitions each: 15, replication factor: 3, min.insync.replicas: 2

producer is running with acks : "all"

We see frequent failures in the kafka producer with below trace

{"host":"ww","level":"WARN","log":{"classname":"org.apache.kafka.clients.producer.internals.Sender:595","message":"[Producer
 clientId=producer-1] Got error produce response with correlation id 2646 on 
topic-partition *-0, retrying (2 attempts left). Error: 
NETWORK_EXCEPTION","stacktrace":"","threadname":"kafka-producer-network-thread 
| 
producer-1"},"time":"2021-08-*04T02:22:20.529Z","timezone":"UTC","type":"log","system":"w","systemid":"3"}

 

What could be possible reasons for the above trace ? 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13177) partition failures and fewer shrink but a lot of isr expansions with increased num.replica.fetchers in kafka brokers

2021-08-08 Thread kaushik srinivas (Jira)
kaushik srinivas created KAFKA-13177:


 Summary: partition failures and fewer shrink but a lot of isr 
expansions with increased num.replica.fetchers in kafka brokers
 Key: KAFKA-13177
 URL: https://issues.apache.org/jira/browse/KAFKA-13177
 Project: Kafka
  Issue Type: Bug
Reporter: kaushik srinivas


Installing 3 node kafka broker cluster (4 core cpu and 4Gi memory on k8s)

topics : 15, partitions each : 15 replication factor 3, min.insync.replicas  : 2

producers running with acks : all

Initially the num.replica.fetchers was set to 1 (default) and we observed very 
frequent ISR shrinks and expansions. So the setups were tuned with a higher 
value of 4. 

After this change was made, we see the following behavior and warning messages in 
the broker logs:
 # Over a period of 2 days, there are around 10 shrinks corresponding to 10 
partitions, but around 700 ISR expansions corresponding to almost all 
partitions in the cluster(approx 50 to 60 partitions).
 # we see frequent warn msg of partitions being marked as failure in the same 
time span. Below is the trace --> {"type":"log", "host":"ww", 
"level":"WARN", "neid":"kafka-ww", "system":"kafka", 
"time":"2021-08-03T20:09:15.340", "timezone":"UTC", 
"log":{"message":"ReplicaFetcherThread-2-1003 - 
kafka.server.ReplicaFetcherThread - *[ReplicaFetcher replicaId=1001, 
leaderId=1003, fetcherId=2] Partition test-16 marked as failed"}}*

 

We see the above behavior continuously after increasing the 
num.replica.fetchers to 4 from 1. We did increase this to improve the 
replication performance and hence reduce the ISR shrinks.

But we see this strange behavior after the change. What does the above trace 
indicate? Is marking partitions as failed just a WARN message that is handled by 
Kafka, or is it something to worry about?





[jira] [Created] (KAFKA-13176) frequent ISR shrinks and expansion with default num.replica.fetchers (1) under very low throughput conditions.

2021-08-08 Thread kaushik srinivas (Jira)
kaushik srinivas created KAFKA-13176:


 Summary: frequent ISR shrinks and expansion with default 
num.replica.fetchers (1) under very low throughput conditions.
 Key: KAFKA-13176
 URL: https://issues.apache.org/jira/browse/KAFKA-13176
 Project: Kafka
  Issue Type: Bug
Reporter: kaushik srinivas


Running a 3 node kafka cluster (2.3.x kafka) with 4 cores of cpu and 4Gi of 
memory on  a k8s environment.

num.replica.fetchers is configured to 1 (default value).

There are around 15 topics in the cluster and all of them receive a very low 
rate of records/sec (less than 100 per second most of the cases).

All the topics have more than 10 partitions and a replication factor of 3. 
min.insync.replicas is set to 2, and producers run with acks set to 
'all'.

We constantly observe ISR shrinks and expansions for almost every topic 
partition. Shrinks and expansions are usually separated by around 6 
to 8 seconds.

During these shrinks and expands we see a lot of request time outs at the kafka 
producer side for these topics.

Are there any known configuration items we can use to overcome this? 

We are confused by the continuous ISR shrinks and expansions on topics with very low 
throughput.


