Re: [DISCUSS] KIP-1068: New JMX Metrics for AsyncKafkaConsumer

2024-07-24 Thread Bruno Cadonna

Hi Mickael,

1. I agree that the title is misleading. It should be something like
"New metrics for the AsyncKafkaConsumer". Maybe it should even be 
"Metrics for the new consumer".


3. I am not sure I understand this comment. Exposed metrics are public 
as far as I understand. So adding new metrics requires a KIP. We had 
KIPs in the past that only introduced and/or removed metrics. For 
example, 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1001%3A+Add+CurrentControllerId+Metric
I do not think there are any plans to make the AsyncKafkaConsumer class 
public. It is the implementation of the KafkaConsumer when the CONSUMER 
protocol is used. But I might be wrong. At least, I am against making it 
public.


Best,
Bruno

On 7/24/24 12:05 PM, Mickael Maison wrote:

Hi,

1. The title is a bit misleading. It's proposing to add new metrics,
JMX is just one of the mechanisms to export them.
2. +1 to not register the new metrics when using the classic consumer,
instead of setting them to 0. Similarly I assume existing metrics that
don't apply to the new consumers are not registered?
3. At the moment this KIP is not changing any public APIs. What's the
plan to make AsyncKafkaConsumer public?

Thanks,
Mickael



On Tue, Jul 23, 2024 at 6:03 PM Bruno Cadonna  wrote:


Hi Brenden,

BC1. In his first e-mail Andrew wrote "I would expect that the metrics
do not exist at all". I agree with him. I think it would be better to
not add those metrics at all if the CLASSIC protocol is used rather than
the metrics exist and are all constant 0. This should be possible by not
adding the metrics to the metrics registry if the CONSUMER protocol is
not used.
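
For illustration, a minimal sketch of what BC1 suggests, registering the sensors only when the CONSUMER protocol is in use; the class name and constructor flag below are made up for the example, not the KIP's actual code:

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;

public class AsyncConsumerMetricsSketch {

    private final Metrics metrics;
    private Sensor applicationEventQueueTimeSensor; // stays null when CLASSIC is used

    public AsyncConsumerMetricsSketch(final Metrics metrics, final boolean consumerProtocolEnabled) {
        this.metrics = metrics;
        if (consumerProtocolEnabled) {
            applicationEventQueueTimeSensor = metrics.sensor("application-event-queue-time");
            applicationEventQueueTimeSensor.add(
                metricName("application-event-queue-time-avg", "Average time an event spends in the application event queue"),
                new Avg());
            applicationEventQueueTimeSensor.add(
                metricName("application-event-queue-time-max", "Maximum time an event spends in the application event queue"),
                new Max());
        }
    }

    public void recordQueueTimeMs(final long timeMs) {
        if (applicationEventQueueTimeSensor != null) { // no-op under the CLASSIC protocol
            applicationEventQueueTimeSensor.record(timeMs);
        }
    }

    private MetricName metricName(final String name, final String description) {
        return metrics.metricName(name, "consumer-metrics", description);
    }
}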

BC2. Is there a specific reason you do not propose
background-event-queue-time-max and background-event-queue-time-avg? If
folks think those are not useful, we do not need to add them. However, if
those are not useful, why is background-event-queue-size useful? I was
just wondering about the asymmetry between background-event-queue and
application-event-queue.

Best,
Bruno



On 7/19/24 9:14 PM, Brenden Deluna wrote:

Hi Apoorv,
Thank you for your comments, I will address each.

AM1. I can see the usefulness in also having an
'application-event-queue-age-max' to get an idea of outliers and how they
may be affecting the average metric. I will add that.

AM2. I agree with you there, I think 'time' is a better descriptor here
than 'age'. I will update those metric names as well.

AM3. Similar to above comments, I will change the name of that metric to be
more consistent. And I think a max metric would also be useful here, adding
that.

AM4. Yes, good catch there. Will update that as well.

Thank you,
Brenden

On Fri, Jul 19, 2024 at 8:14 AM Apoorv Mittal 
wrote:


Hi Brenden,
Thanks for the KIP. The metrics are always helpful.

AM1: Is `application-event-queue-age-avg` enough or do we require `
application-event-queue-age-max` as well to differentiate with outliers?

AM2: The kafka producer defines metric `record-queue-time-avg` which
captures the time spent in the buffer. Do you think we should have a
similar name for `application-event-queue-age-avg` i.e. change to `
application-event-queue-time-avg`? Moreover other than similar naming,
`time` anyways seems more suitable than `age`, though minor. The `time`
usage is also aligned with the description of this metric.
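
For illustration, such a queue "time" metric is typically measured by stamping an event when it is enqueued and recording the elapsed time when the background thread dequeues it; the class and field names below are hypothetical, not the KIP's implementation:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical wrapper that stamps events when they are enqueued so the
// background thread can record how long they sat in the queue.
final class TimedEvent<T> {
    final T event;
    final long enqueueTimeMs;

    TimedEvent(final T event, final long enqueueTimeMs) {
        this.event = event;
        this.enqueueTimeMs = enqueueTimeMs;
    }
}

final class QueueTimeExample {
    private final BlockingQueue<TimedEvent<String>> queue = new LinkedBlockingQueue<>();

    // Application thread: enqueue with a timestamp.
    void enqueue(final String event) {
        queue.add(new TimedEvent<>(event, System.currentTimeMillis()));
    }

    // Background thread: dequeue and compute the queue time
    // (a real implementation would record this into a sensor).
    long pollAndMeasure() throws InterruptedException {
        final TimedEvent<String> timed = queue.take();
        return System.currentTimeMillis() - timed.enqueueTimeMs;
    }
}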

AM3: Metric `application-event-processing-time` says "the average time,
that the consumer network.". Shall we have the `-avg` suffix in the
metric as we have defined for other metrics? Also do we require the max
metric as well for the same?

AM4: Is the telemetry name for `unsent-requests-queue-size` intended
as `org.apache.kafka.consumer.unsent.requests.size`,
or it should be corrected to `
org.apache.kafka.consumer.unsent.requests.queue.size`?

Regards,
Apoorv Mittal
+44 7721681581


On Mon, Jul 15, 2024 at 2:45 PM Andrew Schofield <
andrew_schofi...@live.com>
wrote:


Hi Brenden,
Thanks for the updates.

AS4. I see that you’ve added `.ms` to a bunch of the metrics reflecting the
fact that they’re measured in milliseconds. However, I observe that most
metrics in Kafka that are measured in milliseconds, with some exceptions in
Kafka Connect and MirrorMaker, do not follow this convention. I would tend
to err on the side of consistency with the existing metrics and not use
`.ms`. However, that’s just my opinion, so I’d be interested to know what
other reviewers of the KIP think.

Thanks,
Andrew


On 12 Jul 2024, at 20:11, Brenden Deluna 

wrote:


Hey Lianet,

Thank you for your suggestions and feedback!


LM1. This has now been addressed.


LM2. I think that would be a valuable addition to the current set of
metrics, I will get that added.


LM3. Again great idea, that would certainly be helpful. Will add that as well.


Let me know if you have any more suggestions!


Thanks,

Brenden

On Fri, Jul 12, 2024 at 2:11 PM Brenden Deluna 

wrote:



Re: [DISCUSS] KIP-1068: New JMX Metrics for AsyncKafkaConsumer

2024-07-23 Thread Bruno Cadonna

Hi Brenden,

BC1. In his first e-mail Andrew wrote "I would expect that the metrics 
do not exist at all". I agree with him. I think it would be better to 
not add those metrics at all if the CLASSIC protocol is used rather than 
the metrics exist and are all constant 0. This should be possible by not 
adding the metrics to the metrics registry if the CONSUMER protocol is 
not used.


BC2. Is there a specific reason you do not propose
background-event-queue-time-max and background-event-queue-time-avg? If
folks think those are not useful, we do not need to add them. However, if
those are not useful, why is background-event-queue-size useful? I was
just wondering about the asymmetry between background-event-queue and
application-event-queue.


Best,
Bruno



On Fri, Jul 12, 2024 at 2:11 PM Brenden Deluna 

wrote:



Hi Lucas,

Thank you for the feedback! I have addressed your comments:


LB1. Good catch there, I will update the names as needed.


LB2. Good catch again! I will update the name to be more consistent.


LB3. Thank you for pointing this out, I realized that all metric values
will actually be set to 0. I will specify this and explain why they will
be 0.


Nit: This metric is referring to the queue of unsent requests in the
NetworkClientDelegate. For the metric descriptions I am trying to not
include too much of the implementation details, hence the reason that
description is quite short. I cannot think of other ways to describe the
metric without going deeper into the implementation, but please do let me
know if you have any ideas.


Thank you,

Brenden

On Fri, Jul 12, 2024 at 1:27 PM Lianet M.  wrote:


Hey Brenden, thanks for the KIP! Great to get more visibility into the new
consumer.

LM1. +1 on Lucas's suggestion for including the unit in the name, seems
clearer and consistent (I do see several time metrics including ms)

LM2. What about a new metric for application-event-queue-time-ms? It would
be a complement to the application-event-queue-size you're proposing, and
it will tell us how long the events sit in the queue, waiting to be
processed (from the time the API call adds the 

Re: [DISCUSS] KIP-1071: Streams Rebalance Protocol

2024-07-23 Thread Bruno Cadonna
ission to create those topics with specific ACLs, would CREATE on the
cluster resource still be required?

AS6: StreamsGroupInitialize can also fail with TOPIC_AUTHORIZATION_FAILED
and (subject to AS5) CLUSTER_AUTHORIZATION_FAILED.

AS7: A tiny nit. You've used TopologyID (capitals) in
StreamsGroupHeartbeatRequest and a few others, but in all other cases the
fields which are ids are spelled Id. I suggest TopologyId.
Also, "interal" is probably meant to be "interval”.

AS8: For consumer groups, the `group.consumer.assignors` configuration is a
list of class names. The assignors do have names too, but the configuration
which enables them is in terms of class names. I wonder whether the
broker’s group.streams.assignor could actually be `group.streams.assignors`
and specified as a list of the class names of the supplied assignors. I
know you're not supporting other assignors yet, but when you do, I expect
you would prefer to have used class names from the start.
The use of assignor names in the other places looks good to me.

AS9: I'd find it really helpful to have a bit of a description about the
purpose and lifecycle of the 9 record types you've introduced on the
__consumer_offsets topic. I did a cursory review but without really
understanding what's written when, I can't do a thorough job of reviewing.

AS10: In the definitions of the record keys, such as
StreamsGroupCurrentMemberAssignmentKey, the versions of the fields must
match the versions of the types.

Thanks,
Andrew


On 12 Jul 2024, at 09:04, Lucas Brutschy 

wrote:


Hi all,

I would like to start a discussion thread on KIP-1071: Streams Rebalance
Protocol. With this KIP, we aim to bring the principles laid down by
KIP-848 to Kafka Streams, to make rebalances more reliable and scalable,
and make Kafka Streams overall easier to deploy and operate.

The KIP proposes moving the assignment logic to the broker, and
introducing a dedicated group type and dedicated RPCs for Kafka Streams.

The KIP is here:


https://cwiki.apache.org/confluence/display/KAFKA/KIP-1071%3A+Streams+Rebalance+Protocol


This is joint work with Bruno Cadonna.

Please take a look and let us know what you think.

Best,
Lucas










[jira] [Created] (KAFKA-17109) Reduce log message load for failed locking

2024-07-10 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-17109:
-

 Summary: Reduce log message load for failed locking
 Key: KAFKA-17109
 URL: https://issues.apache.org/jira/browse/KAFKA-17109
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 3.8.0
Reporter: Bruno Cadonna


The following exception with stack traces is logged many times when state 
updater is enabled:

{code}
01:08:03 INFO  [KAFKA] TaskManager - stream-thread [acme-StreamThread-4] 
Encountered lock exception. Reattempting locking the state in the next 
iteration.
org.apache.kafka.streams.errors.LockException: stream-thread 
[acme-StreamThread-4] standby-task [1_15] Failed to lock the state directory 
for task 1_15
at 
org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:96)
at 
org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:114)
at 
org.apache.kafka.streams.processor.internals.TaskManager.addTaskToStateUpdater(TaskManager.java:1008)
 
at 
org.apache.kafka.streams.processor.internals.TaskManager.addTasksToStateUpdater(TaskManager.java:995)
 
at 
org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:911)
 
at 
org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1188)
 
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:996)
 
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:711)
 
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
 {code}

The exception is expected since it happens because a lock on the task state 
directory has not yet been freed by a different stream thread on the same Kafka 
Streams client after an assignment. But with the state updater, acquiring the 
lock is attempted in each poll iteration, which is every 100 ms by default.

One option to reduce the log messages is to reduce the rate at which acquiring 
the lock is attempted. The other is to reduce the logging.
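
For illustration, a minimal sketch of the second option (rate-limiting the log message per task); the class and method names below are made up for the example and are not the actual fix:

{code:java}
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical helper: logs the lock failure for a given task at most once
// per interval at INFO and downgrades the rest to DEBUG.
public class RateLimitedLockLogger {
    private static final Logger log = LoggerFactory.getLogger(RateLimitedLockLogger.class);

    private final long intervalMs;
    private final Map<String, Long> lastLoggedMs = new ConcurrentHashMap<>();

    public RateLimitedLockLogger(final Duration interval) {
        this.intervalMs = interval.toMillis();
    }

    public void logLockFailure(final String taskId, final Exception cause) {
        final long now = System.currentTimeMillis();
        final Long last = lastLoggedMs.get(taskId);
        if (last == null || now - last >= intervalMs) {
            lastLoggedMs.put(taskId, now);
            log.info("Encountered lock exception for task {}. Will retry in the next iteration.", taskId, cause);
        } else {
            log.debug("Still waiting for the state directory lock of task {}.", taskId);
        }
    }
}
{code}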



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Apache Kafka 3.8.0 release

2024-07-08 Thread Bruno Cadonna

Hi Josep,

we found a bug in Streams. The bug is in the state updater that we 
enabled by default for 3.8.


I have already created a fix for it: 
https://github.com/apache/kafka/pull/16545


As you can see the fix is small and limited to a location in the code, 
so I do not think that it is risky.


The bug is not strictly a regression since the state updater was 
disabled in previous releases. However, for this release we soaked 
mainly with state updater enabled. So we need to soak Streams for a bit 
(couple of days? one week?) with state updater disabled to be confident 
that disabling does not introduce a real regression since the two code 
paths are interleaved.


Reverting the PRs of the state updater is not an option since they are 
many and they were merged over quite some time which makes reverts at 
least very risky and work intensive. Reverting would delay the release 
for sure.


In addition to the fix, I also opened a PR to disable the state updater: 
https://github.com/apache/kafka/pull/16542


I have already started soak testing for both PRs.

We did not find this bug before, because it manifests in a specific 
situation that occurs rather seldom, even in our soak where we inject 
various errors and infrastructure events.


The state updater is a long awaited improvement that besides other 
improvements solves a long-standing timeout issue with exactly-once 
processing: https://issues.apache.org/jira/browse/KAFKA-13295


Let me know, whether you prefer to disable the state updater or whether 
I should merge the fix into 3.8.


Best,
Bruno

On 7/5/24 12:01 PM, Josep Prat wrote:

Hi all,
Unfortunately, after 4 runs of the systems tests, we still can't have a
combined run with no errors. I created the JIRAs linked below to track
these.
I would think these are blockers for the release, but I'd be extremely
happy to be corrected!

KRaft Upgrade Failures: https://issues.apache.org/jira/browse/KAFKA-17083
Network Degrade Failures: https://issues.apache.org/jira/browse/KAFKA-17084
Streams Cooperative Rebalance Upgrade Failures
https://issues.apache.org/jira/browse/KAFKA-17085.
These system tests above fail consistently on CI and on my machine. If
anyone has the means to run system tests and can make these pass, please
let me know.

These add up to the existing
https://issues.apache.org/jira/browse/KAFKA-16138 (discovered during 3.7)
for the Quota test failures that can pass locally.

The status of the test runs as well as the logs of the runs can be found
here:
https://docs.google.com/document/d/1wbcyzO6GM2SYQaqTMITBTBjHgZgM7mmiAt7TUfh1xt8/edit

Best,

On Thu, Jul 4, 2024 at 3:27 PM Josep Prat  wrote:


Thanks Luke!

--
Josep Prat
Open Source Engineering Director, Aiven
josep.p...@aiven.io   |   +491715557497 | aiven.io
Aiven Deutschland GmbH
Alexanderufer 3-7, 10117 Berlin
Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
Amtsgericht Charlottenburg, HRB 209739 B

On Thu, Jul 4, 2024, 14:04 Luke Chen  wrote:


Hi Josep,

I had run tests for tests/kafkatest/tests/client/quota_test.py based on 3.8
branch, and they all passed.

SESSION REPORT (ALL TESTS)
ducktape version: 0.11.4
session_id:       2024-07-04--001
run time:         12 minutes 39.940 seconds
tests run:        9
passed:           9
flaky:            0
failed:           0
ignored:          0

test_id:    kafkatest.tests.client.quota_test.QuotaTest.test_quota.quota_type=client-id.consumer_num=2
status:     PASS
run time:   3 minutes 51.280 seconds

test_id:    kafkatest.tests.client.quota_test.QuotaTest.test_quota.quota_type=.user.client-id.override_quota=True
status:     PASS
run time:   4 minutes 21.082 seconds

test_id:    kafkatest.tests.client.quota_test.QuotaTest.test_quota.quota_type=.user.client-id.override_quota=False
status:     PASS
run time:   5 minutes 14.854 seconds

test_id:    kafkatest.tests.client.quota_test.QuotaTest.test_quota.quota_type=client-id.old_broker_throttling_behavior=True
status:     PASS
run time:   3 minutes 0.505 seconds

test_id:    kafkatest.tests.client.quota_test.QuotaTest.test_quota.quota_type=client-id.old_client_throttling_behavior=True
status:     PASS
run time:   3 minutes 19.629 seconds


Re: [VOTE] KIP-1056 - Deprecate `default.` prefix for exception handler in StreamsConfig

2024-07-02 Thread Bruno Cadonna

Thanks for the KIP!

+1 (binding) under the conditions Sophie pointed out.

Best,
Bruno

On 7/2/24 1:05 AM, Sophie Blee-Goldman wrote:

After reaching an agreement on the discussion thread, I'm giving this a +1
(binding) under the condition that we ship this change in 4.0 so as to
minimize the impact on users.

Thanks for the KIP!

-Sophie

On Thu, Jun 27, 2024 at 2:43 PM Sophie Blee-Goldman 
wrote:


I'll just note that I will personally be abstaining from this vote, but
won't vote -1 and just want to defer to the rest of the community on this.
I've stated my concerns in the discussion thread and will leave it at that
-- if we hear from users who actively support this change and want it to
happen then I'll even cast a +1 binding vote to help it get passed.

Best,
Sophie

On Thu, Jun 27, 2024 at 2:04 PM Muralidhar Basani
 wrote:


Bumping this thread.

Thanks,
Murali

On Sat, Jun 22, 2024 at 4:13 AM Matthias J. Sax  wrote:


+1 (binding) from my side.


I understand the concerns raised, but would personally move forward with
this KIP as-is. If we cannot get three votes, it would get naturally
discarded.


-Matthias

On 6/19/24 11:33 AM, Muralidhar Basani wrote:

Hi all,

I would like to call a vote on KIP-1056 - Deprecate `default.` exception
handler configs in StreamsConfig.

KIP -




https://cwiki.apache.org/confluence/display/KAFKA/KIP-1056%3A+Remove+%60default.%60+prefix+for+exception+handler+StreamsConfig


Discussion thread -
https://lists.apache.org/thread/khmznzjd5cd7wbdyt2bk776nx19gycxc

Thanks,
Murali











Re: [DISCUSS] KIP-1056: Remove `default.` prefix for exception handler StreamsConfig

2024-07-02 Thread Bruno Cadonna

Hi Sophie,

I totally agree on evaluating the impact case by case!

Nobody brushed any concerns under any rug. It was merely a concern about 
a valid concern. I am sorry if it came across as brushing.


Great that we found a viable solution!

Best,
Bruno



On 7/2/24 1:01 AM, Sophie Blee-Goldman wrote:

Thanks Bruno -- I think shipping this with 4.0 when most users will expect
to
need code changes anyway is a good compromise. I'm happy to change my
position and cast a +1 vote on this. I suppose we will just need to wait a
few
weeks to actually merge these changes, given the interim short-cycle 3.9
release we're doing before 4.0?

However, I do want to address this:

what I do not like about the

concern is that it blocks us from ever changing low priority stuff in
the code base. Which IMO is bad from a maintainability perspective.



First off, I wholeheartedly agree that we should not block changes just
because
they are low priority. I hope that goes without saying :)

But my position was not "we should never do anything that disrupts users
unless
it's high priority" -- all I'm saying is that "we should evaluate each
proposal and
weigh the potential benefits against the potential cost and/or risks", and
take
things on a case by case basis, which I hope we can all agree on.

In sum: we should be careful not to put undue burden on our users just to
make
ourselves happy. It's up to us to have that discussion each time and I
don't think
it's fair to brush the concerns about a specific case under the rug in the
name of not
blocking low-priority changes in the future.

Anyways, I'll follow up on the vote thread to give this a +1. Whoever
reviews this,
please take care to time the merging of any PRs around (ie after) the 3.9
branch cut

Thanks all,
Sophie

On Mon, Jul 1, 2024 at 2:28 AM Bruno Cadonna  wrote:


Hi Sophie,

Thank you for your analysis!

I understand your valid concerns. However, what I do not like about the
concern is that it blocks us from ever changing low priority stuff in
the code base. Which IMO is bad from a maintainability perspective.

I agree that the change makes primarily us happy in the short term, but more
precise naming makes our users happy as well in the long run, especially
the ones that newly join our otter-club.

I also think that people that set their builds to fail for deprecated
parts, expect to change code when they use a new version of Kafka
Streams. The users that do not let their builds fail for deprecations
have to change their code anyways when they switch to a new major release.

I empathize with the effort to minimize the work for our users, so my
proposal would be to ship this KIP with a release where we have other
wide radius deprecations and changes that our users need to do. For
example, we could ship the deprecated property names with 4.0. I guess
users need to change a couple of things in their code when they switch
to 4.0, so those deprecations will carry (almost) no weight.

What do you think?

Best,
Bruno


On 6/15/24 8:58 AM, Sophie Blee-Goldman wrote:

I think we should pause for a minute and have an honest conversation about
whether the benefits of making this change outweigh the negatives. Here's
my quick roundup of the positives and negatives, feel free to add to this
list if there's anything else you think should be considered, but then
let's evaluate things.

Pros:
- general agreement that the "default" prefix is inaccurate
- one/two line change to update this code

Cons:
- A very large number of people set these configs and would all need to
update their code (either when we eventually remove the old deprecated
configs or right away to fix the deprecation warning). So while this is not
a very "deep" disruption in that it's a small change, it's likely to be an
extremely "wide" disruption in that many people are impacted
- This change is objectively rather trivial, and people might be annoyed
and/or confused as to why they need to make code changes for something so
small
- There seems to be no harm whatsoever done by the current state of things,
as I have never once heard anyone complain or misinterpret the meaning of
these configs due to the "default" prefix. So is this solving a problem
that doesn't exist and just being pedantic in a way that is going to affect
many users?

I won't vote -1 on this and am happy to let it go through if others are
strongly in favor. Just wanted to share my two cents and see if this
resonates with anyone else. Because to me, it kind of feels like we're
doing this to make ourselves happy, not our users. And I don't think we
should be flippant about the user experience like this

On Fri, Jun 14, 2024 at 3:39 PM Matthias J. Sax 

wrote:



*`DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC` is private and does
not need to be covered in the KIP technically -- it's ok to just leave
it in of course.*

Yea, technically it has no implications, but as we are anyways touchin

[jira] [Resolved] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts

2024-07-01 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-13295.
---
Fix Version/s: 3.8.0
   Resolution: Fixed

> Long restoration times for new tasks can lead to transaction timeouts
> -
>
> Key: KAFKA-13295
> URL: https://issues.apache.org/jira/browse/KAFKA-13295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>    Assignee: Bruno Cadonna
>Priority: Critical
>  Labels: eos, new-streams-runtime-should-fix
> Fix For: 3.8.0
>
>
> In some EOS applications with relatively long restoration times we've noticed 
> a series of ProducerFencedExceptions occurring during/immediately after 
> restoration. The broker logs were able to confirm these were due to 
> transactions timing out.
> In Streams, it turns out we automatically begin a new txn when calling 
> {{send}} (if there isn’t already one in flight). A {{send}} occurs often 
> outside a commit during active processing (eg writing to the changelog), 
> leaving the txn open until the next commit. And if a StreamThread has been 
> actively processing when a rebalance results in a new stateful task without 
> revoking any existing tasks, the thread won’t actually commit this open txn 
> before it goes back into the restoration phase while it builds up state for 
> the new task. So the in-flight transaction is left open during restoration, 
> during which the StreamThread only consumes from the changelog without 
> committing, leaving it vulnerable to timing out when restoration times exceed 
> the configured transaction.timeout.ms for the producer client.
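
For reference, the transaction timeout mentioned above is the producer's transaction.timeout.ms, which a Streams application can override via the producer prefix. An illustrative snippet only (raising the timeout merely masks long restorations; it is not the fix tracked by this ticket):

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class EosTimeoutConfigExample {
    public static Properties streamsConfig() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-eos-app");        // example values
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        // Override the embedded producer's transaction timeout (default 60000 ms);
        // it must not exceed the broker's transaction.max.timeout.ms.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 300000);
        return props;
    }
}
{code}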



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1056: Remove `default.` prefix for exception handler StreamsConfig

2024-07-01 Thread Bruno Cadonna

Hi Sophie,

Thank you for your analysis!

I understand your valid concerns. However, what I do not like about the 
concern is that it blocks us from ever changing low priority stuff in 
the code base. Which IMO is bad from a maintainability perspective.


I agree that the change makes primarily us happy in the short term, but more 
precise naming makes our users happy as well in the long run, especially 
the ones that newly join our otter-club.


I also think that people that set their builds to fail for deprecated 
parts, expect to change code when they use a new version of Kafka 
Streams. The users that do not let their builds fail for deprecations 
have to change their code anyways when they switch to a new major release.


I empathize with the effort to minimize the work for our users, so my 
proposal would be to ship this KIP with a release where we have other 
wide radius deprecations and changes that our users need to do. For 
example, we could ship the deprecated property names with 4.0. I guess 
users need to change a couple of things in their code when they switch 
to 4.0, so those deprecations will carry (almost) no weight.


What do you think?

Best,
Bruno


On 6/15/24 8:58 AM, Sophie Blee-Goldman wrote:

I think we should pause for a minute and have an honest conversation about
whether the benefits of making this change outweigh the negatives. Here's
my quick roundup of the positives and negatives, feel free to add to this
list if there's anything else you think should be considered, but then let's evaluate
things.

Pros:
-general agreement that the "default" prefix is inaccurate
-one/two line change to update this code

Cons:
- A very large number of people set these configs and would all need to
update their code (either when we eventually remove the old deprecated
configs or right away to fix the deprecation warning). So while this is not
a very "deep" disruption in that it's a small change, it's likely to be an
extremely "wide" disruption in that many people are impacted
- This change is objectively rather trivial, and people might be annoyed
and/or confused as to why they need to make code changes for something so
small
- There seems to be no harm whatsoever done by the current state of things,
as I have never once heard anyone complain or misinterpret the meaning of
these configs due to the "default" prefix. So is this solving a problem
that doesn't exist and just being pedantic in a way that is going to affect
many users?

I won't vote -1 on this and am happy to let it go through if others are
strongly in favor. Just wanted to share my two cents and see if this
resonates with anyone else. Because to me, it kind of feels like we're
doing this to make ourselves happy, not our users. And I don't think we
should be flippant about the user experience like this

On Fri, Jun 14, 2024 at 3:39 PM Matthias J. Sax  wrote:


*`DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC` is private and does
not need to be covered in the KIP technically -- it's ok to just leave it
in of course.*

Yea, technically it has no implications, but as we are anyways touching
code around this, could it be better to rename ?


Yes, we should still rename it. I was just pointing out, that the KIP
does technically not need to mention this as all, as it's an internal
change.


I think you can start a VOTE for this KIP, too.



-Matthias


On 6/14/24 1:35 PM, Muralidhar Basani wrote:

Thanks Matthias and Nick.

Regarding the prefix "uncaught." is fine with me too, but any other
opinions on this one ?

*101 follow-up:*

*Given that `DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC` is public we
cannot just rename it, but must follow the same deprecation pattern as
for the config name itself. (It seems it was added as public by mistake,
and the new variable for the _DOC string should be private...)*

Agree, as DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC is public, we shall
deprecate it. Updated kip.
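
For illustration, the deprecation pattern discussed here usually looks like the sketch below; the old constant and its value exist in StreamsConfig today, while the new name is the one proposed by this KIP and not yet released:

// Existing constant, to be deprecated:
@Deprecated
public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG =
        "default.deserialization.exception.handler";

// Proposed replacement (name per this KIP):
public static final String DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG =
        "deserialization.exception.handler";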



*`DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC` is private and does
not need to be covered in the KIP technically -- it's ok to just leave it
in of course.*

Yea, technically it has no implications, but as we are anyways touching
code around this, could it be better to rename ?







*102 follow-up: I believe for other config, we actually use the new config
if it's set, and only fall back to the old config if the new one is not
set. If both are set, the new one wins and we just log a WARN that the old
one is ignored in favor of the new one. -- It might be best to stick to
the established pattern?*

Agree, we should follow the same pattern. Updated kip.

Thanks,
Murali
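
For illustration, a minimal sketch of that fallback pattern; the config keys are written out literally here (the old one exists in StreamsConfig today, the new one is the name proposed by this KIP), and the class itself is hypothetical:

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ExceptionHandlerConfigResolver {
    private static final Logger log = LoggerFactory.getLogger(ExceptionHandlerConfigResolver.class);

    static final String NEW_KEY = "deserialization.exception.handler";                  // proposed name
    static final String DEPRECATED_KEY = "default.deserialization.exception.handler";   // existing, deprecated name

    // New config wins; fall back to the deprecated config only if the new one is unset.
    public static String resolve(final Map<String, String> configs) {
        final String newValue = configs.get(NEW_KEY);
        final String oldValue = configs.get(DEPRECATED_KEY);
        if (newValue != null) {
            if (oldValue != null) {
                log.warn("Both {} and the deprecated {} are set; ignoring the deprecated one.", NEW_KEY, DEPRECATED_KEY);
            }
            return newValue;
        }
        return oldValue;
    }
}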

On Fri, Jun 14, 2024 at 3:12 AM Matthias J. Sax 

wrote:



Using `uncaught.` prefix would be fine with me. (Even if I find it a
little bit redundant personally?)



101 follow-up:

Given that `DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC` is public we
cannot just rename it, but must follow the same deprecation pattern as
for the config name itself. (It seems it was added as public 

Re: [VOTE] KIP-1049: Add config log.summary.interval.ms to Kafka Streams

2024-06-18 Thread Bruno Cadonna

Hi,

+1 (binding)

Since the voting was open for at least 72 hours and you got 4 binding +1 
and no -1, you can close the vote and open a PR.


Best,
Bruno

On 6/17/24 7:11 AM, jiang dou wrote:

Thank you for voting:
This KIP has received 3 binding votes. Can I assume that this KIP has been
voted through and that development can start?

Lucas Brutschy  于2024年6月13日周四 19:03写道:


+1 (binding)

thanks for the KIP!

On Thu, Jun 13, 2024 at 2:32 AM Matthias J. Sax  wrote:


+1 (binding)

On 6/11/24 1:17 PM, Sophie Blee-Goldman wrote:

+1 (binding)

Thanks for the KIP!

On Tue, Jun 11, 2024 at 5:37 AM jiang dou 

wrote:



HI
I would like to start a vote for KIP-1049: Add config
log.summary.interval.ms to Kafka Streams

KIP:



https://cwiki.apache.org/confluence/display/KAFKA/KIP-1049%3A+Add+config+log.summary.interval.ms+to+Kafka+Streams

Discussion thread:
https://lists.apache.org/thread/rjqslkt46y5zlg0552rloqjfm5ddzk06

Thanks









Re: [VOTE] KIP-1035: StateStore managed changelog offsets

2024-06-13 Thread Bruno Cadonna

Thanks Nick!

Great KIP!

+1 (binding)

Best,
Bruno

On 6/13/24 2:31 AM, Matthias J. Sax wrote:

Thanks Nick.

+1 (binding)


Looking forward to get this all merged!


-Matthias

On 6/12/24 9:36 AM, Nick Telford wrote:

Hi everyone,

I'd like to call a vote on KIP-1035[1].

Regards,
Nick

1:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets



Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

2024-06-11 Thread Bruno Cadonna

Hi,

since there was not too much activity in this thread recently, I was 
wondering what the status of this discussion is.


I cannot find the examples in the KIP Sébastien mentioned in the last 
message to this thread. I can also not find the corresponding definition 
of the following method call in the KIP:


FAIL.withDeadLetterQueueRecord(record, "dlq-topic")

I have also some comments:

B1
Did you consider prefixing the dead letter queue topic names with the 
application ID to distinguish the topics between Streams apps? Or is the 
user responsible for the differentiation? If the user is responsible, we 
risk that faulty records of different Streams apps end up in the same 
dead letter queue.


B2
Is the name of the dead letter queue topic config 
DEFAULT_ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG or 
ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG? In the KIP both names are used.


B3
What exactly is the trigger to send a record to the dead letter queue? 
Is it setting ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG or is it adding a 
record to the return value of the exception handler?
What happens if I set ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG but do 
not add a record to the return value of the handler? What happens if I 
do not set ERRORS_DEADLETTERQUEUE_TOPIC_NAME_CONFIG but add a record to 
the return value of the handler?


Best,
Bruno

On 4/22/24 10:19 PM, Sebastien Viale wrote:

Hi,

Thanks for your remarks

L1. I would say "who can do the most can do the least": even though most people 
will fail and stop, we found it interesting to offer the possibility to 
fail-and-send-to-DLQ.

L2: We did not consider extending the TimestampExtractor because we estimate it 
out of scope for this KIP. Perhaps it will be possible to include it in an 
ExceptionHandler later.

L3: we will include an example in the KIP, but as we mentioned earlier, the DLQ 
topic can be different in each custom Exception Handler:

When providing custom handlers, users would have the possibility to return:
  * FAIL
* CONTINUE
* FAIL.withDeadLetterQueueRecord(record, "dlq-topic")
* CONTINUE.withDeadLetterQueueRecord(record, "dlq-topic")

cheers !
Sébastien
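
For illustration, a sketch of how a custom handler could use the responses listed above; withDeadLetterQueueRecord(...) is the API proposed by this KIP and does not exist in released Kafka Streams (shown commented out), while the rest is the current DeserializationExceptionHandler interface:

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

public class DlqDeserializationExceptionHandler implements DeserializationExceptionHandler {

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        // With KIP-1034 this could become:
        // return DeserializationHandlerResponse.CONTINUE.withDeadLetterQueueRecord(record, "dlq-topic");
        return DeserializationHandlerResponse.CONTINUE; // today: just skip the poison record
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // no-op
    }
}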



De : Lucas Brutschy 
Envoyé : lundi 22 avril 2024 14:36
À : dev@kafka.apache.org 
Objet : [EXT] Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams

Warning External sender Do not click on any links or open any attachments 
unless you trust the sender and know the content is safe.

Hi!

Thanks for the KIP, great stuff.

L1. I was a bit confused that the default configuration (once you set
a DLQ topic) is going to be fail-and-send-to-DLQ, if I understood
correctly. Is this something that will be a common use-case, and is it
a configuration that we want to encourage? It expected that you either
want to fail or skip-and-send-to-DLQ.

L2. Have you considered extending the `TimestampExtractor` interface
so that it can also produce to DLQ? AFAIK it's not covered by any of
the existing exception handlers, but it can cause similar failures
(potentially custom logic, depends on validity input record). There
could also be a default implementation as a subclass of
`ExtractRecordMetadataTimestamp`.

L3. It would be nice to include an example of how to produce to
multiple topics in the KIP, as I can imagine that this will be a
common use-case. I wasn't sure how much code would be involved to make
it work. If a lot of code is required, we may want to consider
exposing some utils that make it easier.

Cheers,
Lucas

This email was screened for spam and malicious content but exercise caution 
anyway.



On Sun, Apr 21, 2024 at 7:58 PM Damien Gasparina  wrote:


Hi everyone,

Following all the discussion on this KIP and KIP-1033, we introduced a
new container class containing only processing context metadata:
ProcessingMetadata. This new container class is actually part of
KIP-1033, thus, I added a hard dependency for this KIP on KIP-1033, I
think it's the wisest implementation wise.

I also clarified the interface of the enums:
withDeadLetterQueueRecords(Iterable> deadLetterQueueRecords) . Very likely most users would just
send one DLQ record, but there might be specific use-cases and what
can do more can do less, so I added an Iterable.

I took some time to think about the impact of storing the
ProcessingMetadata on the ProductionExceptionHandler. I think storing
the topic/offset/partition should be fine, but I am concerned about
storing the rawSourceKey/Value. I think it could impact some specific
use-cases, for example, a high-throughput Kafka Streams application
"counting" messages could have huge source input messages, and very
small sink messages, here, I assume storing the rawSourceKey/Value
could significantly require more memory than the actual Kafka Producer
buffer.

I think the safest approach is actually to only store the fixed-size
metadata for the ProductionExceptionHandler.handle:
topic/partition/offset/processorNodeId/taskId, it might be confusing

[jira] [Reopened] (KAFKA-14567) Kafka Streams crashes after ProducerFencedException

2024-06-11 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reopened KAFKA-14567:
---

> Kafka Streams crashes after ProducerFencedException
> ---
>
> Key: KAFKA-14567
> URL: https://issues.apache.org/jira/browse/KAFKA-14567
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Matthias J. Sax
>Priority: Blocker
>  Labels: eos
> Fix For: 3.8.0
>
>
> Running a Kafka Streams application with EOS-v2.
> We first see a `ProducerFencedException`. After the fencing, the fenced 
> thread crashed resulting in a non-recoverable error:
> {quote}[2022-12-22 13:49:13,423] ERROR [i-0c291188ec2ae17a0-StreamThread-3] 
> stream-thread [i-0c291188ec2ae17a0-StreamThread-3] Failed to process stream 
> task 1_2 due to the following error: 
> (org.apache.kafka.streams.processor.internals.TaskExecutor)
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_2, processor=KSTREAM-SOURCE-05, 
> topic=node-name-repartition, partition=2, offset=539776276, 
> stacktrace=java.lang.IllegalStateException: TransactionalId 
> stream-soak-test-72b6e57c-c2f5-489d-ab9f-fdbb215d2c86-3: Invalid transition 
> attempted from state FATAL_ERROR to state ABORTABLE_ERROR
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:974)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:394)
> at 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:620)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1079)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:959)
> at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:257)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:207)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
> at 
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:88)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
> at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
> at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:791)
> at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:791)
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:722)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:95)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:76)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1645)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:607)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:569)
> at 
> org.apache.kafka.streams.processor.in

[jira] [Created] (KAFKA-16903) Task should consider producer error previously occurred for different task

2024-06-06 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-16903:
-

 Summary: Task should consider producer error previously occurred 
for different task
 Key: KAFKA-16903
 URL: https://issues.apache.org/jira/browse/KAFKA-16903
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.7.0
Reporter: Bruno Cadonna
Assignee: Bruno Cadonna


A task does not consider a producer error that occurred for a different task.

The following log messages show the issue.

Task {{0_2}} of a Streams app (EOSv2 enabled) crashes while sending records 
with an {{InvalidTxnStateException}}:

{code:java}
[2024-05-30 10:20:35,881] ERROR [kafka-producer-network-thread | 
i-0af25f5c2bd9bba31-StreamThread-1-producer] stream-thread 
[i-0af25f5c2bd9bba31-StreamThread-1] stream-task [0_2] Error encountered 
sending record to topic stream-soak-test-node-name-repartition for task 0_2 due 
to:
org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted 
a transactional operation in an invalid state.
Exception handler choose to FAIL the processing, no more records would be sent. 
(org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted 
a transactional operation in an invalid state.

[2024-05-30 10:20:35,886] ERROR [i-0af25f5c2bd9bba31-StreamThread-1] 
stream-thread [i-0af25f5c2bd9bba31-StreamThread-1] Failed to process stream 
task 0_2 due to the following error: 
(org.apache.kafka.streams.processor.internals.TaskExecutor)
org.apache.kafka.streams.errors.StreamsException: Error encountered sending 
record to topic stream-soak-test-node-name-repartition for task 0_2 due to:
org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted 
a transactional operation in an invalid state.
Exception handler choose to FAIL the processing, no more records would be sent.
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:316)
at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:285)
at 
org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1565)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:311)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:272)
at 
org.apache.kafka.clients.producer.internals.ProducerBatch.completeExceptionally(ProducerBatch.java:236)
at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:829)
at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:818)
at 
org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:770)
at 
org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:702)
at 
org.apache.kafka.clients.producer.internals.Sender.lambda$null$2(Sender.java:627)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at 
org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$3(Sender.java:612)
at java.lang.Iterable.forEach(Iterable.java:75)
at 
org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:612)
at 
org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$9(Sender.java:916)
at 
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:608)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:600)
at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:348)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:250)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.common.errors.InvalidTxnStateException: The 
producer attempted a transactional operation in an invalid state.
{code} 

Just before the exception of task 0_2  also task 0_0  encountered an exception 
while producing:

{code:java}
[2024-05-30 10:20:35,880] ERROR [kafka-producer-network-thread | 
i-0af25f5c2bd9bba31-StreamThread-1-producer] stream-thread 
[i-0af25f5c2bd9bba31-StreamThread-1] stream-task [0_0] Error encountered 
sending record to topic stream-soak-test-network-id-repartition for task 0_0 
due to:
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer 
attempted to produce with an old epoch.
Written offsets would not be recorded and no more records would be sent since 
the producer is fenced, indicating the task may be migrated out 
(org.apache.kafka.streams.processor.internals.RecordCollectorImpl

[jira] [Reopened] (KAFKA-13295) Long restoration times for new tasks can lead to transaction timeouts

2024-05-30 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reopened KAFKA-13295:
---
  Assignee: (was: Sagar Rao)

> Long restoration times for new tasks can lead to transaction timeouts
> -
>
> Key: KAFKA-13295
> URL: https://issues.apache.org/jira/browse/KAFKA-13295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Critical
>  Labels: eos, new-streams-runtime-should-fix
> Fix For: 3.4.0
>
>
> In some EOS applications with relatively long restoration times we've noticed 
> a series of ProducerFencedExceptions occurring during/immediately after 
> restoration. The broker logs were able to confirm these were due to 
> transactions timing out.
> In Streams, it turns out we automatically begin a new txn when calling 
> {{send}} (if there isn’t already one in flight). A {{send}} occurs often 
> outside a commit during active processing (eg writing to the changelog), 
> leaving the txn open until the next commit. And if a StreamThread has been 
> actively processing when a rebalance results in a new stateful task without 
> revoking any existing tasks, the thread won’t actually commit this open txn 
> before it goes back into the restoration phase while it builds up state for 
> the new task. So the in-flight transaction is left open during restoration, 
> during which the StreamThread only consumes from the changelog without 
> committing, leaving it vulnerable to timing out when restoration times exceed 
> the configured transaction.timeout.ms for the producer client.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-29 Thread Bruno Cadonna
    2.  Do this from StreamThread#start, following a similar lock-based
    approach to the one used #computeTaskLags, where each StreamThread just
    makes a pass over the task directories on disk and attempts to lock them
    one by one. If they obtain the lock, check whether there is state but no
    checkpoint, and write the checkpoint if needed. If it can't grab the lock,
    then we know one of the other StreamThreads must be handling the checkpoint
    file for that task directory, and we can move on.

Don't really feel too strongly about which approach is best, doing it in
KafkaStreams#start is certainly the most simple while doing it in the
StreamThread's startup is more efficient. If we're worried about adding too
much weight to KafkaStreams#start then the 2nd option is probably best,
though slightly more complicated.

Thoughts?
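
For illustration, a rough sketch of option 2, assuming a hypothetical stand-in for the parts of StateDirectory it needs (the interface and method names below are illustrative, not the real API):

import java.io.File;
import java.io.IOException;
import java.util.List;

// Hypothetical view of the state directory used by this sketch.
interface TaskStateDirectories {
    List<File> nonEmptyTaskDirectories();
    boolean tryLock(String taskId);
    void unlock(String taskId);
    boolean hasStateButNoCheckpoint(String taskId);
    void writeCheckpointFromStore(String taskId) throws IOException;
}

final class StartupCheckpointPass {
    // Called from each StreamThread on startup: lock task directories one by one,
    // write a missing checkpoint from the open store, and skip directories that
    // another StreamThread already owns.
    static void run(final TaskStateDirectories dirs) throws IOException {
        for (final File taskDir : dirs.nonEmptyTaskDirectories()) {
            final String taskId = taskDir.getName();
            if (!dirs.tryLock(taskId)) {
                continue; // another StreamThread handles this directory's checkpoint
            }
            try {
                if (dirs.hasStateButNoCheckpoint(taskId)) {
                    dirs.writeCheckpointFromStore(taskId);
                }
            } finally {
                dirs.unlock(taskId);
            }
        }
    }
}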

On Tue, May 14, 2024 at 10:02 AM Nick Telford 
wrote:


Hi everyone,

Sorry for the delay in replying. I've finally now got some time to work on
this.

Addressing Matthias's comments:

100.
Good point. As Bruno mentioned, there's already AbstractReadWriteDecorator
which we could leverage to provide that protection. I'll add details on
this to the KIP.

101,102.
It looks like these points have already been addressed by Bruno. Let me
know if anything here is still unclear or you feel needs to be detailed
more in the KIP.

103.
I'm in favour of anything that gets the old code removed sooner, but
wouldn't deprecating an API that we expect (some) users to implement cause
problems?
I'm thinking about implementers of custom StateStores, as they may be
confused by managesOffsets() being deprecated, especially since they would
have to mark their implementation as @Deprecated in order to avoid compile
warnings.
If deprecating an API *while it's still expected to be implemented* is
something that's generally done in the project, then I'm happy to do so
here.

104.
I think this is technically possible, but at the cost of considerable
additional code to maintain. Would we ever have a pathway to remove this
downgrade code in the future?


Regarding rebalance metadata:
Opening all stores on start-up to read and cache their offsets is an
interesting idea, especially if we can avoid re-opening the stores once the
Tasks have been assigned. Scalability shouldn't be too much of a problem,
because typically users have a fairly short state.cleanup.delay, so the
number of on-disk Task directories should rarely exceed the number of Tasks
previously assigned to that instance.
An advantage of this approach is that it would also simplify StateStore
implementations, as they would only need to guarantee that committed
offsets are available when the store is open.

I'll investigate this approach this week for feasibility and report back.


I think that covers all the outstanding feedback, unless I missed anything?


Regards,
Nick

On Mon, 6 May 2024 at 14:06, Bruno Cadonna  wrote:


Hi Matthias,

I see what you mean.

To sum up:

With this KIP the .checkpoint file is written when the store closes.
That is when:
1. a task moves away from Kafka Streams client
2. Kafka Streams client shuts down

A Kafka Streams client needs the information in the .checkpoint file
1. on startup because it does not have any open stores yet.
2. during rebalances for non-empty state directories of tasks that are
not assigned to the Kafka Streams client.

With hard crashes, i.e., when the Streams client is not able to close
its state stores and write the .checkpoint file, the .checkpoint file
might be quite stale. That influences the next rebalance after failover
negatively.


My conclusion is that Kafka Streams either needs to open the state
stores at start up or we write the checkpoint file more often.

Writing the .checkpoint file during processing more often without
controlling the flush to disk would work. However, Kafka Streams would
checkpoint offsets that are not yet persisted on disk by the state
store. That is with a hard crash the offsets in the .checkpoint file
might be larger than the offsets checkpointed in the state store. That
might not be a problem if Kafka Streams uses the .checkpoint file only
to compute the task lag. The downside is that it makes the managing of
checkpoints more complex because now we have to maintain two
checkpoints: one for restoration and one for computing the task lag.
I think we should explore the option where Kafka Streams opens the state
stores at start up to get the offsets.

I also checked when Kafka Streams needs the checkpointed offsets to
compute the task lag during a rebalance. Turns out Kafka Streams needs
them before sending the join request. Now, I am wondering if opening the
state stores of unassigned tasks whose state directory exists locally is
actually such a big issue due to the expected higher latency since it
happens actually before the Kafka Streams client joins the rebalance.

Best,
Bruno







On 5/4/24 12:05 

[jira] [Resolved] (KAFKA-16350) StateUpdater does not init transaction after canceling task close action

2024-05-16 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-16350.
---
Resolution: Fixed

> StateUpdater does not init transaction after canceling task close action
> 
>
> Key: KAFKA-16350
> URL: https://issues.apache.org/jira/browse/KAFKA-16350
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>    Assignee: Bruno Cadonna
>Priority: Major
> Attachments: 
> tyh5pkfmgwfoe-org.apache.kafka.streams.integration.EosIntegrationTest-shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2,
>  processing threads true]-1-output.txt
>
>
> With EOSv2, we use a thread producer shared across all tasks. We init tx on 
> the producer with each _task_ (due to EOSv1 which uses a producer per task), 
> and have a guard in place to only init tx a single time.
> If we hit an error, we close the producer and create a new one, which is 
> still not initialized for transaction. At the same time, with state updater, 
> we schedule a "close task" action on error.
> For each task we get back, we do cancel the "close task" action, to actually 
> keep the task. If this happens for _all_ tasks, we don't have any task in 
> state CREATED at hand, and thus we never init the producer for transactions, 
> because we assume this was already done.
> On the first `send` request, we crash with an IllegalStateException:
> {code:java}
> Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION 
> {code}
> This bug is exposed via EOSIntegrationTest (logs attached).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
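
For context, the guard mentioned in the report is essentially an "init transactions only once per producer instance" flag. A minimal sketch of that pattern, and of the pitfall when the producer is replaced without resetting the flag, follows (hypothetical wrapper, not the actual Kafka Streams StreamsProducer code):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

// Hypothetical wrapper illustrating the bug pattern: the "initialized" flag
// must be reset whenever the underlying producer is re-created, otherwise the
// next send() fails with "Invalid transition ... UNINITIALIZED to IN_TRANSACTION".
class TransactionalProducerHolder {
    private final Properties config;
    private KafkaProducer<byte[], byte[]> producer;
    private boolean transactionsInitialized = false;

    TransactionalProducerHolder(final Properties config) {
        this.config = config;
        this.producer = new KafkaProducer<>(config);
    }

    void maybeInitTransactions() {
        if (!transactionsInitialized) {
            producer.initTransactions();
            transactionsInitialized = true;
        }
    }

    void recreateAfterError() {
        producer.close();
        producer = new KafkaProducer<>(config);
        transactionsInitialized = false; // forgetting this reset reproduces the reported bug
    }
}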


Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-05-15 Thread Bruno Cadonna
UNKNOWN_PROCESS_ID,
 UNKNOWN_TASK_ID
}

Anything missing?

(also updated all the code block headings, thanks for noticing that Bruno)
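
For readers following along, the error codes discussed in this thread add up to roughly the following enum. This is a sketch based only on the names mentioned in the emails; the KIP-924 page is the source of truth for the final names.

// Sketch of the assignment error codes discussed in this thread.
public enum AssignmentError {
    NONE,
    ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES,
    ACTIVE_AND_STANDBY_ASSIGNED_TO_SAME_CLIENT,
    INVALID_STANDBY_TASK,
    UNKNOWN_PROCESS_ID,
    UNKNOWN_TASK_ID
}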

On Fri, May 3, 2024 at 9:33 AM Matthias J. Sax  wrote:


117f: Good point by Bruno. We should check for this, and could have an
additional `INVALID_STANDBY_TASK` error code?


-Matthias

On 5/3/24 5:52 AM, Guozhang Wang wrote:

Hi Sophie,

Re: As for the return type of the TaskAssignmentUtils, I think that
makes sense. LGTM.

On Fri, May 3, 2024 at 2:26 AM Bruno Cadonna wrote:


Hi Sophie,

117f:
I think, removing the STATEFUL and STATELESS types is not enough to
avoid the error Guozhang mentioned. The StreamsPartitionAssignor passes
the information whether a task is stateless or stateful into the task
assignor. However, the task assignor can return a standby task for a
stateless task which is inconsistent.

Echoing Matthias' statement about the missing UNKNOWN_TASK_ID error.

nit:
The titles of some code blocks in the KIP are not consistent with their
content, e.g., KafkaStreamsState <-> NodeState


Best,
Bruno

On 5/3/24 2:43 AM, Matthias J. Sax wrote:

Thanks Sophie. My bad. You are of course right about `TaskAssignment`
and the StreamsPartitionAssignor's responsibility to map tasks of an
instance to consumers. When I wrote my reply, I forgot about this
detail.


Seems you did not add `UNKNOWN_TASK_ID` error yet as proposed by
Guozhang?


Otherwise LGTM.


-Matthias

On 5/2/24 4:20 PM, Sophie Blee-Goldman wrote:

Guozhang:

117. All three additions make sense to me. However, while thinking about
how users would actually produce an assignment, I realized that it seems
silly to make it their responsibility to distinguish between a stateless
and stateful task when they return the assignment. The
StreamsPartitionAssignor already knows which tasks are stateful vs
stateless, so there's no need to add this extra step for users to
figure it out themselves, and potentially make a mistake.

117f: So, rather than add a new error type for "inconsistent task types",
I'm proposing to just flatten the AssignedTask.Type enum to only "ACTIVE"
and "STANDBY", and remove the "STATEFUL" and "STATELESS" types altogether.
Any objections?

-

-Thanks, fixed the indentation of headers under "User APIs" and
"Read-Only APIs"

-As for the return type of the TaskAssignmentUtils methods, I don't
personally feel too strongly about this, but the reason for the return
type being a Map rather than a TaskAssignment is because they are meant
to be used iteratively/to create a part of the full assignment, and not
necessarily a full assignment for each. Notice that they all have an
input parameter of the same type: Map<ProcessId, KafkaStreamsAssignment>.
The idea is you can take the output of any of these and pass it in to
another to generate or optimize another piece of the overall assignment.
For example, if you want to perform the rack-aware optimization on both
active and standby tasks, you would need to call
#optimizeRackAwareActiveTasks and then forward the output to
#optimizeRackAwareStandbyTasks to get the final assignment. If we return
a TaskAssignment, it will usually need to be unwrapped right away.
Perhaps more importantly, I worry that returning a TaskAssignment will
make it seem like each of these utility methods return a "full" and final
assignment that can just be returned as-is from the TaskAssignor's
#assign method. Whereas they are each just a single step in the full
assignment process, and not the final product. Does that make sense?
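
A rough sketch of that iterative usage pattern, inside a custom assignor's #assign method. The method names (defaultStandbyTaskAssignment, optimizeRackAwareActiveTasks, optimizeRackAwareStandbyTasks) come from this thread; the exact parameter lists are assumptions and may differ from the final KIP.

// Sketch only: chaining the TaskAssignmentUtils helpers and wrapping the
// final map into a TaskAssignment at the end. Parameter lists are assumed.
Map<ProcessId, KafkaStreamsAssignment> assignment =
    TaskAssignmentUtils.defaultStandbyTaskAssignment(applicationState, statefulTasks);
assignment = TaskAssignmentUtils.optimizeRackAwareActiveTasks(applicationState, assignment);
assignment = TaskAssignmentUtils.optimizeRackAwareStandbyTasks(applicationState, assignment);
return new TaskAssignment(assignment.values());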

On Thu, May 2, 2024 at 3:50 PM Sophie Blee-Goldman wrote:


Matthias:

Thanks for the naming suggestions for the error codes. I was definitely
not happy with my original naming but couldn't think of anything better.
I like your proposals though, will update the KIP names. I'll also add a
"NONE" option as well -- much better than just passing in null for no
error.


OVERLAPPING_CLIENT : multiple KafkaStreams clients assigned with the
same active task

Would also be an error if assigned to two consumers of the same client...
Needs to be rephrased.



Well the TaskAssignor only assigns tasks to KafkaStreams clients, it's
not responsible for the assignment of tasks to consumers within a
KafkaStreams. It would be a bug in the StreamsPartitionAssignor if it
received a valid assignment from the TaskAssignor with only one copy of
a task assigned to a single KafkaStreams client, and then somehow ended
up assigning that task to multiple consumers on the KafkaStreams client.
It wouldn't be the TaskAssignor's fault so imo it would not make sense
to include this case in the OVERLAPPING_CLIENT error (or as it's now
called, ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES). Not to mention, if there
was a bug that caused the StreamsPartitionAssignor to assign a task to
multiple consumers, it presumably wouldn't even notice since it's a bug
-- if it did


Re: [VOTE] KIP-1036: Extend RecordDeserializationException exception

2024-05-14 Thread Bruno Cadonna

Thanks!

+1 (binding)

Best,
Bruno

On 5/13/24 8:38 PM, Kirk True wrote:

+1 (non-binding)

Thanks Fred!


On May 13, 2024, at 5:46 AM, Bill Bejeck  wrote:

Thanks for the KIP!

+1 (binding)

-Bill


On Tue, May 7, 2024 at 6:16 PM Sophie Blee-Goldman 
wrote:


+1 (binding)

thanks for the KIP!

On Fri, May 3, 2024 at 9:13 AM Matthias J. Sax  wrote:


+1 (binding)

On 5/3/24 8:52 AM, Federico Valeri wrote:

Hi Fred, this is a useful addition.

+1 non binding

Thanks

On Fri, May 3, 2024 at 4:11 PM Andrew Schofield
 wrote:


Hi Fred,
Thanks for the KIP. It’s turned out nice and elegant I think.

Definitely a worthwhile improvement.


+1 (non-binding)

Thanks,
Andrew


On 30 Apr 2024, at 14:02, Frédérik Rouleau

 wrote:


Hi all,

As there is no more activity for a while on the discuss thread, I think we
can start a vote.
The KIP is available on
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1036%3A+Extend+RecordDeserializationException+exception



If you have some feedback or suggestions, please participate in the
discussion thread:
https://lists.apache.org/thread/or85okygtfywvnsfd37kwykkq5jq7fy5

Best regards,
Fred










Re: [VOTE] KIP-924: customizable task assignment for Streams

2024-05-07 Thread Bruno Cadonna

Thanks for the KIP!

Looking forward to a well-structured task assignor!

+1 (binding)

Best,
Bruno

On 5/3/24 2:44 AM, Matthias J. Sax wrote:

I left one more nit on the discuss thread. But overall LGTM.

+1 (binding)

Thanks Rohan and Sophie for driving this KIP.


-Matthias

On 4/29/24 2:07 PM, Sophie Blee-Goldman wrote:

+1 (binding)

thanks for driving this KIP!

On Tue, Apr 16, 2024 at 1:46 PM Rohan Desai  
wrote:




https://cwiki.apache.org/confluence/display/KAFKA/KIP-924%3A+customizable+task+assignment+for+Streams

As this KIP has been open for a while, and gone through a couple rounds of
review/revision, I'm calling a vote to get it approved.





Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-06 Thread Bruno Cadonna

Hi Matthias,

I see what you mean.

To sum up:

With this KIP the .checkpoint file is written when the store closes. 
That is when:

1. a task moves away from Kafka Streams client
2. Kafka Streams client shuts down

A Kafka Streams client needs the information in the .checkpoint file
1. on startup because it does not have any open stores yet.
2. during rebalances for non-empty state directories of tasks that are 
not assigned to the Kafka Streams client.


With hard crashes, i.e., when the Streams client is not able to close 
its state stores and write the .checkpoint file, the .checkpoint file 
might be quite stale. That influences the next rebalance after failover 
negatively.



My conclusion is that Kafka Streams either needs to open the state 
stores at start up or we write the checkpoint file more often.


Writing the .checkpoint file during processing more often without 
controlling the flush to disk would work. However, Kafka Streams would 
checkpoint offsets that are not yet persisted on disk by the state 
store. That is with a hard crash the offsets in the .checkpoint file 
might be larger than the offsets checkpointed in the state store. That 
might not be a problem if Kafka Streams uses the .checkpoint file only 
to compute the task lag. The downside is that it makes the managing of 
checkpoints more complex because now we have to maintain two 
checkpoints: one for restoration and one for computing the task lag.
I think we should explore the option where Kafka Streams opens the state 
stores at start up to get the offsets.


I also checked when Kafka Streams needs the checkpointed offsets to 
compute the task lag during a rebalance. Turns out Kafka Streams needs 
them before sending the join request. Now, I am wondering if opening the 
state stores of unassigned tasks whose state directory exists locally is 
actually such a big issue due to the expected higher latency since it 
happens actually before the Kafka Streams client joins the rebalance.


Best,
Bruno







On 5/4/24 12:05 AM, Matthias J. Sax wrote:
Those are good questions... I could think of a few approaches, but I admit 
it might all be a little bit tricky to code up...


However if we don't solve this problem, I think this KIP does not really 
solve the core issue we are facing? In the end, if we rely on the 
`.checkpoint` file to compute a task assignment, but the `.checkpoint` 
file can be arbitrarily stale after a crash because we only write it on a 
clean close, there would still be a huge gap that this KIP does not close?


For the case in which we keep the checkpoint file, this KIP would still 
help for "soft errors" in which KS can recover, and roll back the store. 
A significant win for sure. -- But hard crashes would still be a 
problem? We might assign tasks to the "wrong" instance, ie, one which is not 
the most up to date, as the checkpoint information could be very outdated? 
Would we end up with a half-baked solution? Would this be good enough to 
justify the introduced complexity? In the end, for soft failures it's still 
a win. Just want to make sure we understand the limitations and make an 
educated decision.


Or do I miss something?


-Matthias

On 5/3/24 10:20 AM, Bruno Cadonna wrote:

Hi Matthias,


200:
I like the idea in general. However, it is not clear to me how the 
behavior should be with multiple stream threads in the same Kafka 
Streams client. What stream thread opens which store? How can a stream 
thread pass an open store to another stream thread that got the 
corresponding task assigned? How does a stream thread know that a task 
was not assigned to any of the stream threads of the Kafka Streams 
client? I have the feeling we should just keep the .checkpoint file on 
close for now to unblock this KIP and try to find a solution to get 
totally rid of it later.



Best,
Bruno



On 5/3/24 6:29 PM, Matthias J. Sax wrote:
101: Yes, but what I am saying is, that we don't need to flush the 
.position file to disk periodically, but only maintain it in main 
memory, and only write it to disk on close() to preserve it across 
restarts. This way, it would never be ahead, but might only lag? But 
with my better understanding about (102) it might be moot anyway...



102: Thanks for clarifying. Looked into the code now. Makes sense. 
Might be something to be worth calling out explicitly in the KIP 
writeup. -- Now that I realize that the position is tracked inside 
the store (not outside as the changelog offsets) it makes much more 
sense to pull position into RocksDB itself. In the end, it's actually 
a "store implementation" detail how it tracks the position (and kinda 
leaky abstraction currently, that we re-use the checkpoint file 
mechanism to track it and flush to disk).



200: I was thinking about this a little bit more, and maybe it's not 
too bad? When KS starts up, we could open all stores we find on local 
disk pro-actively, and keep them all open until the first rebalance 
finishes:

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-03 Thread Bruno Cadonna

Hi Matthias,


200:
I like the idea in general. However, it is not clear to me how the 
behavior should be with multiple stream threads in the same Kafka 
Streams client. What stream thread opens which store? How can a stream 
thread pass an open store to another stream thread that got the 
corresponding task assigned? How does a stream thread know that a task 
was not assigned to any of the stream threads of the Kafka Streams 
client? I have the feeling we should just keep the .checkpoint file on 
close for now to unblock this KIP and try to find a solution to get 
totally rid of it later.



Best,
Bruno



On 5/3/24 6:29 PM, Matthias J. Sax wrote:
101: Yes, but what I am saying is, that we don't need to flush the 
.position file to disk periodically, but only maintain it in main 
memory, and only write it to disk on close() to preserve it across 
restarts. This way, it would never be ahead, but might only lag? But 
with my better understanding about (102) it might be moot anyway...



102: Thanks for clarifying. Looked into the code now. Makes sense. Might 
be something to be worth calling out explicitly in the KIP writeup. -- 
Now that I realize that the position is tracked inside the store (not 
outside as the changelog offsets) it makes much more sense to pull 
position into RocksDB itself. In the end, it's actually a "store 
implementation" detail how it tracks the position (and kinda leaky 
abstraction currently, that we re-use the checkpoint file mechanism to 
track it and flush to disk).



200: I was thinking about this a little bit more, and maybe it's not too 
bad? When KS starts up, we could open all stores we find on local disk 
pro-actively, and keep them all open until the first rebalance finishes: 
For tasks we get assigned, we hand in the already opened store (this 
would amortize the cost to open the store before the rebalance) and for 
non-assigned tasks, we know the offset information won't change and we 
could just cache it in-memory for later reuse (ie, next rebalance) and 
close the store to free up resources? -- Assuming that we would get a 
large percentage of opened stores assigned as tasks anyway, this could 
work?
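
The startup strategy described above, in pseudo-Java. Every type and method in this sketch (LocalStore, open, changelogOffsets, ...) is hypothetical; the real logic would live in the state directory / task manager internals.

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Pseudo-Java sketch only: open every locally found store, hand assigned ones
// to their tasks, cache offsets of unassigned ones and close them again.
class StartupStoreScan {

    final Map<String, Map<String, Long>> cachedOffsets = new HashMap<>();
    final Map<String, LocalStore> openStores = new HashMap<>();

    void scanLocalStateDirs(final Set<String> localTaskDirs) {
        for (final String taskId : localTaskDirs) {
            openStores.put(taskId, LocalStore.open(taskId)); // amortizes the open cost
        }
    }

    void onFirstRebalanceComplete(final Set<String> assignedTasks) {
        for (final Map.Entry<String, LocalStore> entry : openStores.entrySet()) {
            if (!assignedTasks.contains(entry.getKey())) {
                // unassigned: remember offsets for later lag computation, free resources
                cachedOffsets.put(entry.getKey(), entry.getValue().changelogOffsets());
                entry.getValue().close();
            }
        }
        openStores.keySet().retainAll(assignedTasks);
    }

    interface LocalStore {
        static LocalStore open(final String taskId) { throw new UnsupportedOperationException("sketch"); }
        Map<String, Long> changelogOffsets();
        void close();
    }
}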



-Matthias

On 5/3/24 1:29 AM, Bruno Cadonna wrote:

Hi Matthias,


101:
Let's assume a RocksDB store, but I think the following might be true 
also for other store implementations. With this KIP, if Kafka Streams 
commits the offsets, the committed offsets will be stored in an 
in-memory data structure (i.e. the memtable) and stay there until 
RocksDB decides that it is time to persist its in-memory data 
structure. If Kafka Streams writes its position to the .position file 
during a commit and a crash happens before RocksDB persists the 
memtable, then the position in the .position file is ahead of the 
persisted offset. If IQ is done between the crash and the state store 
having fully restored the changelog, the position might tell IQ that the 
state store is more up-to-date than it actually is.
In contrast, if Kafka Streams handles persisting positions the same as 
persisting offset, the position should always be consistent with the 
offset, because they are persisted together.
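
To illustrate the "persisted together" idea, here is a minimal RocksDB sketch; the column family layout and key encoding are assumptions, not the actual Kafka Streams implementation. Writing the changelog offset and the position in the same WriteBatch makes them atomic with respect to crashes.

import java.nio.charset.StandardCharsets;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

// Sketch only: both entries go into one atomic write, so a crash can never
// leave the position ahead of the committed changelog offset.
class OffsetAndPositionWriter {
    private final RocksDB db;
    private final ColumnFamilyHandle offsetsCf;

    OffsetAndPositionWriter(final RocksDB db, final ColumnFamilyHandle offsetsCf) {
        this.db = db;
        this.offsetsCf = offsetsCf;
    }

    void writeAtomically(final String changelogPartition, final long changelogOffset,
                         final String sourcePartition, final long position) throws RocksDBException {
        try (WriteBatch batch = new WriteBatch(); WriteOptions options = new WriteOptions()) {
            batch.put(offsetsCf, bytes("offset:" + changelogPartition), bytes(Long.toString(changelogOffset)));
            batch.put(offsetsCf, bytes("position:" + sourcePartition), bytes(Long.toString(position)));
            db.write(options, batch); // both entries hit the memtable/WAL together
        }
    }

    private static byte[] bytes(final String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }
}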



102:
I am confused about your confusion which tells me that we are talking 
about two different things.

You asked

"Do you intent to add this information [i.e. position] to the map 
passed via commit(final Map changelogOffsets)?"


and with what I wrote I meant that we do not need to pass the position 
into the implementation of the StateStore interface since the position 
is updated within the implementation of the StateStore interface (e.g. 
RocksDBStore [1]). My statement describes the behavior now, not the 
change proposed in this KIP, so it does not contradict what is stated 
in the KIP.



200:
This is about Matthias' main concern about rebalance metadata.
As far as I understand the KIP, Kafka Streams will only use the 
.checkpoint files to compute the task lag for unassigned tasks whose 
state is locally available. For assigned tasks, it will use the 
offsets managed by the open state store.


Best,
Bruno

[1] 
https://github.com/apache/kafka/blob/fcbfd3412eb746a0c81374eb55ad0f73de6b1e71/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L397


On 5/1/24 3:00 AM, Matthias J. Sax wrote:

Thanks Bruno.



101: I think I understand this better now. But just want to make sure 
I do. What do you mean by "they can diverge" and "Recovering after a 
failure might load inconsistent offsets and positions."


The checkpoint is the offset from the changelog, while the position 
is the offset from the upstream source topic, right? -- In the end, 
the position is about IQ, and if we fail to update it, it only means 
that there is some gap when we might not be able to query a standby 
task, because we think it's not up-to-date enough even if it is, 
which would resolve itself soon? Ie, the position might "lag", but 
it's not "inconsistent". Do we believe that this lag would be highly problematic?

Re: [VOTE] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing

2024-05-03 Thread Bruno Cadonna

Hi Damien, Sébastien, and Loïc,

Thanks for the KIP!

+1 (binding)

Best,
Bruno


On 4/26/24 4:00 PM, Damien Gasparina wrote:

Hi all,

We would like to start a vote for KIP-1033: Add Kafka Streams
exception handler for exceptions occurring during processing

The KIP is available on
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occurring+during+processing

If you have any suggestions or feedback, feel free to participate in
the discussion thread:
https://lists.apache.org/thread/1nhhsrogmmv15o7mk9nj4kvkb5k2bx9s

Best regards,
Damien Sebastien and Loic


Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-05-03 Thread Bruno Cadonna

Hi,

the KIP looks great!

public static final String PROCESS_EXCEPTION_HANDLER_CLASS_CONFIG = 
"process.exception.handler".


needs to be changed to

public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG = 
"processing.exception.handler".


The name of the constant has been already corrected in the code block 
but the actual name of the config (i.e., the content of the constant) 
has not been changed yet.
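
In other words, the fix being asked for is to the string value as well; a sketch mirroring the naming pattern above:

// Both the constant name and its value should use the "processing" prefix.
public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG = "processing.exception.handler";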



Best,
Bruno


On 5/3/24 10:35 AM, Sebastien Viale wrote:

Hi,
So, we all agree to revert to the regular Headers interface in 
ErrorHandlerContext.
We will update the KIP accordingly.
@Sophie => Yes, this is the last remaining question, and it has been open for 
voting since last week.
Thanks

Sébastien

From: Andrew Schofield
Sent: Friday, 3 May 2024 06:44
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-1033: Add Kafka Streams 
exception handler for exceptions occuring during processing


Hi,
I’ve changed my mind on this one having read through the comments.

I don’t think the exception handler should be able to mess with the headers
to the detriment of the code that called the handler.

While I like the hygiene of having an ImmutableHeaders interface,
I feel we can use the existing interface to get the effect we desire.

Thanks,
Andrew





On 3 May 2024, at 03:40, Sophie Blee-Goldman  wrote:

I tend to agree that we should just return a pure Headers instead of
introducing a new class/interface to protect overwriting them. I think a
pretty good case has been made already so I won't add onto it, just wanted
to voice my support.

Is that the only remaining question on this KIP? Might be ok to move to a
vote now?

On Wed, May 1, 2024 at 8:05 AM Lianet M.  wrote:


Hi all, thanks Damien for the KIP!

After looking into the KIP and comments, my only concern is aligned with
one of Matthias' comments, around the ImmutableHeaders introduction, with
the motivation not being clear enough. The existing handlers already expose
the headers (indirectly). Ex.
ProductionExceptionHandler.handleSerializationException provides the
ProducerRecord as an argument, so they are already exposed in those
callbacks through record.headers(). Is there a reason to think that it
would be a problem to expose the headers in the
new ProcessingExceptionHandler, but that it's not a problem for the
existing handler?

If there is no real concern about the KS engine requiring those headers, it
feels hard to mentally justify the complexity we transfer to the user by
exposing a new concept into the callbacks to represent the headers. In the
end, it strays away from the simple/consistent representation of Headers
used all over. Even if eventually the KS engine needs to use the headers
after the callbacks with certainty that they were not altered, still feels
like it's something we could attempt to solve internally, without having to
transfer "new concepts" into the user (ex. the deep-copy as it was
suggested, seems like the kind of trade-off that would maybe be acceptable
here to gain simplicity and consistency among the handlers with a single
existing representation of Headers).

Best!

Lianet



On Tue, Apr 30, 2024 at 9:36 PM Matthias J. Sax  wrote:


Thanks for the update.

I am wondering if we should use `ReadOnlyHeaders` instead of
`ImmutableHeaders` as interface name?

Also, the returned `Header` interface is technically not immutable
either, because `Header#value()` returns a mutable byte-array... Would we
need a `ReadOnlyHeader` interface?

If yes, it seems that `ReadOnlyHeaders` should not be a super-interface
of `Headers` but it would rather be a standalone interface, and a
wrapper for a `Headers` instance? And `ReadOnlyHeader` would return some
immutable type instead of `byte[]` for the value()?
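
A rough sketch of that standalone-wrapper idea (a hypothetical class, not something proposed in the KIP): it wraps a Headers instance, exposes read access only, and hands out values as read-only buffers instead of byte[].

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

// Hypothetical read-only view over Headers; no add/remove methods are exposed
// and values are returned as read-only buffers instead of mutable byte arrays.
final class ReadOnlyHeadersView {
    private final Headers delegate;

    ReadOnlyHeadersView(final Headers delegate) {
        this.delegate = delegate;
    }

    List<String> keys() {
        final List<String> keys = new ArrayList<>();
        for (final Header header : delegate) {
            keys.add(header.key());
        }
        return keys;
    }

    ByteBuffer lastValue(final String key) {
        final Header header = delegate.lastHeader(key);
        return header == null || header.value() == null
            ? null
            : ByteBuffer.wrap(header.value()).asReadOnlyBuffer();
    }
}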

An alternative would be to deep-copy the value byte-array what would not
be free, but given that we are talking about exception handling, it
would not be on the hot code path, and thus might be acceptable?


The above seems to increase the complexity significantly though. Hence,
I have second thoughts on the immutability question:

Do we really need to worry about mutability after all, because in the
end, KS runtime won't read the Headers instance after the handler was
called, and if a user modifies the passed in headers, there won't be any
actual damage (ie, no side effects)? For this case, it might even be ok
to also not add `ImmutableHeaders` to begin with?



Sorry for the forth and back (yes, forth and back, because back and
forth does not make sense -- it's not logical -- just 

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-03 Thread Bruno Cadonna

Hi Matthias,


101:
Let's assume a RocksDB store, but I think the following might be true 
also for other store implementations. With this KIP, if Kafka Streams 
commits the offsets, the committed offsets will be stored in an 
in-memory data structure (i.e. the memtable) and stay there until 
RocksDB decides that it is time to persist its in-memory data structure. 
If Kafka Streams writes its position to the .position file during a 
commit and a crash happens before RocksDB persists the memtable, then the 
position in the .position file is ahead of the persisted offset. If IQ 
is done between the crash and the state store having fully restored the 
changelog, the position might tell IQ that the state store is more 
up-to-date than it actually is.
In contrast, if Kafka Streams handles persisting positions the same as 
persisting offset, the position should always be consistent with the 
offset, because they are persisted together.



102:
I am confused about your confusion which tells me that we are talking 
about two different things.

You asked

"Do you intent to add this information [i.e. position] to the map passed 
via commit(final Map changelogOffsets)?"


and with what I wrote I meant that we do not need to pass the position 
into the implementation of the StateStore interface since the position 
is updated within the implementation of the StateStore interface (e.g. 
RocksDBStore [1]). My statement describes the behavior now, not the 
change proposed in this KIP, so it does not contradict what is stated in 
the KIP.



200:
This is about Matthias' main concern about rebalance metadata.
As far as I understand the KIP, Kafka Streams will only use the 
.checkpoint files to compute the task lag for unassigned tasks whose 
state is locally available. For assigned tasks, it will use the offsets 
managed by the open state store.


Best,
Bruno

[1] 
https://github.com/apache/kafka/blob/fcbfd3412eb746a0c81374eb55ad0f73de6b1e71/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L397


On 5/1/24 3:00 AM, Matthias J. Sax wrote:

Thanks Bruno.



101: I think I understand this better now. But just want to make sure I 
do. What do you mean by "they can diverge" and "Recovering after a 
failure might load inconsistent offsets and positions."


The checkpoint is the offset from the changelog, while the position is 
the offset from the upstream source topic, right? -- In the end, the 
position is about IQ, and if we fail to update it, it only means that 
there is some gap when we might not be able to query a standby task, 
because we think it's not up-to-date enough even if it is, which would 
resolve itself soon? Ie, the position might "lag", but it's not 
"inconsistent". Do we believe that this lag would be highly problematic?




102: I am confused.

The position is maintained inside the state store, but is persisted in 
the .position file when the state store closes. 


This contradicts the KIP:

 these position offsets will be stored in RocksDB, in the same column 
family as the changelog offsets, instead of the .position file




My main concern is currently about rebalance metadata -- opening RocksDB 
stores seems to be very expensive, but if we follow the KIP:


We will do this under EOS by updating the .checkpoint file whenever a 
store is close()d. 


It seems, having the offset inside RocksDB does not help us at all? In 
the end, when we crash, we don't want to lose the state, but when we 
update the .checkpoint only on a clean close, the .checkpoint might be 
stale (ie, still contains the checkpoint when we opened the store when 
we got a task assigned).




-Matthias

On 4/30/24 2:40 AM, Bruno Cadonna wrote:

Hi all,

100
I think we already have such a wrapper. It is called 
AbstractReadWriteDecorator.



101
Currently, the position is checkpointed when an offset checkpoint is 
written. If we let the state store manage the committed offsets, we 
need to also let the state store manage the position, otherwise 
they might diverge. State store managed offsets can get flushed (i.e. 
checkpointed) to the disk when the state store decides to flush its 
in-memory data structures, but the position is only checkpointed at 
commit time. Recovering after a failure might load inconsistent 
offsets and positions.



102
The position is maintained inside the state store, but is persisted in 
the .position file when the state store closes. The only public 
interface that uses the position is IQv2 in a read-only mode. So the 
position is only updated within the state store and read from IQv2. No 
need to add anything to the public StateStore interface.



103
Deprecating managesOffsets() right away might be a good idea.


104
I agree that we should try to support downgrades without wipes. At 
least Nick should state in the KIP why we do not support it.



Best,
Bruno




On 4/23/24 8:13 AM, Matthias J. Sax wrote:
Thanks for splitting out this KIP.

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-05-03 Thread Bruno Cadonna
ack to the party here :)

107: I agree with the rationale behind this, and
`numProcessingThreads` looks good to me as it covers both the current
and future scenarios.

117: I agree with Lucas and Bruno, and would add:
   * 117e: unknown taskID: fail
   * 117f: inconsistent task types (e.g. a known taskID was indicated
stateless from ApplicationState, but the returned AssignedTask states
stateful): fail
   * 117g: some ProcessID was not included in the returned Set: pass,
and interprets it as no tasks assigned to it.

And I'm open for any creative error codes folks would come up with :)


If any of these errors are detected, the StreamsPartitionAssignor will
immediately "fail" the rebalance and retry it by scheduling an immediate
followup rebalance.

I'm also a bit concerned here, as such endless retry loops have
happened in the past in my memory. Given that we would likely see most
of the user implementations be deterministic, I'm also leaning towards
failing the app immediately and letting the crowd educate us if there are
some very interesting scenarios out there that are not on our radar to
re-consider this, rather than getting hard to debug cases in the dark.

-

And here are just some nits about the KIP writings itself:

* I think some bullet points under `User APIs` and `Read-only APIs`
should have a lower level indention? It caught me for a sec until I
realized there are just two categories.

* In TaskAssignmentUtils, why do those util functions not return
`TaskAssignment` (to me it feels more consistent with the user APIs),
but instead return a Map?


Guozhang

On Tue, Apr 30, 2024 at 5:28 PM Matthias J. Sax  
wrote:


I like the idea of error codes. Not sure if the name are ideal?
UNKNOWN_PROCESS_ID makes sense, but the other two seems a little bit
difficult to understand?

Should we be very descriptive (and also try to avoid coupling it to the
threading model -- important for the first error code):
   - ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES
   - ACTIVE_AND_STANDBY_ASSIGNED_TO_SAME_CLIENT (or _INSTANCE)

I think we also need to add NONE as option or make the error parameter
an `Optional`?



OVERLAPPING_CLIENT : multiple KafkaStreams clients assigned with the

same active task


Would also be an error if assigned to two consumers of the same
client... Needs to be rephrased.




If any of these errors are detected, the StreamsPartitionAssignor
will immediately "fail" the rebalance and retry it by scheduling an
immediate followup rebalance.


Does this make sense? If we assume that the task-assignment is
deterministic, we would end up with an infinite retry loop? Also,
assuming that a client leaves the group, we cannot assign some tasks any
longer... I would rather throw a StreamsException and let the client
crash.




-Matthias

On 4/30/24 12:22 PM, Sophie Blee-Goldman wrote:

One last thing: I added an error code enum to be returned from the
#onAssignmentComputed method in case of an invalid assignment. I created
one code for each of the invalid cases we described above. The downside is
that this means we'll have to go through a deprecation cycle if we want to
loosen up the restrictions on any of the enforced cases. The upside is that
we can very clearly mark what is an invalid assignment and this will
(hopefully) assist users who are new to customizing assignments by clearly
denoting the requirements, and returning a clear error if they are not
followed.

Of course the StreamsPartitionAssignor will also do a "fallback & retry" in
this case by returning the same assignment to the consumers and scheduling
a followup rebalance. I've added all of this to the TaskAssignor and
#onAssignmentComputed javadocs, and added a section under "Public Changes"
as well.

Please let me know if there are any concerns, or if you have suggestions
for how else we can handle an invalid assignment

On Tue, Apr 30, 2024 at 11:39 AM Sophie Blee-Goldman <sop...@responsive.dev> wrote:


Thanks guys! I agree with what Lucas said about 117c, we can always loosen
a restriction later and I don't want to do anything now that might get in
the way of the new threading models.

With that I think we're all in agreement on 117. I'll update the KIP to
include what we've discussed

(and will fix the remaining #finalAssignment mention as well, thanks
Bruno. Glad to have such good proof readers! :P)

On Tue, Apr 30, 2024 at 8:35 AM Bruno Cadonna wrote:



Hi again,

I forgot to ask whether you could add the agreement about handling
invalid assignment to the KIP.

Best,
Bruno

On 4/30/24 2:00 PM, Bruno Cadonna wrote:

Hi all,

I think we are converging!

117
a) fail: Since it is an invalid consumer assignment
b) pass: I agree that not assigning a task might be reasonable in some
situations
c) fail: For the reasons Lucas pointed out. I am missing a good use case
here.
d) fail: It is invalid


Somewhere in the KIP you still use finalAssignment

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-30 Thread Bruno Cadonna

Hi again,

I forgot to ask whether you could add the agreement about handling 
invalid assignment to the KIP.


Best,
Bruno

On 4/30/24 2:00 PM, Bruno Cadonna wrote:

Hi all,

I think we are converging!

117
a) fail: Since it is an invalid consumer assignment
b) pass: I agree that not assigning a task might be reasonable in some 
situations
c) fail: For the reasons Lucas pointed out. I am missing a good use case 
here.

d) fail: It is invalid


Somewhere in the KIP you still use finalAssignment() instead of the 
wonderful method name onAssignmentComputed() ;-)
"... interface also includes a method named finalAssignment which is 
called with the final computed GroupAssignment ..."



Best,
Bruno


On 4/30/24 1:04 PM, Lucas Brutschy wrote:

Hi,

Looks like a great KIP to me!

I'm late, so I'm only going to comment on the last open point 117. I'm
against any fallbacks like "use the default assignor if the custom
assignment is invalid", as it's just going to hide bugs. For the 4
cases mentioned by Sophie:

117a) I'd fail immediately here, as it's an implementation bug, and
should not lead to a valid consumer group assignment.
117b) Agreed. This is a useful assignment and should be allowed.
117c) This is the tricky case. However, I'm leaning towards not
allowing this, unless we have a concrete use case. This will block us
from potentially using a single consumer for active and standby tasks
in the future. It's easier to drop the restriction later if we have a
concrete use case.
117d) Definitely fail immediately, as you said.

Cheers,
Lucas



On Mon, Apr 29, 2024 at 11:13 PM Sophie Blee-Goldman
 wrote:


Yeah I think that sums it up well. Either you computed a *possible* assignment,
or you returned something that makes it literally impossible for the
StreamsPartitionAssignor to decipher/translate into an actual group
assignment, in which case it should just fail

That's more or less it for the open questions that have been raised so far,
so I just want to remind folks that there's already a voting thread for
this. I cast my vote a few minutes ago so it should resurface in everyone's
inbox :)

On Thu, Apr 25, 2024 at 11:42 PM Rohan Desai 
wrote:


117: as Sophie laid out, there are two cases here right:
1. cases that are considered invalid by the existing assignors but are
still valid assignments in the sense that they can be used to generate a
valid consumer group assignment (from the perspective of the consumer group
protocol). An assignment that excludes a task is one such example, and
Sophie pointed out a good use case for it. I also think it makes sense to
allow these. It's hard to predict how a user might want to use the custom
assignor, and its reasonable to expect them to use it with care and not
hand-hold them.
2. cases that are not valid because it is impossible to compute a valid
consumer group assignment from them. In this case it seems totally
reasonable to just throw a fatal exception that gets passed to the uncaught
exception handler. If this case happens then there is some bug in the
user's assignor and its totally reasonable to fail the application in that
case. We _could_ try to be more graceful and default to one of the existing
assignors. But it's usually better to fail hard and fast when there is some
illegal state detected imo.

On Fri, Apr 19, 2024 at 4:18 PM Rohan Desai 
wrote:


Bruno, I've incorporated your feedback into the KIP document.

On Fri, Apr 19, 2024 at 3:55 PM Rohan Desai 
wrote:

Thanks for the feedback Bruno! For the most part I think it makes sense,
but leaving a couple follow-up thoughts/questions:

re 4: I think Sophie's point was slightly different - that we might want
to wrap the return type for `assign` in a class so that its easily
extensible. This makes sense to me. Whether we do that or not, we can have
the return type be a Set instead of a Map as well.

re 6: Yes, it's a callback that's called with the final assignment. I
like your suggested name.

On Fri, Apr 5, 2024 at 12:17 PM Rohan Desai 
wrote:


Thanks for the feedback Sophie!

re1: Totally agree. The fact that it's related to the partition assignor
is clear from just `task.assignor`. I'll update.
re3: This is a good point, and something I would find useful personally.
I think its worth adding an interface that lets the plugin observe the
final assignment. I'll add that.
re4: I like the new `NodeAssignment` type. I'll update the KIP with that.


On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai 


wrote:


Thanks for the feedback so far! I think pretty much all of it is
reasonable. I'll reply to it inline:


1. All the API logic is granular at the Task level, except the
previousOwnerForPartition func. I’m not clear what’s the motivation
behind it, does our controller also want to change how the
partitions->tasks mapping is formed?
You're right that this is out of place. I've removed this method as
it's not needed by the task assignor.

2. Just on the API la

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-30 Thread Bruno Cadonna

Hi all,

I think we are converging!

117
a) fail: Since it is an invalid consumer assignment
b) pass: I agree that not assigning a task might be reasonable in some 
situations
c) fail: For the reasons Lucas pointed out. I am missing a good use case 
here.

d) fail: It is invalid


Somewhere in the KIP you still use finalAssignment() instead of the 
wonderful method name onAssignmentComputed() ;-)
"... interface also includes a method named finalAssignment which is 
called with the final computed GroupAssignment ..."



Best,
Bruno


On 4/30/24 1:04 PM, Lucas Brutschy wrote:

Hi,

Looks like a great KIP to me!

I'm late, so I'm only going to comment on the last open point 117. I'm
against any fallbacks like "use the default assignor if the custom
assignment is invalid", as it's just going to hide bugs. For the 4
cases mentioned by Sophie:

117a) I'd fail immediately here, as it's an implementation bug, and
should not lead to a valid consumer group assignment.
117b) Agreed. This is a useful assignment and should be allowed.
117c) This is the tricky case. However, I'm leaning towards not
allowing this, unless we have a concrete use case. This will block us
from potentially using a single consumer for active and standby tasks
in the future. It's easier to drop the restriction later if we have a
concrete use case.
117d) Definitely fail immediately, as you said.

Cheers,
Lucas



On Mon, Apr 29, 2024 at 11:13 PM Sophie Blee-Goldman
 wrote:


Yeah I think that sums it up well. Either you computed a *possible* assignment,
or you returned something that makes it literally impossible for the
StreamsPartitionAssignor to decipher/translate into an actual group
assignment, in which case it should just fail

That's more or less it for the open questions that have been raised so far,
so I just want to remind folks that there's already a voting thread for
this. I cast my vote a few minutes ago so it should resurface in everyone's
inbox :)

On Thu, Apr 25, 2024 at 11:42 PM Rohan Desai 
wrote:


117: as Sophie laid out, there are two cases here right:
1. cases that are considered invalid by the existing assignors but are
still valid assignments in the sense that they can be used to generate a
valid consumer group assignment (from the perspective of the consumer group
protocol). An assignment that excludes a task is one such example, and
Sophie pointed out a good use case for it. I also think it makes sense to
allow these. It's hard to predict how a user might want to use the custom
assignor, and its reasonable to expect them to use it with care and not
hand-hold them.
2. cases that are not valid because it is impossible to compute a valid
consumer group assignment from them. In this case it seems totally
reasonable to just throw a fatal exception that gets passed to the uncaught
exception handler. If this case happens then there is some bug in the
user's assignor and its totally reasonable to fail the application in that
case. We _could_ try to be more graceful and default to one of the existing
assignors. But it's usually better to fail hard and fast when there is some
illegal state detected imo.

On Fri, Apr 19, 2024 at 4:18 PM Rohan Desai 
wrote:


Bruno, I've incorporated your feedback into the KIP document.

On Fri, Apr 19, 2024 at 3:55 PM Rohan Desai 
wrote:


Thanks for the feedback Bruno! For the most part I think it makes sense,
but leaving a couple follow-up thoughts/questions:

re 4: I think Sophie's point was slightly different - that we might want
to wrap the return type for `assign` in a class so that its easily
extensible. This makes sense to me. Whether we do that or not, we can

have

the return type be a Set instead of a Map as well.

re 6: Yes, it's a callback that's called with the final assignment. I
like your suggested name.

On Fri, Apr 5, 2024 at 12:17 PM Rohan Desai 
wrote:


Thanks for the feedback Sophie!

re1: Totally agree. The fact that it's related to the partition assignor
is clear from just `task.assignor`. I'll update.
re3: This is a good point, and something I would find useful personally.

I think its worth adding an interface that lets the plugin observe the
final assignment. I'll add that.
re4: I like the new `NodeAssignment` type. I'll update the KIP with that.


On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai 
wrote:


Thanks for the feedback so far! I think pretty much all of it is
reasonable. I'll reply to it inline:


1. All the API logic is granular at the Task level, except the
previousOwnerForPartition func. I’m not clear what’s the motivation
behind it, does our controller also want to change how the
partitions->tasks mapping is formed?
You're right that this is out of place. I've removed this method as
it's not needed by the task assignor.


2. Just on the API layering itself: it feels a bit weird to have the
three built-in functions (defaultStandbyTaskAssignment etc) sitting in
the ApplicationMetadata class. If we consider them as some default util
functions, how about 

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-30 Thread Bruno Cadonna

Hi all,

100
I think we already have such a wrapper. It is called 
AbstractReadWriteDecorator.



101
Currently, the position is checkpointed when an offset checkpoint is 
written. If we let the state store manage the committed offsets, we need 
to also let the state store manage the position, otherwise they 
might diverge. State store managed offsets can get flushed (i.e. 
checkpointed) to the disk when the state store decides to flush its 
in-memory data structures, but the position is only checkpointed at 
commit time. Recovering after a failure might load inconsistent offsets 
and positions.



102
The position is maintained inside the state store, but is persisted in 
the .position file when the state store closes. The only public 
interface that uses the position is IQv2 in a read-only mode. So the 
position is only updated within the state store and read from IQv2. No 
need to add anything to the public StateStore interface.



103
Deprecating managesOffsets() right away might be a good idea.


104
I agree that we should try to support downgrades without wipes. At least 
Nick should state in the KIP why we do not support it.



Best,
Bruno




On 4/23/24 8:13 AM, Matthias J. Sax wrote:
Thanks for splitting out this KIP. The discussion shows that it is a 
complex beast by itself, so it is worth discussing on its own.



Couple of question / comment:


100 `StateStore#commit()`: The JavaDoc says "must not be called by 
users" -- I would propose to put a guard in place for this, by either 
throwing an exception (preferable) or adding a no-op implementation (at 
least for our own stores, by wrapping them -- we cannot enforce it for 
custom stores I assume), and document this contract explicitly.



101 adding `.position` to the store: Why do we actually need this? The 
KIP says "To ensure consistency with the committed data and changelog 
offsets" but I am not sure if I can follow? Can you elaborate why 
leaving the `.position` file as-is won't work?



If it's possible at all, it will need to be done by
creating temporary StateManagers and StateStores during rebalance. I 
think

it is possible, and probably not too expensive, but the devil will be in
the detail.


This sounds like a significant overhead to me. We know that opening a 
single RocksDB takes about 500ms, and thus opening RocksDB to get this 
information might slow down rebalances significantly.



102: It's unclear to me, how `.position` information is added. The KIP 
only says: "position offsets will be stored in RocksDB, in the same 
column family as the changelog offsets". Do you intend to add this 
information to the map passed via `commit(final Map<TopicPartition, Long> changelogOffsets)`? The KIP should describe this in more detail. 
Also, if my assumption is correct, we might want to rename the parameter 
and also have a better JavaDoc description?



103: Should we make it mandatory (long-term) that all stores (including 
custom stores) manage their offsets internally? Maintaining both options 
and thus both code paths puts a burden on everyone and make the code 
messy. I would strongly prefer if we could have mid-term path to get rid 
of supporting both.  -- For this case, we should deprecate the newly 
added `managesOffsets()` method right away, to point out that we intend 
to remove it. If it's mandatory to maintain offsets for stores, we won't 
need this method any longer. In memory stores can just return null from 
#committedOffset().



104 "downgrading": I think it might be worth to add support for 
downgrading w/o the need to wipe stores? Leveraging `upgrade.from` 
parameter, we could build a two rolling bounce downgrade: (1) the new 
code is started with `upgrade.from` set to a lower version, telling the 
runtime to do the cleanup on `close()` -- (ie, ensure that all data is 
written into `.checkpoint` and `.position` file, and the newly added CL 
is deleted). In a second, rolling bounce, the old code would be able to 
open RocksDB. -- I understand that this implies much more work, but 
downgrade seems to be common enough, that it might be worth it? Even if 
we did not always support this in the past, we have to face the fact 
that KS is getting more and more adopted and as a more mature product 
should support this?





-Matthias







On 4/21/24 11:58 PM, Bruno Cadonna wrote:

Hi all,

How should we proceed here?

1. with the plain .checkpoint file
2. with a way to use the state store interface on unassigned but 
locally existing task state


While I like option 2, I think option 1 is less risky and will give us 
the benefits of transactional state stores sooner. We should consider 
the interface approach afterwards, though.



Best,
Bruno



On 4/17/24 3:15 PM, Bruno Cadonna wrote:

Hi Nick and Sophie,

I think the task ID is not enough to create a state store that can 
read the offsets of non-assigned tasks for lag computation during 
rebalancing. The state store also needs the state directory so that it knows where to find the information that it needs to return from changelogOffsets().

Re: [ANNOUNCE] New committer: Igor Soarez

2024-04-24 Thread Bruno Cadonna
Congrats!

Best,
Bruno

On 25 April 2024 at 05:18:19 CEST, Yash Mayya wrote:
>Congratulations Igor!
>
>On Wed, 24 Apr, 2024, 23:36 Colin McCabe,  wrote:
>
>> Hi all,
>>
>> The PMC of Apache Kafka is pleased to announce a new Kafka committer, Igor
>> Soarez.
>>
>> Igor has been a Kafka contributor since 2019. In addition to being a
>> regular contributor and reviewer, he has made significant contributions to
>> improving Kafka's JBOD support in KRaft mode. He has also contributed to
>> discussing and reviewing many KIPs such as KIP-690, KIP-554, KIP-866, and
>> KIP-938.
>>
>> Congratulations, Igor!
>>
>> Thanks,
>>
>> Colin (on behalf of the Apache Kafka PMC)
>>


Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-22 Thread Bruno Cadonna

Hi Damien,

Thanks a lot for the updates!

I have the following comments:

(1)
Could you rename ProcessingMetadata to ErrorHandlerContext or 
ErrorHandlerMetadata (I am preferring the former)? I think it makes it 
clearer for what this context/metadata is for.



(2)
Is there any reason you did not use something like

Record sourceRecord()

in ProcessingMetadata instead of sourceRawKey() and sourceRawValue() and 
headers()? The headers() method refers to the record read from the input 
topic of the sub-topology, right? If yes, maybe that is also something 
to mention more explicitly.



(3)
Since you added the processor node ID to the ProcessingMetadata, you can 
remove it from the signature of method handle() in 
ProcessingExceptionHandler.



(4)
Where are the mentioned changes to the DeserializationExceptionHandler?


(5)
To be consistent, the order of the parameters in the 
ProductionExceptionHandler should be:

1. context
2. record
3. exception


(6)
I am wondering where the implementation of ProcessingMetadata gets the 
sourceRawKey/Value from. Do we need additional changes in 
ProcessingContext and implementations?
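
Putting points (1), (3), and (5) together, the shape being asked for would look roughly like the following. This is a sketch of the direction only; the exact field set, generics, and return type are up to the KIP.

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.Record;

// Sketch: renamed context plus the (context, record, exception) parameter
// order discussed above. Names follow the thread; details are assumptions.
interface ErrorHandlerContext {
    String topic();
    int partition();
    long offset();
    Headers headers();
    byte[] sourceRawKey();
    byte[] sourceRawValue();
    TaskId taskId();
    String processorNodeName();
}

interface ProcessingExceptionHandlerSketch {
    enum Response { CONTINUE, FAIL }

    Response handle(ErrorHandlerContext context, Record<?, ?> record, Exception exception);
}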



Best,
Bruno


On 4/21/24 2:23 PM, Damien Gasparina wrote:

Hi Everyone,

Following the discussions on KIP-1033 and KIP-1034, we did a few changes:
   - We introduced a new ProcessingMetadata class containing only the
ProcessorContext metadata: topic, partition, offset, headers[],
sourceRawKey, sourceRawValue, TaskId, ProcessorNodeName
   - To be consistent, we propose to deprecate the existing
DeserializationExceptionHandler and ProductionExceptionHandler methods
to rely on the new ProcessingMetadata
   - The creation and the ProcessingMetadata and the deprecation of old
methods is owned by KIP-1033, KIP-1034 (DLQ) is now only focusing on
Dead Letter Queue implementation without touching any interfaces. We
introduced a hard dependency of KIP-1034 on KIP-1033, as we think
it's the wisest choice implementation-wise.
- Instead of creating a new metric, KIP-1033 updates the
dropped-record metric.

Let me know what you think, if everything's fine, I think we should be
good to start a VOTE?

Cheers,
Damien





On Fri, 12 Apr 2024 at 22:24, Sophie Blee-Goldman  wrote:


Fully agree about creating a new class for the bits of ProcessingContext
that are specific to metadata only. In fact, more or less this same point
just came up in the related KIP 1034 for DLQs, since the RecordMetadata
can't always be trusted to remain immutable. Maybe it's possible to solve
both issues at once, with the same class?

On another related note -- I had actually also just proposed that we
deprecate the existing DeserializationExceptionHandler method and replace
it with one using the new PAPI as part of KIP-1034. But now that I'm
reading this, I would say it probably makes more sense to do in this KIP.
We can also push that out into a smaller-scoped third KIP if you want, but
clearly, there is some overlap here and so however you guys (the authors)
want to organize this part of the work is fine with me. I do think it
should be done alongside/before this KIP and 1034 though, for all the
reasons already stated.

Everything else in the discussion so far I agree with! The
ProcessingContext thing is the only open question in my mind

On Thu, Apr 11, 2024 at 5:41 AM Damien Gasparina 
wrote:


Hi Matthias, Bruno,

1.a During my previous comment, by Processor Node ID, I meant
Processor name. This is important information to expose in the handler
as it allows users to identify the location of the exception in the
topology.
I assume this information could be useful in other places, that's why
I would lean toward adding this as an attribute in the
ProcessingContext.

1.b Looking at the ProcessingContext, I do think the following 3
methods should not be accessible in the exception handlers:
getStateStore(), schedule() and commit().
Having a separate interface would make a cleaner signature. It would
also be a great time to ensure that all exception handlers are
consistent, at the moment, the
DeserializationExceptionHandler.handle() relies on the old PAPI
ProcessorContext and the ProductionExceptionHandler.handle() has none.
It could make sense to build the new interface in this KIP and track
the effort to migrate the existing handlers in a separate KIP, what do
you think?
Maybe I am overthinking this part and the ProcessingContext would be fine.

4. Good point regarding the dropped-record metric, as it is used by
the other handlers, I do think it makes sense to leverage it instead
of creating a new metric.
I will update the KIP to update the dropped-record-metric.

8. Regarding the DSL, I am aligned with Bruno, I think we could close
the gaps in a future KIP.

Cheers,
Damien


On Thu, 11 Apr 2024 at 11:56, Bruno Cadonna  wrote:


Hi Matthias,


1.a
With processor node ID, I mean the ID that is exposed in the tags of
processor node metrics. That ID cannot be internal since it is exposed
in metrics. I think the processor

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-22 Thread Bruno Cadonna

Hi all,

How should we proceed here?

1. with the plain .checkpoint file
2. with a way to use the state store interface on unassigned but locally 
existing task state


While I like option 2, I think option 1 is less risky and will give us 
the benefits of transactional state stores sooner. We should consider 
the interface approach afterwards, though.



Best,
Bruno



On 4/17/24 3:15 PM, Bruno Cadonna wrote:

Hi Nick and Sophie,

I think the task ID is not enough to create a state store that can read 
the offsets of non-assigned tasks for lag computation during 
rebalancing. The state store also needs the state directory so that it 
knows where to find the information that it needs to return from 
changelogOffsets().


In general, I think we should proceed with the plain .checkpoint file 
for now and iterate back to the state store solution later since it 
seems it is not that straightforward. Alternatively, Nick could timebox 
an effort to better understand what would be needed for the state store 
solution. Nick, let us know your decision.


Regarding your question about the state store instance. I am not too 
familiar with that part of the code, but I think the state store is 
built when the processor topology is built and the processor topology is 
built per stream task. So there is one instance of processor topology 
and state store per stream task. Try to follow the call in [1].


Best,
Bruno

[1] 
https://github.com/apache/kafka/blob/f52575b17225828d2ff11996030ab7304667deab/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java#L153




On 4/16/24 8:59 PM, Nick Telford wrote:

That does make sense. The one thing I can't figure out is how per-Task
StateStore instances are constructed.

It looks like we construct one StateStore instance for the whole Topology
(in InternalTopologyBuilder), and pass that into ProcessorStateManager 
(via

StateManagerUtil) for each Task, which then initializes it.

This can't be the case though, otherwise multiple partitions of the same
sub-topology (aka Tasks) would share the same StateStore instance, which
they don't.

What am I missing?

On Tue, 16 Apr 2024 at 16:22, Sophie Blee-Goldman 
wrote:

I don't think we need to *require* a constructor accept the TaskId, but we
would definitely make sure that the RocksDB state store changes its
constructor to one that accepts the TaskId (which we can do without
deprecation since it's an internal API), and custom state stores can just
decide for themselves whether they want to opt-in/use the TaskId param
or not. I mean, custom state stores would have to opt in anyways by
implementing the new StoreSupplier#get(TaskId) API, and the only
reason to do that would be to have created a constructor that accepts
a TaskId.

Just to be super clear about the proposal, this is what I had in mind.
It's actually fairly simple and wouldn't add much to the scope of the
KIP (I think -- if it turns out to be more complicated than I'm assuming,
we should definitely do whatever has the smallest LOE to get this done).

Anyways, the (only) public API changes would be to add this new
method to the StoreSupplier API:

default T get(final TaskId taskId) {
 return get();
}

We can decide whether or not to deprecate the old #get but it's not
really necessary and might cause a lot of turmoil, so I'd personally
say we just leave both APIs in place.

And that's it for public API changes! Internally, we would just adapt
each of the rocksdb StoreSupplier classes to implement this new
API. So for example with the RocksDBKeyValueBytesStoreSupplier,
we just add

@Override
public KeyValueStore<Bytes, byte[]> get(final TaskId taskId) {
    return returnTimestampedStore ?
        new RocksDBTimestampedStore(name, metricsScope(), taskId) :
        new RocksDBStore(name, metricsScope(), taskId);
}

And of course add the TaskId parameter to each of the actual
state store constructors returned here.

Does that make sense? It's entirely possible I'm missing something
important here, but I think this would be a pretty small addition that
would solve the problem you mentioned earlier while also being
useful to anyone who uses custom state stores.
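
For illustration, a custom supplier could opt in roughly like this (this assumes the proposed default StoreSupplier#get(TaskId) exists; a real task-aware store would pass the TaskId into its own constructor -- the in-memory delegate below is only there to keep the sketch self-contained):

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class MyTaskAwareStoreSupplier implements KeyValueBytesStoreSupplier {

    @Override
    public String name() {
        return "my-store";
    }

    @Override
    public String metricsScope() {
        return "in-memory";
    }

    @Override
    public KeyValueStore<Bytes, byte[]> get() {
        // old path: the TaskId is not known at construction time
        return Stores.inMemoryKeyValueStore(name()).get();
    }

    // proposed new API: the TaskId is known when the store is created
    @Override
    public KeyValueStore<Bytes, byte[]> get(final TaskId taskId) {
        // a custom store could use taskId here, e.g. to locate per-task files;
        // this sketch just delegates to the same built-in store
        return Stores.inMemoryKeyValueStore(name()).get();
    }
}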

On Mon, Apr 15, 2024 at 10:21 AM Nick Telford 
wrote:


Hi Sophie,

Interesting idea! Although what would that mean for the StateStore
interface? Obviously we can't require that the constructor take the TaskId.

Is it enough to add the parameter to the StoreSupplier?

Would doing this be in-scope for this KIP, or are we over-complicating it?


Nick

On Fri, 12 Apr 2024 at 21:30, Sophie Blee-Goldman 


wrote:


Somewhat minor point overall, but it actually drives me crazy that you
can't get access to the taskId of a StateStore until #init is called. This
has caused me a huge headache personally (since the same is true for
processors and I was trying to do something that's probably too hacky to
actually complain about here lol)

Can we just change the StateStoreSupplier to receive and pass along

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-18 Thread Bruno Cadonna
type

[5] I guess not. I think ApplicationMetadata was added during the initial
KIP discussion so that's probably why it doesn't follow the same naming
pattern. Personally I'm fine either way (I do think ApplicationMetadata
sounds a bit better but that's not a good enough reason :P)

Thanks Bruno!

On Wed, Apr 17, 2024 at 7:08 AM Bruno Cadonna  wrote:


Hi,

sorry, I am late to the party.

I have a couple of comments:

(1)
I would prefer Client* instead of Node* in the names. In Kafka Streams
we do not really have the concept of node but we have the concept of
client (admittedly, we sometimes also use instance). I would like to
avoid introducing a new term to basically describe the Streams client.
I know that we already have a ClientState but that would be in a
different package.

(2)
Did you consider using Instant instead of long as the return type of
followupRebalanceDeadline()? Instant is a bit more flexible and readable
than a plain long, IMO. BTW, you list followupRebalanceDeadline() twice in
interface NodeAssignment.

(3)
Did you consider using an enum instead of the AssignedTask class? As far as
I understand, not all combinations are possible. A stateless standby task
does not exist. An enum with values STATELESS, STATEFUL, STANDBY would
be clearer. Or even better: instead of two methods in AssignedTask that
return a boolean, you could have one method -- say type() -- that returns
the enum.

(4)
Does the return type of assignment need to be a map from task ID to
AssignedTask? Wouldn't it be enough to be a collection of AssignedTasks
with AssignedTask containing the task ID?
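
To make (3) and (4) concrete, a small sketch (purely illustrative):

import org.apache.kafka.streams.processor.TaskId;

// AssignedTask carries its TaskId plus a type() method returning an enum,
// so the assignment could simply be a Collection<AssignedTask>.
public class AssignedTask {

    public enum Type { STATELESS, STATEFUL, STANDBY }

    private final TaskId id;
    private final Type type;

    public AssignedTask(final TaskId id, final Type type) {
        this.id = id;
        this.type = type;
    }

    public TaskId id() {
        return id;
    }

    public Type type() {
        return type;
    }
}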

(5)
Is there a semantic difference between *State and *Metadata? I was
wondering whether ApplicationMetadata could also be ApplicationState for
the sake of consistency.

Best,
Bruno


On 4/5/24 11:18 PM, Sophie Blee-Goldman wrote:

Cool, looks good to me!

Seems like there is no further feedback, so maybe we can start to call for
a vote?

However, since as noted we are setting aside time to discuss this during
the sync next Thursday, we can also wait until after that meeting to
officially kick off the vote.

On Fri, Apr 5, 2024 at 12:19 PM Rohan Desai 
wrote:


Thanks for the feedback Sophie!

re1: Totally agree. The fact that it's related to the partition assignor is
clear from just `task.assignor`. I'll update.
re3: This is a good point, and something I would find useful personally. I
think its worth adding an interface that lets the plugin observe the final
assignment. I'll add that.
re4: I like the new `NodeAssignment` type. I'll update the KIP with that.


On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai 
wrote:


Thanks for the feedback so far! I think pretty much all of it is
reasonable. I'll reply to it inline:


1. All the API logic is granular at the Task level, except the
previousOwnerForPartition func. I’m not clear what’s the motivation behind
it, does our controller also want to change how the partitions->tasks
mapping is formed?
You're right that this is out of place. I've removed this method as it's
not needed by the task assignor.


2. Just on the API layering itself: it feels a bit weird to have the
three built-in functions (defaultStandbyTaskAssignment etc) sitting in the
ApplicationMetadata class. If we consider them as some default util
functions, how about introducing moving those into their own static util
methods to separate from the ApplicationMetadata “fact objects” ?
Agreed. Updated in the latest revision of the kip. These have been moved
to TaskAssignorUtils


3. I personally prefer `NodeAssignment` to be a read-only object
containing the decisions made by the assignor, including the
requestFollowupRebalance flag. For manipulating the half-baked results
inside the assignor itself, maybe we can just be flexible to let users use
whatever structs / their own classes even, if they like. WDYT?
Agreed. Updated in the latest version of the kip.


1. For the API, thoughts on changing the method signature to return a
(non-Optional) TaskAssignor? Then we can either have the default
implementation return new HighAvailabilityTaskAssignor or just have a
default implementation class that people can extend if they don't want to
implement every method.
Based on some other discussion, I actually decided to get rid of the
plugin interface, and instead use config to specify individual plugin
behaviour. So the method you're referring to is no longer part of the
proposal.


3. Speaking of ApplicationMetadata, the javadoc says it's read only but
there's methods that return void on it? It's not totally clear to me how
that interface is supposed to be used by the assignor. It'd be nice if we
could flip that interface such that it becomes part of the output instead
of an input to the plugin.
I've moved those methods to a util class. They're really utility methods
the assignor might want to call to do some default or optimized assignment
for some cases like rack-awareness.


4. We should consider wrappin

Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-04-17 Thread Bruno Cadonna

Hi,

sorry, I am late to the party.

I have a couple of comments:

(1)
I would prefer Client* instead of Node* in the names. In Kafka Streams 
we do not really have the concept of node but we have the concept of 
client (admittedly, we sometimes also use instance). I would like to 
avoid introducing a new term to basically describe the Streams client.
I know that we already have a ClientState but that would be in a 
different package.


(2)
Did you consider using Instant instead of long as the return type of 
followupRebalanceDeadline()? Instant is a bit more flexible and readable 
than a plain long, IMO. BTW, you list followupRebalanceDeadline() twice in 
interface NodeAssignment.


(3)
Did you consider using an enum instead of the AssignedTask class? As far as 
I understand, not all combinations are possible. A stateless standby task 
does not exist. An enum with values STATELESS, STATEFUL, STANDBY would 
be clearer. Or even better: instead of two methods in AssignedTask that 
return a boolean, you could have one method -- say type() -- that returns 
the enum.


(4)
Does the return type of assignment need to be a map from task ID to 
AssignedTask? Wouldn't it be enough to be a collection of AssignedTasks 
with AssignedTask containing the task ID?


(5)
Is there a semantic difference between *State and *Metadata? I was 
wondering whether ApplicationMetadata could also be ApplicationState for 
the sake of consistency.


Best,
Bruno


On 4/5/24 11:18 PM, Sophie Blee-Goldman wrote:

Cool, looks good to me!

Seems like there is no further feedback, so maybe we can start to call for
a vote?

However, since as noted we are setting aside time to discuss this during
the sync next Thursday, we can also wait until after that meeting to
officially kick off the vote.

On Fri, Apr 5, 2024 at 12:19 PM Rohan Desai  wrote:


Thanks for the feedback Sophie!

re1: Totally agree. The fact that it's related to the partition assignor is
clear from just `task.assignor`. I'll update.
re3: This is a good point, and something I would find useful personally. I
think its worth adding an interface that lets the plugin observe the final
assignment. I'll add that.
re4: I like the new `NodeAssignment` type. I'll update the KIP with that.

On Thu, Nov 9, 2023 at 11:18 PM Rohan Desai 
wrote:


Thanks for the feedback so far! I think pretty much all of it is
reasonable. I'll reply to it inline:


1. All the API logic is granular at the Task level, except the
previousOwnerForPartition func. I’m not clear what’s the motivation behind
it, does our controller also want to change how the partitions->tasks
mapping is formed?
You're right that this is out of place. I've removed this method as it's
not needed by the task assignor.


2. Just on the API layering itself: it feels a bit weird to have the
three built-in functions (defaultStandbyTaskAssignment etc) sitting in the
ApplicationMetadata class. If we consider them as some default util
functions, how about introducing moving those into their own static util
methods to separate from the ApplicationMetadata “fact objects” ?
Agreed. Updated in the latest revision of the kip. These have been moved
to TaskAssignorUtils


3. I personally prefer `NodeAssignment` to be a read-only object
containing the decisions made by the assignor, including the
requestFollowupRebalance flag. For manipulating the half-baked results
inside the assignor itself, maybe we can just be flexible to let users use
whatever structs / their own classes even, if they like. WDYT?
Agreed. Updated in the latest version of the kip.


1. For the API, thoughts on changing the method signature to return a
(non-Optional) TaskAssignor? Then we can either have the default
implementation return new HighAvailabilityTaskAssignor or just have a
default implementation class that people can extend if they don't want to
implement every method.
Based on some other discussion, I actually decided to get rid of the
plugin interface, and instead use config to specify individual plugin
behaviour. So the method you're referring to is no longer part of the
proposal.


3. Speaking of ApplicationMetadata, the javadoc says it's read only but
there's methods that return void on it? It's not totally clear to me how
that interface is supposed to be used by the assignor. It'd be nice if we
could flip that interface such that it becomes part of the output instead
of an input to the plugin.
I've moved those methods to a util class. They're really utility methods
the assignor might want to call to do some default or optimized assignment
for some cases like rack-awareness.


4. We should consider wrapping UUID in a ProcessID class so that we control
the interface (there are a few places where UUID is directly used).
I like it. Updated the proposal.


5. What does NodeState#newAssignmentForNode() do? I thought the point was
for the plugin to make the assignment? Is that the result of the default
logic?
It doesn't need to be part of the interface. I've removed 

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-17 Thread Bruno Cadonna
, it's clear that this can't work for one simple reason:
StateStores don't know their associated TaskId (and hence, their
StateDirectory) until the init() call. Therefore, committedOffset() can't
be called before init(), unless we also added a StateStoreContext argument
to committedOffset(), which I think might be trying to shoehorn too much
into committedOffset().

I still don't like the idea of the Streams engine maintaining the cache of
changelog offsets independently of stores, mostly because of the
maintenance burden of the code duplication, but it looks like we'll have to
live with it.

Unless you have any better ideas?

Regards,
Nick

On Wed, 10 Apr 2024 at 14:12, Nick Telford 

wrote:



Hi Bruno,

Immediately after I sent my response, I looked at the codebase and came to
the same conclusion. If it's possible at all, it will need to be done by
creating temporary StateManagers and StateStores during rebalance. I think
it is possible, and probably not too expensive, but the devil will be in
the detail.

I'll try to find some time to explore the idea to see if it's possible and
report back, because we'll need to determine this before we can vote on the
KIP.

Regards,
Nick

On Wed, 10 Apr 2024 at 11:36, Bruno Cadonna 

wrote:



Hi Nick,

Thanks for reacting on my comments so quickly!


2.
Some thoughts on your proposal.
State managers (and state stores) are parts of tasks. If the task is not
assigned locally, we do not create those tasks. To get the offsets with
your approach, we would need to either create kind of inactive tasks
besides active and standby tasks or store and manage state managers of
non-assigned tasks differently than the state managers of assigned tasks.
Additionally, the cleanup thread that removes unassigned task directories
needs to concurrently delete those inactive tasks or task-less state
managers of unassigned tasks. This seems all quite messy to me.
Could we create those state managers (or state stores) for locally
existing but unassigned tasks on demand when
TaskManager#getTaskOffsetSums() is executed? Or have a different
encapsulation for the unused task directories?


Best,
Bruno



On 4/10/24 11:31 AM, Nick Telford wrote:

Hi Bruno,

Thanks for the review!

1, 4, 5.
Done

3.
You're right. I've removed the offending paragraph. I had

originally

adapted this from the guarantees outlined in KIP-892. But it's

difficult to

provide these guarantees without the KIP-892 transaction

buffers.

Instead,

we'll add the guarantees back into the JavaDoc when KIP-892

lands.


2.
Good point! This is the only part of the KIP that was

(significantly)

changed when I extracted it from KIP-892. My prototype currently

maintains

this "cache" of changelog offsets in .checkpoint, but doing so

becomes

very

messy. My intent with this change was to try to better

encapsulate

this

offset "caching", especially for StateStores that can cheaply

provide

the

offsets stored directly in them without needing to duplicate

them

in

this

cache.

It's clear some more work is needed here to better encapsulate

this.

My

immediate thought is: what if we construct *but don't

initialize*

the

StateManager and StateStores for every Task directory on-disk?

That

should

still be quite cheap to do, and would enable us to query the

offsets

for

all on-disk stores, even if they're not open. If the

StateManager

(aka.

ProcessorStateManager/GlobalStateManager) proves too expensive

to

hold

open

for closed stores, we could always have a "StubStateManager" in

its

place,

that enables the querying of offsets, but nothing else?

IDK, what do you think?

Regards,

Nick

On Tue, 9 Apr 2024 at 15:00, Bruno Cadonna 

wrote:



Hi Nick,

Thanks for breaking out the KIP from KIP-892!

Here a couple of comments/questions:

1.
In Kafka Streams, we have a design guideline which says to not use the
"get"-prefix for getters on the public API. Could you please change
getCommittedOffsets() to committedOffsets()?


2.
It is not clear to me how TaskManager#getTaskOffsetSums() should read
offsets of tasks the stream thread does not own but that have a state
directory on the Streams client by calling
StateStore#getCommittedOffsets(). If the thread does not own a task it
does also not create any state stores for the task, which means there is
no state store on which to call getCommittedOffsets().
I would have rather expected that a checkpoint file is written for all
state stores on close -- not only for the RocksDBStore -- and that this
checkpoint file is read in TaskManager#getTaskOffsetSums() for the tasks
that have a state directory on the client but are not currently assigned
to any stream thread of the Streams client.


3.
In the javadocs for commit() you write

"... all writes since the last commit(Map), or since init(StateStore)
*MUST* be available to readers, even after a restart."

Re: [ANNOUNCE] New Kafka PMC Member: Greg Harris

2024-04-14 Thread Bruno Cadonna

Congratulations, Greg!

Best,
Bruno

On 4/15/24 7:33 AM, Claude Warren wrote:

Congrats Greg!  All the hard work paid off.

On Mon, Apr 15, 2024 at 6:58 AM Ivan Yurchenko  wrote:


Congrats Greg!

On Sun, Apr 14, 2024, at 22:51, Sophie Blee-Goldman wrote:

Congrats Greg! Happy to have you

On Sun, Apr 14, 2024 at 9:26 AM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:


Congrats, Greg!!

On Sun 14. Apr 2024 at 15.05, Josep Prat 
wrote:


Congrats Greg!!!


Best,

Josep Prat
Open Source Engineering Director, Aiven | josep.p...@aiven.io |
+491715557497 | aiven.io
Aiven Deutschland GmbH
Alexanderufer 3-7, 10117 Berlin
Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
Amtsgericht Charlottenburg, HRB 209739 B

On Sun, Apr 14, 2024, 12:30 Divij Vaidya 

wrote:



Congratulations Greg!

--
Divij Vaidya



On Sun, Apr 14, 2024 at 6:39 AM Kamal Chandraprakash <
kamal.chandraprak...@gmail.com> wrote:


Congratulations, Greg!

On Sun, Apr 14, 2024 at 8:57 AM Yash Mayya 


wrote:



Congrats Greg!

On Sun, 14 Apr, 2024, 05:56 Randall Hauch, 

wrote:



Congratulations, Greg!

On Sat, Apr 13, 2024 at 6:36 PM Luke Chen 


wrote:



Congrats, Greg!

On Sun, Apr 14, 2024 at 7:05 AM Viktor Somogyi-Vass
 wrote:


Congrats Greg! :)

On Sun, Apr 14, 2024, 00:35 Bill Bejeck <

bbej...@gmail.com>

wrote:



Congrats Greg!

-Bill

On Sat, Apr 13, 2024 at 4:25 PM Boudjelda Mohamed Said

<

bmsc...@gmail.com>

wrote:


Congratulations Greg

On Sat 13 Apr 2024 at 20:42, Chris Egerton <

ceger...@apache.org>

wrote:



Hi all,

Greg Harris has been a Kafka committer since July 2023. He has remained
very active and instructive in the community since becoming a committer.

It's my pleasure to announce that Greg is now a member of Kafka PMC.

Congratulations, Greg!

Chris, on behalf of the Apache Kafka PMC

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-11 Thread Bruno Cadonna

Hi Matthias,


1.a
With processor node ID, I mean the ID that is exposed in the tags of 
processor node metrics. That ID cannot be internal since it is exposed 
in metrics. I think the processor name and the processor node ID is the 
same thing. I followed how the processor node ID is set in metrics and I 
ended up in addProcessor(name, ...).



1.b
Regarding ProcessingContext, I also thought about a separate class to 
pass-in context information into the handler, but then I dismissed the 
idea because I thought I was overthinking it. Apparently, I was not 
overthinking it if you also had the same idea. So let's consider a 
separate class.



4.
Regarding the metric, thanks for pointing to the dropped-record metric, 
Matthias. The dropped-record metric is used with the deserialization 
handler and the production handler. So, it would make sense to also use 
it for this handler. However, the dropped-record metric only records 
records that are skipped by the handler and not the number of calls to 
the handler. But that difference is probably irrelevant since in case of 
FAIL, the metric will be reset anyways since the stream thread will be 
restarted. In conclusion, I think the dropped-record metric in 
combination with a warn log message might be the better choice than 
introducing a new metric.



8.
Regarding the DSL, I think we should close possible gaps in a separate KIP.


Best,
Bruno

On 4/11/24 12:06 AM, Matthias J. Sax wrote:

Thanks for the KIP. Great discussion.

I am not sure if I understand the proposal from Bruno to hand in the 
processor node id? Isn't this internal (could not even find it quickly). 
We do have a processor name, right? Or do I mix up something?


Another question is about `ProcessingContext` -- it contains a lot of 
(potentially irrelevant?) metadata. We should think carefully about what 
we want to pass in and what not -- removing stuff is hard, but adding 
stuff is easy. It's always an option to create a new interface that only 
exposes stuff we find useful, and allows us to evolve this interface 
independent of others. Re-using an existing interface always has the 
danger to introduce an undesired coupling that could bite us in the 
future. -- It make total sense to pass in `RecordMetadata`, but 
`ProcessingContext` (even if already limited compared to 
`ProcessorContext`) still seems to be too broad? For example, there is 
`getStateStore()` and `schedule()` methods which I think we should not 
expose.


The other interesting question is about "what record gets passed in". 
For the PAPI, passing in the Processor's input record make a lot of 
sense. However, for DSL operators, I am not 100% sure? The DSL often 
uses internal types not exposed to the user, and thus I am not sure if 
users could write useful code for this case? -- In general, I still 
agree that the handler should be implemented with a try-catch around 
`Processor.process()` but it might not be too useful for DSL processors. 
Hence, I am wondering if we need to do something more in the DSL? I 
don't have a concrete proposal (a few high level ideas only) and if we 
don't do anything special for the DSL I am ok with moving forward with 
this KIP as-is, but we should be aware of potential limitations for DSL 
users. We can always do a follow up KIP to close gaps when we understand 
the impact better -- covering the DSL would also expand the scope of 
this KIP significantly...


About the metric: just to double check. Do we think it's worth adding a 
new metric? Or could we re-use the existing "dropped record metric"?




-Matthias


On 4/10/24 5:11 AM, Sebastien Viale wrote:

Hi,

You are right, it will simplify types.

We update the KIP

regards

Sébastien VIALE

MICHELIN GROUP - Information Technology
Technical Expert Kafka

  Carmes / Bâtiment A17 4e / 63100 Clermont-Ferrand

----
From: Bruno Cadonna 
Sent: Wednesday, April 10, 2024, 10:38
To: dev@kafka.apache.org 
Subject: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception 
handler for exceptions occuring during processing


Hi Loïc, Damien, and Sébastien,

Great that we are converging!


3.
Damien and Loïc, I think in your examples the handler will receive a
Record because a Record is passed to the processor in the following code
line:
https://github.com/apache/kafka/blob/044d058e03bde8179ed57994d0a77ae9bff9fc10/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java#L152

I see that we do not need to pass into the handler a Record just because
we do that for the DeserializationExceptionHandler
and the ProductionEx

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-10 Thread Bruno Cadonna

Hi Nick,

Thanks for reacting on my comments so quickly!


2.
Some thoughts on your proposal.
State managers (and state stores) are parts of tasks. If the task is not 
assigned locally, we do not create those tasks. To get the offsets with 
your approach, we would need to either create kind of inactive tasks 
besides active and standby tasks or store and manage state managers of 
non-assigned tasks differently than the state managers of assigned 
tasks. Additionally, the cleanup thread that removes unassigned task 
directories needs to concurrently delete those inactive tasks or 
task-less state managers of unassigned tasks. This seems all quite messy 
to me.
Could we create those state managers (or state stores) for locally 
existing but unassigned tasks on demand when 
TaskManager#getTaskOffsetSums() is executed? Or have a different 
encapsulation for the unused task directories?



Best,
Bruno



On 4/10/24 11:31 AM, Nick Telford wrote:

Hi Bruno,

Thanks for the review!

1, 4, 5.
Done

3.
You're right. I've removed the offending paragraph. I had originally
adapted this from the guarantees outlined in KIP-892. But it's difficult to
provide these guarantees without the KIP-892 transaction buffers. Instead,
we'll add the guarantees back into the JavaDoc when KIP-892 lands.

2.
Good point! This is the only part of the KIP that was (significantly)
changed when I extracted it from KIP-892. My prototype currently maintains
this "cache" of changelog offsets in .checkpoint, but doing so becomes very
messy. My intent with this change was to try to better encapsulate this
offset "caching", especially for StateStores that can cheaply provide the
offsets stored directly in them without needing to duplicate them in this
cache.

It's clear some more work is needed here to better encapsulate this. My
immediate thought is: what if we construct *but don't initialize* the
StateManager and StateStores for every Task directory on-disk? That should
still be quite cheap to do, and would enable us to query the offsets for
all on-disk stores, even if they're not open. If the StateManager (aka.
ProcessorStateManager/GlobalStateManager) proves too expensive to hold open
for closed stores, we could always have a "StubStateManager" in its place,
that enables the querying of offsets, but nothing else?

IDK, what do you think?

Regards,

Nick

On Tue, 9 Apr 2024 at 15:00, Bruno Cadonna  wrote:


Hi Nick,

Thanks for breaking out the KIP from KIP-892!

Here a couple of comments/questions:

1.
In Kafka Streams, we have a design guideline which says to not use the
"get"-prefix for getters on the public API. Could you please change
getCommittedOffsets() to committedOffsets()?


2.
It is not clear to me how TaskManager#getTaskOffsetSums() should read
offsets of tasks the stream thread does not own but that have a state
directory on the Streams client by calling
StateStore#getCommittedOffsets(). If the thread does not own a task it
does also not create any state stores for the task, which means there is
no state store on which to call getCommittedOffsets().
I would have rather expected that a checkpoint file is written for all
state stores on close -- not only for the RocksDBStore -- and that this
checkpoint file is read in TaskManager#getTaskOffsetSums() for the tasks
that have a state directory on the client but are not currently assigned
to any stream thread of the Streams client.


3.
In the javadocs for commit() you write

"... all writes since the last commit(Map), or since init(StateStore)
*MUST* be available to readers, even after a restart."

This is only true for a clean close before the restart, isn't it?
If the task fails with a dirty close, Kafka Streams cannot guarantee
that the in-memory structures of the state store (e.g. memtable in the
case of RocksDB) are flushed so that the records and the committed
offsets are persisted.


4.
The wrapper that provides the legacy checkpointing behavior is actually
an implementation detail. I would remove it from the KIP, but still
state that the legacy checkpointing behavior will be supported when the
state store does not manage the checkpoints.


5.
Regarding the metrics, could you please add the tags, and the recording
level (DEBUG or INFO) as done in KIP-607 or KIP-444.


Best,
Bruno

On 4/7/24 5:35 PM, Nick Telford wrote:

Hi everyone,

Based on some offline discussion, I've split out the "Atomic

Checkpointing"

section from KIP-892: Transactional Semantics for StateStores, into its

own

KIP

KIP-1035: StateStore managed changelog offsets


https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets


While KIP-892 was adopted *with* the changes outlined in KIP-1035, these
changes were always the most contentious part, and continued to spur
discussion even after KIP-892 was adopted.

All the changes introduced in KIP-1035 have been removed from KIP-892,

and

a hard dependenc

Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-04-10 Thread Bruno Cadonna

Hi Walker,

Thanks for the updates!


(1) While I like naming the methods differently, I also have to say that 
I do not like addIsomorphicGlobalStore() because it does not really tell 
what the method does. I could also not come up with a better name than 
addGlobalStoreWithReprocessingOnRestore(). However, I had two ideas on 
which I would like to have your opinion.


(a) Add a new GlobalStoreBuilder in which users can set if the global 
state store should reprocess on restore. Additionally, to the option to 
enable or disable reprocessing on restore, you could also NOT offer a 
way to enable or disable logging in the GlobalStoreBuilder. Currently, 
if users enable logging for a store builder that they pass into 
addGlobalStore(), Kafka Streams needs to explicitly disable it again, 
which is not ideal.


(b) Add a new GlobalProcessorSupplier in which users can set if the 
global state store should reprocess on restore. Another ugliness that 
could be fixed with this is passing Void, Void to ProcessorSupplier. The 
GlobalProcessorSupplier would just have two type parameters. 
The nice aspect of this idea is that the option to enable/disable 
reprocessing on restore is only needed when a processor supplier is 
passed into the methods. That is not true for idea (a).
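
To make idea (b) a bit more tangible, a rough sketch (the name and exact shape are only illustrative, not a concrete proposal):

import org.apache.kafka.streams.processor.api.ProcessorSupplier;

// Sketch only: a dedicated supplier type for global-store processors. It fixes
// the output types to Void (global processors never forward) and carries the
// restore behaviour, so no extra boolean parameter is needed on addGlobalStore().
public interface GlobalProcessorSupplier<KIn, VIn> extends ProcessorSupplier<KIn, VIn, Void, Void> {

    // whether restoration should reprocess the input topic through the processor
    // (true) or bulk-load the records directly into the store (false)
    boolean reprocessOnRestore();
}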



(2) Yes, that was my intent.


Best,
Bruno

On 4/9/24 9:33 PM, Walker Carlson wrote:

Hey all,

(1) no I hadn't considered just naming the methods differently. I actually
really like this idea and am for it. Except we need 3 different methods
now. One for no processor, one for a processor that should restore and one
that reprocesses. How about `addCustomGlobalStore` and
`addIsomorphicGlobalStore` and then just `addGlobalStateStore` for the no
processor case? If everyone likes that I can add that to the KIP and rename
the methods.

(2) we can have the built-in case use a StoreBuilder and manually check for the TimestampedKeyValueStore. That is
fine with me.

Bruno I hope that was what you were intending.

(3) For the scala api, do we need to make it match the java api or are we
just making the minimum changes? as if we take point 1 I don't know how
much we need to change.

Thanks,
Walker


On Tue, Apr 2, 2024 at 8:38 AM Matthias J. Sax  wrote:


One more thing:

I was just looking into the WIP PR, and it seems we will also need to
change `StreamsBuilder.scala`. The KIP needs to cover these changes as well.


-Matthias

On 4/1/24 10:33 PM, Bruno Cadonna wrote:

Hi Walker and Matthias,

(2)
That is exactly my point about having a compile time error versus a
runtime error. The added flexibility as proposed by Matthias sounds good
to me.

Regarding the Named parameter, I was not aware that the processor that
writes records to the global state store is named according to the name
passed in by Consumed. I thought Consumed strictly specifies the names
of source processors. So I am fine with not having an overload with a
Named parameter.

Best,
Bruno

On 3/31/24 11:30 AM, Matthias J. Sax wrote:

Two more follow up thoughts:

(1) I am still not a big fan of the boolean parameter we introduce.
Did you consider using different method names, like
`addReadOnlyGlobalStore()` (for the optimized method, that would not
reprocess data on restore), and maybe add `addModifiableGlobalStore()`
(not a good name, but we cannot re-use existing `addGlobalStore()` --
maybe somebody else has a good idea about a better `addXxxGlobalStore`
that would describe it well).

(2) I was thinking about Bruno's comment to limit the scope the store
builder for the optimized case. I think we should actually do
something about it, because in the end, the runtime (ie, the
`Processor` we hard wire) would need to pick a store it supports and
cast to the corresponding store? If the cast fails, we hit a runtime
exception, but by putting the store we cast to into the signature we
can actually convert it into a compile time error what seems better.
-- If we want, we could make it somewhat flexible and support both
`KeyValueStore` and `TimestampedKeyValueStore` -- ie, the signature
would be `KeyValueStore` but we explicitly check if the builder gives
us a `TimestampedKeyValueStore` instance and use it properly.

If putting the signature does not work for some reason, we should at
least clearly call it out in the JavaDocs what store type is expected.
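
For illustration only, this is roughly the kind of runtime check/cast the hard-wired processor ends up doing (a sketch, not the actual Streams code), which is why expressing the expected store type in the method signature would turn a mismatch into a compile-time error instead:

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

final class GlobalStoreUpdateSketch {

    @SuppressWarnings("unchecked")
    static void put(final StateStore store, final Bytes key, final byte[] value, final long timestamp) {
        if (store instanceof TimestampedKeyValueStore) {
            // timestamped flavour: the value is stored together with its timestamp
            ((TimestampedKeyValueStore<Bytes, byte[]>) store)
                .put(key, ValueAndTimestamp.make(value, timestamp));
        } else if (store instanceof KeyValueStore) {
            // plain key-value flavour
            ((KeyValueStore<Bytes, byte[]>) store).put(key, value);
        } else {
            // today this mismatch is only discovered at runtime
            throw new IllegalArgumentException("Unsupported store type: " + store.getClass().getName());
        }
    }
}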



-Matthias



On 3/28/24 5:05 PM, Walker Carlson wrote:

Hey all,

Thanks for the feedback Bruno, Almog and Matthias!

Almog: I like the idea, but I agree with Matthais. I actually looked at
that ticket a bit when doing this and found that while similar they are
actually pretty unrelated codewise. I would love to see it get taken
care
of.

Bruno and Matthias: The Named parameter doesn't really make sense to me to
put it here. The store in the Store builder is already named through what
Matthias described and the processor doesn't actually have a name. That
would be the processor node that gets named via the Named parameter (in
the DSL

Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-10 Thread Bruno Cadonna
mer, measuring the number of calls to the
process handler. That's a good point, having the information at the task
level could be beneficial. I updated the KIP to change the metric level
and to clarify the wording.


5.
What do you think about naming the handler ProcessingExceptionHandler
instead of ProcessExceptionHandler?
The DeserializationExceptionHandler and the ProductionExceptionHandler
also use the noun of the action in their name and not the verb.


Good catch, I updated the KIP to rename it ProcessingExceptionHandler.


6.
What record is exactly passed to the handler?
Is it the input record to the task? Is it the input record to the
processor node? Is it the input record to the processor?


The input record of the processor. I assume that is the most user
friendly record in this context.


7.
Could you please add the packages of the Java classes/interfaces/enums
you want to add?


Done, without any surprises: package org.apache.kafka.streams.errors;


Thanks a lot for your reviews! Cheers,
Damien




On Tue, 9 Apr 2024 at 18:04, Bill Bejeck <bbej...@gmail.com> wrote:


Hi Damien, Sebastien and Loic,

Thanks for the KIP, this is a much-needed addition.
I like the approach of getting the plumbing in for handling processor
errors, allowing users to implement more complex solutions as needed.

Overall, where the KIP is now LGTM, modulo outstanding comments. I
think adding the example you included in this thread to the KIP is a great
idea.

Regarding the metrics, I'm thinking along the same lines as Bruno. I'm
wondering if we can make do with a task-level metric at the INFO level and
the processor metric at DEBUG. IMHO, when it comes to tracking exceptions
in processing, these two areas are where users will want to focus, higher
level metrics wouldn't be as useful in this case.

Thanks,
Bill

On Tue, Apr 9, 2024 at 6:54 AM Bruno Cadonna <cado...@apache.org> wrote:


Hi again,

I have additional questions/comments.

6.
What record is exactly passed to the handler?
Is it the input record to the task? Is it the input record to the
processor node? Is it the input record to the processor?


7.
Could you please add the packages of the Java classes/interfaces/enums
you want to add?


Best,
Bruno


On 4/9/24 10:17 AM, Bruno Cadonna wrote:

Hi Loïc, Damien, and Sébastien,

Thanks for the KIP!
I find it really great that you contribute back to Kafka Streams
concepts you developed for kstreamplify so that everybody can take
advantage from your improvements.

I have a couple of questions/comments:

1. and 2.
I am wondering whether we should expose the processor node ID -- which
basically is the processor node name -- in the ProcessingContext
interface. I think the processor node ID fits well in the
ProcessingContext interface since it already contains application ID

and

task ID and it would make the API for the handler cleaner.


3.
Could you elaborate -- maybe with an example -- when a record is in a
state in which it cannot be serialized? This is not completely clear to

me.



4.
Regarding the metrics, it is not entirely clear to me what the metric
measures. Is it the number of calls to the process handler or is it the
number of calls to process handler that returned FAIL?
If it is the former, I was also wondering whether it would be better to
put the task-level metrics to INFO reporting level and remove the
thread-level metric, similar to the dropped-records metric. You can
always roll-up the metrics to the thread level in your preferred
monitoring system. Or do you think we end up with too many metrics?


5.
What do you think about naming the handler ProcessingExceptionHandler
instead of ProcessExceptionHandler?
The DeserializationExceptionHandler and the ProductionExceptionHandler
also use the noun of the action in their name and not the verb.


Best,
Bruno


On 4/8/24 3:48 PM, Sebastien Viale wrote:

Thanks for your review!

All the points make sense for us!



We updated the KIP for points 1 and 4.



2/ We followed the DeserializationExceptionHandler interface
signature, it was not on our mind that the record be forwarded with
the ProcessorContext.

The ProcessingContext is sufficient, we do expect that most people
would need to access the RecordMetadata.



3/ The use of Record is required, as the error could
occur in the middle of a processor where records could be non-serializable
objects.

As it is a global error catching, the user may need little
information about the faulty record.

Assuming that users want to make some specific treatments to the
record, they can add a try / catch block in the topology.

It is up to users to cast record value and key in the implementation
of the ProcessorExceptionHandler.



Cheers

Loïc, Damien and Sébastien


From: Sophie Blee-Goldman <sop...@responsive.dev>
Sent: Saturday, April 6, 2024, 01:08
To: dev@kafk

[jira] [Reopened] (KAFKA-15538) Client support for java regex based subscription

2024-04-10 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reopened KAFKA-15538:
---

> Client support for java regex based subscription
> 
>
> Key: KAFKA-15538
> URL: https://issues.apache.org/jira/browse/KAFKA-15538
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Phuc Hong Tran
>Priority: Blocker
>  Labels: kip-848-client-support, newbie, regex
> Fix For: 3.8.0
>
>
> When using subscribe with a java regex (Pattern), we need to resolve it on 
> the client side to send the broker a list of topic names to subscribe to.
> Context:
> The new consumer group protocol uses [Google 
> RE2/J|https://github.com/google/re2j] for regular expressions and introduces 
> new methods in the consumer API to subscribe using a `SubscribePattern`. The 
> subscribe using a java `Pattern` will be still supported for a while but 
> eventually removed.
>  * When the subscribe with SubscriptionPattern is used, the client should 
> just send the regex to the broker and it will be resolved on the server side.
>  * In the case of the subscribe with Pattern, the regex should be resolved on 
> the client side.
> As part of this task, we should re-enable all integration tests defined in 
> the PlainTextAsyncConsumer that relate to subscription with pattern and that 
> are currently disabled for the new consumer + new protocol



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-04-09 Thread Bruno Cadonna

Hi Nick,

Thanks for breaking out the KIP from KIP-892!

Here a couple of comments/questions:

1.
In Kafka Streams, we have a design guideline which says to not use the 
"get"-prefix for getters on the public API. Could you please change 
getCommittedOffsets() to committedOffsets()?



2.
It is not clear to me how TaskManager#getTaskOffsetSums() should read 
offsets of tasks the stream thread does not own but that have a state 
directory on the Streams client by calling 
StateStore#getCommittedOffsets(). If the thread does not own a task it 
does also not create any state stores for the task, which means there is 
no state store on which to call getCommittedOffsets().
I would have rather expected that a checkpoint file is written for all 
state stores on close -- not only for the RocksDBStore -- and that this 
checkpoint file is read in TaskManager#getTaskOffsetSums() for the tasks 
that have a state directory on the client but are not currently assigned 
to any stream thread of the Streams client.



3.
In the javadocs for commit() you write

"... all writes since the last commit(Map), or since init(StateStore) 
*MUST* be available to readers, even after a restart."


This is only true for a clean close before the restart, isn't it?
If the task fails with a dirty close, Kafka Streams cannot guarantee 
that the in-memory structures of the state store (e.g. memtable in the 
case of RocksDB) are flushed so that the records and the committed 
offsets are persisted.



4.
The wrapper that provides the legacy checkpointing behavior is actually 
an implementation detail. I would remove it from the KIP, but still 
state that the legacy checkpointing behavior will be supported when the 
state store does not manage the checkpoints.



5.
Regarding the metrics, could you please add the tags, and the recording 
level (DEBUG or INFO) as done in KIP-607 or KIP-444.



Best,
Bruno

On 4/7/24 5:35 PM, Nick Telford wrote:

Hi everyone,

Based on some offline discussion, I've split out the "Atomic Checkpointing"
section from KIP-892: Transactional Semantics for StateStores, into its own
KIP

KIP-1035: StateStore managed changelog offsets
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets

While KIP-892 was adopted *with* the changes outlined in KIP-1035, these
changes were always the most contentious part, and continued to spur
discussion even after KIP-892 was adopted.

All the changes introduced in KIP-1035 have been removed from KIP-892, and
a hard dependency on KIP-1035 has been added to KIP-892 in their place.

I'm hopeful that with some more focus on this set of changes, we can
deliver something that we're all happy with.

Regards,
Nick



Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-09 Thread Bruno Cadonna

Hi again,

I have additional questions/comments.

6.
What record is exactly passed to the handler?
Is it the input record to the task? Is it the input record to the 
processor node? Is it the input record to the processor?



7.
Could you please add the packages of the Java classes/interfaces/enums 
you want to add?



Best,
Bruno


On 4/9/24 10:17 AM, Bruno Cadonna wrote:

Hi Loïc, Damien, and Sébastien,

Thanks for the KIP!
I find it really great that you contribute back to Kafka Streams 
concepts you developed for kstreamplify so that everybody can take 
advantage from your improvements.


I have a couple of questions/comments:

1. and 2.
I am wondering whether we should expose the processor node ID -- which 
basically is the processor node name -- in the ProcessingContext 
interface. I think the processor node ID fits well in the 
ProcessingContext interface since it already contains application ID and 
task ID and it would make the API for the handler cleaner.



3.
Could you elaborate -- maybe with an example -- when a record is in a 
state in which it cannot be serialized? This is not completely clear to me.



4.
Regarding the metrics, it is not entirely clear to me what the metric 
measures. Is it the number of calls to the process handler or is it the 
number of calls to process handler that returned FAIL?
If it is the former, I was also wondering whether it would be better to 
put the task-level metrics to INFO reporting level and remove the 
thread-level metric, similar to the dropped-records metric. You can 
always roll-up the metrics to the thread level in your preferred 
monitoring system. Or do you think we end up with too many metrics?



5.
What do you think about naming the handler ProcessingExceptionHandler 
instead of ProcessExceptionHandler?
The DeserializationExceptionHandler and the ProductionExceptionHandler 
also use the noun of the action in their name and not the verb.



Best,
Bruno


On 4/8/24 3:48 PM, Sebastien Viale wrote:

Thanks for your review!

  All the points make sense for us!



We updated the KIP for points 1 and 4.



2/ We followed the DeserializationExceptionHandler interface 
signature, it was not on our mind that the record be forwarded with 
the ProcessorContext.


    The ProcessingContext is sufficient, we do expect that most people 
would need to access the RecordMetadata.




3/ The use of Record is required, as the error could 
occur in the middle of a processor where records could be 
non-serializable objects.


  As it is a global error catching, the user may need little 
information about the faulty record.


  Assuming that users want to make some specific treatments to the 
record, they can add a try / catch block in the topology.


  It is up to users to cast record value and key in the implementation 
of the ProcessorExceptionHandler.




Cheers

Loïc, Damien and Sébastien


From: Sophie Blee-Goldman 
Sent: Saturday, April 6, 2024, 01:08
To: dev@kafka.apache.org 
Subject: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception 
handler for exceptions occuring during processing




Hi Damien,

First off thanks for the KIP, this is definitely a much needed 
feature. On

the
whole it seems pretty straightforward and I am in favor of the proposal.
Just
a few questions and suggestions here and there:

1. One of the #handle method's parameters is "ProcessorNode node", but
ProcessorNode is an internal class (and would expose a lot of internals
that we probably don't want to pass in to an exception handler). Would it
be sufficient to just make this a String and pass in the processor name?

2. Another of the parameters is the ProcessorContext. This would enable
the handler to potentially forward records, which imo should not be done
from the handler since it could only ever call #forward but not direct 
where

the record is actually forwarded to, and could cause confusion if users
aren't aware that the handler is effectively calling from the context 
of the

processor that threw the exception.
2a. If you don't explicitly want the ability to forward records, I would
suggest changing the type of this parameter to ProcessingContext, which
has all the metadata and useful info of the ProcessorContext but without
the
forwarding APIs. This would also let us sidestep the following issue:
2b. If you *do* want the ability to forward records, setting aside 
whether

that
in of itself makes sense to do, we would need to pass in either a regular
ProcessorContext or a FixedKeyProcessorContext, depending on what kind
of processor it is. I'm not quite sure how we could design a clean API 
here,
so I'll hold off until you clarify whether you even want forwarding or 
not.
We would also need to split the input record into a Record vs 
FixedKeyRecord


3. One notable difference between th

Re: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-04-09 Thread Bruno Cadonna

Hi Loïc, Damien, and Sébastien,

Thanks for the KIP!
I find it really great that you contribute back to Kafka Streams 
concepts you developed for kstreamplify so that everybody can take 
advantage from your improvements.


I have a couple of questions/comments:

1. and 2.
I am wondering whether we should expose the processor node ID -- which 
basically is the processor node name -- in the ProcessingContext 
interface. I think the processor node ID fits well in the 
ProcessingContext interface since it already contains application ID and 
task ID and it would make the API for the handler cleaner.



3.
Could you elaborate -- maybe with an example -- when a record is in a 
state in which it cannot be serialized? This is not completely clear to me.



4.
Regarding the metrics, it is not entirely clear to me what the metric 
measures. Is it the number of calls to the process handler or is it the 
number of calls to process handler that returned FAIL?
If it is the former, I was also wondering whether it would be better to 
put the task-level metrics to INFO reporting level and remove the 
thread-level metric, similar to the dropped-records metric. You can 
always roll-up the metrics to the thread level in your preferred 
monitoring system. Or do you think we end up with too many metrics?



5.
What do you think about naming the handler ProcessingExceptionHandler 
instead of ProcessExceptionHandler?
The DeserializationExceptionHandler and the ProductionExceptionHandler 
also use the noun of the action in their name and not the verb.



Best,
Bruno


On 4/8/24 3:48 PM, Sebastien Viale wrote:

Thanks for your review!

  All the points make sense for us!



We updated the KIP for points 1 and 4.



2/ We followed the DeserializationExceptionHandler interface signature, it was 
not on our mind that the record be forwarded with the ProcessorContext.

The ProcessingContext is sufficient, we do expect that most people would 
need to access the RecordMetadata.



3/ The use of Record is required, as the error could occur 
in the middle of a processor where records could be non-serializable objects.

  As it is a global error catching, the user may need little information about 
the faulty record.

  Assuming that users want to make some specific treatments to the record, they 
can add a try / catch block in the topology.

  It is up to users to cast record value and key in the implementation of the 
ProcessorExceptionHandler.



Cheers

Loïc, Damien and Sébastien


From: Sophie Blee-Goldman 
Sent: Saturday, April 6, 2024, 01:08
To: dev@kafka.apache.org 
Subject: [EXT] Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for 
exceptions occuring during processing


Hi Damien,

First off thanks for the KIP, this is definitely a much needed feature. On
the
whole it seems pretty straightforward and I am in favor of the proposal.
Just
a few questions and suggestions here and there:

1. One of the #handle method's parameters is "ProcessorNode node", but
ProcessorNode is an internal class (and would expose a lot of internals
that we probably don't want to pass in to an exception handler). Would it
be sufficient to just make this a String and pass in the processor name?

2. Another of the parameters is the ProcessorContext. This would enable
the handler to potentially forward records, which imo should not be done
from the handler since it could only ever call #forward but not direct where
the record is actually forwarded to, and could cause confusion if users
aren't aware that the handler is effectively calling from the context of the
processor that threw the exception.
2a. If you don't explicitly want the ability to forward records, I would
suggest changing the type of this parameter to ProcessingContext, which
has all the metadata and useful info of the ProcessorContext but without
the
forwarding APIs. This would also let us sidestep the following issue:
2b. If you *do* want the ability to forward records, setting aside whether
that
in of itself makes sense to do, we would need to pass in either a regular
ProcessorContext or a FixedKeyProcessorContext, depending on what kind
of processor it is. I'm not quite sure how we could design a clean API here,
so I'll hold off until you clarify whether you even want forwarding or not.
We would also need to split the input record into a Record vs FixedKeyRecord

3. One notable difference between this handler and the existing ones you
pointed out, the Deserialization/ProductionExceptionHandler, is that the
records passed in to those are in serialized bytes, whereas the record
here would be POJOs. You account for this by making the parameter
type a Record, but I just wonder how users would be
able to read the key/value and figure out what type it should be. For
example, would they need to maintain a map from 

Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-04-01 Thread Bruno Cadonna
n instead of boolean flag

reprocessOnRestore?

Yes, this is exactly the type of thing I was musing (but I don't have any
concrete suggestions). It feels like that would give the flexibility to do
things like the motivation section of the KIP (allow bulk loading of
records without reprocessing) while also solving other limitations.

I'm supportive of the KIP as-is but was hoping somebody with more
experience would have a sudden inspiration for how to solve both issues
with one API! Anyway, I'll slide back into the lurking shadows for now and
let the discussion continue :)

Cheers,
Almog

On Tue, Mar 26, 2024 at 4:22 AM Bruno Cadonna 

wrote:



Hi Almog,

Do you mean an API to configure restoration instead of the boolean flag
reprocessOnRestore?

Do you already have an idea?

The proposal in the KIP is focused on the processor that updates the
global state whereas in the case of GlobalKTable and source KTable the
issue lies in the deserialization of records from the input topics, but
only if the deserialization error handler is configured to drop the
problematic record. Additionally, for source KTable the source topic
optimization must be turned on to run into the issue. I am wondering how
a unified API for global stores, GlobalKTable, and source KTable might
look like.

While it is an interesting question, I am in favor of deferring this to
a separate KIP.

Best,
Bruno

On 3/26/24 12:49 AM, Almog Gavra wrote:

Hello Folk!

Glad to see improvements to the GlobalKTables in discussion! I think they
deserve more love :)

Scope creep alert (which I'm generally against and certainly still support
this KIP without but I want to see if there's an elegant way to address
both problems). The KIP mentions that "Now the restore is done by
reprocessing using an instance from the customer processor supplier" which
I suppose fixed a long-standing bug
(https://issues.apache.org/jira/browse/KAFKA-8037) but only for
GlobalKTables and not for normal KTables that use the source-changelog
optimization. Since this API could be used to signal "I want to reprocess
on restore" I'm wondering whether it makes sense to design this API in a
way that could be extended for KTables as well so a fix for KAFKA-8037
would be possible with the same mechanism. Thoughts?

Cheers,
Almog

On Mon, Mar 25, 2024 at 11:06 AM Walker Carlson
 wrote:


Hey Bruno,

1) I'm actually not sure why that is in there. It certainly doesn't match
the convention. Best to remove it and match the other methods.

2) Yeah, I thought about it but I'm not convinced it is a necessary
restriction. It might be useful for the already defined processors but then
they might as well use the `globalTable` method. I think the add state
store option should go for maximum flexibility.

Best,
Walker



On Fri, Mar 22, 2024 at 10:01 AM Bruno Cadonna  wrote:



Hi Walker,

A couple of follow-up questions.

1.
Why do you propose to explicitly pass a parameter "storeName" in
StreamsBuilder#addGlobalStore?
The StoreBuilder should already provide a name for the store, if I
understand the code correctly.
I would avoid using the same name for the source node and the state
store, because it limits the flexibility in naming. Why do you not use
Named for the name of the source node?

2.
Did you consider Matthias' proposal to restrict the type of the store
builder to `StoreBuilder` (or even
`StoreBuilder`) for the case where
the processor is built-in?


Best,
Bruno

On 3/13/24 11:05 PM, Walker Carlson wrote:

Thanks for the feedback Bruno, Matthias, and Lucas!

There is a decent amount but I'm going to try and just hit the major points
as I would like to keep this change simple.

I've made corrections for the mistakes pointed out. Thanks for the
suggestions everyone.

The main sticking point seems to be with the method of signalling the
restore behavior. It seems we can all agree with how the API should look
with the default option we are adding. I think keeping the option to load
directly from the topic into the store is a good idea. It is much more
performant and could make a simple metric collector processor much simpler.

I think something that Matthias said about creating a special class of
processors for the global stores helps me think about the issue. I tend to
fall into the category that we should keep global stores open to the
possibility of having child nodes in the future. I don't really see the
downside of having that as an option. It might not be best for a lot of
cases, but something simple could be very useful to put in the PAPI.

I like the idea of having a `GlobalStoreParameters` but only if we decide
to make the processor need to extend an interface like
`GlobalStoreProcessor`. If not that seems excessive.

As of right now I don't see a better option than having a boolean flag for
the reprocessOnRestore option. I expanded the description in the docs so I
hope that helps.

I am more than willing to take other ideas on it.

thanks,
Walker

Re: [ANNOUNCE] New committer: Christo Lolov

2024-03-26 Thread Bruno Cadonna

Congrats, Christo!

Well deserved!

Best,
Bruno

On 3/26/24 1:04 PM, Luke Chen wrote:

Hi, Everyone,

The PMC of Apache Kafka is pleased to announce a new Kafka committer:
Christo Lolov.

Christo has been a Kafka contributor since 2021. He has made over 50
commits. He authored KIP-902, KIP-963, and KIP-1005, as well as many tiered
storage related tasks. He also co-drives the migration from EasyMock to
Mockito and from JUnit 4 to JUnit 5.

Congratulations, Christo!

Thanks,
Luke (on behalf of the Apache Kafka PMC)



Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-03-26 Thread Bruno Cadonna

Hi Almog,

Do you mean an API to configure restoration instead of boolean flag
reprocessOnRestore?


Do you already have an idea?

The proposal in the KIP is focused on the processor that updates the
global state whereas in the case of GlobalKTable and source KTable the
issue lies in the deserialization of records from the input topics, but
only if the deserialization error handler is configured to drop the
problematic record. Additionally, for source KTable the source topic
optimization must be turned on to run into the issue. I am wondering what
a unified API for global stores, GlobalKTable, and source KTable might
look like.


While it is an interesting question, I am in favor of deferring this to 
a separate KIP.


Best,
Bruno

On 3/26/24 12:49 AM, Almog Gavra wrote:

Hello Folk!

Glad to see improvements to the GlobalKTables in discussion! I think they
deserve more love :)

Scope creep alert (which I'm generally against and certainly still support
this KIP without but I want to see if there's an elegant way to address
both problems). The KIP mentions that "Now the restore is done by
reprocessing using an instance from the customer processor supplier" which
I suppose fixed a long-standing bug (
https://issues.apache.org/jira/browse/KAFKA-8037) but only for
GlobalKTables and not for normal KTables that use the source-changelog
optimization. Since this API could be used to signal "I want to reprocess
on restore" I'm wondering whether it makes sense to design this API in a
way that could be extended for KTables as well so a fix for KAFKA-8037
would be possible with the same mechanism. Thoughts?

Cheers,
Almog

On Mon, Mar 25, 2024 at 11:06 AM Walker Carlson
 wrote:


Hey Bruno,

1) I'm actually not sure why that is in there. It certainly doesn't match
the convention. Best to remove it and match the other methods.

2) Yeah, I thought about it but I'm not convinced it is a necessary
restriction. It might be useful for the already defined processors but then
they might as well use the `globalTable` method. I think the add state
store option should go for maximum flexibility.

Best,
Walker



On Fri, Mar 22, 2024 at 10:01 AM Bruno Cadonna  wrote:


Hi Walker,

A couple of follow-up questions.

1.
Why do you propose to explicitly pass a parameter "storeName" in
StreamsBuilder#addGlobalStore?
The StoreBuilder should already provide a name for the store, if I
understand the code correctly.
I would avoid using the same name for the source node and the state
store, because it limits the flexibility in naming. Why do you not use
Named for the name of the source node?

2.
Did you consider Matthias' proposal to restrict the type of the store
builder to `StoreBuilder` (or even
`StoreBuilder`) for the case where
the processor is built-in?


Best,
Bruno

On 3/13/24 11:05 PM, Walker Carlson wrote:

Thanks for the feedback Bruno, Matthias, and Lucas!

There is a decent amount but I'm going to try and just hit the major

points

as I would like to keep this change simple.

I've made corrections for the mistakes pointed out. Thanks for the
suggestions everyone.

The main sticking point seems to be with the method of signalling the
restore behavior. It seems we can all agree with how the API should

look

with the default option we are adding. I think keeping the option to

load

directly from the topic into the store is a good idea. It is much more
performant and could make a simple metric collector processor much

simpler.


I think something that Matthias said about creating a special class of
processors for the global stores helps me think about the issue. I tend

to

fall into the category that we should keep global stores open to the
possibility of having child nodes in the future. I don't really see the
downside of having that as an option. It might not be best for a lot of
cases, but something simple could be very useful to put in the PAPI.

I like the idea of having a `GlobalStoreParameters` but only if we

decide

to make the processor need to extend an interface like
`GlobalStoreProcessor`. If not that seems excessive.

As of right now I don't see a better option than having a boolean flag

for

the reprocessOnRestore option. I expanded the description in the docs

so

I

hope that helps.

I am more than willing to take other ideas on it.

thanks,
Walker









Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-03-26 Thread Bruno Cadonna

Hi Walker,

I have follow-up comments.

1.
I think we should add an overload to the StreamsBuilder class that
allows naming the processor with Named. That makes that processor
consistent with all other processors in the DSL regarding naming.


2.
I do not understand what you mean by "maximum flexibility". The
built-in processor needs to assume a given state store interface. That 
means, users have to provide a state store that offers that interface. 
If they do not they will get a runtime exception. If we require a store 
builder for a given interface, we can catch the mistake at compile time. 
Let me know whether I misunderstood something.


Best,
Bruno
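
As a purely illustrative sketch of point 1 above (the parameter order and
generics are assumptions, not the final KIP-1024 signature), the overload
could look roughly like:

    // Hypothetical overload sketch -- only illustrates naming the source node
    // with Named instead of reusing the store name.
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Named;
    import org.apache.kafka.streams.state.StoreBuilder;

    public interface AddGlobalStoreSketch {
        <KIn, VIn> StreamsBuilder addGlobalStore(StoreBuilder<?> storeBuilder,
                                                 String topic,
                                                 Named named,               // source node name
                                                 Consumed<KIn, VIn> consumed);
    }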


On 3/25/24 7:05 PM, Walker Carlson wrote:

Hey Bruno,

1) I'm actually not sure why that is in there. It certainly doesn't match
the convention. Best to remove it and match the other methods.

2) Yeah, I thought about it but I'm not convinced it is a necessary
restriction. It might be useful for the already defined processors but then
they might as well use the `globalTable` method. I think the add state
store option should go for maximum flexibility.

Best,
Walker



On Fri, Mar 22, 2024 at 10:01 AM Bruno Cadonna  wrote:


Hi Walker,

A couple of follow-up questions.

1.
Why do you propose to explicitly pass a parameter "storeName" in
StreamsBuilder#addGlobalStore?
The StoreBuilder should already provide a name for the store, if I
understand the code correctly.
I would avoid using the same name for the source node and the state
store, because it limits the flexibility in naming. Why do you not use
Named for the name of the source node?

2.
Did you consider Matthias' proposal to restrict the type of the store
builder to `StoreBuilder` (or even
`StoreBuilder`) for the case where
the processor is built-in?


Best,
Bruno

On 3/13/24 11:05 PM, Walker Carlson wrote:

Thanks for the feedback Bruno, Matthias, and Lucas!

There is a decent amount but I'm going to try and just hit the major

points

as I would like to keep this change simple.

I've made corrections for the mistakes pointed out. Thanks for the
suggestions everyone.

The main sticking point seems to be with the method of signalling the
restore behavior. It seems we can all agree with how the API should look
with the default option we are adding. I think keeping the option to load
directly from the topic into the store is a good idea. It is much more
performant and could make a simple metric collector processor much

simpler.


I think something that Matthias said about creating a special class of
processors for the global stores helps me think about the issue. I tend

to

fall into the category that we should keep global stores open to the
possibility of having child nodes in the future. I don't really see the
downside of having that as an option. It might not be best for a lot of
cases, but something simple could be very useful to put in the PAPI.

I like the idea of having a `GlobalStoreParameters` but only if we decide
to make the processor need to extend an interface like
`GlobalStoreProcessor`. If not that seems excessive.

As of right now I don't see a better option than having a boolean flag

for

the reprocessOnRestore option. I expanded the description in the docs so

I

hope that helps.

I am more than willing to take other ideas on it.

thanks,
Walker







Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-03-22 Thread Bruno Cadonna

Hi Walker,

A couple of follow-up questions.

1.
Why do you propose to explicitly pass a parameter "storeName" in 
StreamsBuilder#addGlobalStore?
The StoreBuilder should already provide a name for the store, if I 
understand the code correctly.
I would avoid using the same name for the source node and the state 
store, because it limits the flexibility in naming. Why do you not use 
Named for the name of the source node?


2.
Did you consider Matthias' proposal to restrict the type of the store 
builder to `StoreBuilder` (or even 
`StoreBuilder`) for the case where 
the processor is built-in?



Best,
Bruno

On 3/13/24 11:05 PM, Walker Carlson wrote:

Thanks for the feedback Bruno, Matthias, and Lucas!

There is a decent amount but I'm going to try and just hit the major points
as I would like to keep this change simple.

I've made corrections for the mistakes pointed out. Thanks for the
suggestions everyone.

The main sticking point seems to be with the method of signalling the
restore behavior. It seems we can all agree with how the API should look
with the default option we are adding. I think keeping the option to load
directly from the topic into the store is a good idea. It is much more
performant and could make a simple metric collector processor much simpler.

I think something that Matthias said about creating a special class of
processors for the global stores helps me think about the issue. I tend to
fall into the category that we should keep global stores open to the
possibility of having child nodes in the future. I don't really see the
downside of having that as an option. It might not be best for a lot of
cases, but something simple could be very useful to put in the PAPI.

I like the idea of having a `GlobalStoreParameters` but only if we decide
to make the processor need to extend an interface like
`GlobalStoreProcessor`. If not that seems excessive.

As of right now I don't see a better option than having a boolean flag for
the reprocessOnRestore option. I expanded the description in the docs so I
hope that helps.

I am more than willing to take other ideas on it.

thanks,
Walker



Re: [DISCUSS] Apache Kafka 3.6.2 release

2024-03-13 Thread Bruno Cadonna

Thanks Manikumar!

+1

Best,
Bruno

On 3/13/24 5:56 PM, Josep Prat wrote:

+1 thanks for volunteering!

Best
---

Josep Prat
Open Source Engineering Director, Aiven | josep.p...@aiven.io   |
+491715557497 | aiven.io
Aiven Deutschland GmbH
Alexanderufer 3-7, 10117 Berlin
Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
Amtsgericht Charlottenburg, HRB 209739 B

On Wed, Mar 13, 2024, 17:17 Divij Vaidya  wrote:


+1

Thank you for volunteering.

--
Divij Vaidya



On Wed, Mar 13, 2024 at 4:58 PM Justine Olshan

wrote:


Thanks Manikumar!
+1 from me

Justine

On Wed, Mar 13, 2024 at 8:52 AM Manikumar 
wrote:


Hi,

I'd like to volunteer to be the release manager for a bug fix release

of

the 3.6 line.
If there are no objections, I'll send out the release plan soon.

Thanks,
Manikumar









[jira] [Reopened] (KAFKA-10199) Separate state restoration into separate threads

2024-03-11 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reopened KAFKA-10199:
---

> Separate state restoration into separate threads
> 
>
> Key: KAFKA-10199
> URL: https://issues.apache.org/jira/browse/KAFKA-10199
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>    Assignee: Bruno Cadonna
>Priority: Major
>  Labels: new-streams-runtime-should-fix
> Fix For: 3.7.0
>
>
> As part of the restoration optimization effort, we would like to move the 
> restoration process to separate threads such that:
> 1. Stream threads would not be restricted by the main consumer `poll` 
> frequency to keep as part of the group.
> 2. We can allow larger batches of data to be written into the restoration.
> Besides this, we'd also like to fix the known issues that for piggy-backed 
> source topics as changelog topics, the serde exception / extra processing 
> logic would be skipped.
> We would also cleanup the global update tasks as part of this effort to 
> consolidate to the separate restoration threads, and would also gear them up 
> with corresponding monitoring metrics (KIPs in progress).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1024: Make the restore behavior of GlobalKTables with custom processors configureable

2024-03-06 Thread Bruno Cadonna

Hi Walker,

Thanks for the KIP!

Great that you are going to fix this long-standing issue!

1.
I was wondering if we need the timestamp extractor as well as the key
and value deserializer in the Topology#addGlobalStore() overloads that do
not take a ProcessorSupplier. What about Consumed in
StreamsBuilder#addGlobalStore()?
Since those methods set up a global state store that does not process any
records, do they still need to deserialize records and extract
timestamps? Name might still be needed, right?


2.
From an API point of view, it might make sense to put all 
processor-related arguments into a parameter object. Something like:

GlobalStoreParameters.globalStore().withKeySerde(keySerde).disableReprocessOnRestore()
Just an idea, open for discussion.

3.
Could you please go over the KIP and correct typos and other mistakes in 
the KIP?
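
To make point 2 above more concrete, a rough sketch of such a parameter
object could look like this (all names and defaults here are assumptions
for discussion, not a proposed public API):

    // Hypothetical fluent parameter object -- not the KIP's actual API.
    import org.apache.kafka.common.serialization.Serde;

    public final class GlobalStoreParameters<K, V> {
        private Serde<K> keySerde;
        private boolean reprocessOnRestore = true;  // assumed default: reprocess on restore

        private GlobalStoreParameters() {}

        public static <K, V> GlobalStoreParameters<K, V> globalStore() {
            return new GlobalStoreParameters<>();
        }

        public GlobalStoreParameters<K, V> withKeySerde(final Serde<K> keySerde) {
            this.keySerde = keySerde;
            return this;
        }

        public GlobalStoreParameters<K, V> disableReprocessOnRestore() {
            this.reprocessOnRestore = false;
            return this;
        }

        public boolean reprocessOnRestore() {
            return reprocessOnRestore;
        }
    }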



Best,
Bruno



On 3/2/24 1:43 AM, Matthias J. Sax wrote:

Thanks for the KIP Walker.

Fixing this issue, and providing users some flexibility to opt-in/out on 
"restore reprocessing" is certainly a good improvement.


 From an API design POV, I like the idea to not require passing in a 
ProcessorSupplier to begin with. Given the current implementation of the 
restore process, this might have been the better API from the beginning 
on... Well, better late than never :)


For this new method w/o a supplier, I am wondering if we want to keep 
`addGlobalStore` or name it `addGlobalReadOnlyStore` -- we do a similar 
thing via KIP-813. Just an idea.


However, I am not convinced that adding a new boolean parameter is the 
best way to design the API. Unfortunately, I don't have any elegant 
proposal myself. Just a somewhat crazy idea to do a larger API change:


Making a step back, a global store, is by definition a terminal node -- 
we don't support to add child nodes. Hence, while we expose a full 
`ProcessorContext` interface, we actually limit what functionality it 
supports. Thus, I am wondering if we should stop using the generic 
`Processor` interface to begin with, but design a new one which is 
tailored to the needs of global stores? -- This would of course be of 
much larger scope than originally intended by this KIP, but it might be 
a great opportunity to kill two birds with one stone?


The only other question to consider is: do we believe that global stores 
will never have child nodes, or could we actually allow for child nodes 
in the future? If yes, it might not be smart to move off using the
`Processor` interface. In general, I could imagine, especially as we
now want to support "process on restore" to allow simple stateless 
operators like `map()` or `filter()` on a `GlobalTable` (or allow to add 
custom global processors) at some point in the future?


Just wanted to put this out to see what people think...


-Matthias


On 2/29/24 1:26 PM, Walker Carlson wrote:

Hello everybody,

I wanted to propose a change to our addGlobalStore methods so that the
restore behavior can be controlled on a per-processor level. This should
help Kafka Streams users to better tune global stores added with the
processor API to better fit their needs.

Details are in the kip here: https://cwiki.apache.org/confluence/x/E4t3EQ

Thanks,
Walker



Re: Subscribe to Developer mailing list

2024-03-04 Thread Bruno Cadonna

Hi Loïc,

subscription to the mailing lists is self-service. See details under 
https://kafka.apache.org/contact


Best,
Bruno

On 2/29/24 9:48 AM, Loic Greffier wrote:

Hi @dev@kafka.apache.org ,

I am working as a Software Engineer at Michelin, and would like to 
subscribe to the Developer mailing list to be able to open a KIP and 
contribute to Apache Kafka.


Loïc GREFFIER

GROUPE MICHELIN – Development Technology Specialist « DOTI/BS/SMI »

8, Rue de la Grolière | R1-2 | 63100 Clermont-Ferrand Cedex 09 | France

loic.greff...@michelin.com



Re: Permissions to contribute to Apache Kafka project

2024-02-29 Thread Bruno Cadonna

Hi Paul,

I gave you the permission in JIRA.

Currently, all Apache projects have this issue with the wiki account 
creation. There is an ongoing INFRA ticket about it: 
https://issues.apache.org/jira/browse/INFRA-25451


Sorry about that!

In any case, thank you for your interest in Apache Kafka!

Best,
Bruno

On 2/29/24 10:52 AM, Paul Amar wrote:

Hi there,

My Jira ID is  "paul_amar" and I'd like to ask permissions to contribute to 
Apache Kafka Project.
Meanwhile, I haven't been able to create a wiki ID as it does not allow me to 
create an account.

Greetings and let's collaborate soon.

Paul


PAUL AMAR
Groupe Michelin - Information Technology
Delivery Manager Orchestration - CBS/CORP/SI/BS/SC

Clermont-Ferrand / Les Carmes / A17 4ème étage
paul.a...@michelin.com
Classification: D3
Retention: 90 days


This e-mail may contain confidential information and is intended solely for the 
use of the addressee. Any disclosure of this information is strictly prohibited 
and may be unlawful. If you have received this e-mail by mistake, please notify 
us immediately and delete this e-mail. Thank you in advance.




Re: [ANNOUNCE] Apache Kafka 3.7.0

2024-02-28 Thread Bruno Cadonna

Thanks Stan and all contributors for the release!

Best,
Bruno

On 2/27/24 7:01 PM, Stanislav Kozlovski wrote:

The Apache Kafka community is pleased to announce the release of
Apache Kafka 3.7.0

This is a minor release that includes new features, fixes, and
improvements from 296 JIRAs

An overview of the release and its notable changes can be found in the
release blog post:
https://kafka.apache.org/blog#apache_kafka_370_release_announcement

All of the changes in this release can be found in the release notes:
https://www.apache.org/dist/kafka/3.7.0/RELEASE_NOTES.html

You can download the source and binary release (Scala 2.12, 2.13) from:
https://kafka.apache.org/downloads#3.7.0

---


Apache Kafka is a distributed streaming platform with four core APIs:


** The Producer API allows an application to publish a stream of records to
one or more Kafka topics.

** The Consumer API allows an application to subscribe to one or more
topics and process the stream of records produced to them.

** The Streams API allows an application to act as a stream processor,
consuming an input stream from one or more topics and producing an
output stream to one or more output topics, effectively transforming the
input streams to output streams.

** The Connector API allows building and running reusable producers or
consumers that connect Kafka topics to existing applications or data
systems. For example, a connector to a relational database might
capture every change to a table.


With these APIs, Kafka can be used for two broad classes of application:

** Building real-time streaming data pipelines that reliably get data
between systems or applications.

** Building real-time streaming applications that transform or react
to the streams of data.


Apache Kafka is in use at large and small companies worldwide, including
Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, Rabobank,
Target, The New York Times, Uber, Yelp, and Zalando, among others.

A big thank you to the following 146 contributors to this release!
(Please report an unintended omission)

Abhijeet Kumar, Akhilesh Chaganti, Alieh, Alieh Saeedi, Almog Gavra,
Alok Thatikunta, Alyssa Huang, Aman Singh, Andras Katona, Andrew
Schofield, Anna Sophie Blee-Goldman, Anton Agestam, Apoorv Mittal,
Arnout Engelen, Arpit Goyal, Artem Livshits, Ashwin Pankaj,
ashwinpankaj, atu-sharm, bachmanity1, Bob Barrett, Bruno Cadonna,
Calvin Liu, Cerchie, chern, Chris Egerton, Christo Lolov, Colin
Patrick McCabe, Colt McNealy, Crispin Bernier, David Arthur, David
Jacot, David Mao, Deqi Hu, Dimitar Dimitrov, Divij Vaidya, Dongnuo
Lyu, Eaugene Thomas, Eduwer Camacaro, Eike Thaden, Federico Valeri,
Florin Akermann, Gantigmaa Selenge, Gaurav Narula, gongzhongqiang,
Greg Harris, Guozhang Wang, Gyeongwon, Do, Hailey Ni, Hanyu Zheng, Hao
Li, Hector Geraldino, hudeqi, Ian McDonald, Iblis Lin, Igor Soarez,
iit2009060, Ismael Juma, Jakub Scholz, James Cheng, Jason Gustafson,
Jay Wang, Jeff Kim, Jim Galasyn, John Roesler, Jorge Esteban Quilcate
Otoya, Josep Prat, José Armando García Sancio, Jotaniya Jeel, Jouni
Tenhunen, Jun Rao, Justine Olshan, Kamal Chandraprakash, Kirk True,
kpatelatwork, kumarpritam863, Laglangyue, Levani Kokhreidze, Lianet
Magrans, Liu Zeyu, Lucas Brutschy, Lucia Cerchie, Luke Chen, maniekes,
Manikumar Reddy, mannoopj, Maros Orsak, Matthew de Detrich, Matthias
J. Sax, Max Riedel, Mayank Shekhar Narula, Mehari Beyene, Michael
Westerby, Mickael Maison, Nick Telford, Nikhil Ramakrishnan, Nikolay,
Okada Haruki, olalamichelle, Omnia G.H Ibrahim, Owen Leung, Paolo
Patierno, Philip Nee, Phuc-Hong-Tran, Proven Provenzano, Purshotam
Chauhan, Qichao Chu, Matthias J. Sax, Rajini Sivaram, Renaldo Baur
Filho, Ritika Reddy, Robert Wagner, Rohan, Ron Dagostino, Roon, runom,
Ruslan Krivoshein, rykovsi, Sagar Rao, Said Boudjelda, Satish Duggana,
shuoer86, Stanislav Kozlovski, Taher Ghaleb, Tang Yunzi, TapDang,
Taras Ledkov, tkuramoto33, Tyler Bertrand, vamossagar12, Vedarth
Sharma, Viktor Somogyi-Vass, Vincent Jiang, Walker Carlson,
Wuzhengyu97, Xavier Léauté, Xiaobing Fang, yangy, Ritika Reddy,
Yanming Zhou, Yash Mayya, yuyli, zhaohaidao, Zihao Lin, Ziming Deng

We welcome your help and feedback. For more information on how to
report problems, and to get involved, visit the project website at
https://kafka.apache.org/

Thank you!


Regards,

Stanislav Kozlovski
Release Manager for Apache Kafka 3.7.0


[jira] [Resolved] (KAFKA-9062) Handle stalled writes to RocksDB

2024-02-27 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-9062.
--
Resolution: Won't Fix

> Handle stalled writes to RocksDB
> 
>
> Key: KAFKA-9062
> URL: https://issues.apache.org/jira/browse/KAFKA-9062
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: new-streams-runtime-should-fix
>
> RocksDB may stall writes at times when background compactions or flushes are 
> having trouble keeping up. This means we can effectively end up blocking 
> indefinitely during a StateStore#put call within Streams, and may get kicked 
> from the group if the throttling does not ease up within the max poll 
> interval.
> Example: when restoring large amounts of state from scratch, we use the 
> strategy recommended by RocksDB of turning off automatic compactions and 
> dumping everything into L0. We do batch somewhat, but do not sort these small 
> batches before loading into the db, so we end up with a large number of 
> unsorted L0 files.
> When restoration is complete and we toggle the db back to normal (not bulk 
> loading) settings, a background compaction is triggered to merge all these 
> into the next level. This background compaction can take a long time to merge 
> unsorted keys, especially when the amount of data is quite large.
> Any new writes while the number of L0 files exceeds the max will be stalled 
> until the compaction can finish, and processing after restoring from scratch 
> can block beyond the polling interval



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
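
For illustration, the bulk-loading toggle described in KAFKA-9062 above could
look roughly like the following standalone RocksDB snippet (not Streams'
actual restoration code; the path and the trigger value are arbitrary
assumptions):

    // Illustrative only: disable automatic compactions while bulk loading,
    // then compact explicitly before switching back to normal writes.
    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    public class BulkLoadSketch {
        public static void main(final String[] args) throws RocksDBException {
            RocksDB.loadLibrary();
            try (final Options options = new Options()
                     .setCreateIfMissing(true)
                     .setDisableAutoCompactions(true)            // restore phase: dump into L0
                     .setLevel0FileNumCompactionTrigger(1 << 30); // effectively no L0 trigger
                 final RocksDB db = RocksDB.open(options, "/tmp/bulk-load-sketch")) {
                // ... write the restored records in batches ...
                db.compactRange();  // merge the unsorted L0 files before normal processing
            }
        }
    }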


Re: [DISCUSS] Apache Kafka 3.8.0 release

2024-02-27 Thread Bruno Cadonna

Thanks Josep!

+1

Best,
Bruno

On 2/26/24 9:53 PM, Chris Egerton wrote:

Thanks Josep, I'm +1 as well.

On Mon, Feb 26, 2024 at 12:32 PM Justine Olshan
 wrote:


Thanks Joesp. +1 from me.

On Mon, Feb 26, 2024 at 3:37 AM Josep Prat 
wrote:


Hi all,

I'd like to volunteer as release manager for the Apache Kafka 3.8.0
release.
If there are no objections, I'll start building a release plan (or

adapting

the one Colin made some weeks ago) in the wiki in the next days.

Thank you.

--
*Josep Prat*
Open Source Engineering Director, *Aiven*
josep.p...@aiven.io   |   +491715557497
aiven.io   |   https://www.facebook.com/aivencloud   |   https://twitter.com/aiven_io
*Aiven Deutschland GmbH*
Alexanderufer 3-7, 10117 Berlin
Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
Amtsgericht Charlottenburg, HRB 209739 B







[jira] [Resolved] (KAFKA-16194) KafkaConsumer.groupMetadata() should be correct when first records are returned

2024-02-21 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-16194.
---
Resolution: Fixed

> KafkaConsumer.groupMetadata() should be correct when first records are 
> returned
> ---
>
> Key: KAFKA-16194
> URL: https://issues.apache.org/jira/browse/KAFKA-16194
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: David Jacot
>    Assignee: Bruno Cadonna
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> The following code returns records before the group metadata is updated. This 
> fails the first transactions ever run by the Producer/Consumer.
>  
> {code:java}
> Producer txnProducer = new KafkaProducer<>(txnProducerProps);
> Consumer consumer = new KafkaConsumer<>(consumerProps);
> txnProducer.initTransactions();
> System.out.println("Init transactions called");
> try {
> txnProducer.beginTransaction();
> System.out.println("Begin transactions called");
> consumer.subscribe(Collections.singletonList("input"));
> System.out.println("Consumer subscribed to topic -> KIP848-topic-2 ");
> ConsumerRecords records = 
> consumer.poll(Duration.ofSeconds(10));
> System.out.println("Returned " + records.count() + " records.");
> // Process and send txn messages.
> for (ConsumerRecord processedRecord : records) {
> txnProducer.send(new ProducerRecord<>("output", 
> processedRecord.key(), "Processed: " + processedRecord.value()));
> }
> ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
> System.out.println("Group metadata inside test" + groupMetadata);
> Map offsetsToCommit = new HashMap<>();
> for (ConsumerRecord record : records) {
> offsetsToCommit.put(new TopicPartition(record.topic(), 
> record.partition()),
> new OffsetAndMetadata(record.offset() + 1));
> }
> System.out.println("Offsets to commit" + offsetsToCommit);
> // Send offsets to transaction with ConsumerGroupMetadata.
> txnProducer.sendOffsetsToTransaction(offsetsToCommit, groupMetadata);
> System.out.println("Send offsets to transaction done");
> // Commit the transaction.
> txnProducer.commitTransaction();
> System.out.println("Commit transaction done");
> } catch (ProducerFencedException | OutOfOrderSequenceException | 
> AuthorizationException e) {
> e.printStackTrace();
> txnProducer.close();
> } catch (KafkaException e) {
> e.printStackTrace();
> txnProducer.abortTransaction();
> } finally {
> txnProducer.close();
> consumer.close();
> } {code}
> The issue seems to be that while it waits in `poll`, the event to update the 
> group metadata is not processed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16285) Make group metadata available when a new assignment is set in async Kafka consumer

2024-02-20 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-16285:
-

 Summary: Make group metadata available when a new assignment is 
set in async Kafka consumer
 Key: KAFKA-16285
 URL: https://issues.apache.org/jira/browse/KAFKA-16285
 Project: Kafka
  Issue Type: Improvement
  Components: clients, consumer
Reporter: Bruno Cadonna


Currently, the new async Kafka consumer sends an event from the background 
thread to the application thread when the group metadata is updated. Group 
metadata is updated when the background thread receives a new assignment. More 
specifically, the member epoch is updated each time a new assignment is 
received and the member ID is updated with the first assignment. 
In contrast to the group metadata update, the assignment is directly set in the 
subscription without sending an update event from the background thread to the 
application thread. That means that there is a delay between the application 
thread being aware of the update to the assignment and the application thread 
being aware of the update to the group metadata. This behavior differs from 
the legacy consumer, where the assignment and the group metadata are 
updated at the same time.
We should make the update to the group metadata available to the application 
thread when the update to the assignment is made available to the application 
thread so that assignment and group metadata are in sync. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: Improve flaky test reporting (KAFKA-12216)

2024-02-12 Thread Bruno Cadonna

Hi David,

I guess you meant to say

"This does not mean that we should NOT continue our effort to reduce the 
number of flaky tests."


I totally agree with what you wrote. I am also +1 on considering all 
failures for unit tests.


Best,
Bruno

On 2/12/24 9:11 AM, David Jacot wrote:

Hi folks,

I have been playing with `reports.junitXml.mergeReruns` setting in gradle
[1]. From the gradle doc:


When mergeReruns is enabled, if a test fails but is then retried and
succeeds, its failures will be recorded as <flakyFailure> instead of
<failure>, within one <testcase>. This is effectively the reporting
produced by the surefire plugin of Apache Maven™ when enabling reruns. If
your CI server understands this format, it will indicate that the test was
flaky. If it does not, it will indicate that the test succeeded as it will
ignore the <flakyFailure> information. If the test does not succeed (i.e.
it fails for every retry), it will be indicated as having failed whether
your tool understands this format or not.

With this, we get really close to having green builds [2] all the time.
There are only a few tests which are too flaky. We should address or
disable those.

I think that this would help us a lot because it would reduce the noise
that we get in pull requests. At the moment, there are just too many failed
tests reported so it is really hard to know whether a pull request is
actually fine or not.

[1] applies it to both unit and integration tests. Following the discussion
in the `github build queue` thread, it may be better to only apply it to
the integration tests. Being stricter with unit tests would make sense.

This does not mean that we should continue our effort to reduce the number
of flaky tests. For this, I propose to keep using Gradle Entreprise. It
provides a nice report for them that we can leverage.

Thoughts?

Best,
David

[1] https://github.com/apache/kafka/pull/14862
[2]
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14862/19/tests



[jira] [Resolved] (KAFKA-16098) State updater may attempt to resume a task that is not assigned anymore

2024-01-10 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-16098.
---
Resolution: Fixed

> State updater may attempt to resume a task that is not assigned anymore
> ---
>
> Key: KAFKA-16098
> URL: https://issues.apache.org/jira/browse/KAFKA-16098
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Lucas Brutschy
>    Assignee: Bruno Cadonna
>Priority: Major
> Attachments: streams.log.gz
>
>
> A long-running soak test brought to light this `IllegalStateException`:
> {code:java}
> [2024-01-07 08:54:13,688] ERROR [i-0637ca8609f50425f-StreamThread-1] Thread 
> encountered an error processing soak test 
> (org.apache.kafka.streams.StreamsSoakTest)
> java.lang.IllegalStateException: No current assignment for partition 
> network-id-repartition-1
>     at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:367)
>     at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.resume(SubscriptionState.java:753)
>     at 
> org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.resume(LegacyKafkaConsumer.java:963)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.resume(KafkaConsumer.java:1524)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.transitRestoredTaskToRunning(TaskManager.java:857)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleRestoredTasksFromStateUpdater(TaskManager.java:979)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:791)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1141)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:949)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645)
> [2024-01-07 08:54:13,688] ERROR [i-0637ca8609f50425f-StreamThread-1] 
> stream-client [i-0637ca8609f50425f] Encountered the following exception 
> during processing and sent shutdown request for the entire application. 
> (org.apache.kafka.streams.KafkaStreams)
> org.apache.kafka.streams.errors.StreamsException: 
> java.lang.IllegalStateException: No current assignment for partition 
> network-id-repartition-1
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:729)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645)
> Caused by: java.lang.IllegalStateException: No current assignment for 
> partition network-id-repartition-1
>     at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:367)
>     at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.resume(SubscriptionState.java:753)
>     at 
> org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer.resume(LegacyKafkaConsumer.java:963)
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.resume(KafkaConsumer.java:1524)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.transitRestoredTaskToRunning(TaskManager.java:857)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleRestoredTasksFromStateUpdater(TaskManager.java:979)
>     at 
> org.apache.kafka.streams.processor.internals.TaskManager.checkStateUpdater(TaskManager.java:791)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.checkStateUpdater(StreamThread.java:1141)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:949)
>     at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686)
>     ... 1 more {code}
> Log (with some common messages filtered) attached.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [ANNOUNCE] New Kafka PMC Member: Divij Vaidya

2023-12-28 Thread Bruno Cadonna

Congratulations Divij! Well deserved!

Best,
Bruno

On 12/27/23 12:45 PM, Luke Chen wrote:

Hi, Everyone,

Divij has been a Kafka committer since June, 2023. He has remained very
active and instructive in the community since becoming a committer. It's my
pleasure to announce that Divij is now a member of Kafka PMC.

Congratulations Divij!

Luke
on behalf of Apache Kafka PMC



[jira] [Resolved] (KAFKA-15991) Flaky new consumer test testGroupIdNotNullAndValid

2023-12-20 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-15991.
---
Resolution: Fixed

> Flaky new consumer test testGroupIdNotNullAndValid
> --
>
> Key: KAFKA-15991
> URL: https://issues.apache.org/jira/browse/KAFKA-15991
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lianet Magrans
>    Assignee: Bruno Cadonna
>Priority: Major
>  Labels: consumer-threading-refactor, flaky-test, kip-848, 
> kip-848-client-support
> Fix For: 3.7.0
>
>
> Fails locally when running it in a loop with its latest changes from 
> [https://github.com/apache/kafka/commit/6df192b6cb1397a6e6173835bbbd8a3acb7e3988.]
>  Failed the build so temporarily disabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16017) Checkpointed offset is incorrect when task is revived and restoring

2023-12-15 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-16017:
-

 Summary: Checkpointed offset is incorrect when task is revived and 
restoring 
 Key: KAFKA-16017
 URL: https://issues.apache.org/jira/browse/KAFKA-16017
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.3.1
Reporter: Bruno Cadonna






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-9545) Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`

2023-12-13 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reopened KAFKA-9545:
--

> Flaky Test `RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted`
> --
>
> Key: KAFKA-9545
> URL: https://issues.apache.org/jira/browse/KAFKA-9545
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jason Gustafson
>Assignee: Ashwin Pankaj
>Priority: Major
>
> https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/4678/testReport/org.apache.kafka.streams.integration/RegexSourceIntegrationTest/testRegexMatchesTopicsAWhenDeleted/
> {code}
> java.lang.AssertionError: Condition not met within timeout 15000. Stream 
> tasks not updated
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:367)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:415)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:383)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:366)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:337)
>   at 
> org.apache.kafka.streams.integration.RegexSourceIntegrationTest.testRegexMatchesTopicsAWhenDeleted(RegexSourceIntegrationTest.java:224)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-969: Support range interactive queries for versioned state stores

2023-12-12 Thread Bruno Cadonna

Hi Alieh,

I just realized that your KIP uses allKeys() and KIP-997 uses 
withAllKeys() and the current RangeQuery uses withNoBounds(). We should 
agree on one of those.


I am in favor of withAllKeys() but I am also fine with 
withNoKeyBounds(). I just prefer the former because it is more concise.


Best,
Bruno

On 12/12/23 9:08 AM, Bruno Cadonna wrote:

Hi Alieh,

I think using TimestampedRangeQuery to query the latest versions is 
totally fine. If it is not, users will report it and we can add it then.


Best,
Bruno

On 12/11/23 6:22 PM, Alieh Saeedi wrote:

Thank you all.
I decided to remove the ordering from the KIP and maybe move it to the
subsequent KIPs (based on user demand).
I skimmed over the discussion thread, but we still had an open question
about how a user can retrieve the `latest()` values. I think what Matthias
suggested (using `TimestampedRangeQuery`) can be the solution. What do you
think?

Bests,
Alieh

On Wed, Dec 6, 2023 at 1:57 PM Lucas Brutschy
 wrote:


Hi Alieh,

I think we do not have to restrict ourselves too much for the future
and complicate the implementation. The user can always store away and
sort, so we should only provide the ordering guarantee we can provide
efficiently, and we shouldn't restrict our future evolution too much
by this. I think a global ordering by timestamp is sufficient for this
KIP, so I vote for option 2.

Cheers,
Lucas

On Fri, Dec 1, 2023 at 8:45 PM Alieh Saeedi
 wrote:


Hi all,
I updated the KIP based on the changes made in the former KIP (KIP-968). So
with the `ResultOrder` enum, the class `MultiVersionedRangeQuery` had some
changes both in the defined fields and defined methods.

Based on the PoC PR, what we currently promise in the KIP about ordering
seems like a dream. I intended to enable the user to have a global ordering
based on the key or timestamp (using `orderByKey()` or
`orderByTimestamp()`) and then even have a partial ordering based on the
other parameter. For example, if the user specifies
`orderByKey().withDescendingKey().withAscendingTimestamps()`, then the
global ordering is based on keys in a descending order, and then all the
records with the same key are ordered ascendingly based on ts. The result
will be something like (k3,v1,t1), (k3,v2,t2), (k2,v2,t3), (k1,v1,t1)
(assuming that k1 < k2 < k3). But in reality, we have limitations for having
a global ordering based on keys since we are iterating over the segments in
a lazy manner. Therefore, when we are processing the current segment, we
have no knowledge of the keys in the next segment.

Now I have two suggestions:
1. Changing the `MultiVersionedRangeQuery` class as follows:

private final ResultOrder *segmentOrder*;
private final contentOrder *segmentContentOrder*; // can be KEY_WISE or TIMESTAMP_WISE
private final ResultOrder *keyOrder*;
private final ResultOrder *timestampOrder*;

This way, the global ordering is specified by the `segmentOrder`. It means
we either show the results from the oldest to the latest segment
(ASCENDING) or from the latest to the oldest segment (DESCENDING).
Then, inside each segment, we guarantee a `segmentContentOrder` which can
be `KEY_WISE` or `TIMESTAMP_WISE`. The key order and timestamp order are
specified by the `keyOrder` and `timestampOrder` properties, respectively.
If the content of a segment must be ordered key-wise and then we have two
records with the same key (it happens in older segments), then the
`timestampOrder` determines the order between them.

2. We define that global ordering can only be based on timestamps (the
`timestampOrder` property), and if two records have the same timestamp, the
`keyOrder` determines the order between them.

I think the first suggestion gives more flexibility to the user, but it is
more complicated. I mean, it needs good Javadocs.

I look forward to your ideas.

Cheers,
Alieh
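
For readability, suggestion 1 could also be written out as a compilable
skeleton like the one below (the `ContentOrder` enum is hypothetical and only
mirrors the KEY_WISE/TIMESTAMP_WISE idea from the e-mail above; `ResultOrder`
is the existing enum from KIP-968):

    // Illustrative skeleton of suggestion 1 -- not the KIP's actual class.
    import org.apache.kafka.streams.query.ResultOrder;

    public final class MultiVersionedRangeQuerySketch {
        public enum ContentOrder { KEY_WISE, TIMESTAMP_WISE }   // hypothetical

        private final ResultOrder segmentOrder;          // global order over segments
        private final ContentOrder segmentContentOrder;  // order inside each segment
        private final ResultOrder keyOrder;              // key order within a segment
        private final ResultOrder timestampOrder;        // tie-break between same-key records

        private MultiVersionedRangeQuerySketch(final ResultOrder segmentOrder,
                                               final ContentOrder segmentContentOrder,
                                               final ResultOrder keyOrder,
                                               final ResultOrder timestampOrder) {
            this.segmentOrder = segmentOrder;
            this.segmentContentOrder = segmentContentOrder;
            this.keyOrder = keyOrder;
            this.timestampOrder = timestampOrder;
        }
    }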


On Mon, Nov 6, 2023 at 3:08 PM Alieh Saeedi  wrote:



Thank you, Bruno and Matthias.
I updated the KIP as follows:
1. The one remaining `asOf` word in the KIP is removed.
2. Example 2 is updated. Thanks, Bruno for the correction.

Discussions and open questions
1. Yes, Bruno. We need `orderByKey()` and `orderByTimestamp()` as well.
Because the results must have a global ordering. Either based on key or
based on ts. For example, we can have
`orderByKey().withDescendingKey().withAscendingTimestamps()`. Then the
global ordering is based on keys in a descending order, and then all the
records with the same key are ordered ascendingly based on ts. The result
will be something like (k3,v1,t1), (k3,v2,t2), (k3,v1,t1), (k2,v2,t2),
(k1,v1,t1) (assuming that k1 < k2 < k3).
yet.

Adding a new class or ignoring `latest()` for VersionedRangeQuery and
instead using the `TimestampedRangeQuery` as Matthias suggested.

Cheers,
Alieh

On Sat, Nov 4, 2023 at 1:38 AM Matthias J. Sax  wrote:



Great discussion. Seems we are making good progress.

I see advantages and disadvantages in splitting out a "single-ts
key-

Re: [DISCUSS] KIP-969: Support range interactive queries for versioned state stores

2023-12-12 Thread Bruno Cadonna
 how common is a key-range in a point in the
past? For this case, using
`MultiVersionedRangeQuery.withKeyRange().from(myTs).to(myTs)` seems
actually not to be a bad UX, and also does not really need to be
explained how to do this (compared to "latest" that required to pass in
MAX_INT).


If we add a new query type, we avoid both issues (are they actually
issues?) and add some nice syntactic sugar to the API. The question is,
if it's worth the effort and expanded API surface area?

To summarize:

Add new query type:


// queries latest; returns VersionedRecords
VersionedRangeQuery.withKeyRange(...);

VersionedRangeQuery.withKeyRange(...).asOf(ts);


vs

No new query type:


// queries latest; returns ValueAndTimestamps
TimestampedRangeQuery.withRange(...);

MultiVersionedRangeQuery.withKeyRange(...).from(myTs).to(myTs)




I guess, bottom line, I would be ok with either one and I am actually
not even sure which one I prefer personally. Just wanted to lay out the
tradeoffs I see. Not sure if there are other considerations that would
tip the scale into either direction?



-Matthias

On 11/3/23 3:43 AM, Bruno Cadonna wrote:

Hi Alieh,

I like the examples!


1.
Some terms like `asOf` in the descriptions still need to be replaced in
the KIP.

2.
In your last e-mail you state:

"How can a user retrieve the latest value? We have the same issue with
kip-968 as well."

Why do we have the same issue in KIP-968?
If users need to retrieve the latest value for a specific key, they
should use KIP-960.

3.
Regarding querying the latest version (or an asOf version) of records in
a given key range, that is exactly why I proposed to split the query
class. One class would return the latest and the asOf versions (i.e. a
single version) of records in a key range and the other class would
return all versions in a given time range (i.e. multiple versions) of
the records in a given key range. The splitting in two classes avoids
having to specify a time range and latest or a time range and asOf on a
given key range.

Alternatively, you could keep one class and you could specify that the
last call wins as you specified for fromTime() and toTime(). For
example, for

MultiVersionedRangeQuery.withLowerKeyBound(k1).fromTime(t1).toTime(t2).latest()

latest() wins. However, how would you interpret

MultiVersionedRangeQuery.withLowerKeyBound(k1).fromTime(t1).latest().toTime(t2)

Is it [t1, t2] or [-INF, t2]?
(I would say the latter, but somebody else would say differently)

The two class solution seems cleaner to me since we do not need to
consider those edge cases.
You could propose both classes in this KIP.

4.
Why do we need orderByKey() and orderByTimestamp()?
Aren't withAscendingKeys(), withDescendingKeys(),
withAscendingTimestamps(), and withDescendingTimestamps() sufficient?



5.
In example 2, why is

key,value: 2,20, timestamp: 2023-01-10T10:00:00.00Z, valid till:
2023-01-25T10:00:00.00Z

not part of the result?
It is valid from 2023-01-10T10:00:00.00Z to 2023-01-25T10:00:00.00Z
which overlaps with the time range [2023-01-17T10:00:00.00Z,
2023-01-30T10:00:00.00Z] of the query.

(BTW, in the second example, you forgot to add "key" to the output.)


Best,
Bruno


On 10/25/23 4:01 PM, Alieh Saeedi wrote:

Thanks, Matthias and Bruno.
Here is a list of updates:

 1. I changed the variable and method names as I did for KIP-968, as
 follows:
- "fromTimestamp" -> "fromTime"
- "asOfTimestamp" -> "toTime"
- "from(instant)" -> "fromTime(instant)"
- "asOf(instant)" -> "toTime(instant)"
 2. As Bruno suggested for KIP-968, I added `orderByKey()`,
 `withAscendingKeys()`, and `withAscendingTimestamps()` methods for user
 readability.
 3. I updated the "Example" section as well.

Some points:

 1. Even though the kip suggests adding the `get(k lowerkeybound, k
 upperkeybound, long fromtime, long totime)` method to the interface, I
 added this method to the `rocksdbversionedstore` class for now.
 2. Matthias, you mentioned a very important point. How can a user
 retrieve the latest value? We have the same issue with kip-968 as well.
 Asking a user to call `toTime(max)` violates the API design rules, as you
 mentioned. So I think we must have a `latest()` method for both KIP-968
 and KIP-969. What do you think about that?


Cheers,
Alieh
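
As a purely illustrative sketch of the method mentioned in point 1 above
(the exact parameter and return types here are assumptions):

    // Hypothetical signature sketch -- not the KIP's final interface method.
    import java.util.List;
    import org.apache.kafka.streams.state.VersionedRecord;

    public interface VersionedRangeReadSketch<K, V> {
        // For every key in [lowerKeyBound, upperKeyBound], return the record
        // versions whose validity interval overlaps [fromTime, toTime].
        List<VersionedRecord<V>> get(K lowerKeyBound, K upperKeyBound,
                                     long fromTime, long toTime);
    }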

On Thu, Oct 12, 2023 at 6:33 AM Matthias J. Sax  wrote:



Thanks for the update.



To retrieve the latest value(s), the user must call just the asOf method
with the MAX value (asOf(MAX)). The same applies to KIP-968. Do you think
it is clumsy, Matthias?

Well, in KIP-968 calling `asOf` and passing in a timestamp is optional,
and default is "latest", right? So while `asOf(MAX)` does the same
thing, practically users would never call `asOf` for a "latest&

Re: [ANNOUNCE] Apache Kafka 3.5.2

2023-12-11 Thread Bruno Cadonna

Congrats, Luke and thanks for running the release!

Best,
Bruno

On 12/11/23 12:34 PM, Luke Chen wrote:

The Apache Kafka community is pleased to announce the release for
Apache Kafka 3.5.2

This is a bugfix release. It contains many bug fixes including
upgrades the Snappy and Rocksdb dependencies.

All of the changes in this release can be found in the release notes:
https://www.apache.org/dist/kafka/3.5.2/RELEASE_NOTES.html


You can download the source and binary release from:
https://kafka.apache.org/downloads#3.5.2

---


Apache Kafka is a distributed streaming platform with four core APIs:


** The Producer API allows an application to publish a stream of records to
one or more Kafka topics.

** The Consumer API allows an application to subscribe to one or more
topics and process the stream of records produced to them.

** The Streams API allows an application to act as a stream processor,
consuming an input stream from one or more topics and producing an
output stream to one or more output topics, effectively transforming the
input streams to output streams.

** The Connector API allows building and running reusable producers or
consumers that connect Kafka topics to existing applications or data
systems. For example, a connector to a relational database might
capture every change to a table.


With these APIs, Kafka can be used for two broad classes of application:

** Building real-time streaming data pipelines that reliably get data
between systems or applications.

** Building real-time streaming applications that transform or react
to the streams of data.


Apache Kafka is in use at large and small companies worldwide, including
Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, Rabobank,
Target, The New York Times, Uber, Yelp, and Zalando, among others.

A big thank you for the following contributors to this release!

A. Sophie Blee-Goldman, atu-sharm, bachmanity1, Calvin Liu, Chase
Thomas, Chris Egerton, Colin Patrick McCabe, David Arthur, Divij
Vaidya, Federico Valeri, flashmouse, Florin Akermann, Greg Harris,
hudeqi, José Armando García Sancio, Levani Kokhreidze, Lucas Brutschy,
Luke Chen, Manikumar Reddy, Matthias J. Sax, Mickael Maison, Nick
Telford, Okada Haruki, Omnia G.H Ibrahim, Robert Wagner, Rohan, Said
Boudjelda, sciclon2, Vincent Jiang, Xiaobing Fang, Yash Mayya

We welcome your help and feedback. For more information on how to
report problems, and to get involved, visit the project website at
https://kafka.apache.org/

Thank you!

Regards,
Luke


Re: [ANNOUNCE] Apache Kafka 3.6.1

2023-12-10 Thread Bruno Cadonna

Thanks for managing the release, Mickael!

Best,
Bruno

On 12/8/23 12:44 PM, Luke Chen wrote:

Hi Mickael,

Thanks for running this release!

Luke

On Thu, Dec 7, 2023 at 7:13 PM Mickael Maison  wrote:


The Apache Kafka community is pleased to announce the release for
Apache Kafka 3.6.1

This is a bug fix release and it includes fixes and improvements from 30
JIRAs.

All of the changes in this release can be found in the release notes:
https://www.apache.org/dist/kafka/3.6.1/RELEASE_NOTES.html

You can download the source and binary release (Scala 2.12 and Scala 2.13)
from:
https://kafka.apache.org/downloads#3.6.1


---

Apache Kafka is a distributed streaming platform with four core APIs:

** The Producer API allows an application to publish a stream of records to
one or more Kafka topics.

** The Consumer API allows an application to subscribe to one or more
topics and process the stream of records produced to them.

** The Streams API allows an application to act as a stream processor,
consuming an input stream from one or more topics and producing an
output stream to one or more output topics, effectively transforming the
input streams to output streams.

** The Connector API allows building and running reusable producers or
consumers that connect Kafka topics to existing applications or data
systems. For example, a connector to a relational database might
capture every change to a table.


With these APIs, Kafka can be used for two broad classes of application:

** Building real-time streaming data pipelines that reliably get data
between systems or applications.

** Building real-time streaming applications that transform or react
to the streams of data.


Apache Kafka is in use at large and small companies worldwide, including
Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, Rabobank,
Target, The New York Times, Uber, Yelp, and Zalando, among others.

A big thank you for the following 39 contributors to this release!
(Please report an unintended omission)

Anna Sophie Blee-Goldman, Arpit Goyal, atu-sharm, Bill Bejeck, Chris
Egerton, Colin P. McCabe, David Arthur, David Jacot, Divij Vaidya,
Federico Valeri, Greg Harris, Guozhang Wang, Hao Li, hudeqi,
iit2009060, Ismael Juma, Jorge Esteban Quilcate Otoya, Josep Prat,
Jotaniya Jeel, Justine Olshan, Kamal Chandraprakash, kumarpritam863,
Levani Kokhreidze, Lucas Brutschy, Luke Chen, Manikumar Reddy,
Matthias J. Sax, Mayank Shekhar Narula, Mickael Maison, Nick Telford,
Philip Nee, Qichao Chu, Rajini Sivaram, Robert Wagner, Sagar Rao,
Satish Duggana, Walker Carlson, Xiaobing Fang, Yash Mayya

We welcome your help and feedback. For more information on how to
report problems, and to get involved, visit the project website at
https://kafka.apache.org/

Thank you!

Regards,
Mickael





[jira] [Resolved] (KAFKA-14438) Throw error when consumer configured with empty/whitespace-only group.id for AsyncKafkaConsumer

2023-12-04 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-14438.
---
Resolution: Fixed

> Throw error when consumer configured with empty/whitespace-only group.id for 
> AsyncKafkaConsumer
> ---
>
> Key: KAFKA-14438
> URL: https://issues.apache.org/jira/browse/KAFKA-14438
> Project: Kafka
>  Issue Type: Task
>  Components: consumer
>Reporter: Philip Nee
>    Assignee: Bruno Cadonna
>Priority: Blocker
>  Labels: kip-848-client-support, kip-848-e2e, kip-848-preview
> Fix For: 3.7.0
>
>
> Currently, a warning message is logged upon using an empty consumer groupId. 
> In the next major release, we should drop the support of empty ("") consumer 
> groupId.
> cc [~hachikuji]
> See 
> [KIP-289|https://cwiki.apache.org/confluence/display/KAFKA/KIP-289%3A+Improve+the+default+group+id+behavior+in+KafkaConsumer]
>  for more detail.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-997 Support fetch(fromKey, toKey, from, to) to WindowRangeQuery and unify WindowKeyQuery and WindowRangeQuery

2023-11-29 Thread Bruno Cadonna

Hi,

Thanks for the updates!


1.
Could you please link the correct ticket in the KIP?

2.
Could you please adapt the motivation section and the title to the 
updated goal of the KIP? There is no fetch() or fetchAll() method in the 
query class.


3.
Could you please add the "// newly added" comment to all parts that were 
newly added? That is methods lowerKeyBound() and upperKeyBound().


4.
We should use a more fluent API as I proposed in my last e-mail:

Here again

WindowRangeQuery.withAllKeys().fromTime(time1).toTime(time2);
WindowRangeQuery.withKey(key1).fromTime(time1).toTime(time2);
WindowRangeQuery.withKeyRange(key1, key2).fromTime(time1).toTime(time2);
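
To make the intended usage a bit more concrete, here is a rough sketch of
how such a query could be issued through the existing IQv2 entry points
(KafkaStreams#query() with StateQueryRequest). The fluent methods are the
ones proposed above, i.e., not existing API yet, and the store name is made
up for the example:

// Sketch only: withKeyRange()/fromTime()/toTime() are the proposed methods,
// "windowed-counts" is a made-up store name; imports from
// org.apache.kafka.streams.query are assumed.
final WindowRangeQuery<String, Long> query =
    WindowRangeQuery.withKeyRange("a", "f")
                    .fromTime(Instant.parse("2023-11-01T00:00:00Z"))
                    .toTime(Instant.parse("2023-11-02T00:00:00Z"));

final StateQueryRequest<KeyValueIterator<Windowed<String>, Long>> request =
    StateQueryRequest.inStore("windowed-counts").withQuery(query);

// kafkaStreams is a running KafkaStreams instance
final StateQueryResult<KeyValueIterator<Windowed<String>, Long>> result =
    kafkaStreams.query(request);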

5.
We should also consider the order of the results similar as we did in 
KIP-968. Alternatively, we do not guarantee any order and postpone the 
order guarantees to a future KIP.



Best,
Bruno



On 11/17/23 3:11 AM, Matthias J. Sax wrote:

Thanks for the KIP.

Given how `WindowRangeQuery` works right now, it's really time to 
improve it.



1) Agree. It's not clear what will be added right now. I think we should 
deprecate existing `getKey()` w/o an actual replacement? For 
`getFromKey` and `getToKey` it should actually be `lowerKeyBound()` and 
`upperKeyBound()` to align to KIP-969?


Also wondering if we should deprecate existing `withKey()` and 
`withWindowStartRange`? `withKey` only works for SessionStores and 
implements a single-key full-time-range query. Similarly, 
`withWindowStartRange` only works for WindowedStore and implements an 
all-key time-range query. Thus, both are rather special and it seems the 
aim of this KIP is to generalize `WindowRangeQuery` to arbitrary 
key-range/time-range queries?


That raises one question about time-range semantics, given that we query 
windows with different semantics.


  - The current `WindowStore` semantics used for 
`WindowRangeQuery#withWindowStartRange` is considering only the window 
start time, ie, the window-start time must fall into the query 
time-range to be returned.


  - In contrast, `SessionStore` time ranges based on `findSession` use 
earliest-session-end-time and latest-session-end-time and thus implement 
an "window-bounds / search-time-range overlap query".


Is there any concern about semantic differences? It would also be 
possible to use the same semantics for both query types, and maybe even 
let the user pick which semantics they want (letting users choose might 
actually be the best thing to do)? -- We can also do this incrementally, 
and limit the scope of this KIP (or keep the full KIP scope but 
implement it incrementally only)


Btw: I think we should not add any ordering at this point, and 
explicitly state that no ordering is guaranteed whatsoever at this point.




2) Agreed. We should deprecate `getFromTime` and `getToTime` and add new 
method `fromTime` and `toTime`.




3) About the API. If we move forward with general key-range/time-range I 
agree that a more modular approach might be nice. Not sure right now, 
what the best approach would be for this? Looking into KIP-969, we might 
want to have:


  - static withKeyRange
  - static withLowerKeyBound
  - static withUpperKeyBound
  - static withAllKeys (KIP-969 actually uses `allKeys` ?)
  - fromTime
  - toTime

with default-time range would be "all / unbounded" ?



10: you mentioned that `WindowKeyQuery` functionality can be covered by 
`WindowRangeQuery`. I agree. For this case, it seems we want to 
deprecate `WindowKeyQuery` entirely?




-Matthias

On 11/16/23 1:19 AM, Bruno Cadonna wrote:

Hi Hanyu,

Thanks for the KIP!

1)
Could you please mark the pieces that you want to add to the API in 
the code listing in the KIP? You can add a comment like "// newly 
added" or similar. That would make reading the KIP a bit easier 
because one does not need to compare your code with the code in the 
current codebase.


2)
Could you -- as a side cleanup -- also change the getters to not use 
the get-prefix anymore, please? That was apparently an oversight when 
those methods were added. Although the API is marked as Evolving, I 
think we should still deprecate the getX() methods, since it does not 
cost us anything.


3)
I propose to make the API a bit more fluent. For example, something like

WindowRangeQuery.withKey(key).fromTime(t1).toTime(t2)

and

WindowRangeQuery.withAllKeys().fromTime(t1).toTime(t2)

and

WindowRangeQuery.withKeyRange(key1, key2).fromTime(t1).toTime(t2)

and maybe even in addition to the above add also the option to start 
with the time range


WindowRangeQuery.withWindowStartRange(t1, t2).fromKey(key1).toKey(key2)


4)
Could you also add some usage examples? Alieh did quite a nice job 
regarding usage examples in KIP-986.



Best,
Bruno

On 11/8/23 8:02 PM, Hanyu (Peter) Zheng wrote:

Hello everyone,

I would like to start the discussion for KIP-997: Support fetch(fromKey,
toKey, from, to) to WindowRangeQuery and unify WindowKeyQuery and
WindowRangeQuery

[jira] [Resolved] (KAFKA-15555) Ensure wakeups are handled correctly in PrototypeAsyncConsumer.poll()

2023-11-23 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-15555.
---
Resolution: Fixed

> Ensure wakeups are handled correctly in PrototypeAsyncConsumer.poll()
> -
>
> Key: KAFKA-15555
> URL: https://issues.apache.org/jira/browse/KAFKA-15555
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>    Assignee: Bruno Cadonna
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-preview
>
> The implementation of the {{poll()}} method in {{PrototypeAsyncConsumer}} 
> does not disable wakeups in the same manner that {{KafkaConsumer}} does. 
> Investigate how to make the new implementation consistent with the 
> functionality of the existing implementation.
> There was a comment in the code that I plan to remove, but I will leave it 
> here for reference:
> {quote}// TODO: Once we implement poll(), clear wakeupTrigger in a finally 
> block: wakeupTrigger.clearActiveTask();{quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-968: Support single-key_multi-timestamp interactive queries (IQv2) for versioned state stores

2023-11-20 Thread Bruno Cadonna

Hi Alieh,

Although, I've already voted, I found a minor miss. You should also add 
a method isDescending() since the results could also be unordered now 
that we agreed that the results are unordered by default. If both -- 
isDescending() and isAscending() -- are false, neither 
withDescendingTimestamps() nor withAscendingTimestamps() was called.
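
For what it's worth, a tiny sketch of the three states a caller would then
distinguish (method names as discussed in this thread, not final API):

if (query.isAscending()) {
    // withAscendingTimestamps() was called: oldest version first
} else if (query.isDescending()) {
    // withDescendingTimestamps() was called: newest version first
} else {
    // neither was called: no ordering guarantee, the store's order applies
}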


Best,
Bruno

On 11/17/23 11:25 AM, Alieh Saeedi wrote:

Hi all,
Thank you for the feedback.

So we agreed on no default ordering for keys and TSs. So I must provide
both withAscendingXx() and withDescendingXx() for the class.
Apart from that, I think we can either remove the existing constructor for
the `VersionedRecord` class or follow the `Optional` thing.

Since many hidden aspects of the KIP are quite clear now and we have come
to a consensus about them, I think it's time to vote ;-)
I look forward to your votes. Thanks a lot.

Cheers,
Alieh

On Fri, Nov 17, 2023 at 2:27 AM Matthias J. Sax  wrote:


Thanks, Alieh.

Overall SGTM. About `validTo` -- wondering if we should make it an
`Optional` and set to `empty()` by default?
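
Just to illustrate the idea, here is a sketch of the Optional variant, not
the actual class; the constructor shapes are made up for the example:

// java.util.Optional / java.util.Objects assumed; sketch only.
public final class VersionedRecord<V> {
    private final V value;
    private final long timestamp;           // validFrom
    private final Optional<Long> validTo;   // empty() = no validTo known / still the latest version

    public VersionedRecord(final V value, final long timestamp) {
        this(value, timestamp, Optional.empty());
    }

    public VersionedRecord(final V value, final long timestamp, final long validTo) {
        this(value, timestamp, Optional.of(validTo));
    }

    private VersionedRecord(final V value, final long timestamp, final Optional<Long> validTo) {
        this.value = Objects.requireNonNull(value);
        this.timestamp = timestamp;
        this.validTo = validTo;
    }

    public V value() { return value; }
    public long timestamp() { return timestamp; }
    public Optional<Long> validTo() { return validTo; }
}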

I am totally ok with going with the 3-way option about ordering using
default "undefined". For this KIP (as it's all net new) nothing really
changes. -- However, we should amend `RangeQuery`/KIP-985 to align it.

Btw: so far we focused on key-ordering, but I believe the same "ordering
undefined by default" would apply to time-ordering, too? This might
affect KIP-997, too.


-Matthias

On 11/16/23 12:51 AM, Bruno Cadonna wrote:

Hi,

80)
We do not keep backwards compatibility with IQv1, right? I would even
say that currently we do not need to keep backwards compatibility among
IQv2 versions since we marked the API "Evolving" (do we only mean code
compatibility here or also behavioral compatibility?). I propose to try
to not limit ourselves for backwards compatibility that we explicitly
marked as evolving.
I re-read the discussion on KIP-985. In that discussion, we were quite
focused on what the state store provides. I see that for range queries,
we have methods on the state store interface that specify the order, but
that should be kind of orthogonal to the IQv2 query type. Let's assume
somebody in the future adds a state store implementation that is not
order based. To account for use cases where the order does not matter,
this person might also add a method to the state store interface that
does not guarantee any order. However, our range query type is specified
to guarantee order by default. So we need to add something like
withNoOrder() to the query type to allow the use cases that does not
need order and has the better performance in IQ. That does not look very
nice to me. Having the no-order-guaranteed option does not cost us
anything and it keeps the IQv2 interface flexible. I assume we want to
drop the Evolving annotation at some point.
Sorry for not having brought this up in the discussion about KIP-985.

Best,
Bruno





On 11/15/23 6:56 AM, Matthias J. Sax wrote:

Just catching up on this one.


50) I am also in favor of setting `validTo` in VersionedRecord for
single-key single-ts lookup; it seems better to return the proper
timestamp. The timestamp is already in the store and it's cheap to
extract it and add to the result, and it might be valuable information
for the user. Not sure though if we should deprecate the existing
constructor though, because for "latest" it's convenient to have?


60) Yes, I meant `VersionedRecord`. Sorry for the mixup.


80) We did discuss this question on KIP-985 (maybe you missed it
Bruno). It's kinda tricky.

Historically, it seems that IQv1, ie, the `ReadOnlyXxx` interfaces
provide a clear contract that `range()` is ascending and
`reverseRange()` is descending.

For `RangeQuery`, the question is, if we did implicitly inherit this
contract? Our conclusion on KIP-985 discussion was, that we did
inherit it. If this holds true, changing the contract would be a
breaking change (what might still be acceptable, given that the
interface is annotated as unstable, and that IQv2 is not widely
adopted yet). I am happy to go with the 3-option contract, but just
want to ensure we all agree it's the right way to go, and we are
potentially willing to pay the price of backward incompatibility.




Do we need a snapshot semantic or can we specify a weaker but still
useful semantic?


I don't think we necessarily need it, but as pointed out by Lucas, all
existing queries provide it. Overall, my main point is really about
not implementing something "random", but defining a proper binding
contract that allows users to reason about it.

I general, I agree that weaker semantics might be sufficient, but I am
not sure if we can implement anything weaker in a reasonable way?
Happy to be convinced otherwise. (I have some example, that I will
omit for now, as I hope we can actually go with snapshot semantics.)

The RocksDB Snaptshot idea from Lucas sound

Re: [VOTE] KIP-968: Support single-key_multi-timestamp interactive queries (IQv2) for versioned state stores

2023-11-20 Thread Bruno Cadonna

Thanks Alieh,

I am +1 (binding).

However, although we agreed on not specifying an order of the results by 
default, there is still the following  sentence in the KIP:


"The order of the returned records is by default ascending by timestamp. 
The method withDescendingTimestamps() can reverse the order. Btw, 
withAscendingTimestamps() method can be used for code readability purpose. "


Could you please change it and also fix what Guozhang commented?

Best,
Bruno

On 11/19/23 2:12 AM, Guozhang Wang wrote:

Thanks Alieh,

I read through the wiki page and the DISCUSS thread, all LGTM except a
minor thing in javadoc:

"The query returns the records with a global ascending order of keys.
The records with the same key are ordered based on their insertion
timestamp in ascending order. Both the global and partial ordering are
modifiable with the corresponding methods defined for the class."

Since this KIP is only for a single key, there's no key ordering but
only timestamp ordering right? Maybe the javadoc can be updated
accordingly.

Otherwise, LGTM.

On Fri, Nov 17, 2023 at 2:36 AM Alieh Saeedi
 wrote:


Hi all,
Following my recent message in the discussion thread, I am opening the
voting for KIP-968. Thanks for your votes in advance.

Cheers,
Alieh


Re: [DISCUSS] KIP-997 Support fetch(fromKey, toKey, from, to) to WindowRangeQuery and unify WindowKeyQuery and WindowRangeQuery

2023-11-16 Thread Bruno Cadonna

Hi Hanyu,

Thanks for the KIP!

1)
Could you please mark the pieces that you want to add to the API in the 
code listing in the KIP? You can add a comment like "// newly added" or 
similar. That would make reading the KIP a bit easier because one does 
not need to compare your code with the code in the current codebase.


2)
Could you -- as a side cleanup -- also change the getters to not use the 
get-prefix anymore, please? That was apparently an oversight when those 
methods were added. Although the API is marked as Evolving, I think we 
should still deprecate the getX() methods, since it does not cost us 
anything.


3)
I propose to make the API a bit more fluent. For example, something like

WindowRangeQuery.withKey(key).fromTime(t1).toTime(t2)

and

WindowRangeQuery.withAllKeys().fromTime(t1).toTime(t2)

and

WindowRangeQuery.withKeyRange(key1, key2).fromTime(t1).toTime(t2)

and maybe even in addition to the above add also the option to start 
with the time range


WindowRangeQuery.withWindowStartRange(t1, t2).fromKey(key1).toKey(key2)


4)
Could you also add some usage examples? Alieh did quite a nice job 
regarding usage examples in KIP-986.



Best,
Bruno

On 11/8/23 8:02 PM, Hanyu (Peter) Zheng wrote:

Hello everyone,

I would like to start the discussion for KIP-997: Support fetch(fromKey,
toKey, from, to) to WindowRangeQuery and unify WindowKeyQuery and
WindowRangeQuery
The KIP can be found here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-997%3A++Support+fetch%28fromKey%2C+toKey%2C+from%2C+to%29+to+WindowRangeQuery+and+unify+WindowKeyQuery+and+WindowRangeQuery

Any suggestions are more than welcome.

Many thanks,
Hanyu

On Wed, Nov 8, 2023 at 10:38 AM Hanyu (Peter) Zheng 
wrote:



https://cwiki.apache.org/confluence/display/KAFKA/KIP-997%3A++Support+fetch%28fromKey%2C+toKey%2C+from%2C+to%29+to+WindowRangeQuery+and+unify+WindowKeyQuery+and+WindowRangeQuery

--

Hanyu (Peter) Zheng he/him/his
Software Engineer Intern
+1 (213) 431-7193







Re: [DISCUSS] KIP-968: Support single-key_multi-timestamp interactive queries (IQv2) for versioned state stores

2023-11-16 Thread Bruno Cadonna
bject monitors of our `RocksDB` interface) for the duration
of creating snapshots across all instances - which I think would also
be permissible performance-wise. Snapshots are just a sequence number
and should be pretty lightweight to create (they have, however,
downside when it comes to compaction just like iterators).

With that in mind, I would be in favor of at least exploring the
option of using snapshots for a consistent view here, before dropping
this useful guarantee.

Cheers,
Lucas

On Tue, Nov 14, 2023 at 2:20 PM Bruno Cadonna  wrote:


Hi Alieh,

Regarding the semantics/guarantees of the query type:

Do we need a snapshot semantic or can we specify a weaker but still
useful semantic?

An option could be to guarantee that:
1. the query returns the latest version when the query arrived at the
state store
2. the query returns a valid history, i.e., versions with adjacent and
non-overlapping validity intervals.

I think guarantee 1 is quite obvious. Guarantee 2 maybe needs some
explanation. If we do not avoid writes to the state store during the
processing of interactive queries, it might for example happen that the
latest version in the state store moves to data structures that are
responsible for older versions. In our RocksDB implementation that are
the segments. Thus, it could be that during query processing Kafka
Streams reads the latest value x and encounters again x in a segment
because a processor put a newer version of x in the versioned state
store. A similar scenario might also happen to earlier versions. If
Streams does not account for such cases it could return invalid 
histories.


Maybe such weaker guarantees are enough for most use cases.

You could consider implementing weaker guarantees as I described and if
there is demand propose stricter guarantees in a follow-up KIP.

Maybe there are also other simpler guarantees that make sense.

Best,
Bruno


On 11/9/23 12:30 PM, Bruno Cadonna wrote:

Hi,

Thanks for the updates!

First my take on previous comments:


50)
I am in favor of deprecating the constructor that does not take the
validTo parameter. That implies that I am in favor of modifying 
get(key,

asOf) to set the correct validTo.


60)
I am in favor of renaming ValueIterator to VersionedRecordIterator and
define it as:

public interface VersionedRecordIterator<V> extends Iterator<VersionedRecord<V>>

(Matthias, you mixed up ValueAndTimestamp with VersionedRecord in your
last e-mail, didn't you? Just double-checking if I understood what you
are proposing.)


70)
I agree with Matthias that adding a new method on the
VersionedKeyValueStore interface defeats the purpose of one of the 
goals
of IQv2, i.e., not to need to extend the state store interface for 
IQ. I

would say if we do not need the method in normal processing, we should
not extend the public state store interface. BTW, nobody forces you to
StoreQueryUtils. I think that internal utils class was introduced for
convenience to leverage existing methods on the state store interface.


80)
Why do we limit ourselves by specifying a default order on the result?
Different state store implementation might have different strategies to
store the records which affects the order in which the records are
returned if they are not sorted before they are returned to the user.
Some users might not be interested in an order of the result and so
there is no reason those users pay the cost for sorting. I propose to
not specify a default order but sort the results (if needed) when
withDescendingX() and withAscendingX() is specified on the query 
object.



Regarding the snapshot guarantees for the iterators, I need to think a
bit more about it. I will come back to this thread in the next days.


Best,
Bruno


On 11/8/23 5:30 PM, Alieh Saeedi wrote:
Thank you, Bruno and Matthias, for keeping the discussion going and 
for

reviewing the PR.

Here are the KIP updates:

 - I removed the `peek()` from the `ValueIterator` interface since
we do
 not need it.
 - Yes, Bruno, the `validTo` field in the `VersionedRecord`
class is

 exclusive. I updated the javadocs for that.


Very important critical open questions. I list them here based on
priority
(descendingly).

 - I implemented the `get(key, fromtime, totime)` method here

<https://github.com/aliehsaeedii/kafka/blob/9578b7cb7cdade22cc63f671693f5aeb993937ca/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java#L262>:
 the problem is that this implementation does not guarantee
consistency
 because processing might continue interleaved (no snapshot
semantics is
 implemented). Moreover, it materializes all results in memory.
    - Solution 1: Use a lock and release it after retrieving all
desired
    records from all segments.
   - positive point: snapshot semantics is implemented
   - negative points: 1) It is expensive since iterating over all
   segments may take a long time. 2) It still requires materializing
   results in memory.

Re: [DISCUSS] KIP-968: Support single-key_multi-timestamp interactive queries (IQv2) for versioned state stores

2023-11-14 Thread Bruno Cadonna

Hi Alieh,

Regarding the semantics/guarantees of the query type:

Do we need a snapshot semantic or can we specify a weaker but still 
useful semantic?


An option could be to guarantee that:
1. the query returns the latest version when the query arrived at the 
state store
2. the query returns a valid history, i.e., versions with adjacent and 
non-overlapping validity intervals.


I think guarantee 1 is quite obvious. Guarantee 2 maybe needs some 
explanation. If we do not avoid writes to the state store during the 
processing of interactive queries, it might for example happen that the 
latest version in the state store moves to data structures that are 
responsible for older versions. In our RocksDB implementation that are 
the segments. Thus, it could be that during query processing Kafka 
Streams reads the latest value x and encounters again x in a segment 
because a processor put a newer version of x in the versioned state 
store. A similar scenario might also happen to earlier versions. If 
Streams does not account for such cases it could return invalid histories.
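
To make guarantee 2 a bit more tangible, here is a minimal sketch of the
property it promises, assuming results are returned newest-first and
assuming a long-valued validTo() accessor as discussed in this thread (both
are assumptions for the example, not part of the proposal itself):

// Adjacent, non-overlapping validity intervals: each older record's validTo
// must equal the next newer record's timestamp (its validFrom).
static <V> boolean isValidHistory(final List<VersionedRecord<V>> newestFirst) {
    for (int i = 0; i + 1 < newestFirst.size(); i++) {
        final VersionedRecord<V> newer = newestFirst.get(i);
        final VersionedRecord<V> older = newestFirst.get(i + 1);
        if (older.validTo() != newer.timestamp()) {
            return false;
        }
    }
    return true;
}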


Maybe such weaker guarantees are enough for most use cases.

You could consider implementing weaker guarantees as I described and if 
there is demand propose stricter guarantees in a follow-up KIP.


Maybe there are also other simpler guarantees that make sense.

Best,
Bruno


On 11/9/23 12:30 PM, Bruno Cadonna wrote:

Hi,

Thanks for the updates!

First my take on previous comments:


50)
I am in favor of deprecating the constructor that does not take the 
validTo parameter. That implies that I am in favor of modifying get(key, 
asOf) to set the correct validTo.



60)
I am in favor of renaming ValueIterator to VersionedRecordIterator and 
define it as:


public interface VersionedRecordIterator<V> extends Iterator<VersionedRecord<V>>


(Matthias, you mixed up ValueAndTimestamp with VersionedRecord in your 
last e-mail, didn't you? Just double-checking if I understood what you 
are proposing.)



70)
I agree with Matthias that adding a new method on the 
VersionedKeyValueStore interface defeats the purpose of one of the goals 
of IQv2, i.e., not to need to extend the state store interface for IQ. I 
would say if we do not need the method in normal processing, we should 
not extend the public state store interface. BTW, nobody forces you to 
StoreQueryUtils. I think that internal utils class was introduced for 
convenience to leverage existing methods on the state store interface.



80)
Why do we limit ourselves by specifying a default order on the result? 
Different state store implementation might have different strategies to 
store the records which affects the order in which the records are 
returned if they are not sorted before they are returned to the user. 
Some users might not be interested in an order of the result and so 
there is no reason those users pay the cost for sorting. I propose to 
not specify a default order but sort the results (if needed) when 
withDescendingX() and withAscendingX() is specified on the query object.



Regarding the snapshot guarantees for the iterators, I need to think a 
bit more about it. I will come back to this thread in the next days.



Best,
Bruno


On 11/8/23 5:30 PM, Alieh Saeedi wrote:

Thank you, Bruno and Matthias, for keeping the discussion going and for
reviewing the PR.

Here are the KIP updates:

    - I removed the `peek()` from the `ValueIterator` interface since 
we do

    not need it.
    - Yes, Bruno, the `validTo` field in the `VersionedRecord` class is
    exclusive. I updated the javadocs for that.


Very important critical open questions. I list them here based on 
priority

(descendingly).

    - I implemented the `get(key, fromtime, totime)` method here

<https://github.com/aliehsaeedii/kafka/blob/9578b7cb7cdade22cc63f671693f5aeb993937ca/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java#L262>:
    the problem is that this implementation does not guarantee 
consistency
    because processing might continue interleaved (no snapshot 
semantics is

    implemented). Moreover, it materializes all results in memory.
   - Solution 1: Use a lock and release it after retrieving all 
desired

   records from all segments.
  - positive point: snapshot semantics is implemented
  - negative points: 1) It is expensive since iterating over all
  segments may take a long time. 2) It still requires
materializing results
  in memory
   - Solution 2: use `RocksDbIterator`.
  - positive points: 1) It guarantees snapshot segments. 2) It 
does

  not require materializing results in memory.
  - negative points: it is expensive because, anyway, we need to
  iterate over all (many) segments.

    Do you have any thoughts on this issue? (ref: Matthias's 
comment

<https://github.com/apache/kafka/pull/14626#pullrequestreview-1709280589>)

    - I added the

Re: [VOTE] KIP-892: Transactional StateStores

2023-11-14 Thread Bruno Cadonna

Hi Nick!

Thanks a lot for the KIP!

Looking forward to the implementation!

+1 (binding)

Best,
Bruno

On 11/14/23 2:23 AM, Sophie Blee-Goldman wrote:

+1 (binding)

Thanks a lot for this KIP!

On Mon, Nov 13, 2023 at 8:39 AM Lucas Brutschy
 wrote:


Hi Nick,

really happy with the final KIP. Thanks a lot for the hard work!

+1 (binding)

Lucas

On Mon, Nov 13, 2023 at 4:20 PM Colt McNealy  wrote:


+1 (non-binding).

Thank you, Nick, for making all of the changes (especially around the
`default.state.isolation.level` config).

Colt McNealy

*Founder, LittleHorse.dev*


On Mon, Nov 13, 2023 at 7:15 AM Nick Telford 

wrote:



Hi everyone,

I'd like to call a vote for KIP-892: Transactional StateStores[1],
which makes Kafka Streams StateStores transactional under EOS.

Regards,

Nick

1:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores








Re: [DISCUSS] KIP-968: Support single-key_multi-timestamp interactive queries (IQv2) for versioned state stores

2023-11-09 Thread Bruno Cadonna

Hi,

Thanks for the updates!

First my take on previous comments:


50)
I am in favor of deprecating the constructor that does not take the 
validTo parameter. That implies that I am in favor of modifying get(key, 
asOf) to set the correct validTo.



60)
I am in favor of renaming ValueIterator to VersionedRecordIterator and 
define it as:


public interface VersionedRecordIterator<V> extends Iterator<VersionedRecord<V>>


(Matthias, you mixed up ValueAndTimestamp with VersionedRecord in your 
last e-mail, didn't you? Just double-checking if I understood what you 
are proposing.)



70)
I agree with Matthias that adding a new method on the 
VersionedKeyValueStore interface defeats the purpose of one of the goals 
of IQv2, i.e., not to need to extend the state store interface for IQ. I 
would say if we do not need the method in normal processing, we should 
not extend the public state store interface. BTW, nobody forces you to 
StoreQueryUtils. I think that internal utils class was introduced for 
convenience to leverage existing methods on the state store interface.



80)
Why do we limit ourselves by specifying a default order on the result? 
Different state store implementation might have different strategies to 
store the records which affects the order in which the records are 
returned if they are not sorted before they are returned to the user. 
Some users might not be interested in an order of the result and so 
there is no reason those users pay the cost for sorting. I propose to 
not specify a default order but sort the results (if needed) when 
withDescendingX() and withAscendingX() is specified on the query object.



Regarding the snapshot guarantees for the iterators, I need to think a 
bit more about it. I will come back to this thread in the next days.



Best,
Bruno


On 11/8/23 5:30 PM, Alieh Saeedi wrote:

Thank you, Bruno and Matthias, for keeping the discussion going and for
reviewing the PR.

Here are the KIP updates:

- I removed the `peek()` from the `ValueIterator` interface since we do
not need it.
- Yes, Bruno, the `validTo` field in the `VersionedRecord` class is
exclusive. I updated the javadocs for that.


Very important critical open questions. I list them here based on priority
(descendingly).

- I implemented the `get(key, fromtime, totime)` method here

<https://github.com/aliehsaeedii/kafka/blob/9578b7cb7cdade22cc63f671693f5aeb993937ca/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java#L262>:
the problem is that this implementation does not guarantee consistency
because processing might continue interleaved (no snapshot semantics is
implemented). Moreover, it materializes all results in memory.
   - Solution 1: Use a lock and release it after retrieving all desired
   records from all segments.
  - positive point: snapshot semantics is implemented
  - negative points: 1) It is expensive since iterating over all
  segments may take a long time. 2) It still requires
materializing results
  in memory
   - Solution 2: use `RocksDbIterator`.
  - positive points: 1) It guarantees snapshot segments. 2) It does
  not require materializing results in memory.
  - negative points: it is expensive because, anyway, we need to
  iterate over all (many) segments.

Do you have any thoughts on this issue? (ref: Matthias's comment
<https://github.com/apache/kafka/pull/14626#pullrequestreview-1709280589>)

- I added the field `validTo` in `VersionedRecord`. Its default value is
MAX. But as Matthias mentioned, for the single-key single-ts
(`VersionedKeyQuery` in KIP-960), it may not always be true. If the
returned record belongs to an old segment, maybe it is not valid any more.
So MAX is not the correct value for `ValidTo`. Two solutions come to mind:
   - Solution 1: make the `ValidTo` as an `Optional` and set it to
   `empty` for the returned result of `get(key, asOfTimestamp)`.
   - Solution 2: change the implementation of `get(key, asOfTimestamp)`
   so that it finds the correct `validTo` for the returned versionedRecord.

   - In this KIP and the next one, even though the default ordering is
with ascending ts, I added the method `withAscendingTimestamps()` to have
more user readability (as Bruno suggested), while Hanyu only added
`withDescending...` methods (he did not need ascending because that's the
default anyway). Matthias believes that we should not have
inconsistencies (he actually hates them:D). Shall I change my KIP or Hanyu?
Thoughts?


That would be maybe helpful to look into the PR
<https://github.com/apache/kafka/pull/14626> for more clarity and even
review that ;-)

Cheers,
Alieh

On Thu, Nov 2, 2023 at 7:13 PM Bruno Cadonna  wrote:


Hi Alieh,

First of all, I like the examples.

Is validTo in VersionedRecord exclusive or inclusive?
In t

Re: [DISCUSS] KIP-969: Support range interactive queries for versioned state stores

2023-11-03 Thread Bruno Cadonna
 "key range
over latest" semantics. -- The issue is of course, that uses need to
know that the query would return `ValueAndTimestamp` and not plain `V`
(or we add a translation step to unwrap the value, but we would lose the
"validFrom" timestamp -- validTo would be `null`). Because type safety
is a general issue in IQv2 it would not make it worse (in the strict
sense), but I am also not sure if we want to dig an even deeper hole...


-Matthias


On 10/10/23 11:55 AM, Alieh Saeedi wrote:

Thanks, Matthias and Bruno, for the feedback on KIP-969. Here is a
summary of the updates I made to the KIP:

 1. I liked the idea of renaming methods as Matthias suggested.
 2. I removed the allversions() method as I did in KIP-968. To retrieve
 the latest value(s), the user must call just the asOf method with
 the MAX value (asOf(MAX)). The same applies to KIP-968. Do you think it is
 clumsy, Matthias?
 3. I added a method to the *VersionedKeyValueStore* interface, as I did
 for KIP-968.
 4. Matthias: I do not get what you mean by your second comment. Isn't
 the KIP already explicit about that?

 > I assume, results are returned by timestamp for each key. The KIP
 should be explicit about it.


Cheers,
Alieh



On Tue, Oct 3, 2023 at 6:07 AM Matthias J. Sax  wrote:


Thanks for updating the KIP.

Not sure if I agree or not with Bruno's idea to split the query types
further? In the end, we split them only because there is three different
return types: single value, value-iterator, key-value-iterator.

What do we gain by splitting out single-ts-range-key? In the end, for
range-ts-range-key the proposed class is necessary and is a superset
(one can set both timestamps to the same value, for single-ts lookup).

The mentioned simplification might apply to "single-ts-range-key" but I
don't see a simplification for the proposed (and necessary) query type?

On the other hand, I see an advantage of a single-ts-range-key for
querying over the "latest version" with a range of keys. For a
single-ts-range-key query, this it would be the default (similar to
VersionedKeyQuery with not asOf-timestamped defined).

In the current version of the KIP, (if we agree that default should
actually return "all versions" not "latest" -- this default was
suggested by Bruno on KIP-968 and makes sense to me, so we would need to
have the same default here to stay consistent), users would need to pass
in `from(Long.MAX).to(Long.MAX)` (if I got this right) to query the
latest point in time only, what seems to be clumsy? Or we could add a
`latestKeyOnly` option to `MultiVersionedRangeQuery`, but it does seem
a little clumsy, too.




The overall order of the returned records is by Key


I assume, results are returned by timestamp for each key. The KIP should
be explicit about it.



To be very explicit, should we rename the methods to specify the key

bound?


- withRange -> withKeyRange
- withLowerBound -> withLowerKeyBound
    - withUpperBound -> withUpperKeyBound
- withNoBounds -> allKeys (or withNoKeyBounds, but we use
`allVersions` and not `noTimeBound` and should align the naming?)



-Matthias


On 9/6/23 5:25 AM, Bruno Cadonna wrote:

Hi Alieh,

Thanks for the KIP!

One high level comment/question:

I assume you separated single key queries into two classes because
versioned key queries return a single value and multi version key
queries return iterators. Although, range queries always return
iterators, it would make sense to also separate range queries for
versioned state stores into range queries that return one single

version

of the keys within a range and range queries that return multiple
version of the keys within a range, IMO. That would reduce the
meaningless combinations.
WDYT?

Best,
Bruno

On 8/16/23 8:01 PM, Alieh Saeedi wrote:

Hi all,

I split KIP-960
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-960%3A+Support+single-key_single-timestamp+interactive+queries+%28IQv2%29+for+versioned+state+stores>
into three separate KIPs. Therefore, please continue discussions about
range interactive queries here. You can see all the addressed reviews on the
following page. Thanks in advance.

KIP-969: Support range interactive queries (IQv2) for versioned state
stores
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-969%3A+Support+range+interactive+queries+%28IQv2%29+for+versioned+state+stores>




I look forward to your feedback!

Cheers,
Alieh











Re: [VOTE] KIP-998: Give ProducerConfig(props, doLog) constructor protected access

2023-11-03 Thread Bruno Cadonna

Thanks for the KIP!

+1 (binding)

Best,
Bruno

On 11/3/23 7:55 AM, Chris Egerton wrote:

+1 (binding)

FWIW, I agree that this change should require a KIP. Gating upgrades of
visibility from private or package-private to protected may be cumbersome,
but it comes with the expectation that downgrades in visibility for those
same classes/methods will also be gated behind a KIP, which is IMO clearly
warranted considering the backwards compatibility implications of taking,
e.g., a protected constructor and turning it private.

Cheers,

Chris

On Fri, Nov 3, 2023, 00:55 Sophie Blee-Goldman 
wrote:


Hey all,

This is a trivial one-liner change that it was determined should go through
a KIP during the PR review process (see this thread
 for
context). Since the change itself was already reviewed and approved I'm
skipping the discussion thread and bringing it to a vote right away, but of
course I'm open to feedback and can create a discussion thread if there is
need for it.

The change itself is simply adding the `protected` modifier to the
ProducerConfig constructor that allows for silencing the config logging.
This just brings the ProducerConfig in alignment with the other client
configs, all of which already had this constructor as protected.

KIP:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-998%3A+Give+ProducerConfig%28props%2C+doLog%29+constructor+protected+access
PR: https://github.com/apache/kafka/pull/14681

Thanks!
Sophie





Re: [DISCUSS] KIP-968: Support single-key_multi-timestamp interactive queries (IQv2) for versioned state stores

2023-11-02 Thread Bruno Cadonna
e naming consistent across the board?

If yes, we should also do `from (Instant fromTime)` and use getters
`fromTime()` and `toTime()` -- given that it's range bounds it seems
acceptable to me, to diverge a little bit from KIP-960 `asOfTimestamp()`
-- but we could also rename it to `asOfTime()`? -- Given that we
strongly type with `Instant` I am not worried about semantic ambiguity.



20) About throwing a NPE when time bounds are `null` -- why? (For the
key it makes sense as is mandatory to have a key.) Could we not
interpret `null` as "no bound". We did KIP-941 to add `null` for
open-ended `RangeQueries`, so I am wondering if we should just stick to
the same semantics?

Cf


https://cwiki.apache.org/confluence/display/KAFKA/KIP-941%3A+Range+queries+to+accept+null+lower+and+upper+bounds




30) About the class naming. That's always tricky, and I am not married
to my proposal. I agree with Bruno that the other suggested names are
not really better. -- The underlying idea was, to get some "consistent"
naming across the board.

Existing `KeyQuery`
New `VersionedKeyQuery` (KIP-960; we add a prefix)
New `MultiVersionKeyQuery` (this KIP; extend the prefix with a

pre-prefix)


Existing `RangeQuery`
New `MultiVersionRangeQuery` (KIP-969; add same prefix as above)



40) I am fine with not adding `range(from, to)` -- it was just an idea.





Some more follow up question:

50) You propose to add a new constructor and getter to `VersionedRecord`
-- I am wondering if this implies that `validTo` is optional because the
existing constructor is not deprecated? -- Also, what happens if
`validTo` is not set and `valueTo()` is called? Or do we intent to make
`validTo` mandatory?

Maybe this question can only be answered when working on the code, but I
am wondering if we should make `validTo` mandatory or not... And what
the "blast radius" of changing `VersionedRecord` will be in general. Do
you have already some POC PR that we could look at to get some signals
about this?



60) The new query class is defined to return
`ValueIterator<VersionedRecord<V>>` -- while I like the idea to add
`ValueIterator` in a generic way on the one hand, I am wondering if
it might be better to change it, and enforce its usage (ie, return type)
of `VersionedRecord` to improve type safety (type erasure is often a
pain, and we could mitigate it this way).

Btw: We actually do a similar thing for `KeyValueIterator`.

Ie,

public interface ValueIterator<V> extends Iterator<ValueAndTimestamp<V>>

and

ValueAndTimestamp<V> peek();

This would imply that the return type of the new query is
`ValueIterator` on the interface what seems simpler and more elegant?

If we go with the change, I am also wondering if we need to find a
better name for the new iterator class? Maybe `VersionIterator` or
something like this?

Of course it might limit the use of `ValueIterator` for other value
types -- not sure if this a limitation that is prohibitive? My gut
feeling is, that is should not be too limiting.




70) Do we really need the change in `VersionedKeyValueStore` and add a
new method? In the end, the idea of IQv2 is to avoid exactly this... It
was the main issue for IQv1, that the base interface of the store needed
an update and thus all classed implementing the base interface, making
it very cumbersome to add new query types. -- Of course, we need this
new method on the actually implementation (as private method) that can
be called from `query()` method, but adding it to the interface seems to
defeat the purpose of IQv2.

Note, for existing IQv2 queries types that go against others stores, the
public methods already existed when IQv2 was introduces, and thus the
implementation of these query types just pragmatically re-used existing
methods -- but it does not imply that new public method should be added.




-Matthias


On 10/11/23 5:11 AM, Bruno Cadonna wrote:

Thanks for the updates, Alieh!

The example in the KIP uses the allVersions() method which we agreed

to

remove.

Regarding your questions:
1. asOf vs. until: I am fine with both but slightly prefer until.
2. What about KeyMultiVersionsQuery, KeyVersionsQuery (KIP-960 would
then be KeyVersionQuery). However, I am also fine with
MultiVersionedKeyQuery since none of the names sounds better or worse

to

me.
3. I agree with you not to introduce the method with the two bounds to
keep things simple.
4. Forget about fromTime() an asOfTime(), from() and asOf() is fine.
5. The main purpose is to show how to use the API. Maybe make an

example

with just the key to distinguish this query from the single value

query

of KIP-960 and then one with a key and a time range. When you iterate
over the results you could also call validTo(). Maybe add some actual
records in the comments to show what the result might look like.

Regarding the test plan, I hope you also plan to add unit tests in all
of your KIPs. Maybe you could also explain why system tests are not
needed here.

Best,
Bruno

On 10/10/23 5:36 PM, Alieh Saeedi wrote:

Re: [VOTE] KIP-988 Streams StandbyUpdateListener

2023-10-31 Thread Bruno Cadonna

Hi Colt and Eduwer,

Thanks for the KIP!

+1 (binding)

Best,
Bruno

On 10/26/23 7:17 PM, Matthias J. Sax wrote:

+1 (binding)

On 10/25/23 4:06 PM, Sophie Blee-Goldman wrote:

Happy to see this -- that's a +1 (binding) from me

On Mon, Oct 23, 2023 at 6:33 AM Bill Bejeck  wrote:


This is a great addition

+1(binding)

-Bill

On Fri, Oct 20, 2023 at 2:29 PM Almog Gavra  
wrote:



+1 (non-binding) - great improvement, thanks Colt & Eduwer!

On Tue, Oct 17, 2023 at 11:25 AM Guozhang Wang <

guozhang.wang...@gmail.com



wrote:


+1 from me.

On Mon, Oct 16, 2023 at 1:56 AM Lucas Brutschy
 wrote:


Hi,

thanks again for the KIP!

+1 (binding)

Cheers,
Lucas



On Sun, Oct 15, 2023 at 9:13 AM Colt McNealy 

wrote:


Hello there,

I'd like to call a vote on KIP-988 (co-authored by my friend and

colleague

Eduwer Camacaro). We are hoping to get it in before the 3.7.0

release.








https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Task+Update+Listener


Cheers,
Colt McNealy

*Founder, LittleHorse.dev*










Re: [ANNOUNCE] New Kafka PMC Member: Satish Duggana

2023-10-30 Thread Bruno Cadonna

Congrats, Satish!

Bruno

On 10/29/23 2:42 PM, John Roesler wrote:

Congratulations, Satish!
-John

On Sun, Oct 29, 2023, at 08:09, Randall Hauch wrote:

Congratulations, Satish!

On Sun, Oct 29, 2023 at 1:47 AM Tom Bentley  wrote:


Congratulations!

On Sun, 29 Oct 2023 at 5:41 PM, Guozhang Wang 
wrote:


Congratulations Satish!

On Sat, Oct 28, 2023 at 12:59 AM Luke Chen  wrote:


Congrats Satish!

Luke

On Sat, Oct 28, 2023 at 11:16 AM ziming deng 


wrote:


Congratulations Satish!


On Oct 27, 2023, at 23:03, Jun Rao 

wrote:


Hi, Everyone,

Satish Duggana has been a Kafka committer since 2022. He has been

very

instrumental to the community since becoming a committer. It's my

pleasure

to announce that Satish is now a member of Kafka PMC.

Congratulations Satish!

Jun
on behalf of Apache Kafka PMC










Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-20 Thread Bruno Cadonna

Hi,

Matthias is correct that the end offsets are stored somewhere in the 
metadata of the consumer. More precisely, they are stored in the 
`TopicPartitionState`. However, I could not find public API on the 
consumer other than currentLag() that uses the stored end offsets. If I 
understand the code correctly, method endOffsets() always triggers a 
remote call.
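
For reference, a small sketch of the two calls in question -- endOffsets()
exists on the Consumer API and always results in a remote ListOffsets
request, while currentLag() (added in KIP-695) is answered from the
consumer's locally cached fetch metadata; the changelog partition variable
is only for illustration:

// remote call to the partition leader(s)
final Map<TopicPartition, Long> endOffsets =
    restoreConsumer.endOffsets(Collections.singleton(changelogPartition));

// served from local metadata; empty if no position/lag is known yet
final OptionalLong lag = restoreConsumer.currentLag(changelogPartition);
if (lag.isPresent()) {
    // approximate end offset without a remote call
    final long approxEndOffset =
        restoreConsumer.position(changelogPartition) + lag.getAsLong();
}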


I am a bit concerned about doing remote calls every commit.interval.ms 
(by default 200ms under EOS). At the moment the remote calls are only 
issued if an optimization for KTables is turned on where changelog 
topics are replaced with the input topic of the KTable. The current 
remote calls retrieve all committed offsets of the group at once. If I 
understand correctly, that is one single remote call. Remote calls for 
getting end offsets of changelog topics -- as I understand you are 
planning to issue -- will probably result in multiple remote calls to 
multiple leaders of the changelog topic partitions.


Please correct me if I misunderstood anything of the above.

If my understanding is correct, I propose to modify the consumer in such 
a way to get the end offset from the locally stored metadata whenever 
possible as part of the implementation of this KIP. I do not know what 
the implications are of such a change of the consumer and if a KIP is 
needed for it. Maybe, endOffsets() guarantees to return the freshest end 
offsets possible, which would not be satisfied with the modification.


Regarding the naming, I do not completely agree with Matthias. While the 
pattern might be consistent with onBatchUpdated, what is the meaning of 
onBatchUpdated? Is the batch updated? The names onBatchLoaded or 
onBatchWritten or onBatchAdded are more clear IMO.
With "restore" the pattern works better. If I restore a batch of records 
in a state, the records are not there although they should be there and 
I add them. If I update a batch of records in a state, that sounds like 
the batch of records is already in the state and I modify the existing 
records within the state. That is clearly not the meaning of the event for which 
the listener should be called.
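
For readers following along, a purely illustrative sketch of the listener
shape under discussion; the parameter lists below are made up for this
example and the actual signatures are defined in KIP-988 itself:

public interface StandbyUpdateListener {

    void onUpdateStart(TopicPartition topicPartition, String storeName, long startingOffset);

    // the naming in question: onBatchLoaded vs. onBatchUpdated
    void onBatchLoaded(TopicPartition topicPartition, String storeName, TaskId taskId,
                       long batchEndOffset, long batchSize, long currentEndOffset);

    void onUpdateSuspended(TopicPartition topicPartition, String storeName,
                           long storeOffset, long currentEndOffset);
}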


Best,
Bruno



On 10/19/23 2:12 AM, Matthias J. Sax wrote:

Thanks for the KIP. Seems I am almost late to the party.

About naming (fun, fun, fun): I like the current proposal overall, 
except `onBachLoaded`, but would prefer `onBatchUpdated`. It better 
aligns to everything else:


  - it's an update-listener, not loaded-listener
  - `StateRestoreListener` has `onRestoreStart`, `onRestoreEnd`, 
`onRestoreSuspended, and `onBachRestored` (it's very consistent
  - `StandbyUpdateListener` should have `onUpdateStart`, 
`onUpdateSuspended` and `onBatchUpdated`  to be equally consistent 
(using "loaded" breaks the pattern)



About the end-offset question: I am relatively sure that the consumer 
gets the latest end-offset as attached metadata in every fetch response. 
(We exploit this behavior to track end-offsets for input topic with 
regard to `max.task.idle.ms` without overhead -- it was also a concern 
when we did the corresponding KIP how we could track lag with no overhead).


Thus, I believe we would "just" need to modify the code accordingly to 
get this information from the restore-consumer 
(`restorConsumer.endOffsets(...)`; should be served w/o RPC but from 
internal metadata cache) for free, and pass into the listener.


Please double check / verify this claim and keep me honest about it.


-Matthias

On 10/17/23 6:38 AM, Eduwer Camacaro wrote:

Hi Bruno,

Thanks for your observation; surely it will require a network call using
the admin client in order to know this "endOffset" and that will have an
impact on performance. We can either find a solution that has a low 
impact
on performance or ideally zero impact; unfortunately, I don't see a 
way to

have zero impact on performance. However, we can leverage the existing
#maybeUpdateLimitOffsetsForStandbyChangelogs method, which uses the admin
client to ask for these "endOffset"s. As far as I can understand, this 
update

is done periodically using the "commit.interval.ms" configuration. I
believe this option will force us to invoke StandbyUpdateListener once this
interval is reached.

On Mon, Oct 16, 2023 at 8:52 AM Bruno Cadonna  wrote:


Thanks for the KIP, Colt and Eduwer,

Are you sure there is also not a significant performance impact for
passing into the callback `currentEndOffset`?

I am asking because the comment here:

https://github.com/apache/kafka/blob/c32d2338a7e0079e539b74eb16f0095380a1ce85/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L129

says that the end-offset is only updated once for standby tasks whose
changelog topic is not piggy-backed on input topics. I could also not
find the update of end-offset for those standbys.


Best,
Bruno

On 10/16/23 10:55 AM, Lucas Brutschy wrote:

Hi all,

it's a nice improvement! I don't hav

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-10-19 Thread Bruno Cadonna
olation level in the short term, with a future KIP introducing the
capability to change the isolation level on a per-query basis, falling
back to the "default" defined by this config. That's why I called it
"default", for future-proofing.

However, it currently includes the caveat that READ_UNCOMMITTED is not
available under EOS. I think this is the coupling you are alluding to?

This isn't intended to be a restriction of the API, but is currently a
technical limitation. However, after discussing with some users about
use-cases that would require READ_UNCOMMITTED under EOS, I'm inclined to
remove that clause and put in the necessary work to make that combination
possible now.

I currently see two possible approaches:

1. Disable TX StateStores internally when the IsolationLevel is
READ_UNCOMMITTED and the processing.mode is EOS. This is more difficult
than it sounds, as there are many assumptions being made throughout the
internals about the guarantees StateStores provide. It would definitely add
a lot of extra "if (read_uncommitted && eos)" branches, complicating
maintenance and testing.
2. Invest the time *now* to make READ_UNCOMMITTED of EOS StateStores
possible. I have some ideas on how this could be achieved, but they would
need testing and could introduce some additional issues. The benefit of
this approach is that it would make query-time IsolationLevels much simpler
to implement in the future.

Unfortunately, both will require considerable work that will further delay
this KIP, which was the reason I placed the restriction in the KIP in the
first place.

Regards,
Nick

On Sat, 14 Oct 2023 at 03:30, Guozhang Wang <

guozhang.wang...@gmail.com>

wrote:


Hello Nick,

First of all, thanks a lot for the great effort you've put in driving
this KIP! I really like it coming through finally, as many people in
the community have raised this. At the same time I honestly feel a bit
ashamed for not putting enough of my time supporting it and pushing it
through the finish line (you raised this KIP almost a year ago).

I briefly passed through the DISCUSS thread so far, not sure I've 100
percent digested all the bullet points. But with the goal of trying to
help take it through the finish line in mind, I'd want to throw
thoughts on top of my head only on the point #4 above which I felt may
be the main hurdle for the current KIP to drive to a consensus now.

The general question I asked myself is, whether we want to couple "IQ
reading mode" with "processing mode". While technically I tend to
agree with you that, it's feels like a bug if some single user chose
"EOS" for processing mode while choosing "read uncommitted" for IQ
reading mode, at the same time, I'm thinking if it's possible that
there could be two different persons (or even two teams) that would be
using the stream API to build the app, and the IQ API to query the
running state of the app. I know this is less of a technical thing but
rather a more design stuff, but if it could be ever the case, I'm
wondering if the personale using the IQ API knows about the risks of
using read uncommitted but still chose so for the favor of
performance, no matter if the underlying stream processing mode
configured by another personale is EOS or not. In that regard, I'm
leaning towards a "leaving the door open, and close it later if we
found it's a bad idea" aspect with a configuration that we can
potentially deprecate than "shut the door, clean for everyone". More
specifically, allowing the processing mode / IQ read mode to be
decoupled, and if we found that there's no such cases as I speculated
above or people started complaining a lot, we can still enforce
coupling them.

Again, just my 2c here. Thanks again for the great patience and
diligence on this KIP.


Guozhang



On Fri, Oct 13, 2023 at 8:48 AM Nick Telford <nick.telf...@gmail.com> wrote:


Hi Bruno,

4.
I'll hold off on making that change until we have a consensus as to
what configuration to use to control all of this, as it'll be affected
by the decision on EOS isolation levels.

5.
Done. I've chosen "committedOffsets".

Regards,
Nick

On Fri, 13 Oct 2023 at 16:23, Bruno Cadonna  wrote:



Hi Nick,

1.
Yeah, you are probably right that it does not make too much sense.
Thanks for the clarification!


4.
Yes, sorry for the back and forth, but I think for the sake of the KIP
it is better to leave the ALOS behavior as it is for now due to the
possible issues you would run into. Maybe we can find a solution in the
future. Now the question returns to whether we really need
default.state.isolation.level. Maybe the config could be the feature
flag Sophie requested.


5.
There is a guideline in Kafka not to use the get prefix for getters (at
least in the public API). Thus

[jira] [Created] (KAFKA-15625) Do not flush global state store at each commit

2023-10-17 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-15625:
-

 Summary: Do not flush global state store at each commit
 Key: KAFKA-15625
 URL: https://issues.apache.org/jira/browse/KAFKA-15625
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Bruno Cadonna


Global state stores are flushed at each commit. While that is not a big issue 
with at-least-once processing mode since the commit interval is by default 30s, 
it might become an issue with EOS where the commit interval is 200ms by default.
One option would be to flush and checkpoint global state stores when the delta 
of the content exceeds a given threshold as we do for other stores. See 
https://github.com/apache/kafka/blob/a1f3c6d16061566a4f53c72a95e2679b8ee229e0/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L97
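A sketch of that option (the threshold value and all names below are made up for illustration):

// Sketch only: checkpoint the global store when enough records have been applied
// since the last checkpoint, analogous to what stream tasks do for their stores.
private static final long DELTA_THRESHOLD_FOR_CHECKPOINT = 10_000L; // assumed value

void maybeFlushAndCheckpointGlobalStore(final long recordsAppliedSinceLastCheckpoint) {
    if (recordsAppliedSinceLastCheckpoint >= DELTA_THRESHOLD_FOR_CHECKPOINT) {
        globalStateStore.flush();  // hypothetical reference to the global store
        writeCheckpointFile();     // hypothetical helper
    }
}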
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15624) Reconsider synchronisation of methods in RocksDBStore

2023-10-17 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-15624:
-

 Summary: Reconsider synchronisation of methods in RocksDBStore
 Key: KAFKA-15624
 URL: https://issues.apache.org/jira/browse/KAFKA-15624
 Project: Kafka
  Issue Type: Improvement
Reporter: Bruno Cadonna


The code in {{RocksDBStore}} evolved over time. We should reconsider the 
synchronization of the methods in RocksDBStore. Maybe some synchronizations are 
not needed anymore or can be improved. 
The synchronization of the methods is inconsistent. For example, {{putAll()}} 
is not synchronized whereas {{put()}} is synchronized. That could be because 
once {{putAll()}} was a loop over multiple calls to {{put()}}. Additionally, we 
should reconsider how we validate whether the store is open since that seems to 
be the main reason why we synchronize methods.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-16 Thread Bruno Cadonna

Thanks for the KIP, Colt and Eduwer,

Are you sure there is not also a significant performance impact from 
passing `currentEndOffset` into the callback?


I am asking because the comment here: 
https://github.com/apache/kafka/blob/c32d2338a7e0079e539b74eb16f0095380a1ce85/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L129


says that the end-offset is only updated once for standby tasks whose 
changelog topic is not piggy-backed on input topics. I could also not 
find the update of end-offset for those standbys.



Best,
Bruno

On 10/16/23 10:55 AM, Lucas Brutschy wrote:

Hi all,

it's a nice improvement! I don't have anything to add on top of the
previous comments, just came here to say that it seems to me consensus
has been reached and the result looks good to me.

Thanks Colt and Eduwer!
Lucas

On Sun, Oct 15, 2023 at 9:11 AM Colt McNealy  wrote:


Thanks, Guozhang. I've updated the KIP and will start a vote.

Colt McNealy

*Founder, LittleHorse.dev*


On Sat, Oct 14, 2023 at 10:27 AM Guozhang Wang 
wrote:


Thanks for the summary, that looks good to me.

Guozhang

On Fri, Oct 13, 2023 at 8:57 PM Colt McNealy  wrote:


Hello there!

Thanks everyone for the comments. There's a lot of back-and-forth going on,
so I'll do my best to summarize what everyone's said in TLDR format:

1. Rename `onStandbyUpdateStart()` -> `onUpdateStart()`, and do similarly
for the other methods.
2. Keep `SuspendReason.PROMOTED` and `SuspendReason.MIGRATED`.
3. Remove the `earliestOffset` parameter for performance reasons.

If that's all fine with everyone, I'll update the KIP and we—well, mostly
Edu (:  —will open a PR.

Cheers,
Colt McNealy

*Founder, LittleHorse.dev*


On Fri, Oct 13, 2023 at 7:58 PM Eduwer Camacaro 
wrote:


Hello everyone,

Thanks for all your feedback for this KIP!

I think that the key to choosing proper names for this API is
understanding the terms used inside the StoreChangelogReader. Currently,
this class has two possible states: ACTIVE_RESTORING and
STANDBY_UPDATING. In my opinion, using StandbyUpdateListener for the
interface fits better on these terms. Same applies for
onUpdateStart/Suspended.

StoreChangelogReader uses "the same mechanism" for active task
restoration and standby task updates, but this is an implementation
detail. Under normal circumstances (no rebalances or task migrations),
the changelog reader will be in STANDBY_UPDATING, which means it will be
updating standby tasks as long as there are new records in the changelog
topic. That's why I prefer onStandbyUpdated instead of onBatchUpdated,
even if it doesn't 100% align with StateRestoreListener, but either one
is fine.

Edu

On Fri, Oct 13, 2023 at 8:53 PM Guozhang Wang <guozhang.wang...@gmail.com> wrote:


Hello Colt,

Thanks for writing the KIP! I have read through the updated KIP and
overall it looks great. I only have minor naming comments (well,
aren't naming the least boring stuff to discuss and that takes the
most of the time for KIPs :P):

1. I tend to agree with Sophie regarding whether or not to include
"Standby" in the functions of "onStandbyUpdateStart/Suspended", since
it is also more consistent with the functions of
"StateRestoreListener" where we do not name it as
"onStateRestoreState" etc.

2. I know in community discussions we sometimes say "a standby is
promoted to active", but in the official code / java docs we did not
have a term of "promotion", since what the code does is really recycle
the task (while keeping its state stores open), and create a new
active task that takes in the recycled state stores and just changes
the other fields like task type etc. After thinking about this for a
bit, I tend to feel that "promoted" is indeed a better name for user
facing purposes while "recycle" is more of a technical detail inside
the code and could be abstracted away from users. So I feel keeping
the name "PROMOTED" is fine.

3. Regarding "earliestOffset", it does feel like we cannot always
avoid another call to the Kafka API. And on the other hand, I also
tend to think that such bookkeeping may be better done at the app
level than from the Streams' public API level. I.e. the app could keep
a "first ever starting offset" per "topic-partition-store" key, and
when we have a rolling restart and hence some standby task keeps
"jumping" from one client to another via task assignment, the app
would update this value just once, when it finds the
"topic-partition-store" was never triggered before. What do you think?
(A rough sketch of what I mean is shown below, after point 4.)

4. I do not have a strong opinion either, but what about "onBatchUpdated"?



Guozhang

On Wed, Oct 11, 2023 at 9:31 PM Colt McNealy wrote:


Sophie—

Thank you very much for such a detailed review of the KIP. It might
actually be longer than the original KIP in the first place!

1. Ack'ed and fixed.

2. Correct, this is a confusing passage and requires context:

One thing on our list of TODO's regarding reliability is to determine how


Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-10-13 Thread Bruno Cadonna

Hi Nick,

1.
Yeah, you are probably right that it does not make too much sense. 
Thanks for the clarification!



4.
Yes, sorry for the back and forth, but I think for the sake of the KIP 
it is better to leave the ALOS behavior as it is for now due to the 
possible issues you would run into. Maybe we can find a solution in the 
future. Now the question returns to whether we really need 
default.state.isolation.level. Maybe the config could be the feature 
flag Sophie requested.



5.
There is a guideline in Kafka not to use the get prefix for getters (at 
least in the public API). Thus, could you please rename


getCommittedOffset(TopicPartition partition) -> 
committedOffsetFor(TopicPartition partition)


You can also propose an alternative to committedOffsetFor().


Best,
Bruno


On 10/13/23 3:21 PM, Nick Telford wrote:

Hi Bruno,

Thanks for getting back to me.

1.
I think this should be possible. Are you thinking of the situation where a
user may downgrade to a previous version of Kafka Streams? In that case,
sadly, the RocksDBStore would get wiped by the older version of Kafka
Streams anyway, because that version wouldn't understand the extra column
family (that holds offsets), so the missing Position file would
automatically get rebuilt when the store is rebuilt from the changelog.
Are there other situations than downgrade where a transactional store could
be replaced by a non-transactional one? I can't think of any.

2.
Ahh yes, the Test Plan - my Kryptonite! This section definitely needs to be
fleshed out. I'll work on that. How much detail do you need?

3.
See my previous email discussing this.

4.
Hmm, this is an interesting point. Are you suggesting that under ALOS
READ_COMMITTED should not be supported?

Regards,
Nick

On Fri, 13 Oct 2023 at 13:52, Bruno Cadonna  wrote:


Hi Nick,

I think the KIP is converging!


1.
I am wondering whether it makes sense to write the position file during
close as we do for the checkpoint file, so that in case the state store
is replaced with a non-transactional state store the non-transactional
state store finds the position file. I think, this is not strictly
needed, but would be a nice behavior instead of just deleting the
position file.


2.
The test plan does not mention integration tests. Do you not need to
extend existing ones and add new ones. Also for upgrading and
downgrading you might need integration and/or system tests.


3.
I think Sophie made a point. Although, IQ reading from uncommitted data
under EOS might be considered a bug by some people. Thus, your KIP would
fix a bug rather than changing the intended behavior. However, I also
see that a feature flag would help users that rely on this buggy
behavior (at least until AK 4.0).


4.
This is related to the previous point. I assume that the difference
between READ_COMMITTED and READ_UNCOMMITTED for ALOS is that in the
former you enable transactions on the state store and in the latter you
disable them. If my assumption is correct, I think that is an issue.
Let's assume under ALOS Streams fails over a couple of times more or
less at the same step in processing after value 3 is added to an
aggregation but the offset of the corresponding input record was not
committed. With transactions disabled, the aggregation value would
increase by 3 for each failover. With transactions enabled, value 3
would only be added to the aggregation once when the offset of the input
record is committed and the transaction finally completes. So the
content of the state store would change depending on the configuration
for IQ. IMO, the content of the state store should be independent from
IQ. Given this issue, I propose to not use transactions with ALOS at
all. I was a big proponent of using transactions with ALOS, but I
realized that transactions with ALOS is not as easy as enabling
transactions on state stores. Another aspect that is problematic is that
the changelog topic which actually replicates the state store is not
transactional under ALOS. Thus, it might happen that the state store and
the changelog differ in their content. All of this is maybe solvable
somehow, but for the sake of this KIP, I would leave it for the future.


Best,
Bruno



On 10/12/23 10:32 PM, Sophie Blee-Goldman wrote:

Hey Nick! First of all thanks for taking up this awesome feature, I'm
sure every single Kafka Streams user and dev would agree that it is
sorely needed.

I've just been catching up on the KIP and surrounding discussion, so
please forgive me for any misunderstandings or misinterpretations of the
current plan and don't hesitate to correct me.

Before I jump in, I just want to say that having seen this drag on for so
long, my singular goal in responding is to help this KIP past a perceived
impasse so we can finally move on to voting and implementing it. Long
discussions are to be expected for major features like this but it's
completely on us as the Streams devs to make sure there is an end in
sight for any ongo

Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores

2023-10-13 Thread Bruno Cadonna
 would be to add the feature flag and leave it off
by default. If all goes well
you can do a quick KIP to enable it by default as soon as the
isolation.level config has been
completed. But feel free to just pick whichever option is easiest or
quickest for you to implement)

Hope this helps move the discussion forward,
Sophie

On Tue, Sep 19, 2023 at 1:57 AM Nick Telford  wrote:


Hi Bruno,

Agreed, I can live with that for now.

In an effort to keep the scope of this KIP from expanding, I'm leaning
towards just providing a configurable default.state.isolation.level and
removing IsolationLevel from the StateStoreContext. This would be
compatible with adding support for query-time IsolationLevels in the
future, whilst providing a way for users to select an isolation level now.

The big problem with this, however, is that if a user selects
processing.mode
= "exactly-once(-v2|-beta)", and default.state.isolation.level =
"READ_UNCOMMITTED", we need to guarantee that the data isn't written to
disk until commit() is called, but we also need to permit IQ threads to
read from the ongoing transaction.

A simple solution would be to (temporarily) forbid this combination of
configuration, and have default.state.isolation.level automatically switch
to READ_COMMITTED when processing.mode is anything other than
at-least-once. Do you think this would be acceptable?
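To make that concrete, the automatic switch could look roughly like this
(purely a sketch: the helper is hypothetical, only the config names and
isolation levels come from this discussion):

import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.streams.StreamsConfig;

// Sketch only: derive the effective default.state.isolation.level from processing.mode.
static IsolationLevel effectiveIsolationLevel(final String processingGuarantee,
                                              final IsolationLevel configured) {
    if (!StreamsConfig.AT_LEAST_ONCE.equals(processingGuarantee)
            && configured == IsolationLevel.READ_UNCOMMITTED) {
        // under EOS we cannot (yet) expose uncommitted writes, so fall back
        return IsolationLevel.READ_COMMITTED;
    }
    return configured;
}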

In a later KIP, we can add support for query-time isolation levels and
solve this particular problem there, which would relax this restriction.

Regards,
Nick

On Tue, 19 Sept 2023 at 09:30, Bruno Cadonna  wrote:


Why do we need to add READ_COMMITTED to InMemoryKeyValueStore? I think
it is perfectly valid to say InMemoryKeyValueStore do not support
READ_COMMITTED for now, since READ_UNCOMMITTED is the de-facto default
at the moment.

Best,
Bruno

On 9/18/23 7:12 PM, Nick Telford wrote:

Oh! One other concern I haven't mentioned: if we make IsolationLevel a
query-time constraint, then we need to add support for READ_COMMITTED to
InMemoryKeyValueStore too, which will require some changes to the
implementation.

On Mon, 18 Sept 2023 at 17:24, Nick Telford  wrote:


Hi everyone,

I agree that having IsolationLevel be determined at query-time is the
ideal design, but there are a few sticking points:

1.
There needs to be some way to communicate the IsolationLevel down to the
RocksDBStore itself, so that the query can respect it. Since stores are
"layered" in functionality (i.e. ChangeLoggingStore, MeteredStore, etc.),
we need some way to deliver that information to the bottom layer. For
IQv2, we can use the existing State#query() method, but IQv1 has no way
to do this.

A simple approach, which would potentially open up other options, would
be to add something like: ReadOnlyKeyValueStore<K, V>
readOnlyView(IsolationLevel isolationLevel) to ReadOnlyKeyValueStore (and
similar to ReadOnlyWindowStore, ReadOnlySessionStore, etc.).

2.
As mentioned above, RocksDB WriteBatches are not thread-safe, which
causes a problem if we want to provide READ_UNCOMMITTED Iterators. I
also had a look at RocksDB Transactions[1], but they solve a very
different problem, and have the same thread-safety issue.

One possible approach that I mentioned is chaining WriteBatches: every
time a new Interactive Query is received (i.e. readOnlyView, see above,
is called) we "freeze" the existing WriteBatch, and start a new one for
new writes. The Interactive Query queries the "chain" of previous
WriteBatches + the underlying database; while the StreamThread starts
writing to the *new* WriteBatch. On-commit, the StreamThread would write
*all* WriteBatches in the chain to the database (that have not yet been
written).

WriteBatches would be closed/freed only when they have been both
committed, and all open Interactive Queries on them have been closed.
This would require some reference counting.

Obviously a drawback of this approach is the potential for increased
memory usage: if an Interactive Query is long-lived, for example by
doing a full scan over a large database, or even just pausing in the
middle of an iteration, then the existing chain of WriteBatches could be
kept around for a long time, potentially forever.
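To make the chaining idea a bit more concrete, here is a rough,
illustrative sketch (the class and method names are invented; the read
path and the reference counting for open queries are omitted):

import java.util.ArrayDeque;
import java.util.Deque;

import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

class WriteBatchChain {
    private final RocksDB db;
    private final Deque<WriteBatch> chain = new ArrayDeque<>();

    WriteBatchChain(final RocksDB db) {
        this.db = db;
        chain.addFirst(new WriteBatch());
    }

    // Called by the StreamThread for every uncommitted write.
    synchronized void put(final byte[] key, final byte[] value) throws RocksDBException {
        chain.peekFirst().put(key, value);
    }

    // Called when a new Interactive Query arrives: freeze the current head batch
    // and start a fresh one for subsequent writes. The returned chain (newest
    // first) is what the query would read, falling back to the DB on a miss.
    synchronized Deque<WriteBatch> freeze() {
        final Deque<WriteBatch> frozen = new ArrayDeque<>(chain);
        chain.addFirst(new WriteBatch());
        return frozen;
    }

    // Called on commit: write every batch to the database, oldest first, then
    // free them and start an empty chain (reference counting omitted).
    synchronized void commit(final WriteOptions options) throws RocksDBException {
        WriteBatch batch;
        while ((batch = chain.pollLast()) != null) {
            db.write(options, batch);
            batch.close();
        }
        chain.addFirst(new WriteBatch());
    }
}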

--

A.
Going off on a tangent, it looks like in addition to supporting
READ_COMMITTED queries, we could go further and support REPEATABLE_READ
queries (i.e. where subsequent reads to the same key in the same
Interactive Query are guaranteed to yield the same value) by making use
of RocksDB Snapshots[2]. These are fairly lightweight, so the
performance impact is likely to be negligible, but they do require that
the Interactive Query session can be explicitly closed.

This could be achieved if we made the above readOnlyView interface look
more like:

interface ReadOnlyKeyValueView<K, V> extends ReadOnlyKeyValueStore<K, V>, AutoCloseable {}

interface ReadOnlyKeyValueStore<K, V> {
  ...
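For illustration, using such a view could then look roughly like this
(hypothetical, since the interface above is only a proposal; `store`
stands for any ReadOnlyKeyValueStore):

try (final ReadOnlyKeyValueView<String, Long> view =
         store.readOnlyView(IsolationLevel.READ_COMMITTED)) {
    // reads within the view are repeatable; closing it releases the underlying snapshot
    final Long value = view.get("some-key");
}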

Re: [DISCUSS] KIP-968: Support single-key_multi-timestamp interactive queries (IQv2) for versioned state stores

2023-10-11 Thread Bruno Cadonna

Thanks for the updates, Alieh!

The example in the KIP uses the allVersions() method which we agreed to 
remove.


Regarding your questions:
1. asOf vs. until: I am fine with both but slightly prefer until.
2. What about KeyMultiVersionsQuery, KeyVersionsQuery (KIP-960 would 
then be KeyVersionQuery). However, I am also fine with 
MultiVersionedKeyQuery since none of the names sounds better or worse to 
me.
3. I agree with you not to introduce the method with the two bounds to 
keep things simple.

4. Forget about fromTime() and asOfTime(); from() and asOf() are fine.
5. The main purpose is to show how to use the API. Maybe make an example 
with just the key to distinguish this query from the single value query 
of KIP-960 and then one with a key and a time range. When you iterate 
over the results you could also call validTo(). Maybe add some actual 
records in the comments to show what the result might look like.
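For example, something along these lines (using the API as proposed in
this KIP, so names may still change; `result` stands for the QueryResult
obtained via IQv2) would show both the time range and the use of
validTo():

final MultiVersionedKeyQuery<String, Long> query =
    MultiVersionedKeyQuery.<String, Long>withKey("k1")
        .fromTime(Instant.parse("2023-08-03T10:37:30.00Z"))
        .asOf(Instant.parse("2023-08-04T10:37:30.00Z"));

// e.g. a version (value=10, timestamp=5, validTo=20) was overwritten at time 20
try (final ValueIterator<VersionedRecord<Long>> versions = result.getResult()) {
    while (versions.hasNext()) {
        final VersionedRecord<Long> record = versions.next();
        System.out.println(record.value() + " valid from " + record.timestamp()
            + " to " + record.validTo());
    }
}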


Regarding the test plan, I hope you also plan to add unit tests in all 
of your KIPs. Maybe you could also explain why system tests are not 
needed here.


Best,
Bruno

On 10/10/23 5:36 PM, Alieh Saeedi wrote:

Thank you all for the very exact and constructive comments. I really
enjoyed reading your ideas and all the important points you made me aware
of. I updated KIP-968 as follows:

1. If the key or time bounds are null, the method returns NPE.
2. The "valid" word: I removed the sentence "all the records that are
valid..." and replaced it with an exact explanation. More over, I explained
it with an example in the KIP but not in the javadocs. Do I need to add the
example to the javadocs as well?
3. Since I followed Bruno's suggestion and removed the allVersions()
method, the problem of meaningless combinations is solved, and I do not
need any IllegalArgumentException or something like that. Therefore, the
change is that if no time bound is specified, the query returns the records
with the specified key for all timestamps (all versions).
4. As Victoria suggested, adding a method to the *VersionedKeyValueStore
*interface is essential. So I did that. I had this method only in the
RocksDBVersionedStore class, which was not enough.
5. I added the *validTo* field to the VersionedRecord class to be able
to represent the tombstones. As you suggested, we postpone solving the
problem of retrieving consecutive tombstones for later.
6. I added the "Test Plan" section to all KIPs. I hope what I wrote is
convincing.
7. I added the *withAscendingTimestamp()* method to provide more
code readability
for the user.
8. I removed the evil word "get" from all getter methods.

There have also been some more suggestions which I am still not convinced
or clear about them:

1. Regarding asOf vs until: reading all comments, my conclusion was that
I keep it as "asOf" (following Walker's idea as the native speaker as well
as Bruno's suggestion to be consistent with single-key_single_ts queries).
But I do not have a personal preference. If you insist on "until", I change
it.
2. Bruno suggested renaming the class "MultiVersionedKeyQuery" to sth
else. We already had a long discussion about the name with Matthias. I am
open to renaming it to something else, but do you have any ideas?
3. Matthias suggested having a method with two input parameters that
enables the user to specify both time bounds in the same method. Isn't it
introducing redundancy? It is somehow disrespectful to the idea of having
composable methods.
4. Bruno suggested renaming the methods "asOf" and "from" to "asOfTime"
and "fromTime". If I do that, then it is not consistent with KIP-960.
Moreover, the input parameter is clearly a timestamp, which explains
enough. What do you think about that?
5. I was asked to add more examples to the example section. My question
is, what is the main purpose of that? If I know it clearly, then I can add
what you mean.



Cheers,
Alieh

On Tue, Oct 10, 2023 at 1:13 AM Matthias J. Sax  wrote:


Bruno and I had some background conversation about the `get` prefix
question including a few other committers.

The official policy was never changed, and we should not add the
`get`-prefix. It's a slip on our side in previous KIPs to add the
`get`-prefix and we should actually clean it up doing a follow up KIP.


-Matthias


On 10/5/23 5:26 AM, Bruno Cadonna wrote:

Hi Matthias,

Given all the IQv2 KIPs that use getX and given recent PRs (internal
interfaces mainly) that got merged, I was under the impression that we
moved away from the strict no-getX policy.

I do not think it was an accident using getX in the IQv2 KIPs since
somebody would have brought it up, otherwise.

I am fine with both types of getters.

If we think, we need to discuss this in a broader context, let's start a 
separate thread.

[jira] [Resolved] (KAFKA-15577) Reload4j | CVE-2022-45868

2023-10-11 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-15577.
---
Resolution: Not A Problem

> Reload4j | CVE-2022-45868
> -
>
> Key: KAFKA-15577
> URL: https://issues.apache.org/jira/browse/KAFKA-15577
> Project: Kafka
>  Issue Type: Bug
>Reporter: masood
>Priority: Critical
>
> Maven indicates 
> [CVE-2022-45868|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-45868]
>  in Reload4j.jar.
> [https://mvnrepository.com/artifact/ch.qos.reload4j/reload4j/1.2.19]
> Could you please verify if this vulnerability affects Kafka?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [VOTE] KIP-960: Support single-key_single-timestamp interactive queries (IQv2) for versioned state stores

2023-10-11 Thread Bruno Cadonna

Thanks for the KIP, Alieh!

+1 (binding)

Best,
Bruno

On 10/10/23 1:14 AM, Matthias J. Sax wrote:
One more nit: as discussed on the related KIP-968 thread, we should not 
use `get` as prefix for the getters.


So it should be `K key()` and `Optional asOfTimestamp()`.


Otherwise the KIP LGTM.


+1 (binding)


-Matthias

On 10/6/23 2:50 AM, Alieh Saeedi wrote:

Hi everyone,

Since KIP-960 is reduced to the simplest IQ type and all further comments
are related to the following-up KIPs, I decided to finalize it at this
point.


A huge thank you to everyone who has reviewed this KIP (and also the
following-up ones), and
participated in the discussion thread!

I'd also like to thank you in advance for taking the time to vote.

Best,
Alieh



Re: KAFKA-15571 Review

2023-10-10 Thread Bruno Cadonna

Hi Levani,

I think you found a bug and you are looking at the right place!

I commented on the PR.

Best,
Bruno

On 10/10/23 3:02 PM, Levani Kokhreidze wrote:

Hello,

We’ve been looking at a https://issues.apache.org/jira/browse/KAFKA-10575 but 
seems implementation has a bug and user defined 
`StateRestoreListener#onRestoreSuspended` is not called because 
DelegatingStateRestoreListener was not updated.

Here’s the PR that fixes it: https://github.com/apache/kafka/pull/14519

I will add the tests, but first wanted to make sure I’m looking at a correct 
place and if I’m missing something.

Issue link: https://issues.apache.org/jira/browse/KAFKA-15571

Best,
Levani


Re: Apache Kafka 3.7.0 Release

2023-10-10 Thread Bruno Cadonna

Thanks Stan!

+1

Best,
Bruno

On 10/10/23 7:24 AM, Luke Chen wrote:

Thanks Stanislav!

On Tue, Oct 10, 2023 at 3:05 AM Josep Prat 
wrote:


Thanks Stanislav!

———
Josep Prat

Aiven Deutschland GmbH

Alexanderufer 3-7, 10117 Berlin

Amtsgericht Charlottenburg, HRB 209739 B

Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen

m: +491715557497

w: aiven.io

e: josep.p...@aiven.io

On Mon, Oct 9, 2023, 20:05 Chris Egerton  wrote:


+1, thanks Stanislav!

On Mon, Oct 9, 2023, 14:02 Bill Bejeck  wrote:


+1

Thanks, Stanislav!

-Bill

On Mon, Oct 9, 2023 at 1:59 PM Ismael Juma  wrote:


Thanks for volunteering Stanislav!

Ismael

On Mon, Oct 9, 2023 at 10:51 AM Stanislav Kozlovski
 wrote:


Hey all!

I would like to volunteer to be the release manager driving the

next

release - Apache Kafka *3.7.0*.

If there are no objections, I will start and share a release plan

soon

enough!

Cheers,
Stanislav













Re: [DISCUSS] KIP-968: Support single-key_multi-timestamp interactive queries (IQv2) for versioned state stores

2023-10-05 Thread Bruno Cadonna

Hi Matthias,

Given all the IQv2 KIPs that use getX and given recent PRs (internal 
interfaces mainly) that got merged, I was under the impression that we 
moved away from the strict no-getX policy.


I do not think it was an accident using getX in the IQv2 KIPs since 
somebody would have brought it up, otherwise.


I am fine with both types of getters.

If we think, we need to discuss this in a broader context, let's start a 
separate thread.



Best,
Bruno





On 10/5/23 7:44 AM, Matthias J. Sax wrote:

I agree to (almost) everything what Bruno said.


In general, we tend to move away from using getters without "get", 
recently. So I would keep the "get".


This is new to me? Can you elaborate on this point? Why do you think 
that's the case?


I actually did realize (after Walker mentioned it) that existing query 
types use `get` prefix, but to me it seems that it was by accident and 
we should consider correcting it? Thus, I would actually prefer to not 
add the `get` prefix for new methods query types.


IMHO, we should do a follow up KIP to deprecate all methods with `get` 
prefix and replace them with new ones without `get` -- it's of course 
always kinda "unnecessary" noise, but if we don't do it, we might get 
into more and more inconsistent naming what would result in a "bad" API.


If we indeed want to change the convention and use the `get` prefix, I 
would strongly advocate to bite the bullet and do a KIP to proactively add 
the `get` "everywhere" it's missing... But overall, it seems to be a 
much broader decision, and we should get buy in from many committers 
about it -- as long as there is no broad consensus to add `get` 
everywhere, I would strongly prefer not to diverge from the current 
agreement to omit `get`.




-Matthias




On 10/4/23 2:36 AM, Bruno Cadonna wrote:

Hi,

Regarding tombstones:
As far as I understand, we need to add either a validTo field to 
VersionedRecord or we need to return tombstones, otherwise the result 
is not complete, because users could never know a record was deleted 
at some point before the second non-null value was put.
I like more adding the validTo field since it makes the result more 
concise and easier interpretable.


Extending on Victoria's example, with the following puts

put(k, v1, time=0)
put(k, null, time=5)
put(k, null, time=10)
put(k, null, time=15)
put(k, v2, time=20)

the result with tombstones would be

value, timestamp
(v1, 0)
(null, 5)
(null, 10)
(null, 15)
(v2, 20)

instead of

value, timestamp, validTo
(v1, 0, 5)
(v2, 20, null)

The benefit of conciseness would already apply to one single tombstone.

On the other hand, why would somebody write consecutive tombstones 
into a versioned state store? I guess if somebody does that on 
purpose, then there should be a way to retrieve each of those 
tombstones, right?
So maybe we need both -- validTo field and the option to return 
tombstones. The latter might be moved to a future KIP in case we see 
the need.



Regarding .within(fromTs, toTs):
I would keep it simple with .from() and .asOfTimestamp() (or 
.until()). If we go with .within(), I would opt for 
.withinTimeRange(fromTs, toTs), because the query becomes more readable:


MultiVersionedKeyQuery
   .withKey(1)
   .withinTimeRange(Instant.parse("2023-08-03T10:37:30.00Z"), 
Instant.parse("2023-08-04T10:37:30.00Z"))


If we stay with .from() and .until(), we should consider .fromTime() 
and .untilTime() (or .toTime()):


MultiVersionedKeyQuery
  .withKey(1)
  .fromTime(Instant.parse("2023-08-03T10:37:30.00Z"))
  .untilTime(Instant.parse("2023-08-04T10:37:30.00Z"))



Regarding asOf vs. until:
I think asOf() is more used in point in time queries as Walker 
mentioned where this KIP specifies a time range. IMO asOf() fits very 
well with KIP-960 where one version is queried, but here I think 
.until() fits better. That might just be a matter of taste and in the 
end I am fine with both as long as it is well documented.



Regarding getters without "get":
In the other IQv2 classes we used getters with "get". In general, we 
tend to move away from using getters without "get", recently. So I 
would keep the "get".



Best,
Bruno

On 10/3/23 7:49 PM, Walker Carlson wrote:

Hey Alieh thanks for the KIP,

Weighing in on the AsOf vs Until debate I think either is fine from a
natural language perspective. Personally AsOf makes more sense to me 
where
until gives me the idea that the query is making a change. It's 
totally a

connotative difference and not that important. I think as of is pretty
frequently used in point of time queries.

Also for these methods it makes sense to drop the "get". We don't
normally use that in getters.

   /**
    * The key that was specified for this query.
    */
   public K getKey();

   /**
    * The starting time point of the query, if specified
    */
   public Optional getFromTimestamp();

   /**
    * The ending time point of 

[jira] [Resolved] (KAFKA-10199) Separate state restoration into separate threads

2023-10-04 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-10199.
---
Resolution: Done

> Separate state restoration into separate threads
> 
>
> Key: KAFKA-10199
> URL: https://issues.apache.org/jira/browse/KAFKA-10199
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>    Assignee: Bruno Cadonna
>Priority: Major
>  Labels: new-streams-runtime-should-fix
>
> As part of the restoration optimization effort, we would like to move the 
> restoration process to separate threads such that:
> 1. Stream threads would not be restricted by the main consumer `poll` 
> frequency to keep as part of the group.
> 2. We can allow larger batches of data to be written into the restoration.
> Besides this, we'd also like to fix the known issues that for piggy-backed 
> source topics as changelog topics, the serde exception / extra processing 
> logic would be skipped.
> We would also cleanup the global update tasks as part of this effort to 
> consolidate to the separate restoration threads, and would also gear them up 
> with corresponding monitoring metrics (KIPs in progress).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-968: Support single-key_multi-timestamp interactive queries (IQv2) for versioned state stores

2023-10-04 Thread Bruno Cadonna

Hi,

Regarding tombstones:
As far as I understand, we need to add either a validTo field to 
VersionedRecord or we need to return tombstones, otherwise the result is 
not complete, because users could never know a record was deleted at 
some point before the second non-null value was put.
I like more adding the validTo field since it makes the result more 
concise and easier interpretable.


Extending on Victoria's example, with the following puts

put(k, v1, time=0)
put(k, null, time=5)
put(k, null, time=10)
put(k, null, time=15)
put(k, v2, time=20)

the result with tombstones would be

value, timestamp
(v1, 0)
(null, 5)
(null, 10)
(null, 15)
(v2, 20)

instead of

value, timestamp, validTo
(v1, 0, 5)
(v2, 20, null)

The benefit of conciseness would already apply to one single tombstone.

On the other hand, why would somebody write consecutive tombstones into 
a versioned state store? I guess if somebody does that on purpose, then 
there should be a way to retrieve each of those tombstones, right?
So maybe we need both -- validTo field and the option to return 
tombstones. The latter might be moved to a future KIP in case we see the 
need.



Regarding .within(fromTs, toTs):
I would keep it simple with .from() and .asOfTimestamp() (or .until()). 
If we go with .within(), I would opt for .withinTimeRange(fromTs, toTs), 
because the query becomes more readable:


MultiVersionedKeyQuery
  .withKey(1)
  .withinTimeRange(Instant.parse("2023-08-03T10:37:30.00Z"), 
Instant.parse("2023-08-04T10:37:30.00Z"))


If we stay with .from() and .until(), we should consider .fromTime() and 
.untilTime() (or .toTime()):


MultiVersionedKeyQuery
 .withKey(1)
 .fromTime(Instant.parse("2023-08-03T10:37:30.00Z"))
 .untilTime(Instant.parse("2023-08-04T10:37:30.00Z"))



Regarding asOf vs. until:
I think asOf() is more used in point in time queries as Walker mentioned 
where this KIP specifies a time range. IMO asOf() fits very well with 
KIP-960 where one version is queried, but here I think .until() fits 
better. That might just be a matter of taste and in the end I am fine 
with both as long as it is well documented.



Regarding getters without "get":
In the other IQv2 classes we used getters with "get". In general, we 
tend to move away from using getters without "get", recently. So I would 
keep the "get".



Best,
Bruno

On 10/3/23 7:49 PM, Walker Carlson wrote:

Hey Alieh thanks for the KIP,

Weighing in on the AsOf vs Until debate I think either is fine from a
natural language perspective. Personally AsOf makes more sense to me where
until gives me the idea that the query is making a change. It's totally a
connotative difference and not that important. I think as of is pretty
frequently used in point of time queries.

Also for these methods it makes sense to drop the "get". We don't
normally use that in getters.

   /**
    * The key that was specified for this query.
    */
   public K getKey();

   /**
* The starting time point of the query, if specified
*/
   public Optional getFromTimestamp();

   /**
* The ending time point of the query, if specified
*/
   public Optional getAsOfTimestamp();
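With the prefix dropped (and assuming the time bounds are Optional<Instant>,
since the type parameters were lost in the quoting above), the accessors
would become:

   public K key();

   public Optional<Instant> fromTimestamp();

   public Optional<Instant> asOfTimestamp();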

Other than that I didn't have too much to add. Overall I like the direction
of the KIP and think the functionality is all there!
best,
Walker



On Mon, Oct 2, 2023 at 10:46 PM Matthias J. Sax  wrote:


Thanks for the updated KIP. Overall I like it.

Victoria raises a very good point, and I personally tend to prefer (I
believe so does Victoria, but it's not totally clear from her email) if
a range query would not return any tombstones, ie, only two records in
Victoria's example. Thus, it seems best to include a `validTo` ts-field
to `VersionedRecord` -- otherwise, the retrieved result cannot be
interpreted correctly.

Not sure what others think about it.

I would also be open to actually add a `includeDeletes()` (or
`includeTombstones()`) method/flag (disabled by default) to allow users
to get all tombstone: this would only be helpful if there are two
consecutive tombstone though (if I got it right), so not sure if we want
to add it or not -- it seems also possible to add it later if there is
user demand for it, so it might be a premature addition as this point?


Nit:


the public interface ValueIterator is used


"is used" -> "is added" (otherwise it sounds like as if `ValueIterator`
exist already)



Should we also add a `.within(fromTs, toTs)` (or maybe some better
name?) to allow specifying both bounds at once? The existing
`RangeQuery` does the same for specifying the key-range, so might be
good to add for time-range too?



-Matthias


On 9/6/23 5:01 AM, Bruno Cadonna wrote:

In my last e-mail I missed to finish a sentence.

"I think from a KIP"

should be

"I think the KIP looks good!"


On 9/6/23 1:59 PM, Bruno Cadonna wrote:

Hi Alieh,

Thanks for the KIP!

I think from a KIP

1.
I propose to throw an IllegalArgumentException 

Re: [DISCUSS] KIP-985 Add reverseRange and reverseAll query over kv-store in IQv2

2023-10-04 Thread Bruno Cadonna

Hi Hanyu,

I agree with what others said about having a `withDescendingOrder()` 
method and about to document how the results are ordered.


I would not add a reverse flag and adding a parameter to each method in 
RangeQuery. This makes the API less fluent and harder to maintain since 
the flag would change all methods. There is no constraint to only add 
static factory methods to RangeQuery. In fact, if you look into the 
existing class KeyQuery, more precisely at skipCache() and into the 
proposals for queries of versioned state stores, i.e., KIP-969, KIP-968, 
and KIP-960, we already have examples where we set a flag with a 
instance method, for example, asOf(). Such methods make the API more 
fluent and limit the blast radius of the flag to only one public method 
(plus the getter).


So, making a query that reads the state store in reversed order would 
then result in:


final RangeQuery query = RangeQuery.withRange(1, 
1000).withDescendingKeys();


I think this is more readable than:

final RangeQuery query = RangeQuery.withRange(1, 1000, 
true);


Additionally, I think the KIP would benefit from a usage example of the 
newly introduced methods like in KIP-969 etc.
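Such a usage example could look roughly like this (illustrative only,
using the methods proposed here; it assumes a running KafkaStreams
instance `kafkaStreams` and a store named "my-store" with Integer keys
and values):

final RangeQuery<Integer, Integer> query =
    RangeQuery.<Integer, Integer>withRange(1, 1000).withDescendingKeys();

final StateQueryRequest<KeyValueIterator<Integer, Integer>> request =
    StateQueryRequest.inStore("my-store").withQuery(query);

final StateQueryResult<KeyValueIterator<Integer, Integer>> result = kafkaStreams.query(request);

try (final KeyValueIterator<Integer, Integer> iterator =
         result.getOnlyPartitionResult().getResult()) {
    while (iterator.hasNext()) {
        final KeyValue<Integer, Integer> next = iterator.next(); // keys arrive in descending order
        System.out.println(next.key + " -> " + next.value);
    }
}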


In my opinion, the test plan should also mention that you plan to 
write/adapt unit tests.


Best,
Bruno

On 10/4/23 5:16 AM, Hanyu (Peter) Zheng wrote:

If we use  WithDescendingKeys() to generate a RangeQuery to do the
reveseQuery, how do we achieve the methods like withRange, withUpperBound,
and withLowerBound only in this method?

On Tue, Oct 3, 2023 at 8:01 PM Hanyu (Peter) Zheng 
wrote:


I believe there's no need to introduce a method like WithDescendingKeys().
Instead, we can simply add a reverse flag to RangeQuery. Each method within
RangeQuery would then accept an additional parameter. If the reverse is set
to true, it would indicate the results should be reversed.

Initially, I introduced a reverse variable. When set to false, the
RangeQuery class behaves normally. However, when reverse is set to true,
the RangeQuery essentially takes on the functionality of ReverseRangeQuery.
Further details can be found in the "Rejected Alternatives" section.

In my perspective, RangeQuery is a class responsible for creating a series
of RangeQuery objects. It offers methods such as withRange, withUpperBound,
and withLowerBound, allowing us to generate objects representing different
queries. I'm unsure how adding a withDescendingOrder() method would be
compatible with the other methods, especially considering that, based on
KIP 969, WithDescendingKeys() doesn't appear to take any input variables.
And if withDescendingOrder() doesn't accept any input, how does it return a
RangeQuery?

On Tue, Oct 3, 2023 at 4:37 PM Hanyu (Peter) Zheng 
wrote:


Hi, Colt,
The underlying structure of inMemoryKeyValueStore is treeMap.
Sincerely,
Hanyu

On Tue, Oct 3, 2023 at 4:34 PM Hanyu (Peter) Zheng 
wrote:


Hi Bill,
1. I will update the KIP in accordance with the PR and synchronize their
future updates.
2. I will use that name.
3. you mean add something about ordering at the motivation section?

Sincerely,
Hanyu


On Tue, Oct 3, 2023 at 4:29 PM Hanyu (Peter) Zheng 
wrote:


Hi, Walker,

1. I will update the KIP in accordance with the PR and synchronize
their future updates.
2. I will use that name.
3. I'll provide additional details in that section.
4. I intend to utilize rangeQuery to achieve what we're referring to as
reverseQuery. In essence, reverseQuery is merely a term. To clear up any
ambiguity, I'll make necessary adjustments to the KIP.

Sincerely,
Hanyu



On Tue, Oct 3, 2023 at 4:09 PM Hanyu (Peter) Zheng 
wrote:


Ok, I will change it back to following the code, and update them
together.

On Tue, Oct 3, 2023 at 2:27 PM Walker Carlson
 wrote:


Hello Hanyu,

Looking over your kip things mostly make sense but I have a couple of
comments.


1. You have "withDescandingOrder()". I think you mean "descending"
:)
Also there are still a few places in the doc where it's called
"setReverse"
2. Also I like "WithDescendingKeys()" better
3. I'm not sure of what ordering guarantees we are offering.
Perhaps we
can add a section to the motivation clearly spelling out the
current
ordering and the new offering?
4. When you say "use unbounded reverseQuery to achieve reverseAll"
do
you mean "use unbounded RangeQuery to achieve reverseAll"? as far
as I can
tell we don't have a reverseQuery as a named object?


Looking good so far

best,
Walker

On Tue, Oct 3, 2023 at 2:13 PM Colt McNealy 
wrote:


Hello Hanyu,

Thank you for the KIP. I agree with Matthias' proposal to keep the naming
convention consistent with KIP-969. I favor the `.withDescendingKeys()`
name.

I am curious about one thing. RocksDB guarantees that records returned
during a range scan are lexicographically ordered by the bytes of the
keys (either ascending or descending order, as specified in the query).
This means that results within a single partition 
