[DISCUSS] KIP-747 Add support for basic aggregation APIs

2021-06-02 Thread Mohan Parthasarathy
Hi,

I have created a proposal for adding some additional aggregation APIs like
count.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-747+Add+support+for+basic+aggregation+APIs

I have noted down some of the issues that need discussion. Thanks to
Matthias for helping me with the scope of the proposal.

Thanks
Mohan


Re: [DISCUSS] KIP-747 Add support for basic aggregation APIs

2021-06-05 Thread Mohan Parthasarathy
Hi Alex,

Responses below.

On Fri, Jun 4, 2021 at 9:27 AM Alexandre Brasil 
wrote:

> Hi Mohan,
>
> I like the idea of adding those methods to the API, but I'd like to make a
> suggestion:
>
> Although the most used scenario for min() / max() might possibly be for
> numeric values, I think they could also be
> useful on other objects like Dates, LocalDates or Strings. Why limit the
> API to Numbers only?
>
>
There was no specific reason. Just addressing the common scenario. But I
don't see why this can't be supported given your suggestion below.

Extending on the above, couldn't we change the API to provide a
> Comparator<V> instead of the Function<V, VR>
> for those methods, and make them return a KTable<K, V> instead? Not only
> would this approach not limit the
> usage of those methods to Numbers, but they'd also preserve the origin of
> the min/max value [1]. The extraction of
> a single (numeric?) value could be achieved by a subsequent .mapValues()
> operator, and this strategy would also
> allow us to reuse the stream's current value serde on min / max, making the
> Materialized an optional parameter.
>
I like your idea, though it is odd that min/max returns KTable<K, V>
instead of KTable<K, VR> (like count returns KTable<K, Long>), but
mapValues should do the trick.

One extra complication of this approach is that now we'd have to handle
> repeated min/max values from different
> origins (two semantically different objects for which the comparator
> returns 0), but we could solve that by adding
> a parameter to specify whether to use the older or newer value (or assuming
> one of these options as default for a
> simpler API?).
>
I am not sure whether this complexity is warranted. Why can't we just
stick to the way a regular Comparator works? Can you give me a real-world
example?

>
> I know it's an implementation issue, but I'm curious on how you'd solve
> handling the <VR extends Number> on
> the sum(). Since the multiple implementations of this interface don't have
> a common constructor nor an interface
> method to add two Numbers, would it be possible to implement sum() and
> retain the original VR type on the
> returned KTable<K, VR>?
>

Not knowing the schema of the value (V) has its own set of problems. As I
have alluded to in the proposal, this is a little bit messy. We already
have "reduce", which can be used to implement sum (mapValues().reduce()).
Thinking about it more, do you think "sum" would be useful? One hacky way
to implement it is to inspect the return type the first time "func" is
called, OR infer it from the Materialized, OR have an explicit initializer.
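For illustration, the mapValues().reduce() route can be sketched in plain Java, using java.util.stream as a rough stand-in for the Streams DSL (this is an analogy, not the actual KStream API):

```java
import java.util.List;
import java.util.function.Function;

// Plain-Java sketch of the idea: map each value to a Number, then reduce
// by adding doubleValue()s -- the shape of mapValues(func).reduce(sum).
public class SumViaReduce {
    static <V> double sum(List<V> values, Function<V, Number> func) {
        return values.stream()
                .map(func)                        // mapValues(func)
                .mapToDouble(Number::doubleValue) // Number has no "add", so go via double
                .reduce(0.0, Double::sum);        // reduce(Double::sum)
    }

    record Bid(String bidder, int amount) {}

    public static void main(String[] args) {
        List<Bid> bids = List.of(new Bid("a", 10), new Bid("b", 32));
        double total = sum(bids, Bid::amount);
        assert total == 42.0 : total;
        System.out.println(total); // 42.0
    }
}
```

Note how Number.doubleValue() sidesteps the "no common method to add two Numbers" problem, at the cost of fixing the result type.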

Thanks
Mohan


> [1]: An example scenario for this would be to find the min / max Bidding
> for a product where, at the end of the
> auction, I need not only the min / max value of said Bidding, but also the
> bidder's contact information.
>
> Best,
> Alexandre
>
> On Wed, Jun 2, 2021 at 8:54 PM Mohan Parthasarathy 
> wrote:
>
> > Hi,
> >
> > I have created a proposal for adding some additional aggregation APIs
> like
> > count.
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-747+Add+support+for+basic+aggregation+APIs
> >
> > I have noted down some of the issues that need discussion. Thanks to
> > Matthias for helping me with the scope of the proposal.
> >
> > Thanks
> > Mohan
> >
>


Re: [DISCUSS] KIP-747 Add support for basic aggregation APIs

2021-06-08 Thread Mohan Parthasarathy
Hi Alex,

On Tue, Jun 8, 2021 at 2:44 PM Alexandre Brasil 
wrote:

>
> My point here is that, when we're only interested in a max/min numeric
> value, it doesn't
> matter when we have repeated values, since we'd be only forwarding the
> number downstream,
> so I could disregard when the Comparator returns a zero value (meaning
> equals) and min/max
> would still be semantically correct. But when we're forwarding the original
> object downstream
> instead of its numeric property, it could mean different things
> semantically depending on how
> we handle the repeated values.
>
> As an example, if I were using max() on a stream of Biddings for products
> in an auction, the
> order of the biddings would probably influence the winner if two clients
> send Biddings with the
> same value. If we're only forwarding the Bidding value downstream (a double
> value of 100, for
> example), it doesn't matter how repeated values are handled, since the max
> price for this
> auction would still be 100.00, no matter what Bidding got selected in the
> end. But if we're
> forwarding the Biddings downstream instead, then it matters whether the
> winning Bidding sent
> downstream was originally posted by Client A or Client B.
>
> I'm not saying that an overloaded method to handle different options for
> how repeated values
> should be handled by min/max is mandatory, but it should be clear on the
> methods' docs
> what would happen when Comparator.compare() == 0. My preferred option for
> the default
> behaviour is to only forward a new value if it's smaller/bigger than the
> previous min/max value
> (ignoring compare() == 0), since it would emit less values downstream and
> would be easier
> to read ("I only send a value downstream if it's bigger/smaller than the
> previously selected
> value").
>
Thanks for the clarification. I like your suggestion unless someone feels
that they want an option to control this (i.e., when compare() == 0, return
the old value vs new value).
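To make the proposed default concrete, here is a minimal sketch (plain Java, not the Streams DSL): a max update rule that replaces the stored value only on a strictly greater comparison, so a tie keeps the earlier record, and Client A's bid wins over a later equal bid:

```java
import java.util.Comparator;

// Sketch of the discussed default for max(): replace the stored value only
// when the candidate is strictly greater, so compare() == 0 keeps the old one.
public class MaxAggregator {
    record Bid(String bidder, double amount) {}

    static Bid max(Bid current, Bid candidate, Comparator<Bid> cmp) {
        return cmp.compare(candidate, current) > 0 ? candidate : current;
    }

    public static void main(String[] args) {
        Comparator<Bid> byAmount = Comparator.comparingDouble(Bid::amount);
        Bid first = new Bid("clientA", 100.0);
        Bid tie = new Bid("clientB", 100.0); // same price, arrives later
        Bid winner = max(first, tie, byAmount);
        assert winner.bidder().equals("clientA"); // tie keeps the older bid
    }
}
```

A side effect of this rule is that ties emit nothing downstream, matching the "fewer emitted values" argument above.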


>
> > Not knowing the schema of the value (V) has its own set of problems. As I
> have alluded to
> > in the proposal, this is a little bit messy. We already have "reduce"
> which can be used to
> > implement sum (mapValues().reduce()).
> > Thinking about it more, do you think "sum" would be useful ? One hacky
> way to implement
> > this is to inspect the type of the return when the "func" is called the
> first time OR infer from
> > the materialized or have an explicit initializer.
>
> I think it might be useful for some use cases, yes, but it would be tricky
> to implement this in a
> way that handles generic Numbers and keeps their original implementation
> class. One
> simplification you could take is fixating VR to be a Double, and then use
> Number.doubleValue()
> to compute the sum.
>

Yeah, that would simplify quite a bit. I think you are suggesting this:

KTable<K, Double> sum(Function<V, Number> func)


> What you said about using reduce() to compute a sum() is also true for
> min() and max(). =) All
> three methods in this KIP would be a syntactic sugar for what could
> otherwise be implemented
> using reduce/aggregate, but I see value in implementing them and
> simplifying the adoption of
> those use cases.
>
Agreed. I seem to have forgotten why I started this KIP :-). There is a
long way to go.

-thanks
Mohan

Best regards,
> Alexandre
>

Re: [DISCUSS] KIP-747 Add support for basic aggregation APIs

2021-06-15 Thread Mohan Parthasarathy
Matthias,

On Mon, Jun 14, 2021 at 9:18 PM Matthias J. Sax 
wrote:

> Hi,
>
> I think extending min/max to non-numeric types makes sense. Wondering
> why we should require a `Comparator` or if we should require that the
> types implement `Comparable` instead?
>
Good question. This is what it would look like:

KTable<K, V> min_comparable()
KTable<K, V> min_comparator(Comparator<V> comp)

For min_comparable to work, you still need the bounds "V extends
Comparable<V>". AFAICT, to avoid the "type parameter V hiding the type V"
warning, it has to be at the interface level, like this:

KStream<K, V extends Comparable<V>>

which is a little messy unless there is a different way to do the same. The
comparator gives a simple way to define an anonymous function.

What do you think?
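The trade-off can be sketched over plain values rather than the Streams DSL; the method names below just mirror the placeholders above and are not a proposed API:

```java
import java.util.Comparator;
import java.util.List;

// The two shapes under discussion: min via a Comparable bound on the value
// type vs. min via a caller-supplied Comparator.
public class MinShapes {
    // "min_comparable": requires the bound V extends Comparable<V>
    static <V extends Comparable<V>> V minComparable(List<V> values) {
        V best = values.get(0);
        for (V v : values) if (v.compareTo(best) < 0) best = v;
        return best;
    }

    // "min_comparator": no bound on V; caller passes an anonymous function
    static <V> V minComparator(List<V> values, Comparator<V> cmp) {
        V best = values.get(0);
        for (V v : values) if (cmp.compare(v, best) < 0) best = v;
        return best;
    }

    public static void main(String[] args) {
        assert minComparable(List.of(3, 1, 2)) == 1;
        // the Comparator form works even for types with no natural ordering
        assert minComparator(List.of("bb", "a"),
                Comparator.comparingInt(String::length)).equals("a");
    }
}
```

The Comparator form also lets the caller pick the ordering per call site (e.g. by bid amount vs. by timestamp), which a Comparable bound cannot.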


> I also think, that min/max should not change the value type. Using
> `Long` for sum() makes sense though, and also to require a `Function<V,
> Number>`.
>
I guess these are the two possibilities:

<VR extends Number> KTable<K, Long> sum(Function<V, VR> func)
KTable<K, Long> sum(Function<V, Number> func)

Both should work. "func" can return any subtype of Number, and I don't see
any advantage with the first version. Can you clarify?

Thanks
Mohan


>
> -Matthias
>
Re: [DISCUSS] KIP-747 Add support for basic aggregation APIs

2021-06-21 Thread Mohan Parthasarathy
Alex,


On Wed, Jun 16, 2021 at 8:07 AM Alexandre Brasil 
wrote:

> Mohan / Mathias,
>
> > > I think extending min/max to non-numeric types makes sense. Wondering
> > > why we should require a `Comparator` or if we should require that the
> > > types implement `Comparable` instead?
> > >
> > Good question. This is what it would look like:
> >
> > KTable<K, V> min_comparable()
> > KTable<K, V> min_comparator(Comparator<V> comp)
>
> Not sure if I understood Mathias' proposal correctly, but I think that
> instead of going with
> your original proposal (<VR extends Number> KTable<K, VR> min(Function<V,
> VR> func...))
> or mine (KTable<K, V> min(Comparator<V> comparator...)), we could simplify
> it a
> bit by using a function to extract a Comparable property from the original
> value:
>
> KTable<K, V> min(Function<V, Comparable<V>> func...)
>
I will let Matthias clarify. I am not sure why it is simpler than the
comparator one. Comparable is implemented by the type, and I am not sure
exposing it this way makes it any better.

> > I also think, that min/max should not change the value type. Using
> > `Long` for sum() makes sense though, and also to require a `Function<V,
> > Number>`.
>
> Are there any reasons to limit the sum() to integers? Why not use a Double
> instead?
>
Yeah, if the precision is important, then we should stick with Double.

-mohan

Best regards,
> Alexandre
>

Re: [DISCUSS] KIP-655: Windowed "Distinct" Operation for KStream

2021-06-24 Thread Mohan Parthasarathy
Ivan,

I read through the discussion and your new proposal. I have a couple of
questions.

1. As we have cancelRepartition, wouldn't selectKey be sufficient? You
still have idExtractor. Maybe I misunderstood the discussion.
2. isPersistent should be replaced by Materialized. It looked like you
agreed to it, as it might be useful to specify the retention as well.

Thanks
Mohan


On Thu, Jun 24, 2021 at 8:12 AM Ivan Ponomarev 
wrote:

> Hello all,
>
> I have rewritten the KIP-655 summarizing what was agreed upon during
> this discussion (now the proposal is much simpler and less invasive).
>
> I have also created KIP-759 (cancelRepartition operation) and started a
> discussion for it.
>
> Regards,
>
> Ivan.
>
>
>
> On 04.06.2021 8:15, Matthias J. Sax wrote:
> > Just skimmed over the thread -- first of all, I am glad that we could
> > merge KIP-418 and ship it :)
> >
> > About the re-partitioning concerns, there are already two tickets for it:
> >
> >   - https://issues.apache.org/jira/browse/KAFKA-4835
> >   - https://issues.apache.org/jira/browse/KAFKA-10844
> >
> > Thus, it seems best to exclude this topic from this KIP, and do a
> > separate KIP for it (if necessary, we can "pause" this KIP until the
> > repartition KIP is done). It's a long standing "issue" and we should
> > resolve it in a general way I guess.
> >
> > (Did not yet ready all responses in detail yet, so keeping this comment
> > short.)
> >
> >
> > -Matthias
> >
> > On 6/2/21 6:35 AM, John Roesler wrote:
> >> Thanks, Ivan!
> >>
> >> That sounds like a great plan to me. Two smaller KIPs are easier to
> agree on than one big one.
> >>
> >> I agree hopping and sliding windows will actually have a duplicating
> effect. We can avoid adding distinct() to the sliding window interface, but
> hopping windows are just a different parameterization of epoch-aligned
> windows. It seems we can’t do much about that except document the issue.
> >>
> >> Thanks,
> >> John
> >>
> >> On Wed, May 26, 2021, at 10:14, Ivan Ponomarev wrote:
> >>> Hi John!
> >>>
> >>> I think that your proposal is just fantastic, it simplifies things a
> lot!
> >>>
> >>> I also felt uncomfortable due to the fact that the proposed
> `distinct()`
> >>> is not somewhere near `count()` and `reduce(..)`. But
> >>> `selectKey(..).groupByKey().windowedBy(..).distinct()` didn't look like
> >>> a correct option for  me because of the issue with the unneeded
> >>> repartitioning.
> >>>
> >>> The bold idea that we can just CANCEL the repartitioning didn't came to
> >>> my mind.
> >>>
> >>> What seemed to me a single problem is in fact two unrelated problems:
> >>> `distinct` operation and cancelling the unneeded repartitioning.
> >>>
> >>>   > what if we introduce a parameter to `selectKey()` that specifies
> that
> >>> the caller asserts that the new key does _not_ change the data
> partitioning?
> >>>
> >>> I think a more elegant solution would be not to add a new parameter to
> >>> `selectKey` and all the other key-changing operations (`map`,
> >>> `transform`, `flatMap`, ...), but add a new operator
> >>> `KStream#cancelRepartitioning()` that resets `keyChangingOperation`
> flag
> >>> for the upstream node. Of course, "use it only if you know what you're
> >>> doing" warning is to be added. Well, it's a topic for a separate KIP!
> >>>
> >>> Concerning `distinct()`. If we use `XXXWindowedKStream` facilities,
> then
> >>> changes to the API are minimally invasive: we're just adding
> >>> `distinct()` to TimeWindowedKStream and SessionWindowedKStream, and
> >>> that's all.
> >>>
> >>> We can now define `distinct` as an operation that returns only a first
> >>> record that falls into a new window, and filters out all the other
> >>> records that fall into an already existing window. BTW, we can mock the
> >>> behaviour of such an operation with `TopologyTestDriver` using
> >>> `reduce((l, r) -> STOP).filterNot((k, v) -> STOP.equals(v))`.  ;-)
> >>>
> >>> Consider the following example (record times are in seconds):
> >>>
> >>> //three bursts of variously ordered records
> >>> 4, 5, 6
> >>> 23, 22, 24
> >>> 34, 33, 32
> >>> //'late arrivals'
> >>> 7, 22, 35
> >>>
> >>>
> >>> 1. 'Epoch-aligned deduplication' using tumbling windows:
> >>>
> >>>
> .groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(10))).distinct()
> >>>
> >>> produces
> >>>
> >>> (key@[0/10], 4)
> >>> (key@[20/30], 23)
> >>> (key@[30/40], 34)
> >>>
> >>> -- that is, one record per epoch-aligned window.
> >>>
> >>> 2. Hopping and sliding windows do not make much sense here, because
> they
> >>> produce multiple intersected windows, so that one record can be
> >>> multiplied, but we want deduplication.
> >>>
> >>> 3. SessionWindows work for 'data-aligned deduplication'.
> >>>
> >>>
> .groupByKey().windowedBy(SessionWindows.with(Duration.ofSeconds(10))).distinct()
> >>>
> >>>
> >>> produces only
> >>>
> >>> ([key@4000/4000], 4)
> >>> ([key@23000/23000], 23)
> >>>
> >>> because all the records bigger than 7 are stu
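The 'epoch-aligned deduplication' behaviour described above can be sketched for a single key in plain Java; this models only the window bookkeeping, not the actual Streams implementation or its stores:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// For one key: keep only the first record whose timestamp falls into a new
// tumbling window; later records in an already-seen window are dropped,
// including 'late arrivals'.
public class TumblingDistinct {
    static List<Long> distinct(List<Long> timesSec, long windowSizeSec) {
        Set<Long> seenWindows = new HashSet<>();
        List<Long> out = new ArrayList<>();
        for (long t : timesSec) {
            long windowStart = (t / windowSizeSec) * windowSizeSec;
            if (seenWindows.add(windowStart)) out.add(t); // first in this window
        }
        return out;
    }

    public static void main(String[] args) {
        // three bursts of variously ordered records, then 'late arrivals'
        List<Long> times =
            List.of(4L, 5L, 6L, 23L, 22L, 24L, 34L, 33L, 32L, 7L, 22L, 35L);
        assert distinct(times, 10).equals(List.of(4L, 23L, 34L));
    }
}
```

Running this over the example input yields one record per 10-second window, with the late arrivals 7, 22, and 35 filtered out because their windows were already seen.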

Re: [DISCUSS] KIP-501 Avoid out-of-sync or offline partitions when follower fetch requests are not processed in time

2021-06-27 Thread Mohan Parthasarathy
Hi Satish,

One small clarification regarding the proposal. I understand how Solution
(1) enables the other replicas to be chosen as the leader. But it is
possible that the other replicas may not be in sync yet and if unclean
leader election is not enabled, the other replicas may not become the
leader right ?

It is not clear to me whether Solution 2 can happen independently. For
example, if the leader exceeds leader.fetch.process.time.max.ms due to a
transient condition, should it relinquish leadership immediately? That
might be aggressive in some cases. Detecting that a leader is slow cannot
be determined by just one occurrence, right?
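To illustrate the point (this is a hypothetical sketch, not anything from the KIP; the class name and the consecutive-breach policy are invented): slowness could be declared only after several consecutive fetch rounds exceed the threshold, so a single transient spike does not trigger a leadership change.

```java
// Hypothetical: count consecutive slow fetch rounds; relinquish leadership
// only after `requiredBreaches` in a row, resetting on any fast round.
public class SlownessDetector {
    private final long maxProcessTimeMs;
    private final int requiredBreaches;
    private int consecutiveBreaches = 0;

    SlownessDetector(long maxProcessTimeMs, int requiredBreaches) {
        this.maxProcessTimeMs = maxProcessTimeMs;
        this.requiredBreaches = requiredBreaches;
    }

    /** Returns true when leadership should be relinquished. */
    boolean record(long fetchProcessTimeMs) {
        consecutiveBreaches = fetchProcessTimeMs > maxProcessTimeMs
                ? consecutiveBreaches + 1 : 0;
        return consecutiveBreaches >= requiredBreaches;
    }

    public static void main(String[] args) {
        SlownessDetector d = new SlownessDetector(500, 3);
        assert !d.record(600);                    // one transient spike: keep leadership
        assert !d.record(100);                    // recovered, counter resets
        assert !d.record(600) && !d.record(700);  // two breaches: still leader
        assert d.record(800);                     // third in a row: relinquish
    }
}
```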

Thanks
Mohan


On Sun, Jun 27, 2021 at 4:01 AM Satish Duggana 
wrote:

> Hi Dhruvil,
> Thanks for looking into the KIP and providing your comments.
>
> There are two problems about the scenario raised in this KIP:
>
> a) Leader is slow and it is not available for reads or writes.
> b) Leader is causing the followers to be out of sync and cause the
> partitions unavailability.
>
> (a) should be detected and mitigated so that the broker can become a
> leader or replace with a different node if this node continues having
> issues.
>
> (b) will cause the partition to go under minimum ISR and eventually
> make that partition offline if the leader goes down. In this case,
> users have to enable unclean leader election for making the partition
> available. This may cause data loss based on the replica chosen as a
> leader. This is what several folks(including us) observed in their
> production environments.
>
> Solution(1) in the KIP addresses (b) to avoid offline partitions by
> not removing the replicas from ISR. This allows the partition to be
> available if the leader is moved to one of the other replicas in ISR.
>
> Solution (2) in the KIP extends solution (1) by relinquishing the
> leadership and allowing one of the other insync replicas to become a
> leader.
>
> ~Satish.
>


Re: [DISCUSS] KIP-501 Avoid out-of-sync or offline partitions when follower fetch requests are not processed in time

2021-06-28 Thread Mohan Parthasarathy
Hi Satish,


>
>
> > It is not clear to me whether Solution 2 can happen independently. For
> > example, if the leader exceeds leader.fetch.process.time.max.ms due to a
> > transient condition, should it relinquish leadership immediately? That
> > might be aggressive in some cases. Detecting that a leader is slow cannot
> > be determined by just one occurrence, right?
>
> Solution(2) is an extension to Solution(1) as mentioned earlier in the
> KIP. This config is applicable only if
> `follower.fetch.pending.reads.insync.enable` is set as true. I have
> also updated the config description in the KIP to make that clear.
> In our observations, we do not always see this behavior continuously.
> It occurs intermittently and makes all the other requests pile up in
> the request queue. Sometimes, the broker goes down and makes the
> partitions offline.  Users need to set the config based on their
> host's configuration and behavior. We can also think about extending
> this config based on others observations.
>
>
That clarification in the document helps. But then setting the first option
to true does not necessarily mean that the condition is happening. Did you
mean that it should relinquish leadership if fetch processing takes longer
than leader.fetch.process.time.max.ms AND there are pending fetch requests
at offsets >= the log-end-offset of the earlier fetch request?

-Thanks
Mohan

> Thanks,
> Satish.
>


Re: [DISCUSS] KIP-747 Add support for basic aggregation APIs

2021-08-08 Thread Mohan Parthasarathy
On Tue, Aug 3, 2021 at 6:56 PM Matthias J. Sax  wrote:

> I was playing with the code a little bit, but it seems not to be easy to
> use generics to enforce that V is `Comparable`...
>
> We would need to introduce a new interface
>
>   interface ComparableStream<K, V extends Comparable<V>>
>       extends KStream<K, V> {
>     KTable<K, V> min();
>   }
>
> But it also requires a nasty cast to actually use it:
>
>   KStream<String, Integer> stream =
>       new StreamsBuilder().stream("");
>   KTable<String, Integer> table =
>       ((ComparableStream<String, Integer>) stream).min();
>
> If the value-type does not implement `Comparable` the cast would not
> compile... Or would there be a simpler way to ensure that min() can only
> be called _if_ V is `Comparable`?
>
>
> So maybe passing in a `Comparator` might be the right way to go;
> might also be more flexible anyway. -- My original idea was just to
> maybe avoid the `Comparator` argument, as it would make the API nicer
> IMHO; fewer parameters is usually better...
>
Yeah, I tried both, and using Comparator seems more natural to me in this
case. I will update the document with the discussion here.



>
> I am not sure why we would want to pass `Function<V, Comparable<V>>
> func` into `min()`?
>
Not sure. Also, do you have an opinion on Long vs Double ?

-thanks
Mohan


>
>
> -Matthias
>
>
>

Re: [DISCUSS] KIP-747 Add support for basic aggregation APIs

2021-08-08 Thread Mohan Parthasarathy
On Sun, Aug 8, 2021 at 3:56 PM Matthias J. Sax  wrote:

> >>> Not sure. Also, do you have an opinion on Long vs Double ?
>
> Not sure what you mean by `Long vs Double` ? Can you elaborate?
>
> We were discussing the return value for "sum". Alex suggested using
"Double" and you had "Long" in one of your responses I think. Do you have a
preference or reason one way or the other?

-mohan


> -Matthias
>
> On 8/8/21 7:41 AM, Mohan Parthasarathy wrote:
> > On Tue, Aug 3, 2021 at 6:56 PM Matthias J. Sax  wrote:
> >
> >> I was playing with the code a little bit, but it seems not to be easy to
> >> use generics to enforce that V is `Comparable`...
> >>
> >> We would need to introduce a new interface
> >>
> >>  interface ComparableStream<K, V extends Comparable<V>>
> >>  extends KStream<K, V>
> >>  {
> >>    KTable<K, V> min();
> >>  }
> >>
> >> But it also requires a nasty cast to actually use it:
> >>
> >>   KStream stream =
> >> new StreamsBuilder().stream("");
> >>   KTable table =
> >> ((ComparableStream) stream).min();
> >>
> >> If the value-type does not implement `Comparable` the cast would not
> >> compile... Or would there be a simpler way to ensure that min() can only
> >> be called _if_ V is `Comparable`?
> >>
> >>
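The constraint described above can be reproduced in a self-contained plain-Java sketch (no Kafka dependency; `View`/`ComparableView` are hypothetical stand-ins for `KStream`/`ComparableStream`):

```java
import java.util.Comparator;
import java.util.List;

public class ComparableViewDemo {
    // A plain-Java stand-in for KStream.
    interface View<V> {
        List<V> values();
    }

    // The narrowed sub-interface: min() exists only when V is Comparable,
    // mirroring the ComparableStream sketch from the thread.
    interface ComparableView<V extends Comparable<V>> extends View<V> {
        default V min() {
            return values().stream().min(Comparator.naturalOrder()).orElseThrow();
        }
    }

    public static void main(String[] args) {
        // Compiles because Integer implements Comparable<Integer>:
        ComparableView<Integer> nums = () -> List.of(3, 1, 2);
        System.out.println(nums.min()); // 1

        // "ComparableView<Object> objs = ..." would not compile, which is the
        // compile-time guarantee; the price is that code starting from the
        // base View type needs the cast shown in the email above.
    }
}
```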
> >> So maybe passing in a `Comparator` might be the right way to go;
> >> might also be more flexible anyway. -- My original idea was just to
> >> maybe avoid the `Comparator` argument, as it would make the API nicer
> >> IMHO; fewer parameters is usually better...
> >>
> >> Yeah, I tried both and using Comparator seems more natural to me in this
> > case. I will update the document with the discussion here.
> >
> >
> >
> >>
> >> I am not sure why we would want to pass `Function<V, Comparable<V>>
> >> func` into `min()`?
> >>
> >> Not sure. Also, do you have an opinion on Long vs Double ?
> >
> > -thanks
> > Mohan
> >
> >
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 6/21/21 11:23 AM, Mohan Parthasarathy wrote:
> >>> Alex,
> >>>
> >>>
> >>> On Wed, Jun 16, 2021 at 8:07 AM Alexandre Brasil <
> >> alexandre.bra...@gmail.com>
> >>> wrote:
> >>>
> >>>> Mohan / Matthias,
> >>>>
> >>>>>> I think extending min/max to non-numeric types makes sense.
> Wondering
> >>>>>> why we should require a `Comparator` or if we should require that
> the
> >>>>>> types implement `Comparable` instead?
> >>>>>>
> >>>>> Good question. This is what it would look like:
> >>>>>
> >>>>> KTable<K, V> min_comparable()
> >>>>> KTable<K, V> min_comparator(Comparator<V> comp)
> >>>>
> >>>> Not sure if I understood Matthias' proposal correctly, but I think that
> >>>> instead of going with
> >>>> your original proposal (<VR extends Number> KTable<K, VR>
> >>>> min(Function<V, VR> func...)
> >>>> or mine (KTable<K, V> min(Comparator<V> comparator...), we could
> >> simplify
> >>>> it a
> >>>> bit by using a function to extract a Comparable property from the
> >> original
> >>>> value:
> >>>>
> >>>> KTable<K, V> min(Function<V, Comparable<V>> func...)
> >>>>
> >>>> I will let Matthias clarify. I am not sure why it is simpler than the
> >>> comparator one. Comparable is implemented by the type and not sure
> >> exposing
> >>> it this way makes it any better.
> >>>
> >>>> I also think, that min/max should not change the value type. Using
> >>>> `Long` for sum() make sense though, and also to require a `<V extends Number>`.
> >>>>
> >>>> Are there any reasons to limit the sum() to integers? Why not use a
> >> Double
> >>>> instead?
> >>>>
> >>>> Yeah, if the precision is important, then we should stick with Double.
> >>>
> >>> -mohan
> >>>
> >>> Best regards,
> >>>> Alexandre
> >>>>
> >>>> On Wed, Jun 16, 2021 at 1:01 AM Mohan Parthasarathy <
> >> mposde...@gmail.com>
> >>>> wrote:
> >>>>
> >>>>> Matthias,

Re: [DISCUSS] KIP-747 Add support for basic aggregation APIs

2021-08-16 Thread Mohan Parthasarathy
On Mon, Aug 16, 2021 at 5:28 PM Matthias J. Sax  wrote:
>
> @Mohan:
>
> For sum(), I actually think that the return type should be same as the
> input type. In the end, sum() is a special case of reduce().
>
If you define something like this as you suggested (without defining
new interfaces):

 <E extends Number> E sumG(Function<V, E> func)

Internally the implementation still has to extract the "value" from
the return object "E" using longValue or doubleValue (to avoid loss of
precision) to do the sum. Once you do that, casting back to "E" will
produce an unchecked-cast warning. I think the return type has to be fixed.
Possibilities:

Double sum(Function<V, Double> func)
Long sum(Function<V, Long> func)
<E extends Number> Long sum(Function<V, E> func)
<E extends Number> Double sum(Function<V, E> func)

Let me know if I am missing something.
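The argument above can be checked with a minimal self-contained sketch (hypothetical names, plain Java): fixing the return type to Double lets the accumulator work through Number.doubleValue() without ever casting back to the caller's E.

```java
import java.util.List;
import java.util.function.Function;

public class SumShapes {
    // Fixed Double return type: the accumulator extracts a Number from each
    // value and sums in double, so no unchecked cast back to E is needed.
    static <V> Double sum(List<V> values, Function<V, ? extends Number> func) {
        double total = 0.0;
        for (V v : values) {
            total += func.apply(v).doubleValue();
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical value type; the extractor picks the numeric field.
        record Reading(String sensor, double celsius) {}
        List<Reading> readings =
                List.of(new Reading("a", 1.5), new Reading("b", 2.25));
        System.out.println(sum(readings, Reading::celsius)); // 3.75
    }
}
```

A `Long sum(...)` variant would look the same with `longValue()`, at the cost of truncating fractional inputs, which is the Long-vs-Double trade-off being discussed.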

Thanks
Mohan





> @Alexandre:
>
> Not sure if using a `Function` to get a `Comparable` is simpler than
> implementing a `Comparator`? Can you elaborate on this point? In the end,
> using a `Comparator` seems to provide the highest flexibility?
>
>
> -Matthias
>
> On 8/15/21 10:27 AM, Alexandre Brasil wrote:
> >> I am not sure why we would want to pass `Function<V, Comparable<V>>
> >> func` into `min()`?
> >
> > I guess I misread/misunderstood your earlier suggestion.
> >
> > My line of thought was that, instead of using a method signature that
> > demands a Comparator in
> > min()/max(), we might use a property extractor (like the FK extractors on
> > some join() overloads) to
> > return a Comparable property that min()/max() could use to compare the
> > values.
> >
> > The benefit of this approach is that it would be simpler than implementing
> > comparators when most
> > use cases would probably compare properties of the values that already
> > implement Comparable (like
> > Numbers, Strings, Dates, etc), but on the other hand it would be more
> > limiting in disallowing the usage
> > of multiple properties of V, or in defining how null property values
> > should be handled.
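In plain Java the extractor style is already expressible on top of a Comparator-based API via Comparator.comparing(...), while the reverse does not hold; a short sketch with a hypothetical value type:

```java
import java.util.Comparator;
import java.util.List;

public class ExtractorVsComparator {
    // Hypothetical value type with Comparable properties.
    record Order(String id, double amount) {}

    // Extractor style, built from the Comparator API: point at a property,
    // then keep tie-breaking (and, if needed, null handling) expressible.
    static Comparator<Order> byAmountThenId() {
        return Comparator.comparingDouble(Order::amount)
                .thenComparing(Order::id);
    }

    public static void main(String[] args) {
        List<Order> orders =
                List.of(new Order("a", 30.0), new Order("b", 12.5));
        Order min = orders.stream().min(byAmountThenId()).orElseThrow();
        System.out.println(min.id()); // b
    }
}
```

This is one reason a Comparator parameter subsumes the property-extractor proposal: `Comparator.comparing(extractor)` recovers it in one call.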
> >
> > On Tue, Aug 3, 2021 at 10:55 PM Matthias J. Sax  wrote:
> >
> >> I was playing with the code a little bit, but it seems not to be easy to
> >> use generics to enforce that V is `Comparable`...
> >>
> >> We would need to introduce a new interface
> >>
> >>  interface ComparableStream<K, V extends Comparable<V>>
> >>  extends KStream<K, V>
> >>  {
> >>    KTable<K, V> min();
> >>  }
> >>
> >> But it also requires a nasty cast to actually use it:
> >>
> >>   KStream stream =
> >> new StreamsBuilder().stream("");
> >>   KTable table =
> >> ((ComparableStream) stream).min();
> >>
> >> If the value-type does not implement `Comparable` the cast would not
> >> compile... Or would there be a simpler way to ensure that min() can only
> >> be called _if_ V is `Comparable`?
> >>
> >>
> >> So maybe passing in a `Comparator` might be the right way to go;
> >> might also be more flexible anyway. -- My original idea was just to
> >> maybe avoid the `Comparator` argument, as it would make the API nicer
> >> IMHO; fewer parameters is usually better...
> >>
> >>
> >> I am not sure why we would want to pass `Function<V, Comparable<V>>
> >> func` into `min()`?
> >>
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >> On 6/21/21 11:23 AM, Mohan Parthasarathy wrote:
> >>> Alex,
> >>>
> >>>
> >>> On Wed, Jun 16, 2021 at 8:07 AM Alexandre Brasil <
> >> alexandre.bra...@gmail.com>
> >>> wrote:
> >>>
> >>>> Mohan / Mathias,
> >>>>
> >>>>>> I think extending min/max to non-numeric types makes sense. Wondering
> >>>>>> why we should require a `Comparator` or if we should require that the
> >>>>>> types implement `Comparable` instead?
> >>>>>>
> >>>>> Good question. This is what it would look like:
> >>>>>
> >>>>> KTable<K, V> min_comparable()
> >>>>> KTable<K, V> min_comparator(Comparator<V> comp)
> >>>>
> >>>> Not sure if I understood Mathias' proposal correctly, but I think that
> >>>> instead of going with
> >>>> your original proposal (<VR extends Number> KTable<K, VR>
> >>>> min(Function<V, VR> func...)
> >>>> or mine (KTable<K, V> min(Comparator<V> comparator...), we could
> >> simplify
> >>>> it a
> >>>> bit by using a function to extract a Comparable property from the

[jira] [Created] (KAFKA-8214) Handling RecordTooLargeException in the main thread

2019-04-10 Thread Mohan Parthasarathy (JIRA)
Mohan Parthasarathy created KAFKA-8214:
--

 Summary: Handling RecordTooLargeException in the main thread
 Key: KAFKA-8214
 URL: https://issues.apache.org/jira/browse/KAFKA-8214
 Project: Kafka
  Issue Type: Bug
 Environment: Kafka version 1.0.2
Reporter: Mohan Parthasarathy


How can we handle this exception in the main application? If a task incurs 
this exception, then it does not commit the offset and hence goes into a loop 
after that. This happens during the aggregation process. We already have a 
limit on the message size of the topic, which is 15 MB.
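The exception below names the producer-side `max.request.size` limit, which is separate from the topic's `max.message.bytes` cap. A minimal configuration sketch (hypothetical application id and broker address; in Kafka Streams, producer configs are forwarded via the "producer." prefix):

```java
import java.util.Properties;

public class StreamsProducerSizing {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("application.id", "r-detection");        // hypothetical
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical
        // 16 MiB, above the ~15 MB serialized records described here.
        // Raising the client limit only helps if the broker/topic
        // max.message.bytes is raised as well; otherwise the broker
        // rejects the batch instead of the client.
        int sixteenMiB = 16 * 1024 * 1024;
        props.put("producer.max.request.size", Integer.toString(sixteenMiB));
        System.out.println(props.getProperty("producer.max.request.size")); // 16777216
    }
}
```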


org.apache.kafka.streams.errors.StreamsException: Exception caught in process. 
taskId=2_6, processor=KSTREAM-SOURCE-16, 
topic=r-detection-KSTREAM-AGGREGATE-STATE-STORE-12-repartition, 
partition=6, offset=2049
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:367)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:104)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:413)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:862)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)

Caused by: org.apache.kafka.streams.errors.StreamsException: task [2_6] Abort 
sending since an error caught with a previous record (key 
fe80::a112:a206:bc15:8e86&fe80::743c:160:c0be:9e66&0 value [B@20dced9e 
timestamp 1554238297629) to topic 
-detection-KSTREAM-AGGREGATE-STATE-STORE-12-changelog due to 
org.apache.kafka.common.errors.RecordTooLargeException: The message is 15728866 
bytes when serialized which is larger than the maximum request size you have 
configured with the max.request.size configuration.
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:133)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:50)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:192)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:915)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:841)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
    at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:66)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:100)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)

[jira] [Resolved] (KAFKA-8214) Handling RecordTooLargeException in the main thread

2019-04-11 Thread Mohan Parthasarathy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-8214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mohan Parthasarathy resolved KAFKA-8214.

Resolution: Duplicate

> Handling RecordTooLargeException in the main thread
> ---
>
> Key: KAFKA-8214
> URL: https://issues.apache.org/jira/browse/KAFKA-8214
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.2
>    Reporter: Mohan Parthasarathy
>Priority: Major
>