[jira] [Updated] (KAFKA-15348) Range IQs with versioned state stores

2023-12-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15348:

Labels: kip  (was: )

> Range IQs with versioned state stores
> -
>
> Key: KAFKA-15348
> URL: https://issues.apache.org/jira/browse/KAFKA-15348
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Alieh Saeedi
>Assignee: Alieh Saeedi
>Priority: Major
>  Labels: kip
>
> [KIP-969|https://cwiki.apache.org/confluence/display/KAFKA/KIP-969%3A+Support+range+interactive+queries+%28IQv2%29+for+versioned+state+stores]
> This ticket covers all types of range queries:
> *Range Queries*
>  # key-range latest-value query
>  # key-range with lower bound latest-value query
>  # key-range with upper bound latest-value query
>  # all-keys (no bound) latest-value query
>  # key-range query with timestamp (upper) bound
>  # key-range with lower bound with timestamp (upper) bound 
>  # key-range with upper bound with timestamp (upper) bound
>  # all-keys (no bound) with timestamp (upper) bound
>  # key-range query with timestamp range
>  # key-range query with lower bound with timestamp range
>  # key-range query with upper bound with timestamp range
>  # all-keys (no bound) with timestamp range
>  # key-range query all-versions
>  # key-range query with lower bound all-versions
>  # key-range query with upper bound all-versions
>  # all-keys query (no bound) all-versions (entire store)
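
The query shapes listed above can be illustrated with a minimal in-memory sketch. This is not the KIP-969 API: the class VersionedRangeSketch and its methods are hypothetical, and a nested TreeMap stands in for a versioned state store. It assumes key bounds are inclusive and that a timestamp (upper) bound selects the newest version at or before that timestamp.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical in-memory model of a versioned store: key -> (timestamp -> value). */
final class VersionedRangeSketch {
    private final NavigableMap<String, NavigableMap<Long, String>> store = new TreeMap<>();

    void put(String key, long timestamp, String value) {
        store.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    /**
     * Key-range query with a timestamp (upper) bound: for every key in
     * [lower, upper], return the newest value whose timestamp is <= asOf.
     * A null bound means "no bound" on that side of the key range.
     */
    NavigableMap<String, String> rangeAsOf(String lower, String upper, long asOf) {
        NavigableMap<String, NavigableMap<Long, String>> keys = store;
        if (lower != null) {
            keys = keys.tailMap(lower, true);
        }
        if (upper != null) {
            keys = keys.headMap(upper, true);
        }
        NavigableMap<String, String> result = new TreeMap<>();
        for (Map.Entry<String, NavigableMap<Long, String>> e : keys.entrySet()) {
            Map.Entry<Long, String> version = e.getValue().floorEntry(asOf);
            if (version != null) {
                result.put(e.getKey(), version.getValue());
            }
        }
        return result;
    }

    /** Latest-value variant: the same query with no effective timestamp bound. */
    NavigableMap<String, String> rangeLatest(String lower, String upper) {
        return rangeAsOf(lower, upper, Long.MAX_VALUE);
    }
}
```

In this model, the lower-bound-only, upper-bound-only, and all-keys variants are the same call with one or both key bounds set to null.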



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15347) Single-Key_multi-timestamp IQs with versioned state stores

2023-12-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15347.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

> Single-Key_multi-timestamp IQs with versioned state stores
> --
>
> Key: KAFKA-15347
> URL: https://issues.apache.org/jira/browse/KAFKA-15347
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Alieh Saeedi
>Assignee: Alieh Saeedi
>Priority: Major
> Fix For: 3.7.0
>
>
> [KIP-968|https://cwiki.apache.org/confluence/display/KAFKA/KIP-968%3A+Support+single-key_multi-timestamp+interactive+queries+%28IQv2%29+for+versioned+state+stores]
> This ticket covers just four query types:
> *Key Queries with multiple timestamps:*
>  # single-key query with upper bound timestamp
>  # single-key query with lower bound timestamp
>  # single-key query with timestamp range
>  # single-key all versions query
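
As a rough illustration of the four query types above, the sketch below models all versions of one key as a sorted map from timestamp to value. It is not the KIP-968 API; the class and method names are hypothetical. It also assumes that a version counts as "in range" if it was the current value at some point within the requested time range, so the version that was active at the lower bound is included.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical model: every version of ONE key, keyed by the timestamp at which it became valid. */
final class SingleKeyVersionsSketch {
    private final NavigableMap<Long, String> versions = new TreeMap<>();

    void put(long timestamp, String value) {
        versions.put(timestamp, value);
    }

    /** Timestamp-range query: versions that were current at some point in [from, to]. */
    NavigableMap<Long, String> versionsBetween(long from, long to) {
        if (from > to) {
            return new TreeMap<>();
        }
        // Assumption: include the version that was already active at 'from'.
        Long activeAtFrom = versions.floorKey(from);
        long start = (activeAtFrom != null) ? activeAtFrom : from;
        return versions.subMap(start, true, to, true);
    }

    /** Upper-bound query: everything up to and including 'to'. */
    NavigableMap<Long, String> upTo(long to) {
        return versionsBetween(Long.MIN_VALUE, to);
    }

    /** Lower-bound query: everything still current at or after 'from'. */
    NavigableMap<Long, String> since(long from) {
        return versionsBetween(from, Long.MAX_VALUE);
    }

    /** All-versions query. */
    NavigableMap<Long, String> all() {
        return versionsBetween(Long.MIN_VALUE, Long.MAX_VALUE);
    }
}
```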





[jira] [Updated] (KAFKA-15346) Single-Key_single-timestamp IQs with versioned state stores

2023-12-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15346:

Labels: kip  (was: )

> Single-Key_single-timestamp IQs with versioned state stores
> ---
>
> Key: KAFKA-15346
> URL: https://issues.apache.org/jira/browse/KAFKA-15346
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Alieh Saeedi
>Assignee: Alieh Saeedi
>Priority: Major
>  Labels: kip
> Fix For: 3.7.0
>
>
> [KIP-960|https://cwiki.apache.org/confluence/display/KAFKA/KIP-960%3A+Support+single-key_single-timestamp+interactive+queries+%28IQv2%29+for+versioned+state+stores]
> This ticket covers just two query types:
> *Key Queries with single timestamp:*
>  # single-key latest-value lookup
>  # single-key lookup with timestamp (upper) bound





[jira] [Updated] (KAFKA-15347) Single-Key_multi-timestamp IQs with versioned state stores

2023-12-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15347:

Labels: kip  (was: )

> Single-Key_multi-timestamp IQs with versioned state stores
> --
>
> Key: KAFKA-15347
> URL: https://issues.apache.org/jira/browse/KAFKA-15347
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Alieh Saeedi
>Assignee: Alieh Saeedi
>Priority: Major
>  Labels: kip
> Fix For: 3.7.0
>
>
> [KIP-968|https://cwiki.apache.org/confluence/display/KAFKA/KIP-968%3A+Support+single-key_multi-timestamp+interactive+queries+%28IQv2%29+for+versioned+state+stores]
> This ticket covers just four query types:
> *Key Queries with multiple timestamps:*
>  # single-key query with upper bound timestamp
>  # single-key query with lower bound timestamp
>  # single-key query with timestamp range
>  # single-key all versions query





[jira] [Resolved] (KAFKA-15957) ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy broken

2023-12-03 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15957.
-
Resolution: Fixed

> ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy 
> broken
> ---
>
> Key: KAFKA-15957
> URL: https://issues.apache.org/jira/browse/KAFKA-15957
> Project: Kafka
>  Issue Type: Bug
>Reporter: Justine Olshan
>Assignee: Lucas Brutschy
>Priority: Major
>






[jira] [Assigned] (KAFKA-15957) ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy broken

2023-12-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-15957:
---

Assignee: Lucas Brutschy

> ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy 
> broken
> ---
>
> Key: KAFKA-15957
> URL: https://issues.apache.org/jira/browse/KAFKA-15957
> Project: Kafka
>  Issue Type: Bug
>Reporter: Justine Olshan
>Assignee: Lucas Brutschy
>Priority: Major
>






Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-12-01 Thread Matthias J. Sax

Thanks for clarifying. Makes sense to me.

On 11/30/23 8:33 PM, Colt McNealy wrote:

Hi Matthias and everyone—

Some clarification is necessary just for posterity. It turns out that, on a
fresh standby task before we start polling for records, we wouldn't be able
to get the current end offset without a network call. This leaves us three
options:

A) Make it an Optional or use a sentinel value to mark that it's not
present.
B) Perform a network call to get the endOffset when it's not there.
C) Remove it.

Option A) seems like it could be a confusing API, especially because in the
strong majority of cases, the Optional would be empty. Option B) is
undesirable because of the performance considerations—if we're going to
make a network round trip, we might as well get some records back! That
leaves us with option C), which is the least-bad of all of them.

At LittleHorse we actually do care about the endOffset in the
onUpdateStart() method, and having it would be useful to us. However, the
work-around isn't horrible, because the endOffset will be passed into the
first call to onBatchLoaded() , which normally follows onUpdateStart()
within <100ms.

Thanks,
Colt McNealy

*Founder, LittleHorse.dev*
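
The work-around described in the message above (learning the end offset from the first onBatchLoaded() call rather than from onUpdateStart()) might look roughly like the sketch below. The interface StandbyListenerSketch is a hypothetical, simplified stand-in for the KIP-988 listener: the real callbacks take more parameters, and a plain String replaces the changelog partition type here.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, simplified stand-in for the KIP-988 listener callbacks. */
interface StandbyListenerSketch {
    void onUpdateStart(String changelogPartition, long startingOffset);

    void onBatchLoaded(String changelogPartition, long batchEndOffset, long currentEndOffset);
}

/** Remembers the changelog end offset, which is first learned in onBatchLoaded(). */
final class EndOffsetTracker implements StandbyListenerSketch {
    private final Map<String, Long> endOffsets = new ConcurrentHashMap<>();

    @Override
    public void onUpdateStart(String changelogPartition, long startingOffset) {
        // No end offset is available here without a network call,
        // so drop any stale value left over from an earlier assignment.
        endOffsets.remove(changelogPartition);
    }

    @Override
    public void onBatchLoaded(String changelogPartition, long batchEndOffset, long currentEndOffset) {
        endOffsets.put(changelogPartition, currentEndOffset);
    }

    /** Empty until the first batch has been loaded for this partition. */
    OptionalLong endOffset(String changelogPartition) {
        Long value = endOffsets.get(changelogPartition);
        return value == null ? OptionalLong.empty() : OptionalLong.of(value);
    }
}
```

Callers of endOffset() have to handle the empty case for the short window between the two callbacks.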


On Thu, Nov 30, 2023 at 4:43 PM Matthias J. Sax  wrote:


> parameter is somewhat irrelevant to our use case


Sounds like a weird justification to change the KIP. Providing more
information is usually better than less, so it seems it won't hurt to
just keep it (seems useful in general to get the current end offset in
this callback) -- you can always ignore it, if it's not relevant for
your use case.


-Matthias


On 11/30/23 6:56 AM, Eduwer Camacaro wrote:

Hello everyone,

We have come to the conclusion, during our work on this KIP's
implementation, that the #onUpdateStart callback's "currentEndOffset"
parameter is somewhat irrelevant to our use case. When this callback is
invoked, I think this value is usually unknown. Our choice to delete this
parameter from the #onUpdateStart callback requires an update to the KIP.

Please feel free to review the PR and provide any comments you may have.

:)

Thanks in advance

Edu-

On Thu, Oct 26, 2023 at 12:17 PM Matthias J. Sax wrote:

Thanks. SGTM.

On 10/25/23 4:06 PM, Sophie Blee-Goldman wrote:

That all sounds good to me! Thanks for the KIP

On Wed, Oct 25, 2023 at 3:47 PM Colt McNealy wrote:

Hi Sophie, Matthias, Bruno, and Eduwer—

Thanks for your patience as I have been scrambling to catch up after a week
of business travel (and a few days with no time to code). I'd like to tie
up some loose ends here, but in short, I don't think the KIP document
itself needs any changes (our internal implementation does, however).

1. In the interest of a) not changing the KIP after it's already out for a
vote, and b) making sure our English grammar is "correct", let's stick with
`onBatchLoaded()`. It is the Store that gets updated, not the Batch.

2. For me (and, thankfully, the community as well) adding a remote network
call at any point in this KIP is a non-starter. We'll ensure that
our implementation does not introduce one.

3. I really don't like changing API behavior, even if it's not documented
in the javadoc. As such, I am strongly against modifying the behavior of
endOffsets() on the consumer as some people may implicitly depend on the
contract.
3a. The Consumer#currentLag() method gives us exactly what we want without
a network call (current lag from a cache, from which we can compute the
offset).

4. I have no opinion about whether we should pass endOffset or currentLag
to the callback. Either one has the same exact information inside it. In
the interest of not changing the KIP after the vote has started, I'll leave
it as endOffset.

As such, I believe the KIP doesn't need any updates, nor has it been
updated since the vote started.

Would anyone else like to discuss something before the Otter Council
adjourns regarding this matter?

Cheers,
Colt McNealy

*Founder, LittleHorse.dev*
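
Point 3a above says the end offset can be computed from the cached lag. A minimal sketch of that arithmetic follows, assuming the lag is defined as end offset minus the consumer's current position. The helper class is hypothetical; its inputs are meant to correspond to what Consumer#position() and Consumer#currentLag() return.

```java
import java.util.OptionalLong;

/** Hypothetical helper: derives an end offset from a position and a cached lag. */
final class EndOffsetFromLag {
    private EndOffsetFromLag() {
    }

    /**
     * @param position  next offset the consumer will fetch
     * @param cachedLag lag from the consumer's local cache; empty if not yet known
     * @return position + lag, or empty when the lag is unknown
     */
    static OptionalLong endOffset(long position, OptionalLong cachedLag) {
        return cachedLag.isPresent()
                ? OptionalLong.of(position + cachedLag.getAsLong())
                : OptionalLong.empty();
    }
}
```

The empty case is the situation discussed in this thread: before the first poll, there is no cached value to compute from.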


On Mon, Oct 23, 2023 at 10:44 PM Sophie Blee-Goldman <sop...@responsive.dev> wrote:

Just want to checkpoint the current state of this KIP and make sure we're
on track to get it in to 3.7 (we still have a few weeks)  -- looks like
there are two remaining open questions, both relating to the
middle/intermediate callback:

1. What to name it: seems like the primary candidates are onBatchLoaded and
onBatchUpdated (and maybe also onStandbyUpdated?)
2. What additional information can we pass in that would strike a good
balance between being helpful and impacting performance.

Regarding #1, I think all of the current options are reasonable enough that
we should just let Colt decide which he prefers. I personally think
#onBatchUpdated is fine -- Bruno does make a fair point but the truth is
that English grammar can be sticky and while it could be argued that it is
the store which is updated, not the batch, I feel that

Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-11-30 Thread Matthias J. Sax

> parameter is somewhat irrelevant to our use case


Sounds like a weird justification to change the KIP. Providing more 
information is usually better than less, so it seems it won't hurt to 
just keep it (seems useful in general to get the current end offset in 
this callback) -- you can always ignore it, if it's not relevant for 
your use case.



-Matthias


On 11/30/23 6:56 AM, Eduwer Camacaro wrote:

Hello everyone,

We have come to the conclusion, during our work on this KIP's
implementation, that the #onUpdateStart callback's "currentEndOffset"
parameter is somewhat irrelevant to our use case. When this callback is
invoked, I think this value is usually unknown. Our choice to delete this
parameter from the #onUpdateStart callback requires an update to the KIP.

Please feel free to review the PR and provide any comments you may have. :)
Thanks in advance

Edu-

On Thu, Oct 26, 2023 at 12:17 PM Matthias J. Sax  wrote:


Thanks. SGTM.

On 10/25/23 4:06 PM, Sophie Blee-Goldman wrote:

That all sounds good to me! Thanks for the KIP

On Wed, Oct 25, 2023 at 3:47 PM Colt McNealy wrote:

Hi Sophie, Matthias, Bruno, and Eduwer—

Thanks for your patience as I have been scrambling to catch up after a week
of business travel (and a few days with no time to code). I'd like to tie
up some loose ends here, but in short, I don't think the KIP document
itself needs any changes (our internal implementation does, however).

1. In the interest of a) not changing the KIP after it's already out for a
vote, and b) making sure our English grammar is "correct", let's stick with
`onBatchLoaded()`. It is the Store that gets updated, not the Batch.

2. For me (and, thankfully, the community as well) adding a remote network
call at any point in this KIP is a non-starter. We'll ensure that
our implementation does not introduce one.

3. I really don't like changing API behavior, even if it's not documented
in the javadoc. As such, I am strongly against modifying the behavior of
endOffsets() on the consumer as some people may implicitly depend on the
contract.
3a. The Consumer#currentLag() method gives us exactly what we want without
a network call (current lag from a cache, from which we can compute the
offset).

4. I have no opinion about whether we should pass endOffset or currentLag
to the callback. Either one has the same exact information inside it. In
the interest of not changing the KIP after the vote has started, I'll leave
it as endOffset.

As such, I believe the KIP doesn't need any updates, nor has it been
updated since the vote started.

Would anyone else like to discuss something before the Otter Council
adjourns regarding this matter?

Cheers,
Colt McNealy

*Founder, LittleHorse.dev*


On Mon, Oct 23, 2023 at 10:44 PM Sophie Blee-Goldman <sop...@responsive.dev> wrote:

Just want to checkpoint the current state of this KIP and make sure we're
on track to get it in to 3.7 (we still have a few weeks)  -- looks like
there are two remaining open questions, both relating to the
middle/intermediate callback:

1. What to name it: seems like the primary candidates are onBatchLoaded and
onBatchUpdated (and maybe also onStandbyUpdated?)
2. What additional information can we pass in that would strike a good
balance between being helpful and impacting performance.

Regarding #1, I think all of the current options are reasonable enough that
we should just let Colt decide which he prefers. I personally think
#onBatchUpdated is fine -- Bruno does make a fair point but the truth is
that English grammar can be sticky and while it could be argued that it is
the store which is updated, not the batch, I feel that it is perfectly
clear what is meant by "onBatchUpdated" and to me, this doesn't sound weird
at all. That's just my two cents in case it helps, but again, whatever
makes sense to you Colt is fine

When it comes to #2 -- as much as I would love to dig into the Consumer
client lore and see if we can modify existing APIs or add new ones in order
to get the desired offset metadata in an efficient way, I think we're
starting to go down a rabbit hole that is going to expand the scope way
beyond what Colt thought he was signing up for. I would advocate to focus
on just the basic feature for now and drop the end-offset from the
callback. Once we have a standby listener it will be easy to expand on with
a followup KIP if/when we find an efficient way to add additional useful
information. I think it will also become more clear what is and isn't
useful after more people get to using it in the real world

Colt/Eduwer: how necessary is receiving the end offset during a batch
update to your own application use case?

Also, for those who really do need to check the current end offset, I
believe in theory you should be able to use the KafkaStreams#metrics API to
get the current lag and/or end offset for the changelog -- it's possible
this does not represent

[jira] [Resolved] (KAFKA-15527) Add reverseRange and reverseAll query over kv-store in IQv2

2023-11-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15527.
-
Resolution: Fixed

> Add reverseRange and reverseAll query over kv-store in IQv2
> ---
>
> Key: KAFKA-15527
> URL: https://issues.apache.org/jira/browse/KAFKA-15527
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: kip
> Fix For: 3.7.0
>
>
> Add reverseRange and reverseAll query over kv-store in IQv2
> Update an implementation of the Query interface, introduced in [KIP-796: 
> Interactive Query 
> v2|https://cwiki.apache.org/confluence/display/KAFKA/KIP-796%3A+Interactive+Query+v2]
>  , to support reverseRange and reverseAll.
> Use bounded query to achieve reverseRange and use unbounded query to achieve 
> reverseAll.
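
The bounded/unbounded distinction above can be sketched with a sorted map standing in for the key-value store. This is an illustration only, not the IQv2 implementation; the class and method names are hypothetical, and the range bounds are assumed to be inclusive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;

/** Hypothetical model of reverse queries over a sorted key-value store. */
final class ReverseQuerySketch {
    private ReverseQuerySketch() {
    }

    /** Bounded query: keys in [lower, upper], returned in descending order. */
    static List<String> reverseRange(NavigableMap<String, String> store, String lower, String upper) {
        return new ArrayList<>(store.subMap(lower, true, upper, true).descendingKeySet());
    }

    /** Unbounded query: every key, returned in descending order. */
    static List<String> reverseAll(NavigableMap<String, String> store) {
        return new ArrayList<>(store.descendingKeySet());
    }
}
```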





[jira] [Created] (KAFKA-15951) MissingSourceTopicException should include topic names

2023-11-30 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-15951:
---

 Summary: MissingSourceTopicException should include topic names
 Key: KAFKA-15951
 URL: https://issues.apache.org/jira/browse/KAFKA-15951
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


As the title says – we don't include topic names in all cases, which makes it hard 
for users to identify the root cause.
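
A minimal sketch of the kind of message this asks for is shown below. It is not the Kafka Streams code: the helper class, its inputs, and the message wording are all hypothetical and only show how the missing topic names could be listed explicitly.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper that names the missing source topics in an error message. */
final class MissingTopicsMessage {
    private MissingTopicsMessage() {
    }

    /** Returns an empty string when nothing is missing; otherwise lists the topics in sorted order. */
    static String describe(Set<String> expectedTopics, Set<String> existingTopics) {
        Set<String> missing = new TreeSet<>(expectedTopics);
        missing.removeAll(existingTopics);
        if (missing.isEmpty()) {
            return "";
        }
        return "Missing source topics: " + String.join(", ", missing);
    }
}
```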





[jira] [Resolved] (KAFKA-15629) proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery

2023-11-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15629.
-
Resolution: Fixed

> proposal to introduce IQv2 Query Types: TimestampedKeyQuery and 
> TimestampedRangeQuery
> -
>
> Key: KAFKA-15629
> URL: https://issues.apache.org/jira/browse/KAFKA-15629
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: kip
> Fix For: 3.7.0
>
>
> KIP-992: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-992%3A+Proposal+to+introduce+IQv2+Query+Types%3A+TimestampedKeyQuery+and+TimestampedRangeQuery
> In the current IQv2 code, there are noticeable differences when interfacing 
> with plain-kv-store and ts-kv-store. Notably, the return type V acts as a 
> simple value for plain-kv-store but evolves into ValueAndTimestamp for 
> ts-kv-store, which presents type safety issues in the API.
> Even if IQv2 hasn't gained widespread adoption, an immediate fix might bring 
> compatibility concerns.
> This brings us to the essence of our proposal: the introduction of distinct 
> query types. One that returns a plain value, another for values accompanied 
> by timestamps.
> While querying a ts-kv-store for a plain value and then extracting it is 
> feasible, it doesn't make sense to query a plain-kv-store for a 
> ValueAndTimestamp.
> Our vision is for plain-kv-store to always return V, while ts-kv-store should 
> return ValueAndTimestamp.
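
The idea of distinct query types with fixed return types can be sketched as follows. The classes here are hypothetical stand-ins (ValueAndTs for ValueAndTimestamp, and two methods in place of the two query types); they show only the type-level distinction, not the KIP-992 API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a value paired with its record timestamp. */
final class ValueAndTs<V> {
    final V value;
    final long timestamp;

    ValueAndTs(V value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

/** Hypothetical model of a timestamped key-value store with two typed lookups. */
final class TimestampedStoreSketch<K, V> {
    private final Map<K, ValueAndTs<V>> data = new HashMap<>();

    void put(K key, V value, long timestamp) {
        data.put(key, new ValueAndTs<>(value, timestamp));
    }

    /** Plain key query: always returns V, extracting it from the stored pair. */
    V keyQuery(K key) {
        ValueAndTs<V> stored = data.get(key);
        return stored == null ? null : stored.value;
    }

    /** Timestamped key query: always returns the value together with its timestamp. */
    ValueAndTs<V> timestampedKeyQuery(K key) {
        return data.get(key);
    }
}
```

Because each lookup has a single return type, the caller never has to inspect the result to find out whether a timestamp is attached.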





[jira] [Updated] (KAFKA-15941) Flaky test: shouldRestoreNullRecord() – org.apache.kafka.streams.integration.RestoreIntegrationTest

2023-11-29 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15941:

Labels: flaky-test  (was: )

> Flaky test: shouldRestoreNullRecord() – 
> org.apache.kafka.streams.integration.RestoreIntegrationTest
> ---
>
> Key: KAFKA-15941
> URL: https://issues.apache.org/jira/browse/KAFKA-15941
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Apoorv Mittal
>Priority: Major
>  Labels: flaky-test
>
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14699/24/tests/
> {code:java}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 6. 
> Did not receive all [KeyValue(2, \x00\x00\x00)] records from topic output 
> (got []) ==> expected:  but was: 
> Stacktrace
> org.opentest4j.AssertionFailedError: Condition not met 
> within timeout 6. Did not receive all [KeyValue(2, \x00\x00\x00)] records 
> from topic output (got []) ==> expected:  but was: 
>     at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>     at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>     at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>     at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>     at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210)
>     at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331)
>     at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:302)
>     at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:878)
>     at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:827)
>     at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:790)
>     at org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRestoreNullRecord(RestoreIntegrationTest.java:244)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> {code}





[jira] [Updated] (KAFKA-15941) Flaky test: shouldRestoreNullRecord() – org.apache.kafka.streams.integration.RestoreIntegrationTest

2023-11-29 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15941:

Component/s: streams
 unit tests

> Flaky test: shouldRestoreNullRecord() – 
> org.apache.kafka.streams.integration.RestoreIntegrationTest
> ---
>
> Key: KAFKA-15941
> URL: https://issues.apache.org/jira/browse/KAFKA-15941
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Apoorv Mittal
>Priority: Major
>
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14699/24/tests/
> {code:java}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 6. 
> Did not receive all [KeyValue(2, \x00\x00\x00)] records from topic output 
> (got []) ==> expected:  but was: 
> Stacktrace
> org.opentest4j.AssertionFailedError: Condition not met 
> within timeout 6. Did not receive all [KeyValue(2, \x00\x00\x00)] records 
> from topic output (got []) ==> expected:  but was: 
>     at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>     at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>     at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>     at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>     at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210)
>     at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331)
>     at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312)
>     at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:302)
>     at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:878)
>     at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:827)
>     at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:790)
>     at org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRestoreNullRecord(RestoreIntegrationTest.java:244)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> {code}





[jira] [Updated] (KAFKA-15944) Flaky test - verifyStore[cache=false, log=true, supplier=ROCKS_KV, kind=DSL] – org.apache.kafka.streams.integration.PositionRestartIntegrationTest

2023-11-29 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15944:

Component/s: unit tests

> Flaky test - verifyStore[cache=false, log=true, supplier=ROCKS_KV, kind=DSL] 
> – org.apache.kafka.streams.integration.PositionRestartIntegrationTest
> --
>
> Key: KAFKA-15944
> URL: https://issues.apache.org/jira/browse/KAFKA-15944
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 3.7.0
>Reporter: Andrew Schofield
>Priority: Minor
>  Labels: flaky-test
>
> Error
> org.apache.kafka.common.errors.TimeoutException: The query never returned 
> within the bound. Last result: 
> StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@61b360a4,
>  executionInfo=[], position=Position{position={input-topic={0=1, 
> 1=FailedQueryResult{failureReason=NOT_PRESENT, failure='The requested 
> partition was not present at the time of the query.', executionInfo=[], 
> position=null}}, globalResult=null}
> Stacktrace
> org.apache.kafka.common.errors.TimeoutException: The query never returned 
> within the bound. Last result: 
> StateQueryResult{partitionResults={0=SucceededQueryResult{result=org.apache.kafka.streams.state.internals.MeteredKeyValueStore$MeteredKeyValueTimestampedIterator@61b360a4,
>  executionInfo=[], position=Position{position={input-topic={0=1, 
> 1=FailedQueryResult{failureReason=NOT_PRESENT, failure='The requested 
> partition was not present at the time of the query.', executionInfo=[], 
> position=null}}, globalResult=null}
> Standard Output
> [2023-11-28 22:52:47,353] INFO [Producer clientId=producer-129] Instantiated 
> an idempotent producer. (org.apache.kafka.clients.producer.KafkaProducer:587)
> [2023-11-28 22:52:47,466] INFO [Producer clientId=producer-129] ProducerId 
> set to 0 with epoch 0 
> (org.apache.kafka.clients.producer.internals.TransactionManager:502)
> [2023-11-28 22:52:47,473] INFO [Producer clientId=producer-129] Closing the 
> Kafka producer with timeoutMillis = 9223372036854775807 ms. 
> (org.apache.kafka.clients.producer.KafkaProducer:1332)
> [2023-11-28 22:52:47,531] INFO stream-client 
> [app-org.apache.kafka.streams.integration.PositionRestartIntegrationTest-true-true-IN_MEMORY_KV-DSL-456895e1-b230-4a83-9164-8e6d65d1fc57]
>  Kafka Streams version: test-version 
> (org.apache.kafka.streams.KafkaStreams:914)
> [2023-11-28 22:52:47,531] INFO stream-client 
> [app-org.apache.kafka.streams.integration.PositionRestartIntegrationTest-true-true-IN_MEMORY_KV-DSL-456895e1-b230-4a83-9164-8e6d65d1fc57]
>  Kafka Streams commit ID: test-commit-ID 
> (org.apache.kafka.streams.KafkaStreams:915)
> [2023-11-28 22:52:47,532] INFO stream-thread 
> [app-org.apache.kafka.streams.integration.PositionRestartIntegrationTest-true-true-IN_MEMORY_KV-DSL-456895e1-b230-4a83-9164-8e6d65d1fc57-StreamThread-1]
>  Creating restore consumer client 
> (org.apache.kafka.streams.processor.internals.StreamThread:365)
> [2023-11-28 22:52:47,537] INFO stream-thread 
> [app-org.apache.kafka.streams.integration.PositionRestartIntegrationTest-true-true-IN_MEMORY_KV-DSL-456895e1-b230-4a83-9164-8e6d65d1fc57-StreamThread-1]
>  Creating thread producer client 
> (org.apache.kafka.streams.processor.internals.StreamThread:105)
> [2023-11-28 22:52:47,538] INFO [Producer 
> clientId=app-org.apache.kafka.streams.integration.PositionRestartIntegrationTest-true-true-IN_MEMORY_KV-DSL-456895e1-b230-4a83-9164-8e6d65d1fc57-StreamThread-1-producer]
>  Instantiated an idempotent producer. 
> (org.apache.kafka.clients.producer.KafkaProducer:587)
> [2023-11-28 22:52:47,545] INFO stream-thread 
> [app-org.apache.kafka.streams.integration.PositionRestartIntegrationTest-true-true-IN_MEMORY_KV-DSL-456895e1-b230-4a83-9164-8e6d65d1fc57-StreamThread-1]
>  Creating consumer client 
> (org.apache.kafka.streams.processor.internals.StreamThread:432)
> [2023-11-28 22:52:47,545] INFO state-updater 
> [app-org.apache.kafka.streams.integration.PositionRestartIntegrationTest-true-true-IN_MEMORY_KV-DSL-456895e1-b230-4a83-9164-8e6d65d1fc57-StateUpdater-1]
>  State updater thread started 
> (org.apache.kafka.streams.processor.internals.DefaultStateUpdater:135)
> [2023-11-28 22:52:47,547] INFO [Producer 
> clientId=app-org.apache.kafka.streams.integration.PositionRestartIntegrationTest-true-true-IN_MEMORY_KV-DSL-456895e1-b230-4a8

Re: Having a Hard Time Unsubscribing

2023-11-28 Thread Matthias J. Sax
When you send an email to unsubscribe..., there should be a response 
from the bot. Check your spam folder.


You need to ack the unsubscribe by replying to the response email the 
bot sent.


HTH.


-Matthias

On 11/28/23 6:10 AM, Dasun Nirmitha wrote:

Hello Kafka devs,
I'm aware that it's a self-process to unsubscribe but I've sent multiple
emails to dev-unsubscr...@kafka.apache.org,
jira-unsubscr...@kafka.apache.org, users-unsubscr...@kafka.apache.org all
to no avail. Any Kafka admins here, could you guys kindly look into this
please? As the so-called self-process of emailing those addresses seems
completely ineffective at least to me.

Thanks & regards
Dasun



[jira] [Commented] (KAFKA-15768) StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult

2023-11-28 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17790835#comment-17790835
 ] 

Matthias J. Sax commented on KAFKA-15768:
-

Thanks for the details [~vvcephei]!
{quote}In other words, it should return the result if and only if all queried 
partitions responded successfully AND at most one partition returned a non-null 
result.
{quote}
This was the unclear piece to me, ie, what's the actual user contract.

About exceptions vs FailedQueryResult – it seems we are in a somewhat mixed 
mode here. My understanding was that IQv2 tries to avoid throwing exceptions, 
and that's why `FailedQueryResult` was introduced to begin with. Thus, I am 
wondering if `getOnlyPartitionResult` should ever throw an exception, or always 
return a `FailedQueryResult` (ie, if one or more partition results are a 
`FailedQueryResult` and/or there is more than one non-empty successful query 
result)?
{quote}since the application isn't in an illegal state 
{quote}
Well, the `StateQueryResult` object is in an illegal state. I never interpreted 
IllegalState as a programming error, but just as "this object is corrupted – 
don't use it any longer"... The interpretation that `this` is an argument 
sounds a little bit far-fetched to me... `getOnlyPartitionResult()` is a 
zero-argument method IMHO... but yes, it might be philosophical, so let's not dig 
deeper (especially if we would agree to sidestep this question by not throwing 
an exception at all any longer...)

> StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult
> --
>
> Key: KAFKA-15768
> URL: https://issues.apache.org/jira/browse/KAFKA-15768
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Matthias J. Sax
> Assignee: Hanyu Zheng
> Priority: Major
>
> Calling `StateQueryResult#getOnlyPartitionResult` crashes with an incorrect 
> `IllegalArgumentException` if any result is a `FailedQueryResult` (and even 
> if there is only a single FailedQueryResult).
> The issue is the internal `filter(r -> r.getResult() != 0)` step, that 
> blindly (and incorrectly) calls `getResult`.
> Given the semantics of `getOnlyPartitionResult` we should not care if the 
> result is SuccessQueryResult or FailedQueryResult, but only check if there is 
> a single result or not. (The user has no means to avoid getting the 
> underlying error otherwise.)
> Side-note: why does `FailedQueryResult#getResult` throw an 
> IllegalArgumentException (there is no argument passed into the method – it 
> should rather be an `IllegalStateException` – but I guess we would need a KIP 
> for this fix?)
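
A minimal sketch of the failure mode described above, assuming simplified stand-in types rather than the real Kafka Streams classes. `Result`, `Succeeded`, `Failed`, `nonEmptyUnguarded`, and `nonEmptyGuarded` are hypothetical names, and the filter compares against `null` as an assumption. The guarded variant is only one possible way to avoid calling `getResult()` on a failed result; the thread has not settled on the final contract.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class OnlyPartitionResultSketch {

    // Hypothetical stand-in for a per-partition query result.
    interface Result<R> {
        boolean isSuccess();
        R getResult();
    }

    static final class Succeeded<R> implements Result<R> {
        private final R value;
        Succeeded(R value) { this.value = value; }
        public boolean isSuccess() { return true; }
        public R getResult() { return value; }
    }

    static final class Failed<R> implements Result<R> {
        public boolean isSuccess() { return false; }
        public R getResult() {
            // Mirrors the behavior the ticket describes for a failed result.
            throw new IllegalArgumentException("Cannot get result for failed query.");
        }
    }

    // Filter in the style the ticket describes: getResult() is called on every
    // entry, so a single failed result makes the whole call throw.
    static <R> List<Result<R>> nonEmptyUnguarded(Map<Integer, Result<R>> partitionResults) {
        return partitionResults.values().stream()
            .filter(r -> r.getResult() != null)
            .collect(Collectors.toList());
    }

    // One possible guard (an assumption, not the agreed fix): failed results
    // are kept without ever touching getResult().
    static <R> List<Result<R>> nonEmptyGuarded(Map<Integer, Result<R>> partitionResults) {
        return partitionResults.values().stream()
            .filter(r -> !r.isSuccess() || r.getResult() != null)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<Integer, Result<String>> results = new LinkedHashMap<>();
        results.put(0, new Failed<>());
        try {
            nonEmptyUnguarded(results);
        } catch (IllegalArgumentException e) {
            System.out.println("unguarded filter threw: " + e.getMessage());
        }
        System.out.println("guarded filter kept " + nonEmptyGuarded(results).size() + " result(s)");
    }
}
```

With the guard in place the caller can inspect the failed result itself instead of losing it to an exception, which is the user-facing concern raised in the ticket.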





[jira] [Updated] (KAFKA-15896) Flaky test: shouldQuerySpecificStalePartitionStores() – org.apache.kafka.streams.integration.StoreQueryIntegrationTest

2023-11-27 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15896:

Labels: flaky-test  (was: )

> Flaky test: shouldQuerySpecificStalePartitionStores() – 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest
> --
>
> Key: KAFKA-15896
> URL: https://issues.apache.org/jira/browse/KAFKA-15896
> Project: Kafka
> Issue Type: Bug
> Components: streams, unit tests
> Reporter: Apoorv Mittal
> Priority: Major
> Labels: flaky-test
>
> Flaky test: 
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14699/21/tests/
>  
>  
> {code:java}
> Error
> org.apache.kafka.streams.errors.InvalidStateStorePartitionException: The specified partition 1 for store source-table does not exist.
> Stacktrace
> org.apache.kafka.streams.errors.InvalidStateStorePartitionException: The specified partition 1 for store source-table does not exist.
>   at app//org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:63)
>   at app//org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:53)
>   at app//org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificStalePartitionStores(StoreQueryIntegrationTest.java:347)
>   at java.base@21.0.1/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
>   at java.base@21.0.1/java.lang.reflect.Method.invoke(Method.java:580)
>   at app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
>   at app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>   at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>   at app//org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45)
>   at app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
>   at app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
>   at app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
>   at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
>   at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
>   at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>   at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>   at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>   at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
>   at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
>  {code}
>  
>  





[jira] [Updated] (KAFKA-15896) Flaky test: shouldQuerySpecificStalePartitionStores() – org.apache.kafka.streams.integration.StoreQueryIntegrationTest

2023-11-27 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15896:

Component/s: streams, unit tests

> Flaky test: shouldQuerySpecificStalePartitionStores() – 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest
> --
>
> Key: KAFKA-15896
> URL: https://issues.apache.org/jira/browse/KAFKA-15896
> Project: Kafka
> Issue Type: Bug
> Components: streams, unit tests
> Reporter: Apoorv Mittal
> Priority: Major
>
> Flaky test: 
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14699/21/tests/
>  
>  
> {code:java}
> Error
> org.apache.kafka.streams.errors.InvalidStateStorePartitionException: The specified partition 1 for store source-table does not exist.
> Stacktrace
> org.apache.kafka.streams.errors.InvalidStateStorePartitionException: The specified partition 1 for store source-table does not exist.
>   at app//org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:63)
>   at app//org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:53)
>   at app//org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificStalePartitionStores(StoreQueryIntegrationTest.java:347)
>   at java.base@21.0.1/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
>   at java.base@21.0.1/java.lang.reflect.Method.invoke(Method.java:580)
>   at app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
>   at app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>   at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>   at app//org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45)
>   at app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
>   at app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
>   at app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
>   at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
>   at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
>   at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>   at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>   at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>   at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
>   at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
>  {code}
>  
>  





[jira] [Updated] (KAFKA-8575) Investigate removing EAGER protocol & cleaning up task suspension in Streams rebalancing

2023-11-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8575:
---
Fix Version/s: 4.0.0

> Investigate removing EAGER protocol &  cleaning up task suspension in Streams 
> rebalancing
> -
>
> Key: KAFKA-8575
> URL: https://issues.apache.org/jira/browse/KAFKA-8575
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 2.4.0
> Reporter: A. Sophie Blee-Goldman
> Priority: Critical
> Fix For: 4.0.0
>
>
> With KIP-429 the suspend/resume of tasks may have minimal gains while adding 
> a lot of complexity and potential bugs. We should consider removing/cleaning 
> it up and going a step further to remove the EAGER protocol from Streams 
> entirely.
> Plan to remove this in 3.1/4.0, whichever comes after 3.0. This will make 3.0 
> a bridge release for users upgrading from any version below 2.4, but they 
> will still be able to do so in the usual two rolling bounces.
>  
> *The upgrade path from 2.3 and below, to any \{to_version} higher than 3.1 
> will be:*
> 1. During the first rolling bounce, upgrade the jars to a version between 2.4 
> - 3.1 and add the UPGRADE_FROM config for whichever version you are upgrading 
> from
> 2. During the second rolling bounce, upgrade the jars to the desired 
> \{to_version} and remove the UPGRADE_FROM config
>  
> EAGER will be effectively deprecated in 3.0 but not removed until the next 
> version.
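
The two rolling bounces described above could be sketched as two configuration steps. This is a minimal illustration using plain `java.util.Properties`; `"upgrade.from"` is the key behind `StreamsConfig.UPGRADE_FROM_CONFIG`, while the class name, the helper methods, and the `application.id` value are assumptions made for the example.

```java
import java.util.Properties;

final class UpgradePathSketch {

    // Config key behind StreamsConfig.UPGRADE_FROM_CONFIG.
    static final String UPGRADE_FROM = "upgrade.from";

    // First rolling bounce: run the intermediate jars (2.4 to 3.1) and tell
    // Streams which old version the group is being upgraded from.
    static Properties firstBounce(Properties base, String fromVersion) {
        Properties config = new Properties();
        config.putAll(base);
        config.setProperty(UPGRADE_FROM, fromVersion);
        return config;
    }

    // Second rolling bounce: run the target jars with upgrade.from removed.
    static Properties secondBounce(Properties firstBounceConfig) {
        Properties config = new Properties();
        config.putAll(firstBounceConfig);
        config.remove(UPGRADE_FROM);
        return config;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("application.id", "demo-app"); // hypothetical application id

        Properties first = firstBounce(base, "2.3");
        System.out.println("first bounce:  " + first);

        Properties second = secondBounce(first);
        System.out.println("second bounce: " + second);
    }
}
```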





[jira] [Updated] (KAFKA-8088) Deprecate `WindowStoreIterator` interface

2023-11-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-8088:
---
Fix Version/s: 4.0.0

> Deprecate `WindowStoreIterator` interface
> -
>
> Key: KAFKA-8088
> URL: https://issues.apache.org/jira/browse/KAFKA-8088
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Matthias J. Sax
> Assignee: Matthias J. Sax
> Priority: Major
> Labels: kip
> Fix For: 4.0.0
>
>
> The `WindowStore` interface has multiple methods to fetch() data. However, 
> the return types are mixed up. Two methods return `WindowStoreIterator` while 
> all others return `KeyValueIterator`.
> We should align the return types and replace `WindowStoreIterator` with 
> `KeyValueIterator`. For backward compatibility reasons we can only deprecate 
> the interface for now and remove it only later.
> KIP-439: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-439%3A+Deprecate+Interface+WindowStoreIterator]





[jira] [Updated] (KAFKA-12281) Deprecate org.apache.kafka.streams.errors.BrokerNotFoundException

2023-11-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-12281:

Labels: beginner needs-kip newbie  (was: needs-kip)

> Deprecate org.apache.kafka.streams.errors.BrokerNotFoundException
> -
>
> Key: KAFKA-12281
> URL: https://issues.apache.org/jira/browse/KAFKA-12281
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Chia-Ping Tsai
> Assignee: Chia-Ping Tsai
> Priority: Minor
> Labels: beginner, needs-kip, newbie
>
> It's been 3 years since 234ec8a got rid of all usage of BrokerNotFoundException. 
> Hence, it is time to deprecate BrokerNotFoundException.





[jira] [Updated] (KAFKA-15768) StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult

2023-11-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15768:

Description: 
Calling `StateQueryResult#getOnlyPartitionResult` crashes with an incorrect 
`IllegalArgumentException` if any result is a `FailedQueryResult` (and even if 
there is only a single FailedQueryResult).

The issue is the internal `filter(r -> r.getResult() != 0)` step, that blindly 
(and incorrectly) calls `getResult`.

Given the semantics of `getOnlyPartitionResult` we should not care if the 
result is SuccessQueryResult or FailedQueryResult, but only check if there is a 
single result or not. (The user has no means to avoid getting the underlying 
error otherwise.)

Side-note: why does `FailedQueryResult#getResult` throw an 
IllegalArgumentException (there is no argument passed into the method – it 
should rather be an `IllegalStateException` – but I guess we would need a KIP 
for this fix?)

  was:
Calling `StateQueryResult#getOnlyPartitionResult` crashes with an incorrect 
`IllegalArgumentException` if any result is a `FailedQueryResult` (and even if 
there is only a single FailedQueryResult).

The issue is the internal `filter(r -> r.getResult() != 0)` step, that blindly 
(and incorrectly) calls `getResult`.

Given the semantics of `getOnlyPartitionResult` we should not care if the 
result is SuccessQueryResult or FailedQueryResult, but only check if there is a 
single result or not. (The user has not means to avoid getting an exception 
otherwise.)

Side-note: why does `FailedQueryResult#getResult` throw an 
IllegalArgumentException (there is no argument passed into the method – it 
should rather be an `IllegalStateException` – but I guess we would need a KIP 
for this fix?)


> StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult
> --
>
> Key: KAFKA-15768
> URL: https://issues.apache.org/jira/browse/KAFKA-15768
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Matthias J. Sax
> Assignee: Hanyu Zheng
> Priority: Major
>
> Calling `StateQueryResult#getOnlyPartitionResult` crashes with an incorrect 
> `IllegalArgumentException` if any result is a `FailedQueryResult` (and even 
> if there is only a single FailedQueryResult).
> The issue is the internal `filter(r -> r.getResult() != 0)` step, that 
> blindly (and incorrectly) calls `getResult`.
> Given the semantics of `getOnlyPartitionResult` we should not care if the 
> result is SuccessQueryResult or FailedQueryResult, but only check if there is 
> a single result or not. (The user has no means to avoid getting the 
> underlying error otherwise.)
> Side-note: why does `FailedQueryResult#getResult` throw an 
> IllegalArgumentException (there is no argument passed into the method – it 
> should rather be an `IllegalStateException` – but I guess we would need a KIP 
> for this fix?)





[jira] [Updated] (KAFKA-15768) StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult

2023-11-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15768:

Description: 
Calling `StateQueryResult#getOnlyPartitionResult` crashes with an incorrect 
`IllegalArgumentException` if any result is a `FailedQueryResult` (and even if 
there is only a single FailedQueryResult).

The issue is the internal `filter(r -> r.getResult() != 0)` step, that blindly 
(and incorrectly) calls `getResult`.

Given the semantics of `getOnlyPartitionResult` we should not care if the 
result is SuccessQueryResult or FailedQueryResult, but only check if there is a 
single result or not. (The user has not means to avoid getting an exception 
otherwise.)

Side-note: why does `FailedQueryResult#getResult` throw an 
IllegalArgumentException (there is no argument passed into the method – it 
should rather be an `IllegalStateException` – but I guess we would need a KIP 
for this fix?)

  was:
Calling `StateQueryResult#getOnlyPartitionResult` crashes with an incorrect 
`IllegalArgumentException` if the any result is a `FailedQueryResult` (and even 
if there is only a single FailedQueryResult).

The issue is the internal `filter(r -> r.getResult() != 0)` step, that blindly 
(and incorrectly) calls `getResult`.

Given the semantics of `getOnlyPartitionResult` we should not care if the 
result is SuccessQueryResult or FailedQueryResult, but only check if there is a 
single result or not. (The user has not means to avoid getting an exception 
otherwise.)

Side-note: why does `FailedQueryResult#getResult` throw an 
IllegalArgumentException (there is no argument passed into the method – it 
should rather be an `IllegalStateException` – but I guess we would need a KIP 
for this fix?)


> StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult
> --
>
> Key: KAFKA-15768
> URL: https://issues.apache.org/jira/browse/KAFKA-15768
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Matthias J. Sax
> Assignee: Hanyu Zheng
> Priority: Major
>
> Calling `StateQueryResult#getOnlyPartitionResult` crashes with an incorrect 
> `IllegalArgumentException` if any result is a `FailedQueryResult` (and even 
> if there is only a single FailedQueryResult).
> The issue is the internal `filter(r -> r.getResult() != 0)` step, that 
> blindly (and incorrectly) calls `getResult`.
> Given the semantics of `getOnlyPartitionResult` we should not care if the 
> result is SuccessQueryResult or FailedQueryResult, but only check if there is 
> a single result or not. (The user has not means to avoid getting an exception 
> otherwise.)
> Side-note: why does `FailedQueryResult#getResult` throw an 
> IllegalArgumentException (there is no argument passed into the method – it 
> should rather be an `IllegalStateException` – but I guess we would need a KIP 
> for this fix?)





Re: [DISCUSS] Should we continue to merge without a green build? No!

2023-11-21 Thread Matthias J. Sax

Thanks Sophie. Overall I agree with you.

I think 50% is too high as a general rule, and I believe something like 
30% might be more appropriate (going lower, given the infrastructure at 
hand, might become too aggressive)?


The difficult part about a policy like this is that we don't really 
have statistics about it, so it will be up to the reviewers to keep 
monitoring and raising an "alert" (on the dev mailing list?) if we see 
too many failing builds.


How to achieve this? Personally, I think it would be best to agree on a 
"bug bash sprint". Not sure if folks are willing to sign up for this? 
The main question might be "when"? We recently discussed test stability in 
our team, and don't see capacity now, but want to invest more in Q1. 
(Btw: this issue goes beyond the build and also affects system tests.)



How long do we have to take action in the above cases?


You propose one day, but that's tricky? In the end, if a test is flaky, 
we won't know right away? Would we?



How do we track failures?


I also filed tickets in the past and added comments when tests failed 
again. While labor intensive, it seems it worked best in the past. 
Justine mentioned Gradle Enterprise (I did not look into it yet, but 
maybe it can help to reduce manual labor)?



How often does a test need to fail to be considered flaky enough to take action?


Guess it's a judgement call, and I don't have a strong opinion. But I 
agree that we might want to write down a rule. But again, a rule only 
makes sense if we have data. If reviewers don't pay attention and don't 
comment on tickets so we can count how often a test fails, any number we 
put down won't be helpful.



In the end, to me it boils down to the willingness of all of us to 
tackle it, and to _make time_ to address flaky tests. On our side (ie, 
KS teams at Confluent), we want to put more time aside for this in 
quarterly planning, but in the end, without reliable data it's hard to 
know which tests to spend time on for the biggest bang for the buck. 
Thus, to me the second cornerstone is to put the manual labor into 
tracking the frequency of flaky tests, which is a group effort.



-Matthias


On 11/21/23 1:33 PM, Sophie Blee-Goldman wrote:

For some concrete data, here are the stats for the latest build on two
community PRs I am currently reviewing:

https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14648/16/tests
- 18 unrelated test failures
- 13 unique tests
- only 1 out of 4 JDK builds were green with all tests passing

https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14735/4/tests
- 44(!) unrelated test failures
- not even going to count up the unique tests because there were too many
- 0 out of 4 JDK builds were green
- this particular build seemed to have a number of infra/timeout issues, so
it may have exacerbated the flakiness beyond the "usual", although as
others have noted, unstable infra is not uncommon

My point is, we clearly have a long way to go if we want to start enforcing
this policy and have any hope of merging any PRs and not driving away our
community of contributors.

This isn't meant to discourage anyone, actually the opposite: if we want to
start gating PRs on passing builds, we need to start tackling flaky tests
now!

On Tue, Nov 21, 2023 at 1:19 PM Sophie Blee-Goldman 
wrote:


In the interest of moving things forward, here is what we would need (in
my opinion) to start enforcing this:

1. Get overall build failure percentage under a certain threshold
   1. What is an acceptable number here?
   2. How do we achieve this: wait until all of them are fixed,
      disable everything that's flaky right away, etc
2. Come up with concrete policy rules so there's no confusion. I think
   we need to agree on answers for these questions at least:
   1. What happens if a new PR introduces a new test that is revealed to
      be flaky?
   2. What happens if a new PR makes an old test become flaky?
   3. How long do we have to take action in the above cases?
   4. How do we track failures?
   5. How often does a test need to fail to be considered flaky enough
      to take action?

Here's my take on these questions, but would love to hear from others:

1.1) Imo the failure rate has to be under 50% at the very highest. At 50%
we still have half the builds failing, but 75% would pass after just one
retry, and 87.5% after two retries. Is that acceptable? Maybe we should aim
to kick things off with a higher success rate to give ourselves some wiggle
room over time, and have 50% be the absolute maximum failure rate -- if it
ever gets beyond that we trigger an emergency response (whether that be
blocking all feature work until the test failures are addressed, ending
this policy, etc)
1.2) I think we'd probably all agree that there's no way we'll be able to
triage all of the currently flaky tests in a reasonable time frame, but I'm
also wary of 
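
The retry arithmetic in point 1.1 above (75% passing after one retry, 87.5% after two, at a 50% failure rate) can be reproduced with a small helper. This is only a sketch: it assumes each build attempt fails independently with the same probability, which flaky infrastructure may well violate, and `RetryMath` and `passProbability` are made-up names.

```java
final class RetryMath {

    // Probability that at least one of (retries + 1) attempts passes.
    // failureRate: per-attempt failure probability in [0, 1]
    // retries: number of re-runs after the first attempt
    static double passProbability(double failureRate, int retries) {
        if (failureRate < 0.0 || failureRate > 1.0 || retries < 0) {
            throw new IllegalArgumentException("failureRate must be in [0, 1] and retries >= 0");
        }
        // The build stays red only if every single attempt fails.
        return 1.0 - Math.pow(failureRate, retries + 1);
    }

    public static void main(String[] args) {
        double[] failureRates = {0.5, 0.3};
        for (double rate : failureRates) {
            for (int retries = 0; retries <= 2; retries++) {
                System.out.println("failure rate " + rate + ", retries " + retries
                    + " -> pass probability " + passProbability(rate, retries));
            }
        }
    }
}
```

Under the same independence assumption, the 30% threshold suggested elsewhere in this thread would give roughly a 91% chance of a green build after one retry.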

Re: [VOTE] KIP-968: Support single-key_multi-timestamp interactive queries (IQv2) for versioned state stores

2023-11-21 Thread Matthias J. Sax

+1 (binding)

On 11/21/23 4:52 AM, Lucas Brutschy wrote:

Hi Alieh,

thanks for the KIP!

+1 (binding)

Lucas

On Tue, Nov 21, 2023 at 11:26 AM Alieh Saeedi
 wrote:


Thanks, Matthias; I changed it to `ANY` which is the shortest and not
misleading.

Cheers,
Alieh

On Mon, Nov 20, 2023 at 7:42 PM Matthias J. Sax  wrote:


Adding an enum is a good idea!

Wondering if `UNORDERED` is the best name? Want to avoid bike shedding,
just asking.

We could also use `UNDEFINED` / `UNSPECIFIED` / `NONE` / `ANY` ?

In the end, the result _might_ be ordered, we just don't guarantee any
order.


-Matthias

On 11/20/23 9:17 AM, Alieh Saeedi wrote:

Hi all,
I added the public enum `ResultOrder` to the KIP which helps with keeping
three values (unordered, ascending, and descending) for the query results.
Therefore the method `isAscending()` is changed to `resultOrder()` which
returns either the user specified result order or `unordered`.
Cheers,
Alieh
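
A minimal sketch of the three-valued ordering described in this message, using the `ANY` name the thread eventually settled on for the unordered default. `MultiVersionQuery` and its `create()` factory are hypothetical stand-ins and not the KIP's actual classes; only `ResultOrder`, `resultOrder()`, `withAscendingTimestamps()`, and `withDescendingTimestamps()` are names taken from the discussion.

```java
final class ResultOrderSketch {

    // Three values as discussed: no guaranteed order, ascending, descending.
    enum ResultOrder { ANY, ASCENDING, DESCENDING }

    // Hypothetical immutable query object; each with...() call returns a copy.
    static final class MultiVersionQuery {
        private final ResultOrder order;

        private MultiVersionQuery(ResultOrder order) {
            this.order = order;
        }

        // Default: the caller gets no ordering guarantee.
        static MultiVersionQuery create() {
            return new MultiVersionQuery(ResultOrder.ANY);
        }

        MultiVersionQuery withAscendingTimestamps() {
            return new MultiVersionQuery(ResultOrder.ASCENDING);
        }

        MultiVersionQuery withDescendingTimestamps() {
            return new MultiVersionQuery(ResultOrder.DESCENDING);
        }

        // Replaces a boolean isAscending(): one accessor covers all three cases.
        ResultOrder resultOrder() {
            return order;
        }
    }

    public static void main(String[] args) {
        System.out.println(MultiVersionQuery.create().resultOrder());
        System.out.println(MultiVersionQuery.create().withAscendingTimestamps().resultOrder());
        System.out.println(MultiVersionQuery.create().withDescendingTimestamps().resultOrder());
    }
}
```

A single enum accessor avoids the ambiguity of two booleans, where both being false would have to be read as "no order requested".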

On Mon, Nov 20, 2023 at 1:40 PM Alieh Saeedi 

wrote:



Thank you, Guozhang and Bruno, for reviewing the KIP and reading the whole
discussion thread. I appreciate your help:)
The KIP is now corrected and updated.

Cheers,
Alieh

On Mon, Nov 20, 2023 at 10:43 AM Bruno Cadonna 

wrote:



Thanks Alieh,

I am +1 (binding).

However, although we agreed on not specifying an order of the results by
default, there is still the following sentence in the KIP:

"The order of the returned records is by default ascending by timestamp.
The method withDescendingTimestamps() can reverse the order. Btw,
withAscendingTimestamps() method can be used for code readability
purpose. "

Could you please change it and also fix what Guozhang commented?

Best,
Bruno

On 11/19/23 2:12 AM, Guozhang Wang wrote:

Thanks Alieh,

I read through the wiki page and the DISCUSS thread, all LGTM except a
minor thing in javadoc:

"The query returns the records with a global ascending order of keys.
The records with the same key are ordered based on their insertion
timestamp in ascending order. Both the global and partial ordering are
modifiable with the corresponding methods defined for the class."

Since this KIP is only for a single key, there's no key ordering but
only timestamp ordering right? Maybe the javadoc can be updated
accordingly.

Otherwise, LGTM.

On Fri, Nov 17, 2023 at 2:36 AM Alieh Saeedi
 wrote:


Hi all,
Following my recent message in the discussion thread, I am opening the
voting for KIP-968. Thanks for your votes in advance.

Cheers,
Alieh










Re: [DISCUSS] KIP-968: Support single-key_multi-timestamp interactive queries (IQv2) for versioned state stores

2023-11-21 Thread Matthias J. Sax

Thanks! SGTM.

Seems all open questions are resolved. Thanks for pushing this through!

-Matthias

On 11/21/23 2:29 AM, Alieh Saeedi wrote:

Yes Matthias,
Based on the discussion we had, it has now been changed to Optional and the
default is empty (for the latest). Also, the `validTo()` method returns an
Optional.

Cheers,
Alieh

On Mon, Nov 20, 2023 at 7:38 PM Matthias J. Sax  wrote:


I think we should also discuss a little more about `validTo()` method?

Given that "latest" version does not have a valid-to TS, should we
change the return type to `Optional` and return `empty()` for "latest"?

ATM the KIP uses `MAX_VALUE` for "latest" what seems to be less clean?
We could also use `-1` (unknown), but both might be less expressive than
`Optional`?


-Matthias

On 11/20/23 1:59 AM, Bruno Cadonna wrote:

Hi Alieh,

Although I've already voted, I found a minor miss. You should also add
a method isDescending() since the results could also be unordered now
that we agreed that the results are unordered by default. If both --
isDescending() and isAscending() -- are false, neither
withDescendingTimestamps() nor withAscendingTimestamps() was called.

Best,
Bruno

On 11/17/23 11:25 AM, Alieh Saeedi wrote:

Hi all,
Thank you for the feedback.

So we agreed on no default ordering for keys and TSs. So I must provide
both withAscendingXx() and withDescendingXx() for the class.
Apart from that, I think we can either remove the existing constructor for
the `VersionedRecord` class or follow the `Optional` thing.

Since many hidden aspects of the KIP are quite clear now and we have come
to a consensus about them, I think it's time to vote ;-)
I look forward to your votes. Thanks a lot.

Cheers,
Alieh

On Fri, Nov 17, 2023 at 2:27 AM Matthias J. Sax wrote:

Thanks, Alieh.

Overall SGTM. About `validTo` -- wondering if we should make it an
`Optional` and set to `empty()` by default?

I am totally ok with going with the 3-way option about ordering using
default "undefined". For this KIP (as it's all net new) nothing really
changes. -- However, we should amend `RangeQuery`/KIP-985 to align it.

Btw: so far we focused on key-ordering, but I believe the same "ordering
undefined by default" would apply to time-ordering, too? This might
affect KIP-997, too.


-Matthias

On 11/16/23 12:51 AM, Bruno Cadonna wrote:

Hi,

80)
We do not keep backwards compatibility with IQv1, right? I would even
say that currently we do not need to keep backwards compatibility among
IQv2 versions since we marked the API "Evolving" (do we only mean code
compatibility here or also behavioral compatibility?). I propose to try
to not limit ourselves for backwards compatibility that we explicitly
marked as evolving.
I re-read the discussion on KIP-985. In that discussion, we were quite
focused on what the state store provides. I see that for range queries,
we have methods on the state store interface that specify the order, but
that should be kind of orthogonal to the IQv2 query type. Let's assume
somebody in the future adds a state store implementation that is not
order based. To account for use cases where the order does not matter,
this person might also add a method to the state store interface that
does not guarantee any order. However, our range query type is specified
to guarantee order by default. So we need to add something like
withNoOrder() to the query type to allow the use cases that do not
need order and have better performance in IQ. That does not look very
nice to me. Having the no-order-guaranteed option does not cost us
anything and it keeps the IQv2 interface flexible. I assume we want to
drop the Evolving annotation at some point.
Sorry for not having brought this up in the discussion about KIP-985.

Best,
Bruno





On 11/15/23 6:56 AM, Matthias J. Sax wrote:

Just catching up on this one.


50) I am also in favor of setting `validTo` in VersionedRecord for
single-key single-ts lookup; it seems better to return the proper
timestamp. The timestamp is already in the store and it's cheap to
extract it and add to the result, and it might be valuable information
for the user. Not sure, though, if we should deprecate the existing
constructor, because for "latest" it's convenient to have?


60) Yes, I meant `VersionedRecord`. Sorry for the mixup.


80) We did discuss this question on KIP-985 (maybe you missed it
Bruno). It's kinda tricky.

Historically, it seems that IQv1, ie, the `ReadOnlyXxx` interfaces
provide a clear contract that `range()` is ascending and
`reverseRange()` is descending.

For `RangeQuery`, the question is, if we did implicitly inherit this
contract? Our conclusion on KIP-985 discussion was, that we did
inherit it. If this holds true, changing the contract would be a
breaking change (what might still be acceptable, given that the
interface is annotated as unstable, and that IQv2 is not widely
adopted yet). I am happy to go

Re: [VOTE] KIP-1000: List Client Metrics Configuration Resources

2023-11-20 Thread Matthias J. Sax

+1 (binding)

On 11/20/23 1:42 PM, Jason Gustafson wrote:

+1 Thanks for the KIP!

On Mon, Nov 20, 2023 at 9:31 AM Jun Rao  wrote:


Hi, Andrew,

Thanks for the KIP. +1

Jun

On Thu, Nov 16, 2023 at 9:12 AM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:


Hi Apoorv,
Thanks for your vote.

Initially, I put support for zkBroker in order to be able to control the
error response in this case.
I have validated the error handling for this RPC on a ZK cluster in which
the RPC is not supported,
and the error is entirely understandable. Consequently, I have removed
`zkBroker` for this new RPC.

Thanks,
Andrew


On 16 Nov 2023, at 13:51, Apoorv Mittal wrote:

Thanks a lot for writing the KIP Andrew. This is much required to list all
configured client metrics resources.

I have one minor question related to the zkBroker listener in the new RPC.
As the client-metrics resource is not supported in ZooKeeper mode,
shouldn't we disallow ListClientMetricsResourcesRequest for
ZooKeeper in the APIVersion request itself?

+1(non-binding)

Regards,
Apoorv Mittal
+44 7721681581


On Wed, Nov 15, 2023 at 4:58 PM Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:


Hi,
I’d like to start the voting for KIP-1000: List Client Metrics
Configuration Resources.






https://cwiki.apache.org/confluence/display/KAFKA/KIP-1000%3A+List+Client+Metrics+Configuration+Resources


Thanks,
Andrew









Re: [VOTE] KIP-968: Support single-key_multi-timestamp interactive queries (IQv2) for versioned state stores

2023-11-20 Thread Matthias J. Sax

Adding an enum is a good idea!

Wondering if `UNORDERED` is the best name? Want to avoid bike shedding, 
just asking.


We could also use `UNDEFINED` / `UNSPECIFIED` / `NONE` / `ANY` ?

In the end, the result _might_ be ordered, we just don't guarantee any 
order.



-Matthias

On 11/20/23 9:17 AM, Alieh Saeedi wrote:

Hi all,
I added the public enum `ResultOrder` to the KIP which helps with keeping
three values (unordered, ascending, and descending) for the query results.
Therefore the method `isAscending()` is changed to `resultOrder()` which
returns either the user specified result order or `unordered`.
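
A minimal sketch of the three-way order described above, under stated assumptions: only the enum name `ResultOrder` and the method names `resultOrder()`, `withAscendingTimestamps()` and `withDescendingTimestamps()` come from this thread. The constant names and the class `MultiVersionQuerySketch` are hypothetical, and this is not the Kafka Streams implementation.

```java
import java.util.Objects;

// Hypothetical constant names: the thread only fixes the three meanings
// (no guaranteed order, ascending, descending), not how they are spelled.
enum ResultOrder { UNORDERED, ASCENDING, DESCENDING }

// Hypothetical stand-in for a multi-version query class; not the Kafka Streams API.
final class MultiVersionQuerySketch {
    private final ResultOrder order;

    private MultiVersionQuerySketch(ResultOrder order) {
        this.order = Objects.requireNonNull(order, "order");
    }

    // No order is requested by default.
    static MultiVersionQuerySketch create() {
        return new MultiVersionQuerySketch(ResultOrder.UNORDERED);
    }

    MultiVersionQuerySketch withAscendingTimestamps() {
        return new MultiVersionQuerySketch(ResultOrder.ASCENDING);
    }

    MultiVersionQuerySketch withDescendingTimestamps() {
        return new MultiVersionQuerySketch(ResultOrder.DESCENDING);
    }

    // One getter covers all three states, which a boolean isAscending() cannot.
    ResultOrder resultOrder() {
        return order;
    }
}
```

A single enum-valued getter avoids the pair of booleans (`isAscending()` plus `isDescending()`) that would otherwise be needed to tell "unordered" apart from the two explicit orders.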
Cheers,
Alieh

On Mon, Nov 20, 2023 at 1:40 PM Alieh Saeedi  wrote:


Thank you, Guozhang and Bruno, for reviewing the KIP and reading the whole
discussion thread. I appreciate your help:)
The KIP is now corrected and updated.

Cheers,
Alieh

On Mon, Nov 20, 2023 at 10:43 AM Bruno Cadonna  wrote:


Thanks Alieh,

I am +1 (binding).

However, although we agreed on not specifying an order of the results by
default, there is still the following  sentence in the KIP:

"The order of the returned records is by default ascending by timestamp.
The method withDescendingTimestamps() can reverse the order. Btw,
withAscendingTimestamps() method can be used for code readability
purpose. "

Could you please change it and also fix what Guozhang commented?

Best,
Bruno

On 11/19/23 2:12 AM, Guozhang Wang wrote:

Thanks Alieh,

I read through the wiki page and the DISCUSS thread, all LGTM except a
minor thing in javadoc:

"The query returns the records with a global ascending order of keys.
The records with the same key are ordered based on their insertion
timestamp in ascending order. Both the global and partial ordering are
modifiable with the corresponding methods defined for the class."

Since this KIP is only for a single key, there's no key ordering but
only timestamp ordering right? Maybe the javadoc can be updated
accordingly.

Otherwise, LGTM.

On Fri, Nov 17, 2023 at 2:36 AM Alieh Saeedi
 wrote:


Hi all,
Following my recent message in the discussion thread, I am opening the
voting for KIP-968. Thanks for your votes in advance.

Cheers,
Alieh








Re: [DISCUSS] KIP-968: Support single-key_multi-timestamp interactive queries (IQv2) for versioned state stores

2023-11-20 Thread Matthias J. Sax

I think we should also discuss a little more about `validTo()` method?

Given that "latest" version does not have a valid-to TS, should we 
change the return type to `Optional` and return `empty()` for "latest"?


ATM the KIP uses `MAX_VALUE` for "latest" what seems to be less clean? 
We could also use `-1` (unknown), but both might be less expressive than 
`Optional`?
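
To make the comparison concrete, here is a rough sketch of the `Optional` variant. `VersionedRecordSketch` is a hypothetical stand-in, not the actual `VersionedRecord` class, and the constructor shapes are assumptions chosen only for illustration.

```java
import java.util.Optional;

// Hypothetical stand-in for a versioned record; not Kafka's VersionedRecord.
final class VersionedRecordSketch<V> {
    private final V value;
    private final long timestamp;
    private final Optional<Long> validTo;

    // "Latest" version: there is no valid-to timestamp yet.
    VersionedRecordSketch(V value, long timestamp) {
        this(value, timestamp, Optional.empty());
    }

    // Older version: valid until the next version's timestamp.
    VersionedRecordSketch(V value, long timestamp, long validTo) {
        this(value, timestamp, Optional.of(validTo));
    }

    private VersionedRecordSketch(V value, long timestamp, Optional<Long> validTo) {
        this.value = value;
        this.timestamp = timestamp;
        this.validTo = validTo;
    }

    V value() { return value; }

    long timestamp() { return timestamp; }

    // Empty for "latest", instead of a sentinel such as Long.MAX_VALUE or -1.
    Optional<Long> validTo() { return validTo; }
}
```

With this shape a caller has to handle the "latest" case explicitly, for example via `record.validTo().isPresent()`, rather than remembering which sentinel value means "no end".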



-Matthias

On 11/20/23 1:59 AM, Bruno Cadonna wrote:

Hi Alieh,

Although I've already voted, I found a minor miss. You should also add
a method isDescending() since the results could also be unordered now
that we agreed that the results are unordered by default. If both --
isDescending() and isAscending() -- are false, neither
withDescendingTimestamps() nor withAscendingTimestamps() was called.


Best,
Bruno

On 11/17/23 11:25 AM, Alieh Saeedi wrote:

Hi all,
Thank you for the feedback.

So we agreed on no default ordering for keys and TSs. So I must provide
both withAscendingXx() and withDescendingXx() for the class.
Apart from that, I think we can either remove the existing constructor for
the `VersionedRecord` class or follow the `Optional` thing.

Since many hidden aspects of the KIP are quite clear now and we have come
to a consensus about them, I think it's time to vote ;-)
I look forward to your votes. Thanks a lot.

Cheers,
Alieh

On Fri, Nov 17, 2023 at 2:27 AM Matthias J. Sax  wrote:


Thanks, Alieh.

Overall SGTM. About `validTo` -- wondering if we should make it an
`Optional` and set to `empty()` by default?

I am totally ok with going with the 3-way option about ordering using
default "undefined". For this KIP (as it's all net new) nothing really
changes. -- However, we should amend `RangeQuery`/KIP-985 to align it.

Btw: so far we focused on key-ordering, but I believe the same "ordering
undefined by default" would apply to time-ordering, too? This might
affect KIP-997, too.


-Matthias

On 11/16/23 12:51 AM, Bruno Cadonna wrote:

Hi,

80)
We do not keep backwards compatibility with IQv1, right? I would even
say that currently we do not need to keep backwards compatibility among
IQv2 versions since we marked the API "Evolving" (do we only mean code
compatibility here or also behavioral compatibility?). I propose to try
to not limit ourselves for backwards compatibility that we explicitly
marked as evolving.
I re-read the discussion on KIP-985. In that discussion, we were quite
focused on what the state store provides. I see that for range queries,
we have methods on the state store interface that specify the order, but
that should be kind of orthogonal to the IQv2 query type. Let's assume
somebody in the future adds a state store implementation that is not
order based. To account for use cases where the order does not matter,
this person might also add a method to the state store interface that
does not guarantee any order. However, our range query type is specified
to guarantee order by default. So we need to add something like
withNoOrder() to the query type to allow the use cases that do not
need order and have better performance in IQ. That does not look very
nice to me. Having the no-order-guaranteed option does not cost us
anything and it keeps the IQv2 interface flexible. I assume we want to
drop the Evolving annotation at some point.
Sorry for not having brought this up in the discussion about KIP-985.

Best,
Bruno





On 11/15/23 6:56 AM, Matthias J. Sax wrote:

Just catching up on this one.


50) I am also in favor of setting `validTo` in VersionedRecord for
single-key single-ts lookup; it seems better to return the proper
timestamp. The timestamp is already in the store and it's cheap to
extract it and add to the result, and it might be valuable information
for the user. Not sure, though, if we should deprecate the existing
constructor, because for "latest" it's convenient to have?


60) Yes, I meant `VersionedRecord`. Sorry for the mixup.


80) We did discuss this question on KIP-985 (maybe you missed it
Bruno). It's kinda tricky.

Historically, it seems that IQv1, ie, the `ReadOnlyXxx` interfaces
provide a clear contract that `range()` is ascending and
`reverseRange()` is descending.

For `RangeQuery`, the question is, if we did implicitly inherit this
contract? Our conclusion on KIP-985 discussion was, that we did
inherit it. If this holds true, changing the contract would be a
breaking change (what might still be acceptable, given that the
interface is annotated as unstable, and that IQv2 is not widely
adopted yet). I am happy to go with the 3-option contract, but just
want to ensure we all agree it's the right way to go, and we are
potentially willing to pay the price of backward incompatibility.




Do we need a snapshot semantic or can we specify a weaker but still
useful semantic?


I don't think we necessarily need it, but as pointed out by Lucas, all
existing queries provide it. Overall, my main point is r

Re: [DISCUSS] KIP-997 Support fetch(fromKey, toKey, from, to) to WindowRangeQuery and unify WindowKeyQuery and WindowRangeQuery

2023-11-16 Thread Matthias J. Sax

Thanks for the KIP.

Given how `WindowRangeQuery` works right now, it's really time to 
improve it.



1) Agree. It's not clear what will be added right now. I think we should
deprecate existing `getKey()` w/o an actual replacement? For
`getFromKey` and `getToKey` we should actually use `lowerKeyBound()` and
`upperKeyBound()` to align to KIP-969?


Also wondering if we should deprecate existing `withKey()` and 
`withWindowStartRange`? `withKey` only works for SessionStores and 
implements a single-key full-time-range query. Similarly, 
`withWindowStartRange` only works for WindowedStore and implements an 
all-key time-range query. Thus, both are rather special and it seems the 
aim of this KIP is to generalize `WindowRangeQuery` to arbitrary 
key-range/time-range queries?


This raises one question about time-range semantics, given that we query
windows with different semantics.


 - The current `WindowStore` semantics used for 
`WindowRangeQuery#withWindowStartRange` is considering only the window 
start time, ie, the window-start time must fall into the query 
time-range to be returned.


 - In contrast, `SessionStore` time ranges based on `findSessions` use
earliest-session-end-time and latest-session-start-time and thus implement
a "window-bounds / search-time-range overlap query".
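
The difference between the two semantics can be sketched as two predicates. This is only an illustration: the class and method names are hypothetical, and treating all bounds as inclusive is an assumption, not something stated in this thread.

```java
// Hypothetical helper illustrating the two time-range semantics; not Kafka code.
final class TimeRangeSemanticsSketch {
    private TimeRangeSemanticsSketch() { }

    // Window-start semantics: only the window start time is compared
    // against the query range [from, to] (bounds assumed inclusive).
    static boolean startsWithin(long windowStart, long from, long to) {
        return from <= windowStart && windowStart <= to;
    }

    // Overlap semantics: the window [windowStart, windowEnd] matches if it
    // intersects the query range [from, to] (bounds assumed inclusive).
    static boolean overlaps(long windowStart, long windowEnd, long from, long to) {
        return windowStart <= to && windowEnd >= from;
    }
}
```

For a window spanning 5 to 12 and a query range of 10 to 20, `startsWithin(5, 10, 20)` is false while `overlaps(5, 12, 10, 20)` is true, which is exactly the kind of divergence the question above is about.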


Is there any concern about semantic differences? It would also be
possible to use the same semantics for both query types, and maybe even
let the user pick which semantics they want (letting users choose might
actually be the best thing to do)? -- We can also do this incrementally,
and limit the scope of this KIP (or keep the full KIP scope but
implement it incrementally only)


Btw: I think we should not add any ordering at this point, and
explicitly state that no ordering is guaranteed whatsoever at this point.




2) Agreed. We should deprecate `getFromTime` and `getToTime` and add new
methods `fromTime` and `toTime`.




3) About the API. If we move forward with general key-range/time-range I 
agree that a more modular approach might be nice. Not sure right now, 
what the best approach would be for this? Looking into KIP-969, we might 
want to have:


 - static withKeyRange
 - static withLowerKeyBound
 - static withUpperKeyBound
 - static withAllKeys (KIP-969 actually uses `allKeys` ?)
 - fromTime
 - toTime

where the default time range would be "all / unbounded"?



10: you mentioned that `WindowKeyQuery` functionality can be covered by 
`WindowRangeQuery`. I agree. For this case, it seems we want to 
deprecate `WindowKeyQuery` entirely?




-Matthias

On 11/16/23 1:19 AM, Bruno Cadonna wrote:

Hi Hanyu,

Thanks for the KIP!

1)
Could you please mark the pieces that you want to add to the API in the 
code listing in the KIP? You can add a comment like "// newly added" or 
similar. That would make reading the KIP a bit easier because one does 
not need to compare your code with the code in the current codebase.


2)
Could you -- as a side cleanup -- also change the getters to not use the 
get-prefix anymore, please? That was apparently an oversight when those 
methods were added. Although the API is marked as Evolving, I think we 
should still deprecate the getX() methods, since it does not cost us 
anything.


3)
I propose to make the API a bit more fluent. For example, something like

WindowRangeQuery.withKey(key).fromTime(t1).toTime(t2)

and

WindowRangeQuery.withAllKeys().fromTime(t1).toTime(t2)

and

WindowRangeQuery.withKeyRange(key1, key2).fromTime(t1).toTime(t2)

and maybe even in addition to the above add also the option to start 
with the time range


WindowRangeQuery.withWindowStartRange(t1, t2).fromKey(key1).toKey(key2)
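
A possible shape for such a fluent API, as a sketch only: `WindowRangeQuerySketch` is a hypothetical class, the accessor names are made up for illustration, and representing "unbounded" as an empty `Optional` is an assumption rather than anything agreed in this thread.

```java
import java.util.Optional;

// Hypothetical immutable builder-style query; not the real WindowRangeQuery.
final class WindowRangeQuerySketch<K> {
    private final Optional<K> lowerKey;
    private final Optional<K> upperKey;
    private final Optional<Long> fromTime;
    private final Optional<Long> toTime;

    private WindowRangeQuerySketch(Optional<K> lowerKey, Optional<K> upperKey,
                                   Optional<Long> fromTime, Optional<Long> toTime) {
        this.lowerKey = lowerKey;
        this.upperKey = upperKey;
        this.fromTime = fromTime;
        this.toTime = toTime;
    }

    // Single key: modeled here as a key range whose bounds are equal.
    static <K> WindowRangeQuerySketch<K> withKey(K key) {
        return withKeyRange(key, key);
    }

    static <K> WindowRangeQuerySketch<K> withKeyRange(K lower, K upper) {
        return new WindowRangeQuerySketch<K>(
            Optional.of(lower), Optional.of(upper), Optional.empty(), Optional.empty());
    }

    static <K> WindowRangeQuerySketch<K> withAllKeys() {
        return new WindowRangeQuerySketch<K>(
            Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
    }

    // Each setter returns a new instance, so calls can be chained safely.
    WindowRangeQuerySketch<K> fromTime(long t) {
        return new WindowRangeQuerySketch<K>(lowerKey, upperKey, Optional.of(t), toTime);
    }

    WindowRangeQuerySketch<K> toTime(long t) {
        return new WindowRangeQuerySketch<K>(lowerKey, upperKey, fromTime, Optional.of(t));
    }

    Optional<K> lowerKeyBound() { return lowerKey; }
    Optional<K> upperKeyBound() { return upperKey; }
    Optional<Long> timeFrom() { return fromTime; }
    Optional<Long> timeTo() { return toTime; }
}
```

With this sketch, `WindowRangeQuerySketch.withKeyRange("a", "c").fromTime(10L).toTime(20L)` reads the same way as the calls proposed above, and leaving out `fromTime`/`toTime` keeps the time range unbounded.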


4)
Could you also add some usage examples? Alieh did quite a nice job 
regarding usage examples in KIP-986.



Best,
Bruno

On 11/8/23 8:02 PM, Hanyu (Peter) Zheng wrote:

Hello everyone,

I would like to start the discussion for KIP-997: Support fetch(fromKey,
toKey, from, to) to WindowRangeQuery and unify WindowKeyQuery and
WindowRangeQuery
The KIP can be found here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-997%3A++Support+fetch%28fromKey%2C+toKey%2C+from%2C+to%29+to+WindowRangeQuery+and+unify+WindowKeyQuery+and+WindowRangeQuery

Any suggestions are more than welcome.

Many thanks,
Hanyu

On Wed, Nov 8, 2023 at 10:38 AM Hanyu (Peter) Zheng 
wrote:



https://cwiki.apache.org/confluence/display/KAFKA/KIP-997%3A++Support+fetch%28fromKey%2C+toKey%2C+from%2C+to%29+to+WindowRangeQuery+and+unify+WindowKeyQuery+and+WindowRangeQuery

--

Hanyu (Peter) Zheng he/him/his
Software Engineer Intern
+1 (213) 431-7193 <+1+(213)+431-7193>

Re: [DISCUSS] KIP-968: Support single-key_multi-timestamp interactive queries (IQv2) for versioned state stores

2023-11-16 Thread Matthias J. Sax

Thanks, Alieh.

Overall SGTM. About `validTo` -- wondering if we should make it an 
`Optional` and set to `empty()` by default?


I am totally ok with going with the 3-way option about ordering using 
default "undefined". For this KIP (as it's all net new) nothing really 
changes. -- However, we should amend `RangeQuery`/KIP-985 to align it.


Btw: so far we focused on key-ordering, but I believe the same "ordering 
undefined by default" would apply to time-ordering, too? This might 
affect KIP-997, too.



-Matthias

On 11/16/23 12:51 AM, Bruno Cadonna wrote:

Hi,

80)
We do not keep backwards compatibility with IQv1, right? I would even 
say that currently we do not need to keep backwards compatibility among 
IQv2 versions since we marked the API "Evolving" (do we only mean code 
compatibility here or also behavioral compatibility?). I propose to try 
to not limit ourselves for backwards compatibility that we explicitly 
marked as evolving.
I re-read the discussion on KIP-985. In that discussion, we were quite 
focused on what the state store provides. I see that for range queries, 
we have methods on the state store interface that specify the order, but 
that should be kind of orthogonal to the IQv2 query type. Let's assume 
somebody in the future adds a state store implementation that is not 
order based. To account for use cases where the order does not matter, 
this person might also add a method to the state store interface that 
does not guarantee any order. However, our range query type is specified 
to guarantee order by default. So we need to add something like 
withNoOrder() to the query type to allow the use cases that do not
need order and have better performance in IQ. That does not look very
nice to me. Having the no-order-guaranteed option does not cost us 
anything and it keeps the IQv2 interface flexible. I assume we want to 
drop the Evolving annotation at some point.

Sorry for not having brought this up in the discussion about KIP-985.

Best,
Bruno





On 11/15/23 6:56 AM, Matthias J. Sax wrote:

Just catching up on this one.


50) I am also in favor of setting `validTo` in VersionedRecord for 
single-key single-ts lookup; it seems better to return the proper 
timestamp. The timestamp is already in the store and it's cheap to 
extract it and add to the result, and it might be valuable information 
for the user. Not sure, though, if we should deprecate the existing
constructor, because for "latest" it's convenient to have?



60) Yes, I meant `VersionedRecord`. Sorry for the mixup.


80) We did discuss this question on KIP-985 (maybe you missed it 
Bruno). It's kinda tricky.


Historically, it seems that IQv1, ie, the `ReadOnlyXxx` interfaces 
provide a clear contract that `range()` is ascending and 
`reverseRange()` is descending.


For `RangeQuery`, the question is, if we did implicitly inherit this 
contract? Our conclusion on KIP-985 discussion was, that we did 
inherit it. If this holds true, changing the contract would be a 
breaking change (what might still be acceptable, given that the 
interface is annotated as unstable, and that IQv2 is not widely 
adopted yet). I am happy to go with the 3-option contract, but just 
want to ensure we all agree it's the right way to go, and we are 
potentially willing to pay the price of backward incompatibility.




Do we need a snapshot semantic or can we specify a weaker but still 
useful semantic? 


I don't think we necessarily need it, but as pointed out by Lucas, all 
existing queries provide it. Overall, my main point is really about 
not implementing something "random", but defining a proper binding 
contract that allows users to reason about it.


In general, I agree that weaker semantics might be sufficient, but I am
not sure if we can implement anything weaker in a reasonable way?
Happy to be convinced otherwise. (I have some examples, that I will
omit for now, as I hope we can actually go with snapshot semantics.)


The RocksDB Snapshot idea from Lucas sounds very promising. I was not
aware that we could do this with RocksDB (otherwise I might have 
suggested it on the PR right away). I guess the only open question 
remaining would be, if we can provide the same guarantees for a future 
in-memory implementation for VersionedStores? It sounds possible to 
do, but we should have some level of confidence about it?



-Matthias

On 11/14/23 6:33 AM, Lucas Brutschy wrote:

Hi Alieh,

I agree with Bruno that a weaker guarantee could be useful already,
and it's anyway possible to strengthen the guarantees later on. On the
other hand, it would be nice to provide a consistent view across all
segments if it doesn't come with major downsides, because until now IQ
does provide a consistent view (via iterators), and this would be the
first feature that diverges from that guarantee.

I think a consistent view could be achieved relatively easily by creating a
snapshot 

Re: [VOTE] KIP-997: Partition-Level Throughput Metrics

2023-11-16 Thread Matthias J. Sax

This is KIP-977, right? Not as the subject says.

Guess we won't be able to fix this now. Hope it does not cause confusion 
down the line...



-Matthias

On 11/16/23 4:43 AM, Kamal Chandraprakash wrote:

+1 (non-binding). Thanks for the KIP!

On Thu, Nov 16, 2023 at 9:00 AM Satish Duggana wrote:


Thanks Qichao for the KIP.

+1 (binding)

~Satish.

On Thu, 16 Nov 2023 at 02:20, Jorge Esteban Quilcate Otoya
 wrote:


Qichao, thanks again for leading this proposal!

+1 (non-binding)

Cheers,
Jorge.

On Wed, 15 Nov 2023 at 19:17, Divij Vaidya wrote:

+1 (binding)

I was involved in the discussion thread for this KIP and support it in its
current form.

--
Divij Vaidya



On Wed, Nov 15, 2023 at 10:55 AM Qichao Chu wrote:

Hi all,

I'd like to call a vote for KIP-977: Partition-Level Throughput Metrics.


Please take a look here:





https://cwiki.apache.org/confluence/display/KAFKA/KIP-977%3A+Partition-Level+Throughput+Metrics


Best,
Qichao Chu









[jira] [Resolved] (KAFKA-15346) Single-Key_single-timestamp IQs with versioned state stores

2023-11-15 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15346.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

> Single-Key_single-timestamp IQs with versioned state stores
> ---
>
> Key: KAFKA-15346
> URL: https://issues.apache.org/jira/browse/KAFKA-15346
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Alieh Saeedi
>Assignee: Alieh Saeedi
>Priority: Major
> Fix For: 3.7.0
>
>
> [KIP-960|https://cwiki.apache.org/confluence/display/KAFKA/KIP-960%3A+Support+single-key_single-timestamp+interactive+queries+%28IQv2%29+for+versioned+state+stores]
> This ticket covers just two query types:
> *Key Queries with single timestamp:*
>  # single-key latest-value lookup
>  # single-key lookup with timestamp (upper) bound



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15834) Subscribing to non-existent topic blocks StreamThread from stopping

2023-11-15 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17786564#comment-17786564
 ] 

Matthias J. Sax commented on KAFKA-15834:
-

Thanks for reporting and for such a detailed analysis, [~gharris1727]!

> Subscribing to non-existent topic blocks StreamThread from stopping
> ---
>
> Key: KAFKA-15834
> URL: https://issues.apache.org/jira/browse/KAFKA-15834
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.0
>Reporter: Greg Harris
>Priority: Major
>
> In 
> NamedTopologyIntegrationTest#shouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics
>  a topology is created which references an input topic which does not exist. 
> The test as-written passes, but the KafkaStreams#close(Duration) at the end 
> times out, and leaves StreamsThreads running.
> From some cursory investigation it appears that this is happening:
> 1. The consumer calls the StreamsPartitionAssignor, which calls 
> TaskManager#handleRebalanceStart as a side-effect
> 2. handleRebalanceStart sets the rebalanceInProgress flag
> 3. This flag is checked by StreamThread.runLoop, and causes the loop to 
> remain running.
> 4. The consumer never calls StreamsRebalanceListener#onPartitionsAssigned, 
> because the topic does not exist
> 5. Because no partitions are ever assigned, the 
> TaskManager#handleRebalanceComplete never clears the rebalanceInProgress flag
>  
> This log message is printed in a tight loop while the close is ongoing and 
> the consumer is being polled with zero duration:
> {noformat}
> [2023-11-15 11:42:43,661] WARN [Consumer 
> clientId=NamedTopologyIntegrationTestshouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics-942756f8-5213-4c44-bb6b-5f805884e026-StreamThread-1-consumer,
>  
> groupId=NamedTopologyIntegrationTestshouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics]
>  Received unknown topic or partition error in fetch for partition 
> unique_topic_prefix-topology-1-store-repartition-0 
> (org.apache.kafka.clients.consumer.internals.FetchCollector:321)
> {noformat}
> Practically, this means that this test leaks two StreamsThreads and the 
> associated clients and sockets, and delays the completion of the test until 
> the KafkaStreams#close(Duration) call times out.
> Either we should change the rebalanceInProgress flag to avoid getting stuck 
> in this rebalance state, or figure out a way to shut down a StreamsThread 
> that is in an extended rebalance state during shutdown.
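
The sequence in the report can be modeled in a few lines. This is a simplified sketch, not the Kafka Streams code: `RunLoopModel` and its methods are hypothetical, and the loop condition (keep looping while the thread is running or a rebalance is in progress) is an assumption derived from steps 2, 3 and 5 of the description.

```java
// Hypothetical model of the stuck-shutdown scenario; not StreamThread itself.
final class RunLoopModel {
    private boolean running = true;
    private boolean rebalanceInProgress = false;

    // Step 2: the assignor triggers this as a side effect.
    void handleRebalanceStart() {
        rebalanceInProgress = true;
    }

    // Step 5: only reached if partitions are actually assigned.
    void handleRebalanceComplete() {
        rebalanceInProgress = false;
    }

    // close() was requested.
    void shutdown() {
        running = false;
    }

    // Step 3: assumed loop condition; the loop keeps going while either is true.
    boolean loopWouldContinue() {
        return running || rebalanceInProgress;
    }
}
```

In this model, calling `handleRebalanceStart()` followed by `shutdown()` leaves `loopWouldContinue()` true until `handleRebalanceComplete()` is called, which matches the observation that the thread never stops when the assignment callback never fires.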



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] Should we continue to merge without a green build? No!

2023-11-14 Thread Matthias J. Sax
I agree on the test gap argument. However, my worry is, if we don't
"force the pain", it won't get fixed at all. -- I also know, that we try
to find a working approach for many years...


My take is that if we disable a test and file a non-blocking Jira, it's
basically the same as just deleting the test altogether and never talking
about it again. -- I believe this is not what we aim for, but we aim
for good test coverage and a way to get these tests fixed?


Thus IMHO we need some forcing function (either keep the tests and feel 
the pain on every PR), or disable the test and file a blocker JIRA so 
the pain surfaces on a release forcing us to do something about it.


If there is no forcing function, it basically means we are willing to 
accept test gaps forever.



-Matthias

On 11/14/23 9:09 PM, Ismael Juma wrote:

Matthias,

Flaky tests are worse than useless. I know engineers find it hard to
disable them because of the supposed test gap (I find it hard too), but the
truth is that the test gap is already there! No-one blocks merging PRs on
flaky tests, but they do get used to ignoring build failures.

The current approach has been attempted for nearly a decade and it has
never worked. I think we should try something different.

When it comes to marking flaky tests as release blockers, I don't think
this should be done as a general rule. We should instead assess on a case
by case basis, same way we do it for bugs.

Ismael

On Tue, Nov 14, 2023 at 5:02 PM Matthias J. Sax  wrote:


Thanks for starting this discussion David! I totally agree to "no"!

I think there is no excuse whatsoever for merging PRs with compilation
errors (except an honest mistake for conflicting PRs that got merged
interleaved). -- Every committer must(!) check the Jenkins status before
merging to avoid such an issue.

Similar for actual permanently broken tests. If there is no green build,
and the same test failed across multiple Jenkins runs, a committer
should detect this and cannot merge a PR.

Given the current state of the CI pipeline, it seems possible to get
green runs, and thus I support the policy (that we actually always had)
to only merge if there is at least one green build. If committers got
sloppy about this, we need to call it out and put a hold on this practice.

(The only exception from the above policy would be a very unstable
status for which getting a green build is not possible at all, due to
too many flaky tests -- for this case, I would accept merging even if
there is no green build, but committers need to manually ensure that
every test did pass in at least one test run. -- We had this in the
past, but I don't think we are in such a bad situation right now.)

About disabling tests: I was never a fan of this, because in my
experience those tests are not fixed any time soon, especially because
we do not consider such tickets as release blockers. To me, seeing tests
fail on PR builds is actually a good forcing function for people to feel
the pain, and thus get motivated to make time to fix the tests.

I have to admit that I was a little bit sloppy paying attention to flaky
tests recently, and I highly appreciate this effort. Also thanks to
everyone who actually filed a ticket! IMHO, we should file a ticket for
every flaky test, and also keep adding comments each time we see a test
fail to be able to track the frequency at which a test fails, so we can
fix the most pressing ones first.

To me, the best forcing function to get tests stabilized is to file
tickets and consider them release blockers. Disabling tests does not
really help much IMHO to tackle the problem (we can of course still
disable them to get noise out of the system, but it would only introduce
testing gaps for the time being and also does not help to figure out how
often a test fails, so it's not a solution to the problem IMHO).


-Matthias

On 11/13/23 11:40 PM, Sagar wrote:

Hi Divij,

I think this proposal overall makes sense. My only nit sort of a suggestion
is that let's also consider a label called newbie++[1] for flaky tests if
we are considering adding newbie as a label. I think some of the flaky
tests need familiarity with the codebase or the test setup so as a first
time contributor, it might be difficult. newbie++ IMO covers that aspect.

[1]
https://issues.apache.org/jira/browse/KAFKA-15406?jql=project%20%3D%20KAFKA%20AND%20labels%20%3D%20%22newbie%2B%2B%22

Let me know what you think.

Thanks!
Sagar.

On Mon, Nov 13, 2023 at 9:11 PM Divij Vaidya 
wrote:


   Please, do it.

We can use specific labels to effectively filter those tickets.

We already have a label and a way to discover flaky tests. They are tagged
with the label "flaky-test" [1]. There is also a label "newbie" [2] meant
for folks who are new to Apache Kafka code base.
My suggestion is to send a broader email to the community (since many will
miss details in this thread) and call for action for committers to
volun

Re: [DISCUSS] KIP-968: Support single-key_multi-timestamp interactive queries (IQv2) for versioned state stores

2023-11-14 Thread Matthias J. Sax
hould not have
 inconsistencies (he actually hates them:D). Shall I change my KIP
or Hanyu?
 Thoughts?


It might be helpful to look into the PR
<https://github.com/apache/kafka/pull/14626> for more clarity, and even
to review it ;-)

Cheers,
Alieh

On Thu, Nov 2, 2023 at 7:13 PM Bruno Cadonna  wrote:


Hi Alieh,

First of all, I like the examples.

Is validTo in VersionedRecord exclusive or inclusive?
In the javadocs you write:

"@param validTo the latest timestamp that value is valid"

I think that is not true because the validity is defined by the start
time of the new version. The new and the old version cannot both be
valid at that same time.

Theoretically, you could set validTo to the start time of the new
version - 1. However, what is the unit of the 1? Is it nanoseconds?
Milliseconds? Seconds? Sure we could agree on one, but I think it is
more elegant to just make the validTo exclusive. Actually, you used it
as exclusive in your examples.
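
The exclusive interpretation can be sketched as a half-open interval. This is only an illustration under stated assumptions: `VersionSketch` and `ValidToSketch` are hypothetical names, not the `VersionedRecord` class from the KIP, and timestamps are assumed to be plain `long` values.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for a versioned record; NOT the KIP's VersionedRecord class.
final class VersionSketch {
    final String value;
    final long validFrom; // inclusive
    final long validTo;   // exclusive: equals validFrom of the next version

    VersionSketch(String value, long validFrom, long validTo) {
        this.value = value;
        this.validFrom = validFrom;
        this.validTo = validTo;
    }

    // Half-open interval check: [validFrom, validTo)
    boolean isValidAt(long timestamp) {
        return validFrom <= timestamp && timestamp < validTo;
    }
}

final class ValidToSketch {
    // Returns the first version valid at the given timestamp, if any.
    static Optional<VersionSketch> valueAt(List<VersionSketch> history, long timestamp) {
        return history.stream().filter(v -> v.isValidAt(timestamp)).findFirst();
    }

    public static void main(String[] args) {
        List<VersionSketch> history = Arrays.asList(
            new VersionSketch("a", 10L, 20L),             // superseded at t=20
            new VersionSketch("b", 20L, Long.MAX_VALUE)); // latest version
        System.out.println(valueAt(history, 19L).get().value); // a
        System.out.println(valueAt(history, 20L).get().value); // b
    }
}
```

With an exclusive `validTo`, no time unit has to be agreed on for a "minus 1" adjustment, and at t=20 exactly one version matches.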


Thanks for the KIP!

Best,
Bruno

On 11/1/23 9:01 PM, Alieh Saeedi wrote:

Hi all,
@Matthias: I think Victoria was right. I must add the method `get(key,
fromTime, toTime)` to the interface `VersionedKeyValueStore`. Right now,
having the method only in `RocksDBVersionedStore` forced me to use an
instance of `RocksDBVersionedStore` (instead of `VersionedKeyValueStore`)
in the `StoreQueryUtils.runMultiVersionedKeyQuery()` method. In the future,
we are going to use the same method for In-memory/SPDB/blaBla versioned
stores. Then either this method won't work any more, or we have to add
code (if clauses) for each type of versioned store. What do you think
about that?
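
A rough sketch of the trade-off, using hypothetical and heavily simplified stand-ins (the names `VersionedStoreSketch`, `RocksDbStoreSketch`, `InMemoryStoreSketch`, and `QueryUtilsSketch` are assumptions for illustration and do not match the real Kafka Streams signatures):

```java
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in; the real VersionedKeyValueStore has different signatures.
interface VersionedStoreSketch {
    // Returns the versions of the key that fall within the given time range.
    List<String> get(String key, long fromTime, long toTime);
}

// Stand-in for a RocksDB-backed store.
final class RocksDbStoreSketch implements VersionedStoreSketch {
    @Override
    public List<String> get(String key, long fromTime, long toTime) {
        return Collections.singletonList("rocksdb:" + key);
    }
}

// Stand-in for a possible future in-memory store.
final class InMemoryStoreSketch implements VersionedStoreSketch {
    @Override
    public List<String> get(String key, long fromTime, long toTime) {
        return Collections.singletonList("memory:" + key);
    }
}

final class QueryUtilsSketch {
    // Depends only on the interface, so no per-implementation branches are needed.
    static List<String> runQuery(VersionedStoreSketch store, String key, long fromTime, long toTime) {
        return store.get(key, fromTime, toTime);
    }

    public static void main(String[] args) {
        System.out.println(runQuery(new RocksDbStoreSketch(), "k", 0L, 10L));  // [rocksdb:k]
        System.out.println(runQuery(new InMemoryStoreSketch(), "k", 0L, 10L)); // [memory:k]
    }
}
```

If the method lived only on the concrete class, `runQuery` would need a cast or an `instanceof` branch for every store type.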

Bests,
Alieh

On Tue, Oct 24, 2023 at 10:01 PM Alieh Saeedi wrote:


Thank you, Matthias, Bruno, and Guozhang for keeping the discussion going.

Here is the list of changes I made:

  1. I enriched the "Example" section as Bruno suggested. Could you please
  have a look at that section? I think I devised an interesting one ;-)
  2. As Matthias and Guozhang suggested, I renamed variables and methods
  as follows:
     - "fromTimestamp" -> "fromTime"
     - "asOfTimestamp" -> "toTime"
     - "from(Instant fromTime)" -> "fromTime(Instant fromTime)"
     - "asOf(Instant toTime)" -> "toTime(Instant toTime)"
  3. About throwing an NPE when time bounds are `null`: Ok, Matthias, I am
  convinced by your idea. I consider a null timestamp as "no bound". But I
  had to change KIP-960 and the corresponding PR to be consistent as well.


Answering questions and some more discussion points:

  1. For now, I keep the class names as they are.
  2. About the new field "validTo" in VersionedRecord: Yes Matthias, I keep
  the old constructor, which does not have `validTo` as an input parameter.
  But in the body of the old constructor, I use the default value MAX
  for the validTo field. I think MAX must be the default value for `validTo`
  since before inserting a tombstone or a new value for the same key, the
  value must be valid till eternity.
  3. Regarding changing the ValueIterator interface from `public interface
  ValueIterator extends Iterator` to `public interface
  ValueIterator extends Iterator>`: Matthias, I do not
  know how it improves type safety because the MultiVersionedKeyQuery class
  returns a ValueIterator of VersionedRecord anyway. But if we want to be
  consistent with KeyValueIterator, we must apply the changes you suggested.
  4. Regarding adding the new `get(key, fromTime, toTime)` method to the
  public interface `VersionedKeyValueStore` or only adding it to the
  class `RocksDBVersionedStore`: In the KIP, I changed the interface as
  Victoria suggested. But still, I am not convinced why we do that. @Victoria:
  Could you please clarify why we have to define it in the interface? More
  specifically, why do we need to use the generic VersionedKeyValueStore
  instead of simply using the implementing classes?

Cheers,
Alieh

On Sat, Oct 14, 2023 at 3:35 AM Guozhang Wang <guozhang.wang...@gmail.com> wrote:


Thanks Alieh for the KIP, as well as a nice summary of all the
discussions! Just my 2c regarding your open questions:

1. I just checked KIP-889
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-889%3A+Versioned+State+Stores)
and we used "VersionedRecord get(K key, long asOfTimestamp);", so I
feel to be consistent with this API is better compared with being
consistent with "WindowKeyQuery"?

3. I agree with Matthias that naming is always tricky, and I also tend
to be consistent with what we already have (even if in retro it may
not be the best idea :P and if that was really becoming a complaint,
we would change i

Re: [DISCUSS] Should we continue to merge without a green build? No!

2023-11-14 Thread Matthias J. Sax

Thanks for starting this discussion David! I totally agree to "no"!

I think there is no excuse whatsoever for merging PRs with compilation 
errors (except an honest mistake for conflicting PRs that got merged 
interleaved). -- Every committer must(!) check the Jenkins status before 
merging to avoid such an issue.


Similar for actual permanently broken tests. If there is no green build, 
and the same test failed across multiple Jenkins runs, a committer 
should detect this and cannot merge a PR.


Given the current state of the CI pipeline, it seems possible to get 
green runs, and thus I support the policy (that we actually always had) 
to only merge if there is at least one green build. If committers got 
sloppy about this, we need to call it out and put a hold on this practice.


(The only exception from the above policy would be a very unstable 
status for which getting a green build is not possible at all, due to 
too many flaky tests -- for this case, I would accept merging even if 
there is no green build, but committers need to manually ensure that 
every test did pass in at least one test run. -- We had this in the 
past, but I don't think we are in such a bad situation right now.)


About disabling tests: I was never a fan of this, because in my 
experience those tests are not fixed any time soon, especially because 
we do not consider such tickets as release blockers. To me, seeing tests 
fail on PR builds is actually a good forcing function for people to feel 
the pain, and thus get motivated to make time to fix the tests.


I have to admit that I was a little bit sloppy paying attention to flaky 
tests recently, and I highly appreciate this effort. Also thanks to 
everyone who actually filed a ticket! IMHO, we should file a ticket for 
every flaky test, and also keep adding comments each time we see a test 
fail to be able to track the frequency at which a test fails, so we can 
fix the most pressing ones first.


To me, the best forcing function to get tests stabilized is to file 
tickets and consider them release blockers. Disabling tests does not 
really help much IMHO to tackle the problem (we can of course still 
disable them to get noise out of the system, but it would only introduce 
testing gaps for the time being and also does not help to figure out how 
often a test fails, so it's not a solution to the problem IMHO).



-Matthias

On 11/13/23 11:40 PM, Sagar wrote:

Hi Divij,

I think this proposal overall makes sense. My only nit sort of a suggestion
is that let's also consider a label called newbie++[1] for flaky tests if
we are considering adding newbie as a label. I think some of the flaky
tests need familiarity with the codebase or the test setup so as a first
time contributor, it might be difficult. newbie++ IMO covers that aspect.

[1]
https://issues.apache.org/jira/browse/KAFKA-15406?jql=project%20%3D%20KAFKA%20AND%20labels%20%3D%20%22newbie%2B%2B%22

Let me know what you think.

Thanks!
Sagar.

On Mon, Nov 13, 2023 at 9:11 PM Divij Vaidya 
wrote:


  Please, do it.

We can use specific labels to effectively filter those tickets.

We already have a label and a way to discover flaky tests. They are tagged
with the label "flaky-test" [1]. There is also a label "newbie" [2] meant
for folks who are new to Apache Kafka code base.
My suggestion is to send a broader email to the community (since many will
miss details in this thread) and call for action for committers to
volunteer as "shepherds" for these tickets. I can send one out once we have
some consensus wrt next steps in this thread.


[1]

https://issues.apache.org/jira/browse/KAFKA-13421?jql=project%20%3D%20KAFKA%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened%2C%20%22Patch%20Available%22)%20AND%20resolution%20%3D%20Unresolved%20AND%20labels%20%3D%20flaky-test%20ORDER%20BY%20priority%20DESC%2C%20updated%20DESC


[2] https://kafka.apache.org/contributing -> Finding a project to work on


Divij Vaidya



On Mon, Nov 13, 2023 at 4:24 PM Николай Ижиков 
wrote:




To kickstart this effort, we can publish a list of such tickets in the
community and assign one or more committers the role of a "shepherd" for
each ticket.

Please, do it.
We can use specific label to effectively filter those tickets.


On Nov 13, 2023, at 15:16, Divij Vaidya wrote:


Thanks for bringing this up David.

My primary concern revolves around the possibility that the currently
disabled tests may remain inactive indefinitely. We currently have
unresolved JIRA tickets for flaky tests that have been pending for an
extended period. I am inclined to support the idea of disabling these tests
temporarily and merging changes only when the build is successful, provided
there is a clear plan for re-enabling them in the future.

To address this issue, I propose the following measures:

1\ Foster a supportive environment for new contributors within the
community, encouraging them to take on tickets associated with flaky tests.


[jira] [Updated] (KAFKA-15828) Protect clients from broker hostname reuse

2023-11-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15828:

Labels: needs-kip  (was: )

> Protect clients from broker hostname reuse
> --
>
> Key: KAFKA-15828
> URL: https://issues.apache.org/jira/browse/KAFKA-15828
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, producer 
>Reporter: Jason Gustafson
>Priority: Major
>  Labels: needs-kip
>
> In some environments such as k8s, brokers may be assigned to nodes 
> dynamically from an available pool. When a cluster is rolling, it is possible 
> for the client to see the same node advertised for different broker IDs in a 
> short period of time. For example, kafka-1 might be initially assigned to 
> node1. Before the client is able to establish a connection, it could be that 
> kafka-3 is now on node1 instead. Currently there is no protection in the 
> client or in the protocol for this scenario. If the connection succeeds, the 
> client will assume it has a good connection to kafka-1. Until something 
> disrupts the connection, it will continue under this assumption even if the 
> hostname for kafka-1 changes.
> We have observed this scenario in practice. The client connected to the wrong 
> broker through stale hostname information. It was unable to produce data 
> because of persistent NOT_LEADER errors. The only way to recover in the end 
> was by restarting the client to force a reconnection.
> We have discussed a couple potential solutions to this problem:
>  # Let the client be smarter managing the connection/hostname mapping. When 
> it detects that a hostname has changed, it should force a disconnect to 
> ensure it connects to the right node.
>  # We can modify the protocol to verify that the client has connected to the 
> intended broker. For example, we can add a field to ApiVersions to indicate 
> the intended broker ID. The broker receiving the request can return an error 
> if its ID does not match that in the request.
> Are there alternatives? 
>  
>  
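
The second option in the description above could look roughly like the following broker-side check. This is a minimal sketch, assuming a hypothetical `intendedBrokerId` field and a hypothetical `WRONG_BROKER` result; neither is part of the current Kafka protocol.

```java
// Hypothetical sketch of option 2; field and error names are illustrative only.
final class BrokerIdCheckSketch {
    enum Result { OK, WRONG_BROKER }

    // Broker-side check: compare the ID the client intended to reach with this broker's own ID.
    // A negative intendedBrokerId models an older client that does not set the field.
    static Result validate(int intendedBrokerId, int localBrokerId) {
        if (intendedBrokerId < 0) {
            return Result.OK;
        }
        return intendedBrokerId == localBrokerId ? Result.OK : Result.WRONG_BROKER;
    }

    public static void main(String[] args) {
        // The client resolved the hostname of kafka-1 (ID 1), but kafka-3 (ID 3) now runs on that node.
        System.out.println(validate(1, 3)); // WRONG_BROKER
        System.out.println(validate(1, 1)); // OK
    }
}
```

On a mismatch the client could disconnect and refresh its metadata instead of producing against the wrong broker until it is restarted.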



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15828) Protect clients from broker hostname reuse

2023-11-14 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15828?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15828:

Component/s: clients
 consumer
 producer 

> Protect clients from broker hostname reuse
> --
>
> Key: KAFKA-15828
> URL: https://issues.apache.org/jira/browse/KAFKA-15828
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, producer 
>Reporter: Jason Gustafson
>Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14419) Failed SyncGroup leading to partitions lost due to processing during rebalances

2023-11-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17783897#comment-17783897
 ] 

Matthias J. Sax commented on KAFKA-14419:
-

[~Carlstedt] – I did not look into too many details, but if your processing is as 
lightweight as you describe, the only thing I can recommend is to reduce the 
`max.poll.records` config. – What I also noticed is that you reduced 
`session.timeout.ms` to 6 seconds (and the heartbeat interval to 1.5 sec) – not 
sure why you are doing this? These are very aggressive settings, and the defaults 
(45 sec session timeout) might work much better.
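
As a sketch of what that could look like, assuming the settings are passed as plain consumer properties: the config keys are the ones named above, the class name `ConsumerTimeoutSketch` is hypothetical, and the `max.poll.records` value is an illustrative assumption, not a recommendation.

```java
import java.util.Properties;

final class ConsumerTimeoutSketch {
    // Builds consumer overrides along the lines suggested in the comment above.
    static Properties consumerOverrides() {
        Properties props = new Properties();
        // Fewer records per poll() call; 100 is an illustrative value only.
        props.put("max.poll.records", "100");
        // Back to the 45 second default instead of the aggressive 6 seconds.
        props.put("session.timeout.ms", "45000");
        // 3 seconds is the consumer default; it must stay well below the session timeout.
        props.put("heartbeat.interval.ms", "3000");
        return props;
    }

    public static void main(String[] args) {
        consumerOverrides().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```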

> Failed SyncGroup leading to partitions lost due to processing during 
> rebalances
> ---
>
> Key: KAFKA-14419
> URL: https://issues.apache.org/jira/browse/KAFKA-14419
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.1
> Environment: AWS EC2 CentOS Linux 3.10.0-1160.76.1.el7.x86_64
>Reporter: Mikael
>Priority: Major
>
> Trigger scenario:
> Four Kafka client application instances on separate EC2 instances with a 
> total of 8 active and 8 standby stream tasks for the same stream topology, 
> consuming from an input topic with 8 partitions. Sometimes a handful of 
> messages are consumed twice by one of the stream tasks when stream tasks on 
> another application instance join the consumer group after an application 
> instance restart.
> Additional information:
> Messages are produced to the topic by another Kafka streams topology deployed 
> on the same four application instances. I have verified that each message is 
> only produced once by enabling debug logging in the topology flow right 
> before producing each message to the topic.
> Logs from stream thread with duplicate consumption:
>  
> {code:java}
> 2022-11-21 15:09:33,677 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] Request joining group due to: group is 
> already rebalancing
> 2022-11-21 15:09:33,677 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:566] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] (Re-)joining group
> Input records consumed for the first time
> 2022-11-21 15:09:33,919 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:627] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] Successfully joined group with 
> generation Generation{generationId=8017, 
> memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74',
>  protocol='stream'}
> 2022-11-21 15:09:33,920 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:826] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] SyncGroup failed: The group began 
> another rebalance. Need to re-join the group. Sent generation was 
> Generation{generationId=8017, 
> memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74',
>  protocol='stream'}
> 2022-11-21 15:09:33,922 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1019] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] Resetting generation due to: 
> encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response
> 2022-11-21 15:09:33,922 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms]

Re: [VOTE] KIP-998: Give ProducerConfig(props, doLog) constructor protected access

2023-11-07 Thread Matthias J. Sax
On Fri, Nov 3, 2023, at 17:06, Matthias J. Sax wrote:

Sophie reads my mind well, but I also won't object if the majority of people
think it's desirable to have it public (it does not really hurt to
have them public).

I just personally think we should optimize for "end users" and they
should not need it -- and thus, keeping the API surface area as small as
possible seems desirable (and don't generate JavaDocs for protected
methods...). Maybe it's less of an issue for clients, but given my
experience with Kafka Streams, and its large API, I prefer to guide users
by avoiding "leaky" abstractions.

-Matthias



On 11/3/23 4:34 PM, Chris Egerton wrote:

No objections, I'm +1 either way.

On Fri, Nov 3, 2023, 18:50 Sophie Blee-Goldman 
wrote:


I am fine with making them public. Of course in that case we should also
change the corresponding constructors in ConsumerConfig, AdminConfig, and
StreamsConfig from protected to public as well, to be consistent. But
Matthias seems to feel that these should never be instantiated by a user
and that the correct course of action would be to move in the opposite
direction.

I don't personally feel strongly either way -- honestly I had thought it
was an abuse of internal APIs to extend the other Config classes in order
to access the protected constructor and disable logging. So I would be
happy to officially pull it into the public API with all-public
constructors, because I do feel it is valid/useful to be able to
instantiate these objects. We do so in order to access config values in a
way that accounts for any overrides on top of the default, for example when
multiple overrides are in play (eg user overrides on top of framework
overrides on top of Kafka Streams overrides on top of
Producer/Consumer/Admin client defaults). Using them is also (slightly)
more type-safe than going through a Properties or config Map<>

Any objections to expanding the KIP to the ConsumerConfig, AdminConfig, and
StreamsConfig constructors and making them public as well? From Matthias or
otherwise?

On Fri, Nov 3, 2023 at 11:09 AM Ismael Juma  wrote:


It seems wrong to require inheritance for this and we already have a public
constructor. I would make both of them public.

Ismael

On Fri, Nov 3, 2023 at 10:47 AM Matthias J. Sax wrote:


+1 (binding)


About "why not public" question:

I think we need to distinguish between "end users" who create a producer
instance, and "external parties" that might implement their own
`Producer` (or wrap/extend `KafkaProducer`).

In the end, I would not expect an "end user" to actually call `new
ProducerConfig` to begin with. If one creates a `KafkaProducer` they
pass the config via a `Map` or `Properties`, and the producer creates
`ProducerConfig` internally only. -- Thus, there is no need to make it
`public`. (To this end, I don't actually understand why there are public
`ProducerConfig` constructors to begin with -- sounds like a leaky
abstraction to me.)

On the other hand, if a "third party" implements the `Producer` interface to
ship their own producer implementation, they might want to create
`ProducerConfig` internally, so for them it's different, but still, they
don't need public access (because they can extend `ProducerConfig`, too,
for this case). -- To me, this falls into the category "simple things
should be easy, and hard things should be possible".
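
The "extend to reach the protected constructor" route can be sketched as follows. `ClientConfigSketch` and `QuietConfigSketch` are hypothetical stand-ins, not the real `ProducerConfig`; only the shape of the `(props, doLog)` constructor is taken from the KIP title.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a client config class; NOT the real ProducerConfig.
class ClientConfigSketch {
    final Map<String, Object> values;
    final boolean logged;

    // Public constructor: the path an end user would take; always logs.
    public ClientConfigSketch(Map<String, Object> props) {
        this(props, true);
    }

    // Protected constructor: meant for subclasses (Java also allows same-package access).
    protected ClientConfigSketch(Map<String, Object> props, boolean doLog) {
        this.values = Collections.unmodifiableMap(new HashMap<>(props));
        this.logged = doLog;
        if (doLog) {
            System.out.println("config values: " + values);
        }
    }
}

// A third party extends the config class to silence the config logging.
final class QuietConfigSketch extends ClientConfigSketch {
    QuietConfigSketch(Map<String, Object> props) {
        super(props, false);
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("acks", "all");
        new ClientConfigSketch(props); // prints the config values
        new QuietConfigSketch(props);  // prints nothing
    }
}
```

This keeps the quiet constructor out of the end-user API surface while still leaving it reachable for anyone willing to subclass.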


-Matthias


On 11/3/23 6:06 AM, Ismael Juma wrote:

Hi Sophie,

I was trying to understand the goal of the change and it's not totally
clear to me. If the goal is to allow third party applications to customize
the logging behavior, why is the method protected instead of public?

Ismael

On Thu, Nov 2, 2023 at 9:55 PM Sophie Blee-Goldman <sop...@responsive.dev> wrote:


Hey all,

This is a trivial one-liner change that it was determined should go through
a KIP during the PR review process (see this thread
<https://github.com/apache/kafka/pull/14681#discussion_r1378591228> for
context). Since the change itself was already reviewed and approved I'm
skipping the discussion thread and bringing it to a vote right away, but of
course I'm open to feedback and can create a discussion thread if there is
need for it.

The change itself is simply adding the `protected` modifier to the
ProducerConfig constructor that allows for silencing the config logging.
This just brings the ProducerConfig in alignment with the other client
configs, all of which already had this constructor as protected.

KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-998%3A+Give+ProducerConfig%28props%2C+doLog%29+constructor+protected+access

PR: https://github.com/apache/kafka/pull/14681

Thanks!
Sophie


[jira] [Commented] (KAFKA-13627) Topology changes shouldn't require a full reset of local state

2023-11-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13627?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17783894#comment-17783894
 ] 

Matthias J. Sax commented on KAFKA-13627:
-

As you pointed out, the KIP did not make progress for a long time. I don't see 
any reason why it cannot be resurrected though.

> Topology changes shouldn't require a full reset of local state
> --
>
> Key: KAFKA-13627
> URL: https://issues.apache.org/jira/browse/KAFKA-13627
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 3.1.0
>Reporter: Nicholas Telford
>Priority: Major
>
> [KIP-816|https://cwiki.apache.org/confluence/display/KAFKA/KIP-816%3A+Topology+changes+without+local+state+reset]
> When changes are made to a Topology that modifies its structure, users must 
> use the Application Reset tool to reset the local state of their application 
> prior to deploying the change. Consequently, these changes require rebuilding 
> all local state stores from their changelog topics in Kafka.
> The time and cost of rebuilding state stores is determined by the size of the 
> state stores, and their recent write history, as rebuilding a store entails 
> replaying all recent writes to the store. For applications that have very 
> large stores, or stores with extremely high write-rates, the time and cost of 
> rebuilding all state in the application can be prohibitively expensive. This 
> is a significant barrier to building highly scalable applications with good 
> availability.
> Changes to the Topology that do not directly affect a state store should not 
> require the local state of that store to be reset/deleted. This would allow 
> applications to scale to very large data sets, whilst permitting the 
> application behaviour to evolve over time.
> h1. Background
> Tasks in a Kafka Streams Topology are logically grouped by “Topic Group” 
> (aka. Subtopology). Topic Groups are assigned an ordinal (number), based on 
> their position in the Topology. This Topic Group ordinal is used as the 
> prefix for all Task IDs: {{{}_{}}}, 
> e.g. {{2_14}}
> If new Topic Groups are added, old Topic Groups are removed, or existing 
> Topic Groups are re-arranged, this can cause the assignment of ordinals to 
> change {_}even for Topic Groups that have not been modified{_}.
> When the assignment of ordinals to Topic Groups changes, existing Tasks are 
> invalidated, as they no longer correspond to the correct Topic Groups. Local 
> state is located in directories that include the Task ID (e.g. 
> {{{}/state/dir/2_14/mystore/rocksdb/…{}}}), and since the Tasks have all been 
> invalidated, all existing local state directories are also invalid.
> Attempting to start an application that has undergone these ordinal changes, 
> without first clearing the local state, will cause Kafka Streams to attempt 
> to use the existing local state for the wrong Tasks. Kafka Streams detects 
> this discrepancy and prevents the application from starting.
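
The ordinal shift described in the Background section can be illustrated with a small sketch. The topic group names and the `TaskIdSketch` helper are hypothetical; only the `2_14` Task ID form and the state directory layout come from the description above.

```java
import java.util.Arrays;
import java.util.List;

final class TaskIdSketch {
    // Task ID = <topic group ordinal>_<partition>; the ordinal is the group's position in the topology.
    static String taskId(List<String> topicGroups, String group, int partition) {
        int ordinal = topicGroups.indexOf(group);
        if (ordinal < 0) {
            throw new IllegalArgumentException("unknown topic group: " + group);
        }
        return ordinal + "_" + partition;
    }

    // State directory layout: <stateDir>/<taskId>/<storeName>
    static String storeDir(String stateDir, String taskId, String storeName) {
        return stateDir + "/" + taskId + "/" + storeName;
    }

    public static void main(String[] args) {
        // Hypothetical topic groups before and after adding a new group at the front.
        List<String> before = Arrays.asList("parse", "enrich", "aggregate");
        List<String> after = Arrays.asList("audit", "parse", "enrich", "aggregate");
        System.out.println(storeDir("/state/dir", taskId(before, "aggregate", 14), "mystore")); // /state/dir/2_14/mystore
        System.out.println(storeDir("/state/dir", taskId(after, "aggregate", 14), "mystore"));  // /state/dir/3_14/mystore
    }
}
```

The "aggregate" group itself is unchanged, yet its state now belongs under `3_14`, while the existing `2_14` directory would be matched to the "enrich" group.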



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15797) Flaky test EosV2UpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosV2[true]

2023-11-07 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15797:

Component/s: streams
 unit tests

> Flaky test EosV2UpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosV2[true] 
> --
>
> Key: KAFKA-15797
> URL: https://issues.apache.org/jira/browse/KAFKA-15797
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Justine Olshan
>Priority: Major
>  Labels: flaky-test
>
> I found two recent failures:
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14629/22/testReport/junit/org.apache.kafka.streams.integration/EosV2UpgradeIntegrationTest/Build___JDK_8_and_Scala_2_12___shouldUpgradeFromEosAlphaToEosV2_true_/]
> [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2365/testReport/junit/org.apache.kafka.streams.integration/EosV2UpgradeIntegrationTest/Build___JDK_21_and_Scala_2_13___shouldUpgradeFromEosAlphaToEosV2_true__2/]
>  
> Output generally looks like:
> {code:java}
> java.lang.AssertionError: Did not receive all 138 records from topic 
> multiPartitionOutputTopic within 6 ms, currently accumulated data is 
> [KeyValue(0, 0), KeyValue(0, 1), KeyValue(0, 3), KeyValue(0, 6), KeyValue(0, 
> 10), KeyValue(0, 15), KeyValue(0, 21), KeyValue(0, 28), KeyValue(0, 36), 
> KeyValue(0, 45), KeyValue(0, 55), KeyValue(0, 66), KeyValue(0, 78), 
> KeyValue(0, 91), KeyValue(0, 55), KeyValue(0, 66), KeyValue(0, 78), 
> KeyValue(0, 91), KeyValue(0, 105), KeyValue(0, 120), KeyValue(0, 136), 
> KeyValue(0, 153), KeyValue(0, 171), KeyValue(0, 190), KeyValue(3, 0), 
> KeyValue(3, 1), KeyValue(3, 3), KeyValue(3, 6), KeyValue(3, 10), KeyValue(3, 
> 15), KeyValue(3, 21), KeyValue(3, 28), KeyValue(3, 36), KeyValue(3, 45), 
> KeyValue(3, 55), KeyValue(3, 66), KeyValue(3, 78), KeyValue(3, 91), 
> KeyValue(3, 105), KeyValue(3, 120), KeyValue(3, 136), KeyValue(3, 153), 
> KeyValue(3, 171), KeyValue(3, 190), KeyValue(3, 190), KeyValue(3, 210), 
> KeyValue(3, 231), KeyValue(3, 253), KeyValue(3, 276), KeyValue(3, 300), 
> KeyValue(3, 325), KeyValue(3, 351), KeyValue(3, 378), KeyValue(3, 406), 
> KeyValue(3, 435), KeyValue(1, 0), KeyValue(1, 1), KeyValue(1, 3), KeyValue(1, 
> 6), KeyValue(1, 10), KeyValue(1, 15), KeyValue(1, 21), KeyValue(1, 28), 
> KeyValue(1, 36), KeyValue(1, 45), KeyValue(1, 55), KeyValue(1, 66), 
> KeyValue(1, 78), KeyValue(1, 91), KeyValue(1, 105), KeyValue(1, 120), 
> KeyValue(1, 136), KeyValue(1, 153), KeyValue(1, 171), KeyValue(1, 190), 
> KeyValue(1, 120), KeyValue(1, 136), KeyValue(1, 153), KeyValue(1, 171), 
> KeyValue(1, 190), KeyValue(1, 210), KeyValue(1, 231), KeyValue(1, 253), 
> KeyValue(1, 276), KeyValue(1, 300), KeyValue(1, 325), KeyValue(1, 351), 
> KeyValue(1, 378), KeyValue(1, 406), KeyValue(1, 435), KeyValue(2, 0), 
> KeyValue(2, 1), KeyValue(2, 3), KeyValue(2, 6), KeyValue(2, 10), KeyValue(2, 
> 15), KeyValue(2, 21), KeyValue(2, 28), KeyValue(2, 36), KeyValue(2, 45), 
> KeyValue(2, 55), KeyValue(2, 66), KeyValue(2, 78), KeyValue(2, 91), 
> KeyValue(2, 105), KeyValue(2, 55), KeyValue(2, 66), KeyValue(2, 78), 
> KeyValue(2, 91), KeyValue(2, 105), KeyValue(2, 120), KeyValue(2, 136), 
> KeyValue(2, 153), KeyValue(2, 171), KeyValue(2, 190), KeyValue(2, 210), 
> KeyValue(2, 231), KeyValue(2, 253), KeyValue(2, 276), KeyValue(2, 300), 
> KeyValue(2, 325), KeyValue(2, 351), KeyValue(2, 378), KeyValue(2, 406), 
> KeyValue(0, 210), KeyValue(0, 231), KeyValue(0, 253), KeyValue(0, 276), 
> KeyValue(0, 300), KeyValue(0, 325), KeyValue(0, 351), KeyValue(0, 378), 
> KeyValue(0, 406), KeyValue(0, 435)] Expected: is a value equal to or greater 
> than <138> but: <134> was less than <138>{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15798) Flaky Test NamedTopologyIntegrationTest.shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology()

2023-11-07 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15798?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15798:

Component/s: streams, unit tests

> Flaky Test 
> NamedTopologyIntegrationTest.shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology()
> -
>
> Key: KAFKA-15798
> URL: https://issues.apache.org/jira/browse/KAFKA-15798
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Justine Olshan
>Priority: Major
>  Labels: flaky-test
>
> I saw a few examples recently. 2 have the same error, but the third is 
> different
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14629/22/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_8_and_Scala_2_12___shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology___2/]
> [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2365/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_21_and_Scala_2_13___shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology__/]
>  
> The failure is like
> {code:java}
> java.lang.AssertionError: Did not receive all 5 records from topic 
> output-stream-1 within 6 ms, currently accumulated data is [] Expected: 
> is a value equal to or greater than <5> but: <0> was less than <5>{code}
> The other failure was
> [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2365/testReport/junit/org.apache.kafka.streams.integration/NamedTopologyIntegrationTest/Build___JDK_8_and_Scala_2_12___shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology__/]
> {code:java}
> java.lang.AssertionError: Expected: <[0, 1]> but: was <[0]>{code}





[jira] [Updated] (KAFKA-15792) Kafka Streams stuck partition fixed after restarting the process

2023-11-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15792:

Issue Type: Bug  (was: New Feature)

> Kafka Streams stuck partition fixed after restarting the process
> 
>
> Key: KAFKA-15792
> URL: https://issues.apache.org/jira/browse/KAFKA-15792
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.1.2
>Reporter: Patrick Pang
>Priority: Major
>
> Our Kafka Streams process is often slow to process a particular 
> partition on a specific instance. No data skew is detected, i.e. partitions 
> are mostly uniformly distributed. The symptom is a huge lag on a specific 
> partition. We do notice that network out is higher on the problematic process 
> than on a normal process, often 3x.
> After restarting the process, the lag drains within 5 minutes of startup. 
> This hints at an internal processing issue in our streams application rather 
> than a cluster problem or a poison message. 
> Are there any metrics you suggest we look at, or is this a known issue? 
> Regularly bouncing the application doesn't look like a proper fix for 
> production systems.





[jira] [Comment Edited] (KAFKA-15792) Kafka Streams stuck partition fixed after restarting the process

2023-11-06 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17783114#comment-17783114
 ] 

Matthias J. Sax edited comment on KAFKA-15792 at 11/6/23 5:04 PM:
--

Can it be related to 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390]. 
However we can't see any metrics that can prove this.


was (Author: JIRAUSER300456):
Can it be related to 
[https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=186878390?]
 However we can't see any metrics that can prove this.

> Kafka Streams stuck partition fixed after restarting the process
> 
>
> Key: KAFKA-15792
> URL: https://issues.apache.org/jira/browse/KAFKA-15792
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 3.1.2
>Reporter: Patrick Pang
>Priority: Major
>
> Our Kafka Streams process is often slow to process a particular 
> partition on a specific instance. No data skew is detected, i.e. partitions 
> are mostly uniformly distributed. The symptom is a huge lag on a specific 
> partition. We do notice that network out is higher on the problematic process 
> than on a normal process, often 3x.
> After restarting the process, the lag drains within 5 minutes of startup. 
> This hints at an internal processing issue in our streams application rather 
> than a cluster problem or a poison message. 
> Are there any metrics you suggest we look at, or is this a known issue? 
> Regularly bouncing the application doesn't look like a proper fix for 
> production systems.





Re: [DISCUSS] KIP-969: Support range interactive queries for versioned state stores

2023-11-03 Thread Matthias J. Sax
ntioned. So I think we must have a `latest()` method for both KIP-968 and
KIP-969. What do you think about that?


Cheers,
Alieh

On Thu, Oct 12, 2023 at 6:33 AM Matthias J. Sax  wrote:


Thanks for the update.



To retrieve the latest value(s), the user must call just the asOf method with
the MAX value (asOf(MAX)). The same applies to KIP-968. Do you think it is
clumsy, Matthias?



Well, in KIP-968 calling `asOf` and passing in a timestamp is optional,
and default is "latest", right? So while `asOf(MAX)` does the same
thing, practically users would never call `asOf` for a "latest" query?

In this KIP, we enforce that users give us a key range (we have the 4
static entry point methods to define a query for this), and we say we
default to "no bounds" for time range by default.

The existing `RangeQuery` allows querying a range of keys for existing
stores. It seems to be a common pattern to query a key range on latest.
-- In the current proposal, users would need to do:

MultiVersionedRangeQuery.withKeyRange(startKey, endKey).asOf(MAX);

I would like to hear from others whether we think that's a good user experience.
If we agree to accept this, I think we should explain how to do this in
the JavaDocs (and also the regular docs) -- otherwise, I can already
anticipate user questions on all question-asking channels about how to do a
"normal key range query". IMHO, the problem is not that the code itself
is too clumsy, but that it's totally not obvious to users how to express
it without actually explaining it to them. It basically violates the API
design rule "make it easy to use / simple things should be easy".
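
To make the "key range over latest" case concrete, here is a minimal sketch that uses only the JDK. It does not use any Kafka Streams classes: `VersionedRangeSketch`, `put`, and `rangeAsOf` are hypothetical names invented for this illustration, and `Long.MAX_VALUE` stands in for the `MAX` mentioned above. It only shows why an as-of lookup at the maximum timestamp yields the latest version of each key in the range.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class VersionedRangeSketch {
    // key -> (validFrom timestamp -> value); a hypothetical stand-in for a versioned store
    private final NavigableMap<String, NavigableMap<Long, String>> store = new TreeMap<>();

    public void put(String key, long validFrom, String value) {
        store.computeIfAbsent(key, k -> new TreeMap<>()).put(validFrom, value);
    }

    // For each key in [fromKey, toKey], return the version that was valid at asOfTimestamp.
    public Map<String, String> rangeAsOf(String fromKey, String toKey, long asOfTimestamp) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, NavigableMap<Long, String>> e
                : store.subMap(fromKey, true, toKey, true).entrySet()) {
            // floorEntry picks the newest version whose validFrom is <= asOfTimestamp
            Map.Entry<Long, String> version = e.getValue().floorEntry(asOfTimestamp);
            if (version != null) {
                result.put(e.getKey(), version.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        VersionedRangeSketch s = new VersionedRangeSketch();
        s.put("a", 10L, "a-v1");
        s.put("a", 20L, "a-v2");
        s.put("b", 15L, "b-v1");
        s.put("c", 5L, "c-v1");
        // "Latest" is just an as-of query at the maximum timestamp.
        System.out.println(s.rangeAsOf("a", "b", Long.MAX_VALUE));
        // An earlier timestamp sees only the versions that existed then.
        System.out.println(s.rangeAsOf("a", "b", 12L));
    }
}
```

In this toy model the "latest" query and the point-in-time query share one code path, which is the behavior the thread describes; whether users should have to spell out the maximum timestamp themselves is exactly the usability question being raised.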

Btw: We could also re-use `RangeQuery` and add an implementation to
`VersionedStateStore` to just accept this query type, with "key range
over latest" semantics. -- The issue is, of course, that users need to
know that the query would return `ValueAndTimestamp` and not plain `V`
(or we add a translation step to unwrap the value, but we would lose the
"validFrom" timestamp -- validTo would be `null`). Because type safety
is a general issue in IQv2 it would not make it worse (in the strict
sense), but I am also not sure if we want to dig an even deeper hole...


-Matthias


On 10/10/23 11:55 AM, Alieh Saeedi wrote:

Thanks, Matthias and Bruno, for the feedback on KIP-969. Here is a summary
of the updates I made to the KIP:

 1. I liked the idea of renaming methods as Matthias suggested.
 2. I removed the allversions() method as I did in KIP-968. To retrieve
 the latest value(s), the user must call just the asOf method with the MAX
 value (asOf(MAX)). The same applies to KIP-968. Do you think it is clumsy,
 Matthias?
 3. I added a method to the *VersionedKeyValueStore* interface, as I did
 for KIP-968.
 4. Matthias: I do not get what you mean by your second comment. Isn't
 the KIP already explicit about that?

 > I assume, results are returned by timestamp for each key. The KIP
 should be explicit about it.


Cheers,
Alieh



On Tue, Oct 3, 2023 at 6:07 AM Matthias J. Sax wrote:



Thanks for updating the KIP.

Not sure if I agree or not with Bruno's idea to split the query types
further? In the end, we split them only because there are three different
return types: single value, value-iterator, key-value-iterator.

What do we gain by splitting out single-ts-range-key? In the end, for
range-ts-range-key the proposed class is necessary and is a superset
(one can set both timestamps to the same value, for single-ts lookup).

The mentioned simplification might apply to "single-ts-range-key" but I
don't see a simplification for the proposed (and necessary) query type?

On the other hand, I see an advantage of a single-ts-range-key for
querying over the "latest version" with a range of keys. For a
single-ts-range-key query, this would be the default (similar to
VersionedKeyQuery with no asOf timestamp defined).

In the current version of the KIP, (if we agree that default should
actually return "all versions" not "latest" -- this default was
suggested by Bruno on KIP-968 and makes sense to me, so we would need to
have the same default here to stay consistent), users would need to pass
in `from(Long.MAX).to(Long.MAX)` (if I got this right) to query the
latest point in time only, which seems to be clumsy? Or we could add a
`latestKeyOnly` option to `MultiVersionedRangeQuery`, but it seems
a little clumsy, too.




The overall order of the returned records is by Key

I assume, results are returned by timestamp for each key. The KIP should
be explicit about it.

To be very explicit, should we rename the methods to specify the key
bound?


    - withRange -> withKeyRange
    - withLowerBound -> withLowerKeyBound
    - withUpperBound -> withUpperKeyBound
    - withNoBounds -> allKeys (or withNoKeyBounds, but

Re: [VOTE] KIP-992 Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery

2023-11-03 Thread Matthias J. Sax

Thanks for the KIP.

+1 (binding)


-Matthias

On 11/3/23 6:08 AM, Lucas Brutschy wrote:

Hi Hanyu,

Thanks for the KIP!
+1 (binding)

Cheers
Lucas

On Thu, Nov 2, 2023 at 10:19 PM Hao Li  wrote:


Hi Hanyu,

Thanks for the KIP!
+1 (non-binding)

Hao

On Thu, Nov 2, 2023 at 1:29 PM Bill Bejeck  wrote:


Hi Hanyu,

Thanks for the KIP, this LGTM.
+1 (binding)

Thanks,
Bill



On Wed, Nov 1, 2023 at 1:07 PM Hanyu (Peter) Zheng wrote:


Hello everyone,

I would like to start a vote for KIP-992: Proposal to introduce IQv2 Query
Types: TimestampedKeyQuery and TimestampedRangeQuery.

Sincerely,
Hanyu

On Wed, Nov 1, 2023 at 10:00 AM Hanyu (Peter) Zheng 





https://cwiki.apache.org/confluence/display/KAFKA/KIP-992%3A+Proposal+to+introduce+IQv2+Query+Types%3A+TimestampedKeyQuery+and+TimestampedRangeQuery




Re: [VOTE] KIP-998: Give ProducerConfig(props, doLog) constructor protected access

2023-11-03 Thread Matthias J. Sax
Sophie reads my mind well, but I also won't object if the majority of people 
think it's desirable to have it public (it does not really hurt to 
have them public).


I just personally think we should optimize for "end users", and they 
should not need it -- and thus, keeping the API surface area as small as 
possible seems desirable (and don't generate JavaDocs for protected 
methods...). Maybe it's less of an issue for clients, but given my 
experience with Kafka Streams and its large API, I prefer to guide users 
by avoiding "leaky" abstractions.


-Matthias



On 11/3/23 4:34 PM, Chris Egerton wrote:

No objections, I'm +1 either way.

On Fri, Nov 3, 2023, 18:50 Sophie Blee-Goldman 
wrote:


I am fine with making them public. Of course in that case we should also
change the corresponding constructors in ConsumerConfig, AdminConfig, and
StreamsConfig from protected to public as well, to be consistent. But
Matthias seems to feel that these should never be instantiated by a user
and that the correct course of action would be to move in the opposite
direction.

I don't personally feel strongly either way -- honestly I had thought it
was an abuse of internal APIs to extend the other Config classes in order
to access the protected constructor and disable logging. So I would be
happy to officially pull it into the public API with all-public
constructors, because I do feel it is valid/useful to be able to
instantiate these objects. We do so in order to access config values in a
way that accounts for any overrides on top of the default, for example when
multiple overrides are in play (eg user overrides on top of framework
overrides on top of Kafka Streams overrides on top of
Producer/Consumer/Admin client defaults). Using them is also (slightly)
more type-safe than going through a Properties or config Map<>.

Any objections to expanding the KIP to the ConsumerConfig, AdminConfig, and
StreamsConfig constructors and making them public as well? From Matthias or
otherwise?

On Fri, Nov 3, 2023 at 11:09 AM Ismael Juma  wrote:


It seems wrong to require inheritance for this and we already have a public
constructor. I would make both of them public.

Ismael

On Fri, Nov 3, 2023 at 10:47 AM Matthias J. Sax 

wrote:



+1 (binding)


About "why not public" question:

I think we need to distinguish between "end users" who create a producer
instance, and "external parties" that might implement their own
`Producer` (or wrap/extend `KafkaProducer`).

In the end, I would not expect an "end user" to actually call `new
ProducerConfig` to begin with. If one creates a `KafkaProducer` they
pass the config via a `Map` or `Properties`, and the producer creates
`ProducerConfig` internally only. -- Thus, there is no need to make it
`public`. (To this end, I don't actually understand why there are public
`ProducerConfig` constructors to begin with -- sounds like a leaky
abstraction to me.)

On the other hand, if a "third party" implements the `Producer` interface to
ship their own producer implementation, they might want to create
`ProducerConfig` internally, so for them it's different, but still, they
don't need public access (because they can extend `ProducerConfig`, too,
for this case). -- To me, this falls into the category "simple things
should be easy, and hard things should be possible".


-Matthias


On 11/3/23 6:06 AM, Ismael Juma wrote:

Hi Sophie,

I was trying to understand the goal of the change and it's not totally
clear to me. If the goal is to allow third party applications to customize
the logging behavior, why is the method protected instead of public?

Ismael

On Thu, Nov 2, 2023 at 9:55 PM Sophie Blee-Goldman <sop...@responsive.dev> wrote:


Hey all,

This is a trivial one-liner change that it was determined should go through
a KIP during the PR review process (see this thread
<https://github.com/apache/kafka/pull/14681#discussion_r1378591228> for
context). Since the change itself was already reviewed and approved I'm
skipping the discussion thread and bringing it to a vote right away, but of
course I'm open to feedback and can create a discussion thread if there is
need for it.

The change itself is simply adding the `protected` modifier to the
ProducerConfig constructor that allows for silencing the config logging.
This just brings the ProducerConfig in alignment with the other client
configs, all of which already had this constructor as protected.

KIP:







https://cwiki.apache.org/confluence/display/KAFKA/KIP-998%3A+Give+ProducerConfig%28props%2C+doLog%29+constructor+protected+access

PR: https://github.com/apache/kafka/pull/14681

Thanks!
Sophie













Re: [DISCUSS] KIP-954: expand default DSL store configuration to custom types

2023-11-03 Thread Matthias J. Sax

Thanks. Will take a look into the PR.

I don't have any objection to the goal; on the contrary! What we have 
right now is very annoying, and if we can improve it, I am totally in favor 
of it.



-Matthias

On 11/3/23 8:47 AM, Almog Gavra wrote:

Good question :) I have a PR for it already here:
https://github.com/apache/kafka/pull/14659. The concept is to wrap the
suppliers with an interface that allows for delayed creation of the
StoreBuilder instead of creating the StoreBuilder from the suppliers right
away. Happy to get on a quick call to outline the implementation strategy
if you'd like, but hopefully you have no objections to the goal!
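
The idea of delaying StoreBuilder creation can be sketched roughly as follows. This is not the code from the PR, and it uses no Kafka classes: `DeferredStoreSketch`, `DeferredStoreFactory`, `StoreBuilderStub`, and the `store.type` key are all hypothetical stand-ins chosen for illustration. The sketch only shows the general pattern of wrapping the "how to build" logic so that the concrete builder is created once the final configuration is known.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class DeferredStoreSketch {
    // Hypothetical stand-in for a store builder; it only records which store type it would build.
    static final class StoreBuilderStub {
        final String storeType;

        StoreBuilderStub(String storeType) {
            this.storeType = storeType;
        }
    }

    // Holds the creation logic and applies it lazily, the first time a config is supplied.
    static final class DeferredStoreFactory {
        private final Function<Map<String, String>, StoreBuilderStub> factory;
        private StoreBuilderStub built;

        DeferredStoreFactory(Function<Map<String, String>, StoreBuilderStub> factory) {
            this.factory = factory;
        }

        StoreBuilderStub resolve(Map<String, String> finalConfig) {
            if (built == null) {
                built = factory.apply(finalConfig);
            }
            return built;
        }
    }

    // The store type is read from the config only when resolve() is called.
    static DeferredStoreFactory fromConfig() {
        return new DeferredStoreFactory(
            cfg -> new StoreBuilderStub(cfg.getOrDefault("store.type", "rocksdb")));
    }

    public static void main(String[] args) {
        // "Topology build" time: the factory exists, but no builder has been created yet.
        DeferredStoreFactory factory = fromConfig();

        // Later, the final config arrives and decides the store type.
        Map<String, String> finalConfig = new HashMap<>();
        finalConfig.put("store.type", "in_memory");
        System.out.println(factory.resolve(finalConfig).storeType);
    }
}
```

The design choice illustrated here is that the decision is deferred rather than changed after the fact: nothing concrete is wired until the configuration that should win is available.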

On Thu, Nov 2, 2023 at 8:44 PM Matthias J. Sax  wrote:


Almog,

can you explain how you intend to implement this change? It's not clear
to me how we could do this.

When we call `StreamsBuilder.build()` it will give us an already fully
wired `Topology`, including all store suppliers needed. I don't see a
clean way to change the store supplier after the fact.


-Matthias

On 11/2/23 5:11 PM, Almog Gavra wrote:

Hello everyone - I updated the KIP to also include the following
modification:

Both the new dsl.store.suppliers.class and the old default.dsl.store will
now respect the configurations when passed in via KafkaStreams#new(Topology,
StreamsConfig) (and other related constructors) instead of only being
respected when passed in to the initial StreamsBuilder#new(TopologyConfig)
(though it will be respected if passed in via the original path as well).

I was honestly a bit shocked this wasn't the case with the original KIP
that introduced default.dsl.store!

On Fri, Jul 28, 2023 at 4:55 PM Almog Gavra 

wrote:



OK! I think I got everything, but I'll give the KIP another read with
fresh eyes later. Just a reminder that the voting is open, so go out and
exercise your civic duty! ;)

- Almog

On Fri, Jul 28, 2023 at 10:38 AM Almog Gavra 
wrote:


Thanks Guozhang & Sophie:

A2. Will clarify in the KIP
A3. Will change back to the deprecated version!
A5. Seems like I'm outnumbered... DslStoreSuppliers it is.

Will update the KIP today.

- Almog

On Thu, Jul 27, 2023 at 12:42 PM Guozhang Wang <
guozhang.wang...@gmail.com> wrote:


Yes, that sounds right to me. Thanks Sophie.

On Thu, Jul 27, 2023 at 12:35 PM Sophie Blee-Goldman
 wrote:


A2: Guozhang, just to close the book on the ListValue store thing, I fully
agree it seems like overreach to expose/force this on users, especially if
it's fully internal today. But just to make sure we're on the same page
here, you're still ok with this KIP fixing the API gap that exists today, in
which these stores cannot be customized by the user at all?

In other words, after this KIP, the new behavior for the ListValue store in
a stream join will be:

S1: First, check if the user passed in a `DSLStoreSuppliers` (or whatever
the name will be) to the StreamJoined config object, and use that to obtain
the KVStoreSupplier for this ListValue store

S2: If none was provided, check if the user has set a default
DSLStoreSuppliers via the new config, and use that to get the
KVStoreSupplier if so

S3: If neither is set, fall back to the original logic as it is today,
which is to pass in a KVStoreSupplier that is hard-coded to be either
RocksDB or InMemory, based on what is returned for the #persistent API by
the StreamJoined's WindowStoreSupplier

Does that sound right? We can clarify this further in the KIP if need be

On Thu, Jul 27, 2023 at 10:48 AM Guozhang Wang <guozhang.wang...@gmail.com> wrote:

Hi all,

Like Almog's secretary as well! Just following up on that index:

A2: I'm also happy without introducing versioned KV in this KIP as I
would envision it to be introduced as new params into the
KeyValuePluginParams in the future. And just to clarify on Sophie's
previous comment, I think ListStore should not be exposed in this API
until we see it as a common usage and hence would want to (again, we
need to think very carefully since it would potentially ask all
implementers to adopt) move it from the internal category to the
public interface category. As for now, I think only having kv / window
/ session as public store types is fine.

A3: Seems I was not making myself very clear at the beginning :P The
major thing that I'd actually like to avoid is having two configs
co-exist for the same function since it will be a confusing learning
curve for users, and hence what I was proposing is to just have the
newly introduced interface but not introduce a new config, and I
realize now that it is actually more aligned with the CUSTOM idea
where the ordering would be looking at config first, and then the
interface. I blushed as I read Almog likes it.. After thinking about
it twice, I'm now leaning a bit towards just deprecating the old
config with the new API+config as well.

A5: Among the names we have discussed so far:

DslStorePlugin
StoreTypeSp

Re: [VOTE] KIP-998: Give ProducerConfig(props, doLog) constructor protected access

2023-11-03 Thread Matthias J. Sax

+1 (binding)


About "why not public" question:

I think we need to distinguish between "end users" who create a producer 
instance, and "external parties" that might implement their own 
`Producer` (or wrap/extend `KafkaProducer`).


In the end, I would not expect an "end user" to actually call `new 
ProducerConfig` to begin with. If one creates a `KafkaProducer` they 
pass the config via a `Map` or `Properties`, and the producer creates 
`ProducerConfig` internally only. -- Thus, there is no need to make it 
`public`. (To this end, I don't actually understand why there are public 
`ProducerConfig` constructors to begin with -- sounds like a leaky 
abstraction to me.)


On the other hand, if a "third party" implements the `Producer` interface to 
ship their own producer implementation, they might want to create 
`ProducerConfig` internally, so for them it's different, but still, they 
don't need public access (because they can extend `ProducerConfig`, too, 
for this case). -- To me, this falls into the category "simple things 
should be easy, and hard things should be possible".
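
The "extend the config class to reach the protected constructor" pattern mentioned above can be sketched with plain JDK classes. Note the assumptions: `BaseConfig` and `QuietConfig` are hypothetical stand-ins and not the Kafka classes; only the `(props, doLog)` constructor shape is taken from the KIP title. Both classes are nested in one file to keep the sketch self-contained, whereas in a real setup the subclass would live in a different package, which is where `protected` actually matters.

```java
import java.util.HashMap;
import java.util.Map;

public class ProtectedCtorSketch {
    // Hypothetical stand-in for a client config class.
    static class BaseConfig {
        final Map<String, Object> values;
        final boolean logged;

        // Public entry point: always logs the resolved config.
        public BaseConfig(Map<String, Object> props) {
            this(props, true);
        }

        // Protected entry point: lets subclasses silence the config logging.
        protected BaseConfig(Map<String, Object> props, boolean doLog) {
            this.values = new HashMap<>(props);
            this.logged = doLog;
            if (doLog) {
                System.out.println("config values: " + values);
            }
        }
    }

    // A third party extends the config class purely to reach the protected constructor.
    static class QuietConfig extends BaseConfig {
        QuietConfig(Map<String, Object> props) {
            super(props, false);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("acks", "all");

        new BaseConfig(props);                  // logs the config
        QuietConfig quiet = new QuietConfig(props); // logs nothing
        System.out.println("quiet config still readable: " + quiet.values.get("acks"));
    }
}
```

This is the inheritance requirement that the thread debates: the subclass adds no behavior of its own, so a public constructor would remove the need for it, at the cost of a larger public API surface.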



-Matthias


On 11/3/23 6:06 AM, Ismael Juma wrote:

Hi Sophie,

I was trying to understand the goal of the change and it's not totally
clear to me. If the goal is to allow third party applications to customize
the logging behavior, why is the method protected instead of public?

Ismael

On Thu, Nov 2, 2023 at 9:55 PM Sophie Blee-Goldman 
wrote:


Hey all,

This is a trivial one-liner change that it was determined should go through
a KIP during the PR review process (see this thread
<https://github.com/apache/kafka/pull/14681#discussion_r1378591228> for
context). Since the change itself was already reviewed and approved I'm
skipping the discussion thread and bringing it to a vote right away, but of
course I'm open to feedback and can create a discussion thread if there is
need for it.

The change itself is simply adding the `protected` modifier to the
ProducerConfig constructor that allows for silencing the config logging.
This just brings the ProducerConfig in alignment with the other client
configs, all of which already had this constructor as protected.

KIP:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-998%3A+Give+ProducerConfig%28props%2C+doLog%29+constructor+protected+access
PR: https://github.com/apache/kafka/pull/14681

Thanks!
Sophie





Re: [DISCUSS] Apache Kafka 3.5.2 release

2023-11-03 Thread Matthias J. Sax

Hey,

Sorry for the late reply. We finished our testing, and think we are good to go.

Thanks for giving us the opportunity to get the RocksDB version bump in. 
Let's ship it!



-Matthias

On 11/2/23 4:37 PM, Luke Chen wrote:

Hi Matthias,

Is there any update on the test status for the RocksDB version bump?
Could I create a 3.5.2 RC build next week?

Thanks.
Luke

On Sat, Oct 21, 2023 at 1:01 PM Luke Chen  wrote:


Hi Matthias,

I agree it's indeed a blocker for 3.5.2 to address CVE in RocksDB.
Please let me know when the test is completed.

Thank you.
Luke

On Sat, Oct 21, 2023 at 2:12 AM Matthias J. Sax  wrote:


Thanks for the info Luke.

We did backport all but one PR in the mean time. The missing PR is a
RocksDB version bump. We want to consider it for 3.5.2, because it
addresses a CVE.

Cf https://github.com/apache/kafka/pull/14216

However, RocksDB version bumps are a little bit more tricky, and we
would like to test this properly on the 3.5 branch, which would take at least
one week; we could do the cherry-pick on Monday and start testing.

Please let us know if such a delay for 3.5.2 is acceptable or not.

Thanks.

-Matthias


On 10/20/23 5:44 AM, Luke Chen wrote:

Hi Ryan,

OK, I've backported it to the 3.5 branch.
It'll be included in v3.5.2.

Thanks.
Luke

On Fri, Oct 20, 2023 at 7:43 AM Ryan Leslie (BLP/ NEW YORK (REMOT) <
rles...@bloomberg.net> wrote:


Hi Luke,

Hope you are well. Can you please include
https://issues.apache.org/jira/browse/KAFKA-15106 in 3.5.2?

Thanks,

Ryan

From: dev@kafka.apache.org At: 10/17/23 05:05:24 UTC-4:00
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] Apache Kafka 3.5.2 release

Thanks Luke for volunteering for 3.5.2 release.

On Tue, 17 Oct 2023 at 11:58, Josep Prat 
wrote:


Hi Luke,

Thanks for taking this one!

Best,

On Tue, Oct 17, 2023 at 8:12 AM Luke Chen  wrote:


Hi all,

I'd like to volunteer as release manager for the Apache Kafka 3.5.2, to
have an important bug/vulnerability fix release for 3.5.1.

If there are no objections, I'll start building a release plan in the wiki
in the next couple of weeks.

Thanks,
Luke




--
Josep Prat
Open Source Engineering Director, Aiven
josep.p...@aiven.io | +491715557497
aiven.io

Aiven Deutschland GmbH
Alexanderufer 3-7, 10117 Berlin
Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
Amtsgericht Charlottenburg, HRB 209739 B














Re: [DISCUSS] KIP-954: expand default DSL store configuration to custom types

2023-11-02 Thread Matthias J. Sax
impact API instantiations. In the ideal world, I would consider:

* We have (timestamped) kv store, versioned kv store, window store,
session store as first-class DSL store types. Some DSL operators could
accept multiple store types (e.g. versioned and non versioned kv-store)
for semantics / efficiency trade-offs. But I think we would remove
un-timestamped kv stores eventually since that efficiency trade-off is
so minimal compared to its usage limitations.
* As for list-value store (for stream-stream Join), memory-lru-cache
(for PAPI use only), memory-time-ordered-buffer (for suppression), they
would not be exposed as DSL first-class store types in the future.
Instead, they would be treated as internal used stores (e.g. list-value
store is built on key-value store with specialized serde and
putInternal), or continue to be just convenient OOTB PAPI used stores
only.
* As we move on, we will continue to be very, very strict on what would
be added as DSL store types (and hence requires changes to the proposed
APIs), what to be added as convenient OOTB PAPI store impls only, what
to be added as internal used store types that should not be exposed to
users nor customizable at all.

3. Some more detailed thoughts below:

3.a) I originally also thought that we can extend the existing config,
rather than replacing it. The difference was that I was thinking that
order-wise, the runtime would look at the API first, and then the
config, whereas in your rejected alternative it was looking at the
config first, and then the API --- that I think is a minor thing and
either is fine. I'm in agreement that having two configs would be more
confusing to users to learn about their precedence rather than helpful,
but if we keep both a config and a public API, then the precedence
ordering would not be so confusing as long as we state them clearly.
For example:

* We have DefaultStoreTypeSpec OOTB, in that impl we look at the config
only, and would only expect either ROCKS or MEMORY, and return
corresponding OOTB store impls; if any other values configured, we
error out.
* Users extend that by having MyStoreTypeSpec, in which they could do
arbitrary things without respecting the config at all, but our
recommended pattern in docs would still say "look into the config, if
it is ROCKS or MEMORY just fall back to DefaultStoreTypeSpec; otherwise
if it's some String you recognize, then return your customized store
based on the string value, otherwise error out".
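
The recommended pattern from the second bullet could look roughly like the
sketch below. Everything in it is an assumption made for illustration: the
`StoreTypeSpec` interface shape, the `keyValueStoreFor` method, the
`MY_STORE` value, and the use of plain Strings in place of real store
suppliers are all hypothetical and are not the API proposed in the KIP.

```java
import java.util.Locale;

final class StoreTypeFallbackSketch {

    // Hypothetical stand-in for the spec interface discussed in this thread;
    // a String stands in for whatever store supplier a real API would return.
    interface StoreTypeSpec {
        String keyValueStoreFor(String configuredType);
    }

    // Looks at the config value only and knows just the two built-in types.
    static final class DefaultStoreTypeSpec implements StoreTypeSpec {
        @Override
        public String keyValueStoreFor(String configuredType) {
            switch (configuredType.toUpperCase(Locale.ROOT)) {
                case "ROCKS":
                    return "rocksdb-kv-store";
                case "MEMORY":
                    return "in-memory-kv-store";
                default:
                    throw new IllegalArgumentException("Unknown store type: " + configuredType);
            }
        }
    }

    // Custom spec following the recommended pattern: fall back for built-in
    // values, handle the values it recognizes, and error out otherwise.
    static final class MyStoreTypeSpec implements StoreTypeSpec {
        private final StoreTypeSpec fallback = new DefaultStoreTypeSpec();

        @Override
        public String keyValueStoreFor(String configuredType) {
            String type = configuredType.toUpperCase(Locale.ROOT);
            if (type.equals("ROCKS") || type.equals("MEMORY")) {
                return fallback.keyValueStoreFor(configuredType);
            }
            if (type.equals("MY_STORE")) { // hypothetical custom config value
                return "my-custom-kv-store";
            }
            throw new IllegalArgumentException("Unknown store type: " + configuredType);
        }
    }
}
```

With this shape the precedence is easy to state: the custom spec decides
first, and the built-in config values keep their meaning because the custom
spec delegates them unchanged.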


3.b) About the struct-like Params classes, I like the idea a lot and
wished we would pursue this in the first place, but if we only do this
in Spec it would leave some inconsistencies with the StoreBuilders
though arguably the latter is only for PAPI. I'm wondering if we should
consider including the changes in StoreBuilders (e.g.
WindowStoreBuilder(WindowSupplierParams)), and if yes, maybe we should
also consider renaming that e.g. `WindowSupplierParams` to
`WindowStoreSpecParams` too? For this one I only have a "weak feeling"
so I can be convinced otherwise :P

Thanks,
Guozhang



On Sun, Jul 23, 2023 at 9:52 AM Matthias J. Sax <mj...@apache.org> wrote:

Thanks for all the input. My intention was not to block the KIP, but
just to take a step back and try to get a holistic picture and
discussion, to explore if there are good/viable alternative designs. As
said originally, I really like to close this gap, and was always aware
that the current config is not flexible enough.

I guess, my "concern" is that the KIP does increase the API surface
area significantly, as we need all kinds of `StoreTypeSpec`
implementations, and it might also imply that we need follow up KIPs
for new features (like in-memory versioned store) that might not need a
KIP otherwise.

The second question is if it might make the already "patchy" situation
with regard to customization worse.

We did de-scope the original KIP-591 for this reason, and given the new
situation of the DSL, it seems that it actually got worse compared to
back in the days.

Lastly, I hope to make the new versioned stores the default in the DSL
and we did not do it in the previous KIP due to backward compatibility
issues. Thus, from a DSL point of view, I believe there should be only
"RocksDB", "InMemory", and "Custom" in an ideal world.

Introducing (I am exaggerating to highlight my point) "KvRocksDbSpec",
"TimestampeKvRocksDbSpec", "VersionedRocksDbSpec", plus the
corresponding in-memory specs seems to head into the opposite
direction.

-- My goal is to give users a handle of the _physical_ store (RocksDB
vs InMemory vs Custom) but not the _logical_ stores (plain kv, ts-kv,
versioned) which is "dictated" by the DSL itself

[jira] [Resolved] (KAFKA-15669) Implement telemetry naming strategy

2023-11-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15669?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15669.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

> Implement telemetry naming strategy
> ---
>
> Key: KAFKA-15669
> URL: https://issues.apache.org/jira/browse/KAFKA-15669
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
> Fix For: 3.7.0
>
>
> Define classes and implement telemetry metrics naming strategy for the 
> KIP-714 as defined here: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Metricsnamingandformat]
>  
> The naming strategy must also support delta temporality metrics with a suffix 
> in original metric name.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)



[jira] [Resolved] (KAFKA-15668) Add Opentelemetry Proto library with shadowed classes

2023-11-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15668?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15668.
-
Fix Version/s: 3.7.0
   Resolution: Fixed

> Add Opentelemetry Proto library with shadowed classes
> -
>
> Key: KAFKA-15668
> URL: https://issues.apache.org/jira/browse/KAFKA-15668
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
> Fix For: 3.7.0
>
>
> The KIP-714 requires addition of [Java client 
> dependency|https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Javaclientdependencies]
>  of {{opentelemetry-proto}}, which also brings in a transitive dependency on
> {{protobuf-java}}. The dependencies should be shadowed to avoid JVM
> versioning conflicts.






[jira] [Assigned] (KAFKA-15765) Remove task level metric "commit-latency"

2023-11-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-15765:
---

Assignee: Bruno Cadonna

> Remove task level metric "commit-latency"
> -
>
> Key: KAFKA-15765
> URL: https://issues.apache.org/jira/browse/KAFKA-15765
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Bruno Cadonna
>Priority: Blocker
> Fix For: 4.0.0
>
>
> We stopped tracking this metric with KIP-447, but kept it for backward
> compatibility reasons. It's time to remove it completely with the 4.0 release.
> Cf 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics]
> And [https://github.com/apache/kafka/pull/8218/files#r390712211]
>  





[jira] [Commented] (KAFKA-14419) Failed SyncGroup leading to partitions lost due to processing during rebalances

2023-11-01 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781946#comment-17781946
 ] 

Matthias J. Sax commented on KAFKA-14419:
-

[~kirktrue] [~pnee] – will the new consumer threading help with this? Maybe 
it's not something we should try to fix at the Streams layer to begin with?

> Failed SyncGroup leading to partitions lost due to processing during 
> rebalances
> ---
>
> Key: KAFKA-14419
> URL: https://issues.apache.org/jira/browse/KAFKA-14419
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.1
> Environment: AWS EC2 CentOS Linux 3.10.0-1160.76.1.el7.x86_64
>Reporter: Mikael
>Priority: Major
>
> Trigger scenario:
> Four Kafka client application instances on separate EC2 instances with a 
> total of 8 active and 8 standby stream tasks for the same stream topology, 
> consuming from an input topic with 8 partitions. Sometimes a handful of 
> messages are consumed twice by one of the stream tasks when stream tasks on 
> another application instance join the consumer group after an application 
> instance restart.
> Additional information:
> Messages are produced to the topic by another Kafka streams topology deployed 
> on the same four application instances. I have verified that each message is 
> only produced once by enabling debug logging in the topology flow right 
> before producing each message to the topic.
> Logs from stream thread with duplicate consumption:
>  
> {code:java}
> 2022-11-21 15:09:33,677 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] Request joining group due to: group is 
> already rebalancing
> 2022-11-21 15:09:33,677 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:566] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] (Re-)joining group
> Input records consumed for the first time
> 2022-11-21 15:09:33,919 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:627] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] Successfully joined group with 
> generation Generation{generationId=8017, 
> memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74',
>  protocol='stream'}
> 2022-11-21 15:09:33,920 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:826] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] SyncGroup failed: The group began 
> another rebalance. Need to re-join the group. Sent generation was 
> Generation{generationId=8017, 
> memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74',
>  protocol='stream'}
> 2022-11-21 15:09:33,922 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1019] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] Resetting generation due to: 
> encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response
> 2022-11-21 15:09:33,922 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer 
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
>  groupId=messages.xms.mt.enqueue.sms] Request joining group due to: 
> encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response
> 2022-11-21 15:09:33,923 INFO 
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
>  o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:819] [Consumer 
>

[jira] [Commented] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-11-01 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781935#comment-17781935
 ] 

Matthias J. Sax commented on KAFKA-15602:
-

[~luke.kirby] – it seems unlikely that there will be a 3.4.2 bug-fix release – 
I am actually surprised that we do a 3.5.2... Historically, we focus on bug-fix 
releases for the current release minor/major version (ie, currently 3.6) only.

> Breaking change in 3.4.0 ByteBufferSerializer
> -
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: Luke Kirby
>    Assignee: Matthias J. Sax
>Priority: Critical
> Fix For: 3.4.2, 3.5.2, 3.7.0, 3.6.1
>
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input ByteBuffer's limit and 
> position should be set before being provided to send() on a producer 
> configured with ByteBufferSerializer. Code that worked with pre-3.4.0 
> releases now produces 0-length messages instead of the intended messages,
> effectively introducing a breaking change for existing users of the 
> serializer in the wild.
> Here are a few different inputs and serialized outputs under pre-3.4.0 and 
> 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); buff.limit(buff.position());|len=4 val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
> Notably, plain-wrappers of byte arrays continue to work under both versions 
> due to the special case in the serializer for them. I suspect that this is 
> the dominant use-case, which is why this has apparently gone un-reported to 
> this point. The wrapped-with-offset case fails under both versions, for different
> reasons (the expected value would be "est"). As demonstrated here, you can
> ensure that a manually assembled ByteBuffer will work under both versions by
> ensuring that your buffers have position == limit == message-length
> (and an actual desired start position of 0). Clearly, though, behavior has
> changed dramatically for the second and third case there, with the 3.3.2 
> behavior, in my experience, aligning better with naive expectations.
> [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java],
>  the serializer would just rewind() the buffer and respect the limit as the 
> indicator as to how much data was in the buffer. So, essentially, the 
> prevailing contract was that the data from position 0 (always!) up to the 
> limit on the buffer would be serialized; so it was really just the limit that 
> was honored. So if, per the original issue, you have a byte[] array wrapped 
> with, say, ByteBuffer.wrap(bytes, 3, 5) then that will yield a ByteBuffer() 
> with position = 3 indicating the desired start point to read from, but 
> effectively ignored by the serializer due to the rewind().
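
To make the two reading contracts concrete, here is a minimal sketch using
only java.nio. It is not the Kafka serializer code: `rewindThenRead` merely
mimics the rewind-then-read-to-limit contract described in the paragraph
above, and `readRemaining` shows the conventional position-to-limit reading
that the original issue asked for. Both method names are made up for this
illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class ByteBufferContractSketch {

    // Contract described above for pre-3.4.0: rewind, then copy [0, limit).
    static byte[] rewindThenRead(ByteBuffer data) {
        ByteBuffer view = data.duplicate(); // own position/limit, shared content
        view.rewind();                      // position = 0, limit unchanged
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    // Conventional java.nio reading: copy [position, limit).
    static byte[] readRemaining(ByteBuffer data) {
        ByteBuffer view = data.duplicate();
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    public static void main(String[] args) {
        byte[] test = "test".getBytes(StandardCharsets.UTF_8);

        ByteBuffer flipped = ByteBuffer.allocate(8);
        flipped.put(test);
        flipped.flip(); // position = 0, limit = 4

        ByteBuffer offset = ByteBuffer.wrap(test, 1, 3); // position = 1, limit = 4

        System.out.println(new String(rewindThenRead(flipped), StandardCharsets.UTF_8)); // test
        System.out.println(new String(rewindThenRead(offset), StandardCharsets.UTF_8));  // test
        System.out.println(new String(readRemaining(offset), StandardCharsets.UTF_8));   // est
    }
}
```

The rewind-based variant reproduces the 3.3.2 column of the table for all
five inputs, including the wrapped-with-offset row where the position is
ignored.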
> So while the serializer didn't work when presenting a ByteBuffer view onto a 
> sub-view of a backing array, it did however follow expected behavior when 
> employing standard patterns to populate ByteBuffers backed by 
> larger-than-necessary arrays and using limit() to identify the end of actual 
> data, consistent with conventional usage of flip() to switch from writing to 
> a buffer to setting it up to be read from (e.g., to be passed into a 
> producer.send() call). E.g.,
> {code:java}
> ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH);
> ... // some sequence of 
> bb.put(...); // populate buffer with some number of bytes less than TOO_MUCH 
> ... 
> bb.flip(); /* log

[jira] [Updated] (KAFKA-15770) org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy is flaky

2023-11-01 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15770:

Component/s: unit tests

> org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy
>  is flaky 
> ---
>
> Key: KAFKA-15770
> URL: https://issues.apache.org/jira/browse/KAFKA-15770
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Alok Thatikunta
>Priority: Major
>
> Test fails on CI, passes locally
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14607/9/testReport/junit/org.apache.kafka.streams.integration/ConsistencyVectorIntegrationTest/Build___JDK_17_and_Scala_2_13___shouldHaveSamePositionBoundActiveAndStandBy/]
> {code:java}
> java.lang.AssertionError: 
> Result:SucceededQueryResult{result=<0,1698511250443>, executionInfo=[], 
> position=Position{position={input-topic={0=50
> Expected: is 
>  but: was  {code}





Re: ACCESS to Apache Pony Mail

2023-10-31 Thread Matthias J. Sax

Only committers can log in using their ASF account.

-Matthias

On 10/30/23 10:19 PM, Arpit Goyal wrote:

Hi
Can anyone help me get access to Apache Pony Mail? I tried logging in using
my Jira credentials, but it didn't work.
Thanks and Regards
Arpit Goyal
8861094754



[jira] [Updated] (KAFKA-15768) StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult

2023-10-31 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15768:

Description: 
Calling `StateQueryResult#getOnlyPartitionResult` crashes with an incorrect
`IllegalArgumentException` if any result is a `FailedQueryResult` (and even
if there is only a single FailedQueryResult).

The issue is the internal `filter(r -> r.getResult() != 0)` step, that blindly
(and incorrectly) calls `getResult`.

Given the semantics of `getOnlyPartitionResult` we should not care if the
result is SucceededQueryResult or FailedQueryResult, but only check if there is a
single result or not. (The user has no means to avoid getting an exception
otherwise.)

Side-note: why does `FailedQueryResult#getResult` throw an 
IllegalArgumentException (there is no argument passed into the method – it 
should rather be an `IllegalStateException` – but I guess we would need a KIP 
for this fix?)

  was:
Calling `StateQueryResult#getOnlyPartitionResult` crashes with an incorrect 
`IllegalArgumentException` if the only result is a `FailedQueryResult`.

The issue is the internal `filter(r -> r.getResult() != 0)` step, that blindly 
(and incorrectly) calls `getResult`.

Given the semantics of `getOnlyPartitionResult` we should not care if the
result is SucceededQueryResult or FailedQueryResult, but only check if there is a
single result or not. (The user has no means to avoid getting an exception
otherwise.)

Side-note: why does `FailedQueryResult#getResult` throw an 
IllegalArgumentException (there is no argument passed into the method – it 
should rather be an `IllegalStateException` – but I guess we would need a KIP 
for this fix?)


> StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult
> --
>
> Key: KAFKA-15768
> URL: https://issues.apache.org/jira/browse/KAFKA-15768
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>    Reporter: Matthias J. Sax
>Priority: Major
>
> Calling `StateQueryResult#getOnlyPartitionResult` crashes with an incorrect
> `IllegalArgumentException` if any result is a `FailedQueryResult` (and
> even if there is only a single FailedQueryResult).
> The issue is the internal `filter(r -> r.getResult() != 0)` step, that
> blindly (and incorrectly) calls `getResult`.
> Given the semantics of `getOnlyPartitionResult` we should not care if the
> result is SucceededQueryResult or FailedQueryResult, but only check if there is
> a single result or not. (The user has no means to avoid getting an exception
> otherwise.)
> Side-note: why does `FailedQueryResult#getResult` throw an 
> IllegalArgumentException (there is no argument passed into the method – it 
> should rather be an `IllegalStateException` – but I guess we would need a KIP 
> for this fix?)





[jira] [Created] (KAFKA-15768) StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult

2023-10-31 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-15768:
---

 Summary: StateQueryResult#getOnlyPartitionResult should not throw 
for FailedQueryResult
 Key: KAFKA-15768
 URL: https://issues.apache.org/jira/browse/KAFKA-15768
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Matthias J. Sax


Calling `StateQueryResult#getOnlyPartitionResult` crashes with an incorrect 
`IllegalArgumentException` if the only result is a `FailedQueryResult`.

The issue is the internal `filter(r -> r.getResult() != 0)` step, that blindly 
(and incorrectly) calls `getResult`.

Given the semantics of `getOnlyPartitionResult` we should not care if the
result is SucceededQueryResult or FailedQueryResult, but only check if there is a
single result or not. (The user has no means to avoid getting an exception
otherwise.)

Side-note: why does `FailedQueryResult#getResult` throw an 
IllegalArgumentException (there is no argument passed into the method – it 
should rather be an `IllegalStateException` – but I guess we would need a KIP 
for this fix?)
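
A simplified sketch of the failure mode and of the count-only alternative.
The `Result`, `Success`, and `Failure` types below are hypothetical stand-ins
written for this illustration, not the Kafka Streams classes, and the two
helper methods only approximate the behavior described above.

```java
import java.util.List;
import java.util.stream.Collectors;

final class OnlyResultSketch {

    // Hypothetical stand-ins for the real query result types.
    interface Result<R> {
        boolean isSuccess();
        R getResult();
    }

    static final class Success<R> implements Result<R> {
        private final R value;
        Success(R value) { this.value = value; }
        @Override public boolean isSuccess() { return true; }
        @Override public R getResult() { return value; }
    }

    static final class Failure<R> implements Result<R> {
        @Override public boolean isSuccess() { return false; }
        @Override public R getResult() {
            throw new IllegalArgumentException("Cannot get result for failed query.");
        }
    }

    // Problematic shape: the filter calls getResult() on every entry,
    // so a single failed entry makes the whole call throw.
    static <R> Result<R> onlyResultViaFilter(List<Result<R>> results) {
        List<Result<R>> nonEmpty = results.stream()
            .filter(r -> r.getResult() != null)
            .collect(Collectors.toList());
        if (nonEmpty.size() != 1) {
            throw new IllegalArgumentException("Expected exactly one result");
        }
        return nonEmpty.get(0);
    }

    // Alternative: only check how many results there are and hand the single
    // one back, leaving it to the caller to inspect success or failure.
    static <R> Result<R> onlyResultByCount(List<Result<R>> results) {
        if (results.size() != 1) {
            throw new IllegalArgumentException("Expected exactly one result");
        }
        return results.get(0);
    }
}
```

With the count-only variant a caller can check `isSuccess()` on the returned
entry before touching `getResult()`, which is the escape hatch the filter
version takes away.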






[jira] [Created] (KAFKA-15765) Remove task level metric "commit-latency"

2023-10-31 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-15765:
---

 Summary: Remove task level metric "commit-latency"
 Key: KAFKA-15765
 URL: https://issues.apache.org/jira/browse/KAFKA-15765
 Project: Kafka
  Issue Type: Task
  Components: streams
Reporter: Matthias J. Sax
 Fix For: 4.0.0


We stopped tracking this metric with KIP-447, but kept it for backward
compatibility reasons. It's time to remove it completely with the 4.0 release.

Cf 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics]

And [https://github.com/apache/kafka/pull/8218/files#r390712211]

 






[jira] [Resolved] (KAFKA-15672) Add 3.6 to streams system tests

2023-10-31 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15672.
-
Resolution: Duplicate

> Add 3.6 to streams system tests
> ---
>
> Key: KAFKA-15672
> URL: https://issues.apache.org/jira/browse/KAFKA-15672
> Project: Kafka
>  Issue Type: Test
>  Components: streams, system tests
>            Reporter: Matthias J. Sax
>        Assignee: Matthias J. Sax
>Priority: Critical
>
> 3.6.0 was released recently. We need to add `3.6.0` to the system tests (in 
> particular upgrade and broker compatibility tests)






[jira] [Assigned] (KAFKA-15672) Add 3.6 to streams system tests

2023-10-31 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-15672:
---

Assignee: Matthias J. Sax

> Add 3.6 to streams system tests
> ---
>
> Key: KAFKA-15672
> URL: https://issues.apache.org/jira/browse/KAFKA-15672
> Project: Kafka
>  Issue Type: Test
>  Components: streams, system tests
>            Reporter: Matthias J. Sax
>        Assignee: Matthias J. Sax
>Priority: Critical
>
> 3.6.0 was released recently. We need to add `3.6.0` to the system tests (in 
> particular upgrade and broker compatibility tests)





[jira] [Assigned] (KAFKA-15594) Add 3.6.0 to streams upgrade/compatibility tests

2023-10-31 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15594?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-15594:
---

Assignee: Matthias J. Sax

> Add 3.6.0 to streams upgrade/compatibility tests
> 
>
> Key: KAFKA-15594
> URL: https://issues.apache.org/jira/browse/KAFKA-15594
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams, system tests
>Reporter: Satish Duggana
>        Assignee: Matthias J. Sax
>Priority: Major
>






[jira] [Assigned] (KAFKA-15595) Session window aggregate drops records headers

2023-10-31 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-15595:
---

Assignee: Hao Li

> Session window aggregate drops records headers
> --
>
> Key: KAFKA-15595
> URL: https://issues.apache.org/jira/browse/KAFKA-15595
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.5.1
>Reporter: Abdullah alkhawatrah
>Assignee: Hao Li
>Priority: Major
>
> Hey,
> While upgrading to 3.5.1 from 3.2.X I noticed a change in SessionWindow 
> aggregate behaviour: custom headers added before the aggregate now appear 
> to be dropped.
> I could reproduce the behaviour with the following test topology:
> {code:java}
> // code placeholder
> final StreamsBuilder builder = new StreamsBuilder();
> builder.stream(inputTopic, Consumed.with(EARLIEST))
>     .process(() -> new Processor() {
>         private ProcessorContext context;
>
>         @Override
>         public void init(final ProcessorContext context) {
>             this.context = context;
>         }
>
>         @Override
>         public void process(Record record) {
>             record.headers().add("key1", record.value().toString().getBytes());
>             context.forward(record);
>         }
>     })
>     .groupByKey()
>     .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofDays(1L), Duration.ofDays(1L)))
>     .aggregate(() -> 1,
>         (key, value, aggregate) -> aggregate,
>         (aggKey, aggOne, aggTwo) -> aggTwo)
>     .toStream()
>     .map((key, value) -> new KeyValue<>(key.key(), value))
>     .to(outputTopic); {code}
> Checking events in the `outputTopic` shows that the headers are empty. With 
> 3.2.* the same topology would have propagated the headers.
>  
> I can see here: 
> [https://github.com/apache/kafka/blob/2c6fb6c54472e90ae17439e62540ef3cb0426fe3/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java#L205]
>  that now a new record is created ignoring the headers, while in 3.2.2, the 
> same record was forwarded after changing the key and value while keeping the 
> headers: 
> [https://github.com/apache/kafka/blob/3.2.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java#L196]
>  





[jira] [Resolved] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-10-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15602.
-
Fix Version/s: 3.4.2
   3.5.2
   3.7.0
   3.6.1
 Assignee: Matthias J. Sax
   Resolution: Fixed

As discussed, reverted this in all applicable branches.

> Breaking change in 3.4.0 ByteBufferSerializer
> -
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: Luke Kirby
>    Assignee: Matthias J. Sax
>Priority: Critical
> Fix For: 3.4.2, 3.5.2, 3.7.0, 3.6.1
>
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input ByteBuffer's limit and 
> position should be set before being provided to send() on a producer 
> configured with ByteBufferSerializer. Code that worked with pre-3.4.0 
> releases now produces 0-length messages instead of the intended messages, 
> effectively introducing a breaking change for existing users of the 
> serializer in the wild.
> Here are a few different inputs and serialized outputs under pre-3.4.0 and 
> 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); buff.limit(buff.position());|len=4 val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
> Notably, plain-wrappers of byte arrays continue to work under both versions 
> due to the special case in the serializer for them. I suspect that this is 
> the dominant use-case, which is why this has apparently gone un-reported to 
> this point. The wrapped-with-offset case fails under both versions for different 
> reasons (the expected value would be "est"). As demonstrated here, you can 
> ensure that a manually assembled ByteBuffer will work under both versions by 
> ensuring that your buffers have position == limit == message-length 
> (and an actual desired start position of 0). Clearly, though, behavior has 
> changed dramatically for the second and third case there, with the 3.3.2 
> behavior, in my experience, aligning better with naive expectations.
> [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java],
>  the serializer would just rewind() the buffer and respect the limit as the 
> indicator as to how much data was in the buffer. So, essentially, the 
> prevailing contract was that the data from position 0 (always!) up to the 
> limit on the buffer would be serialized; so it was really just the limit that 
> was honored. So if, per the original issue, you have a byte[] array wrapped 
> with, say, ByteBuffer.wrap(bytes, 3, 5) then that will yield a ByteBuffer() 
> with position = 3 indicating the desired start point to read from, but 
> effectively ignored by the serializer due to the rewind().
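
The two contracts discussed in this report can be illustrated with plain java.nio calls. The following is only a sketch: `serializeWithRewind` and `serializeFromPosition` are hypothetical helper names introduced here for illustration, not Kafka's ByteBufferSerializer code. The first mimics the "rewind, then read up to the limit" contract described above; the second honors position() as the start point.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ByteBufferContractSketch {

    // Hypothetical stand-in for the "rewind, then read up to limit" contract.
    static byte[] serializeWithRewind(ByteBuffer data) {
        ByteBuffer view = data.duplicate(); // leave the caller's position/limit untouched
        view.rewind();                      // position = 0, limit unchanged
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    // Hypothetical alternative that reads from position() up to limit().
    static byte[] serializeFromPosition(ByteBuffer data) {
        ByteBuffer view = data.duplicate();
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    static String utf8(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] test = "test".getBytes(StandardCharsets.UTF_8);

        // Conventional write-then-flip usage: position = 0, limit = 4.
        ByteBuffer flipped = ByteBuffer.allocate(8);
        flipped.put(test);
        flipped.flip();

        // View onto part of a backing array: position = 1, limit = 4.
        ByteBuffer wrapped = ByteBuffer.wrap(test, 1, 3);

        System.out.println(utf8(serializeWithRewind(flipped)));   // test
        System.out.println(utf8(serializeFromPosition(flipped))); // test
        System.out.println(utf8(serializeWithRewind(wrapped)));   // test (offset ignored)
        System.out.println(utf8(serializeFromPosition(wrapped))); // est
    }
}
```

Both helpers agree for a flipped buffer and differ only for the wrapped-with-offset buffer, which is the case the original ticket was about.
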
> So while the serializer didn't work when presenting a ByteBuffer view onto a 
> sub-view of a backing array, it did however follow expected behavior when 
> employing standard patterns to populate ByteBuffers backed by 
> larger-than-necessary arrays and using limit() to identify the end of actual 
> data, consistent with conventional usage of flip() to switch from writing to 
> a buffer to setting it up to be read from (e.g., to be passed into a 
> producer.send() call). E.g.,
> {code:java}
> ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH);
> ... // some sequence of 
> bb.put(...); // populate buffer with some number of bytes less than TOO_MUCH 
> ... 
> bb.flip(); /* logically, this says "I am done writing, let's set this up for 
> read

[jira] [Reopened] (KAFKA-4852) ByteBufferSerializer not compatible with offsets

2023-10-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-4852:

  Assignee: (was: LinShunkang)

> ByteBufferSerializer not compatible with offsets
> 
>
> Key: KAFKA-4852
> URL: https://issues.apache.org/jira/browse/KAFKA-4852
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.1
> Environment: all
>Reporter: Werner Daehn
>Priority: Minor
> Fix For: 3.4.0
>
>
> Quick intro: A ByteBuffer.rewind() resets the position to zero. What if the 
> ByteBuffer was created with an offset, e.g. ByteBuffer.wrap(data, 3, 10)? The 
> ByteBufferSerializer will send from pos=0 and not from pos=3 onwards.
> Solution: No rewind() but flip() for reading a ByteBuffer. That's what the 
> flip is meant for.
> Story:
> Imagine the incoming data comes from a byte[], e.g. a network stream 
> containing topicname, partition, key, value, ... and you want to create a new 
> ProducerRecord for that. As the constructor of ProducerRecord requires 
> (topic, partition, key, value) you have to copy from above byte[] the key and 
> value. That means there is a memcopy taking place. Since the payload can be 
> potentially large, that introduces a lot of overhead. Twice the memory.
> A nice solution to this problem is to simply wrap the network byte[] into new 
> ByteBuffers:
> ByteBuffer key = ByteBuffer.wrap(data, keystart, keylength);
> ByteBuffer value = ByteBuffer.wrap(data, valuestart, valuelength);
> and then use the ByteBufferSerializer instead of the ByteArraySerializer.
> But that does not work as the ByteBufferSerializer does a rewind(), hence 
> both, key and value, will start at position=0 of the data[].
> public class ByteBufferSerializer implements Serializer {
> public byte[] serialize(String topic, ByteBuffer data) {
>  data.rewind();
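
The effect of rewind() on an offset view can be seen with java.nio alone. The sketch below uses a made-up payload and a hypothetical helper `readRemaining`; it also shows that a `slice()` of the wrapped region has its own zero position, so rewind() stays inside the region. Whether a slice is handled as intended by any particular serializer version is an assumption that would need to be verified separately.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RewindOnOffsetViewSketch {

    // Hypothetical helper: decode position..limit without moving the buffer.
    static String readRemaining(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate();
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return new String(out, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Made-up payload: 5 bytes of topic name, 3 bytes of key, then the value.
        byte[] data = "topicKEYvalue".getBytes(StandardCharsets.UTF_8);

        ByteBuffer key = ByteBuffer.wrap(data, 5, 3); // position = 5, limit = 8
        System.out.println(readRemaining(key));       // KEY

        key.rewind();                                 // position = 0, limit = 8
        System.out.println(readRemaining(key));       // topicKEY

        // slice() creates a view whose position 0 is the old position.
        ByteBuffer keySlice = ByteBuffer.wrap(data, 5, 3).slice();
        keySlice.rewind();                            // still inside the 3-byte region
        System.out.println(readRemaining(keySlice));  // KEY
    }
}
```

No bytes are copied by wrap() or slice(); both views share the backing array, which is the zero-copy property the description is after.
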





[jira] [Updated] (KAFKA-4852) ByteBufferSerializer not compatible with offsets

2023-10-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4852:
---
Labels: needs-kip  (was: )

> ByteBufferSerializer not compatible with offsets
> 
>
> Key: KAFKA-4852
> URL: https://issues.apache.org/jira/browse/KAFKA-4852
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.1
> Environment: all
>Reporter: Werner Daehn
>Priority: Minor
>  Labels: needs-kip
>
> Quick intro: A ByteBuffer.rewind() resets the position to zero. What if the 
> ByteBuffer was created with an offset, e.g. ByteBuffer.wrap(data, 3, 10)? The 
> ByteBufferSerializer will send from pos=0 and not from pos=3 onwards.
> Solution: No rewind() but flip() for reading a ByteBuffer. That's what the 
> flip is meant for.
> Story:
> Imagine the incoming data comes from a byte[], e.g. a network stream 
> containing topicname, partition, key, value, ... and you want to create a new 
> ProducerRecord for that. As the constructor of ProducerRecord requires 
> (topic, partition, key, value) you have to copy from above byte[] the key and 
> value. That means there is a memcopy taking place. Since the payload can be 
> potentially large, that introduces a lot of overhead. Twice the memory.
> A nice solution to this problem is to simply wrap the network byte[] into new 
> ByteBuffers:
> ByteBuffer key = ByteBuffer.wrap(data, keystart, keylength);
> ByteBuffer value = ByteBuffer.wrap(data, valuestart, valuelength);
> and then use the ByteBufferSerializer instead of the ByteArraySerializer.
> But that does not work as the ByteBufferSerializer does a rewind(), hence 
> both, key and value, will start at position=0 of the data[].
> public class ByteBufferSerializer implements Serializer {
> public byte[] serialize(String topic, ByteBuffer data) {
>  data.rewind();





[jira] [Updated] (KAFKA-4852) ByteBufferSerializer not compatible with offsets

2023-10-30 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4852:
---
Fix Version/s: (was: 3.4.0)

> ByteBufferSerializer not compatible with offsets
> 
>
> Key: KAFKA-4852
> URL: https://issues.apache.org/jira/browse/KAFKA-4852
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.1
> Environment: all
>Reporter: Werner Daehn
>Priority: Minor
>
> Quick intro: A ByteBuffer.rewind() resets the position to zero. What if the 
> ByteBuffer was created with an offset, e.g. ByteBuffer.wrap(data, 3, 10)? The 
> ByteBufferSerializer will send from pos=0 and not from pos=3 onwards.
> Solution: No rewind() but flip() for reading a ByteBuffer. That's what the 
> flip is meant for.
> Story:
> Imagine the incoming data comes from a byte[], e.g. a network stream 
> containing topicname, partition, key, value, ... and you want to create a new 
> ProducerRecord for that. As the constructor of ProducerRecord requires 
> (topic, partition, key, value) you have to copy from above byte[] the key and 
> value. That means there is a memcopy taking place. Since the payload can be 
> potentially large, that introduces a lot of overhead. Twice the memory.
> A nice solution to this problem is to simply wrap the network byte[] into new 
> ByteBuffers:
> ByteBuffer key = ByteBuffer.wrap(data, keystart, keylength);
> ByteBuffer value = ByteBuffer.wrap(data, valuestart, valuelength);
> and then use the ByteBufferSerializer instead of the ByteArraySerializer.
> But that does not work as the ByteBufferSerializer does a rewind(), hence 
> both, key and value, will start at position=0 of the data[].
> public class ByteBufferSerializer implements Serializer {
> public byte[] serialize(String topic, ByteBuffer data) {
>  data.rewind();





Re: [ANNOUNCE] New Kafka PMC Member: Satish Duggana

2023-10-27 Thread Matthias J. Sax

Congrats!

On 10/27/23 10:01 AM, Viktor Somogyi-Vass wrote:

Congrats Satish!

On Fri, Oct 27, 2023, 18:48 Ivan Yurchenko  wrote:


Congrats Satish!

Ivan

On Fri, Oct 27, 2023, at 19:02, Kamal Chandraprakash wrote:

Congratulations Satish!

On Fri, Oct 27, 2023, 21:10 Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:


Congratulations Satish!!

On Fri, 27 Oct 2023 at 18:38, Mickael Maison 


wrote:


Congratulations Satish!

On Fri, Oct 27, 2023 at 5:18 PM Lucas Brutschy
 wrote:


Congrats!

On Fri, Oct 27, 2023 at 5:06 PM Manikumar <

manikumar.re...@gmail.com>

wrote:


Congrats!

On Fri, Oct 27, 2023 at 8:35 PM Jun Rao 


wrote:



Hi, Everyone,

Satish Duggana has been a Kafka committer since 2022. He has been very
instrumental to the community since becoming a committer. It's my pleasure
to announce that Satish is now a member of Kafka PMC.

Congratulations Satish!

Jun
on behalf of Apache Kafka PMC













[jira] [Updated] (KAFKA-15319) Upgrade rocksdb to fix CVE-2022-37434

2023-10-26 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15319:

Fix Version/s: 3.5.2

> Upgrade rocksdb to fix CVE-2022-37434
> -
>
> Key: KAFKA-15319
> URL: https://issues.apache.org/jira/browse/KAFKA-15319
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.1
>Reporter: Maruthi
>Assignee: Lucas Brutschy
>Priority: Critical
> Fix For: 3.6.0, 3.5.2
>
> Attachments: compat_report.html.zip
>
>
> Rocksdbjni<7.9.2 is vulnerable to CVE-2022-37434 due to zlib 1.2.12
> Upgrade to 1.2.13 to fix 
> https://github.com/facebook/rocksdb/commit/0993c9225f8086bab6c4c0a2d7206897d1cc688c



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15690) EosIntegrationTest is flaky.

2023-10-26 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15690:

Component/s: streams
 unit tests

> EosIntegrationTest is flaky.
> 
>
> Key: KAFKA-15690
> URL: https://issues.apache.org/jira/browse/KAFKA-15690
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Calvin Liu
>Priority: Major
>
> EosIntegrationTest
> shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances[exactly_once_v2,
>  processing threads = false]
> {code:java}
> org.junit.runners.model.TestTimedOutException: test timed out after 600 seconds
>     at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.handleServerDisconnect(NetworkClient.java:)
>     at org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:821)
>     at org.apache.kafka.clients.NetworkClient.processTimeoutDisconnection(NetworkClient.java:779)
>     at org.apache.kafka.clients.NetworkClient.handleTimedOutRequests(NetworkClient.java:837)
> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-00, topic=multiPartitionInputTopic, partition=1, offset=15, stacktrace=java.lang.RuntimeException: Detected we've been interrupted.
>     at org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:892)
>     at org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:867)
>     at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49)
>     at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38)
>     at org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66)
> {code}
>   shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once_v2, processing 
> threads = false] 
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request.
>     at org.apache.kafka.streams.integration.utils.KafkaEmbedded.deleteTopic(KafkaEmbedded.java:204)
>     at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:286)
>     at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:274)
>     at org.apache.kafka.streams.integration.EosIntegrationTest.createTopics(EosIntegrationTest.java:174)
>     at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-00, topic=multiPartitionInputTopic, partition=1, offset=15, stacktrace=java.lang.RuntimeException: Detected we've been interrupted.
>     at org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:892)
>     at org.apache.kafka.streams.integration.EosIntegrationTest$1$1.transform(EosIntegrationTest.java:867)
>     at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:49)
>     at org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter$1.transform(TransformerSupplierAdapter.java:38)
>     at org.apache.kafka.streams.kstream.internals.KStreamFlatTransform$KStreamFlatTransformProcessor.process(KStreamFlatTransform.java:66)
> {code}
> shouldWriteLatestOffsetsToCheckpointOnShutdown[exactly_once, processing 
> threads = false] 
> {code:java}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 6. StreamsTasks did not request commit. ==> expected:  but was: 
>     at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>     at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>     at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>     at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>     at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210)
> java.lang.IllegalStateException: Replica 
> [Topic=__transaction_state,

Re: [DISCUSS] KIP-992 Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery

2023-10-26 Thread Matthias J. Sax

Would we really get a ClassCastException?

From my understanding, the store would reject the query as unsupported 
and thus the returned `QueryResult` object would have its internal flag 
set to indicate the failure, but no exception would be thrown directly?


(Of course, there might be an exception thrown to the user if they don't 
check `isSuccess()` flag but call `getResult()` directly.)
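
To illustrate the check-before-get pattern mentioned here, the following is a self-contained sketch. `SketchResult` is a hypothetical stand-in written for this example and is NOT Kafka's `QueryResult` class; only the method names `isSuccess()` and `getResult()` are taken from the discussion, and the exception type thrown on a failed result is an assumption of the sketch.

```java
public class QueryResultCheckSketch {

    // Hypothetical result holder: either a value or a failure message.
    static final class SketchResult<R> {
        private final R value;
        private final String failureMessage;

        private SketchResult(R value, String failureMessage) {
            this.value = value;
            this.failureMessage = failureMessage;
        }

        static <R> SketchResult<R> success(R value) {
            return new SketchResult<>(value, null);
        }

        static <R> SketchResult<R> failure(String message) {
            return new SketchResult<>(null, message);
        }

        boolean isSuccess() {
            return failureMessage == null;
        }

        // Assumption of this sketch: reading a failed result throws.
        R getResult() {
            if (!isSuccess()) {
                throw new IllegalStateException("query failed: " + failureMessage);
            }
            return value;
        }
    }

    // Caller-side pattern: inspect the flag before touching the result.
    static String describe(SketchResult<String> result) {
        return result.isSuccess() ? "value=" + result.getResult() : "query rejected";
    }

    public static void main(String[] args) {
        System.out.println(describe(SketchResult.success("v1")));
        System.out.println(describe(SketchResult.failure("unsupported query type")));
    }
}
```

With this shape, a rejected query surfaces as a flag on the result, and an exception reaches the caller only if the result is read without checking the flag first.
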



-Matthias

On 10/25/23 8:55 AM, Hanyu (Peter) Zheng wrote:

Hi, Bill,
Thank you for your reply. Yes, currently, if a user executes a timestamped query
against a non-timestamped store, it will throw a ClassCastException.
If a user uses KeyQuery to query a kv-store or ts-kv-store, it always returns
V. If a user uses TimestampedKeyQuery to query a kv-store, it will throw an
exception, so TimestampedKeyQuery can only query a ts-kv-store and
returns a ValueAndTimestamp object in the end.

Sincerely,
Hanyu

On Wed, Oct 25, 2023 at 8:51 AM Hanyu (Peter) Zheng 
wrote:


Thank you Lucas,

I will fix the capitalization.
When a user executes a timestamped query against a non-timestamped store,
it will throw a ClassCastException.

Sincerely,
Hanyu

On Tue, Oct 24, 2023 at 1:36 AM Lucas Brutschy
 wrote:


Hi Hanyu,

reading the KIP, I was wondering the same thing as Bill.

Other than that, this looks good to me. Thanks for the KIP.

nit: you have method names `LowerBound` and `UpperBound`, where you
probably want to fix the capitalization.

Cheers,
Lucas

On Mon, Oct 23, 2023 at 5:46 PM Bill Bejeck  wrote:


Hey Hanyu,

Thanks for the KIP, it's a welcomed addition.
Overall, the KIP looks good to me, I just have one comment.

Can you discuss the expected behavior when a user executes a timestamped
query against a non-timestamped store?  I think it should throw an
exception vs. using some default value.
If it's the case that Kafka Stream wraps all stores in a
`TimestampAndValue` store and returning a plain `V` or a
`TimestampAndValue` object depends on the query type, then it would be
good to add those details to the KIP.

Thanks,
Bill



On Fri, Oct 20, 2023 at 5:07 PM Hanyu (Peter) Zheng
 wrote:


Thank you Matthias,

I will modify the KIP to eliminate this restriction.

Sincerely,
Hanyu

On Fri, Oct 20, 2023 at 2:04 PM Hanyu (Peter) Zheng <

pzh...@confluent.io>

wrote:


Thank you Alieh,

In these two new query types, I will remove 'get' from all getter method
names.

Sincerely,
Hanyu

On Fri, Oct 20, 2023 at 10:40 AM Matthias J. Sax 

wrote:



Thanks for the KIP Hanyu,

One question:


To address this inconsistency, we propose that KeyQuery should be
restricted to querying kv-stores only, ensuring that it always returns a
plain V type, making the behavior of the aforementioned code more
predictable. Similarly, RangeQuery should be dedicated to querying
kv-stores, consistently returning only the plain V.

Why do you want to restrict `KeyQuery` and `RangeQuery` to kv-stores? I
think it would be possible to still allow both queries for ts-kv-stores,
but change the implementation to return "plain V" instead of
`ValueAndTimestamp`, ie, the implementation would automatically
unwrap the value.



-Matthias

On 10/20/23 2:32 AM, Alieh Saeedi wrote:

Hey Hanyu,

Thanks for the KIP. It seems good to me.
Just one point: AFAIK, we are going to remove "get" from the name of all
getter methods.

Cheers,
Alieh

On Thu, Oct 19, 2023 at 5:44 PM Hanyu (Peter) Zheng
 wrote:


Hello everyone,

I would like to start the discussion for KIP-992: Proposal to introduce
IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery

The KIP can be found here:







https://cwiki.apache.org/confluence/display/KAFKA/KIP-992%3A+Proposal+to+introduce+IQv2+Query+Types%3A+TimestampedKeyQuery+and+TimestampedRangeQuery


Any suggestions are more than welcome.

Many thanks,
Hanyu

On Thu, Oct 19, 2023 at 8:17 AM Hanyu (Peter) Zheng <

pzh...@confluent.io>

wrote:











https://cwiki.apache.org/confluence/display/KAFKA/KIP-992%3A+Proposal+to+introduce+IQv2+Query+Types%3A+TimestampedKeyQuery+and+TimestampedRangeQuery



Re: [VOTE] KIP-988 Streams StandbyUpdateListener

2023-10-26 Thread Matthias J. Sax

+1 (binding)

On 10/25/23 4:06 PM, Sophie Blee-Goldman wrote:

Happy to see this -- that's a +1 (binding) from me

On Mon, Oct 23, 2023 at 6:33 AM Bill Bejeck  wrote:


This is a great addition

+1(binding)

-Bill

On Fri, Oct 20, 2023 at 2:29 PM Almog Gavra  wrote:


+1 (non-binding) - great improvement, thanks Colt & Eduwer!

On Tue, Oct 17, 2023 at 11:25 AM Guozhang Wang <

guozhang.wang...@gmail.com



wrote:


+1 from me.

On Mon, Oct 16, 2023 at 1:56 AM Lucas Brutschy
 wrote:


Hi,

thanks again for the KIP!

+1 (binding)

Cheers,
Lucas



On Sun, Oct 15, 2023 at 9:13 AM Colt McNealy 

wrote:


Hello there,

I'd like to call a vote on KIP-988 (co-authored by my friend and colleague
Eduwer Camacaro). We are hoping to get it in before the 3.7.0 release.








https://cwiki.apache.org/confluence/display/KAFKA/KIP-988%3A+Streams+Standby+Task+Update+Listener


Cheers,
Colt McNealy

*Founder, LittleHorse.dev*










Re: [DISCUSS] KIP-988 Streams Standby Task Update Listener

2023-10-26 Thread Matthias J. Sax

Thanks. SGTM.

On 10/25/23 4:06 PM, Sophie Blee-Goldman wrote:

That all sounds good to me! Thanks for the KIP

On Wed, Oct 25, 2023 at 3:47 PM Colt McNealy  wrote:


Hi Sophie, Matthias, Bruno, and Eduwer—

Thanks for your patience as I have been scrambling to catch up after a week
of business travel (and a few days with no time to code). I'd like to tie
up some loose ends here, but in short, I don't think the KIP document
itself needs any changes (our internal implementation does, however).

1. In the interest of a) not changing the KIP after it's already out for a
vote, and b) making sure our English grammar is "correct", let's stick with
`onBatchLoaded()`. It is the Store that gets updated, not the Batch.

2. For me (and, thankfully, the community as well) adding a remote network
call at any point in this KIP is a non-starter. We'll ensure that
our implementation does not introduce one.

3. I really don't like changing API behavior, even if it's not documented
in the javadoc. As such, I am strongly against modifying the behavior of
endOffsets() on the consumer as some people may implicitly depend on the
contract.
3a. The Consumer#currentLag() method gives us exactly what we want without
a network call (current lag from a cache, from which we can compute the
offset).

4. I have no opinion about whether we should pass endOffset or currentLag
to the callback. Either one has the same exact information inside it. In
the interest of not changing the KIP after the vote has started, I'll leave
it as endOffset.
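
A minimal sketch of the arithmetic behind 3a and 4, assuming the lag reported by Consumer#currentLag() is the end offset minus the consumer's current position. The class and method names are hypothetical and nothing here calls into the Kafka client; the OptionalLong parameter only mirrors the return type of currentLag(), which is empty when no lag is cached yet.

```java
import java.util.OptionalLong;

public class StandbyEndOffset {

    // Hypothetical helper: derives the end offset from a cached lag value.
    // Assumes lag == endOffset - position, so endOffset == position + lag.
    static OptionalLong endOffsetFromLag(long position, OptionalLong currentLag) {
        if (!currentLag.isPresent()) {
            // No cached lag yet, so the end offset is unknown.
            return OptionalLong.empty();
        }
        return OptionalLong.of(position + currentLag.getAsLong());
    }

    public static void main(String[] args) {
        // Position 100 with a cached lag of 5 implies an end offset of 105.
        System.out.println(endOffsetFromLag(100L, OptionalLong.of(5L)));
        // Without a cached lag nothing can be derived.
        System.out.println(endOffsetFromLag(100L, OptionalLong.empty()));
    }
}
```

Either value can be recovered from the other as long as the position is known, which is why passing endOffset or currentLag to the callback carries the same information.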

As such, I believe the KIP doesn't need any updates, nor has it been
updated since the vote started.

Would anyone else like to discuss something before the Otter Council
adjourns regarding this matter?

Cheers,
Colt McNealy

*Founder, LittleHorse.dev*


On Mon, Oct 23, 2023 at 10:44 PM Sophie Blee-Goldman <sop...@responsive.dev> wrote:


Just want to checkpoint the current state of this KIP and make sure we're
on track to get it in to 3.7 (we still have a few weeks)  -- looks like
there are two remaining open questions, both relating to the
middle/intermediate callback:

1. What to name it: seems like the primary candidates are onBatchLoaded and
onBatchUpdated (and maybe also onStandbyUpdated?)
2. What additional information can we pass in that would strike a good
balance between being helpful and impacting performance.

Regarding #1, I think all of the current options are reasonable enough that
we should just let Colt decide which he prefers. I personally think
#onBatchUpdated is fine -- Bruno does make a fair point but the truth is
that English grammar can be sticky and while it could be argued that it is
the store which is updated, not the batch, I feel that it is perfectly
clear what is meant by "onBatchUpdated" and to me, this doesn't sound weird
at all. That's just my two cents in case it helps, but again, whatever
makes sense to you Colt is fine

When it comes to #2 -- as much as I would love to dig into the Consumer
client lore and see if we can modify existing APIs or add new ones in order
to get the desired offset metadata in an efficient way, I think we're
starting to go down a rabbit hole that is going to expand the scope way
beyond what Colt thought he was signing up for. I would advocate to focus
on just the basic feature for now and drop the end-offset from the
callback. Once we have a standby listener it will be easy to expand on with
a followup KIP if/when we find an efficient way to add additional useful
information. I think it will also become more clear what is and isn't
useful after more people get to using it in the real world

Colt/Eduwer: how necessary is receiving the end offset during a batch
update to your own application use case?

Also, for those who really do need to check the current end offset, I
believe in theory you should be able to use the KafkaStreams#metrics API to
get the current lag and/or end offset for the changelog -- it's possible
this does not represent the most up-to-date end offset (I'm not sure it
does or does not), but it should be close enough to be reliable and useful
for the purpose of monitoring -- I mean it is a metric, after all.

Hope this helps -- in the end, it's up to you (Colt) to decide what you
want to bring in scope or not. We still have more than 3 weeks until the
KIP freeze as currently proposed, so in theory you could even implement
this KIP without the end offset and then do a followup KIP to add the end
offset within the same release, ie without any deprecations. There are
plenty of paths forward here, so don't let us drag this out forever if you
know what you want

Cheers,
Sophie

On Fri, Oct 20, 2023 at 10:57 AM Matthias J. Sax wrote:



Forgot one thing:

We could also pass `currentLag()` into `onBatchLoaded()` instead of
end-offset.


-Matthias

On 10/20/23 10:56 AM, Matthias J. Sax wrote:

Thanks for digging into this Bruno.

The JavaDoc on the consumer does not say anything

[jira] [Commented] (KAFKA-12550) Introduce RESTORING state to the KafkaStreams FSM

2023-10-26 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780015#comment-17780015
 ] 

Matthias J. Sax commented on KAFKA-12550:
-

Hmmm... Aren't both things kinda independent? In the end, as an operator I 
might still be interested to see if the state-updater thread is doing active 
restore (or maintaining standby, or is doing nothing)?

> Introduce RESTORING state to the KafkaStreams FSM
> -
>
> Key: KAFKA-12550
> URL: https://issues.apache.org/jira/browse/KAFKA-12550
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip
>
> We should consider adding a new state to the KafkaStreams FSM: RESTORING
> This would cover the time between the completion of a stable rebalance and 
> the completion of restoration across the client. Currently, Streams will 
> report the state during this time as REBALANCING even though it is generally 
> spending much more time restoring than rebalancing in most cases.
> There are a few motivations/benefits behind this idea:
> # Observability is a big one: using the umbrella REBALANCING state to cover 
> all aspects of rebalancing -> task initialization -> restoring has been a 
> common source of confusion in the past. It’s also proved to be a time sink 
> for us, during escalations, incidents, mailing list questions, and bug 
> reports. It often adds latency to escalations in particular as we have to go 
> through GTS and wait for the customer to clarify whether their “Kafka Streams 
> is stuck rebalancing” ticket means that it’s literally rebalancing, or just 
> in the REBALANCING state and actually stuck elsewhere in Streams
> # Prereq for global thread improvements: for example [KIP-406: 
> GlobalStreamThread should honor custom reset policy 
> |https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy]
>  was ultimately blocked on this as we needed to pause the Streams app while 
> the global thread restored from the appropriate offset. Since there’s 
> absolutely no rebalancing involved in this case, piggybacking on the 
> REBALANCING state would just be shooting ourselves in the foot.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-10-26 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17780013#comment-17780013
 ] 

Matthias J. Sax commented on KAFKA-15602:
-

> Ah, nah, ByteBuffer.wrap("test".getBytes()) yields a ByteBuffer with 
> position=0 limit=4; i.e., it's already ready to be read.

Ah, great. For this case, there is not even a backward compatibility concern. – 
Should make it pretty easy to get the KIP approved for this case.
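
For reference, a small sketch (plain java.nio, no Kafka classes) that prints the position and limit of the buffer shapes discussed in this ticket. The class name and the describe() helper are made up for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BufferStates {

    // Hypothetical helper: renders the cursor state of a buffer.
    static String describe(ByteBuffer b) {
        return "position=" + b.position() + " limit=" + b.limit()
                + " remaining=" + b.remaining();
    }

    public static void main(String[] args) {
        byte[] bytes = "test".getBytes(StandardCharsets.UTF_8);

        // Plain wrapper: already set up for reading.
        ByteBuffer wrapped = ByteBuffer.wrap(bytes);

        // Written but not flipped: position sits after the data, limit at capacity.
        ByteBuffer written = ByteBuffer.allocate(8).put(bytes);

        // Written and flipped: limit marks the end of the data, position is back at 0.
        ByteBuffer flipped = ByteBuffer.allocate(8).put(bytes);
        flipped.flip();

        System.out.println(describe(wrapped)); // position=0 limit=4 remaining=4
        System.out.println(describe(written)); // position=4 limit=8 remaining=4
        System.out.println(describe(flipped)); // position=0 limit=4 remaining=4
    }
}
```

The wrapped and flipped buffers end up in the same state, so a serializer that reads from position to limit would treat both the same way.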

> Breaking change in 3.4.0 ByteBufferSerializer
> -
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: Luke Kirby
>Priority: Critical
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input ByteBuffer's limit and 
> position should be set before being provided to send() on a producer 
> configured with ByteBufferSerializer. Code that worked with pre-3.4.0 
> releases now produces 0-length messages instead of the intended messages, 
> effectively introducing a breaking change for existing users of the 
> serializer in the wild.
> Here are a few different inputs and serialized outputs under pre-3.4.0 and 
> 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8)); buff.limit(buff.position());|len=4 val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
> Notably, plain-wrappers of byte arrays continue to work under both versions 
> due to the special case in the serializer for them. I suspect that this is 
> the dominant use-case, which is why this has apparently gone un-reported to 
> this point. The wrapped-with-offset case fails under both versions for different 
> reasons (the expected value would be "est"). As demonstrated here, you can 
> ensure that a manually assembled ByteBuffer will work under both versions by 
> ensuring that your buffers have position == limit == message-length 
> (and an actual desired start position of 0). Clearly, though, behavior has 
> changed dramatically for the second and third case there, with the 3.3.2 
> behavior, in my experience, aligning better with naive expectations.
> [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java],
>  the serializer would just rewind() the buffer and respect the limit as the 
> indicator as to how much data was in the buffer. So, essentially, the 
> prevailing contract was that the data from position 0 (always!) up to the 
> limit on the buffer would be serialized; so it was really just the limit that 
> was honored. So if, per the original issue, you have a byte[] array wrapped 
> with, say, ByteBuffer.wrap(bytes, 3, 5) then that will yield a ByteBuffer() 
> with position = 3 indicating the desired start point to read from, but 
> effectively ignored by the serializer due to the rewind().
> So while the serializer didn't work when presenting a ByteBuffer view onto a 
> sub-view of a backing array, it did however follow expected behavior when 
> employing standard patterns to populate ByteBuffers backed by 
> larger-than-necessary arrays and using limit() to identify the end of actual 
> data, consistent with conventional usage of flip() to switch from writing to 
> a buffer to setting it up to be read from (e.g., to be passed into a 
> producer.send() call). E.g.,
> {code:java}
> ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH);
> ... // some sequence of 
> bb.put(...); // populate buffer with some number of bytes less than TOO_MUCH 
> ... 
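> bb.flip(); /* logically, this says "I am done writing, let's set this up for 
> reading"; pragmatically, it sets the limit to the current position so that 
> whoever reads the buffer knows when to stop reading, and sets the position to 
> zero so it knows where to 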

[jira] [Commented] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-10-24 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779216#comment-17779216
 ] 

Matthias J. Sax commented on KAFKA-15602:
-

{quote}I'm not entirely sure I understand Matthias's point regarding example 1,
{quote}
Well, in example one, the position is at 4, and if we say we want to respect 
position the result would be empty – the user would need to rewind to zero 
before calling the serializer to make it work. Does this make sense?

We can discuss details on a KIP, but instead of introducing a new class, I was 
wondering whether we should use a config `enable.auto.rewind=true` (by default) that 
users can set to `false` to get the new behavior. For this case, we don't break 
compatibility, and it gives users the ability to add an explicit rewind call before 
calling serialize before they change the config to `false`.
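
To make the two behaviors concrete, here is a rough sketch, not the actual ByteBufferSerializer code. The boolean parameter stands in for the proposed config, the class and method names are hypothetical, and the special case for plain array wrappers mentioned in the ticket is left out. With auto-rewind the bytes from 0 up to the limit are returned; without it only the bytes from the current position up to the limit are returned.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RewindSketch {

    // Hypothetical stand-in for the serializer; autoRewind mirrors the proposed config.
    static byte[] serialize(ByteBuffer data, boolean autoRewind) {
        // Work on a duplicate so the caller's position and limit stay untouched.
        ByteBuffer view = data.duplicate();
        if (autoRewind) {
            view.rewind(); // position = 0, limit unchanged
        }
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(8).put("test".getBytes(StandardCharsets.UTF_8));
        buf.limit(buf.position()); // position == limit == 4

        System.out.println(serialize(buf, true).length);  // 4: rewound to 0, read up to limit
        System.out.println(serialize(buf, false).length); // 0: nothing between position and limit

        buf.rewind(); // the explicit rewind a caller would add before switching the config
        System.out.println(serialize(buf, false).length); // 4
    }
}
```

A caller that adds the explicit rewind() gets the same bytes under both settings, which is what would let users migrate before flipping the config.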
{quote}Can you create a Jira to document
{quote}
I think we can re-open K4852 for this purpose?


[jira] [Comment Edited] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-10-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17778731#comment-17778731
 ] 

Matthias J. Sax edited comment on KAFKA-15602 at 10/23/23 4:20 PM:
---

Btw: also left a comment on K4852: 
https://issues.apache.org/jira/browse/KAFKA-4852?focusedCommentId=17778727=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17778727
 


was (Author: mjsax):
Btw: also left a comment on K4852: 
https://issues.apache.org/jira/browse/KAFKA-4852


[jira] [Commented] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-10-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17778731#comment-17778731
 ] 

Matthias J. Sax commented on KAFKA-15602:
-

Btw: also left a comment on K4852: 
https://issues.apache.org/jira/browse/KAFKA-4852


[jira] [Commented] (KAFKA-4852) ByteBufferSerializer not compatible with offsets

2023-10-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17778727#comment-17778727
 ] 

Matthias J. Sax commented on KAFKA-4852:


As reported on https://issues.apache.org/jira/browse/KAFKA-15602, this ticket 
introduces a couple of breaking changes and bugs.
 # Semantic changes like this need to be backed by a KIP
 # The code does not achieve what it aims to do
 # There is a lack of proper unit testing

Given that such a change requires a KIP and the current code is broken, we 
propose to revert this change for the time being, and do a proper fix via a 
KIP. I prepared a PR for this: [https://github.com/apache/kafka/pull/14617]

\cc [~guozhang] 
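
As a reminder of the original problem, a short java.nio sketch (class and helper names are hypothetical) showing that rewind() throws away the offset of a buffer created with ByteBuffer.wrap(array, offset, length), while reading from position to limit keeps it.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class OffsetWrapSketch {

    // Hypothetical helper: decodes the bytes between position and limit as UTF-8.
    static String readToLimit(ByteBuffer b) {
        byte[] out = new byte[b.remaining()];
        b.duplicate().get(out); // read via a duplicate, leaving b's cursor alone
        return new String(out, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] data = "keyvalue".getBytes(StandardCharsets.UTF_8);

        // View onto the last five bytes: position=3, limit=8.
        ByteBuffer value = ByteBuffer.wrap(data, 3, 5);
        System.out.println(readToLimit(value)); // value

        // After rewind() the position is 0 again, so the whole array is read.
        ByteBuffer rewound = value.duplicate();
        rewound.rewind();
        System.out.println(readToLimit(rewound)); // keyvalue
    }
}
```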

> ByteBufferSerializer not compatible with offsets
> 
>
> Key: KAFKA-4852
> URL: https://issues.apache.org/jira/browse/KAFKA-4852
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.1
> Environment: all
>Reporter: Werner Daehn
>Assignee: LinShunkang
>Priority: Minor
> Fix For: 3.4.0
>
>
> Quick intro: A ByteBuffer.rewind() resets the position to zero. What if the 
> ByteBuffer was created with an offset, e.g. ByteBuffer.wrap(data, 3, 10)? The 
> ByteBufferSerializer will send from pos=0 and not from pos=3 onwards.
> Solution: No rewind() but flip() for reading a ByteBuffer. That's what the 
> flip is meant for.
> Story:
> Imagine the incoming data comes from a byte[], e.g. a network stream 
> containing topicname, partition, key, value, ... and you want to create a new 
> ProducerRecord for that. As the constructor of ProducerRecord requires 
> (topic, partition, key, value) you have to copy from above byte[] the key and 
> value. That means there is a memcopy taking place. Since the payload can be 
> potentially large, that introduces a lot of overhead. Twice the memory.
> A nice solution to this problem is to simply wrap the network byte[] into new 
> ByteBuffers:
> ByteBuffer key = ByteBuffer.wrap(data, keystart, keylength);
> ByteBuffer value = ByteBuffer.wrap(data, valuestart, valuelength);
> and then use the ByteBufferSerializer instead of the ByteArraySerializer.
> But that does not work as the ByteBufferSerializer does a rewind(), hence 
> both, key and value, will start at position=0 of the data[].
> public class ByteBufferSerializer implements Serializer<ByteBuffer> {
> public byte[] serialize(String topic, ByteBuffer data) {
>  data.rewind();



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15602) Breaking change in 3.4.0 ByteBufferSerializer

2023-10-23 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15602?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17778719#comment-17778719
 ] 

Matthias J. Sax commented on KAFKA-15602:
-

Sounds fair. – Let me revert K4852 in all applicable branches and re-open it 
for a proper KIP. – Guess we can close this ticket afterwards?

 

> Breaking change in 3.4.0 ByteBufferSerializer
> -
>
> Key: KAFKA-15602
> URL: https://issues.apache.org/jira/browse/KAFKA-15602
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.4.0, 3.5.0, 3.4.1, 3.6.0, 3.5.1
>Reporter: Luke Kirby
>Priority: Critical
>
> [This PR|https://github.com/apache/kafka/pull/12683/files] claims to have 
> solved the situation described by KAFKA-4852, namely, to have 
> ByteBufferSerializer respect ByteBuffers wrapping byte arrays with non-0 
> offsets (or, put another way, to honor the buffer's position() as the start 
> point to consume bytes from). Unfortunately, it failed to actually do this, 
> and instead changed the expectations for how an input ByteBuffer's limit and 
> position should be set before being provided to send() on a producer 
> configured with ByteBufferSerializer. Code that worked with pre-3.4.0 
> releases now produce 0-length messages instead of the intended messages, 
> effectively introducing a breaking change for existing users of the 
> serializer in the wild.
> Here are a few different inputs and serialized outputs under pre-3.4.0 and 
> 3.4.0+ to summarize the breaking change:
> ||buffer argument||3.3.2 serialized output||3.4.0+ serialized output||
> |ByteBuffer.wrap("test".getBytes(UTF_8))|len=4 
> val=test|len=4 val=test|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8)).flip()|len=4 
> val=test|len=0 val=|
> |ByteBuffer.allocate(8).put("test".getBytes(UTF_8))|len=8 
> val=test<0><0><0><0>|len=4 val=test|
> |ByteBuffer buff = ByteBuffer.allocate(8).put("test".getBytes(UTF_8));
> buff.limit(buff.position());|len=4 
> val=test|len=4 val=test|
> |ByteBuffer.wrap("test".getBytes(UTF_8), 1, 3)|len=4 val=test|len=1 val=t|
> Notably, plain-wrappers of byte arrays continue to work under both versions 
> due to the special case in the serializer for them. I suspect that this is 
> the dominant use-case, which is why this has apparently gone un-reported to 
> this point. The wrapped-with-offset case fails for both cases for different 
> reasons (the expected value would be "est"). As demonstrated here, you can 
> ensure that a manually assembled ByteBuffer will work under both versions by 
> ensuring that your buffers start have position == limit == message-length 
> (and an actual desired start position of 0). Clearly, though, behavior has 
> changed dramatically for the second and third case there, with the 3.3.2 
> behavior, in my experience, aligning better with naive expectations.
> [Previously|https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java],
>  the serializer would just rewind() the buffer and respect the limit as the 
> indicator as to how much data was in the buffer. So, essentially, the 
> prevailing contract was that the data from position 0 (always!) up to the 
> limit on the buffer would be serialized; so it was really just the limit that 
> was honored. So if, per the original issue, you have a byte[] wrapped 
> with, say, ByteBuffer.wrap(bytes, 3, 5), then that will yield a ByteBuffer 
> with position = 3 indicating the desired start point to read from, but that 
> position is effectively ignored by the serializer due to the rewind().
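
A rough sketch of the contract described above, under stated assumptions: rewindAndCopy is a hypothetical stand-in that models "rewind, then read up to the limit", and copyRemaining is a hypothetical position-respecting alternative shown only for contrast. Neither is the actual ByteBufferSerializer code, and both work on a duplicate() so the caller's buffer is left untouched.

```java
import java.nio.ByteBuffer;
import static java.nio.charset.StandardCharsets.UTF_8;

// Hypothetical illustration class; not part of Kafka.
class RewindContract {
    // Models the described pre-3.4.0 contract: ignore position, copy [0, limit).
    static byte[] rewindAndCopy(ByteBuffer data) {
        ByteBuffer view = data.duplicate(); // independent cursor, shared content
        view.rewind();                      // position = 0, limit unchanged
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    // Position-respecting alternative for contrast: copy [position, limit).
    static byte[] copyRemaining(ByteBuffer data) {
        ByteBuffer view = data.duplicate();
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    public static void main(String[] args) {
        byte[] bytes = "0123456789".getBytes(UTF_8);
        ByteBuffer sub = ByteBuffer.wrap(bytes, 3, 5); // position=3, limit=8

        // The rewind discards the start offset, so three unwanted leading bytes appear.
        System.out.println(new String(rewindAndCopy(sub), UTF_8)); // 01234567
        // Honoring the position yields the five bytes the caller asked for.
        System.out.println(new String(copyRemaining(sub), UTF_8)); // 34567
    }
}
```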
> So while the serializer didn't work when presenting a ByteBuffer view onto a 
> sub-view of a backing array, it did however follow expected behavior when 
> employing standard patterns to populate ByteBuffers backed by 
> larger-than-necessary arrays and using limit() to identify the end of actual 
> data, consistent with conventional usage of flip() to switch from writing to 
> a buffer to setting it up to be read from (e.g., to be passed into a 
> producer.send() call). E.g.,
> {code:java}
> ByteBuffer bb = ByteBuffer.allocate(TOO_MUCH);
> // ... some sequence of puts that populate the buffer with fewer than 
> // TOO_MUCH bytes
> bb.put(...);
> // ...
> bb.flip(); /* logically, this says "I am done writing, let's set this up for 
> reading"; pragmatically, it sets the limit to the current position (and the 
> position back to 0) so that whoever reads the buffer knows where to start 
> and when to stop reading */
> {code}

[jira] [Resolved] (KAFKA-15666) Добавить функцию поиска по почте [Add a search-by-mail function]

2023-10-23 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15666?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-15666.
-
Resolution: Invalid

[~noraverba] – We can only take tickets in English. I also piped the title 
through a translator and it did not really make sense to me, especially as the 
description is empty.

Closing as invalid.

> Добавить функцию поиска по почте [Add a search-by-mail function]
> 
>
> Key: KAFKA-15666
> URL: https://issues.apache.org/jira/browse/KAFKA-15666
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: Eleonora
>Priority: Minor
>





