Re: [VOTE] KIP-825: introduce a new API to control when aggregated results are produced

2022-04-11 Thread John Roesler
Thanks, Hao!

I have no concern about amending the KIP to add metrics.
Thanks for thinking of it.

Can you comment on the choice to add them as a task-level metric
instead of a processor-level metric? This will cause the metrics for
all windowed aggregations in a task that use final emission to be
mixed together. It might be fine, but we should at least document
that it was anticipated and the reasons for the choice. By the way,
if we do add them as processor-node metrics but want them to
be measured at info level, we should also state it, since processor-
node metrics are usually debug.

Also, I'm concerned that the name `emitted-records` will be
ambiguous in the larger context of all Kafka Streams metrics. If I'm
right in thinking that these metrics are only for measuring the
behavior of emit-final windowed aggregations, then we should
make sure that the metric name says as much. Maybe:

emit-final-records-[rate|total]
emit-final-latency-[avg|max] 

Thanks!
-John
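
To illustrate the scoping question above, here is a minimal plain-Java sketch. It is only a model and does not use the Kafka Streams metrics API. The tag names `task-id` and `processor-node-id` and the processor names are used purely for illustration. It shows how two emit-final aggregations in the same task collapse into one number at task level but stay separate at processor-node level.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-Java model of metric scoping; it does not call any Kafka Streams API. */
final class EmitFinalMetricScopes {

    /**
     * Sums emitted-record counts under a key built from the chosen scope.
     * Each sample is {taskId, processorNodeId, emittedCount} (hypothetical layout).
     */
    static Map<String, Long> aggregate(String[][] samples, boolean perProcessorNode) {
        Map<String, Long> totals = new LinkedHashMap<>();
        for (String[] sample : samples) {
            String taskId = sample[0];
            String processorNodeId = sample[1];
            long emitted = Long.parseLong(sample[2]);
            String key = perProcessorNode
                    ? "task-id=" + taskId + ",processor-node-id=" + processorNodeId
                    : "task-id=" + taskId;
            totals.merge(key, emitted, Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Hypothetical task 0_0 containing two emit-final windowed aggregations.
        String[][] samples = {
            {"0_0", "windowed-agg-1", "10"},
            {"0_0", "windowed-agg-2", "5"},
        };
        System.out.println(aggregate(samples, false)); // one mixed total per task
        System.out.println(aggregate(samples, true));  // one total per processor node
    }
}
```

With task-level scoping the two aggregations cannot be told apart afterwards, which is the trade-off worth documenting in the KIP.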

On Mon, Apr 11, 2022, at 14:25, Hao Li wrote:
> Hi all,
>
> I would like to introduce two metrics in this KIP as well to measure the
> latency and number of records emitted for emit final. They are named:
>
> `emit-final-latency`
> `emitted-records`
>
> I've updated the KIP with details in
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced
>
> Can you take a look and see if you have any concerns for the metrics?
>
> Thanks,
> Hao
>
> On Fri, Mar 25, 2022 at 8:27 AM Hao Li  wrote:
>
>> Got it. Forgot that. Yeah, it’s still open and ppl can still vote. Thanks
>> for reminding!
>>
>> Hao Li
>>
>> > On Mar 25, 2022, at 8:22 AM, Guozhang Wang  wrote:
>> >
>> > Hello Hao,
>> >
>> > According to bylaws the voting has to last for at least 72 business
>> hours.
>> > So let's wait a bit longer to see if there are different opinions before
>> > calling it close.
>> >
>> >> On Thu, Mar 24, 2022 at 4:20 PM Hao Li 
>> wrote:
>> >>
>> >> The vote happened in the discussion thread since I started the vote
>> there
>> >> by mistake. But it passed there. To avoid having everyone vote again, I
>> >> copied the content from that thread here:
>> >>
>> >>  end of discussion thread vote
>> >> ==
>> >> The vote passed with 5 binding votes from John, Guozhang, Bruno,
>> Matthias
>> >> and Bill.
>> >>
>> >> Thanks all for the feedback and vote.
>> >>
>> >> Hao
>> >>
>> >>> On Thu, Mar 24, 2022 at 2:20 PM Bill Bejeck  wrote:
>> >>>
>> >>> Thanks for KIP Hao!
>> >>>
>> >>> Glad to see we settled on option 1
>> >>>
>> >>> +1(binding)
>> >>>
>> >>> On Thu, Mar 24, 2022 at 5:13 PM Matthias J. Sax 
>> >> wrote:
>> >>>
>>  +1 (binding)
>> 
>> 
>>  On 3/24/22 1:52 PM, Hao Li wrote:
>> > I hit reply on my phone in the mail app and changed the title and
>> >> text
>> > hoping it will start a new thread. Apparently it doesn't work...
>> >
>> > On Thu, Mar 24, 2022 at 12:36 PM Bruno Cadonna 
>>  wrote:
>> >
>> >> Hi Hao,
>> >>
>> >> Actually, this is the VOTE thread. Do you use GMail? Sometimes it
>> >> is a
>> >> bit weird how it shows e-mails in threads.
>> >>
>> >> Anyways, thank you for the KIP and your patience!
>> >>
>> >> +1 (binding)
>> >>
>> >> Best,
>> >> Bruno
>> >>
>> >> On 24.03.22 17:36, Hao Li wrote:
>> >>> Thanks John and Guozhang. Didn't realize I used this discussion
>> >>> thread
>> >> for
>> >>> voting. Let me start a new thread for voting. Will fix the KIP.
>> >>>
>> >>> On Thu, Mar 24, 2022 at 9:28 AM Guozhang Wang 
>> >> wrote:
>> >>>
>>  +1 (binding).
>> 
>>  Thanks Hao!
>> 
>> 
>>  Guozhang
>> 
>>  On Thu, Mar 24, 2022 at 9:20 AM John Roesler > >>>
>> >> wrote:
>> 
>> > Thanks, Hao!
>> >
>> > I'm +1 (binding)
>> >
>> > -John
>> >>
>> >>  start of discussion thread vote
>> >> ==
>> >>
>> >>
>> >>> On Thu, Mar 24, 2022 at 2:12 PM John Roesler 
>> wrote:
>> >>>
>> >>> Thanks, Hao,
>> >>>
>> >>> I’m +1 (binding)
>> >>>
>> >>> -John
>> >>>
>> >>> On Thu, Mar 24, 2022, at 11:38, Hao Li wrote:
>>  Hi all,
>> 
>>  I'd like to start a vote on Kafka Streams KIP-825:
>> 
>> >>>
>> >>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced
>> 
>>  Thanks,
>>  Hao
>> >>>
>> >>
>> >>
>> >> --
>> >> Thanks,
>> >> Hao
>> >>
>> >
>> >
>> > --
>> > -- Guozhang
>>
>
>
> -- 
> Thanks,
> Hao


Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #853

2022-04-11 Thread Apache Jenkins Server
See 




Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #852

2022-04-11 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-821: Connect Transforms support for nested structures

2022-04-11 Thread Chris Egerton
Hi Jorge,

Wow, those examples are great! A few more remarks, but I think we're
getting close:

1. The examples differ across SMTs with the name of the newly-introduced
style property; some of them use "field.style", and some use
"fields.style". I think for consistency's sake we should stick with just
"field.style"; otherwise it could be painful for users to have to remember
which to use.

2. Some of the examples are off:
- TimestampConverter: the input in the second example ("when field names
include dots") doesn't contain a field with a dotted name
- ValueToKey: the config in the third example ("when field names include
dots") should probably use "parent..child.k2" as the
"transforms.smt1.fields" property

3. RE changes to InsertField:
- The InsertField SMT should also come with the new "field.style" property
in order to preserve backwards compatibility, right? I don't see it
included in the example configs for that one, just want to make sure
- I don't know of any cases where we use snake_case for property names in
Kafka; we should probably use "on.missing.parent" and "on.existing.field"
as the new property names for InsertField.
- Why is the "on_existing_field" (or "on.existing.field") property only
applied when the field style is nested? Couldn't it be useful for
non-nested fields as well?

Cheers,

Chris
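
For readers following the InsertField discussion in this thread, the "create missing parents / overwrite existing field" behaviour and the two proposed flags can be sketched on a plain nested map. This is only an illustration under assumptions: the class, enum, and method names are hypothetical, the path is split on every dot (the escape syntax for dotted field names is not handled), and throwing on a non-structure parent is an assumption rather than something the KIP states.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of nested insertion; not the Connect InsertField implementation. */
final class NestedInsertSketch {
    enum OnMissingParent { CREATE, IGNORE }
    enum OnExistingField { OVERWRITE, IGNORE }

    /**
     * Inserts value at a dot-separated path inside a nested map.
     * Returns true if the value was written, false if a policy said to skip it.
     */
    @SuppressWarnings("unchecked")
    static boolean insert(Map<String, Object> root, String path, Object value,
                          OnMissingParent onMissingParent, OnExistingField onExistingField) {
        String[] parts = path.split("\\.");
        Map<String, Object> current = root;
        for (int i = 0; i < parts.length - 1; i++) {
            Object child = current.get(parts[i]);
            if (child == null) {
                if (onMissingParent == OnMissingParent.IGNORE) {
                    return false; // leave the record untouched
                }
                child = new LinkedHashMap<String, Object>();
                current.put(parts[i], child);
            } else if (!(child instanceof Map)) {
                // Assumption: a non-structure parent is treated as an error.
                throw new IllegalArgumentException("'" + parts[i] + "' is not a nested structure");
            }
            current = (Map<String, Object>) child;
        }
        String leaf = parts[parts.length - 1];
        if (current.containsKey(leaf) && onExistingField == OnExistingField.IGNORE) {
            return false; // keep the existing value
        }
        current.put(leaf, value);
        return true;
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("k1", "v1");
        insert(record, "n1.n2.n3.topic", "my-topic",
                OnMissingParent.CREATE, OnExistingField.OVERWRITE);
        System.out.println(record);
    }
}
```

Note that with IGNORE for missing parents the sketch returns before creating anything, so a partially missing path never leaves half-created parents behind.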

On Sat, Apr 9, 2022 at 12:40 PM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:

> Again, great feedback Chris. Much appreciated.
> Added my comments below:
>
> On Tue, 5 Apr 2022 at 20:22, Chris Egerton 
> wrote:
>
> > Hi Jorge,
> >
> > Looking good! I have a few comments left but all but one or two are
> minor.
> >
> > 1. The motivation section states "This KIP is aimed to include support
> for
> > nested structures on the existing SMTs... and to include the abstractions
> > to reuse this in future SMTs". A good implementation of this KIP will
> > definitely isolate reusable logic into a separate abstraction that can be
> > easily pulled in to the SMTs we want to add nested field support to, but
> > unless we plan on making this kind of abstraction publicly available as
> > some kind of utility method or class that external SMT developers can
> > leverage, we can probably leave this part out as it's more of an
> > implementation detail.
> >
>
> Makes sense, will leave this out of the KIP.
>
>
> >
> > 2. The Cast example is a little misleading, isn't it? It demonstrates the
> > escape syntax for fields with dot literals in their names, but it doesn't
> > demonstrate a way to actually use the Cast (or any other) SMT to access a
> > nested field in a record, which is the whole point of the KIP. I like the
> > example of escape syntax but we should probably also add one for nested
> > field access.
> >
>
> Agree. I have added examples to each SMT to make it clearer how it
> affects each function.
>
>
> >
> > 3. With the InsertField SMT, I'm wondering what the specific behavior
> will
> > be when some or all of the "middle layer" nested fields are missing. For
> > example, if I have a record with a value of { "k1": "v1" } and I apply
> > InsertField with topic.field = n1.n2.n3.topic, what will happen? Will the
> > updated value become { "k1": "v1", "n1": { "n2": { "n3": "topic" }}},
> will
> > an exception be thrown, or something else? This seems analogous to the
> > command line mkdir command, which (at least on some operating systems)
> > fails by default if you try to create a new nested directory where
> anything
> > but the last element in the path doesn't exist, but can be invoked with a
> > flag that instructs it to go ahead and create all levels of nested
> > directory instead. I'm leaning on the side of "just create everything"
> but
> > would be interested in your thoughts, and either way, we should probably
> > make sure the intended behavior is well-defined before voting.
> >
>
> This is an interesting case, thanks for catching this!
> The default behavior I see is to create parents if they are missing and
> overwrite fields
> if they already exist.
> I'm planning to include the following two flags if there is a need to
> overwrite this behavior:
> - `on_missing_parent` = (CREATE|IGNORE), default=CREATE
> - `on_existing_field` = (OVERWRITE|IGNORE), default=OVERWRITE
>
>
> >
> > 4. Similarly, what will the behavior be if any of the field elements
> > specified with InsertField already exist in the record value? Will we
> just
> > overwrite them? What's the behavior of InsertField today under similar
> > circumstances?
> >
>
> The current behavior is to overwrite the value.
>
>
> >
> > Cheers,
> >
> > Chris
> >
> > On Thu, Mar 31, 2022 at 4:15 PM Jorge Esteban Quilcate Otoya <
> > quilcate.jo...@gmail.com> wrote:
> >
> > > Thanks, Chris! Much appreciated all the feedback here.
> > >
> > > 1. You nailed it setting the design goal here: "it shouldn't be
> > impossible
> > > to use this new feature for any field name, no matter how convoluted.
> > It's
> > > fine if edge 

Jenkins build is still unstable: Kafka » Kafka Branch Builder » 3.2 #26

2022-04-11 Thread Apache Jenkins Server
See 




Re: [VOTE] KIP-825: introduce a new API to control when aggregated results are produced

2022-04-11 Thread Hao Li
Hi all,

I would like to introduce two metrics in this KIP as well to measure the
latency and number of records emitted for emit final. They are named:

`emit-final-latency`
`emitted-records`

I've updated the KIP with details in
https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced

Can you take a look and see if you have any concerns for the metrics?

Thanks,
Hao

On Fri, Mar 25, 2022 at 8:27 AM Hao Li  wrote:

> Got it. Forgot that. Yeah, it’s still open and ppl can still vote. Thanks
> for reminding!
>
> Hao Li
>
> > On Mar 25, 2022, at 8:22 AM, Guozhang Wang  wrote:
> >
> > Hello Hao,
> >
> > According to bylaws the voting has to last for at least 72 business
> hours.
> > So let's wait a bit longer to see if there are different opinions before
> > calling it close.
> >
> >> On Thu, Mar 24, 2022 at 4:20 PM Hao Li 
> wrote:
> >>
> >> The vote happened in the discussion thread since I started the vote
> there
> >> by mistake. But it passed there. To avoid having everyone vote again, I
> >> copied the content from that thread here:
> >>
> >>  end of discussion thread vote
> >> ==
> >> The vote passed with 5 binding votes from John, Guozhang, Bruno,
> Matthias
> >> and Bill.
> >>
> >> Thanks all for the feedback and vote.
> >>
> >> Hao
> >>
> >>> On Thu, Mar 24, 2022 at 2:20 PM Bill Bejeck  wrote:
> >>>
> >>> Thanks for KIP Hao!
> >>>
> >>> Glad to see we settled on option 1
> >>>
> >>> +1(binding)
> >>>
> >>> On Thu, Mar 24, 2022 at 5:13 PM Matthias J. Sax 
> >> wrote:
> >>>
>  +1 (binding)
> 
> 
>  On 3/24/22 1:52 PM, Hao Li wrote:
> > I hit reply on my phone in the mail app and changed the title and
> >> text
> > hoping it will start a new thread. Apparently it doesn't work...
> >
> > On Thu, Mar 24, 2022 at 12:36 PM Bruno Cadonna 
>  wrote:
> >
> >> Hi Hao,
> >>
> >> Actually, this is the VOTE thread. Do you use GMail? Sometimes it
> >> is a
> >> bit weird how it shows e-mails in threads.
> >>
> >> Anyways, thank you for the KIP and your patience!
> >>
> >> +1 (binding)
> >>
> >> Best,
> >> Bruno
> >>
> >> On 24.03.22 17:36, Hao Li wrote:
> >>> Thanks John and Guozhang. Didn't realize I used this discussion
> >>> thread
> >> for
> >>> voting. Let me start a new thread for voting. Will fix the KIP.
> >>>
> >>> On Thu, Mar 24, 2022 at 9:28 AM Guozhang Wang 
> >> wrote:
> >>>
>  +1 (binding).
> 
>  Thanks Hao!
> 
> 
>  Guozhang
> 
>  On Thu, Mar 24, 2022 at 9:20 AM John Roesler  >>>
> >> wrote:
> 
> > Thanks, Hao!
> >
> > I'm +1 (binding)
> >
> > -John
> >>
> >>  start of discussion thread vote
> >> ==
> >>
> >>
> >>> On Thu, Mar 24, 2022 at 2:12 PM John Roesler 
> wrote:
> >>>
> >>> Thanks, Hao,
> >>>
> >>> I’m +1 (binding)
> >>>
> >>> -John
> >>>
> >>> On Thu, Mar 24, 2022, at 11:38, Hao Li wrote:
>  Hi all,
> 
>  I'd like to start a vote on Kafka Streams KIP-825:
> 
> >>>
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-825%3A+introduce+a+new+API+to+control+when+aggregated+results+are+produced
> 
>  Thanks,
>  Hao
> >>>
> >>
> >>
> >> --
> >> Thanks,
> >> Hao
> >>
> >
> >
> > --
> > -- Guozhang
>


-- 
Thanks,
Hao


[jira] [Created] (KAFKA-13820) Add debug-level logs to explain why a store is filtered out during interactive query

2022-04-11 Thread John Roesler (Jira)
John Roesler created KAFKA-13820:


 Summary: Add debug-level logs to explain why a store is filtered 
out during interactive query
 Key: KAFKA-13820
 URL: https://issues.apache.org/jira/browse/KAFKA-13820
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler


Currently Kafka Streams throws an InvalidStateStoreException when the desired 
store is not present on the local instance. It also throws the same exception 
with the same message when the store is present but is not active (and stale 
queries are disabled).

This is an important distinction when debugging store unavailability, and a 
debug-level log is an un-intrusive mechanism to expose the information.
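
A minimal sketch of the idea, assuming nothing about the actual Streams code: the class and method names below are hypothetical, and java.util.logging's FINE level stands in for a debug-level log. The point is only that the two causes get distinct log messages even if the exception thrown to the caller stays the same.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical sketch; not the Kafka Streams store-lookup code. */
final class StoreFilterLoggingSketch {
    private static final Logger LOG =
            Logger.getLogger(StoreFilterLoggingSketch.class.getName());

    /** Returns null when the store is queryable, otherwise the reason it is filtered out. */
    static String filterReason(boolean presentLocally, boolean active, boolean staleQueriesEnabled) {
        if (!presentLocally) {
            return "store is not present on this instance";
        }
        if (!active && !staleQueriesEnabled) {
            return "store is present but not active, and stale queries are disabled";
        }
        return null;
    }

    /** Logs the reason at FINE (debug) level before reporting the store as unavailable. */
    static boolean isQueryable(String storeName, boolean presentLocally,
                               boolean active, boolean staleQueriesEnabled) {
        String reason = filterReason(presentLocally, active, staleQueriesEnabled);
        if (reason != null) {
            LOG.log(Level.FINE, "Filtering out store {0}: {1}", new Object[] {storeName, reason});
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isQueryable("counts", false, false, false));
        System.out.println(isQueryable("counts", true, false, false));
        System.out.println(isQueryable("counts", true, false, true));
    }
}
```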



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13819) Add application.server to Streams assignor logs when set

2022-04-11 Thread John Roesler (Jira)
John Roesler created KAFKA-13819:


 Summary: Add application.server to Streams assignor logs when set
 Key: KAFKA-13819
 URL: https://issues.apache.org/jira/browse/KAFKA-13819
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler


Currently, Streams assignment logs only include the consumer client id and the 
streams application id, but those are both randomly generated UUIDs that are 
not easy to correlate with users' concept of the name of a host. To help bridge 
this gap, we can include the application.server (when set) in assignment logs. 
That way, users will also be able to see which host and port each member is 
associated with.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13818) Add generation to consumer assignor logs

2022-04-11 Thread John Roesler (Jira)
John Roesler created KAFKA-13818:


 Summary: Add generation to consumer assignor logs
 Key: KAFKA-13818
 URL: https://issues.apache.org/jira/browse/KAFKA-13818
 Project: Kafka
  Issue Type: Improvement
Reporter: John Roesler


Reading assignor logs is really confusing in large part because they are
spread across different layers of abstraction (the ConsumerCoordinator
and the ConsumerPartitionAssignor, which in Streams consists of several
layers of its own). Each layer in the abstraction reports useful information
that only it has access to, but because they are split over multiple lines, with
multiple members in the cluster, and (often) multiple rebalances taking place
in rapid succession, it's often hard to understand which logs are part of
which rebalance.

 

One thing we don't want to do is break encapsulation by exposing too much of 
the ConsumerCoordinator's internal state to components like the pluggable 
ConsumerPartitionAssignor.

 

We can accomplish what we want by adding the concept of a dynamic log context, 
so that the ConsumerCoordinator can add dynamic information like the generation 
id to be logged for correlation in other components without exposing any new 
information or metadata to those components themselves.

See [https://github.com/apache/kafka/pull/12020] for example.
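
As a rough illustration of the dynamic log context idea (the linked pull request may well do this differently), the sketch below uses a hypothetical class whose prefix is re-evaluated on every log call. The owner of the state supplies a function, so other components get the generation id in their log lines without ever holding the state itself.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical sketch; not Kafka's LogContext and not the code from the pull request. */
final class DynamicLogContextSketch {

    /** Prefixes every message with a value that is re-read at log time. */
    static final class DynamicLogContext {
        private final String staticPrefix;
        private final Supplier<String> dynamicPrefix;

        DynamicLogContext(String staticPrefix, Supplier<String> dynamicPrefix) {
            this.staticPrefix = staticPrefix;
            this.dynamicPrefix = dynamicPrefix;
        }

        String format(String message) {
            return staticPrefix + dynamicPrefix.get() + message;
        }
    }

    public static void main(String[] args) {
        // Stands in for state that only the coordinator owns.
        AtomicInteger generation = new AtomicInteger(1);
        DynamicLogContext context = new DynamicLogContext(
                "[Consumer clientId=c1] ",
                () -> "[generationId=" + generation.get() + "] ");

        // An assignor would only receive the context, never the generation itself.
        System.out.println(context.format("Computing assignment"));
        generation.set(2); // a new rebalance starts
        System.out.println(context.format("Computing assignment"));
    }
}
```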



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13817) Schedule nextTimeToEmit to system time every time instead of just once

2022-04-11 Thread Hao Li (Jira)
Hao Li created KAFKA-13817:
--

 Summary: Schedule nextTimeToEmit to system time every time instead 
of just once
 Key: KAFKA-13817
 URL: https://issues.apache.org/jira/browse/KAFKA-13817
 Project: Kafka
  Issue Type: Improvement
Reporter: Hao Li


[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java#L229-L231]

 

If nextTimeToEmit is set from system time only once, a sudden large jump in 
system time can trigger an emit for every record that follows.

 

For example, 
 # nextTimeToEmit is set to 1 and the step is 1
 # If the system time then jumps to 100, we will emit for each of the next 100 records
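
The example can be checked with a small simulation. This is a sketch under assumptions, not the code at the linked lines: the method names are hypothetical, the clock is held constant at the jumped value for simplicity, and an emit is assumed to fire when `now >= nextTimeToEmit`.

```java
/** Hypothetical simulation of the two scheduling strategies; not the Streams code. */
final class EmitScheduleSketch {

    /** nextTimeToEmit only advances by one step after each emit. */
    static int emitsAdvancingByStep(long nextTimeToEmit, long step, long now, int records) {
        int emits = 0;
        for (int i = 0; i < records; i++) {
            if (now >= nextTimeToEmit) {
                emits++;
                nextTimeToEmit += step; // still far behind "now" after a big jump
            }
        }
        return emits;
    }

    /** nextTimeToEmit is rescheduled relative to the current system time after each emit. */
    static int emitsReschedulingFromNow(long nextTimeToEmit, long step, long now, int records) {
        int emits = 0;
        for (int i = 0; i < records; i++) {
            if (now >= nextTimeToEmit) {
                emits++;
                nextTimeToEmit = now + step; // catches up immediately
            }
        }
        return emits;
    }

    public static void main(String[] args) {
        // nextTimeToEmit = 1, step = 1, system time has jumped to 100, 200 records arrive.
        System.out.println(emitsAdvancingByStep(1, 1, 100, 200));     // prints 100
        System.out.println(emitsReschedulingFromNow(1, 1, 100, 200)); // prints 1
    }
}
```

Advancing by one step per emit needs 100 emits to catch up with the clock, while rescheduling from the current time catches up after a single emit.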



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Re: [VOTE] KIP-813 Shared State Stores

2022-04-11 Thread Federico Valeri
Hi, thanks for the KIP.

+1

Just a minor note:

In the rejected alternatives I read "If B would like to lookup s, there are
two possible approaches to take", but then you describe only one.




On Tue, Apr 5, 2022, 6:17 PM Bill Bejeck  wrote:

> Thanks for the KIP, Daan.
>
> I've caught up on the discussion thread and I've gone over the KIP.  This
> seems like a good addition to me.
>
> +1 (binding)
>
> Thanks,
> Bill
>
> On Fri, Apr 1, 2022 at 2:13 PM Matthias J. Sax  wrote:
>
> > +1 (binding)
> >
> >
> > On 4/1/22 6:47 AM, John Roesler wrote:
> > > Thanks for the KIP, Daan!
> > >
> > > I’m +1 (binding)
> > >
> > > -John
> > >
> > > On Tue, Mar 29, 2022, at 06:01, Daan Gertis wrote:
> > >> I would like to start a vote on this one:
> > >>
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-813%3A+Shareable+State+Stores
> > >>
> > >> Cheers,
> > >> D.
> >
>


[jira] [Created] (KAFKA-13816) Downgrading Connect rebalancing protocol from incremental to eager causes duplicate task instances

2022-04-11 Thread Chris Egerton (Jira)
Chris Egerton created KAFKA-13816:
-

 Summary: Downgrading Connect rebalancing protocol from incremental 
to eager causes duplicate task instances
 Key: KAFKA-13816
 URL: https://issues.apache.org/jira/browse/KAFKA-13816
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Chris Egerton


The rebalancing protocol for a Kafka Connect cluster can be downgraded from 
incremental to eager by adding a worker to the cluster with 
{{connect.protocol}} set to {{eager}}, or by stopping an existing worker in 
that cluster, reconfiguring it with the new protocol, and restarting it.

When the worker (re)joins the cluster, a rebalance takes place using the eager 
protocol, and duplicate task instances are created on the cluster.

This occurs because:
 * The leader does not send out an assignment that revokes all connectors and 
tasks for the cluster during that round
 * Workers do not respond to the downgrade in protocol by revoking all 
connectors and tasks that they were running before the rebalance that are not 
included in the new assignment they received during the rebalance

It's likely that this bug hasn't surfaced sooner because any subsequent 
rebalance should cause all connectors and tasks on each worker in the cluster to 
be proactively revoked before the worker rejoins the group.

[KIP-415|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect#KIP415:IncrementalCooperativeRebalancinginKafkaConnect-Compatibility,Deprecation,andMigrationPlan]
 provides one way to address this:
{quote}To downgrade your cluster to use protocol version 0 from version 1 or 
higher with {{eager}} rebalancing policy what is required is to switch one of 
the workers back to {{eager}} mode. 
{{connect.protocol = eager}}

Once this worker joins, the group will downgrade to protocol version 0 and 
{{eager}} rebalancing policy, with immediately release of resources upon 
joining the group. This process will require a one-time double rebalancing, 
with the leader detecting the downgrade and first sending a downgraded 
assignment with empty assigned connectors and tasks and from then on just 
regular downgraded assignments. 
{quote}
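
The worker-side gap described in the second bullet above amounts to a set difference: whatever was running before the rebalance and is absent from the new assignment should be revoked. A minimal sketch follows, with a hypothetical class and method name and made-up task ids. It is not the Connect worker code and does not cover the leader-side empty assignment that KIP-415 describes.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of the worker-side revocation step; not Kafka Connect code. */
final class EagerDowngradeRevocationSketch {

    /** Returns the connectors/tasks that were running but are not in the new assignment. */
    static Set<String> toRevoke(Set<String> runningBeforeRebalance, Set<String> newAssignment) {
        Set<String> revoked = new TreeSet<>(runningBeforeRebalance);
        revoked.removeAll(newAssignment);
        return revoked;
    }

    public static void main(String[] args) {
        Set<String> running = Set.of("conn-a-task-0", "conn-a-task-1", "conn-b-task-0");
        Set<String> assigned = Set.of("conn-a-task-1");
        // Anything left here would otherwise keep running as a duplicate instance.
        System.out.println(toRevoke(running, assigned));
    }
}
```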



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13815) Avoid reinitialization for a replica that is being deleted

2022-04-11 Thread Lucas Wang (Jira)
Lucas Wang created KAFKA-13815:
--

 Summary: Avoid reinitialization for a replica that is being deleted
 Key: KAFKA-13815
 URL: https://issues.apache.org/jira/browse/KAFKA-13815
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang


https://issues.apache.org/jira/browse/KAFKA-10002
identified that deletion of replicas can be slow when a StopReplica request is 
being processed, and implemented a change to improve the efficiency.
We found that the efficiency can be further improved by avoiding the 
reinitialization of the leader epoch cache and partition metadata for a 
replica that needs to be deleted.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)